# Dask+Jupyter@NERSC Best Practices: Introduction

This notebook outlines current best practices for using
[Dask Distributed](https://distributed.dask.org/en/stable/)
to do parallel computations on 
[Cori's](https://docs.nersc.gov/systems/cori/)
compute nodes, using NERSC's
[Jupyter service](https://docs.nersc.gov/services/jupyter/).

## Best Practice: Use a Shifter Image to Run a Dask Cluster and Kernel

For your own Dask+Jupyter@NERSC workflows, we recommend that you set up a
[Shifter](https://docs.nersc.gov/development/shifter/overview/)
image that you can use to run both a Jupyter
[kernel](https://jupyter.readthedocs.io/en/latest/projects/kernels.html)
and a Dask cluster (Dask scheduler, dashboard, and workers).
This demonstration notebook uses such a Shifter image for both kernel and cluster.

Using the same Shifter image for both the kernel and the cluster ensures that both have the same Python interpreter, packages, and package versions.
Launching the Dask cluster from a Shifter image also makes it start up much faster, because Python packages are loaded from the image on each node rather than from a shared file system.

## Best Practice: Use dask-mpi to Launch the Dask Cluster

If you want to run a Dask cluster on Cori compute nodes, we recommend using 
[dask-mpi](http://mpi.dask.org/en/latest/).
Although dask-mpi does not use MPI for communication during computation, we have found that using MPI to start the Dask cluster processes is faster and less hassle than the other options.

## Best Practice: Use start-dask-mpi to Simplify Things

The fastest way to get a Dask cluster running on Cori compute nodes is through Cori's interactive QOS.
But setting up the job with all the right flags is a bit complicated.
To simplify the process on Cori, we have created a software module, `nersc-dask`, that provides a wrapper script called `start-dask-mpi`.

---
# Start the Dask Cluster

Open a Jupyter terminal tab (or a regular Cori terminal) and run these commands:

    cd $SCRATCH
    module load nersc-dask
    start-dask-mpi --ntasks=256 --image=stephey/nersc-dask-example:v0.6.0

The script will not start unless you are somewhere in your `$SCRATCH` directory.
Any path under `$SCRATCH` will be fine.

## What This Does

The wrapper runs `salloc` to launch a Dask cluster for you in the interactive queue, using a Shifter image and dask-mpi.
Just before the job is submitted, you should see something like:

    OMP_NUM_THREADS     1
    PYTHONUNBUFFERED    1
    salloc
        --image=stephey/nersc-dask-example:v0.6.0
        --nodes=8
        --ntasks=256
        --cpus-per-task=2
        --time=30
        --constraint=haswell
        --qos=interactive
        srun -u shifter
            dask-mpi
                --scheduler-file=scheduler.json
                --dashboard-address=0
                --nthreads=1
                --memory-limit='auto'
                --no-nanny
                --local-directory=/tmp
                
The script formats this job request for you. Along the way, it works out how many nodes the job needs, runs a few checks, and sets some sensible defaults.
You can change many of these options.
See

    start-dask-mpi --help

for more information.
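As a rough check on the `--nodes=8` value shown above: 256 tasks with 2 CPUs each need 512 logical CPUs, and if each Haswell node offers 64 logical CPUs (32 cores with 2 hyperthreads), that comes to 8 nodes. The sketch below reproduces that arithmetic. `nodes_needed` is a hypothetical helper, not the wrapper script's actual code, and the 64-CPU figure is our assumption about Cori Haswell nodes.

```python
import math


def nodes_needed(ntasks, cpus_per_task, logical_cpus_per_node=64):
    """Hypothetical helper: smallest node count that fits ntasks * cpus_per_task logical CPUs.

    The default of 64 assumes a Cori Haswell node (32 cores x 2 hyperthreads).
    """
    return math.ceil(ntasks * cpus_per_task / logical_cpus_per_node)


print(nodes_needed(256, 2))  # 8, matching --nodes=8 in the salloc line above
```

Rounding up matters when the tasks do not fill whole nodes: one extra task would spill onto a ninth node.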

Wait for the job to start (you may need to re-run the command if the interactive queue is really busy).  There will be a lot of output from the cluster when it starts up.

---
# Some Preparations

When your Dask cluster starts, it writes a scheduler file, `scheduler.json`, to the directory where you ran `start-dask-mpi`.
Your notebook reads this file to learn how to connect to the scheduler.
In the example above we had you `cd $SCRATCH`, so the scheduler file will be in `$SCRATCH`; if you started the cluster from a subdirectory, adjust `scheduler_file` in the cell below to match.

We also configure the link to the Dask dashboard so that it opens through Jupyter's proxy.
This step is optional, but the dashboard is extremely helpful for understanding what your cluster and your workload are doing.

In [None]:
import os

import dask
from dask.distributed import Client

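# The cluster writes scheduler.json to the directory start-dask-mpi was run from
scheduler_file = os.path.join(os.environ["SCRATCH"], "scheduler.json")
# Build dashboard links that go through the Jupyter proxy instead of pointing at the compute node directly
dask.config.set({"distributed.dashboard.link": "{JUPYTERHUB_SERVICE_PREFIX}proxy/{host}:{port}/status"})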
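The link setting is a template. As far as we understand Dask's behavior, it fills the placeholders from your environment variables (JupyterHub sets `JUPYTERHUB_SERVICE_PREFIX` in your session) plus the scheduler's host and dashboard port. The sketch below expands the template by hand; the prefix, host name, and port are made-up example values.

```python
import os

template = "{JUPYTERHUB_SERVICE_PREFIX}proxy/{host}:{port}/status"

# Made-up example values; in a real session the prefix comes from the environment
# and Dask supplies the scheduler's host and dashboard port.
values = {
    **os.environ,
    "JUPYTERHUB_SERVICE_PREFIX": "/user/alice/cori-shared-node-cpu/",
    "host": "nid00123",
    "port": 45123,
}
link = template.format_map(values)
print(link)  # /user/alice/cori-shared-node-cpu/proxy/nid00123:45123/status
```

The result is a relative URL on your Jupyter server, whose proxy forwards the request to the dashboard running on the compute node.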

---
# Connect to the Scheduler

The client is an object that manages communication with your cluster.
Initialize a client by passing it the path to the scheduler file.

In [None]:
client = Client(scheduler_file=scheduler_file)
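If this cell runs before the cluster has finished starting, the scheduler file may not exist yet. Below is a minimal sketch of a hypothetical helper, `read_scheduler_address`, that waits for the file and reads the scheduler's address from it. It assumes the file is JSON with an `"address"` key, which is what Dask's scheduler writes to our knowledge.

```python
import json
import os
import time


def read_scheduler_address(path, timeout=60.0, poll=1.0):
    """Hypothetical helper: wait for a Dask scheduler file, then return its address.

    Assumes the file is JSON containing an "address" key.
    """
    deadline = time.monotonic() + timeout
    # Poll until the cluster has written the file, or give up after `timeout` seconds
    while not os.path.exists(path):
        if time.monotonic() > deadline:
            raise TimeoutError(f"no scheduler file at {path} after {timeout} s")
        time.sleep(poll)
    with open(path) as f:
        return json.load(f)["address"]
```

You could then connect with `Client(read_scheduler_address(scheduler_file))`, since `Client` also accepts a scheduler address string.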

Displaying the client object (here, by making it the last expression in a cell) shows a widget with information about the cluster.
Click the dashboard link to open the dashboard in a new browser tab.

In [None]:
client

---
# Set Up a Parallel Calculation

All we will do here is use a crude Monte Carlo calculation to estimate the value of $\pi$.
We use the dart-board method:

- Consider the unit square $[0, 1] \times [0, 1]$, which contains the quarter of the unit circle that lies in the first quadrant.
- Simulate throwing darts at the square, with $x$ and $y$ drawn independently from a uniform distribution on $[0, 1)$.
- Compute the fraction of darts landing inside the quarter circle ($x^2 + y^2 < 1$); it approaches the ratio of the two areas, $\pi/4$.
- Multiply the result by 4 to approximate $\pi$.
- The more darts thrown, the more precise the result.
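In symbols: if $H$ of $N$ darts land inside the quarter circle, the estimate and its standard error follow from binomial statistics (this derivation is ours, not part of the original method description):

$$
\Pr\left[x^2 + y^2 < 1\right] = \frac{\pi}{4},
\qquad
\hat{\pi} = \frac{4H}{N},
\qquad
\sigma_{\hat{\pi}} = 4\sqrt{\frac{\frac{\pi}{4}\left(1 - \frac{\pi}{4}\right)}{N}} = \sqrt{\frac{\pi(4 - \pi)}{N}}
$$

So the error shrinks like $1/\sqrt{N}$: for $N = 10^{11}$ throws, $\sigma_{\hat{\pi}} \approx 5.2 \times 10^{-6}$.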

The method we are using (Monte Carlo) parallelizes trivially.
You can combine the results of multiple random trials to get a more precise answer.
Here's the function that implements the dart-board simulation.
Nothing fancy: it returns the number of hits inside the quarter circle and the total number of throws (`count`).
Each task gets its own seed, so that different tasks draw different random numbers and their trials can be treated as independent.

In [None]:
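import numpy as np

def simulate(seed, count=100):
    """Throw `count` darts at the unit square; return (hits inside the quarter circle, count)."""
    rng = np.random.default_rng(seed)  # per-task generator, independent of NumPy's global state
    xy = rng.uniform(size=(count, 2))
    return ((xy * xy).sum(axis=1) < 1.0).sum(), count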
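To see why each task needs its own seed, the sketch below runs a local copy of the simulation (no cluster needed). It uses NumPy's `default_rng`, so each call gets its own generator. Reusing a seed reproduces exactly the same darts, which would add no new information if two tasks shared it.

```python
import numpy as np


def simulate(seed, count=100):
    # Local copy of the dart-board simulation with a per-call generator
    rng = np.random.default_rng(seed)
    xy = rng.uniform(size=(count, 2))
    return int(((xy * xy).sum(axis=1) < 1.0).sum()), count


# Same seed, same darts: a duplicated task would just repeat an earlier one.
print(simulate(42, 10_000) == simulate(42, 10_000))  # True

# Distinct seeds give different random streams that can be pooled into one estimate.
results = [simulate(seed, 100_000) for seed in range(10)]
hits = sum(h for h, _ in results)
throws = sum(c for _, c in results)
print(4.0 * hits / throws)  # close to 3.14; the exact digits depend on the seeds
```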

---
# Use the map() Function to Distribute Tasks to Workers

We ask for a very large total number of throws and split them across a smaller number of tasks.
The scheduler decides how to distribute those tasks among the workers.
Dask's scheduling overhead is typically about 1 millisecond per task.
Note that `client.map()` submits the tasks right away, so the workers start computing as soon as this cell runs; with this many tasks, it may take a moment for the dashboard to react.

In [None]:
total = 100000000000
tasks = 10000
count = total // tasks
futures = client.map(simulate, list(9876543 + np.arange(tasks, dtype=int)), count=count)
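A quick back-of-the-envelope check of what this cell asks for, using the roughly 1 ms per task figure above. The memory estimate is our own and counts only the `(count, 2)` array of float64 coordinates that each `simulate` call allocates; NumPy temporaries add more.

```python
total = 100_000_000_000
tasks = 10_000
count = total // tasks  # throws per task

# Integer division drops any remainder; here it divides evenly.
assert tasks * count == total

scheduler_seconds = tasks / 1000  # ~1 ms of scheduling overhead per task
xy_megabytes = count * 2 * 8 / 1e6  # float64 (x, y) coordinates per task

print(f"{count:,} throws per task")  # 10,000,000 throws per task
print(f"~{scheduler_seconds:.0f} s of total scheduler overhead")  # ~10 s
print(f"~{xy_megabytes:.0f} MB of coordinates per task")  # ~160 MB
```

Spread over 256 single-threaded workers, 10,000 tasks come to roughly 39 tasks per worker, enough to keep every worker busy while the per-task overhead stays small relative to the work.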

---
# Use the submit() Function to Get the Answer

`client.map()`, like many other Dask functions, returns "futures": placeholders for results that may or may not have been computed yet.
To turn futures into concrete values, you call `.result()` on them, gather them, or pass them to another task.
We'll submit a final reducer task that sums the hits and divides by the sum of all simulated throws.
Here's the reducer function, including the multiplication by 4 that turns that ratio into our estimate of $\pi$.
We pass it the list of futures, but Dask replaces each future with its computed result before running the function on a worker, so `reduce` can treat its input as plain `(hits, count)` tuples.
That's the magic of Dask.

In [None]:
def reduce(results):
    total_hits = 0
    total_count = 0
    for hits, count in results:
        total_hits += hits
        total_count += count
    return 4.0 * total_hits / total_count
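For comparison, here is the same map-then-reduce pattern using Python's standard-library `concurrent.futures` on a single machine, swapped in purely for illustration. Its futures are not automatically replaced by their results when passed to another function, so the sketch calls `.result()` explicitly; that is the step `client.submit(reduce, futures)` does for you.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def simulate(seed, count=100):
    # Local copy of the dart-board simulation
    rng = np.random.default_rng(seed)
    xy = rng.uniform(size=(count, 2))
    return int(((xy * xy).sum(axis=1) < 1.0).sum()), count


def reduce(results):
    total_hits = 0
    total_count = 0
    for hits, count in results:
        total_hits += hits
        total_count += count
    return 4.0 * total_hits / total_count


with ThreadPoolExecutor(max_workers=4) as pool:
    # Like client.map: one future per seed, submitted immediately
    futures = [pool.submit(simulate, seed, 100_000) for seed in range(9876543, 9876543 + 16)]
    # Unlike client.submit(reduce, futures), the standard library does not swap
    # futures for their results, so each one is resolved explicitly here.
    estimate = reduce(f.result() for f in futures)

print(estimate)  # close to 3.14
```

Here the reduction runs in the notebook process; with Dask, it runs as a task on a worker.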

And finally we submit the reducer.
The whole calculation with 256 processes takes under a minute.

In [None]:
%%time
client.submit(reduce, futures).result()

To shut down the cluster, stop the job: run `scancel` on it from another terminal, or press CTRL-C in the terminal where you started it.
dask-mpi may clean up the `scheduler.json` file when the job ends.
If it does not, the wrapper script deletes the stale file before starting the next cluster.
This matters because a leftover scheduler file points to a scheduler that no longer exists, and your workers (or this notebook's client) would try to connect to it.

---
# Review

Here is a summary of what we did:

- Used a Shifter image to run both the kernel for this notebook and a Dask cluster on compute nodes
- Used dask-mpi to launch the Dask cluster on the compute nodes "in" the Shifter image
- Simplified launching the Dask cluster by using a helpful wrapper script, `start-dask-mpi`
- Ran a simple parallel Monte Carlo estimation of $\pi$.

# Further Reading

- [Dask](https://docs.dask.org/en/stable/)
- [Dask Distributed](https://distributed.dask.org/en/stable/)
- [Dask-MPI](http://mpi.dask.org/en/latest/)