# Dask clusters

(the material is based on the notebook https://github.com/jrbourbeau/hacking-dask)

This notebook covers Dask's distributed clusters in detail.

## Cluster overview

In this section we'll:

1. Discuss the different components which make up a Dask cluster
2. Survey different ways to launch a cluster

<img src="img/dask-cluster.svg" width="600">

### Components of a cluster

A Dask cluster is composed of three different types of objects:

1. **Scheduler**: A single, centralized scheduler process which responds to requests for computations, maintains relevant state about tasks and workers, and sends tasks to workers to be computed.
2. **Workers**: One or more worker processes which compute tasks and store/serve their results.
3. **Clients**: One or more client objects which are the user-facing entry point to interact with the cluster.

A couple of notes about workers:

- Each worker runs in its own Python process. Each worker process has its own `concurrent.futures.ThreadPoolExecutor`, which it uses to compute tasks in parallel.
- There's actually a fourth cluster object which is often not discussed: the **Nanny**. By default, Dask workers are launched and managed by a separate nanny process. Because the nanny lives outside the worker, it can restart the worker when you call the `Client.restart` method, or automatically when the worker exceeds a configured memory limit.
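
To make the threading model concrete, here is a minimal standard-library sketch of what "a worker process with its own thread pool" means. It is a toy stand-in, not Dask's worker implementation: the `square` function and the `toy-worker` thread name prefix are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


def square(x):
    # Return the result together with the name of the thread that ran the task
    return x * x, threading.current_thread().name


# Toy stand-in for one worker process: a pool of threads that runs submitted tasks
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="toy-worker") as pool:
    futures = [pool.submit(square, i) for i in range(8)]
    results = [f.result() for f in futures]  # collected in submission order

values = [value for value, _ in results]
thread_names = {name for _, name in results}

print(values)  # [0, 1, 4, 9, 16, 25, 36, 49]
print(sorted(thread_names))  # which pool threads ended up running tasks
```

All eight tasks run on at most four threads inside a single process, which is roughly the situation inside one Dask worker configured with four threads.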

#### Related Documentation

- [Cluster architecture](https://distributed.dask.org/en/latest/#architecture)
- [Journey of a task](https://distributed.dask.org/en/latest/journey.html)

## Deploying Dask clusters

Deploying a Dask cluster means launching scheduler, worker, and client processes and setting up the appropriate network connections so these processes can communicate with one another. Dask clusters can be launched in a few different ways, which the following sections cover.

### Manual setup

Launch a scheduler process using the `dask-scheduler` command line utility:

```terminal
$ dask-scheduler
2023-07-20 20:42:29,894 - distributed.scheduler - INFO - -----------------------------------------------
2023-07-20 20:42:30,432 - distributed.scheduler - INFO - State start
2023-07-20 20:42:30,438 - distributed.scheduler - INFO - -----------------------------------------------
2023-07-20 20:42:30,439 - distributed.scheduler - INFO - Clear task state
2023-07-20 20:42:30,440 - distributed.scheduler - INFO -   Scheduler at: tcp://192.168.150.50:8786
2023-07-20 20:42:30,440 - distributed.scheduler - INFO -   dashboard at:                     :8787
2023-07-20 20:42:31,810 - distributed.scheduler - INFO - Register worker <WorkerState 'tls://oksana-2eshadura-40cern-2ech.dask-worker.coffea-opendata.casa:8788', name: kubernetes-worker-50d255ef-ba72-4162-af66-23fdbf942bb3, status: undefined, memory: 0, processing: 0>
2023-07-20 20:42:31,813 - distributed.scheduler - INFO - Starting worker compute stream, tls://oksana-2eshadura-40cern-2ech.dask-worker.coffea-opendata.casa:8788
2023-07-20 20:42:31,813 - distributed.core - INFO - Starting established connection
```

and then launch several workers by using the `dask-worker` command and providing them the address of the scheduler they should connect to:

```terminal
$ dask-worker tcp://192.168.150.50:8786
2023-07-20 20:45:36,940 - distributed.worker - INFO -       Start worker at: tcp://192.168.150.50:38077
2023-07-20 20:45:36,941 - distributed.worker - INFO -          Listening to: tcp://192.168.150.50:38077
2023-07-20 20:45:36,941 - distributed.worker - INFO -          dashboard at:       192.168.150.50:34591
2023-07-20 20:45:36,941 - distributed.worker - INFO - Waiting to connect to:  tcp://192.168.150.50:8786
2023-07-20 20:45:36,941 - distributed.worker - INFO - -------------------------------------------------
2023-07-20 20:45:36,941 - distributed.worker - INFO -               Threads:                        112
2023-07-20 20:45:36,941 - distributed.worker - INFO -                Memory:                   4.00 GiB
2023-07-20 20:45:36,941 - distributed.worker - INFO -       Local Directory: /home/cms-jovyan/dask-worker-space/worker-q_4j0zc5
2023-07-20 20:45:36,941 - distributed.worker - INFO - -------------------------------------------------
2023-07-20 20:45:36,949 - distributed.worker - INFO -         Registered to:  tcp://192.168.150.50:8786
2023-07-20 20:45:36,949 - distributed.worker - INFO - -------------------------------------------------
2023-07-20 20:45:36,949 - distributed.core - INFO - Starting established connection
```
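
Once the scheduler and workers are running, a client connects by passing the scheduler's address to `Client`. The following is a minimal sketch, assuming the scheduler address printed in the logs above is reachable from wherever the client runs; `sum_of_squares` is a hypothetical helper name, not part of Dask.

```python
from dask.distributed import Client


def sum_of_squares(scheduler_address, n=10):
    """Connect to a running scheduler and compute sum(i**2 for i in range(n)) on its workers."""
    # scheduler_address is whatever the scheduler printed after "Scheduler at:"
    with Client(scheduler_address) as client:
        # One task per input value, distributed across the workers
        squares = client.map(lambda x: x ** 2, range(n))
        # A final task that consumes the other results; it also runs on a worker
        total = client.submit(sum, squares)
        # Block until the result is ready and bring it back to the client
        return total.result()
```

For the session shown above, the call would be `sum_of_squares("tcp://192.168.150.50:8786")`, which should return 285 for the default `n=10`.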

## Cluster managers 

Dask has the notion of cluster manager objects. Cluster managers offer a consistent interface for common activities such as adding workers to a cluster, removing them, and retrieving logs.

<img src="img/dask-cluster-manager.svg" width="600">

### Dask LocalCluster

LocalCluster creates a "cluster" of a scheduler and workers running on the local machine.

Creating a cluster object starts a Dask scheduler and a number of Dask workers. If no arguments are given, it detects the number of CPU cores and the amount of memory on your system and creates workers sized to use them. You can also specify these arguments yourself.

```python
from dask.distributed import LocalCluster

# Launch a scheduler and 2 workers (2 threads each) on the local machine
cluster = LocalCluster(n_workers=2, threads_per_worker=2)
cluster
```

```python
from dask.distributed import LocalCluster

# IPython/Jupyter syntax: display the signature and source of LocalCluster
LocalCluster??
```

```
Init signature:
LocalCluster(
    name=None,
    n_workers=None,
    threads_per_worker=None,
    processes=None,
    loop=None,
    start=None,
    host=None,
    ip=None,
    scheduler_port=0,
    silence_logs=30,
    dashboard_address=':8787',
```

```python
# Scale up to 3 workers
cluster.scale(3)
```


```python
# Scale down to 2 workers
cluster.scale(2)
```

```python
# Retrieve cluster logs
cluster.get_logs()
```

```python
# Shut down cluster
cluster.close()
```
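
The cluster above is created and closed by hand. Both `LocalCluster` and `Client` can also be used as context managers, which closes them even if an error occurs along the way. Here is a minimal sketch, assuming a small in-process cluster is enough for a demonstration; `increment` and `run_demo` are illustrative names, and `processes=False` and `dashboard_address=None` are choices made here to keep startup light, not the defaults.

```python
from dask.distributed import Client, LocalCluster


def increment(x):
    return x + 1


def run_demo():
    # processes=False runs the workers as threads in this process (a choice for
    # this sketch, not the default); dashboard_address=None disables the dashboard.
    with LocalCluster(
        n_workers=2,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,
    ) as cluster:
        with Client(cluster) as client:
            # Ask the cluster manager for a third worker and wait until it has joined
            cluster.scale(3)
            client.wait_for_workers(3, timeout=30)
            # Run one task on the cluster and fetch its result
            return client.submit(increment, 41).result()
    # Leaving the with-blocks closes the client first, then the cluster


if __name__ == "__main__":
    print(run_demo())
```

The `if __name__ == "__main__":` guard matters mostly when workers run as separate processes (the default), since the script may then be re-imported by the child processes.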