# Install these modules/versions

In [None]:
#!pip install dask==2.9.1 distributed==2.9.1 kubernetes==10.0.0 dask_kubernetes==0.10.0 kubernetes-asyncio==10.0.0 msgpack==0.6.2

# Using MLRun with Dask Distributed Jobs

## Writing a function code

In [None]:
# function that will be distributed 
def inc(x):
    """Return ``x`` plus 2.

    This is the unit of work handed to the Dask client by the handler.
    Note that, despite the name, the step added is 2 rather than 1.
    """
    bumped = x + 2
    return bumped

When running on Dask, the MLRun context has an extra attribute, `dask_client`,
which is initialized based on the function spec (below) and can be used to submit Dask commands.

In [None]:
def hndlr(context, x=1,y=2):
    """MLRun handler: submit ``inc(x)`` to Dask and log the outcome.

    Args:
        context: MLRun context; this handler uses ``context.logger``,
            ``context.dask_client`` and ``context.log_result``.
        x: value passed to ``inc`` through the Dask client.
        y: only reported in the parameter message; not used in the computation.

    The computed value is printed and logged as the run result named ``'y'``.
    """
    # Build the message once and send it to both the run log and stdout.
    params_msg = 'params: x={},y={}'.format(x,y)
    context.logger.info(params_msg)
    print(params_msg)

    # Keep the future under its own name instead of rebinding the `x` parameter.
    future = context.dask_client.submit(inc, x)
    print(future)

    # Resolve the future once and reuse the value for printing and logging.
    result = future.result()
    print(result)
    context.log_result('y', result)

In [None]:
# nuclio: end-code
# marks the end of a code section

In [None]:
# nuclio: ignore
import nuclio

In [None]:
from mlrun import new_function, mlconf, code_to_function, mount_v3io, NewTask
# NOTE(review): the address below is hard-coded and environment-specific —
# replace it with the IP/DNS name of your own cluster before running.
mlconf.remote_host = '40.70.31.79'  # remote cluster IP/DNS, used for the link to the Dask dashboard
# presumably the URL of the MLRun API/DB service — confirm for your deployment
mlconf.dbpath = 'http://mlrun-api:8080'

## Define the function object
Dask functions can be local (local workers) or remote (using containers in the cluster).
In the `remote` case, users can specify the number of replicas (optional) or leave it blank for auto-scale.

We use `code_to_function()`, which packs the function code into the function object/YAML (eliminating the need to update the function image). We can use `new_function()` instead if the function code is part of the image or can be remotely mounted.

In [None]:
# Build the Dask function object from this notebook's code and annotations,
# then apply the v3io mount so the volumes are added to it.
dsf = code_to_function(
    'mydask',
    project='lobproject',
    runtime='dask',
).apply(mount_v3io())

In [None]:
# Image for the function; the build step can be skipped when no extra packages are needed.
dsf.spec.image = 'daskdev/dask:2.9.1'
# Remote mode: use containers in the cluster rather than local workers.
dsf.spec.remote = True
# Number of replicas; optional in remote mode (leave unset for auto-scale).
dsf.spec.replicas = 1
# NOTE(review): presumably exposes the Dask service as a Kubernetes NodePort
# (e.g. so the dashboard is reachable via mlconf.remote_host) — confirm.
dsf.spec.service_type = 'NodePort'
# NOTE(review): presumably forces the image to be pulled on every start — confirm.
dsf.spec.image_pull_policy = 'Always'
#dsf.spec.kfp_image = 'mlrun/dask:dev2'  # select specific image for pipeline step (must have MLRun & Dask)
#print(dsf.to_yaml())

## Build the function with extra packages
We can skip the build section if we don't add packages; in that case we only need to specify the image (e.g. `dsf.spec.image='daskdev/dask:2.9.1'`).

In [None]:
# uncomment if you want to add packages to the workers
# dsf.build_config(base_image='daskdev/dask:2.9.1', commands=['pip install pandas'])
# dsf.deploy()

## Run a task using our distributed dask function (cluster)

In [None]:
# Run the handler on the Dask function with x=12 (`y` is not passed here).
myrun = dsf.run(handler=hndlr, params={'x': 12})

In [None]:
import kfp
from kfp import dsl

In [None]:
# Kubeflow Pipelines client, created for the 'default-tenant' namespace.
kfp_client = kfp.Client(namespace='default-tenant')

In [None]:
# NOTE(review): not referenced by any other cell visible in this notebook —
# confirm whether it is still needed.
artifacts_path = '/User/test'

In [None]:
@dsl.pipeline(name="dask_pipeline")
def dask_pipe(x=1,y=10):
    # Describe the work: run `hndlr` with the pipeline's x/y parameters.
    task = NewTask(handler=hndlr, name="dask_pipeline", params={'x': x, 'y': y})
    # use_db=True makes the step use a function (DB) pointer instead of
    # embedding the function spec in the pipeline YAML.
    step = dsf.as_step(task, use_db=True)
    # If the step (Dask client) needs v3io access, append .apply(mount_v3io()) to the step.
    step.container.set_image_pull_policy('Always')

In [None]:
# Optional, for pipeline debugging: compile `dask_pipe` into the local file
# 'daskpipe.yaml' so the generated pipeline definition can be inspected.
kfp.compiler.Compiler().compile(dask_pipe, 'daskpipe.yaml')

In [None]:
# Argument values for this run, passed in place of dask_pipe's defaults.
arguments = {'x': 4, 'y': -5}

# Submit the pipeline function as a run under the "pipe" experiment.
kfp_client.create_run_from_pipeline_func(
    dask_pipe,
    arguments,
    run_name="DaskExamplePipeline",
    experiment_name="pipe",
)