<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg" 
     width="30%" 
     align=right
     alt="Dask logo">

# Embarrassingly Parallel Workloads

This notebook shows how to use Dask to parallelize embarrassingly parallel workloads where you want to apply one function to many pieces of data independently.  It will show three different ways of doing this with Dask:

1. [dask.delayed](http://dask.pydata.org/en/latest/delayed.html) 
2. [concurrent.futures](https://dask.pydata.org/en/latest/futures.html) 
3. [dask.bag](https://dask.pydata.org/en/latest/bag.html)

This example focuses on using Dask to build large embarrassingly parallel computations, as often seen in scientific communities and on High Performance Computing facilities, for example with Monte Carlo methods. This kind of simulation assumes the following:

 - We have a function that runs a heavy computation given some parameters.
 - We need to compute this function on many different input parameters, each function call being independent.
 - We want to gather all the results in one place for further analysis.
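
The three assumptions above can be sketched without any parallel library. This is only an illustration, not part of the original notebook: `estimate_pi` and the list of seeds are hypothetical stand-ins for a heavy simulation and its input parameters.

```python
import random


def estimate_pi(seed, n_samples=10_000):
    """Hypothetical 'heavy' computation: a Monte Carlo estimate of pi."""
    rng = random.Random(seed)  # private generator, so each call is independent
    inside = 0
    for _ in range(n_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / n_samples


# Many different inputs, each call independent of the others
seeds = list(range(20))

# Gather all results in one place for further analysis
estimates = [estimate_pi(seed) for seed in seeds]
mean_estimate = sum(estimates) / len(estimates)
print(f"{len(estimates)} runs, mean estimate = {mean_estimate:.3f}")
```

Because no call depends on another, the list comprehension is the only part that needs to change when moving to a parallel backend.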

If you get an error, you may need to add Slurm to your `PATH` first, e.g.:

echo 'export PATH=$PATH:/opt/slurm/slurm-curr/bin' >> ~/.bashrc
source ~/.bashrc



## Start Dask Client for Dashboard

Starting the Dask Client provides a dashboard, which
is useful for gaining insight into the computation.  We will also need it for the
Futures API part of this example. Moreover, as this kind of computation
is often launched on a supercomputer or in the cloud, you will probably end
up having to start a cluster and connect a client to it in order to scale.  See 
[dask-jobqueue](https://github.com/dask/dask-jobqueue),
[dask-kubernetes](https://github.com/dask/dask-kubernetes) or 
[dask-yarn](https://github.com/dask/dask-yarn) for easy ways to achieve this
on HPC, cloud or Big Data infrastructure, respectively.

The link to the dashboard will become visible when you create the client below.  We recommend having it open on one side of your screen while using your notebook on the other side.  It can take some effort to arrange your windows, but seeing both at the same time is very useful when learning.
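
If no batch scheduler is available, the same client and dashboard workflow can be tried on a single machine. The following is a minimal sketch under that assumption; it swaps the SLURM cluster used in this notebook for `LocalCluster`, and the names `local_cluster` and `local_client` are chosen here only to avoid clashing with the notebook's own `cluster` and `client`.

```python
from dask.distributed import Client, LocalCluster

# Assumption: workers run as threads inside this process rather than as
# SLURM jobs. dashboard_address=":0" asks for any free port.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=":0",
) as local_cluster, Client(local_cluster) as local_client:
    # The dashboard link is available as soon as the client is connected
    print(local_client.dashboard_link)

    # Quick smoke test: run one function call on a worker
    total = local_client.submit(sum, [1, 2, 3]).result()

print(total)
```

The rest of the notebook only relies on a connected `Client`, so the delayed, futures and bag examples should behave the same way on such a local cluster, just with fewer workers.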

In [1]:
from dask.distributed import Client, progress
from dask_jobqueue import SLURMCluster

# Simplify SLURMCluster parameters to basic working configuration
cluster_kwargs = {
    "cores": 1,  
    "memory": "10GB", 
    "shebang": "#!/bin/bash",
    "account": "facet",
    "walltime": "00:10:00",
    "job_script_prologue": ["source ~/.bashrc"],
    # Only basic directives, add more as needed and test each step
    "job_extra_directives": ["-q debug", "--partition=milano", "--qos=preemptable"],
}

cluster = SLURMCluster(**cluster_kwargs)
print(cluster.job_script())


slurm_jobs = 10
cluster.scale(jobs=slurm_jobs)
client = Client(cluster)


#!/bin/bash

#SBATCH -J dask-worker
#SBATCH -A facet
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=10G
#SBATCH -t 00:10:00
#SBATCH -q debug
#SBATCH --partition=milano
#SBATCH --qos=preemptable
source ~/.bashrc
/sdf/home/s/sanjeev/miniforge3/envs/dask/bin/python -m distributed.cli.dask_worker tcp://172.24.48.101:41657 --name dummy-name --nthreads 1 --memory-limit 9.31GiB --nanny --death-timeout 60



In [2]:
!echo $PATH
!type sbatch

/sdf/home/s/sanjeev/miniforge3/envs/dask/bin:/sdf/home/s/sanjeev/miniforge3/bin:/sdf/home/s/sanjeev/miniforge3/condabin:/sdf/home/s/sanjeev/.local/bin:/sdf/home/s/sanjeev/bin:/opt/slurm/slurm-curr/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sdf/home/s/sanjeev/github/act/bin
sbatch is /opt/slurm/slurm-curr/bin/sbatch


## Define your computation calling function

This function does a simple operation: it adds all the numbers of a list/array together, but it also sleeps for a random amount of time to simulate real work. In real use cases, this could call another Python module, or even run an executable using the `subprocess` module.

In [3]:
import time
import random

def costly_simulation(list_param):
    time.sleep(random.random())
    return sum(list_param)

We try it locally below:

In [4]:
%time costly_simulation([1, 2, 3, 4])

CPU times: user 4.16 ms, sys: 2.99 ms, total: 7.15 ms
Wall time: 582 ms


10

## Define the set of input parameters to call the function

We will generate a set of inputs on which we want to run our simulation function. Here we use a pandas DataFrame, but we could also use a simple list. Let's say that our simulation is run with four parameters called param_[a-d].

In [5]:
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()

|   | param_a | param_b | param_c | param_d |
|---|---------|---------|---------|---------|
| 0 | 0.418072 | 0.811685 | 0.190744 | 0.057874 |
| 1 | 0.779889 | 0.881468 | 0.407978 | 0.119676 |
| 2 | 0.895901 | 0.825243 | 0.758951 | 0.399057 |
| 3 | 0.841485 | 0.173812 | 0.494408 | 0.326264 |
| 4 | 0.63522 | 0.825829 | 0.700957 | 0.656053 |


Without using Dask, we could call our simulation on all of these parameters using normal Python for loops.

Let's only do this on a sample of our parameters, as it would take quite a long time otherwise.

In [6]:
results = []

In [7]:
%%time
for parameters in input_params.values[:10]:
    result = costly_simulation(parameters)
    results.append(result)

CPU times: user 45.5 ms, sys: 19 ms, total: 64.5 ms
Wall time: 5.75 s


In [8]:
results

[np.float64(1.4783755655642539),
 np.float64(2.1890106973867827),
 np.float64(2.879151560619454),
 np.float64(1.83596802795219),
 np.float64(2.818059252252342),
 np.float64(2.5732730742287444),
 np.float64(1.7219073849173852),
 np.float64(1.3631452534619402),
 np.float64(1.8867208640448427),
 np.float64(2.699092812413768)]

Note that this is inefficient: the calls are independent of each other, so we can easily run them in parallel.

There are many ways to parallelize this function in Python with libraries like `multiprocessing`, `concurrent.futures`, `joblib` or others.  These are good first steps.  Dask is a good second step, especially when you want to scale across many machines.
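
As a point of comparison, here is a minimal sketch of such a first step using only the standard library's `concurrent.futures`. The function `short_simulation` is a shortened, hypothetical copy of `costly_simulation` (the sleep is scaled down so the example finishes quickly), and the parameter list is made up for illustration.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def short_simulation(list_param):
    """Stand-in for costly_simulation with a much shorter random sleep."""
    time.sleep(random.random() * 0.1)
    return sum(list_param)


# Eight illustrative parameter sets of four numbers each
params = [[i, i + 1, i + 2, i + 3] for i in range(8)]

# pool.map applies the function to every input and keeps the input order
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel_results = list(pool.map(short_simulation, params))

# The results match a plain sequential evaluation
sequential_results = [sum(p) for p in params]
print(parallel_results == sequential_results)
```

Threads are enough here because the simulated work is a sleep; for CPU-bound Python code a `ProcessPoolExecutor` would usually be the better fit. Either way, this approach stays on one machine, which is where Dask becomes useful.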


## Use [Dask Delayed](http://dask.pydata.org/en/latest/delayed.html) to make our function lazy

We can call `dask.delayed` on our function to make it lazy.  Rather than compute its result immediately, it records what we want to compute as a task into a graph that we'll run later on parallel hardware. Using `dask.delayed` is a relatively straightforward way to parallelize an existing code base, even if the computation isn't embarrassingly parallel like this one. 

Calling these lazy functions is now almost free.  In the cell below we only construct a simple graph.
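
To illustrate the remark that `dask.delayed` also handles computations that are not embarrassingly parallel, here is a small sketch with one task that depends on all the others. The `square` function is hypothetical, and `scheduler="synchronous"` is an assumption made so that the sketch runs in the local process without a cluster.

```python
import dask


def square(x):
    return x * x


# Ten independent lazy tasks; nothing is computed yet
squares = [dask.delayed(square)(i) for i in range(10)]

# One more lazy task that depends on all ten results
total = dask.delayed(sum)(squares)

# Only now is the graph executed (here in the local process, for simplicity)
result = total.compute(scheduler="synchronous")
print(result)
```

Dask tracks the dependency between `total` and the ten `square` calls in the task graph, so the independent tasks can still run in parallel when a parallel scheduler is used.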

In [9]:
import dask
lazy_results = []

In [10]:
%%time

for parameters in input_params.values[:10]:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)

CPU times: user 731 μs, sys: 17 μs, total: 748 μs
Wall time: 680 μs


In [11]:
lazy_results[0]

Delayed('costly_simulation-7f70dc14-b370-43d6-9695-b982d7e8e8bd')

## Run in parallel

The `lazy_results` list contains information about ten calls to `costly_simulation` that have not yet been run.  Pass them to `dask.compute` when you want the results as normal Python objects.

If you started `Client()` above then you may want to watch the status page during computation.

In [12]:
%time dask.compute(*lazy_results)

CPU times: user 153 ms, sys: 46.5 ms, total: 200 ms
Wall time: 14.9 s


(np.float64(1.4783755655642539),
 np.float64(2.1890106973867827),
 np.float64(2.879151560619454),
 np.float64(1.83596802795219),
 np.float64(2.818059252252342),
 np.float64(2.5732730742287444),
 np.float64(1.7219073849173852),
 np.float64(1.3631452534619402),
 np.float64(1.8867208640448427),
 np.float64(2.699092812413768))

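Once the workers are up, this should be faster than running the same computations sequentially with a for loop.  In the run recorded above the wall time (14.9 s) was actually longer than that of the sequential loop (5.75 s), probably because it includes the time spent waiting for the SLURM worker jobs to start.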

We can now run this on all of our input parameters:

In [13]:
import dask
lazy_results = []

for parameters in input_params.values:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)
    
futures = dask.persist(*lazy_results)  # trigger computation in the background

In [14]:
client

Client:
- Connection method: Cluster object
- Cluster type: dask_jobqueue.SLURMCluster
- Dashboard: http://172.24.48.101:8787/status

Cluster:
- Dashboard: http://172.24.48.101:8787/status
- Workers: 10
- Total threads: 10
- Total memory: 93.10 GiB

Scheduler:
- Comm: tcp://172.24.48.101:41657
- Dashboard: http://172.24.48.101:8787/status
- Workers: 10
- Total threads: 10
- Started: Just now
- Total memory: 93.10 GiB

Workers (local directories are under `/lscratch/sanjeev/tmp/dask-scratch-space/`):

| Comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|------|-----------|-------|---------------|--------|-----------------|
| tcp://172.24.48.188:42905 | http://172.24.48.188:39645/status | tcp://172.24.48.188:33307 | 1 | 9.31 GiB | `worker-4ylkxh7o` |
| tcp://172.24.48.188:35177 | http://172.24.48.188:42137/status | tcp://172.24.48.188:33061 | 1 | 9.31 GiB | `worker-szbuavyl` |
| tcp://172.24.48.188:36787 | http://172.24.48.188:43041/status | tcp://172.24.48.188:40101 | 1 | 9.31 GiB | `worker-k1yna_tw` |
| tcp://172.24.48.188:43011 | http://172.24.48.188:38877/status | tcp://172.24.48.188:35311 | 1 | 9.31 GiB | `worker-evjbefv_` |
| tcp://172.24.48.188:38691 | http://172.24.48.188:33843/status | tcp://172.24.48.188:39391 | 1 | 9.31 GiB | `worker-goocgsvz` |
| tcp://172.24.48.188:44131 | http://172.24.48.188:37917/status | tcp://172.24.48.188:45019 | 1 | 9.31 GiB | `worker-qmx6j10j` |
| tcp://172.24.48.188:42821 | http://172.24.48.188:32999/status | tcp://172.24.48.188:44093 | 1 | 9.31 GiB | `worker-iqtbzuv6` |
| tcp://172.24.48.188:43523 | http://172.24.48.188:38079/status | tcp://172.24.48.188:33127 | 1 | 9.31 GiB | `worker-y51teh8x` |
| tcp://172.24.48.188:34141 | http://172.24.48.188:37967/status | tcp://172.24.48.188:34469 | 1 | 9.31 GiB | `worker-18x2tfhd` |
| tcp://172.24.48.188:42217 | http://172.24.48.188:35717/status | tcp://172.24.48.188:38801 | 1 | 9.31 GiB | `worker-0hua7tmq` |

To make this go faster, we can add additional workers.

(Here the workers are SLURM jobs, so scaling up means requesting more jobs from the batch scheduler.)

In [15]:
#client.cluster.scale(100)  # ask for a hundred workers (one thread each with the configuration above)

By looking at the Dask dashboard we can see that Dask spreads this work around our cluster, managing load balancing, dependencies, etc.

Then get the result:

In [None]:
%%time
results = dask.compute(*futures)
results[:5]


## Using the [Futures API](http://dask.pydata.org/en/latest/futures.html)

The same example can be implemented using Dask's Futures API by using the `client` object itself.  For our use case of applying a function across many inputs both Dask delayed and Dask Futures are equally useful.  The Futures API is a little bit different because it starts work immediately rather than being completely lazy.

For example, notice that work starts immediately in the cell below as we submit work to the cluster:

In [None]:
futures = []
for parameters in input_params.values:
    future = client.submit(costly_simulation, parameters)
    futures.append(future)

We can explicitly wait until this work is done and gather the results to our local process by calling `client.gather`:

In [None]:
%%time
results = client.gather(futures)
results[:5]

The code above can be written in fewer lines with the `client.map()` function, which calls a given function on each element of a list of parameters.

As with delayed, we can start the computation without waiting for the results by not calling `client.gather()` right away.

Note that the cluster has already run `costly_simulation` on these input parameters through the Futures API above, so the call to `client.map()` below won't actually trigger any computation: it just retrieves the already computed results.

In [None]:
futures = client.map(costly_simulation, input_params.values)

Then just get the results later:

In [None]:
results = client.gather(futures)
len(results)

In [None]:
print(results[0])

We encourage you to keep the [dashboard's status page](http://127.0.0.1:8787) open to follow the ongoing computation.

## Doing some analysis on the results

One benefit of Dask here, beyond API simplicity, is that you are able to gather the results of all your simulations in one call.  There is no need to implement a complex mechanism or to write individual results to a shared file system or object store.

Just get your results, and do some computation.

Here, we will just get the results and expand our initial dataframe to have a nice view of parameters vs. results for our computation.

In [None]:
output = input_params.copy()
output['result'] = pd.Series(results, index=output.index)
output.sample(5)

Then we can make some statistical plots or save the results locally using the pandas interface:

In [None]:
%matplotlib inline
output['result'].plot()

In [None]:
output['result'].mean()

In [None]:
filtered_output = output[output['result'] > 2]
print(len(filtered_output))
filtered_output.to_csv('/tmp/simulation_result.csv')

## Handling very large simulation with [Bags](http://dask.pydata.org/en/latest/bag.html)

The methods above work well for up to about 100,000 input parameters.  Above that, the Dask scheduler has trouble handling the number of tasks to schedule to workers.  The solution to this problem is to bundle many parameters into a single task.
You could do this by writing a new function that operates on a batch of parameters and using the delayed or futures APIs on that function.  You could also use the Dask Bag API.  This is described in more detail in the documentation about [avoiding too many tasks](http://dask.pydata.org/en/latest/delayed-best-practices.html#avoid-too-many-tasks).
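
The first option, a function that works on a batch of parameters, can be sketched as follows. The names `simulate`, `simulate_batch` and `make_batches` are hypothetical helpers introduced for this illustration, and `simulate` is a sleep-free stand-in for `costly_simulation`.

```python
def simulate(params):
    """Stand-in for costly_simulation, without the sleep."""
    return sum(params)


def simulate_batch(batch):
    """Run the per-item function on every parameter set of one batch."""
    return [simulate(params) for params in batch]


def make_batches(items, batch_size):
    """Split a list into consecutive chunks of at most batch_size items."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


# 1,000 illustrative parameter sets, bundled into 10 batches of 100
all_params = [[i, i + 1] for i in range(1000)]
batches = make_batches(all_params, batch_size=100)

# Each simulate_batch call is what would become one task, for example by
# wrapping it in dask.delayed or passing it to client.submit.
batch_results = [simulate_batch(batch) for batch in batches]

# Flatten back to one result per original parameter set
flat_results = [r for batch in batch_results for r in batch]
print(len(batches), len(flat_results))
```

The scheduler then only sees one task per batch instead of one per parameter set, at the cost of coarser load balancing.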

Dask Bags hold onto large sequences in a few partitions.  We can convert our `input_params` sequence into a `dask.bag` collection, asking for fewer partitions than there are items (at most 100,000, which is already huge), and apply our function to every item of the bag.

In [None]:
import dask.bag as db
b = db.from_sequence(list(input_params.values), npartitions=100)
b = b.map(costly_simulation)

In [None]:
%time results_bag = b.compute()

Looking at the dashboard, you should see only 100 tasks to run instead of 500, each taking about 5x longer on average, because each one actually calls our function 5 times.
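
The arithmetic behind this (500 items in 100 partitions gives 5 items per partition) can be checked with a small sketch. It uses a plain `range(500)` as a hypothetical stand-in for the 500 parameter sets, and `scheduler="synchronous"` is an assumption made so that it runs in the local process without a cluster.

```python
import dask.bag as db

# Stand-in for the 500 parameter sets
bag = db.from_sequence(range(500), npartitions=100)

# Count the items held by each partition; the function returns a
# one-element list so that the bag ends up with one count per partition.
sizes = bag.map_partitions(lambda part: [len(list(part))]).compute(
    scheduler="synchronous"
)

print(bag.npartitions, set(sizes))
```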

In [None]:
np.allclose(results, results_bag)

In [None]:
client

In [None]:

client.shutdown()

In [None]:
client