### Note: This is only applicable to Rhino. The principles of parallel computing are general and can be used to accelerate code on any system, but the implementations here are designed to interact with the scheduler used on the Computational Memory Lab cluster.

## Parallel Computing with Dask

<img src="https://hpc.llnl.gov/sites/default/files/styles/with_sidebar_1_up/public/nodesNetwork.gif?itok=TBqDQmx0">

A node is like a computer within a much bigger computer!

**CMLDask wrapper**:
* See https://github.com/pennmem/cmldask for the wrapper package, created for convenience when using Rhino with the typical parallel setup for our analyses.
* Check out [Dask](https://docs.dask.org/en/stable/) and [Dask distributed](https://distributed.dask.org/en/stable/) for more documentation about the underlying implementation. In particular, we're using the [Futures API](https://docs.dask.org/en/stable/futures.html?highlight=futures) for managing parallel tasks and [dask-jobqueue](http://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SGECluster.html#dask_jobqueue.SGECluster) to connect with the Sun Grid Engine (SGE) scheduler on Rhino.
* Note that this package is intended for convenience and will not accommodate very specific computational needs that demand complex parallel architectures. In those cases you will need to use Dask directly and create your own `Cluster()` and `Client()` instances tailored to your needs.
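For the case where the wrapper is not flexible enough, the following is a minimal sketch of building a client directly with `dask_jobqueue.SGECluster` and `dask.distributed.Client`. The helper names (`sge_cluster_kwargs`, `make_client`), the walltime value, and the job limit are illustrative assumptions, not part of cmldask; the queue name is simply copied from the cmldask default shown later in this tutorial.

```python
def sge_cluster_kwargs(job_name, memory, cores=1, queue="RAM.q", walltime="01:00:00"):
    """Collect keyword arguments for dask_jobqueue.SGECluster (one worker process per job)."""
    return {
        "job_name": job_name,
        "queue": queue,        # assumed: same queue as the cmldask default
        "cores": cores,
        "processes": 1,
        "memory": memory,
        "walltime": walltime,  # assumed value; adjust to your job length
    }


def make_client(job_name, memory, max_jobs=10, **overrides):
    """Start an SGE-backed cluster and return a connected Client (only works on the cluster)."""
    # Imported lazily so the helper above can be inspected without Dask installed.
    from dask.distributed import Client
    from dask_jobqueue import SGECluster

    kwargs = sge_cluster_kwargs(job_name, memory)
    kwargs.update(overrides)
    cluster = SGECluster(**kwargs)
    # Let Dask add and remove workers as the amount of pending work changes.
    cluster.adapt(minimum=0, maximum=max_jobs)
    return Client(cluster)
```

Calling `make_client("my_analysis", "4GB")` on Rhino would then play the same role as `da.new_dask_client`, with every setting under your control.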

### Basic usage works as follows:

In [1]:
import cmldask.CMLDask as da



In [2]:
import cmldask
from dask.distributed import wait, as_completed, progress

def squared(x):
    return x**2

client = da.new_dask_client("test_dask", "1GB")

Unique port for jrudoler is 51360
{'dashboard_address': ':51360'}
To view the dashboard, run: 
`ssh -fN jrudoler@rhino2.psych.upenn.edu -L 8000:192.168.86.141:51360` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


**Note that you need to explicitly shut down this client (`client.shutdown()`) or shut down your jupyter kernel before running this cell again, or you might leave workers "stranded" without access to them (since that access is provided by the Client you would overwrite)**
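One way to make the shutdown hard to forget is to wrap client creation in a context manager. This is only a sketch: `managed_client` and `FakeClient` are hypothetical names introduced here, and `FakeClient` exists solely so the example runs away from the cluster.

```python
from contextlib import contextmanager


@contextmanager
def managed_client(make_client, *args, **kwargs):
    """Create a client, hand it to the caller, and always shut it down afterwards."""
    client = make_client(*args, **kwargs)
    try:
        yield client
    finally:
        # Runs even if the body raises, so workers are not left stranded.
        client.shutdown()


class FakeClient:
    """Hypothetical stand-in for a Dask client so the sketch runs off-cluster."""

    def __init__(self, job_name, memory):
        self.job_name, self.memory, self.closed = job_name, memory, False

    def shutdown(self):
        self.closed = True


with managed_client(FakeClient, "test_dask", "1GB") as demo_client:
    inside = demo_client.closed  # still open while the block is running

# On Rhino the factory would be da.new_dask_client instead of FakeClient:
# with managed_client(da.new_dask_client, "test_dask", "1GB") as client:
#     ...
```

In a notebook, where a client usually lives across many cells, an explicit `client.shutdown()` at the end is still the simpler choice; the context manager fits better in scripts.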

You have options when setting up your Dask cluster/client:

In [6]:
da.new_dask_client?

Signature: da.new_dask_client(job_name, memory_per_job, max_n_jobs=100, threads_per_job=1, processes_per_job=1, adapt=True, queue='RAM.q', walltime='1500000', local_directory=None, log_directory=None, scheduler_options={'dashboard_address': ':51360'}, **kwargs)
Docstring:
Returns a new dask client instance with associated dashboard for
monitoring jobs. The default method assumes a very basic use case - that
is, embarassingly para

## Parallel computing tips

* Jobs are still subject to memory limits, so you may need to **break up large processes into smaller chunks.** For example, each job could analyze one session instead of one subject.
* It is often useful to save the output of each job in a dedicated directory, and sometimes useful to save intermediate values to aid debugging or later non-parallel analyses. The Python `os` module can be helpful here.
* **Be respectful!** There are only so many cores available to the entire Kahana lab and our collaborators across the country.
* **Limit typical jobs to 100 cores or fewer.** Heavy usage leaves fewer resources for other users and, because disk resources are shared, might actually slow down all jobs overall. Please ask for permission before using more.
* You can always use the '**qdel**' command in a terminal, followed by your job number, to kill any of your old jobs that may be wasting Rhino's resources.
* Use the '**qstat**' command in a terminal to see cluster usage information.
* Each rhino2 node has ~128 GB of memory and ~40 cores.
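The first two tips can be sketched together: split the work into one task per session, and have each task write its result to its own file in a dedicated directory. The subject codes, session numbers, file naming, and the placeholder "analysis" below are all made up for illustration.

```python
import json
import tempfile
from pathlib import Path


def make_tasks(sessions_by_subject):
    """Flatten {subject: [sessions]} into one (subject, session) task per session."""
    return [(subj, sess)
            for subj, sessions in sorted(sessions_by_subject.items())
            for sess in sessions]


def run_and_save(task, out_dir):
    """Run one (placeholder) analysis and write its result to its own file."""
    subject, session = task
    result = {"subject": subject, "session": session, "value": session ** 2}
    out_path = Path(out_dir) / f"{subject}_sess{session}.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result))
    return str(out_path)


# Hypothetical subject codes and session numbers, for illustration only.
tasks = make_tasks({"SUBJ_A": [0, 1, 2], "SUBJ_B": [0]})
out_dir = Path(tempfile.mkdtemp()) / "results"
paths = [run_and_save(t, out_dir) for t in tasks]  # serial stand-in for a parallel map
```

With a Dask client, the serial loop would become something like `client.map(run_and_save, tasks, out_dir=str(out_dir))`, provided `out_dir` is on a filesystem that every worker can reach. Returning only the file path keeps the data sent back to the notebook small.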


### Simple Dask Futures Usage

In [4]:
import numpy as np
from time import sleep

def add(a, b):
    sleep(np.random.randint(0, 10))
    return a + b

def error_add(a, b):
    sleep(np.random.randint(0, 10))
    if a % 2:
        raise ValueError
    return a + b

futures = client.map(add, range(40), range(40))
progress(futures)

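`as_completed` was imported above but not yet used; it lets you handle each result as soon as its task finishes instead of waiting for the whole batch. The sketch below uses the standard library's `concurrent.futures` as a local stand-in, so it runs without a cluster. With Dask futures, `dask.distributed.as_completed` fills the same role.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def slow_add(a, b):
    sleep(0.01 * (5 - a))  # larger inputs sleep less, so completion order can differ
    return a + b


with ThreadPoolExecutor(max_workers=5) as pool:
    local_futures = [pool.submit(slow_add, i, i) for i in range(5)]
    # Collect each result as soon as its task finishes rather than in submission order.
    finished = [f.result() for f in as_completed(local_futures)]
```

The contents of `finished` are the same as a plain gather, but the order reflects which tasks completed first, which is useful for saving or plotting partial results while slower jobs are still running.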

### Handling Errors

Now run the version that raises errors:

In [7]:
futures = client.map(error_add, range(40), range(40))

Wait for all jobs to finish, then check for errors:

In [9]:
wait(futures)
errors = da.get_exceptions(futures, range(40))
errors

| index | param | exception | traceback_obj |
|---|---|---|---|
| 1 | 1 | ValueError() | `<traceback object at 0x2b004e8a38c0>` |
| 3 | 3 | ValueError() | `<traceback object at 0x2b004e99af00>` |
| 5 | 5 | ValueError() | `<traceback object at 0x2b004e9bbb90>` |
| 7 | 7 | ValueError() | `<traceback object at 0x2b004ea60dc0>` |
| 9 | 9 | ValueError() | `<traceback object at 0x2b004ea0bc80>` |
| 11 | 11 | ValueError() | `<traceback object at 0x2b004e8c9d70>` |
| 13 | 13 | ValueError() | `<traceback object at 0x2b004ea1a6e0>` |
| 15 | 15 | ValueError() | `<traceback object at 0x2b004e98bb90>` |
| 17 | 17 | ValueError() | `<traceback object at 0x2b004e9bdd20>` |
| 19 | 19 | ValueError() | `<traceback object at 0x2b004e9f6af0>` |


Pick the index whose traceback message you want to view:

In [11]:
da.print_traceback(errors, 5)

  File "<ipython-input-4-492535f77c2d>", line 11, in error_add
    raise ValueError
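If you want the traceback as text (for example, to write it to a log file), the standard `traceback` module can format a traceback object. This sketch assumes the entries in the `traceback_obj` column are ordinary Python traceback objects, as their printed form suggests; `failing_task` is a made-up function used to produce one locally.

```python
import traceback


def failing_task(a):
    if a % 2:
        raise ValueError(f"odd input: {a}")
    return a


try:
    failing_task(5)
except ValueError as exc:
    tb = exc.__traceback__  # an ordinary traceback object

# format_tb returns one string per stack frame; join them into readable text.
text = "".join(traceback.format_tb(tb))
print(text)
```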


Notice that gathering these futures fails, because some of them raised errors:

In [12]:
client.gather(futures)

ValueError: 

Instead, let's filter for successful ones

In [14]:
good_futures = da.filter_futures(futures)
client.gather(good_futures)

[0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48, 52, 56, 60, 64, 68, 72, 76]
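The implementation of `da.filter_futures` is not shown here. A rough equivalent, assuming it simply keeps the futures that completed without raising, can be written against the `status` attribute of Dask futures, which is `"finished"` on success and `"error"` on failure. `keep_finished` and the stand-in objects below are hypothetical.

```python
from types import SimpleNamespace


def keep_finished(futures):
    """Keep only futures whose status is "finished" (Dask marks failures as "error")."""
    return [f for f in futures if f.status == "finished"]


# Stand-in objects with a .status attribute, so the sketch runs without a cluster.
fake_futures = [SimpleNamespace(key=i, status="error" if i % 2 else "finished")
                for i in range(6)]
kept = [f.key for f in keep_finished(fake_futures)]
```

Call a filter like this only after `wait(futures)` has returned, since futures that are still pending would otherwise be dropped along with the failed ones.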

## IMPORTANT: Shut down your client (or restart your kernel, which will do so automatically)

In [15]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


Happy parallel computing!

**Exercise: Write a parallel function that returns the number of (bipolar) electrodes for every subject in the RAM example dataset. Run it with 5 jobs and 1 core per job. This will require you to integrate the data loading procedures you learned earlier with the Dask framework demonstrated in the toy examples above.**