# Scaling Dask in Kubernetes

### Table of Contents
1. Dask overview
2. Kubernetes overview
3. Overview of installing Dask
    1. Installation using Helm
    2. Other installation methods
4. Overview of integrating Dask and Kubernetes
    1. Scaling up/down cluster using dask_kubernetes
5. Next Steps

## Dask Overview

Dask is a flexible library for parallel computing in Python. It is purpose-built to parallelize Python data science applications, from a single laptop all the way up to a complex 100+ node cluster.

A Dask cluster is composed of a Dask scheduler and a scalable number of Dask workers. The APIs are designed to be familiar to anyone who has used pandas or NumPy.

By itself Dask accelerates many machine learning applications, but when paired with the additional acceleration of GPUs, integrated through the RAPIDS modules, it becomes a very powerful tool for data scientists.

For more information about Dask see [here](https://docs.dask.org/en/latest/).

## Kubernetes
Kubernetes (or K8S) is an open-source tool for managing container workloads and services. K8S is designed to scale and can run on single-node systems all the way up to entire clouds.

K8S lets you deploy Docker containers to run tasks. These containers are deployed in pods, which let you define resource limits, set execution commands, and specify custom Docker images.

K8S allows resources to be added and removed dynamically, and it can be run on-prem, in the cloud, or in a hybrid model.

For more information about Kubernetes see [here](https://kubernetes.io/docs/home/).

## Dask and Kubernetes Integration

By combining the cluster-management and auto-scaling capabilities of Kubernetes with the parallel computing and distributed resource management capabilities of Dask, we can create a data science environment that dynamically determines resource needs, grows to meet those needs, and efficiently executes data processing, training, and inference in our cluster.

## Dask Installation


#### Installation using Helm

```
# XXX: These must be run on the K8S management server, not through this pod.
# Additional steps can be found in the DeepOps project here: https://github.com/NVIDIA/deepops/blob/master/docs/rapids-dask.md

helm install -n rapids --namespace rapids --values helm/rapids.yml stable/dask
kubectl create -f k8s/roles.yaml
```

#### Manual Installation

Dask can also be installed by using a series of K8S YAML files or by manually deploying and configuring pods. More information about these methods is available [here](http://kubernetes.dask.org/en/latest/). Both require a Docker image with the `dask_kubernetes` library installed and a K8S service account with enough permissions to start and stop pods.


#### Installation using DeepOps

The [DeepOps](https://github.com/NVIDIA/deepops/blob/master/) project offers two different deployment methods for Dask. There is a [standalone deployment script](https://github.com/NVIDIA/deepops/blob/master/docs/rapids-dask.md), which wraps the Helm install, and a Kubeflow deployment script, which deploys Kubeflow along with the required Dask service accounts. The `supertetelman/k8s-rapids-dask:0.9-cuda10.0-runtime-ubuntu18.04` [Docker image](https://github.com/supertetelman/k8s-rapids-dask/blob/master/Dockerfile) contains all the Dask libraries and Jupyter plugins required to manage GPU resources and your Dask cluster through the Jupyter interface.

## Cluster Deployment

#### Jupyter Lab Extension
Dask clusters can be deployed and managed from the Jupyter interface via the Dask extension. See the [Dask Jupyter Lab Extension GitHub](https://github.com/dask/dask-labextension) for additional usage and configuration information.

#### Helm
If you installed Dask using Helm, a scheduler and an initial cluster were already deployed.

#### dask_kubernetes
Dask clusters can be created and managed using the Python `dask_kubernetes` library.

First it is necessary to define a specification for our workers. This is typically done through a YAML file. It is best practice to use the same Docker image for the workers as for this notebook and to allocate a single GPU to each Dask worker. It is also necessary to set the `dask-worker` resource arguments (such as `--memory-limit`) to match the K8S resources. Otherwise a worker may use more than its pod is allowed, and K8S might kill running Dask jobs.

In [1]:
import dask_kubernetes as dk
import time
from dask.distributed import Client

In [2]:
worker_spec_fname = '/worker_spec.yaml'
worker_spec = '''
# worker-spec.yml

kind: Pod
metadata:
  labels:
    foo : bar
spec:
  restartPolicy: Never
  containers:
  - image: supertetelman/k8s-rapids-dask:0.9-cuda10.0-runtime-ubuntu18.04
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '1', --no-bokeh, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    resources:
      limits:
        cpu: "4"
        memory: 6G
        nvidia.com/gpu: 1
      requests:
        cpu: "4"
        memory: 6G
        nvidia.com/gpu: 1
'''

with open(worker_spec_fname, "w") as yaml_file:
    yaml_file.write(worker_spec)
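
Because the `--memory-limit` argument and the K8S memory limit are written in two different places, it is easy for them to drift apart. The following is a minimal sketch of a consistency check, not part of `dask_kubernetes`. The helper names `to_bytes` and `memory_settings_match` are hypothetical, and the size parser only handles a simplified subset of the suffixes that Dask and Kubernetes accept.

```python
import re

# Multipliers for a simplified subset of size suffixes (assumption: memory
# values only use these; Kubernetes and Dask both accept more forms).
_MULTIPLIERS = {
    "": 1,
    "K": 10**3, "M": 10**6, "G": 10**9, "T": 10**12,
    "KI": 2**10, "MI": 2**20, "GI": 2**30, "TI": 2**40,
}


def to_bytes(quantity):
    """Convert a size such as '6G', '6GB', or '6Gi' to a number of bytes."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*", str(quantity))
    if match is None:
        raise ValueError(f"cannot parse size: {quantity!r}")
    number, suffix = match.groups()
    suffix = suffix.upper()
    # Dask-style sizes end in 'B' ('6GB', '6GiB'); Kubernetes-style ones do not.
    if suffix.endswith("B"):
        suffix = suffix[:-1]
    if suffix not in _MULTIPLIERS:
        raise ValueError(f"unsupported size suffix in {quantity!r}")
    return int(float(number) * _MULTIPLIERS[suffix])


def memory_settings_match(container):
    """Return True if --memory-limit equals the container's K8S memory limit."""
    args = [str(arg) for arg in container["args"]]
    dask_limit = args[args.index("--memory-limit") + 1]
    k8s_limit = container["resources"]["limits"]["memory"]
    return to_bytes(dask_limit) == to_bytes(k8s_limit)


# The relevant fields of the worker container from the spec above.
container = {
    "args": ["dask-worker", "--nthreads", "1", "--memory-limit", "6GB"],
    "resources": {"limits": {"cpu": "4", "memory": "6G", "nvidia.com/gpu": 1}},
}
print(memory_settings_match(container))  # True
```

Note that `6G` and `6Gi` are different quantities, so a spec that mixes decimal and binary suffixes would be reported as a mismatch by this check.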

In [None]:
cluster = dk.KubeCluster.from_yaml(worker_spec_fname)

### Connecting to a cluster

We can create a client that is connected to the cluster we just created.

In [4]:
client = Client(cluster)
client

Client   Scheduler: tcp://10.233.69.185:41311  Dashboard: http://10.233.69.185:38445/status
Cluster  Workers: 0  Cores: 0  Memory: 0 B


### Scaling a cluster

#### Manual Scaling
We can now manually scale up the cluster to contain N workers.

This can take some time to complete. The status can be dynamically monitored by executing `cluster` and watching the output. You can also manually check the status by executing `client`.

Be sure not to launch more workers than you have available resources in your cluster. Also be sure not to scale up/down the cluster while pods are still in a pending state. Doing either of these actions may require manually deleting pods through the Kubernetes interface.
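
One rough way to decide on a safe value for N is to work out how many workers fit on each node, given the per-worker requests in the worker spec. The sketch below uses a hypothetical helper, `workers_that_fit`, and made-up node capacities. In practice the free capacity would come from your own cluster, and pods other than Dask workers also compete for it.

```python
def workers_that_fit(node_free, per_worker):
    """Number of workers one node can host before any resource runs out."""
    return min(int(node_free[name] // amount) for name, amount in per_worker.items())


# Per-worker requests taken from the worker spec (memory in GB).
per_worker = {"cpu": 4, "memory_gb": 6, "gpu": 1}

# Hypothetical free capacity per node; replace with your cluster's values.
nodes = [
    {"cpu": 16, "memory_gb": 64, "gpu": 4},
    {"cpu": 8, "memory_gb": 32, "gpu": 4},
]

# A pod must fit on a single node, so count node by node and then sum.
print(sum(workers_that_fit(node, per_worker) for node in nodes))  # 6
```

Here the second node has four GPUs but only enough CPU for two workers, which is why the total is lower than the GPU count alone would suggest.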

In [5]:
cluster


In [6]:
cluster.scale(1)

In [7]:
client

Client   Scheduler: tcp://10.233.69.185:41311  Dashboard: http://10.233.69.185:38445/status
Cluster  Workers: 0  Cores: 0  Memory: 0 B


It may take some time for the image to be downloaded onto your node and started for the first time.

In [8]:
time.sleep(5)
client

Client   Scheduler: tcp://10.233.69.185:41311  Dashboard: http://10.233.69.185:38445/status
Cluster  Workers: 1  Cores: 1  Memory: 6.00 GB
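
A fixed `time.sleep(5)` may be too short when the image still has to be pulled. As an alternative, the sketch below polls a condition until it holds or a timeout expires. `wait_until` is a hypothetical helper written for this notebook, not a Dask API, and the demo at the end uses a stand-in counter instead of a real cluster.

```python
import time


def wait_until(condition, timeout=300.0, interval=1.0):
    """Poll condition() until it returns True.

    Returns True on success and False if `timeout` seconds pass first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in for a cluster whose first worker appears on the third poll.
polls = iter([0, 0, 1])
print(wait_until(lambda: next(polls) >= 1, timeout=5.0, interval=0.01))  # True
```

With the `client` from this notebook, the condition could be `lambda: len(client.scheduler_info()["workers"]) == 1`. Depending on the installed version of `distributed`, `Client.wait_for_workers` may also be available and would serve the same purpose when scaling up.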


And we can scale up some more.

In [9]:
cluster.scale(2)
time.sleep(5)
client

Client   Scheduler: tcp://10.233.69.185:41311  Dashboard: http://10.233.69.185:38445/status
Cluster  Workers: 2  Cores: 2  Memory: 12.00 GB


And then we can scale the cluster all the way down to free up GPU resources.

In [10]:
cluster.scale(0)
time.sleep(5)
client

Client   Scheduler: tcp://10.233.69.185:41311  Dashboard: http://10.233.69.185:38445/status
Cluster  Workers: 0  Cores: 0  Memory: 0 B


#### Adaptive Scaling

It is also possible to set `dask_kubernetes` to use adaptive scaling. This will cause worker pods to be started and stopped to meet demand.

Be sure to set the minimum to 1 or you may encounter timing issues that impact your machine learning code.
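
To build intuition for the `minimum` and `maximum` arguments, the toy model below clamps a desired worker count to that range. It is only an illustration: `clamp_workers` is a hypothetical function, and the way Dask actually estimates the desired number of workers from the pending workload is not modeled here.

```python
def clamp_workers(desired, minimum, maximum):
    """Keep a desired worker count within the range [minimum, maximum]."""
    return max(minimum, min(desired, maximum))


# An idle cluster, a light load, and a load that exceeds the allowed maximum.
for desired in (0, 2, 10):
    print(desired, "->", clamp_workers(desired, minimum=1, maximum=4))
```

With `minimum=1`, even an idle cluster keeps one worker running, which avoids waiting for a pod to start when new work arrives.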

In [11]:
cluster.adapt(minimum=1, maximum=4)

<dask_kubernetes.adaptive.Adaptive at 0x7fbdb1cbeda0>

Finally, we lower the minimum to 0 so that the cluster can scale all the way down and free up GPU resources when it is idle.

In [12]:
cluster.adapt(minimum=0, maximum=4)

<dask_kubernetes.adaptive.Adaptive at 0x7fbdb1cbe6a0>

## Next Steps

Now that we know the basics of installing Dask into K8S, defining worker nodes, and scaling a Dask cluster, we can build a machine learning pipeline that takes advantage of the parallelism.

In the next notebook we'll take a look at creating a single-worker Dask workload and then see how easy it is to accelerate it by scaling in Kubernetes.

Later, we'll also touch upon best practices for sharing large volumes and storage across your Kubernetes cluster for Dask to use.