-
Notifications
You must be signed in to change notification settings - Fork 7
/
dask_flow.py
34 lines (24 loc) · 1.07 KB
/
dask_flow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from prefect import flow, get_run_logger, task
from prefect_dask.task_runners import DaskTaskRunner
@task
def say_hello(name: str) -> None:
    """Log a "hello" message for ``name`` via the Prefect run logger.

    Args:
        name: The name to include in the greeting.
    """
    # NOTE: logs are not currently working, see
    # https://github.com/PrefectHQ/prefect/issues/5850
    greeting = f"hello {name}"
    get_run_logger().info(greeting)
@task
def say_goodbye(name: str) -> None:
    """Log a "goodbye" message for ``name`` via the Prefect run logger.

    Args:
        name: The name to include in the farewell.
    """
    farewell = f"goodbye {name}"
    get_run_logger().info(farewell)
# TODO: can the task runner be parameterised so we don't duplicate the flow with dask_kubes_flow?
# see https://github.com/PrefectHQ/prefect/issues/5560
# creates a LocalCluster https://docs.dask.org/en/stable/deploying-python.html#localcluster
@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 2}))
def dask(names: list[str]) -> None:
    """Submit a hello and a goodbye task for every name, then wait for them.

    Args:
        names: Names to greet. An empty list submits no tasks.
    """
    futures = []
    for name in names:
        # tasks must be submitted to run on dask
        # if called without .submit() they are still tracked but
        # run immediately and locally rather than async on dask
        futures.append(say_hello.submit(name))
        futures.append(say_goodbye.submit(name))

    # Resolve every future before the flow returns so that no submitted task
    # run is left unfinished (or garbage collected unresolved) when the flow
    # exits. Waiting only after all submissions keeps the work parallel.
    for future in futures:
        future.wait()
if __name__ == "__main__":
    # Run the flow directly with a fixed set of example names.
    example_names = ["arthur", "trillian", "ford", "marvin"]
    dask(example_names)