[Dask] Enable to set resources for scheduler/worker separately (#914)
Hedingber committed May 4, 2021
1 parent 49e5148 commit 4533765
Showing 4 changed files with 209 additions and 71 deletions.
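
A minimal usage sketch of the API this commit adds, assuming a Dask runtime function created with mlrun.new_function (the function name, image, and resource values below are illustrative):

import mlrun

# A Dask runtime function (name and image are illustrative)
dask_fn = mlrun.new_function("mydask", kind="dask", image="mlrun/ml-models")

# New in this commit: scheduler and worker pods get their own resources
dask_fn.with_scheduler_limits(mem="1G", cpu="0.5")
dask_fn.with_scheduler_requests(mem="500M", cpu="0.25")
dask_fn.with_worker_limits(mem="4G", cpu="2", gpus=1)
dask_fn.with_worker_requests(mem="2G", cpu="1")

# The generic methods keep working for backwards compatibility, but now emit a
# PendingDeprecationWarning and apply the same values to both pods
dask_fn.with_limits(mem="2G", cpu="1")
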
104 changes: 84 additions & 20 deletions mlrun/runtimes/daskjob.py
@@ -15,6 +15,7 @@
import inspect
import socket
import time
import warnings
from os import environ
from typing import Dict, List, Optional, Union

@@ -23,6 +24,8 @@

import mlrun.api.schemas
import mlrun.errors
import mlrun.utils
import mlrun.utils.regex
from mlrun.api.db.base import DBInterface
from mlrun.runtimes.base import BaseRuntimeHandler

@@ -78,6 +81,8 @@ def __init__(
node_name=None,
node_selector=None,
affinity=None,
scheduler_resources=None,
worker_resources=None,
):

super().__init__(
@@ -114,6 +119,8 @@ def __init__(
# supported format according to https://github.com/dask/dask/blob/master/dask/utils.py#L1402
self.scheduler_timeout = scheduler_timeout or "60 minutes"
self.nthreads = nthreads or 1
self.scheduler_resources = scheduler_resources or {}
self.worker_resources = worker_resources or {}


class DaskStatus(FunctionStatus):
@@ -334,6 +341,50 @@ def deploy(
mlrun_version_specifier=mlrun_version_specifier,
)

def with_limits(self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"):
warnings.warn(
"Dask's with_limits will be deprecated in 0.8.0, and will be removed in 0.10.0, use "
"with_scheduler_limits/with_worker_limits instead",
# TODO: In 0.8.0 deprecate and replace with_limits to with_worker/scheduler_limits in examples & demos
PendingDeprecationWarning,
)
# the scheduler/worker specific function was introduced after the general one, to keep backwards compatibility
# this function just sets the limits for both of them
self.with_scheduler_limits(mem, cpu, gpus, gpu_type)
self.with_worker_limits(mem, cpu, gpus, gpu_type)

def with_scheduler_limits(
self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"
):
"""set scheduler pod resources limits"""
self._verify_and_set_limits("scheduler_resources", mem, cpu, gpus, gpu_type)

def with_worker_limits(
self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"
):
"""set worker pod resources limits"""
self._verify_and_set_limits("worker_resources", mem, cpu, gpus, gpu_type)

def with_requests(self, mem=None, cpu=None):
warnings.warn(
"Dask's with_requests will be deprecated in 0.8.0, and will be removed in 0.10.0, use "
"with_scheduler_requests/with_worker_requests instead",
# TODO: In 0.8.0 deprecate and replace with_requests to with_worker/scheduler_requests in examples & demos
PendingDeprecationWarning,
)
# the scheduler/worker specific function was introduced after the general one, to keep backwards compatibility
# this function just sets the requests for both of them
self.with_scheduler_requests(mem, cpu)
self.with_worker_requests(mem, cpu)

def with_scheduler_requests(self, mem=None, cpu=None):
"""set scheduler pod resources requests"""
self._verify_and_set_requests("scheduler_resources", mem, cpu)

def with_worker_requests(self, mem=None, cpu=None):
"""set worker pod resources requests"""
self._verify_and_set_requests("worker_resources", mem, cpu)

def _run(self, runobj: RunObject, execution):

handler = runobj.spec.handler
@@ -389,33 +440,45 @@ def deploy_function(function: DaskCluster, secrets=None):
env.append(spec.extra_pip)

pod_labels = get_resource_labels(function, scrape_metrics=config.scrape_metrics)
args = ["dask-worker", "--nthreads", str(spec.nthreads)]
worker_args = ["dask-worker", "--nthreads", str(spec.nthreads)]
memory_limit = spec.resources.get("limits", {}).get("memory")
if memory_limit:
args.extend(["--memory-limit", str(memory_limit)])
worker_args.extend(["--memory-limit", str(memory_limit)])
if spec.args:
args.extend(spec.args)

container = client.V1Container(
name="base",
image=image,
env=env,
args=args,
image_pull_policy=spec.image_pull_policy,
volume_mounts=spec.volume_mounts,
resources=spec.resources,
worker_args.extend(spec.args)
scheduler_args = ["dask-scheduler"]

container_kwargs = {
"name": "base",
"image": image,
"env": env,
"image_pull_policy": spec.image_pull_policy,
"volume_mounts": spec.volume_mounts,
}
scheduler_container = client.V1Container(
resources=spec.scheduler_resources, args=scheduler_args, **container_kwargs
)
worker_container = client.V1Container(
resources=spec.worker_resources, args=worker_args, **container_kwargs
)

pod_spec = kube_resource_spec_to_pod_spec(spec, container)
if spec.image_pull_secret:
pod_spec.image_pull_secrets = [
client.V1LocalObjectReference(name=spec.image_pull_secret)
]
scheduler_pod_spec = kube_resource_spec_to_pod_spec(spec, scheduler_container)
worker_pod_spec = kube_resource_spec_to_pod_spec(spec, worker_container)
for pod_spec in [scheduler_pod_spec, worker_pod_spec]:
if spec.image_pull_secret:
pod_spec.image_pull_secrets = [
client.V1LocalObjectReference(name=spec.image_pull_secret)
]

pod = client.V1Pod(
scheduler_pod = client.V1Pod(
metadata=client.V1ObjectMeta(namespace=namespace, labels=pod_labels),
# annotations=meta.annotation),
spec=scheduler_pod_spec,
)
worker_pod = client.V1Pod(
metadata=client.V1ObjectMeta(namespace=namespace, labels=pod_labels),
# annotations=meta.annotation),
spec=pod_spec,
spec=worker_pod_spec,
)

svc_temp = dask.config.get("kubernetes.scheduler-service-template")
@@ -434,7 +497,8 @@ def deploy_function(function: DaskCluster, secrets=None):
)

cluster = KubeCluster(
pod,
worker_pod,
scheduler_pod_template=scheduler_pod,
deploy_mode="remote",
namespace=namespace,
idle_timeout=spec.scheduler_timeout,
(diff truncated)
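
After the calls in the sketch above, the spec carries two independent resource dicts that deploy_function attaches to the scheduler and worker containers respectively. Their shape below is an assumption based on generate_resources, using the earlier illustrative values:

scheduler_resources = {  # dask_fn.spec.scheduler_resources (assumed shape)
    "limits": {"memory": "1G", "cpu": "0.5"},
    "requests": {"memory": "500M", "cpu": "0.25"},
}
worker_resources = {  # dask_fn.spec.worker_resources (assumed shape)
    "limits": {"memory": "4G", "cpu": "2", "nvidia.com/gpu": 1},
    "requests": {"memory": "2G", "cpu": "1"},
}
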
77 changes: 46 additions & 31 deletions mlrun/runtimes/pod.py
@@ -237,70 +237,85 @@ def gpus(self, gpus, gpu_type="nvidia.com/gpu"):

def with_limits(self, mem=None, cpu=None, gpus=None, gpu_type="nvidia.com/gpu"):
"""set pod cpu/memory/gpu limits"""
self._verify_and_set_limits("resources", mem, cpu, gpus, gpu_type)

def with_requests(self, mem=None, cpu=None):
"""set requested (desired) pod cpu/memory resources"""
self._verify_and_set_requests("resources", mem, cpu)

def with_node_selection(
self,
node_name: typing.Optional[str] = None,
node_selector: typing.Optional[typing.Dict[str, str]] = None,
affinity: typing.Optional[client.V1Affinity] = None,
):
"""
Enables to control on which k8s node the job will run
:param node_name: The name of the k8s node
:param node_selector: Label selector, only nodes with matching labels will be eligible to be picked
:param affinity: Expands the types of constraints you can express - see
https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity
for details
"""
if node_name:
self.spec.node_name = node_name
if node_selector:
self.spec.node_selector = node_selector
if affinity:
self.spec.affinity = affinity

def _verify_and_set_limits(
self,
resources_field_name,
mem=None,
cpu=None,
gpus=None,
gpu_type="nvidia.com/gpu",
):
if mem:
verify_field_regex(
"function.limits.memory",
f"function.spec.{resources_field_name}.limits.memory",
mem,
mlrun.utils.regex.k8s_resource_quantity_regex,
)
if cpu:
verify_field_regex(
"function.limits.cpu",
f"function.spec.{resources_field_name}.limits.cpu",
cpu,
mlrun.utils.regex.k8s_resource_quantity_regex,
)
if gpus:
verify_field_regex(
"function.limits.gpus",
f"function.spec.{resources_field_name}.limits.gpus",
gpus,
mlrun.utils.regex.k8s_resource_quantity_regex,
)
update_in(
self.spec.resources,
getattr(self.spec, resources_field_name),
"limits",
generate_resources(mem=mem, cpu=cpu, gpus=gpus, gpu_type=gpu_type),
)

def with_requests(self, mem=None, cpu=None):
"""set requested (desired) pod cpu/memory/gpu resources"""
def _verify_and_set_requests(self, resources_field_name, mem=None, cpu=None):
if mem:
verify_field_regex(
"function.requests.memory",
f"function.spec.{resources_field_name}.requests.memory",
mem,
mlrun.utils.regex.k8s_resource_quantity_regex,
)
if cpu:
verify_field_regex(
"function.requests.cpu",
f"function.spec.{resources_field_name}.requests.cpu",
cpu,
mlrun.utils.regex.k8s_resource_quantity_regex,
)
update_in(
self.spec.resources, "requests", generate_resources(mem=mem, cpu=cpu),
getattr(self.spec, resources_field_name),
"requests",
generate_resources(mem=mem, cpu=cpu),
)

def with_node_selection(
self,
node_name: typing.Optional[str] = None,
node_selector: typing.Optional[typing.Dict[str, str]] = None,
affinity: typing.Optional[client.V1Affinity] = None,
):
"""
Enables to control on which k8s node the job will run
:param node_name: The name of the k8s node
:param node_selector: Label selector, only nodes with matching labels will be eligible to be picked
:param affinity: Expands the types of constraints you can express - see
https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity
for details
"""
if node_name:
self.spec.node_name = node_name
if node_selector:
self.spec.node_selector = node_selector
if affinity:
self.spec.affinity = affinity

def _get_meta(self, runobj, unique=False):
namespace = self._get_k8s().resolve_namespace()

(diff truncated)
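
The pod.py refactor funnels both the old and the new methods through one pair of helpers keyed by the spec attribute name ("resources", "scheduler_resources", or "worker_resources"). A small standalone sketch of the verification step; the regex is an illustrative approximation, not the actual mlrun.utils.regex.k8s_resource_quantity_regex:

import re

# Illustrative approximation of a Kubernetes resource-quantity pattern
K8S_QUANTITY = re.compile(r"^[+-]?(\d+(\.\d*)?|\.\d+)(m|k|Ki|M|Mi|G|Gi|T|Ti|P|Pi|E|Ei)?$")

def verify_quantity(field_name: str, value) -> None:
    """Raise if value does not look like a k8s resource quantity (sketch of verify_field_regex)."""
    if not K8S_QUANTITY.match(str(value)):
        raise ValueError(f"{field_name}: '{value}' does not match the k8s quantity format")

# The helpers verify each provided field like this, then write the generated
# limits/requests dict onto getattr(self.spec, resources_field_name)
verify_quantity("function.spec.worker_resources.limits.memory", "4G")   # ok
verify_quantity("function.spec.worker_resources.limits.cpu", "500m")    # ok
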
43 changes: 25 additions & 18 deletions tests/api/runtimes/base.py
@@ -428,24 +428,9 @@ def _assert_pod_creation_config(

container_spec = pod.spec.containers[0]

if expected_limits:
assert (
deepdiff.DeepDiff(
container_spec.resources["limits"],
expected_limits,
ignore_order=True,
)
== {}
)
if expected_requests:
assert (
deepdiff.DeepDiff(
container_spec.resources["requests"],
expected_requests,
ignore_order=True,
)
== {}
)
self._assert_container_resources(
container_spec, expected_limits, expected_requests
)

pod_env = container_spec.env

@@ -499,3 +484,25 @@ def _assert_pod_creation_config(
)

assert pod.spec.containers[0].image == self.image_name

def _assert_container_resources(
self, container_spec, expected_limits, expected_requests
):
if expected_limits:
assert (
deepdiff.DeepDiff(
container_spec.resources["limits"],
expected_limits,
ignore_order=True,
)
== {}
)
if expected_requests:
assert (
deepdiff.DeepDiff(
container_spec.resources["requests"],
expected_requests,
ignore_order=True,
)
== {}
)
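
The extracted helper makes the limits/requests assertions reusable per container, which a Dask runtime test could use to check the scheduler and worker pods separately. A hedged sketch inside the same test base class; the way the two pods are fetched is hypothetical:

def _assert_dask_pod_resources(self):
    # Hypothetical: fetch the scheduler/worker pod templates created by deploy_function
    scheduler_pod, worker_pod = self._get_scheduler_and_worker_pods()  # not a real helper
    self._assert_container_resources(
        scheduler_pod.spec.containers[0],
        expected_limits={"cpu": "0.5", "memory": "1G"},
        expected_requests={"cpu": "0.25", "memory": "500M"},
    )
    self._assert_container_resources(
        worker_pod.spec.containers[0],
        expected_limits={"cpu": "2", "memory": "4G", "nvidia.com/gpu": 1},
        expected_requests={"cpu": "1", "memory": "2G"},
    )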
