# Parallel on HPCs with `ipyparallel`

Extremely heavy computational tasks are not that common in bioinformatics, but when we do run into one, it is a real pain. Nobody likes waiting on a task that takes more than 24 hours, especially knowing that the result might not be enlightening. Parallel computing definitely helps in this scenario: if you can split the task properly and distribute the pieces across different cores, the runtime can drop drastically given enough CPUs. In this snippet, I will introduce a Python package that makes parallelization easy and sweet: `ipyparallel`.

## Rationale
There are multiple libraries for parallel computing in Python, so why should we care about this one (given that it has such a long, weird name)? There are two main reasons: 
1. It lets you distribute tasks not only locally across multiple cores, but also across machines on the same network. That means you can use 10 cores on each of our HPCs with **one single command**. 
2. It integrates well with Jupyter Notebook. In fact, it is a spin-off from the original IPython project: `ipython-notebook` evolved into `jupyter notebook`, while `IPython.parallel` became `ipyparallel`. When you open Jupyter Notebook, the root page shows 3 tabs: "Files", "Running", and "Clusters". The "Clusters" tab gives you real-time information on your current `ipyparallel` session.

## Intro
- The docs for `ipyparallel` are at https://ipyparallel.readthedocs.io/en/latest/intro.html. 
- Install it with `pip install ipyparallel` or `conda install ipyparallel`.
- The "visible" components of `ipyparallel` include:
    - Cluster
        - One Controller (the commander)
        - Multiple engines (the workforce)
    - Client(s) (the main interactive python session)
- Settings, including those for the Controller and Engines, are stored in `~/.ipython/profile_{default}/*.py`. There can be multiple settings "profiles". For example, for one particular task you might need 20 cores on hpc1-5; for another task, you might need 160 cores spread across hpc1-12. You can then set up two profiles, one for each task.
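
To make the profile layout concrete, here is a minimal sketch that builds the directory path for a named profile and lists any `*.py` config files found there. The helper names `profile_dir` and `list_config_files` are hypothetical (they are not part of `ipyparallel`), and the sketch assumes the default `~/.ipython` location; pass `ipython_dir` if your installation keeps profiles elsewhere.

```python
from pathlib import Path


def profile_dir(name="default", ipython_dir=None):
    """Return the directory expected to hold the settings of profile `name`.

    Assumes the default ~/.ipython layout; pass `ipython_dir` to override it.
    """
    base = Path(ipython_dir) if ipython_dir is not None else Path.home() / ".ipython"
    return base / f"profile_{name}"


def list_config_files(name="default", ipython_dir=None):
    """List the *.py config files of a profile (empty if it does not exist yet)."""
    directory = profile_dir(name, ipython_dir)
    if not directory.is_dir():
        return []
    return sorted(p.name for p in directory.glob("*.py"))


print(profile_dir("default"))
print(list_config_files("default"))
```

If the second line prints an empty list, the profile has probably not been created yet.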

## Setup (one HPC only)
1. Use `ipcluster start -n 4` to start a controller and 4 engines on the local machine (with the default profile); each engine corresponds to one CPU.
2. After the command-line output says `Engines appear to have started successfully`, open an `ipython/jupyter` session and try:

In [13]:
# import ipyparallel
import ipyparallel as ipp
# start a client in this python session
c = ipp.Client()
# client will automatically identify the controller (on the same machine, with the default profile)
# list ids of engines
c.ids
# Out[3]: [0, 1, 2, 3]

[0, 1, 2, 3]

In [14]:
c[:].apply_sync(lambda : "Hello, World")
# Out[4]: [ 'Hello, World', 'Hello, World', 'Hello, World', 'Hello, World' ]

['Hello, World', 'Hello, World', 'Hello, World', 'Hello, World']

## Setup (multiple HPCs)
0. For convenience's sake, it's better to set up password-less SSH login between the different HPCs. You can do that by running `ssh-keygen` on each HPC, using a prefix like `hpcN(N=1,2,3...)`, and then `cat ~/.ssh/hpcN.pub >> ~/.ssh/authorized_keys` to append the public keys to the trusted list.

1. Create a new profile PROFILE: `ipython profile create --parallel --profile=PROFILE`
1. Set up the config file for ipcluster: uncomment or add the following lines in `~/.ipython/profile_PROFILE/ipcluster_config.py`:

```python
c.IPClusterEngines.engine_launcher_class = 'SSH'
c.IPClusterStart.controller_launcher_class = 'Local'
c.SSHLauncher.to_fetch = []
c.SSHLauncher.to_send = []
c.SSHClusterLauncher.remote_profile_dir = '/data/d0/gds/USERNAME/.ipython/profile_PROFILE'
c.SSHEngineSetLauncher.engines = {
    # specify the hostname of hpc you want to use and the corresponding number of cores
    'hpc7': 10,
    'hpc8': 10,
    'hpc9': 10,
    'hpc10': 10,
    'hpc11': 10,
    'hpc12': 10,
    'hpc13': 10,
    'hpc14': 10,
    'hpc15': 10
}
c.SSHEngineSetLauncher.engine_cmd = [
    # modify this to your /path/to/bin/of/ipengine, you can `which ipengine`
    '/path/to/ipengine'
]
c.SSHEngineSetLauncher.engine_args = ['--profile=PROFILE']

```  
   
2. Set up the config file for the controller: uncomment or add the following lines in `~/.ipython/profile_PROFILE/ipcontroller_config.py`:

```python
c.HubFactory.client_ip = '*'
c.HubFactory.engine_ip = '*'
c.HubFactory.monitor_ip = '*'
```

3. Use `ipcluster start --profile=PROFILE` to start the cluster with the specified profile.
4. Open a client Python session as below:

In [None]:
# import ipyparallel
import ipyparallel as ipp
# start a client in this python session
c = ipp.Client(profile='PROFILE')
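
Once the client connects, it is worth checking that every engine requested in `ipcluster_config.py` actually registered. The sketch below is plain Python with two hypothetical helpers, `expected_engine_count` and `all_engines_up`; the host dictionary is copied from the config above, and the sketch assumes each value is a plain core count.

```python
# Hosts and core counts copied from the `c.SSHEngineSetLauncher.engines` setting.
engines = {
    'hpc7': 10, 'hpc8': 10, 'hpc9': 10,
    'hpc10': 10, 'hpc11': 10, 'hpc12': 10,
    'hpc13': 10, 'hpc14': 10, 'hpc15': 10,
}


def expected_engine_count(engines):
    """Total number of engines the cluster should start."""
    return sum(engines.values())


def all_engines_up(engine_ids, engines):
    """True if at least as many engines registered as were requested."""
    return len(engine_ids) >= expected_engine_count(engines)


print(expected_engine_count(engines))  # 90
```

In a live session you would call `all_engines_up(c.ids, engines)`. Engines started over SSH may take a while to register, so a `False` right after startup does not necessarily indicate a failure.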

## Import in engines
It's common to use other libraries in a computation. For example, to import `numpy` on both the engines and the client, use `sync_imports`:

In [26]:
dview = c[:] #direct view interface, see https://ipyparallel.readthedocs.io/en/latest/direct.html 
with dview.sync_imports():
    import numpy
    
[numpy.version.version, c[:].apply_sync(lambda : numpy.version.version)]

importing numpy on engine(s)


['1.15.0', ['1.15.0', '1.15.0', '1.15.0', '1.15.0']]

## Push
Memory is not shared between the engines and the client. Thus, we sometimes need to send data objects and functions to the engines:

In [None]:
a = numpy.array([1,2,3,4,5])
def print_sum(arr):
    s = arr.sum()
    print(s)
    return s

Push `a` and `print_sum` to all engines:

In [None]:
dview.push(dict(
    print_sum=print_sum, # format: name_in_engines=name_in_current_client
    a=a
))

View `a` on all engines:

In [34]:
dview['a']

[array([1, 2, 3, 4, 5]),
 array([1, 2, 3, 4, 5]),
 array([1, 2, 3, 4, 5]),
 array([1, 2, 3, 4, 5])]

You can also `pull` back variables:

In [71]:
dview.pull('a').get()

[array([1, 2, 3, 4, 5]),
 array([1, 2, 3, 4, 5]),
 array([1, 2, 3, 4, 5]),
 array([1, 2, 3, 4, 5])]
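
Because nothing is shared, what an engine receives is a copy of the object, not a reference to it. As a rough standard-library analogy (this is not `ipyparallel` code, and it assumes the transfer behaves like a pickle round trip), the sketch below shows that changing the original after "pushing" leaves the copy untouched:

```python
import pickle

a = [1, 2, 3, 4, 5]

# "Push": serialize on the client side, deserialize on the (pretend) engine side.
engine_copy = pickle.loads(pickle.dumps(a))

# Modifying the client's object afterwards does not reach the copy.
a.append(6)

print(a)            # [1, 2, 3, 4, 5, 6]
print(engine_copy)  # [1, 2, 3, 4, 5]
```

In a real session this means you need to `push` again after changing `a` on the client if the engines should see the new value.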

## Sync and async call
`apply` and `map` come in two versions: `sync` and `async`.
A `sync` call blocks the current client session and waits until all results have been transmitted back to the client; the results are returned directly by the call:

In [36]:
dview.apply_sync(lambda x : print_sum(x), a)

[15, 15, 15, 15]

An `async` call does not block the current session; the results need to be retrieved with the `.get()` method:

In [40]:
async_result = dview.apply_async(lambda x : print_sum(x), a)
async_result

<AsyncResult: <lambda>>

In [41]:
async_result.get()

[15, 15, 15, 15]
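
If the blocking versus non-blocking distinction is new to you, the standard library has a similar pattern. The sketch below is an analogy built on `concurrent.futures`, not on `ipyparallel`; `slow_sum` is a made-up stand-in for an expensive task, and threads play the role of engines.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_sum(values):
    """Pretend to be an expensive task."""
    time.sleep(0.2)
    return sum(values)


a = [1, 2, 3, 4, 5]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Blocking style: list() only returns once all four results are in.
    sync_results = list(pool.map(slow_sum, [a] * 4))

    # Non-blocking style: submit() returns a Future right away ...
    futures = [pool.submit(slow_sum, a) for _ in range(4)]
    # ... and result() waits for the value, much like AsyncResult.get().
    async_results = [f.result() for f in futures]

print(sync_results)   # [15, 15, 15, 15]
print(async_results)  # [15, 15, 15, 15]
```

The practical benefit of the non-blocking style is that the client can do other work between submitting the tasks and collecting the results.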

## `scatter` and `gather`
Sometimes we have a long list of objects and would like to divide and conquer. `scatter` distributes a list of objects across the engines, while `gather` collects the results back from the engines and flattens them into a single list.

In [47]:
b = numpy.array([1,2,3,4,5,6,7,8,9])
def print_sum(arr):
    s = arr.sum()
    print(s)
    return s

In [59]:
dview.scatter('bpart', b)

<AsyncResult: scatter>

In [60]:
dview['bpart']

[array([1, 2, 3]), array([4, 5]), array([6, 7]), array([8, 9])]

In [61]:
dview.gather('bpart').get()

array([1, 2, 3, 4, 5, 6, 7, 8, 9])

In [65]:
dview.execute('bpart_sum=print_sum(bpart)')
dview.gather('bpart_sum').get()

[6, 9, 13, 17]

In [67]:
sum(dview.gather('bpart_sum').get())

45
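
To see where the chunk sizes come from, here is a plain-Python sketch that reproduces the partition shown above for 9 items on 4 engines. The helpers `split_like_scatter` and `flatten_like_gather` are hypothetical names; the rule used (contiguous chunks, with the earlier chunks taking the leftover items) is inferred from the output in this notebook and is not taken from the `ipyparallel` source.

```python
def split_like_scatter(seq, n_engines):
    """Split `seq` into `n_engines` contiguous chunks; earlier chunks take the extras."""
    base, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        size = base + (1 if i < extra else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks


def flatten_like_gather(chunks):
    """Concatenate the chunks back into one flat list, in engine order."""
    return [item for chunk in chunks for item in chunk]


b = [1, 2, 3, 4, 5, 6, 7, 8, 9]
parts = split_like_scatter(b, 4)
print(parts)                       # [[1, 2, 3], [4, 5], [6, 7], [8, 9]]
print(flatten_like_gather(parts))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
print([sum(p) for p in parts])     # [6, 9, 13, 17]
```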

## IPython magic commands
In short, put `%px` before any command you want to run on the engines:

In [72]:
%px print(bpart)

[stdout:0] [1 2 3]
[stdout:1] [4 5]
[stdout:2] [6 7]
[stdout:3] [8 9]


## List comprehension parallelization
List comprehension is a commonly used piece of syntactic sugar in Python. We can combine `scatter` and `gather` with a list comprehension for parallel computing:

In [76]:
dview.scatter('x',range(16))
%px y = [i**3 for i in x]
y = dview.gather('y')
y.get()

[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375]
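
The same computation can also be written without the explicit `scatter`/`gather` pair by using the view's `map_sync` method, which splits the input across the engines and returns the combined results. The sketch below wraps that call in a hypothetical helper, `parallel_cubes`; it expects a view such as the `dview` created earlier and has not been run against your particular cluster.

```python
def parallel_cubes(view, n):
    """Cube the integers 0..n-1 using `view.map_sync`.

    `view` is expected to be an ipyparallel view such as `dview = c[:]`;
    any object with a compatible `map_sync(func, sequence)` method works.
    """
    return list(view.map_sync(lambda i: i ** 3, range(n)))


# In a live session with running engines:
# parallel_cubes(dview, 16)
```

Compared with the `%px` version, this keeps the intermediate variables `x` and `y` out of the engines' namespaces.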