# Natively scaling Python code using `dask`

- The canonical package in the `Python` data science ecosystem for running `Python` code on a cluster of machines is `dask`. Unlike `slurm`'s command-line utilities (`sbatch`, `srun`), `dask` scales your `Python` code **natively**: no need to leave your `jupyter` notebook!
- `joblib` integrates with `dask`, making scaling from a single machine to an HPC cluster as seamless as possible: the only additional code you need to write is the specification of the SLURM nodes you want to use.

### Allocate and launch compute resources using dask


In [None]:
# allocate the computing resources outside of joblib

# dask-jobqueue provides some utilities to create and launch compute resources
# for a variety of HPC clusters, including SLURM
from dask_jobqueue import SLURMCluster

# the parameters below describe ONE SLURM job; cores and memory are totals
# for the whole job, shared by the dask worker processes it hosts
cluster = SLURMCluster(
    n_workers=0,    # start no job yet: workers are created lazily by cluster.scale
    memory='16GB',  # total RAM per job
    processes=1,    # number of dask worker processes per job
    cores=1,        # total CPU cores per job (threads per worker = cores // processes)
)
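As the `dask-jobqueue` documentation describes these parameters, `cores` and `memory` are job-level totals that are split evenly across the job's `processes` worker processes, each worker running `cores // processes` threads. The sketch below spells out that arithmetic; `per_worker_resources` and `WorkerResources` are hypothetical names (not part of `dask-jobqueue`), and memory is assumed to be given as a plain number of GB.

```python
from dataclasses import dataclass


@dataclass
class WorkerResources:
    """Resources available to each dask worker process inside one SLURM job."""
    threads: int
    memory_gb: float


def per_worker_resources(cores: int, memory_gb: float, processes: int) -> WorkerResources:
    """Hypothetical helper: split one job's cores and memory across its workers.

    `cores` and `memory_gb` are totals for the whole job, shared evenly by
    `processes` dask worker processes.
    """
    if not 1 <= processes <= cores:
        raise ValueError("expected 1 <= processes <= cores")
    return WorkerResources(threads=cores // processes, memory_gb=memory_gb / processes)


# The cluster above: each job runs one worker with 1 thread and all 16 GB.
print(per_worker_resources(cores=1, memory_gb=16, processes=1))
# A larger job: 8 cores and 32 GB split into 2 workers of 4 threads and 16 GB each.
print(per_worker_resources(cores=8, memory_gb=32, processes=2))
```

With `processes=1`, as in this notebook, "worker" and "SLURM job" coincide, which keeps the mental model simple.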

In [None]:
# launch 2 dask workers with one CPU each, i.e. 2 SLURM jobs here (think sbatch --cpus-per-task=1)
cluster.scale(2)
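`cluster.scale(n)` asks for `n` workers, but SLURM allocates whole jobs, each hosting `processes` workers. To my understanding, `dask-jobqueue` rounds the request up to a whole number of jobs; treat this as an assumption and check the actual allocation with `squeue -u $USER`. A hypothetical helper, `jobs_for_workers`, makes the arithmetic explicit:

```python
import math


def jobs_for_workers(n_workers: int, processes_per_job: int) -> int:
    """Hypothetical helper: SLURM jobs needed to host `n_workers` dask workers.

    Assumes each job runs `processes_per_job` workers and that a partial job is
    rounded up, so you may get slightly more workers than requested.
    """
    if processes_per_job < 1:
        raise ValueError("processes_per_job must be >= 1")
    return math.ceil(n_workers / processes_per_job)


# With processes=1 (as above), cluster.scale(2) corresponds to 2 jobs.
print(jobs_for_workers(2, processes_per_job=1))
# With 4 workers per job, asking for 10 workers needs 3 jobs (12 workers).
print(jobs_for_workers(10, processes_per_job=4))
```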

In [None]:
from distributed import Client

# connect to the workers
client = Client(cluster)

### Small introduction to dask's API

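`dask`'s `Client` exposes an API to run function calls on the workers with `submit`/`map`. These calls return `futures`, whose `result()` method blocks until the computation is done: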

In [None]:
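from my_module import my_function

future = client.submit(my_function, 1)  # schedule my_function(1) on a worker; returns immediately
print(future)                           # a Future object, showing its status
print(future.result())                  # block until the call is done, then fetch its result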
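`dask`'s futures interface is modeled on the standard library's `concurrent.futures`, so the `submit`/`result` pattern above can be tried on a laptop with a thread pool. In the sketch below, `my_function` is a hypothetical stand-in for the one in `my_module`, and the comments name the `dask` counterparts (`client.submit`, `client.map`, `client.gather`). One difference worth knowing: `client.map` returns a list of futures, whereas the standard library's `Executor.map` returns the results directly.

```python
from concurrent.futures import ThreadPoolExecutor


def my_function(x):
    """Hypothetical stand-in for my_module.my_function."""
    return x * x


with ThreadPoolExecutor(max_workers=2) as executor:
    # Like client.submit: schedule one call and get a future back immediately.
    future = executor.submit(my_function, 1)
    print(future.result())  # blocks until the call has finished

    # Like client.map: one future per argument.
    futures = [executor.submit(my_function, x) for x in range(5)]
    # Like client.gather(futures): collect all results, in submission order.
    results = [f.result() for f in futures]

print(results)
```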

### The joblib-dask integration: no need to use dask's API!

Actually, you do not need to use `dask`'s API to run code on a `SLURM` cluster. If you ask `joblib` to use its `dask` backend, `joblib` will internally use `dask` to distribute the computation over the previously created `SLURMCluster`!
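Why does the `joblib` call site not change when `dask` takes over? `delayed(f)(x)` does not call `f`: it returns the tuple `(f, (x,), {})`, and `Parallel` hands these recorded calls to whichever backend is active. The toy below mimics that idea; `toy_delayed` and `toy_parallel` are hypothetical names, much simpler than `joblib`'s real machinery (no batching, no error handling), and a thread pool stands in for the `dask` backend.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def toy_delayed(function):
    """Hypothetical mimic of joblib.delayed: record a call instead of running it."""
    def record_call(*args, **kwargs):
        return function, args, kwargs
    return record_call


def toy_parallel(executor: Executor, tasks):
    """Hypothetical mimic of Parallel()(tasks): run recorded calls on `executor`."""
    futures = [executor.submit(f, *args, **kwargs) for f, args, kwargs in tasks]
    return [fut.result() for fut in futures]  # results keep the input order


def my_function(x):
    """Hypothetical stand-in for my_module.my_function."""
    return x + 1


# The call site never changes; only the "backend" (here a thread pool) does.
with ThreadPoolExecutor(max_workers=4) as backend:
    results = toy_parallel(backend, (toy_delayed(my_function)(arg) for arg in range(10)))
print(results)
```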

With access to a large `SLURMCluster`, we can launch many workers and get impressive speed-ups:

In [None]:
my_args = list(range(1000))

In [None]:
%%time
# serial baseline: run my_function on the first 10 arguments only
results = []
for arg in my_args[:10]:
    results.append(my_function(arg))
print(results)

In [None]:
# scale the cluster: create 50 workers
cluster.scale(50)

In [None]:
from joblib import Parallel, delayed, parallel_backend

In [None]:
%%time

with parallel_backend('dask'):
    results = Parallel()(delayed(my_function)(arg) for arg in my_args)
print(results)

### Caching large distributed computations using `joblib.Memory` 

Of course, you can use `joblib.Memory` (`joblib`'s caching feature) to cache computations run on the `cluster`.


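Make sure the cache location of your `Memory` object is on a filesystem shared by all the nodes of the cluster (and by the machine running this notebook), so that every worker reads and writes the same cache, using
```python
Memory('/nfs/gatsbystor/your-user-name/joblib-cache')
```
for instance.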
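Conceptually, `Memory.cache` stores each result on disk under a key derived from the function and its arguments, and checks for that key before recomputing; that is why the cache directory must be reachable from every node. The standard-library toy below illustrates the idea. `disk_cache` is a hypothetical helper, far simpler than `joblib.Memory` (which, among other things, also invalidates the cache when the function's code changes), and a temporary directory stands in for the shared NFS path.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path


def disk_cache(location):
    """Hypothetical toy of joblib.Memory(location).cache: memoize results on disk."""
    root = Path(location)

    def decorator(function):
        def wrapper(*args, **kwargs):
            # Key = function name + pickled arguments, hashed into a file name.
            payload = pickle.dumps((function.__qualname__, args, sorted(kwargs.items())))
            path = root / (hashlib.sha256(payload).hexdigest() + ".pkl")
            if path.exists():  # cache hit: load the stored result
                return pickle.loads(path.read_bytes())
            result = function(*args, **kwargs)  # cache miss: compute and store
            path.write_bytes(pickle.dumps(result))
            return result
        return wrapper
    return decorator


calls = []  # records which arguments were actually computed


def my_function(x):
    """Hypothetical stand-in for my_module.my_function."""
    calls.append(x)
    return x * 10


with tempfile.TemporaryDirectory() as cache_dir:  # stands in for the NFS path
    my_function_cached = disk_cache(cache_dir)(my_function)
    first = [my_function_cached(x) for x in range(3)]
    second = [my_function_cached(x) for x in range(3)]  # served from disk

print(first, second, calls)
```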

In [None]:
from joblib import Memory

memory = Memory('/nfs/gatsbystor/pierreg/joblib-cache')

my_function_cached = memory.cache(my_function)

In [None]:
%%time
# on a fresh cache, every call is computed and its result written to disk
with parallel_backend('dask'):
    results = Parallel()(delayed(my_function_cached)(arg) for arg in my_args)
print(results)

In [None]:
len(my_args + list(range(1000, 1200)))

In [None]:
%%time
from my_module import my_function_cached

with parallel_backend('dask'):
    results = Parallel()(delayed(my_function_cached)(arg) for arg in my_args)
print(results)

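This is very useful when you want to rerun your computation with additional parameters: the results for arguments already in the cache are loaded from disk, and only the new calls are actually computed.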

In [None]:
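%%time
from my_module import my_function_cached

# the 1000 previous arguments plus 200 new ones: only the new calls are computed
all_args = my_args + list(range(1000, 1200))
with parallel_backend('dask'):
    results = Parallel()(delayed(my_function_cached)(arg) for arg in all_args)
print(results)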
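The bookkeeping behind such an incremental rerun can be checked locally with `functools.lru_cache`, used here as an in-memory stand-in for `joblib.Memory` (an assumption for illustration only: unlike `Memory`, it lives in a single process and is lost when that process exits). After the first 1000 arguments are cached, extending the list to 1200 arguments triggers only 200 new calls:

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def my_function_cached(x):
    """Hypothetical stand-in: an in-memory cache instead of joblib.Memory."""
    return x * x


my_args = list(range(1000))
first_run = [my_function_cached(arg) for arg in my_args]

# Count cache misses (= calls actually computed) during the extended rerun.
misses_before = my_function_cached.cache_info().misses
all_args = my_args + list(range(1000, 1200))
second_run = [my_function_cached(arg) for arg in all_args]
new_calls = my_function_cached.cache_info().misses - misses_before

print(f"new calls in the second run: {new_calls} of {len(all_args)}")
```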