# Resilience

Dask is able to [cope with disappearing workers](http://distributed.dask.org/en/latest/resilience.html).

## Multiple Dask batch job workers

We define a Dask jobqueue cluster with Dask workers that each have 4 CPUs and 10 GB of memory.

In [1]:
import dask, dask.distributed
import dask_jobqueue

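# Close any client and cluster left over from a previous run of this notebook
try:
    client.close()
    cluster.close()
except NameError:
    pass  # nothing to close on a fresh kernel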

In [2]:
cluster = dask_jobqueue.SLURMCluster(

    # Dask worker size
    cores=4, memory='10GB',
    processes=1, # Dask workers per job
    
    # SLURM job script things
    queue='CPU', walltime='00:10:00',
    
    # Dask worker network and temporary storage
    interface='ib0', local_directory='/tmp', #'$TMPDIR',
)

client = dask.distributed.Client(cluster)

In [7]:
# trick: Adaptive mode will recover from stopped workers
cluster.adapt(minimum_jobs=2, maximum_jobs=2)

<distributed.deploy.adaptive.Adaptive at 0x7f30e8b47a00>

In [8]:
client

In [13]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p CPU
#SBATCH -n 1
#SBATCH --cpus-per-task=4
#SBATCH --mem=10G
#SBATCH -t 00:10:00

/gpfs/soma_fs/home/valerio/anaconda3/envs/neuron/bin/python -m distributed.cli.dask_worker tcp://10.102.0.62:44462 --nthreads 4 --memory-limit 9.31GiB --name dummy-name --nanny --death-timeout 60 --local-directory /tmp --interface ib0 --protocol tcp://



In [10]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
             96194       CPU dask-wor  valerio  R       0:03      1 somacpu059
             96189       CPU dask-wor  valerio  R       1:02      1 somacpu060


### From here everything is (almost) the same

We'll return the Dask array for `pi` and handle computation more explicitly.

In [11]:
import numpy, dask.array

def calculate_pi(size_in_bytes, number_of_chunks):
    
    """Calculate pi using a Monte Carlo method."""
    
    array_shape = (int(size_in_bytes / 8 / 2), 2)
    chunk_size = (int(array_shape[0] / number_of_chunks), 2)
    
    # 2D random positions array using dask.array
    xy = dask.array.random.uniform(
        low=0.0, high=1.0, size=array_shape,
        # specify chunk size, i.e. task number
        chunks=chunk_size )
  
    xy_inside_circle = (xy ** 2).sum(axis=1) < 1 # boolean

    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size
        
    return pi
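
The estimate rests on a simple ratio: points drawn uniformly from the unit square fall inside the quarter circle of radius 1 with probability π/4, so four times the observed fraction approximates π. As a minimal sketch of that idea without Dask, using only the standard library (the name `estimate_pi` and the fixed seed are illustrative and not part of the notebook):

```python
import math
import random


def estimate_pi(number_of_points, seed=0):
    """Estimate pi from the fraction of random points inside the unit quarter circle."""
    rng = random.Random(seed)  # fixed seed so that repeated runs give the same estimate
    inside = 0
    for _ in range(number_of_points):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # same test as (xy ** 2).sum(axis=1) < 1
            inside += 1
    return 4 * inside / number_of_points


estimate = estimate_pi(200_000)
print(f"estimate: {estimate:.4f}, error: {abs(estimate - math.pi):.4f}")
```

The Dask version above does the same counting, but spreads the random points over many chunks so that the workers can process them in parallel.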

### Let's calculate again...

Note the `.compute()`. For this demo, we'll need to handle construction of the Dask graph and compute more explicitly.

In [12]:
%time pi = calculate_pi(size_in_bytes=10_000_000_000, number_of_chunks=100).compute() # 10 GB

CPU times: user 8.99 s, sys: 1.14 s, total: 10.1 s
Wall time: 2min 53s


In [8]:
%time pi = calculate_pi(size_in_bytes=200_000_000_000, number_of_chunks=500).compute() # 200 GB

CPU times: user 3.74 s, sys: 143 ms, total: 3.88 s
Wall time: 18.1 s
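
To get a feeling for what `size_in_bytes` and `number_of_chunks` mean for a single task, the arithmetic inside `calculate_pi` can be repeated on its own. The following is a small sketch; the helper `chunk_layout` is hypothetical and only mirrors the two shape computations from the function above, assuming 8-byte `float64` values:

```python
def chunk_layout(size_in_bytes, number_of_chunks, bytes_per_value=8):
    """Return (rows, rows_per_chunk, bytes_per_chunk) for the (rows, 2) array."""
    rows = int(size_in_bytes / bytes_per_value / 2)  # two columns: x and y
    rows_per_chunk = int(rows / number_of_chunks)
    bytes_per_chunk = rows_per_chunk * 2 * bytes_per_value
    return rows, rows_per_chunk, bytes_per_chunk


for size, chunks in [(10_000_000_000, 100), (200_000_000_000, 500)]:
    rows, rows_per_chunk, bytes_per_chunk = chunk_layout(size, chunks)
    print(f"{size / 1e9:.0f} GB in {chunks} chunks: "
          f"{rows_per_chunk:,} rows and {bytes_per_chunk / 1e6:.0f} MB per chunk")
```

With these inputs a chunk holds 100 MB in the first call and 400 MB in the second, which is the amount of random data a single task has to hold in worker memory at once.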


### Alternative way for handling computation

In [9]:
pi = calculate_pi(size_in_bytes=200_000_000_000, number_of_chunks=500)
print(pi)

dask.array<truediv, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>


In [10]:
pi = client.compute(
    pi
)
print(pi)

<Future: pending, key: finalize-ccb02a8b7207ffa277805fddd752597d>


In [12]:
pi.done()

False

In [13]:
print(pi.result())

3.14159670112


## What happens if a worker dies?

We'll find out all "our" job IDs, mark a few of them non-preemptible, filter for the preemptible jobs, and define a function to kill one randomly selected preemptible job.

Note that we won't use any preemption mechanism the batch scheduler may provide here; we just pick job IDs by hand.

In [14]:
def get_current_jobs():
    # ask squeue directly for the IDs of our running dask-worker jobs (no header line)
    current_jobs = !squeue -h -u $USER -t RUNNING -n dask-worker -o "%i"
    return current_jobs

In [15]:
non_preemptible_jobs = get_current_jobs()[:4]
non_preemptible_jobs

['55692', '55685', '55686', '55687']

In [16]:
def get_preemptible_jobs():
    return list(filter(lambda j: j not in non_preemptible_jobs, get_current_jobs()))

In [17]:
get_preemptible_jobs()

['55688', '55689', '55690', '55691']
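
The same selection can also be done in plain Python, which makes the filtering logic easier to test than a shell pipeline. The sketch below is an assumption-laden illustration: `running_dask_jobs` and `split_jobs` are hypothetical helpers, and the sample text only imitates the column layout that `squeue -u $USER` prints in this notebook.

```python
SAMPLE_SQUEUE_OUTPUT = """\
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
             55371   cluster jupyterl smomw122  R      46:36      1 neshcl100
             55692   cluster dask-wor smomw122  R       3:18      1 neshcl103
             55685   cluster dask-wor smomw122  R       3:21      1 neshcl230
             55686   cluster dask-wor smomw122  R       3:21      1 neshcl251
             55687   cluster dask-wor smomw122  R       3:21      1 neshcl251
             55688   cluster dask-wor smomw122  R       3:21      1 neshcl266
             55690   cluster dask-wor smomw122  R       3:21      1 neshcl266
             55691   cluster dask-wor smomw122  R       3:21      1 neshcl103
"""


def running_dask_jobs(squeue_output):
    """Return the IDs of running jobs whose (truncated) name starts with 'dask'."""
    job_ids = []
    for line in squeue_output.splitlines()[1:]:  # skip the header line
        fields = line.split()
        if len(fields) < 5:
            continue  # ignore blank or malformed lines
        job_id, _partition, name, _user, state = fields[:5]
        if state == "R" and name.startswith("dask"):
            job_ids.append(job_id)
    return job_ids


def split_jobs(job_ids, number_protected):
    """Treat the first `number_protected` jobs as non-preemptible, the rest as preemptible."""
    return job_ids[:number_protected], job_ids[number_protected:]


protected, preemptible = split_jobs(running_dask_jobs(SAMPLE_SQUEUE_OUTPUT), 4)
print("non-preemptible:", protected)
print("preemptible:", preemptible)
```

Matching on the state and name columns, rather than on any line that contains an `R`, keeps unrelated jobs such as the Jupyter job out of the list.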

Now, we need a way of randomly killing a preemptible job.

In [18]:
import random

def kill_random_preemptible_job():
    preemptible_jobs = get_preemptible_jobs()
    if preemptible_jobs:
        worker_to_kill = random.choice(preemptible_jobs)
        print(f"will cancel job {worker_to_kill}")
        !scancel {worker_to_kill}

In [19]:
from time import sleep

In [20]:
print(get_preemptible_jobs())
kill_random_preemptible_job()
sleep(1)
print(get_preemptible_jobs())

['55688', '55689', '55690', '55691']
will cancel job 55689
['55688', '55690', '55691']


In [21]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON) 
             55371   cluster jupyterl smomw122  R      46:36      1 neshcl100 
             55692   cluster dask-wor smomw122  R       3:18      1 neshcl103 
             55685   cluster dask-wor smomw122  R       3:21      1 neshcl230 
             55686   cluster dask-wor smomw122  R       3:21      1 neshcl251 
             55687   cluster dask-wor smomw122  R       3:21      1 neshcl251 
             55688   cluster dask-wor smomw122  R       3:21      1 neshcl266 
             55690   cluster dask-wor smomw122  R       3:21      1 neshcl266 
             55691   cluster dask-wor smomw122  R       3:21      1 neshcl103 


## Let's start a computation with disappearing workers

In [22]:
pi = calculate_pi(
    size_in_bytes=500_000_000_000, number_of_chunks=4_000
)
display(pi)

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8 B |
| Shape | () | () |
| Count | 25336 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [23]:
pi = client.compute(pi)
print(pi)

<Future: pending, key: finalize-240b411f1edd54aa1372b9cb30622640>


In [24]:
sleep(5)

while not pi.done():
    kill_random_preemptible_job()
    sleep(10)

will cancel job 55695
will cancel job 55690
will cancel job 55696
will cancel job 55691
will cancel job 55688
will cancel job 55697


In [25]:
print(pi)

<Future: error, key: finalize-240b411f1edd54aa1372b9cb30622640>


## And get the result

In [26]:
# print(pi.result())  # left commented out: the future is in the error state, so this would re-raise the error

## What happened?

The Dask scheduler keeps a suspiciousness counter for each task it manages. Whenever a worker dies, every task that was on that worker at the time of its death has its suspiciousness increased by one. The scheduler has no way of telling which task, if any, was responsible for the death of the worker, so it flags all of them.

A task whose suspiciousness exceeds the allowed number of failures (3 by default) is considered bad and won't be rescheduled. Any future that depends on it ends up in the error state.
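
The bookkeeping can be illustrated with a toy model. This is a sketch only: `ToyScheduler` is a hypothetical class, not the real Dask scheduler, and it assumes that a task is given up once its counter exceeds the allowed number of failures.

```python
from collections import defaultdict

ALLOWED_FAILURES = 3  # assumed threshold for this toy model


class ToyScheduler:
    """Toy bookkeeping of per-task suspiciousness; not the real Dask scheduler."""

    def __init__(self, allowed_failures=ALLOWED_FAILURES):
        self.allowed_failures = allowed_failures
        self.suspicious = defaultdict(int)
        self.bad_tasks = set()

    def worker_died(self, tasks_on_worker):
        """Blame every task that was on the dead worker, whichever one caused the death."""
        for task in tasks_on_worker:
            self.suspicious[task] += 1
            if self.suspicious[task] > self.allowed_failures:
                self.bad_tasks.add(task)


scheduler = ToyScheduler()
# Task "x" happens to sit on every worker that gets cancelled; "y" and "z" only sometimes.
for tasks in (["x", "y"], ["x", "z"], ["x", "y"], ["x"]):
    scheduler.worker_died(tasks)

print(dict(scheduler.suspicious))
print(sorted(scheduler.bad_tasks))
```

In this run task `x` is flagged as bad although nothing in the model says it caused any of the deaths. That is the situation in the experiment above: the jobs were cancelled from the outside, yet the tasks that happened to be on them took the blame.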

## Make Dask more resilient

We can increase the number of allowed failures.  Let's practically disable the threshold and re-do the calculation.

In [27]:
cluster.scheduler.allowed_failures = 1000

_(Note that the above uses internal API, which we need for now to increase the number of allowed failures. Newer Dask.distributed releases, which we can't use with Dask jobqueue yet, allow this threshold to be changed through the Dask configuration at runtime.)_

## Let's start a computation with disappearing workers again

In [28]:
pi = calculate_pi(
    size_in_bytes=500_000_000_000, number_of_chunks=4_000
)
display(pi)

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8 B |
| Shape | () | () |
| Count | 25336 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [29]:
pi = client.compute(pi)
print(pi)

<Future: pending, key: finalize-a1c802aa626c0ded9bf48d09e5884114>


In [30]:
sleep(5)

while not pi.done():
    kill_random_preemptible_job()
    sleep(15)

will cancel job 55702
will cancel job 55701
will cancel job 55699
will cancel job 55698


In [31]:
print(pi)

<Future: finished, type: numpy.float64, key: finalize-a1c802aa626c0ded9bf48d09e5884114>


## And get the result

In [32]:
print(pi.result())

3.141595972864
