From 453376564b7f1a9a0a9a9a1ee34ec6f32d138a19 Mon Sep 17 00:00:00 2001
From: Hedingber
Date: Tue, 4 May 2021 04:53:04 +0300
Subject: [PATCH] [Dask] Enable to set resources for scheduler/worker separately (#914)

---
 mlrun/runtimes/daskjob.py       | 104 ++++++++++++++++++++++++++------
 mlrun/runtimes/pod.py           |  77 +++++++++++++----------
 tests/api/runtimes/base.py      |  43 +++++++------
 tests/api/runtimes/test_dask.py |  56 ++++++++++++++++-
 4 files changed, 209 insertions(+), 71 deletions(-)

diff --git a/mlrun/runtimes/daskjob.py b/mlrun/runtimes/daskjob.py
index 32e557d5944..81403327a0c 100644
--- a/mlrun/runtimes/daskjob.py
+++ b/mlrun/runtimes/daskjob.py
@@ -15,6 +15,7 @@
 import inspect
 import socket
 import time
+import warnings
 from os import environ
 from typing import Dict, List, Optional, Union
 
@@ -23,6 +24,8 @@
 
 import mlrun.api.schemas
 import mlrun.errors
+import mlrun.utils
+import mlrun.utils.regex
 from mlrun.api.db.base import DBInterface
 from mlrun.runtimes.base import BaseRuntimeHandler
@@ -78,6 +81,8 @@ def __init__(
         node_name=None,
         node_selector=None,
         affinity=None,
+        scheduler_resources=None,
+        worker_resources=None,
     ):
 
         super().__init__(
@@ -114,6 +119,8 @@
         # supported format according to https://github.com/dask/dask/blob/master/dask/utils.py#L1402
         self.scheduler_timeout = scheduler_timeout or "60 minutes"
         self.nthreads = nthreads or 1
+        self.scheduler_resources = scheduler_resources or {}
+        self.worker_resources = worker_resources or {}
 
 
 class DaskStatus(FunctionStatus):
@@ -334,6 +341,50 @@ def deploy(
             mlrun_version_specifier=mlrun_version_specifier,
         )
 
+    def with_limits(self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"):
+        warnings.warn(
+            "Dask's with_limits will be deprecated in 0.8.0, and will be removed in 0.10.0, use "
+            "with_scheduler_limits/with_worker_limits instead",
+            # TODO: In 0.8.0 deprecate and replace with_limits to with_worker/scheduler_limits in examples & demos
+            PendingDeprecationWarning,
+        )
+        # the scheduler/worker specific function was introduced after the general one, to keep backwards compatibility
+        # this function just sets the limits for both of them
+        self.with_scheduler_limits(mem, cpu, gpus, gpu_type)
+        self.with_worker_limits(mem, cpu, gpus, gpu_type)
+
+    def with_scheduler_limits(
+        self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"
+    ):
+        """set scheduler pod resources limits"""
+        self._verify_and_set_limits("scheduler_resources", mem, cpu, gpus, gpu_type)
+
+    def with_worker_limits(
+        self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"
+    ):
+        """set worker pod resources limits"""
+        self._verify_and_set_limits("worker_resources", mem, cpu, gpus, gpu_type)
+
+    def with_requests(self, mem=None, cpu=None):
+        warnings.warn(
+            "Dask's with_requests will be deprecated in 0.8.0, and will be removed in 0.10.0, use "
+            "with_scheduler_requests/with_worker_requests instead",
+            # TODO: In 0.8.0 deprecate and replace with_requests to with_worker/scheduler_requests in examples & demos
+            PendingDeprecationWarning,
+        )
+        # the scheduler/worker specific function was introduced after the general one, to keep backwards compatibility
+        # this function just sets the requests for both of them
+        self.with_scheduler_requests(mem, cpu)
+        self.with_worker_requests(mem, cpu)
+
+    def with_scheduler_requests(self, mem=None, cpu=None):
+        """set scheduler pod resources requests"""
+        self._verify_and_set_requests("scheduler_resources", mem, cpu)
+
+    def with_worker_requests(self, mem=None, cpu=None):
requests""" + self._verify_and_set_requests("worker_resources", mem, cpu) + def _run(self, runobj: RunObject, execution): handler = runobj.spec.handler @@ -389,33 +440,45 @@ def deploy_function(function: DaskCluster, secrets=None): env.append(spec.extra_pip) pod_labels = get_resource_labels(function, scrape_metrics=config.scrape_metrics) - args = ["dask-worker", "--nthreads", str(spec.nthreads)] + worker_args = ["dask-worker", "--nthreads", str(spec.nthreads)] memory_limit = spec.resources.get("limits", {}).get("memory") if memory_limit: - args.extend(["--memory-limit", str(memory_limit)]) + worker_args.extend(["--memory-limit", str(memory_limit)]) if spec.args: - args.extend(spec.args) - - container = client.V1Container( - name="base", - image=image, - env=env, - args=args, - image_pull_policy=spec.image_pull_policy, - volume_mounts=spec.volume_mounts, - resources=spec.resources, + worker_args.extend(spec.args) + scheduler_args = ["dask-scheduler"] + + container_kwargs = { + "name": "base", + "image": image, + "env": env, + "image_pull_policy": spec.image_pull_policy, + "volume_mounts": spec.volume_mounts, + } + scheduler_container = client.V1Container( + resources=spec.scheduler_resources, args=scheduler_args, **container_kwargs + ) + worker_container = client.V1Container( + resources=spec.worker_resources, args=worker_args, **container_kwargs ) - pod_spec = kube_resource_spec_to_pod_spec(spec, container) - if spec.image_pull_secret: - pod_spec.image_pull_secrets = [ - client.V1LocalObjectReference(name=spec.image_pull_secret) - ] + scheduler_pod_spec = kube_resource_spec_to_pod_spec(spec, scheduler_container) + worker_pod_spec = kube_resource_spec_to_pod_spec(spec, worker_container) + for pod_spec in [scheduler_pod_spec, worker_pod_spec]: + if spec.image_pull_secret: + pod_spec.image_pull_secrets = [ + client.V1LocalObjectReference(name=spec.image_pull_secret) + ] - pod = client.V1Pod( + scheduler_pod = client.V1Pod( + metadata=client.V1ObjectMeta(namespace=namespace, labels=pod_labels), + # annotations=meta.annotation), + spec=scheduler_pod_spec, + ) + worker_pod = client.V1Pod( metadata=client.V1ObjectMeta(namespace=namespace, labels=pod_labels), # annotations=meta.annotation), - spec=pod_spec, + spec=worker_pod_spec, ) svc_temp = dask.config.get("kubernetes.scheduler-service-template") @@ -434,7 +497,8 @@ def deploy_function(function: DaskCluster, secrets=None): ) cluster = KubeCluster( - pod, + worker_pod, + scheduler_pod_template=scheduler_pod, deploy_mode="remote", namespace=namespace, idle_timeout=spec.scheduler_timeout, diff --git a/mlrun/runtimes/pod.py b/mlrun/runtimes/pod.py index 60d04effa6d..1613cb4c324 100644 --- a/mlrun/runtimes/pod.py +++ b/mlrun/runtimes/pod.py @@ -237,70 +237,85 @@ def gpus(self, gpus, gpu_type="nvidia.com/gpu"): def with_limits(self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"): """set pod cpu/memory/gpu limits""" + self._verify_and_set_limits("resources", mem, cpu, gpus, gpu_type) + + def with_requests(self, mem=None, cpu=None): + """set requested (desired) pod cpu/memory resources""" + self._verify_and_set_requests("resources", mem, cpu) + + def with_node_selection( + self, + node_name: typing.Optional[str] = None, + node_selector: typing.Optional[typing.Dict[str, str]] = None, + affinity: typing.Optional[client.V1Affinity] = None, + ): + """ + Enables to control on which k8s node the job will run + + :param node_name: The name of the k8s node + :param node_selector: Label selector, only nodes with matching labels will be eligible to be 
+        :param node_selector: Label selector, only nodes with matching labels will be eligible to be picked
+        :param affinity:      Expands the types of constraints you can express - see
+                              https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity
+                              for details
+        """
+        if node_name:
+            self.spec.node_name = node_name
+        if node_selector:
+            self.spec.node_selector = node_selector
+        if affinity:
+            self.spec.affinity = affinity
+
+    def _verify_and_set_limits(
+        self,
+        resources_field_name,
+        mem=None,
+        cpu=None,
+        gpus=None,
+        gpu_type="nvidia.com/gpu",
+    ):
         if mem:
             verify_field_regex(
-                "function.limits.memory",
+                f"function.spec.{resources_field_name}.limits.memory",
                 mem,
                 mlrun.utils.regex.k8s_resource_quantity_regex,
             )
         if cpu:
             verify_field_regex(
-                "function.limits.cpu",
+                f"function.spec.{resources_field_name}.limits.cpu",
                 cpu,
                 mlrun.utils.regex.k8s_resource_quantity_regex,
             )
         if gpus:
             verify_field_regex(
-                "function.limits.gpus",
+                f"function.spec.{resources_field_name}.limits.gpus",
                 gpus,
                 mlrun.utils.regex.k8s_resource_quantity_regex,
             )
         update_in(
-            self.spec.resources,
+            getattr(self.spec, resources_field_name),
             "limits",
             generate_resources(mem=mem, cpu=cpu, gpus=gpus, gpu_type=gpu_type),
         )
 
-    def with_requests(self, mem=None, cpu=None):
-        """set requested (desired) pod cpu/memory/gpu resources"""
+    def _verify_and_set_requests(self, resources_field_name, mem=None, cpu=None):
         if mem:
             verify_field_regex(
-                "function.requests.memory",
+                f"function.spec.{resources_field_name}.requests.memory",
                 mem,
                 mlrun.utils.regex.k8s_resource_quantity_regex,
             )
         if cpu:
             verify_field_regex(
-                "function.requests.cpu",
+                f"function.spec.{resources_field_name}.requests.cpu",
                 cpu,
                 mlrun.utils.regex.k8s_resource_quantity_regex,
             )
         update_in(
-            self.spec.resources, "requests", generate_resources(mem=mem, cpu=cpu),
+            getattr(self.spec, resources_field_name),
+            "requests",
+            generate_resources(mem=mem, cpu=cpu),
         )
 
-    def with_node_selection(
-        self,
-        node_name: typing.Optional[str] = None,
-        node_selector: typing.Optional[typing.Dict[str, str]] = None,
-        affinity: typing.Optional[client.V1Affinity] = None,
-    ):
-        """
-        Enables to control on which k8s node the job will run
-
-        :param node_name:     The name of the k8s node
-        :param node_selector: Label selector, only nodes with matching labels will be eligible to be picked
-        :param affinity:      Expands the types of constraints you can express - see
-                              https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity
-                              for details
-        """
-        if node_name:
-            self.spec.node_name = node_name
-        if node_selector:
-            self.spec.node_selector = node_selector
-        if affinity:
-            self.spec.affinity = affinity
-
     def _get_meta(self, runobj, unique=False):
         namespace = self._get_k8s().resolve_namespace()
diff --git a/tests/api/runtimes/base.py b/tests/api/runtimes/base.py
index f085c6f354f..cc4a3b9785c 100644
--- a/tests/api/runtimes/base.py
+++ b/tests/api/runtimes/base.py
@@ -428,24 +428,9 @@ def _assert_pod_creation_config(
 
         container_spec = pod.spec.containers[0]
 
-        if expected_limits:
-            assert (
-                deepdiff.DeepDiff(
-                    container_spec.resources["limits"],
-                    expected_limits,
-                    ignore_order=True,
-                )
-                == {}
-            )
-        if expected_requests:
-            assert (
-                deepdiff.DeepDiff(
-                    container_spec.resources["requests"],
-                    expected_requests,
-                    ignore_order=True,
-                )
-                == {}
-            )
+        self._assert_container_resources(
+            container_spec, expected_limits, expected_requests
+        )
 
         pod_env = container_spec.env
 
@@ -499,3 +484,25 @@ def _assert_pod_creation_config(
         )
 
         assert pod.spec.containers[0].image == self.image_name
+
+    def _assert_container_resources(
+        self, container_spec, expected_limits, expected_requests
+    ):
+        if expected_limits:
+            assert (
+                deepdiff.DeepDiff(
+                    container_spec.resources["limits"],
+                    expected_limits,
+                    ignore_order=True,
+                )
+                == {}
+            )
+        if expected_requests:
+            assert (
+                deepdiff.DeepDiff(
+                    container_spec.resources["requests"],
+                    expected_requests,
+                    ignore_order=True,
+                )
+                == {}
+            )
diff --git a/tests/api/runtimes/test_dask.py b/tests/api/runtimes/test_dask.py
index 0129620d5dc..72289d932f3 100644
--- a/tests/api/runtimes/test_dask.py
+++ b/tests/api/runtimes/test_dask.py
@@ -45,9 +45,16 @@ def custom_teardown(self):
         unittest.mock.patch.stopall()
 
     def _get_pod_creation_args(self):
+        return self._get_worker_pod_creation_args()
+
+    def _get_worker_pod_creation_args(self):
         args, _ = self.kube_cluster_mock.call_args
         return args[0]
 
+    def _get_scheduler_pod_creation_args(self):
+        _, kwargs = self.kube_cluster_mock.call_args
+        return kwargs["scheduler_pod_template"]
+
     def _get_namespace_arg(self):
         _, kwargs = self.kube_cluster_mock.call_args
         return kwargs["namespace"]
@@ -77,13 +84,52 @@ def _generate_runtime(self):
 
         return dask_cluster
 
+    def _assert_scheduler_pod_args(self,):
+        scheduler_pod = self._get_scheduler_pod_creation_args()
+        scheduler_container_spec = scheduler_pod.spec.containers[0]
+        assert scheduler_container_spec.args == ["dask-scheduler"]
+
+    def _assert_pods_resources(
+        self,
+        expected_worker_requests,
+        expected_worker_limits,
+        expected_scheduler_requests,
+        expected_scheduler_limits,
+    ):
+        worker_pod = self._get_pod_creation_args()
+        worker_container_spec = worker_pod.spec.containers[0]
+        self._assert_container_resources(
+            worker_container_spec, expected_worker_limits, expected_worker_requests
+        )
+        scheduler_pod = self._get_scheduler_pod_creation_args()
+        scheduler_container_spec = scheduler_pod.spec.containers[0]
+        self._assert_container_resources(
+            scheduler_container_spec,
+            expected_scheduler_limits,
+            expected_scheduler_requests,
+        )
+
     def test_dask_runtime(self, db: Session, client: TestClient):
-        runtime = self._generate_runtime()
+        runtime: mlrun.runtimes.DaskCluster = self._generate_runtime()
 
         expected_requests = generate_resources(mem="2G", cpu=3)
         runtime.with_requests(
             mem=expected_requests["memory"], cpu=expected_requests["cpu"]
         )
+        expected_scheduler_limits = generate_resources(mem="4G", cpu=5)
+        gpu_type = "nvidia.com/gpu"
+        expected_worker_limits = generate_resources(
+            mem="4G", cpu=5, gpus=2, gpu_type=gpu_type
+        )
+        runtime.with_scheduler_limits(
+            mem=expected_scheduler_limits["memory"],
+            cpu=expected_scheduler_limits["cpu"],
+        )
+        runtime.with_worker_limits(
+            mem=expected_worker_limits["memory"],
+            cpu=expected_worker_limits["cpu"],
+            gpus=expected_worker_limits[gpu_type],
+        )
 
         _ = runtime.client
         self.kube_cluster_mock.assert_called_once()
@@ -92,9 +138,15 @@ def test_dask_runtime(self, db: Session, client: TestClient):
             expected_runtime_class_name="dask",
             assert_create_pod_called=False,
             assert_namespace_env_variable=False,
-            expected_requests=expected_requests,
         )
         self._assert_v3io_mount_configured(self.v3io_user, self.v3io_access_key)
+        self._assert_pods_resources(
+            expected_requests,
+            expected_worker_limits,
+            expected_requests,
+            expected_scheduler_limits,
+        )
+        self._assert_scheduler_pod_args()
 
     def test_dask_with_node_selection(self, db: Session, client: TestClient):
         runtime = self._generate_runtime()
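
Example usage of the API added by this patch (a minimal sketch, assuming a working MLRun
deployment; the function name, image, and resource values are illustrative placeholders,
not taken from the patch):

    import mlrun

    # "my-dask" and the image are placeholder values for this sketch
    dask_fn = mlrun.new_function("my-dask", kind="dask", image="mlrun/ml-models")

    # resources can now be set separately for the scheduler and the worker pods
    dask_fn.with_scheduler_requests(mem="1G", cpu=1)
    dask_fn.with_scheduler_limits(mem="2G", cpu=2)
    dask_fn.with_worker_requests(mem="2G", cpu=2)
    dask_fn.with_worker_limits(mem="4G", cpu=4, gpus=1, gpu_type="nvidia.com/gpu")

    # the generic with_requests()/with_limits() still work (pending deprecation) and
    # simply apply the same values to both the scheduler and the worker pod templates

    # accessing the client deploys the KubeCluster, passing the worker pod template as the
    # positional argument and the scheduler pod via scheduler_pod_template
    # client = dask_fn.client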