# Enhancing performance using parallel computing
- Start by profiling a serial program to identify bottlenecks
- Look for opportunities for parallelism:
    - Are there loops whose iterations are independent?
    - Can the data be split and processed in parallel?
    - Can the work be organized as a pipeline of stages?
- Choose an approach and pattern
- Map to the parallel environment
    - Multicore
    - GPU
    - Multinode
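
As a minimal sketch of the first step, the standard-library `cProfile` and `pstats` modules can show where a serial program spends its time. The functions `slow_norms` and `profile_call` are hypothetical names made up for this illustration.

```python
import cProfile
import io
import pstats
from math import sqrt


def slow_norms(n):
    """Toy serial workload (hypothetical): sum of sqrt(i**2) for i < n."""
    total = 0.0
    for i in range(n):
        total += sqrt(i ** 2)
    return total


def profile_call(func, *args):
    """Run func(*args) under cProfile and return (result, text report)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args)
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative").print_stats(5)  # show the top 5 entries
    return result, stream.getvalue()


result, report = profile_call(slow_norms, 100_000)
print(report)
```

The functions with the largest cumulative time in the report are the first candidates for parallelization.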

## Embarrassingly parallel programs
Many problems are embarrassingly parallel and can be easily decomposed into independent tasks or data sets. Here are several examples:

- Monte Carlo integration
- Bootstrap for calculating statistics
- Fitting the same model on multiple data sets
- Running simulation with different settings

There are many parallel design patterns. The simplest classification distinguishes two kinds:

- Data parallelism means that the data is distributed across processes and each process applies the same operation to its part (e.g., MPI, Hadoop, Spark).
- Task parallelism means that tasks (functions) are distributed across processes, and different units of work (data) are sent to each task (e.g., multithreading, multiprocessing, single GPU programming).
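
The two patterns can be sketched side by side. This is only an illustration: it uses `concurrent.futures.ThreadPoolExecutor` to stay self-contained, the helper `chunks` is a hypothetical name, and for CPU-bound work a process-based pool would normally take its place.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean


def chunks(seq, n_chunks):
    """Split seq into n_chunks nearly equal contiguous pieces."""
    size, extra = divmod(len(seq), n_chunks)
    out, start = [], 0
    for k in range(n_chunks):
        stop = start + size + (1 if k < extra else 0)
        out.append(seq[start:stop])
        start = stop
    return out


data = list(range(1, 101))

with ThreadPoolExecutor(max_workers=4) as ex:
    # Data parallelism: the SAME function runs on different pieces of the data.
    partial_sums = list(ex.map(sum, chunks(data, 4)))
    total = sum(partial_sums)

    # Task parallelism: DIFFERENT functions run concurrently
    # (here, for simplicity, all on the same data).
    futures = {name: ex.submit(fn, data)
               for name, fn in [("min", min), ("max", max), ("mean", mean)]}
    summary = {name: fut.result() for name, fut in futures.items()}

print(total, summary)
```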

## Using multiple cores with `multiprocessing`

The standard implementation of Python (CPython) uses a Global Interpreter Lock (GIL). This means that only one thread can execute Python bytecode at any one time, and multiple threads take turns by time-slicing. Hence multi-threaded code that spends most of its time waiting (for the network to respond, for I/O) can still see speed-ups, but computationally intensive pure-Python code will not. For numerically intensive code, the parallel work needs to run in separate processes to see speed-ups.

- Process
    - Heavyweight
    - Have a separate memory space
    - Large cost for communications
- Thread
    - Lightweight
    - Share the same memory space
    - Small cost for communications
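
The effect of the GIL on latency-bound work can be sketched with `time.sleep` standing in for a network or disk wait (an assumption made to keep the example offline). The names `fake_io`, `timed`, and `run_threaded` are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io(delay):
    """Stand-in for a network or disk wait; time.sleep releases the GIL."""
    time.sleep(delay)
    return delay


def timed(fn):
    """Return (result, elapsed seconds) for a zero-argument callable."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


delays = [0.2] * 4


def run_threaded():
    with ThreadPoolExecutor(max_workers=4) as ex:
        return list(ex.map(fake_io, delays))


# Serial: the four waits happen one after another.
serial_result, serial_time = timed(lambda: [fake_io(d) for d in delays])
# Threaded: the four waits overlap because waiting threads do not hold the GIL.
threaded_result, threaded_time = timed(run_threaded)

print(f"serial: {serial_time:.2f} s, threaded: {threaded_time:.2f} s")
```

Replacing the sleep with a pure-Python arithmetic loop would be expected to remove the gain, which is the case where separate processes are needed.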

First we see how to split a loop-style computation into independent pieces and distribute them over a pool of processes with `Pool.map`.

In [1]:
from multiprocessing import (Pool, Process, cpu_count)
import multiprocessing as mp
import time
from numba import njit
from math import sqrt
import numpy as np

In [2]:
cpu_count()

8

In [None]:
[sqrt(i ** 2) for i in range(10)]

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

In [None]:
%%writefile defs.py
from math import sqrt

def sqrt_list(i):
    return sqrt(i**2)

Writing defs.py


In [None]:
import defs

In [None]:
with mp.Pool(processes=cpu_count()) as pool:
    res = pool.map(defs.sqrt_list, range(10))
res

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

### Functions with multiple arguments

In [None]:
%%writefile defs2.py

def f(a, b, c):
    return a + b + c

Writing defs2.py


In [None]:
import defs2

In [None]:
x = np.arange(24)
x_s = np.array_split(x, x.shape[0]//3)
x_s

[array([0, 1, 2]),
 array([3, 4, 5]),
 array([6, 7, 8]),
 array([ 9, 10, 11]),
 array([12, 13, 14]),
 array([15, 16, 17]),
 array([18, 19, 20]),
 array([21, 22, 23])]

In [None]:
with mp.Pool(processes=cpu_count()) as pool:
    res = pool.starmap(defs2.f, x_s)
res

[3, 12, 21, 30, 39, 48, 57, 66]

### Monte Carlo

In [None]:
%%writefile defs3.py
import numpy as np

def monte_carlo_pi(n):
    x = np.random.uniform(-1, 1, (n,2))
    return 4*np.sum((x**2).sum(1) < 1)/n

Writing defs3.py


In [None]:
import defs3

In [None]:
%%timeit  
global res
res = [defs3.monte_carlo_pi(int(1e7)) for i in range(10)]

1 loop, best of 5: 4.39 s per loop


In [None]:
%%timeit
global res2
with mp.Pool(processes=cpu_count()) as pool:
    res2 = pool.map(defs3.monte_carlo_pi, [int(1e7) for i in range(10)])

1 loop, best of 5: 3.83 s per loop


In [None]:
%%timeit
global res2
with mp.Pool(processes=40) as pool:
    res2 = pool.map(defs3.monte_carlo_pi, [int(1e7) for i in range(10)])

1 loop, best of 5: 4.59 s per loop
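
One caveat about running `monte_carlo_pi` in several processes: `np.random.uniform` draws from NumPy's global generator, and with the `fork` start method each worker starts from a copy of the parent's generator state, so different workers can return identical "random" estimates. A common remedy, sketched here, is to give every task its own seed via `np.random.SeedSequence.spawn`. The names `make_seeds` and `monte_carlo_pi_seeded` are hypothetical and not part of `defs3.py`.

```python
import numpy as np


def make_seeds(n_tasks, root_seed=12345):
    """Derive n_tasks independent child seeds from one root seed."""
    return np.random.SeedSequence(root_seed).spawn(n_tasks)


def monte_carlo_pi_seeded(n, seed):
    """Estimate pi from n uniform points in [-1, 1]^2 with a private generator."""
    rng = np.random.default_rng(seed)
    x = rng.uniform(-1, 1, (n, 2))
    return 4 * np.sum((x ** 2).sum(1) < 1) / n


# Serial demonstration. The (n, seed) pairs are the kind of argument list
# one would hand to pool.starmap once the function lives in an importable module.
tasks = [(100_000, seed) for seed in make_seeds(4)]
estimates = [monte_carlo_pi_seeded(n, seed) for n, seed in tasks]
print(estimates)
```

Because every task carries its own seed, the results are reproducible and do not depend on which worker runs which task.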


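- `Pool.map` blocks until all results are ready, whereas `Pool.map_async` returns an `AsyncResult` immediately. For a discussion of the differences, see https://discuss.python.org/t/differences-between-pool-map-pool-apply-and-pool-apply-async/6575/2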
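
A small sketch of the blocking and non-blocking variants follows. It uses the thread-backed `multiprocessing.pool.ThreadPool` so that it runs in a notebook without a separate module file; the process-based `Pool` exposes the same two methods. The task `slow_square` is a hypothetical example.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x):
    """Hypothetical task: pretend to work for a moment, then square x."""
    time.sleep(0.1)
    return x * x


with ThreadPool(processes=4) as pool:
    # map blocks until every result is available.
    blocking = pool.map(slow_square, range(4))

    # map_async returns an AsyncResult at once; the caller can do other
    # work and collect the results later with .get().
    handle = pool.map_async(slow_square, range(4))
    ready_right_away = handle.ready()  # usually False this soon after submission
    non_blocking = handle.get(timeout=5)

print(blocking, non_blocking)
```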

## Using threads

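- For a comparison of threading and multiprocessing, see https://blog.floydhub.com/multiprocessing-vs-threading-in-python-what-every-data-scientist-needs-to-know/
- `multiprocessing.dummy` exposes the same `Pool` API as `multiprocessing`, but backed by threads instead of processes.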

In [3]:
from multiprocessing.dummy import Pool as ThreadPool
import requests

In [None]:
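def func(number):
    """Fetch the example page `number` times and print each response length."""
    url = 'http://example.com/'
    for i in range(number):
        # Each request mostly waits on the network, so threads can overlap the waits
        response = requests.get(url, timeout=10)
        print(len(response.text))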

In [None]:
with ThreadPool(processes=4) as pool:
    res2 = pool.map(func, [3,3,3,3])

1256
1256
1256
1256
12561256

1256
1256
1256
1256
1256
1256


## Using `Joblib`

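`joblib` provides parallel processing using a comprehension-like syntax: `delayed(f)(args)` records a function call without running it, and `Parallel(n_jobs=...)` runs the recorded calls in parallel and returns the results in order.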

In [4]:
from joblib import Parallel, delayed
from functools import partial
from tqdm import tqdm
tqdm = partial(tqdm, position=0, leave=True)

In [None]:
Parallel(n_jobs=4)(delayed(sqrt)(i ** 2) for i in tqdm(range(10)))

100%|██████████| 10/10 [00:00<00:00, 11.41it/s]


[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

In [None]:
x = np.arange(24)
x_s = np.array_split(x, x.shape[0]//3)

In [None]:
res = Parallel(n_jobs=4)(delayed(defs2.f)(*chunk) for chunk in tqdm(x_s))
res

100%|██████████| 8/8 [00:00<00:00, 1286.40it/s]


[3, 12, 21, 30, 39, 48, 57, 66]

Using threads:

In [None]:
res = Parallel(n_jobs=4, prefer="threads")(delayed(defs2.f)(*chunk) for chunk in tqdm(x_s))
res

100%|██████████| 8/8 [00:00<00:00, 985.68it/s]


[3, 12, 21, 30, 39, 48, 57, 66]

Monte Carlo:

In [None]:
%%timeit
res = Parallel(n_jobs=4, prefer="threads")(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

100%|██████████| 10/10 [00:00<00:00, 15.19it/s]
100%|██████████| 10/10 [00:00<00:00, 19.14it/s]
100%|██████████| 10/10 [00:00<00:00, 19.13it/s]
100%|██████████| 10/10 [00:00<00:00, 19.62it/s]
100%|██████████| 10/10 [00:00<00:00, 19.11it/s]
100%|██████████| 10/10 [00:00<00:00, 19.65it/s]


1 loop, best of 5: 3.51 s per loop


In [None]:
%%timeit
res = Parallel(n_jobs=40, prefer="threads")(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

100%|██████████| 10/10 [00:00<00:00, 11146.17it/s]
100%|██████████| 10/10 [00:00<00:00, 21215.50it/s]
100%|██████████| 10/10 [00:00<00:00, 24385.49it/s]
100%|██████████| 10/10 [00:00<00:00, 26329.59it/s]
100%|██████████| 10/10 [00:00<00:00, 25025.68it/s]
100%|██████████| 10/10 [00:00<00:00, 18657.94it/s]


1 loop, best of 5: 3.53 s per loop


In [None]:
%%timeit
res = Parallel(n_jobs=2)(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

100%|██████████| 10/10 [00:04<00:00,  2.12it/s]
100%|██████████| 10/10 [00:03<00:00,  3.32it/s]
100%|██████████| 10/10 [00:03<00:00,  3.30it/s]
100%|██████████| 10/10 [00:03<00:00,  3.33it/s]
100%|██████████| 10/10 [00:03<00:00,  3.30it/s]
100%|██████████| 10/10 [00:03<00:00,  3.33it/s]


1 loop, best of 5: 3.78 s per loop


In [None]:
%%timeit
res = Parallel(n_jobs=40)(delayed(defs3.monte_carlo_pi)(int(1e7)) for i in tqdm(range(10)))

100%|██████████| 10/10 [00:04<00:00,  2.41it/s]
100%|██████████| 10/10 [00:00<00:00, 1759.72it/s]
100%|██████████| 10/10 [00:00<00:00, 3135.22it/s]
100%|██████████| 10/10 [00:00<00:00, 1396.70it/s]
100%|██████████| 10/10 [00:00<00:00, 2373.28it/s]
100%|██████████| 10/10 [00:00<00:00, 4135.17it/s]


1 loop, best of 5: 4.78 s per loop


- Scientific Python libraries such as `numpy`, `scipy`, `pandas` and `scikit-learn` often release the GIL in performance-critical code paths. It is therefore advisable to always measure the speed of thread-based parallelism and to use it when the GIL does not limit scalability.
- The thread-based approach can also ease debugging.

- Writing to shared memory requires careful coordination between processes. The `multiprocessing` library implements many control and communication primitives for this purpose, including semaphores, locks, and barriers.
- For shared memory and reductions, see https://milliams.com/courses/parallel_python/
- Numba's parallel features are also worth checking.
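
The need for coordination when several workers update shared state can be sketched with a lock. To keep the example self-contained it uses threads and `threading.Lock`; `multiprocessing.Lock` supports the same `with lock:` usage for processes. The names `add_many` and `counter` are hypothetical.

```python
import threading


def add_many(counter, lock, n):
    """Increment counter['value'] n times, holding the lock for each update."""
    for _ in range(n):
        with lock:                 # only one worker may update at a time
            counter["value"] += 1  # the read-modify-write is now protected


counter = {"value": 0}
lock = threading.Lock()
workers = [threading.Thread(target=add_many, args=(counter, lock, 10_000))
           for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(counter["value"])
```

An alternative that avoids shared writes altogether is a reduction: each worker returns a partial result and the parent combines them.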

## Laboratories

In [5]:
# Baseline
def cdist(xs, ys):
    """Returns pairwise distance between row vectors in xs and ys.
    
    xs has shape (m, p)
    ys has shape (n, p)
    
    Return value has shape (m, n)    
    """
    
    m, p = xs.shape
    n, p = ys.shape
    
    res = np.empty((m, n))
    for i in range(m):
        for j in range(n):
            res[i, j] = np.sqrt(np.sum((ys[j] - xs[i])**2))
    return res

In [6]:
xs = np.arange(6).reshape(3,2).astype('float')
ys = np.arange(4).reshape(2,2).astype('float')
zs = cdist(xs, ys)

In [7]:
cdist(xs, ys)

array([[0.        , 2.82842712],
       [2.82842712, 0.        ],
       [5.65685425, 2.82842712]])

In [8]:
np.split(xs, 3, 0)

[array([[0., 1.]]), array([[2., 3.]]), array([[4., 5.]])]

In [9]:
res = np.concatenate([cdist(x, ys) for x in np.split(xs, 3, 0)])
res

array([[0.        , 2.82842712],
       [2.82842712, 0.        ],
       [5.65685425, 2.82842712]])

In [10]:
m = 1000
n = 1000
p = 100

X = np.random.random((m, p))
Y = np.random.random((n, p))

In [11]:
%%timeit
Z = cdist(X, Y)

6.84 s ± 263 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Using `multiprocessing`

In [12]:
from multiprocessing import Pool

In [13]:
%%writefile defs4.py
import numpy as np

def cdist(xs, ys):
    """Returns pairwise distance between row vectors in xs and ys.
    
    xs has shape (m, p)
    ys has shape (n, p)
    
    Return value has shape (m, n)    
    """
    
    m, p = xs.shape
    n, p = ys.shape
    
    res = np.empty((m, n))
    for i in range(m):
        for j in range(n):
            res[i, j] = np.sqrt(np.sum((ys[j] - xs[i])**2))
    return res

Overwriting defs4.py


In [14]:
import defs4

In [15]:
%%timeit
with Pool(processes=4) as pool:
    Z1 = pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)])
    Z1 = np.concatenate(Z1)

3.41 s ± 266 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [16]:
Z = cdist(X, Y)

# Use the name `pool` so the dimension `p` defined earlier is not overwritten
with Pool(processes=4) as pool:
    Z1 = pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)])
    Z1 = np.concatenate(Z1)

np.allclose(Z, Z1)

True

### Using threads

Note that using multiple threads brings no gain for this computationally intensive task: the loops in `cdist` are executed as Python bytecode, so the GIL lets only one thread make progress at a time.

In [17]:
%%timeit
with ThreadPool(processes=4) as pool:
    Z2 = list(pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)]))
    Z2 = np.concatenate(Z2)

7.76 s ± 221 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Check that the result matches the baseline:

In [18]:
with ThreadPool(processes=4) as pool:
    Z2 = list(pool.starmap(defs4.cdist, [(X_, Y) for X_ in np.split(X, 100, 0)]))
    Z2 = np.concatenate(Z2)

np.allclose(Z, Z2)

True
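
Threads have a better chance when most of the work happens inside NumPy calls on large arrays rather than in Python loops. The sketch below is one possible variant, not the notebook's method: `cdist_vec` and `cdist_threads` are hypothetical names, and no timing is claimed, since whether threads help must be measured on the target machine.

```python
import numpy as np
from multiprocessing.pool import ThreadPool


def cdist_vec(xs, ys):
    """Pairwise Euclidean distances via broadcasting; result has shape (m, n)."""
    diff = xs[:, None, :] - ys[None, :, :]  # shape (m, n, p)
    return np.sqrt((diff ** 2).sum(axis=-1))


def cdist_threads(X, Y, n_chunks=4, n_threads=4):
    """Apply cdist_vec to row blocks of X in a thread pool and stack the results."""
    blocks = np.array_split(X, n_chunks, axis=0)
    with ThreadPool(processes=n_threads) as pool:
        parts = pool.starmap(cdist_vec, [(block, Y) for block in blocks])
    return np.concatenate(parts)


rng = np.random.default_rng(0)
X_small = rng.random((50, 8))
Y_small = rng.random((40, 8))
Z_threads = cdist_threads(X_small, Y_small)
print(Z_threads.shape)
```

Each block allocates an intermediate array of shape (block rows, n, p), so for large inputs a larger `n_chunks` keeps memory use bounded.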

## Exercise 3
- Calculate the pairwise Euclidean distance between the rows of two matrices X and Y using `joblib`, and report the speedup (or slowdown) over the baseline.


In [21]:
Z3 = Parallel(n_jobs=4)(delayed(cdist)(X_, Y) for X_ in tqdm(np.split(X, 100, 0)))
Z3 = np.concatenate(Z3)

100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:02<00:00, 42.12it/s]


In [22]:
np.allclose(Z, Z3)

True

In [19]:
%%timeit
## Solution here
Z3 = Parallel(n_jobs=4)(delayed(cdist)(X_, Y) for X_ in tqdm(np.split(X, 100, 0)))
Z3 = np.concatenate(Z3)

100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:04<00:00, 21.34it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 30.60it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 30.55it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 31.16it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 30.67it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 30.93it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 28.30it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 100/100 [00:03<00:00, 30.71it/s]


3.87 s ± 148 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


The speedup over the baseline is 6.84 / 3.87 ≈ 1.8×.
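
The same arithmetic can be wrapped in two small helpers, which also give the parallel efficiency (speedup divided by the number of workers). The function names are hypothetical; the timings are the ones reported in the cells above, both obtained with 4 workers.

```python
def speedup(t_serial, t_parallel):
    """Speedup S = T_serial / T_parallel."""
    return t_serial / t_parallel


def efficiency(t_serial, t_parallel, n_workers):
    """Parallel efficiency E = S / n_workers (1.0 would be ideal scaling)."""
    return speedup(t_serial, t_parallel) / n_workers


# Timings in seconds taken from the %%timeit outputs in this notebook.
baseline = 6.84
for label, t in [("multiprocessing", 3.41), ("joblib", 3.87)]:
    print(f"{label}: speedup {speedup(baseline, t):.2f}, "
          f"efficiency {efficiency(baseline, t, 4):.2f}")
```

An efficiency well below 1 indicates that process start-up, data transfer, and scheduling overhead absorb a sizeable share of the potential gain.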

## References
- https://people.duke.edu/~ccc14/sta-663-2018/notebooks/S14A_Parallel_Programming_Introduction.html - A great introductory series on HPC
- https://blog.floydhub.com/multiprocessing-vs-threading-in-python-what-every-data-scientist-needs-to-know/ - A great discussion of multiprocessing and multithreading
- https://www.maxlist.xyz/2020/03/15/gil-thread-safe-atomic/ - The concept of thread safety
- https://joblib.readthedocs.io/en/latest/parallel.html - A good guide to using `joblib`