-
Notifications
You must be signed in to change notification settings - Fork 7
/
dask_kubes_flow.py
55 lines (45 loc) · 1.69 KB
/
dask_kubes_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import dask_kubernetes
from dask_kubernetes import make_pod_spec
from kubernetes.client import V1Pod
from prefect import flow, get_run_logger, task
from prefect_dask.task_runners import DaskTaskRunner
@task
def say_hello(name: str) -> None:
    """Log a greeting for ``name`` through the Prefect run logger.

    Note: log lines from this task are sometimes dropped; see
    https://github.com/PrefectHQ/prefect/issues/6872
    """
    get_run_logger().info(f"hello {name}")
@task
def say_goodbye(name: str) -> None:
    """Log a farewell for ``name`` through the Prefect run logger."""
    get_run_logger().info(f"goodbye {name}")
def dask_pod_spec() -> V1Pod:
    """Build the pod template handed to ``KubeCluster`` as ``pod_template``.

    The image must contain both dask and prefect. An alternative is the
    stock ``ghcr.io/dask/dask:latest`` image with
    ``env={"EXTRA_PIP_PACKAGES": "prefect==2.18.0"}`` (presumably this
    pip-installs prefect when the pod starts — confirm against the dask
    image docs before switching).

    See https://kubernetes.dask.org/en/latest/
    """
    # Requests equal limits: 1 CPU and 1G of memory per pod.
    resources = dict(
        memory_limit="1G",
        memory_request="1G",
        cpu_limit=1,
        cpu_request=1,
    )
    return make_pod_spec(
        image="prefect-registry:5550/flow:latest",
        # Service account for the dask pods; if omitted, Kubernetes uses
        # the service account called `default`.
        extra_pod_config={"serviceAccountName": "prefect-flows"},
        **resources,
    )
@flow(
    task_runner=DaskTaskRunner(
        cluster_class=dask_kubernetes.KubeCluster,
        cluster_kwargs={"pod_template": dask_pod_spec()},
        # Forwarded to the cluster's adapt(): scale between 1 and 2 workers.
        adapt_kwargs={"minimum": 1, "maximum": 2},
    )
)
def dask_kubes(names: list[str]) -> None:
    """Submit a hello task and a goodbye task for every name to Dask.

    Tasks must be submitted with ``.submit()`` to run on Dask. A task called
    directly is still tracked, but it runs immediately and locally rather
    than asynchronously on the Dask cluster.
    """
    # NOTE(review): say_goodbye does not wait on say_hello, so the two may
    # presumably run in either order on the cluster; pass wait_for=[...]
    # to say_goodbye.submit if ordering matters.
    for name in names:
        for greeting_task in (say_hello, say_goodbye):
            greeting_task.submit(name)
if __name__ == "__main__":
    # Run the flow once with a fixed list of names.
    crew = ["arthur", "trillian", "ford", "marvin"]
    dask_kubes(crew)