# Dask demo on Slurm cluster

I have [miniconda](https://docs.conda.io/en/latest/miniconda.html) in my home folder, so I used that to set up a conda env,

```
conda create -n jlabdask python jupyterlab numpy matplotlib -y
conda activate jlabdask
conda install dask distributed -c conda-forge
pip install -U click==8.0.2
```
To ensure the jupyter lab continues when I'm not connected, I start it in a tmux pane (you could use screen as well)
```
tmux
```
then inside tmux
```
source activate ~/miniconda3/bin/activate
conda activate jlabdask
jupyter lab
```
Look at the port number it chooses, then forward it over your SSH connection (replace 4288 by the number jupyter shows)
```
ssh -L 4288:localhost:4288 me@door
```
and open the lab URL in your browser, then we can get started

In [1]:
%pylab
import dask, os, time
from dask.distributed import Client

Using matplotlib backend: <object object at 0x7fe15460a150>
%pylab is deprecated, use %matplotlib inline and import the required libraries.
Populating the interactive namespace from numpy and matplotlib


# Setting up

We first need to start a scheduler on door, we can just check we don't have one running already,

In [3]:
!ps aux | grep dask-sched

marmadu+  398999  0.0  0.0  13056  3012 pts/15   Ss+  13:48   0:00 /bin/bash -c ps aux | grep dask-sched
marmadu+  399001  0.0  0.0  12136  1104 pts/15   R+   13:48   0:00 grep dask-sched


In [68]:
os.system('dask-scheduler --host=10.11.7.58 &> scheduler.log &')
time.sleep(5)

In [71]:
!tail scheduler.log

distributed.scheduler - INFO - -----------------------------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://10.11.7.58:8786
distributed.scheduler - INFO -   dashboard at:           10.11.7.58:8787


In [70]:
!ps ux | grep dask-scheduler

marmadu+  409653  5.2  0.1 464728 115128 ?       Sl   15:06   0:01 /home/marmaduke.woodman/miniconda3/envs/jlabdask/bin/python3.10 /home/marmaduke.woodman/miniconda3/envs/jlabdask/bin/dask-scheduler --host=10.11.7.58
marmadu+  409711  0.0  0.0  13056  3080 pts/15   Ss+  15:07   0:00 /bin/bash -c ps ux | grep dask-scheduler
marmadu+  409713  0.0  0.0  12136  1064 pts/15   S+   15:07   0:00 grep dask-scheduler


Once the scheduler is running, the client is the interface,

In [72]:
sched_url = 'tcp://10.11.7.58:8786'
client = Client(sched_url)
client

0,1
Connection method: Direct,
Dashboard: http://10.11.7.58:8787/status,

0,1
Comm: tcp://10.11.7.58:8786,Workers: 0
Dashboard: http://10.11.7.58:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


Our scheduler doesn't have any workers yet, we schedule some via Slurm.  First have a look what's idle,

In [73]:
!sinfo -p run | grep idle

run*         up 7-00:00:00     14   idle c[1-4],m1,w[03,06,10-13,17-18,20] 


Then start the workers, asking Slurm to ensure 4G memory available to each

In [74]:
n_workers = 16
os.system(f'srun --mem-per-cpu 4G -p runcores -n {n_workers} dask-worker --nworkers 1 {sched_url} &> workers.log &')

0

Now we can check Slurm and `workers.log` to see workers are active.  If the number of workers requested is yet satisfied, wait a few seconds and re-evaluate the cell to check.  Or check `sinfo`, the nodes may not be available to satisfy that number of workers.

In [75]:
!squeue -u $(whoami)
!grep Register workers.log | nl

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON) 
           2722407  runcores dask-wor marmaduk  R       0:05      2 w[17-18] 
     1	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     2	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     3	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     4	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     5	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     6	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     7	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     8	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
     9	distributed.worker - INFO -         Registered to:      tcp://10.11.7.58:8786
    10	distributed.worker - INFO -         Registered to:      tcp://10

and check Dask client sees the workers,

In [76]:
len(client.scheduler_info()['workers'])

16

# Testing the Dask cluster

Here we generate some random numbers and check overhead vs running locally.  The idea is simple so that we can see how this differs from regular Python functions.

Regularly, generating a 16 by 16 matrix of normally distributed numbers would be just

In [17]:
z = np.random.randn(16, 16)

This is also fast,

In [18]:
%timeit z = np.random.randn(16, 16)

9.62 µs ± 18 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)


With Dask we get "future"s which are a token for the result that will arrive, and we call the `result()` method to get the result. Following the example above, we ask each worker to generate one row of the 16 x 16 matrix,

