# Dask clusters

(This material is based on the notebook https://github.com/jrbourbeau/hacking-dask.)

This notebook covers Dask's distributed clusters in detail.

## Cluster overview

In this section we'll:

1. Describe the components that make up a Dask cluster
2. Survey the different ways to launch a cluster

<img src="img/dask-cluster.svg" width="600">

### Components of a cluster

A Dask cluster is composed of three different types of objects:

1. **Scheduler**: A single, centralized scheduler process which responds to requests for computations, maintains relevant state about tasks and workers, and sends tasks to workers to be computed.
2. **Workers**: One or more worker processes which compute tasks and store/serve their results.
3. **Clients**: One or more client objects which are the user-facing entry point to interact with the cluster.

A couple of notes about workers:

- Each worker runs in its own Python process. Each worker process has its own `concurrent.futures.ThreadPoolExecutor`, which it uses to compute tasks in parallel.
- There's actually a fourth cluster object which is often not discussed: the **Nanny**. By default, Dask workers are launched and managed by a separate nanny process. This separate process makes it possible to restart workers with the `Client.restart` method, and to restart them automatically if they exceed a memory limit.
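
To make the threading model concrete, here is a minimal standard-library sketch of what a single worker's thread pool does. It is an analogy, not Dask's worker code: the `double` task, the pool size of 4, and the `worker-thread` name prefix are hypothetical choices made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def double(x):
    # Return the result plus the name of the thread that computed it
    return 2 * x, threading.current_thread().name


# Stand-in for one worker process that owns a pool of 4 threads (assumed size)
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="worker-thread") as pool:
    results = list(pool.map(double, range(8)))

values = [value for value, _ in results]
threads_used = {name for _, name in results}

print(values)
print(f"{len(threads_used)} thread(s) used")
```

The tasks are spread over at most four threads inside one process. In Dask, the size of this pool is the worker's thread count, for example the `threads_per_worker` argument of `LocalCluster`.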

#### Related Documentation

- [Cluster architecture](https://distributed.dask.org/en/latest/#architecture)
- [Journey of a task](https://distributed.dask.org/en/latest/journey.html)

## Deploying Dask clusters

Deploying a Dask cluster means launching scheduler, worker, and client processes and setting up the network connections these processes need to communicate with one another. Dask clusters can be launched in several ways, which are discussed below.

### Manual setup

Launch a scheduler process using the `dask-scheduler` command line utility:

```terminal
$ dask-scheduler
2023-07-20 20:42:29,894 - distributed.scheduler - INFO - -----------------------------------------------
2023-07-20 20:42:30,432 - distributed.scheduler - INFO - State start
2023-07-20 20:42:30,438 - distributed.scheduler - INFO - -----------------------------------------------
2023-07-20 20:42:30,439 - distributed.scheduler - INFO - Clear task state
2023-07-20 20:42:30,440 - distributed.scheduler - INFO -   Scheduler at: tcp://192.168.150.50:8786
2023-07-20 20:42:30,440 - distributed.scheduler - INFO -   dashboard at:                     :8787
2023-07-20 20:42:31,810 - distributed.scheduler - INFO - Register worker <WorkerState 'tls://oksana-2eshadura-40cern-2ech.dask-worker.coffea-opendata.casa:8788', name: kubernetes-worker-50d255ef-ba72-4162-af66-23fdbf942bb3, status: undefined, memory: 0, processing: 0>
2023-07-20 20:42:31,813 - distributed.scheduler - INFO - Starting worker compute stream, tls://oksana-2eshadura-40cern-2ech.dask-worker.coffea-opendata.casa:8788
2023-07-20 20:42:31,813 - distributed.core - INFO - Starting established connection
```

Then launch several workers with the `dask-worker` command, passing each one the address of the scheduler it should connect to:

```terminal
$ dask-worker tcp://192.168.150.50:8786
2023-07-20 20:45:36,940 - distributed.worker - INFO -       Start worker at: tcp://192.168.150.50:38077
2023-07-20 20:45:36,941 - distributed.worker - INFO -          Listening to: tcp://192.168.150.50:38077
2023-07-20 20:45:36,941 - distributed.worker - INFO -          dashboard at:       192.168.150.50:34591
2023-07-20 20:45:36,941 - distributed.worker - INFO - Waiting to connect to:  tcp://192.168.150.50:8786
2023-07-20 20:45:36,941 - distributed.worker - INFO - -------------------------------------------------
2023-07-20 20:45:36,941 - distributed.worker - INFO -               Threads:                        112
2023-07-20 20:45:36,941 - distributed.worker - INFO -                Memory:                   4.00 GiB
2023-07-20 20:45:36,941 - distributed.worker - INFO -       Local Directory: /home/cms-jovyan/dask-worker-space/worker-q_4j0zc5
2023-07-20 20:45:36,941 - distributed.worker - INFO - -------------------------------------------------
2023-07-20 20:45:36,949 - distributed.worker - INFO -         Registered to:  tcp://192.168.150.50:8786
2023-07-20 20:45:36,949 - distributed.worker - INFO - -------------------------------------------------
2023-07-20 20:45:36,949 - distributed.core - INFO - Starting established connection
                         
```
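
Once the scheduler and workers are running, a client can connect to the scheduler address and submit work. The following is a minimal sketch, assuming the scheduler address printed in the log above; the helper names `square` and `sum_of_squares` are hypothetical and exist only for this illustration.

```python
from dask.distributed import Client


def square(x):
    return x ** 2


def sum_of_squares(scheduler_address, n=10):
    """Connect to a running scheduler and sum the squares of 0..n-1 on the workers."""
    # The context manager closes the client connection when the block exits
    with Client(scheduler_address) as client:
        futures = client.map(square, range(n))  # one task per input value
        total = client.submit(sum, futures)     # reduce on the cluster
        return total.result()                   # block until the result arrives
```

With the manual setup above, this would be called as `sum_of_squares("tcp://192.168.150.50:8786")`.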

## Cluster managers 

Dask has the notion of cluster manager objects. Cluster managers offer a consistent interface for common activities such as adding workers to a cluster, removing them, and retrieving logs.

<img src="img/dask-cluster-manager.svg" width="600">

### Dask LocalCluster

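`LocalCluster` creates a "cluster" of a scheduler and workers running on the local machine.

Creating a cluster object starts a Dask scheduler and a number of Dask workers. If no arguments are specified, it detects the number of CPU cores and the amount of memory on your system and creates workers to use them. You can also set these values yourself.

In this environment, `distributed.comm.require-encryption` is `True` by default, as the first cell below shows. The following cells switch it off before starting a local cluster whose scheduler and workers communicate over plain `tcp://`.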

In [1]:
import dask

import dask.distributed  # populate config with distributed defaults

dask.config.get("distributed.comm.require-encryption")

True

In [2]:
dask.config.set({'distributed.comm.require-encryption': False})

<dask.config.set at 0x7fde8b06ef20>

In [3]:
dask.config.get("distributed.comm.require-encryption")

False
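
The override above stays in effect for the rest of the session. As an alternative sketch, `dask.config.set` can also be used as a context manager, so that the change is limited to one block and the previous value is restored afterwards. The variable names here are illustrative only.

```python
import dask
import dask.distributed  # populate config with distributed defaults

key = "distributed.comm.require-encryption"
before = dask.config.get(key)

# The override applies only inside the `with` block
with dask.config.set({key: False}):
    inside = dask.config.get(key)

# On exit, the previous value is restored
after = dask.config.get(key)
print(before, inside, after)
```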

In [12]:
from dask.distributed import LocalCluster
# Launch a scheduler and 4 workers on my local machine
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
cluster

Perhaps you already have a cluster running?
Hosting the HTTP server on port 33695 instead

Cluster:

| Dashboard | Workers | Total threads | Total memory | Status | Using processes |
|---|---|---|---|---|---|
| /user/oksana.shadura@cern.ch/proxy/33695/status | 4 | 4 | 8.00 GiB | running | True |

Scheduler:

| Comm | Dashboard | Workers | Total threads | Total memory | Started |
|---|---|---|---|---|---|
| tcp://127.0.0.1:45869 | /user/oksana.shadura@cern.ch/proxy/33695/status | 4 | 4 | 8.00 GiB | Just now |

Workers:

| Comm | Total threads | Memory | Nanny | Dashboard | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:40377 | 1 | 2.00 GiB | tcp://127.0.0.1:35791 | /user/oksana.shadura@cern.ch/proxy/45147/status | /tmp/dask-scratch-space/worker-4x3azghg |
| tcp://127.0.0.1:42989 | 1 | 2.00 GiB | tcp://127.0.0.1:46209 | /user/oksana.shadura@cern.ch/proxy/37633/status | /tmp/dask-scratch-space/worker-zmi52cyj |
| tcp://127.0.0.1:44545 | 1 | 2.00 GiB | tcp://127.0.0.1:38589 | /user/oksana.shadura@cern.ch/proxy/34089/status | /tmp/dask-scratch-space/worker-15tvl__p |
| tcp://127.0.0.1:42795 | 1 | 2.00 GiB | tcp://127.0.0.1:34495 | /user/oksana.shadura@cern.ch/proxy/39881/status | /tmp/dask-scratch-space/worker-clkd9pjs |

In [None]:
LocalCluster??

In [14]:
# Retrieve cluster logs
cluster.get_logs()

In [15]:
from dask.distributed import Client
client = Client(cluster)

In [None]:
# Close the client first, then the cluster it is connected to
client.close()
cluster.close()
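
Because cluster managers share one interface, resizing works the same way regardless of the backend. Below is a minimal sketch of scaling with `LocalCluster`. It assumes in-process workers (`processes=False`) and a disabled dashboard so that it stays lightweight; both settings are choices made for this illustration and differ from the cell above.

```python
from dask.distributed import Client, LocalCluster

# In-process workers and no dashboard keep this sketch quick to start and stop
with LocalCluster(
    n_workers=1, threads_per_worker=1, processes=False, dashboard_address=None
) as cluster, Client(cluster) as client:
    workers_before = len(client.nthreads())

    cluster.scale(3)            # ask the cluster manager for 3 workers in total
    client.wait_for_workers(3)  # block until the scheduler sees all of them
    workers_after = len(client.nthreads())

print(workers_before, workers_after)
```

Using both objects as context managers also takes care of closing the client and the cluster in the right order.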

Dask works well at many scales, from a single machine to clusters of many machines. In our case, each user gets a preconfigured resource that is ready to scale.

### Dask-jobqueue

The Dask-jobqueue project makes it easy to deploy Dask on common job queuing systems typically found in high-performance supercomputers, academic research institutions, and other clusters. It provides a convenient interface that is accessible from interactive systems like Jupyter notebooks, or from batch jobs.

Launching clusters follows a similar pattern to using Dask's built-in `LocalCluster`:

```python
# In each case `...` stands for site-specific arguments that are not shown here

# Launch a Dask cluster on an HTCondor job queueing system [requires HTCondor-related configuration]
from dask_jobqueue import HTCondorCluster
cluster = HTCondorCluster(...)

# Launch a Dask cluster on a SLURM job queueing system [requires SLURM-related configuration]
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(...)

# Launch a Dask cluster on a PBS job queueing system [requires PBS-related configuration]
from dask_jobqueue import PBSCluster
cluster = PBSCluster(...)

# Launch a Dask cluster on a Kubernetes cluster [requires Kubernetes-related configuration]
# Note: KubeCluster comes from the separate dask-kubernetes package, not dask-jobqueue
from dask_kubernetes import KubeCluster
cluster = KubeCluster(...)
```

### CoffeaCasaCluster

The "scale out" process at the Coffea-Casa Analysis Facility uses a custom dask-jobqueue class that makes it easy to deploy Dask workers on the UNL Tier-2 HTCondor batch queue or on the Kubernetes cluster available at UNL.

The Dask `Client` is the primary entry point for users of `dask.distributed`.

A Dask cluster is preconfigured for you automatically, so you only need to initialize a `Client` by pointing it to the address of the scheduler (in coffea-casa it is always `tls://localhost:8786`):

In [17]:
from dask.distributed import Client

client = Client("tls://localhost:8786")
client

Client:

| Connection method | Dashboard |
|---|---|
| Direct | /user/oksana.shadura@cern.ch/proxy/35575/status |

Scheduler:

| Comm | Dashboard | Workers | Total threads | Total memory | Started |
|---|---|---|---|---|---|
| tls://192.168.121.81:8786 | /user/oksana.shadura@cern.ch/proxy/35575/status | 0 | 0 | 0 B | Just now |
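
The same numbers can be read programmatically from a connected client. The helper below is a hypothetical sketch (the name `summarize_cluster` is not part of Dask); it only relies on `Client.scheduler_info()`, which returns a dictionary describing the scheduler and its workers.

```python
def summarize_cluster(client):
    """Return the worker and thread counts currently known to the client."""
    workers = client.scheduler_info()["workers"]
    return {
        "n_workers": len(workers),
        # Each worker entry reports how many threads it runs
        "total_threads": sum(w["nthreads"] for w in workers.values()),
    }
```

Called as `summarize_cluster(client)` with the client created above, it would be expected to report zero workers and zero threads for a session in the state shown, since no workers have joined yet.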
