### Introduction to Dask

<img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="20%"
     alt="Dask logo">

Dask is an open-source Python library for parallel computing. It can scale Python code from multi-core local machines to large distributed clusters with only minor code changes. Dask provides a familiar user interface by mirroring the APIs of other Python libraries, such as pandas and NumPy. It also lets programmers run custom algorithms in parallel by creating lazy, delayed objects. These objects hold the "recipe" for computing a task without actually performing the computation. The recipes are stored in memory as task graphs, which can be visualized to get a better handle on the parallelisation that is taking place. To get the desired numerical result, the task graph has to be passed to a task scheduler, which then performs the computation in parallel.
<br>
<br>
<br>
    
<img src="images/dask-overview.svg" align="center" width="60%">

High level collections are used to generate task graphs which can be executed by schedulers on a single machine or a cluster. Source: https://docs.dask.org/en/stable/10-minutes-to-dask.html
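
The lazy "recipe" idea described above can be tried with `dask.delayed`. The following is a minimal sketch, not part of the original material: the functions `inc` and `add` are illustrative names, and the threaded scheduler is chosen only so that the example runs without a cluster.

```python
import dask


@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(a, b):
    return a + b


# Nothing is computed here: each call only adds a node to the task graph.
total = add(inc(1), inc(2))

# The graph is executed only when compute() hands it to a scheduler.
result = total.compute(scheduler="threads")
print(result)  # 5
```

Calling `total.visualize()` instead of `compute()` would draw the task graph, provided Graphviz is installed.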

## Dask cluster overview

In this section we'll:

1. Discuss the different components which make up a Dask cluster
2. Survey different ways to launch a cluster

### Components of a Dask cluster

A Dask cluster is composed of three different types of objects:

1. **Scheduler**: A single, centralized scheduler process which responds to requests for computations, maintains relevant state information about tasks and workers, and sends tasks to workers to be computed.
2. **Workers**: One or more worker processes which compute tasks and store their results.
3. **Clients**: Client objects are the user-facing entry point to interact with the cluster.

<img src="./images/dask-cluster.png"
     width="70%"
     alt="Dask components">
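
To make the three roles concrete, here is a small sketch that starts all of them inside a single Python process. It assumes `processes=False` and `dashboard_address=None` purely to keep the demo lightweight; a real deployment would normally run the workers as separate processes or jobs.

```python
from dask.distributed import Client, LocalCluster

# Scheduler plus two workers, all inside this Python process (processes=False).
# dashboard_address=None switches the dashboard off for this small demo.
cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                       processes=False, dashboard_address=None)
client = Client(cluster)  # the user-facing entry point

scheduler_address = cluster.scheduler_address  # where the scheduler listens
worker_addresses = list(client.scheduler_info()["workers"])  # known workers

future = client.submit(pow, 2, 10)  # the scheduler sends this task to a worker
answer = future.result()            # the stored result is fetched back
print(scheduler_address, worker_addresses, answer)

client.close()
cluster.close()
```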
     

### Nodes on an HPC cluster

An HPC cluster has login nodes on which users prepare their job scripts and do similar light work, without performing expensive computations. For those computations, an HPC cluster has compute nodes, which are allocated by the resource manager (in this case PBS). To access the login nodes, you need to connect via SSH from the command line.
On some clusters (like the Vienna Scientific Cluster, VSC), you can access a compute node directly via JupyterHub. VSC users can get at most one node's worth of resources for JupyterHub. To get more compute power, they need to submit a job script, just as on a login node.
<br>
<br>
<br>
<img src="./images/VSC_Cluster.png"
     width=50%
     alt="Nodes of VSC"
     align="center">
<br>
<br>
    

### Setting up the ports for communication

This step is necessary because many users are each trying to set up their own Dask client, to launch workers from it and to monitor them with the diagnostic tools of the Dask dashboard. Since we run into trouble when several of us use the same port, the Python code below derives a distinct port for each user, so that we do not get in each other's way. You do not need this step when working on your local machine.

In [None]:
%pip install graphviz

In [None]:
import os
import dask
import dask.config
import dask.distributed 

In [None]:
USER = os.environ.get("USER", "")  # fall back to "" so startswith() works if USER is unset
if USER.startswith("dd-23-22-"):
    user_number = int(USER.replace("dd-23-22-", ""))
else:
    user_number = 100

# We all need different ports for our dashboards and schedulers.
DASHBOARD_PORT = 45000 + user_number 
SCHEDULER_PORT = 46000 + user_number
#####

print('dashboard port:', DASHBOARD_PORT)
print('scheduler port:', SCHEDULER_PORT)

#dask.config.set({'temporary_directory': f'/tmp/dask-{USER}'})
#print('temp dir: ', dask.config.get("temporary_directory"))

#dask.config.set({"distributed.dashboard.link": f"/user/{USER}/proxy/{DASHBOARD_PORT}/status"})
#print('dashboard link: ', dask.config.get("distributed.dashboard.link"))
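
The per-user offsets above make collisions unlikely, but they do not guarantee that a port is actually free on the node. A possible check, sketched with the standard library only, is shown below. The helper `port_is_free` is a hypothetical name and not part of Dask.

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if `port` can currently be bound on `host` (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
    return True


# Demonstration: occupy a port chosen by the operating system, then test it.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as held:
    held.bind(("127.0.0.1", 0))
    busy_port = held.getsockname()[1]
    busy_result = port_is_free(busy_port)
    print(busy_result)  # False while `held` keeps the port bound
```

In the notebook, one would call `port_is_free(DASHBOARD_PORT)` and `port_is_free(SCHEDULER_PORT)` before starting the cluster. The check is only a snapshot, since another process can still grab the port afterwards.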

### Distributed scheduler on a local machine

The `dask.distributed` system is composed of a single centralized scheduler and one or more worker processes. [Deploying](https://docs.dask.org/en/latest/setup.html) a remote Dask cluster involves some additional effort. Locally, however, it only involves creating a `Client` object, which lets you interact with the "cluster" (local threads or processes on your machine). For more information, see the [Dask documentation](https://docs.dask.org/en/latest/setup/single-distributed.html).

You could create a Dask cluster on your own PC; in this case, however, we are on a node running VSC's JupyterHub. With `LocalCluster`, you only use the resources available to you on the specific node you are on. If you want to leverage the compute power of the entire VSC by submitting jobs to SLURM (the resource manager on VSC), you need Dask's `SLURMCluster` (see further down).

In [None]:
from dask.distributed import LocalCluster

# Launch a scheduler and 4 workers on the local machine (here: the current node)
cluster = LocalCluster(n_workers=4, threads_per_worker=2,
                       scheduler_port=SCHEDULER_PORT,
                       dashboard_address=f":{DASHBOARD_PORT}")  # ":port" string form
cluster

In [None]:
from dask.distributed import Client

client = Client(cluster)

In [None]:
client

In [None]:
# Scale up to 10 workers
cluster.scale(10)

In [None]:
# Scale down to 1 worker
cluster.scale(1)

In [None]:
# Retrieve cluster logs
cluster.get_logs()

In [None]:
# Be careful when scaling on a cluster: Dask may terminate a worker
# whose memory use gets too close to its memory limit.
# To change this behaviour, modify ~/.config/dask/distributed.yaml accordingly.

from dask.distributed import system

system.MEMORY_LIMIT / 1024**3  # memory limit detected on this machine, in GiB
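
Besides editing the YAML file, the memory thresholds can also be inspected and overridden from Python through `dask.config`. The sketch below assumes the standard `distributed.worker.memory.*` configuration keys; the value `0.98` is purely illustrative and not a recommendation.

```python
import dask
import dask.distributed  # importing this registers the distributed.* defaults

# Fractions of a worker's memory limit at which Dask starts to react.
thresholds = {
    key: dask.config.get(f"distributed.worker.memory.{key}")
    for key in ("target", "spill", "pause", "terminate")
}
print(thresholds)

# Temporarily raise the termination threshold (illustrative value only).
with dask.config.set({"distributed.worker.memory.terminate": 0.98}):
    inside = dask.config.get("distributed.worker.memory.terminate")

# Outside the `with` block the previous value is restored.
outside = dask.config.get("distributed.worker.memory.terminate")
print(inside, outside)
```

Such a setting only affects workers that are started after it has been applied, so it would have to be set before the cluster is created.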

In [None]:
if client:
    client.close()
    client = None

In [None]:
if cluster:
    cluster.close()
    cluster = None
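
As an alternative to the explicit `close()` calls above, both `LocalCluster` and `Client` can be used as context managers, which closes them even if the computation raises an error. A minimal sketch, again assuming an in-process cluster without a dashboard to keep it small:

```python
from dask.distributed import Client, LocalCluster

with LocalCluster(n_workers=2, threads_per_worker=1,
                  processes=False, dashboard_address=None) as cluster:
    with Client(cluster) as client:
        # Submit one task per input value and collect the results.
        futures = client.map(lambda v: v * v, range(5))
        total = sum(client.gather(futures))

# Client and cluster are both closed at this point.
print(total)  # 30
```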

### Distributed scheduler on a remote machine

There are several projects in the Dask ecosystem for easily deploying clusters on commonly used computing resources:

- [Dask-Kubernetes](https://kubernetes.dask.org/en/latest/) for deploying Dask using native Kubernetes APIs
- [Dask-Cloudprovider](https://cloudprovider.dask.org/en/latest/) for deploying Dask clusters on various cloud platforms (e.g. AWS, GCP, Azure, etc.)
- [Dask-Yarn](https://yarn.dask.org/en/latest/) for deploying Dask on YARN clusters
- [Dask-MPI](http://mpi.dask.org/en/latest/) for deploying Dask on existing MPI environments
- [Dask-Jobqueue](https://jobqueue.dask.org/en/latest/) for deploying Dask on job queuing systems (e.g. PBS, Slurm, etc.)

Launching clusters with any of these projects follows a pattern similar to using Dask's built-in `LocalCluster`:

```python
# Launch a Dask cluster on a Kubernetes cluster
from dask_kubernetes import KubeCluster
cluster = KubeCluster(...)

# Launch a Dask cluster on AWS Fargate
from dask_cloudprovider.aws import FargateCluster
cluster = FargateCluster(...)

# Launch a Dask cluster on a Slurm job queueing system
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(...)
```

#### Related Documentation

- [Cluster setup](https://docs.dask.org/en/latest/setup.html)

Now we need to specify the hardware we want for our cluster. Note that the cores and memory are only what you request from one node. Later we will scale that to our requirements. Since we would like to get through the PBS queue as quickly as possible, it makes sense to request fewer cores, less memory and a shorter walltime for each worker job, and then scale the number of workers accordingly.
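
The arithmetic behind this sizing can be sketched in plain Python. The helper `job_layout` is hypothetical; it only mirrors how `dask_jobqueue` splits the cores and memory of one job across its worker processes, and the numbers are taken from the SLURM example at the end of this notebook.

```python
def job_layout(cores: int, memory_gb: float, processes: int, jobs: int) -> dict:
    """Estimate the resources obtained from `jobs` identical batch jobs (hypothetical helper)."""
    return {
        "workers": processes * jobs,                   # one Dask worker per process
        "threads_per_worker": cores // processes,      # cores are shared by the processes
        "memory_per_worker_gb": memory_gb / processes, # so is the memory of the job
        "total_cores": cores * jobs,
    }


layout = job_layout(cores=16, memory_gb=4, processes=4, jobs=3)
print(layout)
# {'workers': 12, 'threads_per_worker': 4, 'memory_per_worker_gb': 1.0, 'total_cores': 48}
```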

In [None]:
if client:
    client.close()
    client = None

In [None]:
if cluster:
    cluster.close()
    cluster = None

In [None]:
!pip install dask_jobqueue

In [None]:
from dask_jobqueue import PBSCluster

In [None]:
#PBSCluster?

In [None]:
cluster = PBSCluster(cores=64,
                     memory="4 GB",
                     queue="qcpu",
                     account="DD-23-22",
                     walltime="00:05:00",
                     scheduler_options={
                         "interface": "ib0",
                         "dashboard_address": f":{DASHBOARD_PORT}",
                         "port": SCHEDULER_PORT})

In [None]:
print(cluster.job_script())

Unfortunately, this does not work from inside a Singularity container.

In [None]:
!qstat -u $USER

In [None]:
cluster.scale(4)

In [None]:
!qstat -u $USER

In [None]:
from dask.distributed import Client

In [None]:
client = Client(cluster)

Most Dask deployments are static with a single scheduler and a fixed number of workers. This results in predictable behavior, but is wasteful of resources in two situations:

* The user may not be using the cluster, or perhaps they are busy interpreting a recent result or plot, and so the workers sit idly, taking up valuable shared resources from other potential users

* The user may be very active, and is limited by their original allocation.

Adaptive deployments are particularly helpful for interactive workloads, which are characterized by long periods of inactivity interrupted by short bursts of heavy activity. They can result in both faster analyses that give users much more power and much less pressure on computational resources.

<br>
<br>
<br>
<img src="images/dask-adaptive.svg"
     width=50%
     alt="Dask adaptive scaling"
     align="center">
<br>

In [None]:
cluster.adapt(minimum_jobs=0, maximum_jobs=60)  # scale between 0 and 60 jobs

In [None]:
!qstat -u $USER # It might take some time for the jobs to become visible.
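
Adaptive scaling can also be tried without a batch system. The sketch below uses a `LocalCluster`, whose `adapt` method takes `minimum` and `maximum` numbers of workers rather than the `minimum_jobs` and `maximum_jobs` used above. The bounds and the in-process setup are assumptions chosen to keep the demo small.

```python
from dask.distributed import Client, LocalCluster

with LocalCluster(n_workers=1, threads_per_worker=1,
                  processes=False, dashboard_address=None) as cluster:
    # Let the scheduler add or remove workers within these bounds as load changes.
    cluster.adapt(minimum=1, maximum=4)
    with Client(cluster) as client:
        futures = client.map(lambda v: v + 1, range(20))
        results = client.gather(futures)

print(results[:5])  # [1, 2, 3, 4, 5]
```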

### Executing with the distributed client
Let's see what our Dask cluster can achieve with this computationally challenging calculation. Note that you might have to wait until sufficient workers are running in the queue.

In [None]:
import dask.array as da

In [None]:
cluster.scale(1)

In [None]:
x = da.random.random((10_000,10_000,10), chunks=(1000,1000,5))
y = da.random.random((10_000,10_000,10), chunks=(1000,1000,5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1,))

In [None]:
x

In [None]:
z
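
Before computing, it can help to check how much work the chosen chunking implies. The following sketch reads only metadata from the lazy array, so nothing is computed. It rebuilds `x` with the same shape and chunks as above to stay self-contained.

```python
import math

import dask.array as da

x = da.random.random((10_000, 10_000, 10), chunks=(1000, 1000, 5))

blocks_per_axis = x.numblocks  # (10, 10, 2)
n_chunks = x.npartitions       # 200 chunks in total
chunk_bytes = math.prod(x.chunksize) * x.dtype.itemsize  # 40_000_000 bytes per chunk
total_gb = x.nbytes / 1e9      # 8.0 GB for the whole array

print(blocks_per_axis, n_chunks, chunk_bytes, total_gb)
```

Each of `x` and `y` therefore corresponds to about 8 GB spread over 200 chunks of 40 MB, which is why the workers need enough memory in total even though no single chunk is large.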

In [None]:
%%time
z.compute()

In [None]:
z.dask.visualize() # High level graph. Low level graph with z.visualize(), but this would be overwhelming

By default, creating a `Client` makes it the default scheduler. Any calls to `.compute` will use the cluster your `client` is attached to, unless you specify otherwise.
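
One way to specify otherwise is the `scheduler` keyword of `.compute`, which selects a scheduler for that single call. A small sketch, using a deliberately tiny array (an assumption for the demo) so that it runs in a moment:

```python
import dask.array as da

ones = da.ones((1000, 1000), chunks=(250, 250))
total = ones.sum()

# Run on a local thread pool, regardless of any active distributed Client.
threaded = float(total.compute(scheduler="threads"))

# Run everything in the calling thread, which is handy for debugging.
serial = float(total.compute(scheduler="synchronous"))

print(threaded, serial)  # 1000000.0 1000000.0
```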


In [None]:
client.close()   # disconnect the client first
cluster.close()  # then shut down the scheduler and the workers

Since some of you will be dealing with the SLURM scheduler on other HPC clusters, the code below shows you how you could set up a SLURMCluster:
```python
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(queue="skylake_0096", # this is the partition we want to use
                       account="p70824", # this is the account used for billing
                       cores=16,          # number of cores per SLURM job
                       processes=4,      # number of python dask-worker processes per SLURM job
                       name=f'{USER}-worker',  # custom name of workers
                       memory="4GB",     # memory one SLURM job should have available (will be divided by number of worker processes in job)
                       walltime="00:05:00",
                       interface="ib0",  # ib0 is infiniband, the fast network connection
                       scheduler_options={
                           "interface": "ib0",
                           "dashboard_address": f":{DASHBOARD_PORT}",
                           "port": SCHEDULER_PORT
                       },
                       # worker_extra_args=["--lifetime", "4m", "--lifetime-stagger", "2m"],
                       # lifetime ensures that workers will be properly shut down before the scheduling system kills them, and all their states moved.
                       # lifetime-stagger will prevent workers from terminating at the same time, thus ease rebalancing tasks and scheduling burden.
                       job_directives_skip=[
                           '-J dask-worker',
                       ],
                       job_extra_directives=[
                           f'--job-name={USER}-worker',
                           '--qos=skylake_0096',
                           #'--reservation=training', # only to be used during this training                          
                       ]) 
print(cluster.job_script()) # this is turned into a job script
```

In [None]:
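# Restart the notebook kernel to release all resources held by this session
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)  # True requests a restart rather than a plain shutdown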