# Dask - Launch dynamic cluster

Because the Open Data Studio Jupyter notebook runs in a Kubernetes environment,
we can launch a dynamic, short-lived Dask cluster directly from it.


## Install packages

We'll install the following packages:

  - [dask-kubernetes](https://kubernetes.dask.org/en/latest/) - Deploys Dask workers on Kubernetes
  - lz4 - The Dask client needs lz4 when the workers have it
  - bokeh - Required for the Dask UI
  - kubernetes - Not required by Dask, but we'll use it to create a Service resource for the Dask UI
  - numpy - Needed to run the example below
  

In [6]:
!pip -q install dask-kubernetes lz4 bokeh
!pip -q install kubernetes
!pip -q install numpy

## Get dask-worker.yaml

Clone https://github.com/open-datastudio/dask-cluster.git to get `dask-worker.yaml`.

After cloning, please open the [dask-worker.yaml](../../dask-cluster/k8s/dask-worker.yaml) file and take a look.
You can adjust the CPU and memory settings, and choose whether to place the workers on Spot instances.

In [None]:
!git clone https://github.com/open-datastudio/dask-cluster.git
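As a rough illustration of adjusting worker CPU and memory, the sketch below edits a pod template after it has been loaded into a Python dict (for example with `yaml.safe_load`). The helper name `set_worker_resources` and the minimal template are hypothetical; the real `dask-worker.yaml` may be structured differently, so treat this as a sketch rather than the repository's own method.

```python
import copy


def set_worker_resources(pod_template, cpu, memory):
    """Return a copy of pod_template with CPU/memory set on every container."""
    updated = copy.deepcopy(pod_template)
    for container in updated["spec"]["containers"]:
        resources = container.setdefault("resources", {})
        # Use the same values for requests and limits to keep the sketch simple.
        resources["requests"] = {"cpu": cpu, "memory": memory}
        resources["limits"] = {"cpu": cpu, "memory": memory}
    return updated


# Hypothetical minimal pod template; the real dask-worker.yaml may differ.
template = {
    "kind": "Pod",
    "spec": {"containers": [{"name": "dask-worker", "image": "daskdev/dask:latest"}]},
}

bigger = set_worker_resources(template, cpu="2", memory="8G")
print(bigger["spec"]["containers"][0]["resources"])
```

The original template is left untouched, so the same base file can be reused with different sizes. Editing the YAML file by hand achieves the same result.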

## Create dask cluster

The following code creates Dask workers and establishes connections to them.
You can check your Kubernetes namespace to confirm that the workers (Pods) are created.

  - dask-worker.yaml - Kubernetes pod template used to create Dask workers

In [None]:
from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml('dask-cluster/k8s/dask-worker.yaml')
cluster.scale(10)  # request a fixed number of workers explicitly

# Alternatively, let Dask scale the workers with the current workload
cluster.adapt(minimum=1, maximum=100)

## Run some compute on the cluster

Let's run a simple example.

In [None]:
# Example usage
from dask.distributed import Client
import dask.array as da

# Connect Dask to the cluster
client = Client(cluster)

# Create a large array and calculate the mean
array = da.ones((1000, 1000, 1000))
print(array.mean().compute())  # Should print 1.0
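To see why this example is a reasonable fit for a cluster, a quick back-of-the-envelope calculation of the array size helps. The sketch assumes the default `float64` dtype (8 bytes per element); it only does arithmetic and does not touch Dask.

```python
from math import prod

# Shape used in the example above.
shape = (1000, 1000, 1000)
bytes_per_element = 8  # assumption: float64, the default dtype

n_elements = prod(shape)
total_bytes = n_elements * bytes_per_element

print(f"{n_elements:,} elements")
print(f"{total_bytes / 1e9:.1f} GB ({total_bytes / 2**30:.2f} GiB)")
```

At roughly 8 GB, the full array would be uncomfortable to hold in a single small notebook pod, whereas Dask splits it into chunks that the workers process independently.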

## Open dask UI

The following code creates a Service resource, which adds a link to the Dask UI in the [staroid](https://staroid.com) management console.

In [None]:
import kubernetes
import yaml

# Use the service account credentials mounted into the notebook pod
kubernetes.config.load_incluster_config()
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
    current_namespace = f.read().strip()

# Load the Service manifest from the cloned repository
with open("dask-cluster/k8s/dask-ui-service.yaml") as f:
    dep = yaml.safe_load(f)

v1 = kubernetes.client.CoreV1Api()
try:
    service = v1.create_namespaced_service(
        namespace=current_namespace,
        body=dep,
    )
except kubernetes.client.rest.ApiException as err:
    if err.status == 409:
        # The Service already exists, so there is nothing to do
        print("Already created")
    else:
        raise
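The cell above treats HTTP status 409 (conflict) as "already created", which makes it safe to re-run. The same idea can be factored into a small helper. This is a minimal sketch: `create_if_absent`, `FakeApiException`, and `fake_create` are hypothetical stand-ins so the snippet runs without a Kubernetes API server.

```python
class FakeApiException(Exception):
    """Hypothetical stand-in for kubernetes.client.rest.ApiException."""

    def __init__(self, status):
        super().__init__(f"status {status}")
        self.status = status


def create_if_absent(create_fn, conflict_status=409, exc_type=FakeApiException):
    """Call create_fn(); return True if created, False if it already existed."""
    try:
        create_fn()
        return True
    except exc_type as err:
        if err.status == conflict_status:
            return False
        raise  # any other error is unexpected, so let it propagate


# Simulate an API server that rejects a duplicate create with 409.
existing = set()


def fake_create(name="dask-ui"):
    if name in existing:
        raise FakeApiException(409)
    existing.add(name)


print(create_if_absent(fake_create))  # first call creates the resource
print(create_if_absent(fake_create))  # second call hits the 409 conflict
```

With the real client, `exc_type` would be the library's `ApiException` and `create_fn` a call that wraps `create_namespaced_service`.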
        


Click the dask-ui button to open the Dask UI.
![](https://user-images.githubusercontent.com/1540981/89221454-29191680-d588-11ea-9bb9-30a5297ff0d7.png)

Run some compute on Dask and you will see the Dask UI update in real time.

## Shutdown workers

You can shut down your workers once you have finished your task.
This terminates all worker pods on Kubernetes.

In [None]:
cluster.close()
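If a cell raises an error before `cluster.close()` is reached, the workers may keep running longer than intended. One way to guard against that is `contextlib.closing` from the standard library, which calls `close()` on exit even when an exception occurs. The sketch below uses a hypothetical `FakeCluster` stand-in so it runs without Kubernetes; it illustrates the pattern and is not taken from the dask-kubernetes documentation.

```python
from contextlib import closing


class FakeCluster:
    """Hypothetical stand-in for a Dask cluster; only close() matters here."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


fake_cluster = FakeCluster()
try:
    with closing(fake_cluster):
        # Simulate a failure in the middle of a computation.
        raise RuntimeError("simulated failure mid-computation")
except RuntimeError:
    pass

print(fake_cluster.closed)  # close() ran despite the error
```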

## More examples

  - https://examples.dask.org/

In [None]:
# clone example notebooks
!git clone https://github.com/dask/dask-examples.git