# Parallel Computing on Ceres with Python and Dask
(adapted from https://github.com/willirath/dask_jobqueue_workshop_materials)

## Core Lessons

- setting up SLURM clusters
- scaling clusters
- adaptive clusters
- viewing Dask diagnostics

This tutorial demonstrates how to use Dask to manage compute jobs on a SLURM cluster: setting up the cluster, scaling it, and using an adaptive cluster so that idle compute resources are freed for other users. It also explains how to access the Dask diagnostics dashboard to watch the cluster work in real time. 


<div class="alert alert-block alert-info">

## Pre-Tutorial Setup
#### 1. Be able to log in to the Ceres HPC 

   You should receive email instructions for logging in and setting up multifactor authentication when your SCINet account is approved. 
   Instructions can also be found in the [Ceres User Guide](https://scinet.usda.gov/guide/ceres/#logging-in-to-scinet) and the [Multifactor Authentication Guide](https://scinet.usda.gov/guide/multifactor/#authentication-on-your-computer-using-authy). 
   Contact the VRSC at scinet_vrsc@usda.gov if you have problems.
    
#### 2. Get a GitHub account 
    
   If you don't already have a GitHub account, sign up for a free account at https://github.com/join.
    
#### 3. Software setup
   
   a. Install miniconda on your Ceres account. Experienced HPC users can go to https://docs.conda.io/en/latest/miniconda.html and install the latest version of miniconda either in your home directory or your project /KEEP directory, and then keep conda up to date using "conda update conda". 
   
   Another option, which may be better for newer HPC users, is to load the miniconda module that is already installed on Ceres. The only downside is that it may be slightly outdated and you will have to load the module every time you want to use it. Follow the instructions in the [User-Installed Software of Ceres with Conda Guide](https://scinet.usda.gov/guide/conda/).
    
#### 4. Clone the tutorials repository from GitHub to your Ceres account
   
   a. Go to the repository at xx.<br>
   b. Click "Clone or Download" and copy the http link to your clipboard.<br>
   c. Log in to the Ceres HPC and navigate to where you want to copy the repository 
        **(where should this be run from, best practice? scratch?)**<br>
   d. Issue the command: git clone paste-the-repository-link-here
   
#### 5. Activate the Python environment for this tutorial
   
   Copy the Python environment for this tutorial xx located at xx to your conda environments folder, which should be located wherever you installed conda, then under xx/xx/xx.
   Activate the Python environment using "xx".

#### 6. Open JupyterLab (or JupyterHub?) to execute the tutorials
   Follow the instructions at xx
    
</div>

## Monte-Carlo Estimate of $\pi$

This tutorial uses the Monte-Carlo estimate of $\pi$ to demonstrate how Dask works.

We want to estimate $\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods). The area of a quarter circle of unit radius is $\pi/4$, so a point chosen uniformly at random in the unit square lies inside the unit circle centered at a corner of that square with probability $\pi/4$. 

So for N randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\pi \approx 4 \cdot N_{circ} / N$.

[<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif" 
     width="50%" 
     align=top
     alt="PI monte-carlo estimate">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)
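
Before bringing Dask into the picture, the estimate can be sketched in plain Python. This is a minimal single-process illustration using only the standard library; the function name `estimate_pi` and the fixed seed are assumptions made for the example and are not part of the tutorial code that follows.

```python
import math
import random

def estimate_pi(num_points, seed=0):
    """Estimate pi from num_points uniform random points in the unit square."""
    rng = random.Random(seed)  # seeded so repeated runs give the same estimate
    in_circle = 0
    for _ in range(num_points):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:  # the point falls inside the quarter circle
            in_circle += 1
    return 4 * in_circle / num_points

for n in (1_000, 100_000):
    est = estimate_pi(n)
    print(f"N={n:>7}: pi ~ {est:.4f} (abs. error {abs(est - math.pi):.4f})")
```

The loop processes one point at a time, which is exactly the kind of work that the Dask version later in this tutorial splits into chunks and spreads across workers.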

## Set up a Slurm cluster

We'll now use the dask.distributed and dask_jobqueue packages to create a SLURM cluster. The arguments passed to SLURMCluster can be interpreted as the settings/parameters for 1 SLURM job. Later, we can increase our compute power by "scaling our cluster", which means Dask will run more than one SLURM job at a time for any given computation.

**Here's a key to the dask_jobqueue.SLURMCluster input parameters in the code block below:**<br>

**cores** = Number of logical cores per job. This will be divided among the processes/workers. It can't be more than the lowest number of logical cores per node in the queue you choose; see https://scinet.usda.gov/guide/ceres/#partitions-or-queues.
   
**processes** = Number of processes per job (also known as Dask "workers"). You can use 1, but using more than 1 helps keep the job running if cores/workers fail. The number of cores per worker will be cores/processes.

**memory** = Memory per job. This will be divided among the processes/workers. **need info on how to know max mem per node you can request** <br>
**queue** = Name of the Ceres queue/partition (e.g. short, brief-low, debug, etc.).<br>
**walltime** = Time allowed for each SLURM job to run before it times out.<br>
**local_directory** = Local spill location if the core memory is exceeded **(is this correct)**<br>
**log_directory** = Location to write the stdout and stderr files for each worker process. 

View additional parameters, methods, and attributes for [dask_jobqueue.SLURMCluster](https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html)
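
As a quick check on how `cores`, `processes`, and `memory` interact, the per-worker share can be worked out by hand. The helper below is a hypothetical illustration (it is not part of dask_jobqueue) and assumes an even split across processes, which is what the key above describes.

```python
def per_worker_resources(cores, processes, memory_gb):
    """Split one job's cores and memory evenly across its worker processes."""
    threads_per_worker = cores // processes
    memory_per_worker_gb = memory_gb / processes
    return threads_per_worker, memory_per_worker_gb

# Values used in the SLURMCluster call in this tutorial
threads, mem_gb = per_worker_resources(cores=40, processes=2, memory_gb=100)
print(f"{threads} threads and {mem_gb:.2f} GB per worker")
```

For the settings used here, this gives 20 threads and 50 GB per worker, which matches the `--nthreads 20` and `--memory-limit 50.00GB` values in the job script printed later in this tutorial.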

In [1]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import os

cluster = SLURMCluster(
    cores=40,
    processes=2,
    memory="100GB",
    queue="short",
    walltime="00:10:00",
    log_directory="/project/geospatial_user_test_geil/dask_jobqueue_workshop_materials/notebooks")

So far we have only set up a cluster; we have not started any compute jobs or workers yet. We can verify this by issuing the following command in our Ceres terminal:
```
squeue -u firstname.lastname
```

To see the job-script that will be used to start workers with the HPC SLURM scheduler use the method .job_script().

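Here's a key to the output of the cluster.job_script() command in the code block below:<br>
-J = name of the job (this will appear in the "Name" column of the squeue information); "dask-worker" is the default value<br>
-e and -o = locations of the stderr (-e) and stdout (-o) files<br>
-p = queue/partition name<br>
-n = number of tasks (one task per SLURM job here; the number of nodes would be -N)<br>
--cpus-per-task = number of cores allocated to the task (the "cores" value given to SLURMCluster)<br>
--mem = memory requested per node for the job (derived from the "memory" value given to SLURMCluster)<br>
-t = walltime<br>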

In [2]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e /project/geospatial_user_test_geil/dask_jobqueue_workshop_materials/notebooks/dask-worker-%J.err
#SBATCH -o /project/geospatial_user_test_geil/dask_jobqueue_workshop_materials/notebooks/dask-worker-%J.out
#SBATCH -p short
#SBATCH -n 1
#SBATCH --cpus-per-task=40
#SBATCH --mem=94G
#SBATCH -t 00:10:00

/home/kerrie.geil/software/miniconda3/envs/tutorial01/bin/python -m distributed.cli.dask_worker tcp://10.1.4.44:42667 --nthreads 20 --nprocs 2 --memory-limit 50.00GB --name name --nanny --death-timeout 60
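
The `--mem=94G` line above may look surprising given that we asked for `memory="100GB"`. The value is consistent with the decimal request (100 x 10^9 bytes) being converted to binary gibibytes and rounded up. The sketch below reproduces that arithmetic; the function name is hypothetical, and the conversion rule is an assumption inferred from the output rather than taken from the dask_jobqueue documentation.

```python
import math

def gb_to_slurm_gib(memory_gb):
    """Convert decimal gigabytes to whole gibibytes, rounding up."""
    n_bytes = memory_gb * 10**9           # 1 GB = 10^9 bytes
    return math.ceil(n_bytes / 2**30)     # 1 GiB = 2^30 bytes

print(f"--mem={gb_to_slurm_gib(100)}G")
```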



<br><br>
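Next, we must initialize a Dask Client, which connects this notebook session to the Dask scheduler created by the cluster object by pointing to the address of that scheduler (tcp://10.1.4.44:42667 in the output below). It is the Dask scheduler, not the SLURM scheduler, that distributes tasks to the Dask workers (processes). <br>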


In [3]:
client = Client(cluster)
client

Client: Scheduler: tcp://10.1.4.44:42667, Dashboard: http://10.1.4.44:8787/status
Cluster: Workers: 0, Cores: 0, Memory: 0 B


Note: So far we have only set up a cluster and initialized a client. We still have not started any compute jobs or workers (processes) running yet, as shown in the Cluster information output above. We can verify that no workers are running yet by issuing the squeue command again as we did previously or we could access the Dask Diagnostics Dashboard for even more information.

## Viewing the Dask diagnostics dashboard

We will now take a look at the Dashboard to verify that there are no workers running in our cluster yet. Once we start computing, we will be able to use the Dashboard to see a visual representation of all the workers running.

INSERT INSTRUCTIONS TO OPEN THE DASHBOARD either port forward or better, figure out how to get the Dask extension working in JupyterLab/Hub

## Scale the cluster to six jobs

Let's start 12 workers (in 6 SLURM jobs, with 2 worker processes per job). 

For the distinction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs).

In [4]:
from time import time, sleep

In [5]:
cluster.scale(jobs=6)  # 6 jobs x 2 processes per job = 12 _workers_
sleep(15)
client

Client: Scheduler: tcp://10.1.4.44:42667, Dashboard: http://10.1.4.44:8787/status
Cluster: Workers: 12, Cores: 240, Memory: 600.00 GB


In [None]:
print(cluster.job_script())

The .scale() method actually starts the workers (processes) running as shown in the Cluster information output above. A quick check of squeue will now show your workers as well.
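
A fixed `sleep(15)` only works if SLURM starts the jobs within 15 seconds, which may not hold on a busy queue. One alternative is to poll until the expected number of workers has arrived. The helper below is a generic sketch in plain Python; `wait_for` is a hypothetical name, and the commented usage line assumes the `client` object from this tutorial. dask.distributed also offers `Client.wait_for_workers` for the same purpose.

```python
import time

def wait_for(condition, timeout=300.0, interval=5.0):
    """Poll condition() until it returns True or timeout seconds have passed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return condition()  # one last check at the deadline

# Stand-in condition for demonstration: becomes True on the third poll
arrivals = iter([False, False, True])
ready = wait_for(lambda: next(arrivals), timeout=5.0, interval=0.01)
print(ready)

# Hypothetical usage with the tutorial's client (not run here):
# wait_for(lambda: len(client.scheduler_info()["workers"]) >= 12)
```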

## The Monte Carlo Method

Now we define a function to compute $\pi$ and another function to print out some info during the compute.

In [None]:
import dask.array as da
import numpy as np

def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):
    """Calculate PI using a Monte Carlo estimate."""
    
    size = int(size_in_bytes / 8)
    chunksize = int(chunksize_in_bytes / 8)
    
    # Integer division keeps the array shape and chunk shape as ints
    xy = da.random.uniform(0, 1,
                           size=(size // 2, 2),
                           chunks=(chunksize // 2, 2))
    
    in_circle = ((xy ** 2).sum(axis=-1) < 1)
    pi = 4 * in_circle.mean()

    return pi

def print_pi_stats(size, pi, time_delta, num_workers):
    """Print pi, calculate offset from true value, and print some stats."""
    print(f"{size / 1e9} GB\n"
          f"\tMC pi: {pi : 13.11f}"
          f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
          f"\tWorkers: {num_workers}"
          f"\t\tTime: {time_delta : 7.3f}s")

## The actual calculations

We loop over different volumes (1GB, 10GB, and 100GB) of double-precision random numbers and estimate $\pi$ as described above. Note that calc_pi_mc only builds a lazy Dask task graph; calling the .compute() method on its result is what starts the computation on the workers.

In [None]:
for size in (1e9 * n for n in (1, 10, 100)):
    
    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi, time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))
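
To get a feel for how much work each iteration of the loop above creates, the number of random points and the number of chunks can be computed from the sizes alone. This sketch mirrors the size arithmetic in `calc_pi_mc` (8 bytes per value, 2 values per point, 200 MB default chunks); the function name `workload` is hypothetical.

```python
import math

def workload(size_in_bytes, chunksize_in_bytes=200e6):
    """Return (number of points, number of chunks) for a given data volume."""
    n_values = int(size_in_bytes / 8)                  # 8 bytes per float64
    n_points = n_values // 2                           # one (x, y) pair per point
    rows_per_chunk = int(chunksize_in_bytes / 8) // 2  # points held in one chunk
    n_chunks = math.ceil(n_points / rows_per_chunk)
    return n_points, n_chunks

for size in (1e9, 10e9, 100e9):
    points, chunks = workload(size)
    print(f"{size / 1e9:>5.0f} GB: {points:>13,} points in {chunks} chunks")
```

With 12 workers, the 1 GB case produces only 5 chunks, so some workers have nothing to do; the larger cases give every worker many chunks to process.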

## Scaling the Cluster to twice its size

We double the number of workers and then re-run the experiments.

In [None]:
new_num_workers = 2 * len(cluster.scheduler.workers)

print(f"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.")

cluster.scale(new_num_workers)

sleep(15)
client

In [None]:
client

## Re-run same experiments with doubled cluster

In [None]:
for size in (1e9 * n for n in (1, 10, 100)):
    
        
    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi,
                   time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))
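
When comparing the two runs, keep in mind that adding workers should change the time, not the accuracy: the error depends on how many points are sampled. As a rough guide, each point is a Bernoulli trial with success probability $\pi/4$, so the standard error of the estimate is $4\sqrt{p(1-p)/N}$. The sketch below evaluates that formula for the three data volumes; the function name is hypothetical.

```python
import math

def expected_pi_std_error(num_points):
    """Standard error of the Monte Carlo pi estimate for num_points samples."""
    p = math.pi / 4  # probability that a point lands inside the quarter circle
    return 4 * math.sqrt(p * (1 - p) / num_points)

for size in (1e9, 10e9, 100e9):
    n_points = int(size / 8) // 2  # same size arithmetic as calc_pi_mc
    print(f"{size / 1e9:>5.0f} GB: expected error ~ {expected_pi_std_error(n_points):.1e}")
```

Because the error shrinks with the square root of N, using 100 times more data only improves the estimate by about a factor of 10.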

## Automatically Scaling the Cluster

We want each calculation to take only a few seconds. With an adaptive cluster, Dask tries to add workers when the workload is high and to remove workers when they are idle.

_**Watch** how the cluster will scale down to the minimum a few seconds after being made adaptive._

In [None]:
ca = cluster.adapt(
    minimum=4, maximum=100);

sleep(4)  # Allow for scale-down
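
The idea behind adaptive scaling can be illustrated with a toy model: estimate how many workers are needed to finish the pending work within a target time, then clamp that number to the `minimum` and `maximum` bounds. The function below is a deliberately simplified sketch and is not Dask's actual algorithm; its name and parameters are assumptions made for this example.

```python
import math

def toy_adaptive_target(pending_seconds, target_seconds, minimum, maximum):
    """Toy model: workers needed to clear pending work within target_seconds."""
    wanted = math.ceil(pending_seconds / target_seconds)
    return max(minimum, min(maximum, wanted))  # clamp to the allowed range

# Same bounds as the cluster.adapt() call above
for pending in (0, 60, 1000):
    n = toy_adaptive_target(pending, target_seconds=5, minimum=4, maximum=100)
    print(f"{pending:>5} s of pending work -> {n} workers")
```

With no pending work the model falls back to the minimum of 4 workers, which corresponds to the scale-down you should see shortly after the cluster is made adaptive.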

In [None]:
client

## Repeat the calculation from above with larger workloads

(And watch the dashboard!)

In [None]:
for size in (n * 1e9 for n in (1, 10, 100, 1000)):
    
    
    start = time()
    pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()
    elaps = time() - start

    print_pi_stats(size, pi, time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))
    
    sleep(20)  # allow for scale-down time
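
The loop above picks the chunk size as `min(size / 1000, 500e6)`, so small problems are still split into many chunks and give the adaptive cluster something to scale against. The sketch below works out the resulting chunk size and chunk count for each data volume; the function name `adaptive_chunking` is hypothetical.

```python
import math

def adaptive_chunking(size_in_bytes):
    """Return (chunk size in bytes, number of chunks) as chosen in the loop above."""
    chunksize = min(size_in_bytes / 1000, 500e6)  # at most 500 MB per chunk
    n_chunks = math.ceil(size_in_bytes / chunksize)
    return chunksize, n_chunks

for size in (n * 1e9 for n in (1, 10, 100, 1000)):
    chunksize, n_chunks = adaptive_chunking(size)
    print(f"{size / 1e9:>6.0f} GB: {chunksize / 1e6:>5.0f} MB chunks x {n_chunks}")
```

The first three volumes are each split into 1000 chunks, while the 1000 GB case hits the 500 MB cap and is split into 2000 chunks.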

## Complete listing of software used here

In [None]:
%pip list

In [None]:
%conda list --explicit