# Parallel Computing on Ceres with Python and Dask
(adapted from https://github.com/willirath/dask_jobqueue_workshop_materials)

## The Goal

Interactive data analysis on very large datasets. The tools in this tutorial are most appropriate for analysis of large earth-science-type datasets. 

For large dataset analysis, you'll want to run parallel (instead of serial) computations to save time. On a high-performance computer (HPC), you could divide your computing into independent segments (batching) and submit multiple batch scripts to run compute jobs simultaneously or you could parallelize your codes using MPI (Message Passing Interface), a traditional method for parallel computing if your code is in C or Fortran. Actually, there is also an "MPI for Python" package, but the methods in this tutorial are much *much* simpler. Both the batching and MPI methods of parallelization do not allow for interactive analysis, such as analysis using a Jupyter notebook, which is often desire by the earth science research community. Note that interactive analysis here does *not* mean constant visual presentation of the data with a graphical user interface (GUI) such as in ArcGIS. 

For earth-science-type data and analysis with Python, one of the simplest ways to run parallel computations in an interactive environment is with the Dask package.


## Core Lessons

Using Dask to:
- set up SLURM clusters
- scale clusters
- use adaptive clusters
- view Dask diagnostics

This tutorial will demonstrate how to use Dask to manage compute jobs on a SLURM cluster (including setting up your SLURM compute cluster, scaling the cluster, and how to use an adaptive cluster to save compute resources for others). The tutorial will also explain how to access the Dask diagnostics dashboard to view the cluster working in real time. 


<div class="alert alert-block alert-info">
THIS SECTION WILL BE PULLED FROM THE NOTEBOOK AND POSTED IN A TEXT FILE ALONGSIDE IT SO THAT PEOPLE WHO AREN'T YET SET UP ON THE HPC CAN ACCESS THE SETUP INSTRUCTIONS
    
## Pre-Tutorial Setup
#### 1. Be able to login to the Ceres HPC 

   You should receive email instructions for logging in and setting up multifactor authentication when your SCINet account is approved. 
   Instructions can also be found in the [Ceres User Guide](https://scinet.usda.gov/guide/ceres/#logging-in-to-scinet) and the [Multifactor Authentication Guide](https://scinet.usda.gov/guide/multifactor/#authentication-on-your-computer-using-authy). 
   Contact the VRSC at scinet_vrsc@usda.gov if you have problems.
    
#### 2. Get a Github account 
    
   If you don't already have a Github account, sign up for a free account at https://github.com/join.
    
#### 3. Software setup
   
   a. Install miniconda on your Ceres account. Experienced HPC users can go to https://docs.conda.io/en/latest/miniconda.html and install the latest version of miniconda either in your home directory or your project /KEEP directory, and then keep conda up to date using "conda update conda". 
   
   Another option, which may be better for newer HPC users, is to load the miniconda module that is already installed on Ceres. The only downside is that it may be slightly outdated and you will have to load the module every time you want to use it. Follow the instructions in the [User-Installed Software of Ceres with Conda Guide](https://scinet.usda.gov/guide/conda/).
    
#### 4. Clone the tutorials repository from Github to your Ceres account
   
   go to the repository at xx
   click "Clone or Download" and copy the http link to your clipboard
   login to the Ceres HPC and navigate to where you want to copy the repository 
   issue the command: git clone paste-the-repository-link-here
   
#### 5. Activate the python environment for this tutorial
   
   Copy the python environment for this tutorial xx located at xx to your conda environments folder, which should be located wherever you installed conda then under xx/xx/xx.
   Activate the python environment using "xx".

#### 6. Open JupyterLab (or JupyterHub?) to execute the tutorial
   follow the instructions at xx
    
</div>

--------------------------------------------------------------------

# Begin Tutorial: Parallel Computing on Ceres with Python and Dask

In this tutorial we will compute in parallel using Python's Dask package to communicate with the Ceres HPC SLURM job scheduler. 

SLURM (Simple Linux Utility for Resource Management) is a workload manager for HPC systems. From the [SLURM documentation](https://slurm.schedmd.com/quickstart.html), SLURM is "an open source... cluster management and job scheduling system for large and small Linux clusters. As a cluster workload manager, SLURM has three key functions. First, it allocates exclusive and/or non-exclusive access to resources (compute nodes) to users for some duration of time so they can perform work. Second, it provides a framework for starting, executing, and monitoring work (normally a parallel job) on the set of allocated nodes. Finally, it arbitrates contention for resources by managing a queue of pending work."

## Set up a SLURM Cluster with Dask

The first step is to create a SLURM cluster using the dask.distributed and dask_jobqueue packages. The SLURMCluster function can be interpreted as the settings/parameters for 1 SLURM compute job. Later, we can increase our compute power by "scaling our cluster", which means Dask will ask the SLURM scheduler to execute more than one job at a time for any given computation.
<br><br>

**Here's a key to the dask_jobqueue.SLURMCluster input parameters in the code block below:**

**cores** = Number of logical cores per job. This will be divided among the processes/workers. Can't be more than the lowest number of logical cores per node in the queue you choose, see https://scinet.usda.gov/guide/ceres/#partitions-or-queues.
   
**processes** = Number of processes per job (also known as Dask "workers" or "worker processes"). The number of cores per worker will be cores/processes. Can use 1 but more than 1 may help keep your computations running if cores/workers fail. For numeric computations (Numpy, Pandas, xarray, etc.), less workers may run significantly faster due to reduction in communication time. If your computations are mostly pure Python, it may be better to run with many workers each associated with only 1 core. [Here is more info than you'll probably ever want to know about Dask workers](https://distributed.dask.org/en/latest/worker.html). 

**memory** =  Memory per job. This will be divided among the processes/workers. See https://scinet.usda.gov/guide/ceres/#partitions-or-queues for the maximum memory per core you can request on each queue. **WHY LESS GB WHEN REQUESTING 3GB PER CORE = 120GB ON SHORT? GB vs GiB?** 

**queue** = Name of the Ceres queue, a.k.a. partition (e.g. short, medium, long, long60, mem, longmem, mem768, debug, brief-low, scavenger, etc.).

**walltime** = Time allowed before the job is timed out.

**local_directory** = local spill location if the core memory is exceeded **(DO ALL NODES HAVE LOCALLY ATTACHED STORAGE? WHAT TO PUT HERE /SCRATCH, $LOCAL_STORAGE? IF NOT WE HAVE TO MODIFY THE CONFIG FILE TO TURN OF SPILL OVER ~/.config/dask/jobqueue.yaml)**

**log_directory** = Location to write the stdout and stderr files for each worker process. Simplest choice may be the directory you are running your code from. 
<br><br>

You can view additional parameters, methods, and attributes in the Dask documentation for [dask_jobqueue.SLURMCluster](https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html).

In [1]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import os

cluster = SLURMCluster(
    cores=38,
    processes=1,
    memory="114GB", #38 cores x 3GB/core
    queue="short",
    walltime="00:10:00",
    log_directory="/project/geospatial_user_test_geil/dask_jobqueue_workshop_materials/notebooks")

So far we have only set up a cluster, we have not started any compute jobs/workers running yet. We can verify this by issuing the following command in a Ceres terminal:
```
squeue -u firstname.lastname
```

To see the job script that will be used to start a job running on the Ceres HPC use the method .job_script() as shown in the code block below.
<br><br>

**Here's a key to the output of the cluster.job_script() command below:**

**-J** = Name of the job. This will appear in the "Name" column of the squeue output. "dask-worker" is the default value.

**-e and -o** = Name/Location of the stdout and stderr files for each job. This comes from the SLURMCLuster "log_directory" parameter.

**-p** = Name of the Ceres queue/partition. This comes from the SLURMCLuster "queue" parameter.

**-n** = Number of nodes. **ARE WE ALWAYS LIMITED TO 1 NODE FOR THE SETUP JOB? I GOT ERRORS WHEN PUTTING MORE CORES THAN 40. NEED TO TRY USING THE EXTRA PARAMETER TO SPECIFY NUMBER OF NODES**

**--cpus-per-task** = Number of cores per job (same as -N). This comes from the SLURMCluster "cores" parameter.

**--mem** = Memory per job. This comes from the SLURMCluster "memory" parameter. 

**-t** = Time allowed before the job is timed out. This comes from the SLURMCluster parameter "walltime".
<br>

In [None]:
print(cluster.job_script())

<br><br>
Next, we must initialize a Dask Client, which opens a line of communication between Dask worker processes and the SLURM job scheduler by pointing to the address of the scheduler (tcp://10.1.8.84:41601).


In [2]:
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.1.4.136:45973  Dashboard: http://10.1.4.136:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


Note: So far we have only set up a cluster and initialized a client. We still have not started any compute jobs running yet, as shown in the Cluster information above. We can also verify that no workers are running yet by issuing the squeue command in a Ceres terminal again as we did previously or we could access the Dask Diagnostics Dashboard for even more information.

## Viewing the Dask Diagnostics Dashboard

We will now take a look at the Dashboard to verify that there a no workers running in our cluster yet. Once we start computing, we will be able to use the Dashboard to see a visual representation of all the workers running.

INSERT INSTRUCTIONS TO OPEN THE DASHBOARD either port forward or better, figure out how to get the Dask extension working in JupyterLab/Hub

## Scale the Cluster to Start Computing

Now let's start multiple SLURM jobs computing. 



In [3]:
from time import time, sleep   #time for timing how long our computations take, sleep for pausing while SLURM starts the requested jobs

In [4]:
cluster.scale(jobs=3)  # scale to more jobs
sleep(15)              # pause while SLURM starts up the jobs
client

0,1
Client  Scheduler: tcp://10.1.4.136:45973  Dashboard: http://10.1.4.136:8787/status,Cluster  Workers: 1  Cores: 38  Memory: 114.00 GB


The .scale() method actually starts the jobs running as shown in the Cluster information above. A quick check of squeue will now show your multiple jobs running as well.

When we set up our original cluster (equivalent of 1 SLURM job) we requested 40 cores spread over 2 workers. When we scaled our cluster to 3 jobs we now have 40x3=120 cores spread over 2x3=6 workers, as shown above. Note: you can also scale your cluster by cores, workers or memory as opposed to jobs.

## Monte-Carlo Estimate of $\pi$

Now we will use the [Monte-Carlo method of estimating $\pi$](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)  to demonstrate how Dask can execute parallel computations with the SLURM Cluster we've built and scaled.

We estimate the number $\pi$ by exploiting that the area of a quarter circle of unit radius is $\pi/4$ and that hence the probability of any randomly chosen point in a unit square to lie in a unit circle centerd at a corner of the unit square is $\pi/4$ as well. 

So for N randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\pi \approx 4 \cdot N_{circ} / N$.

[<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif" 
     width="50%" 
     align=top
     alt="PI monte-carlo estimate">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)

<br><br>
Let's define a function to compute $\pi$ and another function to print out some info during the compute.

In [5]:
import dask.array as da
import numpy as np

def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):  # IS THERE A REASON TO CHOOSE THIS CHUNKSIZE
    """Calculate PI using a Monte Carlo estimate."""
    
    size = int(size_in_bytes / 8)      # size= # of random numbers to generate (x & y vals), divide 8 bcz numpy float64 numbers generated by random.uniform use up 8 bytes
    chunksize = int(chunksize_in_bytes / 8)
    
    xy = da.random.uniform(0, 1,                          # this generates a set of x and y value pairs on the interval [0,1) of type float64
                           size=(size / 2, 2),            # divide 2 because we are generating an equal number of x and y values (to get our points)
                           chunks=(chunksize / 2, 2))     # WHY WOULD WE DIVIDE THE CHUNKSIZE BY 2 HERE????
    
    in_circle = ((xy ** 2).sum(axis=-1) < 1)     # a boolean array, True for points that fall inside the unit circle (x**2 + y**2 < 1)
    pi = 4 * in_circle.mean()                    # mean= sum the number of True elements, divide by the total number of elements in the array

    return pi

def print_pi_stats(size, pi, time_delta, num_workers):  
    """Print pi, calculate offset from true value, and print some stats."""
    print(f"{size / 1e9} GB\n"
          f"\tMC pi: {pi : 13.11f}"
          f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
          f"\tWorkers: {num_workers}"
          f"\t\tTime: {time_delta : 7.3f}s")

## The Actual Calculations

We loop over different volumes (1GB, 10GB, and 100GB) of double-precision random numbers (float64, 8 bytes each) and estimate $\pi$ as described above. Note, we call the function with the .compute() method to start the computations.

In [6]:
for size in (1e9 * n for n in (1, 10, 100)):
    
    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi, time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))

1.0 GB
	MC pi:  3.14149523200	Err:  9.742e-05
	Workers: 2		Time:   0.734s
10.0 GB
	MC pi:  3.14155292800	Err:  3.973e-05
	Workers: 3		Time:   1.279s
100.0 GB
	MC pi:  3.14161598976	Err:  2.334e-05
	Workers: 3		Time:   6.973s


## Scale the Cluster to Twice its Size and Re-run the Same Calculations

We increase the number of workers times 2 and the re-run the experiments. You could also double the size of the cluster by doubling the number of jobs, cores, or memory.

In [7]:
new_num_workers = 2 * len(cluster.scheduler.workers)

print(f"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.")

cluster.scale(new_num_workers)

# the following commands all get you the same amount of compute resources as above
#cluster.scale(12)               # same as code above. default parameter is workers. (original num workers was 6)
#cluster.scale(jobs=6)           # can scale by number of jobs
#cluster.scale(cores=240)        # can also scale by cores
#cluster.scale(memory=600) double check this one

sleep(15)
client




Scaling from 3 to 6 workers.


0,1
Client  Scheduler: tcp://10.1.4.136:45973  Dashboard: http://10.1.4.136:8787/status,Cluster  Workers: 3  Cores: 114  Memory: 342.00 GB


In [8]:
for size in (1e9 * n for n in (1, 10, 100)):
    
        
    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi,
                   time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))

1.0 GB
	MC pi:  3.14180556800	Err:  2.129e-04
	Workers: 3		Time:   0.905s
10.0 GB
	MC pi:  3.14153106560	Err:  6.159e-05
	Workers: 3		Time:   1.086s
100.0 GB
	MC pi:  3.14159702528	Err:  4.372e-06
	Workers: 3		Time:   6.675s


## Automatically Scale the Cluster Up and Down (Adaptive Cluster)

Using the .adapt() method will dynamically scale up the cluster when necessary but scale it down and save compute resources when not actively computing. Dask will ask the SLURM job scheduler to run more jobs, scaling up the cluster, when workload is high and shut the extra jobs down when workload is smaller.

Note that cluster scaling is bound by SCINet HPC user limitations. These limitations on the Ceres HPC are 400 cores, 1512GB memory, and 100 jobs max running simultaneously per user. So for example, if you set your cluster up with 40 cores per job and scale to 20 jobs (40x20=800cores) you will only get 400 cores (10 jobs) running at any time and the remaining requested jobs will not run. Your computations will still run successfully, but they will run on 10 jobs/400 cores instead of the requested 20 jobs/800 cores.

_**Watch** how the cluster will scale down to the minimum a few seconds after being made adaptive._

In [9]:
ca = cluster.adapt(minimum_jobs=1, maximum_jobs=9);

sleep(5)  # Allow for scale-down
client

0,1
Client  Scheduler: tcp://10.1.4.136:45973  Dashboard: http://10.1.4.136:8787/status,Cluster  Workers: 1  Cores: 38  Memory: 114.00 GB


Now, we'll repeat the calculations with our adaptive cluster and a larger workload. Watch the dash board!

WHY DOES THE ADAPTIVE COMPUTE CHOOSE ONLY 1 JOB FOR 10 GB? CAN WE SET A PARAMETER TO MAKE COMPUTE HAPPEN FASTER??? CANT FIND THE API FOR .ADAPT()
 

In [10]:
for size in (n * 1e9 for n in (1, 10, 100, 1000)):
    
    
    start = time()
    pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()
    elaps = time() - start

    print_pi_stats(size, pi, time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))
    
    sleep(20)  # allow for scale-down time

1.0 GB
	MC pi:  3.14157190400	Err:  2.075e-05
	Workers: 1		Time:   5.114s
10.0 GB
	MC pi:  3.14158664320	Err:  6.010e-06
	Workers: 1		Time:   5.478s
100.0 GB
	MC pi:  3.14156959040	Err:  2.306e-05
	Workers: 1		Time:  17.916s
1000.0 GB
	MC pi:  3.14159782842	Err:  5.175e-06
	Workers: 9		Time:  133.827s


## Complete listing of software used here

In [None]:
%pip list #DO WE REALLY NEED TO PIP AND CONDA?

In [None]:
%conda list --explicit