# Enhancing performance using parallel computing
- Start by profiling a serial program to identify bottlenecks
- Are there opportunities for parallelism?
    - Loops with independent iterations
    - Data that can be split and processed in parallel
    - A pipeline of stages
- Choose an approach and pattern
- Map to the parallel environment
    - Multicore
    - GPU
    - Multinode
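
The first step above, profiling, can be sketched with the standard library's `cProfile` and `pstats` modules. The functions `slow_sum`, `fast_sum`, and `workload` are hypothetical stand-ins for a real serial program; they are not part of the original material.

```python
import cProfile
import io
import pstats


def slow_sum(n):
    """Deliberately slow: accumulates squares in a pure-Python loop."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def fast_sum(n):
    """Cheap by comparison: the loop runs inside the built-in sum."""
    return sum(range(n))


def workload():
    """Hypothetical serial program made of the two steps above."""
    return slow_sum(200_000), fast_sum(200_000)


profiler = cProfile.Profile()
profiler.enable()
workload()
profiler.disable()

# Collect the report in a string, sorted by cumulative time per function.
buffer = io.StringIO()
pstats.Stats(profiler, stream=buffer).sort_stats("cumulative").print_stats(5)
report = buffer.getvalue()
print(report)
```

The function with the largest cumulative time is usually the first candidate for parallelization.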

## Embarrassingly parallel programs
Many problems are embarrassingly parallel and can be easily decomposed into independent tasks or data sets. Here are several examples:

- Monte Carlo integration
- Bootstrap for calculating statistics
- Fitting the same model on multiple data sets
- Running a simulation with different settings

There are many parallel design patterns. The simplest classification divides them into two kinds:

- Data parallelism means that the data is distributed across processes (e.g., MPI, Hadoop, Spark).
- Task parallelism means that tasks (functions) are distributed across processes, and different units of work (data) are sent to each task (e.g., multithreading, multiprocessing, single GPU programming).
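
A minimal sketch of the two patterns follows. It uses a thread pool from `concurrent.futures` only so that it runs anywhere without extra files; the same decomposition would apply to a process pool. The helper `partial_sum` and the toy data are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

data = list(range(12))


def partial_sum(chunk):
    """Work applied identically to every slice of the data."""
    return sum(chunk)


# Data parallelism: one function, each worker receives a different slice.
chunks = [data[i:i + 4] for i in range(0, len(data), 4)]
with ThreadPoolExecutor(max_workers=3) as executor:
    partial_sums = list(executor.map(partial_sum, chunks))
total = sum(partial_sums)

# Task parallelism: different functions are submitted as separate tasks.
tasks = [("min", min), ("max", max), ("sum", sum)]
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {name: executor.submit(fn, data) for name, fn in tasks}
    summary = {name: future.result() for name, future in futures.items()}

print(partial_sums, total, summary)
```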

## Using multiple cores with `multiprocessing`

The standard implementation of Python (CPython) uses a Global Interpreter Lock (GIL). This means that only one thread can execute Python bytecode at any one time, and multiple threads work by time-slicing. Hence multi-threaded code that spends most of its time waiting (for the network to respond, for I/O) can see speed-ups, but computationally intensive multi-threaded code will not. For numerically intensive code, the work needs to run in separate processes to see speed-ups.

- Process
    - Heavyweight
    - Have a separate memory space
    - Large cost for communications
- Thread
    - Lightweight
    - Share the same memory space
    - Small cost for communications
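
The claim that threads help with latency-bound work can be checked with a small sketch. Here `time.sleep` is an assumed stand-in for a network or disk wait (it releases the GIL while waiting), and `fake_io` is a hypothetical helper.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(delay):
    """Stand-in for a network or disk wait; sleeping releases the GIL."""
    time.sleep(delay)
    return delay


delays = [0.2] * 4

# Serial: the waits add up.
start = time.perf_counter()
serial = [fake_io(d) for d in delays]
serial_time = time.perf_counter() - start

# Threaded: the waits overlap, so the total is close to a single wait.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(fake_io, delays))
threaded_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f}s, threaded: {threaded_time:.2f}s")
```

Replacing the sleep with a pure-Python numeric loop would typically remove the gain, which is the case where separate processes are needed.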

First we see how to split the computation into pieces using a loop.

In [None]:
from multiprocessing import (Pool, Process, cpu_count)
import multiprocessing as mp
import time
from numba import njit
from math import sqrt
import numpy as np

In [None]:
cpu_count()

In [None]:
[sqrt(i ** 2) for i in range(10)]

In [None]:
%%writefile defs.py
from math import sqrt

def sqrt_list(i):
    return sqrt(i**2)

In [None]:
import defs

In [None]:
with mp.Pool(processes=cpu_count()) as pool:
    res = pool.map(defs.sqrt_list, [i for i in range(10)])
res


### Functions with multiple arguments

In [None]:
%%writefile defs2.py

def f(a, b, c):
    return a + b + c

In [None]:
import defs2

In [None]:
x = np.arange(24)
x_s = np.array_split(x, x.shape[0]//3)
x_s

In [None]:
with mp.Pool(processes=cpu_count()) as pool:
    res = pool.starmap(defs2.f, x_s)
res
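
What `starmap` does can be sketched without any worker processes: each element of the iterable is unpacked into positional arguments. The example below redefines `f` locally and uses a thread-based pool (same `starmap` method as `mp.Pool`) so that it is self-contained; the tuple groups mirror the 8 groups of 3 built above.

```python
from itertools import starmap
from multiprocessing.pool import ThreadPool


def f(a, b, c):
    return a + b + c


# Eight argument tuples: (0, 1, 2), (3, 4, 5), ..., (21, 22, 23).
groups = [tuple(range(i, i + 3)) for i in range(0, 24, 3)]

# Three equivalent ways to call f once per tuple.
explicit = [f(*args) for args in groups]   # manual unpacking
serial = list(starmap(f, groups))          # itertools.starmap
with ThreadPool(processes=4) as pool:
    pooled = pool.starmap(f, groups)       # pool version, order preserved

print(pooled)
```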

#### Monte Carlo

In [None]:
%%writefile defs3.py
import numpy as np

def monte_carlo_pi(n):
    x = np.random.uniform(-1, 1, (n,2))
    return 4*np.sum((x**2).sum(1) < 1)/n

In [None]:
import defs3

In [None]:
%%timeit  
global res
res = [defs3.monte_carlo_pi(int(1e7)) for i in range(10)]

In [None]:
%%timeit
global res2
with mp.Pool(processes=cpu_count()) as pool:
    res2 = pool.map(defs3.monte_carlo_pi, [int(1e7) for i in range(10)])

In [None]:
%%timeit
global res2
with mp.Pool(processes=40) as pool:
    res2 = pool.map(defs3.monte_carlo_pi, [int(1e7) for i in range(10)])
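
One caveat with random numbers in worker processes: when workers are created by forking, each child inherits a copy of NumPy's global random state, so different workers may draw the same samples. A possible remedy, sketched below under that assumption, is to give every task its own generator seeded from `np.random.SeedSequence`. The function `monte_carlo_pi_seeded` and the root seed `12345` are illustrative and not part of `defs3.py`.

```python
import numpy as np


def monte_carlo_pi_seeded(n, seed):
    """Same estimator as monte_carlo_pi, but with its own generator."""
    rng = np.random.default_rng(seed)
    x = rng.uniform(-1, 1, (n, 2))
    return 4 * np.sum((x**2).sum(1) < 1) / n


# Spawn one statistically independent child seed per task.
root = np.random.SeedSequence(12345)
child_seeds = root.spawn(4)

# Run serially here; with a pool, pass (n, seed) pairs to starmap instead.
estimates = [monte_carlo_pi_seeded(100_000, s) for s in child_seeds]
print(estimates)
```

Because the seeds are derived from a single root, the whole run stays reproducible.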

- For the differences between `map`, `map_async`, `apply`, and `apply_async`, see https://discuss.python.org/t/differences-between-pool-map-pool-apply-and-pool-apply-async/6575/2
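
As a rough illustration of the difference: `map` blocks until every result is ready, while `map_async` returns immediately with a handle whose `get()` blocks later. The sketch uses `ThreadPool` (which has the same methods as `mp.Pool`) so that it runs without a separate module; `slow_square` is a hypothetical task.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x):
    """Hypothetical task that takes a noticeable amount of time."""
    time.sleep(0.1)
    return x * x


with ThreadPool(processes=4) as pool:
    # map: the call returns only when all results are available.
    blocking = pool.map(slow_square, range(4))

    # map_async: the call returns at once with an AsyncResult handle.
    async_result = pool.map_async(slow_square, range(4))
    other_work = sum(range(100))  # runs while the workers are busy
    non_blocking = async_result.get(timeout=5)  # wait for the results here

print(blocking, non_blocking, other_work)
```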

## Using threads

- For a comparison of threading and multiprocessing, see https://blog.floydhub.com/multiprocessing-vs-threading-in-python-what-every-data-scientist-needs-to-know/

In [None]:
from multiprocessing.dummy import Pool as ThreadPool
import requests

In [None]:
def func(number):
    url = 'http://example.com/'
    for i in range(number):
        response = requests.get(url)
        print(len(response.text))

In [None]:
with ThreadPool(processes=4) as pool:
    res2 = pool.map(func, [3,3,3,3])

## Using `joblib`

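`joblib` provides parallel processing with a syntax that resembles a comprehension: wrap the function in `delayed` and pass the resulting generator expression to `Parallel`.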

In [None]:
from joblib import Parallel, delayed
from functools import partial
from tqdm import tqdm
tqdm = partial(tqdm, position=0, leave=True)

In [None]:
Parallel(n_jobs=4)(delayed(sqrt)(i ** 2) for i in tqdm(range(10)))

In [None]:
x = np.arange(24)
x_s = np.array_split(x, x.shape[0]//3)

In [None]:
res = Parallel(n_jobs=4)(delayed(defs2.f)(x_s[i][0], x_s[i][1], x_s[i][2]) for i in tqdm(range(len(x_s))))
res

Using threads:

In [None]:
res = Parallel(n_jobs=4, prefer="threads")(delayed(defs2.f)(x_s[i][0], x_s[i][1], x_s[i][2]) for i in tqdm(range(len(x_s))))
res

Monte Carlo:

In [None]:
%%timeit
res = Parallel(n_jobs=4, prefer="threads")(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

In [None]:
%%timeit
res = Parallel(n_jobs=40, prefer="threads")(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

In [None]:
%%timeit
res = Parallel(n_jobs=2)(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

In [None]:
%%timeit
res = Parallel(n_jobs=40)(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

- Scientific Python libraries such as `numpy`, `scipy`, `pandas` and `scikit-learn` often release the GIL in performance-critical code paths. It is therefore advisable to always measure the speed of thread-based parallelism and use it when the GIL does not limit scalability.
- The thread-based approach can also ease debugging.

- Writing to shared memory requires careful coordination of processes, and many control and communication concepts are implemented in the `multiprocessing` library for this purpose, including semaphores, locks, barriers, etc.
- For shared memory and reductions, see https://milliams.com/courses/parallel_python/
- See also Numba's parallel features.
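
The coordination mentioned above can be illustrated with a lock. The sketch below uses threads and `threading.Lock` so that it runs directly in a notebook; `multiprocessing.Lock` and `multiprocessing.Value` are the process-based counterparts. The shared dictionary `state` and the helper `add_many` are assumptions made for the example.

```python
import threading

state = {"count": 0}      # shared between all threads
lock = threading.Lock()


def add_many(n):
    """Increment the shared counter n times, one locked update at a time."""
    for _ in range(n):
        with lock:        # only one thread may update the counter at once
            state["count"] += 1


threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(state["count"])
```

Without the lock, the read-modify-write in `state["count"] += 1` is not guaranteed to be atomic, so updates could be lost.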

## Laboratories

In [None]:
# Baseline
def cdist(xs, ys):
    """Returns pairwise distance between row vectors in xs and ys.
    
    xs has shape (m, p)
    ys has shape (n, p)
    
    Return value has shape (m, n)    
    """
    
    m, p = xs.shape
    n, p = ys.shape
    
    res = np.empty((m, n))
    for i in range(m):
        for j in range(n):
            res[i, j] = np.sqrt(np.sum((ys[j] - xs[i])**2))
    return res

In [None]:
xs = np.arange(6).reshape(3,2).astype('float')
ys = np.arange(4).reshape(2,2).astype('float')
zs = cdist(xs, ys)

In [None]:
cdist(xs, ys)

In [None]:
np.split(xs, 3, 0)

In [None]:
res = np.concatenate([cdist(x, ys) for x in np.split(xs, 3, 0)])
res

In [None]:
m = 1000
n = 1000
p = 100

X = np.random.random((m, p))
Y = np.random.random((n, p))

In [None]:
%%timeit
Z = cdist(X, Y)

### Using `multiprocessing`

In [None]:
from multiprocessing import Pool

In [None]:
%%writefile defs4.py
import numpy as np

def cdist(xs, ys):
    """Returns pairwise distance between row vectors in xs and ys.
    
    xs has shape (m, p)
    ys has shape (n, p)
    
    Return value has shape (m, n)    
    """
    
    m, p = xs.shape
    n, p = ys.shape
    
    res = np.empty((m, n))
    for i in range(m):
        for j in range(n):
            res[i, j] = np.sqrt(np.sum((ys[j] - xs[i])**2))
    return res

In [None]:
import defs4

In [None]:
%%timeit
with Pool(processes=4) as pool:  # named pool to avoid clashing with the dimension p
    Z1 = pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)])
    Z1 = np.concatenate(Z1)

In [None]:
Z = cdist(X, Y)

with Pool(processes=4) as pool:  # named pool so the global p (= 100) is not overwritten
    Z1 = pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)])
    Z1 = np.concatenate(Z1)

np.allclose(Z, Z1)

### Using threads

Note that there is no gain with using multiple threads for computationally intensive tasks because of the GIL.

In [None]:
%%timeit
with ThreadPool(processes=4) as pool:
    Z2 = list(pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)]))
    Z2 = np.concatenate(Z2)

Check that the result matches the baseline:

In [None]:
with ThreadPool(processes=4) as pool:
    Z2 = list(pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)]))
    Z2 = np.concatenate(Z2)

np.allclose(Z, Z2)

## Exercise 3
- Calculate the pairwise Euclidean distance between two matrices X and Y using `joblib` and report the speedup (or slowdown) over the baseline.


In [None]:
Z3 = Parallel(n_jobs=4)(delayed(cdist)(X_, Y) for X_ in tqdm(np.split(X, 100, 0)))
Z3 = np.concatenate(Z3)

In [None]:
np.allclose(Z, Z3)

In [None]:
%%timeit
## Solution here
Z3 = Parallel(n_jobs=4)(delayed(cdist)(X_, Y) for X_ in tqdm(np.split(X, 100, 0)))
Z3 = np.concatenate(Z3)

The speedup is 6.84/3.87 ≈ 1.8, i.e. the `joblib` version runs about 1.8 times faster than the baseline.
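
The arithmetic can be written out as a short sketch. The two timings are the ones quoted above (their unit cancels in the ratio), and `n_workers = 4` matches the `n_jobs=4` used in the `joblib` call; parallel efficiency, speedup divided by the number of workers, is an extra figure added here for context.

```python
t_serial = 6.84     # baseline timing quoted above
t_parallel = 3.87   # joblib timing quoted above
n_workers = 4       # n_jobs used in the joblib call

speedup = t_serial / t_parallel
efficiency = speedup / n_workers  # fraction of ideal linear scaling

print(f"speedup = {speedup:.2f}x, efficiency = {efficiency:.0%}")
```

An efficiency well below 100% suggests that process start-up and data transfer take a sizeable share of the run time.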

## References
- https://people.duke.edu/~ccc14/sta-663-2018/notebooks/S14A_Parallel_Programming_Introduction.html - A great introductory series on HPC
- https://blog.floydhub.com/multiprocessing-vs-threading-in-python-what-every-data-scientist-needs-to-know/ - A great discussion of multiprocessing and multithreading
- https://www.maxlist.xyz/2020/03/15/gil-thread-safe-atomic/ - The concept of thread safety
- https://joblib.readthedocs.io/en/latest/parallel.html - A good guide to using `joblib`