# Parallelisation

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/lukeconibear/swd6_hpp/blob/main/docs/06_parallelisation.ipynb)

In [None]:
import sys
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    %pip install dask[dataframe] joblib ray

## What is it?

Parallelisation divides a large problem into many smaller ones and solves them *simultaneously*.
- *Divides up the time/space complexity across workers.*
- Tasks centrally managed by a scheduler.
- Multi-processing (cores)
    - Useful for compute-bound problems.
    - Overcomes the [Global Interpreter Lock, GIL](https://wiki.python.org/moin/GlobalInterpreterLock) (which prevents Python bytecode from running on multiple threads simultaneously).  
    - Lower performance when data needs to be exchanged or aggregated between processes.
    - Suitable for text data and collections.
- Multi-threading (parts of processes)
    - Useful for memory-bound problems.  
    - Suitable for numeric data (e.g., NumPy).
    
Parallelised code often introduces overheads. So, the speed-up benefits are more pronounced with bigger jobs, rather than some of the small examples used in this tutorial.
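
As a rough illustration of where threads help, the sketch below uses only the standard library's `concurrent.futures` module (not otherwise used in this tutorial) to overlap four waiting tasks. The `slow_square` function and the 0.1 s delay are made up for the example, with `time.sleep` standing in for a wait on disk or network. Timings will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Hypothetical task: wait briefly, then return x squared."""
    time.sleep(0.1)  # sleeping releases the GIL, so other threads can run
    return x * x


inputs = [1, 2, 3, 4]

# Serial: each call waits for the previous one to finish.
start = time.perf_counter()
serial_results = [slow_square(x) for x in inputs]
serial_time = time.perf_counter() - start

# Threaded: the four waits overlap.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded_results = list(executor.map(slow_square, inputs))
threaded_time = time.perf_counter() - start

print(serial_results, threaded_results)
print(f"serial: {serial_time:.2f} s, threaded: {threaded_time:.2f} s")
```

The threaded version should take roughly one delay rather than four. A pure-Python compute-bound function would not normally see this benefit from threads because of the GIL, which is where multi-processing comes in.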

```{warning}
Issues can arise from pinning processes to specific cores and oversubscribing threads.
The following lines are recommended to be placed at the top of your job submission script.
```

```bash
# ensure processes are not pinned to specific cores
unset GOMP_CPU_AFFINITY

# ensure linear algebra libraries use 1 thread each
# https://docs.dask.org/en/stable/array-best-practices.html#avoid-oversubscribing-threads
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
```
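
Where there is no submission script (for example, in a notebook), the same settings can be applied from Python. This is a minimal sketch, assuming it runs before NumPy or any other library that loads OpenMP/BLAS is imported, since those libraries may read the variables only once at start-up.

```python
import os

# Assumption: nothing that loads OpenMP/MKL/OpenBLAS has been imported yet.
thread_variables = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")

# Equivalent of `unset GOMP_CPU_AFFINITY` (no error if it was never set).
os.environ.pop("GOMP_CPU_AFFINITY", None)

# Equivalent of the `export ...=1` lines.
for variable in thread_variables:
    os.environ[variable] = "1"

thread_limits = {variable: os.environ[variable] for variable in thread_variables}
print(thread_limits)
```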

## Parallelising Python

Python itself is not designed for massive scalability and controls threads preemptively using the GIL. This has led many libraries to work around it using C/C++ backends.  

Some options include:  

[multiprocessing](https://docs.python.org/3/library/multiprocessing.html) for creating a pool of asynchronous workers. 

In [6]:
from multiprocessing import Pool

def my_function(x):
    return x * x

if __name__ == '__main__':
    with Pool(3) as workers:
        print(workers.map(my_function, [1, 2, 3]))

[1, 4, 9]


[joblib](https://joblib.readthedocs.io/en/latest/) for creating lightweight pipelines that help with "embarrassingly parallel" tasks.

In [5]:
import joblib
import math

joblib.Parallel(n_jobs=2)(  # n_jobs > 1 is needed for the calls to run in parallel
    joblib.delayed(math.sqrt)(i ** 2) for i in range(8)
)

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]

[asyncio](https://docs.python.org/3/library/asyncio.html) for concurrent programs, especially ones that are IO-bound.  

```python
import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')
    
asyncio.run(main())
```
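
The example above runs a single coroutine, so nothing overlaps yet. The sketch below runs several at once with `asyncio.gather`. The `fetch` coroutine and its delays are hypothetical stand-ins for IO such as a network request. In a notebook, where an event loop is already running, `await main()` would be needed instead of `asyncio.run(main())`.

```python
import asyncio


async def fetch(name, delay):
    """Hypothetical IO-bound task: wait without blocking the event loop."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def main():
    # Run the three coroutines concurrently; results keep the input order.
    return await asyncio.gather(
        fetch("a", 0.2),
        fetch("b", 0.1),
        fetch("c", 0.1),
    )


results = asyncio.run(main())
print(results)
```

The total wait should be close to the longest single delay, not the sum of all three, because the event loop switches tasks whenever one is waiting.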

These options work well for the CPU cores on your machine, though not really beyond that.  

## [Dask](https://docs.dask.org/en/latest/)

Dask has great features, helpful documentation, and a familiar API.

It works through creating and computing [task graphs](https://docs.dask.org/en/stable/graphs.html). 

Task graphs have nodes (functions) and edges (objects). 

For example, the task graph might be:
- [Embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel) (apply one function to many pieces of data independently)
- [MapReduce](https://en.wikipedia.org/wiki/MapReduce) (map a function to the data and reduce / summarise the output)

![dask-task-graphs.svg](images/dask-task-graphs.svg)  

*[Image source](https://docs.dask.org/en/stable/graphs.html)*
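
To make the idea of nodes and edges concrete, here is a toy sketch of a MapReduce-style graph written as a plain dictionary, in the spirit of Dask's documented graph format (keys map to values or to tuples of `(function, *arguments)`). The `toy_get` resolver is a made-up, serial stand-in written for this example. It is not Dask's scheduler and it ignores parallelism entirely.

```python
def square(x):
    return x * x


# Map stage: three independent tasks. Reduce stage: one task that needs them all.
graph = {
    "x0": 1,
    "x1": 2,
    "x2": 3,
    "sq0": (square, "x0"),
    "sq1": (square, "x1"),
    "sq2": (square, "x2"),
    "total": (sum, ["sq0", "sq1", "sq2"]),
}


def toy_get(graph, item):
    """Resolve a key, task, or list of them by walking the graph recursively."""
    if isinstance(item, list):
        return [toy_get(graph, element) for element in item]
    if isinstance(item, tuple) and item and callable(item[0]):
        func, *args = item
        return func(*(toy_get(graph, arg) for arg in args))
    if isinstance(item, str) and item in graph:
        return toy_get(graph, graph[item])
    return item  # a literal value


total = toy_get(graph, "total")
print(total)
```

The three `square` tasks do not depend on each other, so a real scheduler is free to run them at the same time. Only `total` has to wait.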

These task graphs are executed by a [scheduler](https://docs.dask.org/en/stable/scheduling.html).

The resources used by this scheduler are managed by a cluster.

_Note, this is separate to [ARC's scheduler](https://arcdocs.leeds.ac.uk/usage/start.html)._

There are two main types of Dask scheduler which can [deploy jobs](https://docs.dask.org/en/stable/deploying.html):

- [Single machine](https://docs.dask.org/en/stable/deploying-python.html)
    - Cluster manager: [`LocalCluster()`](http://distributed.dask.org/en/stable/api.html#distributed.LocalCluster)
    - Simpler.
    - For your laptop or a local server.
- [Distributed](https://docs.dask.org/en/stable/deploying-hpc.html)
    - Cluster manager: [`SGECluster()`](http://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SGECluster.html), [`SLURMCluster()`](http://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html)
    - More complex.
    - For a cluster on a high performance computer (e.g., SGE, SLURM), Kubernetes, or cloud.

![dask-cluster-manager.svg](images/dask-cluster-manager.svg)  

*[Image source](https://docs.dask.org/en/stable/deploying.html)*

### [Single machine](https://docs.dask.org/en/stable/deploying-python.html)

In [None]:
if not IN_COLAB:
    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster()
    client = Client(cluster)
    client 

If you want multiple threads, you could pass keyword arguments when creating the `Client`:
```python
client = Client(processes=False, threads_per_worker=4, n_workers=1)
```

Important: always remember to close the client when you have finished:
```python
client.close()
```
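
One way to make sure the client is always closed is to use it as a context manager. This is a minimal sketch, assuming `dask.distributed` is installed. The choice of a single thread-based worker (`processes=False`) and the disabled dashboard (`dashboard_address=None`) are only there to keep the example lightweight.

```python
from dask.distributed import Client, LocalCluster

# Both the cluster and the client are closed automatically on leaving the block,
# even if the work inside raises an error.
with LocalCluster(
    n_workers=1,
    threads_per_worker=2,
    processes=False,         # workers run as threads in this process
    dashboard_address=None,  # skip the diagnostics dashboard for this demo
) as cluster:
    with Client(cluster) as client:
        future = client.submit(sum, [1, 2, 3])  # run sum([1, 2, 3]) on a worker
        total = future.result()                 # block until the result is ready

print(total)
```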

### Applications

#### [dask.array](https://examples.dask.org/array.html) (NumPy)

In [None]:
import dask.array as da

my_array = da.random.random(
    (5_000, 5_000),
    chunks=(500, 500) # dask chunks
)
result = my_array + my_array.T
print(result)

if not IN_COLAB:
    result.compute()

#### [dask.dataframe](https://examples.dask.org/dataframe.html) (Pandas)

#### [dask.bag](https://examples.dask.org/bag.html)

Iterate over a bag of independent objects (embarrassingly parallel).
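
A minimal sketch, assuming `dask.bag` is available in your Dask installation. The sequence and the functions applied to it are made up for illustration, and `scheduler="threads"` is chosen here only to keep the example lightweight.

```python
import dask.bag as db

# Ten independent items split across 2 partitions.
bag = db.from_sequence(range(10), npartitions=2)

# Lazily square each item, keep the even squares, and add them up.
lazy_total = (
    bag.map(lambda x: x * x)
    .filter(lambda x: x % 2 == 0)
    .sum()
)

# Execute the graph: 0 + 4 + 16 + 36 + 64
total = lazy_total.compute(scheduler="threads")
print(total)
```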

#### Dask behind the scenes

Dask is under the hood for many libraries e.g. [xarray](http://xarray.pydata.org/en/stable/dask.html), [iris](https://scitools.org.uk/iris/docs/v2.4.0/userguide/real_and_lazy_data.html), [scikit-learn](https://ml.dask.org/).

In [None]:
import xarray as xr

ds = xr.tutorial.open_dataset(
    'air_temperature',
    chunks={'time': 'auto'} # dask chunks
)

ds_mean = ds.mean()
print(ds_mean) # a dask.array (an unexecuted task graph)

ds_mean.compute()

ds.close()

In [None]:
if not IN_COLAB:   
    client.close()

### [Distributed](https://docs.dask.org/en/stable/deploying-hpc.html)

#### [Dask-Jobqueue](http://jobqueue.dask.org/en/latest/)

- A variety of resource managers (e.g., [SGE](http://jobqueue.dask.org/en/latest/examples.html#sge-deployments), [SLURM](http://jobqueue.dask.org/en/latest/examples.html#slurm-deployments)).
- Batch jobs (recommended)
- Interactive jobs 
    - Dask-Jobqueue is more for interactive work and the adaptive, dynamic scaling of workers.

```python
from dask.distributed import Client
from dask_jobqueue import SGECluster

cluster = SGECluster(...)
client = Client(cluster)
```

#### [Configuration files](http://jobqueue.dask.org/en/latest/configuration.html)

- Add (or update) the [`~/.config/dask/jobqueue.yaml`](https://github.com/lukeconibear/swd6_hpp/blob/main/docs/jobqueue.yaml) file.
- [Examples](http://jobqueue.dask.org/en/latest/configurations.html)

```yaml
jobqueue:

  sge:
    name: dask-worker

    # Dask worker options
    cores: 1                   # Total number of cores per job
    memory: '1 GB'            # Total amount of memory per job
    processes: 1                # Number of Python processes per job

    interface: ib0                       # Network interface to use like eth0 or ib0
    death-timeout: 60                    # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null                # Location of fast local storage like /scratch or $TMPDIR

    # SGE resource manager options
    shebang: "#!/usr/bin/env bash"
    queue: null
    project: null
    walltime: '01:00:00'
    extra: []
    env-extra: []
    job-extra: []
    log-directory: null

    resource-spec: null

distributed:
  worker:
    memory:
      target: false # don't spill to disk
      spill: false # don't spill to disk
      pause: 0.80 # pause worker threads at 80% memory use
      terminate: 0.95 # restart the worker at 95% memory use

```

#### Examples

- Create/edit the [`example_dask_jobqueue_sge.py`](https://github.com/lukeconibear/swd6_hpp/blob/main/docs/example_dask_jobqueue_sge.py) file.
- Submit to the queue using [`qsub example_dask_jobqueue_sge.bash`](https://github.com/lukeconibear/swd6_hpp/blob/main/docs/example_dask_jobqueue_sge.bash).
    - If you need to share memory across chunks:  
        - Use [shared memory](https://docs.dask.org/en/latest/shared.html) (commonly OpenMP, Open Multi-Processing).
        - `-pe smp np` on [ARC4](https://arcdocs.leeds.ac.uk/usage/batchjob.html#list-of-sge-options)
    - Otherwise:  
        - Use [message passing interface, MPI](https://docs.dask.org/en/latest/setup/hpc.html?highlight=mpi#using-mpi) (commonly OpenMPI).
        - `-pe ib np` on [ARC4](https://arcdocs.leeds.ac.uk/usage/batchjob.html#list-of-sge-options)

#### [Dask-MPI](http://mpi.dask.org/en/latest/)

Uses the `mpi4py` package and MPI to launch and distribute the workers (MPI is not used for communication between them).

- [Batch jobs](http://mpi.dask.org/en/latest/batch.html) (recommended)
    - Ensure that the number of cores used here matches the number in the resources requested at the top of the submission script.
- Interactive jobs


### [Profiling and diagnostics](https://docs.dask.org/en/stable/understanding-performance.html)

Many of the profiling tools we looked at earlier don't work well with parallel code.



#### [Visualise the task graph](https://docs.dask.org/en/stable/graphviz.html)

Before executing the computation, you could visualise the task graph.

This can help find potential bottlenecks.

For example:

```python
import dask.array as da
x = da.ones((15, 15), chunks=(5, 5))
y = x + x.T
y.visualize(filename='dask-transpose.svg')
```

![dask-transpose.svg](images/dask-transpose.svg)  

*[Image source](https://docs.dask.org/en/stable/graphviz.html)*

#### [Local diagnostics](https://docs.dask.org/en/stable/diagnostics-local.html)

For work on your laptop or local server.

You can use:
- [`Profiler()`](https://docs.dask.org/en/stable/diagnostics-local.html#dask.diagnostics.Profiler) for task execution.
    - Each worker (rows on y-axis) has tasks (blocks) of different types (colours).
    - Time duration (x-axis).
- [`ResourceProfiler()`](https://docs.dask.org/en/stable/diagnostics-local.html#dask.diagnostics.ResourceProfiler) for resource use.
    - Percentage CPU usage (left y-axis) and memory usage (right y-axis).
- [`CacheProfiler()`](https://docs.dask.org/en/stable/diagnostics-local.html#dask.diagnostics.CacheProfiler) for scheduler cache.
    - Cache size (y-axis) of different types (colours).

For example:

```python
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, visualize

# Example linear algebra (QR decomposition) with Dask Array to demonstrate diagnostics.
# Taken from https://docs.dask.org/en/stable/diagnostics-local.html#example
random_array = da.random.random(size=(10_000, 1_000), chunks=(1_000, 1_000))
q, r = da.linalg.qr(random_array)
random_array_reconstructed = q.dot(r)

# the profilers are just context managers
# hence, can use many in a with block
with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, CacheProfiler() as cprof:
    out = random_array_reconstructed.compute()

visualize([prof, rprof, cprof], save=True)
```

Returns local diagnostics that can be viewed in the browser:
- Each array creation runs concurrently.
- Reduction step to combine the blocks (serial and held in cache).
- Released from cache.
- Parallel execution of `dot` and `sum`.

In [2]:
from IPython.display import display, HTML

display(HTML(filename='images/dask_local_diagnostics.html'))

#### [Distributed diagnostics](https://docs.dask.org/en/stable/diagnostics-distributed.html)

- ...

[`dask-report_example_mpi_sge.html`](https://github.com/lukeconibear/swd6_hpp/blob/main/docs/images/dask-report_example_mpi_sge.html)  
[`dask-report_example_jobqueue_sge.html`](https://github.com/lukeconibear/swd6_hpp/blob/main/docs/images/dask-report_example_jobqueue_sge.html)

### [More Dask examples](https://examples.dask.org/)

## [Ray](https://www.ray.io/)
Ray will automatically detect the available GPUs and CPUs on the machine.
- Can also [specify required resources](https://docs.ray.io/en/latest/walkthrough.html#specifying-required-resources).

First, initialise Ray.

In [None]:
import ray
ray.init()

### Functions become Tasks
- Parallelise functions by adding `@ray.remote` decorator  
- Then instead of calling it normally, use the `.remote()` method  
- This yields a future object reference that you can retrieve with `ray.get(object)` 

In [None]:
@ray.remote
def f(x):
    return x * x

In [None]:
# asynchronously run a task
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))

### Classes become Actors
- Parallelise classes the same way
- These actors maintain their internal state  

In [None]:
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0
        
    def increment(self):
        self.value += 1
    
    def read(self):
        return self.value

In [None]:
# construct an actor instance using .remote()
counters = [Counter.remote() for i in range(4)]

In [None]:
# asynchronously run actor methods
[counter.increment.remote() for counter in counters]
futures = [counter.read.remote() for counter in counters]
print(ray.get(futures))

Other key API methods:
- `ray.put()`
    - Put a value in the distributed object store.
    - `put_id = ray.put(my_object)`
- `ray.get()`
    - Get an object from the distributed object store, either placed there by `ray.put()` explicitly or by a task or actor method, blocking until object is available.
    - `thing = ray.get(put_id)`
- `ray.wait()`
    - Wait on a list of ids until one of the corresponding objects is available (e.g., the task completes). Returns two lists: one with ids for the available objects and the other with ids for the still-running tasks or method calls.
    - `finished, running = ray.wait([train_id, track_id])`

### Ray's [`multiprocessing`](https://docs.ray.io/en/latest/multiprocessing.html)
To scale beyond one machine and generally manage a pool of processes.  

Replace:
```python
from multiprocessing.pool import Pool
```

With:

```python
from ray.util.multiprocessing.pool import Pool
```


In [None]:
from ray.util.multiprocessing.pool import Pool

In [None]:
def my_function(x):
    return x * x

In [None]:
with Pool(2) as workers:
    print(workers.map(my_function, [1, 2, 3]))

### Ray's [`joblib`](https://docs.ray.io/en/latest/joblib.html)
The underpinnings of [scikit-learn](https://scikit-learn.org/stable/), which Ray can scale to a cluster.

Import and call `register_ray`, which registers Ray as a `joblib` backend for `scikit-learn`:  
```python
import joblib
from ray.util.joblib import register_ray
register_ray()
```

Then run your original `scikit-learn` code within a Ray/`joblib` backend:
```python
with joblib.parallel_backend('ray'):
    ...  # original scikit-learn code
```

For example, here's some parallel hyperparameter tuning:
```python
import joblib
from ray.util.joblib import register_ray
register_ray()

import numpy as np
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from sklearn.model_selection import RandomizedSearchCV

digits = load_digits()
param_space = {
    'C': np.logspace(-6, 6, 30),
    'gamma': np.logspace(-8, 8, 30),
    'tol': np.logspace(-4, -1, 30),
    'class_weight': [None, 'balanced'],
}
model = SVC(kernel='rbf')
search = RandomizedSearchCV(  # imported directly above; `sklearn` itself is not imported
    model, param_space, cv=5, n_iter=300, verbose=10)

with joblib.parallel_backend('ray'):
    search.fit(digits.data, digits.target)
```

When finished, remember to shut down the Ray connection.

In [None]:
ray.shutdown()

Please see this [repository](https://github.com/lukeconibear/distributed_deep_learning) for examples of how to do distributed deep learning using Ray Train with TensorFlow, PyTorch, and Horovod.

## [Dask on Ray](https://docs.ray.io/en/latest/data/dask-on-ray.html)
Use Ray as a backend for Dask tasks.  
Dask dispatches tasks to Ray for scheduling and execution.

In [10]:
import ray
import dask
import dask.dataframe as dd 
import pandas as pd
import numpy as np
from ray.util.dask import ray_dask_get

In [11]:
dask.config.set(scheduler=ray_dask_get) 
ray.init()

{'node_ip_address': '129.11.87.102',
 'raylet_ip_address': '129.11.87.102',
 'redis_address': '129.11.87.102:6379',
 'object_store_address': '/tmp/ray/session_2021-12-20_11-47-12_888567_25572/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-12-20_11-47-12_888567_25572/sockets/raylet',
 'webui_url': None,
 'session_dir': '/tmp/ray/session_2021-12-20_11-47-12_888567_25572',
 'metrics_export_port': 63246,
 'node_id': 'e37649cbb30caaafbcfb5fdbe4645c1e07fbbc9c2a044468ebf0a02d'}

In [None]:
df = pd.DataFrame(np.random.randint(0, 100, size=(2**10, 2**8)))
df = dd.from_pandas(df, npartitions=10)
df.head(10)

In [None]:
ray.shutdown()

## Exercise

...

## Further information

### Good practices

- Start small.
- Avoid very large chunks / partitions / task graphs (a good first option is to allow auto-chunking).
- Only use parallelisation (e.g., Dask) when needed, then move back to normal Python (or NumPy / Pandas).
- Persist data in memory (RAM) where you can, as this is faster than accessing it from disk.
- Load data with the parallel library (e.g., Dask), rather than just passing data to it to manage.
- Call compute once, on lots of computations.
- Avoid global state.
- Don't modify the data in place.
- More information:
    - [Dask](https://docs.dask.org/en/stable/best-practices.html), [Dask Array](https://docs.dask.org/en/stable/array-best-practices.html), [Dask DataFrame](https://docs.dask.org/en/stable/dataframe-best-practices.html), [Dask Delayed](https://docs.dask.org/en/stable/delayed-best-practices.html)
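
As a sketch of the "call compute once" advice in the list above, the example below builds several lazy results with `dask.delayed` and evaluates them in a single `dask.compute` call. The `inc` and `add` functions are made up for illustration, and both avoid global state and in-place modification.

```python
import dask


@dask.delayed
def inc(x):
    """Hypothetical pure function: no global state, no in-place changes."""
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


# Nothing runs yet; these calls only build up the task graph.
a = inc(1)
b = inc(2)
total = add(a, b)

# One compute call for everything, so the shared task `a` runs only once.
total_value, a_value = dask.compute(total, a)
print(total_value, a_value)
```

Calling `total.compute()` and `a.compute()` separately would give the same values, but would execute `inc(1)` twice.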

### Other options

- [Modin](https://modin.readthedocs.io/en/latest/)
  - Uses Ray or Dask to speed up your Pandas code.  
  - To use Modin, replace the Pandas import and use the Pandas API as normal.
- [Mars](https://docs.pymars.org/en/latest/)
  - A tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and many other libraries.
  - Swap out the library import, use the same API, and add `.execute()`.
  - [Mars Tensor](https://docs.pymars.org/en/latest/getting_started/tensor.html) for NumPy.  
  - [Mars DataFrame](https://docs.pymars.org/en/latest/getting_started/dataframe.html) for Pandas.
  - Mars can also use Ray as the backend ([instructions](https://docs.ray.io/en/latest/data/mars-on-ray.html)).
- [Polars](https://www.pola.rs/)
  - Lightning-fast DataFrame library for Rust and Python.
- [RayDP](https://docs.ray.io/en/latest/data/raydp.html)
  - Combines your Spark and Ray clusters, making it easy to do large scale data processing using the PySpark API and seamlessly use that data to train your models using TensorFlow and PyTorch.

### Resources

- [Using `eliot` with Dask](https://eliot.readthedocs.io/en/stable/scientific-computing.html)
- [Using IPython for parallel computing](https://ipyparallel.readthedocs.io/en/latest/)
- [Concurrency](https://youtu.be/18B1pznaU1o) can also run different tasks together, but work is not done at the same time ([concurrency from the ground up](https://youtu.be/MCs5OvhV9S4)).   
- [Asynchronous](https://youtu.be/iG6fr81xHKA) (multi-threading), useful for massive scaling, threads controlled explicitly.  