# Jupyter + Dask: local vs cluster

This notebook shows:
* How to start a Dask cluster from a notebook on TREX
* How to start a local Dask cluster (simulating a single computer)
* How to manipulate time series with the dask.dataframe module
* How to use dataframes and the client API to submit a series of long operations (>100 ms)
* That the code stays the same between a local cluster and a cluster on TREX

## Choosing a Dask subcluster

We can choose between two types of Dask subcluster:
* A local cluster with 4 cores, simulating a computer
* A Slurm cluster connected to the TREX cluster

If you decide to use the local cluster, run the cell that defines the LocalCluster.

If you decide to use the Slurm cluster, run the cell that defines the SLURMCluster.
The cells that follow it can be run if you want to increase the size of the Dask cluster or make its size adaptive.
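
Because only the cluster object differs between the two setups, the choice can be isolated in one small helper. The sketch below is one possible way to do that, not something this notebook defines: `make_cluster` and its `kind` argument are hypothetical names, and the settings simply repeat the values used in the cells of this notebook.

```python
def make_cluster(kind="local"):
    """Return a Dask cluster object; `kind` is "local" or "slurm" (hypothetical helper)."""
    if kind not in ("local", "slurm"):
        raise ValueError(f"unknown cluster kind: {kind!r}")

    if kind == "local":
        # Imported lazily: only the chosen backend needs to be installed.
        from distributed import LocalCluster
        return LocalCluster(n_workers=1, threads_per_worker=4)

    # dask_jobqueue is only required when running on the Slurm cluster.
    from dask_jobqueue import SLURMCluster
    return SLURMCluster(
        n_workers=4,
        cores=4,
        memory="32GB",
        processes=1,
        local_directory="$TMPDIR",
        account="supporthpc",
        walltime="01:00:00",
        interface="ib0",
        log_directory="../dask-logs",
    )
```

With such a helper, switching backends would come down to changing one argument, for example `cluster = make_cluster("slurm")`, while every later cell keeps using `cluster` unchanged.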

## Start and configure a local cluster

We start a local cluster with a configuration similar to a 4-core PC.

In [20]:
from distributed import LocalCluster, Client

cluster = LocalCluster(n_workers=1, threads_per_worker=4)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 37569 instead


In [21]:
!squeue --me

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          22860805   cpu2022 jupyterh  usseglg  R    3:31:25      1 trex092
          22856454      visu usseglg-  usseglg  R    4:09:07      1 trexvisu03


## Start and configure a Slurm Cluster

Thanks to the dask_jobqueue module, a Dask sub-cluster can be started on the TREX cluster with a few lines of code from a notebook.

### Initial imports and creation of the cluster

Here we import the main classes, then create a cluster on Slurm.
This cluster is made up of Dask workers launched via Slurm jobs. Each Slurm job requests 4 CPUs and 32 GB of memory and hosts 1 worker, as defined in the constructor below.  
Each job is then launched independently, either at our request or automatically if we have asked for adaptive scaling.
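
The relationship between one Slurm job and the workers it hosts can be checked with simple arithmetic. The sketch below assumes the usual dask_jobqueue convention that `cores` and `memory` describe a whole job and are divided evenly among its `processes`; `job_layout` is a hypothetical helper written only for this illustration.

```python
def job_layout(cores, memory_gb, processes):
    """Split one job's resources evenly among its worker processes."""
    return {
        "workers_per_job": processes,
        "threads_per_worker": cores // processes,
        "memory_gb_per_worker": memory_gb / processes,
    }

# Values used in this notebook: 4 cores, 32 GB, 1 process per job.
layout = job_layout(cores=4, memory_gb=32, processes=1)
print(layout)
```

With these values each job hosts a single worker with 4 threads, which is consistent with the `--nthreads 4` option in the generated job script.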

In [22]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress

account_Trex = 'supporthpc'

cluster = SLURMCluster(
    # Dask-worker specific keywords
    n_workers=4,                # start 4 workers
    cores=4,                    # each job (one worker here) runs on 4 cores
    memory="32GB",              # each job uses 32 GB of memory (on TREX g2022: nb_cores*8 GB, on g2019: nb_cores*4.6 GB)
    processes=1,                # number of Python processes (workers) per job
    local_directory='$TMPDIR',  # location for temporary data if necessary
    account=account_Trex,
    walltime='01:00:00',
    interface='ib0',
    log_directory='../dask-logs',
    job_extra_directives=[],
    # job_extra_directives=['--qos="cpu_2022_1280"'],  # alternative: QOS to use
)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 39611 instead


We can print the job script that this cluster submits for each worker job.

In [23]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e ../dask-logs/dask-worker-%J.err
#SBATCH -o ../dask-logs/dask-worker-%J.out
#SBATCH -A supporthpc
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=30G
#SBATCH -t 01:00:00

/work/softs/rh8/conda-envs/pangeo_stable/bin/python -m distributed.cli.dask_worker tcp://10.11.2.192:42693 --name dummy-name --nthreads 4 --memory-limit 29.80GiB --nanny --death-timeout 60 --local-directory $TMPDIR --interface ib0
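
The script requests `--mem=30G` and gives the worker `--memory-limit 29.80GiB`, although the constructor asked for `"32GB"`. This is a unit effect: 32 GB is a decimal quantity (10^9 bytes per GB), while the script reports binary GiB (2^30 bytes). The arithmetic below reproduces both numbers. That the Slurm request is the GiB value rounded up is an assumption inferred from the output, not something stated in this notebook.

```python
import math

requested_bytes = 32 * 10**9          # memory="32GB" (decimal gigabytes)
gib = requested_bytes / 2**30         # the same amount in binary gibibytes

memory_limit = f"{gib:.2f}GiB"        # value seen after --memory-limit
slurm_mem = f"{math.ceil(gib)}G"      # assumed: rounded up for --mem

print(memory_limit, slurm_mem)        # 29.80GiB 30G
```

The totals shown in the cluster widget follow from the same figure: 8 workers at 29.80 GiB each give 238.40 GiB.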



By displaying the cluster in the notebook, we should see a widget that lets us change the size of the cluster manually or automatically (adaptive cluster). The same can be configured with a few lines of code, as shown below.

In [36]:
cluster

Dashboard: http://10.11.2.192:39611/status
Workers: 8
Total threads: 32
Total memory: 238.40 GiB

Comm: tcp://10.11.2.192:42693
Workers: 8
Dashboard: http://10.11.2.192:39611/status
Total threads: 32
Started: 2 minutes ago
Total memory: 238.40 GiB

Comm: tcp://10.11.2.187:34363
Total threads: 4
Dashboard: http://10.11.2.187:37047/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.187:39075
Local directory: /tmp/slurm-22889792/dask-scratch-space/worker-6bbghry6

Comm: tcp://10.11.2.189:38329
Total threads: 4
Dashboard: http://10.11.2.189:40101/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.189:41921
Local directory: /tmp/slurm-22889798/dask-scratch-space/worker-wrpcaitc

Comm: tcp://10.11.2.190:34505
Total threads: 4
Dashboard: http://10.11.2.190:44067/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.190:43403
Local directory: /tmp/slurm-22889804/dask-scratch-space/worker-nv2io07i

Comm: tcp://10.11.2.190:39843
Total threads: 4
Dashboard: http://10.11.2.190:40055/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.190:39831
Local directory: /tmp/slurm-22889801/dask-scratch-space/worker-gj2ocy2k

Comm: tcp://10.11.2.189:44559
Total threads: 4
Dashboard: http://10.11.2.189:43137/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.189:45447
Local directory: /tmp/slurm-22889794/dask-scratch-space/worker-753w4du5

Comm: tcp://10.11.2.189:34881
Total threads: 4
Dashboard: http://10.11.2.189:37181/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.189:33713
Local directory: /tmp/slurm-22889800/dask-scratch-space/worker-s5f1s8r8

Comm: tcp://10.11.2.189:34879
Total threads: 4
Dashboard: http://10.11.2.189:34437/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.189:44091
Local directory: /tmp/slurm-22889796/dask-scratch-space/worker-iuvmjxoz

Comm: tcp://10.11.2.190:33863
Total threads: 4
Dashboard: http://10.11.2.190:36979/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.190:42525
Local directory: /tmp/slurm-22889802/dask-scratch-space/worker-6azrqmwx


To set the cluster size manually, pass the desired number of Dask workers to `scale`.

In [32]:
cluster.scale(8)

Alternatively, we can request an adaptive cluster, whose size varies with the load.

Running the next (or previous) cell should update the information displayed in the widget.
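
As a rough mental model, adaptive mode estimates how many workers the pending work would need and then keeps that number between a `minimum` and a `maximum` bound. The sketch below is a deliberate simplification, not the algorithm implemented in `distributed`; `desired_workers`, `pending_seconds`, and the 5-second `target_duration` are assumptions chosen for illustration.

```python
import math

def desired_workers(pending_seconds, target_duration=5.0, minimum=4, maximum=12):
    """Workers needed to finish the pending work in about `target_duration`
    seconds, clamped to the [minimum, maximum] range."""
    wanted = math.ceil(pending_seconds / target_duration)
    return max(minimum, min(maximum, wanted))

for load in (0, 30, 1000):
    print(load, desired_workers(load))   # 0 -> 4, 30 -> 6, 1000 -> 12
```

An idle cluster stays at the minimum, a moderate load lands in between, and a heavy load is capped at the maximum.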

In [34]:
cluster.adapt(minimum=4, maximum=12)

<distributed.deploy.adaptive.Adaptive at 0x14c9ede06210>

In [41]:
!squeue --me

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          22889794   cpu2022 dask-wor  usseglg CG       2:18      1 trex089
          22889797   cpu2022 dask-wor  usseglg CG       2:05      1 trex089
          22889799   cpu2022 dask-wor  usseglg CG       2:05      1 trex089
          22889801   cpu2022 dask-wor  usseglg CG       2:18      1 trex090
          22889802   cpu2022 dask-wor  usseglg CG       2:18      1 trex090
          22889803   cpu2022 dask-wor  usseglg CG       2:05      1 trex090
          22889804   cpu2022 dask-wor  usseglg CG       2:18      1 trex090
          22889792   cpu2022 dask-wor  usseglg  R       3:16      1 trex087
          22889796   cpu2022 dask-wor  usseglg  R       3:16      1 trex089
          22889798   cpu2022 dask-wor  usseglg  R       3:16      1 trex089
          22889800   cpu2022 dask-wor  usseglg  R       3:16      1 trex089
          22860805   cpu2022 jupyterh  usseglg  R    3:34:52      1 trex092
   

## Client creation

For all the Dask APIs (dataframe, delayed, bag ...) to use the Dask cluster we have started, we must initialize a client.

This client is also used to submit tasks in the remainder of this example.

Displaying the client should show the current cluster size and a link to the Dask Dashboard. This link does not necessarily work from your browser. However, a proxy has been deployed, so you should be able to reach the Dask dashboard through the following URL:

https://jupyterhub.sis.cnes.fr/user/username/proxy/8787/status

The port may differ from the default, 8787; use the port shown in the dashboard link.
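
The proxied address can be derived from the dashboard link reported by the cluster or client. The helper below is a sketch that assumes the proxy URL always follows the pattern given above, with only the user name and port changing; `proxied_dashboard_url` is a hypothetical name.

```python
from urllib.parse import urlparse

JUPYTERHUB = "https://jupyterhub.sis.cnes.fr"

def proxied_dashboard_url(dashboard_link, username):
    """Rewrite an internal dashboard link into its JupyterHub proxy form."""
    port = urlparse(dashboard_link).port or 8787   # fall back to the default port
    return f"{JUPYTERHUB}/user/{username}/proxy/{port}/status"

print(proxied_dashboard_url("http://10.11.2.192:39611/status", "username"))
```

With a live client, `client.dashboard_link` can supply the first argument.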

In [42]:
from dask.distributed import Client

client = Client(cluster)
client

Connection method: Cluster object
Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.11.2.192:39611/status

Dashboard: http://10.11.2.192:39611/status
Workers: 4
Total threads: 16
Total memory: 119.20 GiB

Comm: tcp://10.11.2.192:42693
Workers: 4
Dashboard: http://10.11.2.192:39611/status
Total threads: 16
Started: 3 minutes ago
Total memory: 119.20 GiB

Comm: tcp://10.11.2.187:34363
Total threads: 4
Dashboard: http://10.11.2.187:37047/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.187:39075
Local directory: /tmp/slurm-22889792/dask-scratch-space/worker-6bbghry6

Comm: tcp://10.11.2.189:38329
Total threads: 4
Dashboard: http://10.11.2.189:40101/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.189:41921
Local directory: /tmp/slurm-22889798/dask-scratch-space/worker-wrpcaitc

Comm: tcp://10.11.2.189:34881
Total threads: 4
Dashboard: http://10.11.2.189:37181/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.189:33713
Local directory: /tmp/slurm-22889800/dask-scratch-space/worker-s5f1s8r8

Comm: tcp://10.11.2.189:34879
Total threads: 4
Dashboard: http://10.11.2.189:34437/status
Memory: 29.80 GiB
Nanny: tcp://10.11.2.189:44091
Local directory: /tmp/slurm-22889796/dask-scratch-space/worker-iuvmjxoz


## Using the Client API to Submit Simulations

A typical use case for the cluster is running a costly calculation (from a few seconds to several minutes) over many different sets of input parameters. Monte Carlo studies are one example, but there are others.

The principle is to generate or read all the parameter sets, then launch the calculation once per set. This demo is of course simplified: the calculation is a pure Python function.

### Generating/reading input data

Here the input parameters are held in a pandas dataframe. pandas can read them from CSV files, but we can also generate them ourselves, as below (a real study would use more interesting data).  
We run 1000 simulations here; feel free to change that!

In [43]:
# Generate the input parameters
import pandas as pd
import numpy as np
# Random parameters here; they could also be chosen more carefully or read from a CSV file
input_params = pd.DataFrame(np.random.randint(low=0, high=1000, size=(1000, 4)),
                            columns=['a', 'b', 'c', 'd'])
input_params.head()

     a    b    c    d
0   11  537  527  854
1  719  624  117  134
2  442  268  710  817
3  625   37  931  220
4  259  818  787  355
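
To make a run repeatable, or to start from parameters prepared elsewhere, the table can be built from a seeded generator and round-tripped through CSV. This is a sketch under assumptions: the seed value and the in-memory buffer standing in for a file are illustrative choices, not part of the original notebook.

```python
import io

import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=42)   # fixed seed: same parameters on every run
params = pd.DataFrame(
    rng.integers(low=0, high=1000, size=(1000, 4)),
    columns=["a", "b", "c", "d"],
)

buffer = io.StringIO()                 # stands in for a CSV file on disk
params.to_csv(buffer, index=False)     # write without the row index
buffer.seek(0)
reloaded = pd.read_csv(buffer)         # read the same parameters back

print(reloaded.shape)
```

Replacing the buffer with a file path gives the usual `to_csv` / `read_csv` workflow.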


### Definition of the calculation method


The function below simply simulates a calculation lasting between 0 and 2 seconds. It can of course be adapted, for example to call a much longer external process. Note that it is important to return the result as a Python object rather than writing it to a file, so that it can be retrieved from the workers afterwards.

In [44]:
# Simulation function, applied to one row of parameters per task
def my_costly_simulation(line):
    # Imports live inside the function so they are resolved on the worker
    import time
    import random
    time.sleep(random.random() * 2)  # simulate 0 to 2 seconds of work
    return sum(line)

### Submission of calculation

All that remains is to submit the calculation and, if needed, wait for it to finish.  
Submission is non-blocking: the calculation runs in the background on the Dask cluster. Open the Dask Dashboard to follow its progress and, in adaptive mode, to watch the cluster grow and shrink.

In [45]:
futures = client.map(my_costly_simulation, input_params.values)
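
Dask's futures interface is modeled on the standard library's `concurrent.futures`, so the submit-then-collect pattern can be rehearsed on a laptop without any cluster. The sketch below is an analogy, not Dask code: a thread pool stands in for the workers, and `quick_simulation` is a shortened, hypothetical stand-in for `my_costly_simulation`.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def quick_simulation(line):
    time.sleep(random.random() * 0.01)   # far shorter than the real 0 to 2 s
    return sum(line)

rows = [(1, 2, 3, 4), (10, 20, 30, 40), (0, 0, 0, 1)]

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns immediately, like client.map / client.submit
    local_futures = [pool.submit(quick_simulation, row) for row in rows]
    # result() blocks until each task is done; input order is preserved
    local_results = [f.result() for f in local_futures]

print(local_results)   # [10, 100, 1]
```

Once the function behaves as expected on a few rows, the same function can be handed to `client.map` unchanged.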

You can also look at the jobs running on the cluster and watch them change by executing the following cell several times.
Feel free to run the simulation above several times to experiment. You may need to rerun the cell that defines the simulation function first; otherwise Dask may consider that the same calculation is being requested and do nothing.
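
The "does nothing" behaviour comes from task keys: by default `client.map` treats the function as pure and derives each key from the function and its arguments, so a repeated submission can reuse futures that are still held. Passing `pure=False` asks for fresh keys instead. The sketch below illustrates this on a small in-process cluster; `add_one` and the cluster settings are assumptions chosen to keep the example light.

```python
from dask.distributed import Client

def add_one(x):
    return x + 1

# Small in-process cluster, no dashboard: enough to inspect the keys.
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)

first = client.map(add_one, range(3))
again = client.map(add_one, range(3))              # same function, same inputs
fresh = client.map(add_one, range(3), pure=False)  # force new tasks

same_keys = [f.key for f in first] == [f.key for f in again]
new_keys = [f.key for f in first] != [f.key for f in fresh]
results = client.gather(fresh)

print(same_keys, new_keys, results)
client.close()
```

For a function with random behaviour such as `my_costly_simulation`, `pure=False` is arguably the more accurate setting.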

In [49]:
!squeue --me

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
          22890603   cpu2022 dask-wor  usseglg CG       0:29      1 trex075
          22890608   cpu2022 dask-wor  usseglg CG       0:30      1 trex076
          22889794   cpu2022 dask-wor  usseglg CG       2:18      1 trex089
          22889797   cpu2022 dask-wor  usseglg CG       2:05      1 trex089
          22889799   cpu2022 dask-wor  usseglg CG       2:05      1 trex089
          22889800   cpu2022 dask-wor  usseglg CG       4:12      1 trex089
          22890609   cpu2022 dask-wor  usseglg  R       0:33      1 trex076
          22889792   cpu2022 dask-wor  usseglg  R       4:13      1 trex087
          22889796   cpu2022 dask-wor  usseglg  R       4:13      1 trex089
          22889798   cpu2022 dask-wor  usseglg  R       4:13      1 trex089
          22860805   cpu2022 jupyterh  usseglg  R    3:35:49      1 trex092
          22856454      visu usseglg-  usseglg  R    4:13:31      1 trexvisu03


### Gathering the results

For now the results are stored on the Dask workers; we retrieve them with the gather method.
We also merge the input parameters and the results into a single table, which can then be saved in a tabular format (CSV, HDF5 ...).

In [48]:
# Block until the results are available, then add them as a column to the input table
results = client.gather(futures)
output = input_params.copy()
output['result'] = pd.Series(results, index=output.index)
output.sample(5)

       a    b    c    d  result
13   624  168  590  119    1501
399  461  301  369  295    1426
545   65  435  406  830    1736
788  893  856  223  520    2492
443  172  841  298  585    1896
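
Before saving, it is cheap to check that every result matches its inputs, since the demo function only sums the four parameters. The sketch below runs on a three-row stand-in table (rows copied from the sample above) and writes it to a temporary CSV file; the file name and the `simulation` index label are arbitrary choices.

```python
import os
import tempfile

import pandas as pd

# Stand-in for `output`, using rows shown in the sample above.
output = pd.DataFrame(
    {"a": [624, 461, 65], "b": [168, 301, 435], "c": [590, 369, 406],
     "d": [119, 295, 830], "result": [1501, 1426, 1736]},
    index=[13, 399, 545],
)

# Each result should equal the sum of its four parameters.
consistent = (output[["a", "b", "c", "d"]].sum(axis=1) == output["result"]).all()

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "simulation_results.csv")
    output.to_csv(path, index_label="simulation")       # keep the row ids
    saved = pd.read_csv(path, index_col="simulation")   # read them back

print(consistent, saved.shape)
```

On the real table the same two steps apply directly to `output`, with a permanent path instead of the temporary directory.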


In [50]:
client.close()
cluster.close()