### Note: This is only applicable to Rhino. The principles of parallel computing are general and can be used to accelerate code on any system, but the implementations here are designed to interact with the scheduler used on the Computational Memory Lab cluster.

## Parallel Computing with Dask

<img src="https://hpc.llnl.gov/sites/default/files/styles/with_sidebar_1_up/public/nodesNetwork.gif?itok=TBqDQmx0">

A node is like a computer within a much bigger computer!

**CMLDask wrapper**:
* See https://github.com/pennmem/cmldask for the wrapper package, created for convenience when using Rhino with the typical parallel setup for our analyses.
* Check out [Dask](https://docs.dask.org/en/stable/) and [Dask distributed](https://distributed.dask.org/en/stable/) for more documentation about the underlying implementation. In particular, we're using the [Futures API](https://docs.dask.org/en/stable/futures.html?highlight=futures) for managing parallel tasks and [dask-jobqueue](http://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SGECluster.html#dask_jobqueue.SGECluster) to connect with the Sun Grid Engine (SGE) scheduler on rhino. The examples below use the Slurm variant of the wrapper (`new_dask_client_slurm`).
* Note that this package is intended for convenience and will not accommodate very specific computational needs that demand complex parallel architectures. For those, you will need to use dask directly and create your own `Client()` instances tailored to your needs.
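
If the wrapper does not fit your needs, working with dask directly might look roughly like the sketch below. It uses `SLURMCluster` from dask-jobqueue because the examples in this notebook launch Slurm jobs. The helper names `slurm_cluster_kwargs` and `make_client`, the default walltime, and the default queue are assumptions for illustration; this is not a description of what cmldask does internally.

```python
def slurm_cluster_kwargs(memory_per_job, queue="RAM", walltime="01:00:00",
                         log_directory=None):
    """Collect keyword arguments for dask_jobqueue.SLURMCluster (one core per job)."""
    kwargs = {
        "cores": 1,                # threads per Slurm job
        "processes": 1,            # worker processes per Slurm job
        "memory": memory_per_job,  # e.g. "4GB"
        "queue": queue,            # Slurm partition name (assumed default)
        "walltime": walltime,      # hypothetical default; adjust as needed
    }
    if log_directory is not None:
        kwargs["log_directory"] = log_directory
    return kwargs


def make_client(memory_per_job, max_n_jobs=5, **options):
    """Start an adaptive Slurm-backed cluster and return a connected Client."""
    # Imported lazily so the helper above can be inspected without dask installed
    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster

    cluster = SLURMCluster(**slurm_cluster_kwargs(memory_per_job, **options))
    # Scale between 0 and max_n_jobs Slurm jobs depending on the workload
    cluster.adapt(minimum_jobs=0, maximum_jobs=max_n_jobs)
    return Client(cluster)


print(slurm_cluster_kwargs("4GB"))
```

Calling `make_client("4GB", max_n_jobs=5)` on the cluster would then return a client that can be used like the one created by the wrapper below.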

### Basic usage works as follows:

In [1]:
import cmldask.CMLDask as da

In [2]:
import os
import cmldask
from dask.distributed import wait, as_completed, progress

def squared(x):
    return x**2

logdir = os.path.join(os.path.abspath(os.curdir), 'dask_appendix_logs')
first_run = False
if not os.path.exists(logdir):
  os.mkdir(logdir)
  first_run = True
client = da.new_dask_client_slurm("test_dask", "1GB", log_directory=logdir, max_n_jobs=5)

Unique port for ryan.colyer is 51391
{'dashboard_address': ':51391'}
To view the dashboard, run: 
`ssh -fN ryan.colyer@rhino2.psych.upenn.edu -L 8000:192.168.86.143:51391` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


In [3]:
# where cluster run logs will be stored
logdir

'/home1/ryan.colyer/COGS4290_DataMemoryBrains/dask_appendix_logs'

**Note that you need to explicitly shut down this client (`client.shutdown()`) or shut down your jupyter kernel before running this cell again, or you might leave workers "stranded" without access to them (since that access is provided by the Client you would overwrite)**
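
One way to make the shutdown hard to forget is to wrap the client in a context manager. The sketch below is an illustration only: `shutting_down` and `FakeClient` are hypothetical names, and `FakeClient` stands in for a real Dask client so that the example runs without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def shutting_down(client):
    """Yield the client, then shut it down even if the body raises."""
    try:
        yield client
    finally:
        client.shutdown()


class FakeClient:
    """Hypothetical stand-in for a Dask Client so the sketch runs anywhere."""
    def __init__(self):
        self.is_shut_down = False

    def shutdown(self):
        self.is_shut_down = True


fake = FakeClient()
try:
    with shutting_down(fake) as client:
        raise RuntimeError("simulated failure in the analysis")
except RuntimeError:
    pass

print(fake.is_shut_down)
```

With a real client, the same pattern would read `with shutting_down(da.new_dask_client_slurm("test_dask", "1GB", max_n_jobs=5)) as client:`, followed by the analysis code in the indented block.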

You have options when setting up your Dask cluster/client:

In [4]:
da.new_dask_client_slurm?

Signature:
da.new_dask_client_slurm(
    job_name,
    memory_per_job,
    max_n_jobs=100,
    threads_per_job=1,
    processes_per_job=1,
    adapt=True,
    queue='RAM',
    walltime='1500000',
    local_directory=None,
    log_directory=None,
    scheduler_options={'dashboard_address':

## Parallel computing tips

* Jobs are still subject to memory limitations, so you may need to **break up large processes into smaller chunks.** For example, each job could correspond to analyzing one session, instead of one subject (this should be done, e.g., for all Morlet wavelet analyses). 
* It is often useful to save the output of each job in a dedicated directory, and sometimes useful to save intermediate values to aid in debugging or later nonparallel analyses. The Python "os" library can be helpful here. 
* **Be respectful!** There are only so many computational resources available to your classmates, the Kahana lab, and our collaborators across the country.
* **Limit the memory usage of jobs**. Course students should confine parallel jobs to 20 in parallel or 100GB, whichever comes first! So if you reserve 10GB per process, that means only running 10 in parallel. Or, 5GB per process would mean 20 in parallel. Set your job launching parameters accordingly on all jobs!
* You can always use the '**scancel**' command in Terminal, followed by your job number, to kill any of your old jobs that may be wasting rhino's resources.
* Use the '**squeue**' command in Terminal to see cluster usage information for all users. The **'squeue | grep your_username'** command shows only the jobs associated with your username. Use only the first 8 characters of your username for "your_username", because the default squeue output truncates the user column to 8 characters.
* Search online for other Slurm commands if you want to see additional information. They are very well documented.
* Each rhino2 node has ~128 GB of memory and ~40 cores.
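
The job limit above (20 jobs or 100GB, whichever comes first) can be turned into a small calculation. This is a minimal sketch; the function name `max_parallel_jobs` and its parameter names are made up for illustration.

```python
def max_parallel_jobs(memory_per_job_gb, job_cap=20, memory_cap_gb=100):
    """Largest job count that respects both the job cap and the memory cap."""
    if memory_per_job_gb <= 0:
        raise ValueError("memory_per_job_gb must be positive")
    return min(job_cap, int(memory_cap_gb // memory_per_job_gb))


for mem in (10, 5, 1):
    print(f"{mem} GB per job -> at most {max_parallel_jobs(mem)} jobs")
```

The result is the value to pass as `max_n_jobs` when creating the client: 10 for 10GB jobs, and 20 for 5GB or 1GB jobs.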


### Simple Dask Futures Usage

In [3]:
import numpy as np
from time import sleep

# functions to run on the cluster
def add(a, b):
    sleep(np.random.randint(0, 10))
    return a + b

def error_add(a, b):
    sleep(np.random.randint(0, 10))
    if a % 2:
        raise ValueError
    return a + b

# launch the jobs
futures = client.map(add, range(40), range(40))
progress(futures)


In [4]:
if first_run:
    import time
    time.sleep(30) # Give slurm a chance to create files on the first run

# just for illustration; the log files are just text files, so it's typically easier to open them up manually
with open(os.path.join(logdir, os.listdir(logdir)[0]), 'r') as f:
    log = f.read()
print(log)
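
The cell above reads whichever file `os.listdir` happens to return first, and it fails if the directory is still empty. If you would rather look at the most recent log, a helper along these lines might be useful. It is a sketch: `newest_file` is a hypothetical name, and a temporary directory stands in for `logdir`.

```python
import os
import tempfile


def newest_file(directory):
    """Return the path of the most recently modified file, or None if there is none."""
    paths = [os.path.join(directory, name) for name in os.listdir(directory)]
    files = [p for p in paths if os.path.isfile(p)]
    return max(files, key=os.path.getmtime) if files else None


# Demonstration in a temporary directory (stand-in for logdir)
with tempfile.TemporaryDirectory() as demo_dir:
    empty_result = newest_file(demo_dir)
    for i, name in enumerate(["old.out", "new.out"]):
        path = os.path.join(demo_dir, name)
        with open(path, "w") as f:
            f.write(name)
        os.utime(path, (1000 + i, 1000 + i))  # set explicit modification times
    newest_name = os.path.basename(newest_file(demo_dir))

print(empty_result, newest_name)
```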




In [5]:
client.map?

Signature:
client.map(
    func,
    *iterables,
    key=None,
    workers=None,
    retries=None,
    resources=None,
    priority=0,
    allow_other_workers=False,
    fifo_timeout='100 ms',
    actor=False,
    actors=False,
    pure=True

### Handling Errors

Now run a function that raises an error for some of its inputs:

In [6]:
futures = client.map(error_add, range(40), range(40))

Wait for all jobs to finish, then check for errors:

In [7]:
wait(futures)
errors = da.get_exceptions(futures, range(40))
errors

param,exception,traceback_obj
1,ValueError(),<traceback object at 0x2abeeb8bc1c0>
3,ValueError(),<traceback object at 0x2abeeb887e80>
5,ValueError(),<traceback object at 0x2abeeb87ecc0>
7,ValueError(),<traceback object at 0x2abeeb8d69c0>
9,ValueError(),<traceback object at 0x2abeeb8e0b80>
11,ValueError(),<traceback object at 0x2abeeb8d4c80>
13,ValueError(),<traceback object at 0x2abeeb8d4480>
15,ValueError(),<traceback object at 0x2abeeb8bee80>
17,ValueError(),<traceback object at 0x2abeeb8bffc0>
19,ValueError(),<traceback object at 0x2abeeb8e3b00>


Pick the index (the `param` value) whose traceback message you want to view:

In [10]:
da.print_traceback(errors, 5)

  File "/tmp/ipykernel_11763/762868183.py", line 12, in error_add
    raise ValueError


Notice that gathering these futures doesn't work, because some of them raised errors:

In [11]:
client.gather(futures)

ValueError: 

Instead, let's filter for the successful ones:

In [12]:
good_futures = da.filter_futures(futures)
client.gather(good_futures)

[0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48, 52, 56, 60, 64, 68, 72, 76]
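
The idea behind filtering can be sketched with the standard library's `concurrent.futures`, used here only as a local stand-in so the example runs without a cluster. This is not cmldask's implementation; `split_futures` is a hypothetical helper. Dask futures offer a comparable `exception()` method.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def error_add(a, b):
    if a % 2:
        raise ValueError(f"odd input: {a}")
    return a + b


def split_futures(futures):
    """Separate finished futures into successes and failures."""
    good = [f for f in futures if f.exception() is None]
    bad = [f for f in futures if f.exception() is not None]
    return good, bad


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(error_add, i, i) for i in range(10)]
    wait(futures)
    good, bad = split_futures(futures)
    results = [f.result() for f in good]

print(results)
print(len(bad))
```

Only the futures without an exception are asked for their result, so collecting the results no longer raises.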

## IMPORTANT: Shut down your client (or restart your kernel, which will do so automatically)

In [13]:
client.shutdown()

2023-09-25 01:12:17,587 - distributed.core - ERROR - 
Traceback (most recent call last):
  File "/usr/global/miniconda/py310_23.1.0-1/envs/workshop_311/lib/python3.11/site-packages/distributed/utils.py", line 803, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/global/miniconda/py310_23.1.0-1/envs/workshop_311/lib/python3.11/site-packages/distributed/scheduler.py", line 7144, in feed
    state = await state
            ^^^^^^^^^^^
  File "/usr/global/miniconda/py310_23.1.0-1/envs/workshop_311/lib/python3.11/site-packages/distributed/diagnostics/progressbar.py", line 293, in setup
    await p.setup()
  File "/usr/global/miniconda/py310_23.1.0-1/envs/workshop_311/lib/python3.11/site-packages/distributed/diagnostics/progress.py", line 203, in setup
    await asyncio.sleep(0.05)
  File "/usr/global/miniconda/py310_23.1.0-1/envs/workshop_311/lib/python3.11/asyncio/tasks.py", line 639, in sleep
    return await future
           ^^^^^^^^^

## Recommended Best Practices

1. Watch your memory usage. Jobs that exceed their memory limit will error out. Delete or overwrite variables you no longer need to reduce memory overhead.
2. Save **dimensionally reduced** intermediate outputs.
    * Clusters can be unreliable and have outages, and jobs can get killed. Saving results from each job ensures that you will not lose all results if a run dies partway through.
    * You can write code to check whether the results for a particular job have already been saved and recompute results only for failed jobs.
3. Avoid heavy disk usage by reading from and writing to a smaller number of medium to large-sized files rather than a large number of small files. The network/disk bandwidth on Rhino is a major bottleneck. If you use file I/O (input/output, i.e., reading and writing files) heavily, it will not only slow down others' work but can also slow down your own analyses substantially.
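
The "recompute only failed jobs" idea from point 2 can be sketched as follows. The names `run_with_checkpoint` and `slow_square` are hypothetical, and a temporary directory stands in for a results directory that all workers can see.

```python
import os
import pickle
import tempfile


def run_with_checkpoint(func, key, outdir):
    """Return func(key), reusing a saved result if this job already finished."""
    path = os.path.join(outdir, f"result_{key}.pkl")
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    result = func(key)
    # Write to a temporary name first so a killed job never leaves a partial file
    tmp_path = path + ".tmp"
    with open(tmp_path, "wb") as f:
        pickle.dump(result, f)
    os.replace(tmp_path, path)
    return result


calls = []

def slow_square(x):
    calls.append(x)  # record how often the real computation runs
    return x ** 2


with tempfile.TemporaryDirectory() as outdir:
    first = [run_with_checkpoint(slow_square, k, outdir) for k in range(3)]
    second = [run_with_checkpoint(slow_square, k, outdir) for k in range(3)]

print(first, second, calls)
```

The second pass returns the same values without calling `slow_square` again, which is what lets a rerun skip jobs that already succeeded.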


In [14]:
# example code with the Pickle library, which allows for saving and loading (most) objects in Python
# the pickle library is not recommended for long-term storage since it depends on having the same object 
# code/package versions available when loading an object that were present when the object was saved
import pickle
import numpy as np

arr = np.arange(1000)

# save output
with open('arr_test.pkl', 'wb') as f:
    pickle.dump(arr, f)
    
# load output
with open('arr_test.pkl', 'rb') as f:
    arr_loaded = pickle.load(f)

# confirm that loaded == saved
np.all(arr == arr_loaded)

True

In [15]:
class Settings():
  '''settings = Settings()
     settings.somelist = [1, 2, 3]
     settings.importantstring = 'saveme'
     settings.Save()

     settings = Settings.Load()
  '''
  def __init__(self, **kwargs):
    for k,v in kwargs.items():
      self.__dict__[k] = v

  def Save(self, filename='settings.pkl'):
    import pickle
    with open(filename, 'wb') as fw:
      fw.write(pickle.dumps(self))

  @staticmethod
  def Load(filename='settings.pkl'):
    import pickle
    # Use a context manager so the file handle is closed after loading
    with open(filename, 'rb') as fr:
      return pickle.load(fr)

  def __repr__(self):
    return ('Settings(' +
      ', '.join(str(k)+'='+repr(v) for k,v in self.__dict__.items()) +
      ')')

  def __str__(self):
    return '\n'.join(str(k)+': '+str(v) for k,v in self.__dict__.items())


In [16]:
settings = Settings()
settings.experiment = 'FR1'
settings.logbase = 'totalwords_'
settings.sub_list = ['R1341T', 'R1391T', 'R1395M', 'R1467M']

settings.Save('totalwords.pkl')
print(settings)

experiment: FR1
logbase: totalwords_
sub_list: ['R1341T', 'R1391T', 'R1395M', 'R1467M']
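
Because pickle is not recommended for long-term storage, simple settings like these (strings and lists) could also be written as JSON. This is a sketch of that alternative, not part of the `Settings` class above. The filename `totalwords.json` is hypothetical, and a temporary directory is used so the example leaves no files behind.

```python
import json
import os
import tempfile

settings_dict = {
    "experiment": "FR1",
    "logbase": "totalwords_",
    "sub_list": ["R1341T", "R1391T", "R1395M", "R1467M"],
}

with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "totalwords.json")  # hypothetical filename
    with open(path, "w") as f:
        json.dump(settings_dict, f, indent=2)
    with open(path) as f:
        loaded = json.load(f)

print(loaded == settings_dict)
```

JSON only handles basic types (strings, numbers, lists, dictionaries), so NumPy arrays would need to be converted first, for example with `.tolist()`.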


In [17]:
import cmldask.CMLDask as da
from dask.distributed import wait, as_completed, progress
from cmlreaders import CMLReader, get_data_index
import numpy as np
import pandas as pd
import datetime
import traceback

def TotalWords(sub):
  logfile = f'{settings.logbase}{sub}.txt'
  
  df = get_data_index('all')
  df = df[df['subject']==sub]
  df = df[df['experiment']==settings.experiment]

  total_words = 0
  for df_sess in df.itertuples():
    # Get parameters from the session row before the try block,
    # so exp and sess are always defined for the error log below
    df_sess = df_sess._asdict()
    exp = df_sess['experiment']
    sess = df_sess['session']
    try:
      # Prepare the reader
      reader = CMLReader(sub, exp, sess, df_sess['montage'], df_sess['localization'])
      # Get word events
      evs = reader.load('task_events')
      word_evs = evs[evs['type']=='WORD']
      # Do our analysis!
      total_words += len(word_evs)
    except Exception as e:
      # Log the exception to a subject-labeled filename,
      # along with a label of subject, experiment, and session.
      with open(logfile, 'a') as fw:
        date = datetime.datetime.now().strftime('%F_%H-%M-%S')
        fw.write(f'{date}: {sub}, {exp}, {sess}\n' +
                 ''.join(traceback.format_exception(type(e), e, e.__traceback__)))

    
  # Save the result.
  np.save('wordcount_'+sub+'.npy', [total_words])
  return total_words

In [18]:
# Run in parallel
client = da.new_dask_client_slurm("test_dask", "4GB", max_n_jobs=5)
futures = client.map(TotalWords, settings.sub_list)

Unique port for ryan.colyer is 51391
{'dashboard_address': ':51391'}
To view the dashboard, run: 
`ssh -fN ryan.colyer@rhino2.psych.upenn.edu -L 8000:192.168.86.143:51391` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


In [19]:
# Wait for results, check errors.
wait(futures)
# Label each future with the subject it was launched for
errors = da.get_exceptions(futures, settings.sub_list)
errors

Exception: None of the given futures resulted in exceptions

In [20]:
good_futures = da.filter_futures(futures)
client.gather(good_futures)

[1260, 468, 936, 156]

In [21]:
settings = Settings.Load('totalwords.pkl')
for sub in settings.sub_list:
  total_words = np.load('wordcount_'+sub+'.npy')
  print(f'{sub} had {total_words} words')

R1341T had [1260] words
R1391T had [468] words
R1395M had [936] words
R1467M had [156] words


Happy parallel computing!

**Exercise: Write a parallel function that returns the number of (bipolar) electrodes for the first 20 subjects of the 'r1' RAM dataset (ignore localization and montage details). Run with 5 jobs and 1 core per job. This will require you to integrate the data loading procedures you learned earlier with the dask framework demonstrated in the toy examples above.**

This example is purely for illustration and this task is likely not much faster with multiple nodes than with a single node.