In [19]:
futures = client.map(np.random.randn, [16 for _ in range(n_workers)])
z = np.array([_.result() for _ in futures])

In a small example we expect communication overhead to be significant,

In [22]:
def dist_randn(m,n):
    work_sizes = [n for _ in range(m)]
    futures = client.map(np.random.randn, work_sizes)
    return np.array([_.result() for _ in futures])

%timeit dist_randn(16, 16)

We can check that overhead,

In [24]:
%timeit dist_randn(1024, 1)

1.74 s ± 14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [25]:
1.74 / 1024

0.00169921875

is about ~1.7ms per task, so for cases like parameter sweep simulations or neuroimaging analysis commands, this is insignificant. 

# Using commands

Running external non-Python programs should work as well:

In [30]:
import subprocess

def get_hostname(i):
    name = subprocess.check_output('hostname -s'.split()).decode('ascii').strip()
    return i, name

[_.result() for _ in client.map(get_hostname, range(16))]

[(0, 'w17'),
 (1, 'w17'),
 (2, 'w17'),
 (3, 'w17'),
 (4, 'w17'),
 (5, 'w17'),
 (6, 'w17'),
 (7, 'w18'),
 (8, 'w18'),
 (9, 'w18'),
 (10, 'w18'),
 (11, 'w18'),
 (12, 'w18'),
 (13, 'w18'),
 (14, 'w18'),
 (15, 'w17')]

So this generalizes to invokving FreeSurfer or some Singularity container that you've already prepared, e.g.

In [56]:
%%file dask_demo.def
BootStrap: library
From: ubuntu
OSVersion: jammy
MirrorURL: http://us.archive.ubuntu.com/ubuntu/

%environment
    DEBIAN_FRONTEND=noninteractive

%post
    apt-get update
    apt-get install -y stress-ng

Overwriting dask_demo.def


In [60]:
!singularity build -F --fakeroot dask_demo dask_demo.def &> dask_demo.log

For this demo, we just use the stress test program to run a CPU stressor for 10 seconds.  Testing that on the login node where Jupyter is running looks like this

In [61]:
!singularity exec dask_demo stress-ng -c 1 -t 10

stress-ng: info:  [409044] setting to a 10 second run per stressor
stress-ng: info:  [409044] dispatching hogs: 1 cpu
stress-ng: info:  [409044] successful run completed in 10.01s


In [82]:
def run_stress(i):
    cmd = 'singularity exec dask_demo stress-ng -c 1 -t 10'
    out = subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)
    out = out.decode('ascii').strip()
    lines = out.split('\n')
    return i, lines[-1]

tic = time.time()
results = [_.result() for _ in client.map(run_stress, range(16))]
toc = time.time()
print('took', toc - tic, 's')
results

took 10.57655644416809 s


[(0, 'stress-ng: info:  [21138] successful run completed in 10.00s'),
 (1, 'stress-ng: info:  [21065] successful run completed in 10.00s'),
 (2, 'stress-ng: info:  [21064] successful run completed in 10.00s'),
 (3, 'stress-ng: info:  [21070] successful run completed in 10.00s'),
 (4, 'stress-ng: info:  [21112] successful run completed in 10.01s'),
 (5, 'stress-ng: info:  [21140] successful run completed in 10.01s'),
 (6, 'stress-ng: info:  [21100] successful run completed in 10.00s'),
 (7, 'stress-ng: info:  [21072] successful run completed in 10.00s'),
 (8, 'stress-ng: info:  [16907] successful run completed in 10.00s'),
 (9, 'stress-ng: info:  [16945] successful run completed in 10.00s'),
 (10, 'stress-ng: info:  [16910] successful run completed in 10.00s'),
 (11, 'stress-ng: info:  [16961] successful run completed in 10.00s'),
 (12, 'stress-ng: info:  [16978] successful run completed in 10.00s'),
 (13, 'stress-ng: info:  [16947] successful run completed in 10.00s'),
 (14, 'stress-ng

# Problems

Ofc sometimes bad things happen.

In [64]:
client

0,1
Connection method: Direct,


No scheduler.. look at the logs

In [77]:
!tail scheduler.log

distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.11.7.27:43463', status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.11.7.27:43463
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.11.7.27:43243', status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.11.7.27:43243
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.11.7.27:46291', status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.11.7.27:46291
distributed.core - INFO - Starting established connection


If this happens, just restart the scheduler above.

The workers might have timed out too, set a longer walltime for the srun command above for the workers. 