# Working with processes/threads 
------------------------------------------------------

<br>

## Table of Contents <a id="toc"></a>


1. [Multiprocessing (and refactoring)](#8)
2. [numba and parallelization](#9)

    2.1. [automatic parallelization](#2.1)
    
    2.2. [explicit parallelization (prange)](#2.2)
    
    2.3. [controlling the number of threads used](#2.3)

Annex A - [Parallelization of pairwise distance computation with multiprocess](#annexa)

Annex B - [Parallelization of pairwise distance computation with numba](#annexb)

## 1 Multiprocessing (and refactoring) <a id='8'></a>

We can take advantage of multiple cores using the `multiprocessing` module. 

In this approach, separate __processes__ are used, __not threads__. 

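In the standard CPython interpreter, threads cannot execute Python bytecode in parallel because of the "*Global Interpreter Lock*" (GIL): only one thread runs the interpreter at any given time. This was a design trade-off made for the enormous flexibility in memory management that Python makes possible. `multiprocessing` sidesteps the GIL by starting separate processes, each with its own interpreter. The price is that processes do not share memory by default, so the individual tasks must be independent, and their inputs and results have to be sent between processes.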

`multiprocessing` generally works well with lists: a function is mapped to each element of the list, and these calls are distributed over a pool of worker processes running on separate cores.

Indeed, any kind of parallelization technique is only worth it if the task you want to do is actually *parallelizable*. It is sometimes hard to judge what is and is not easily parallelizable, and it can often require that you refactor your code quite a bit.

A rule of thumb for parallelization is that the task can be divided up into subtasks which:
1. **do not depend on each other's results**
2. are very similar
3. use independent parts of the data

Point 1. is the most important, the others are helpful but not entirely necessary.
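
To make point 1 concrete, here is a small sketch (the function names `square` and `running_total` are made up for this illustration). The first computation treats each element on its own, so it fits a `map`; the second needs the previous result at every step, so it cannot simply be split across workers as is.

```python
# Independent subtasks: each output depends only on its own input,
# so the loop can be written as a map (and later be distributed).
def square(x):
    return x * x


data = [1, 2, 3, 4, 5]
independent = list(map(square, data))


# Dependent subtasks: each step needs the result of the previous one,
# so the iterations cannot be handed to separate workers as they are.
def running_total(values):
    totals = []
    current = 0
    for v in values:
        current += v  # depends on the previous iteration
        totals.append(current)
    return totals


dependent = running_total(data)

print(independent)  # [1, 4, 9, 16, 25]
print(dependent)    # [1, 3, 6, 10, 15]
```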

Consider our function to compute an integral from the previous lesson:

In [82]:
## pure python 
def f_native(x):
    return x ** 2 - x


def integrate_f_native(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_native(a + i * dx)
    return s * dx

print( integrate_f_native(0,2,100) )
%timeit -n 3 -r 7 _=integrate_f_native(0,2,1000000)

0.6467999999999999
149 ms ± 2.26 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)


Ideally, we would like to reduce this to something that looks like:

```python

for i in range(len(data)):
    result[i] = function(data[i])

```
equivalent to:
```python
map(function,data)
```

So, we apply a `function` to each element (`data[i]`) of `data`.

So the game is to re-write it slightly so it fits this template.

In [60]:
#let's work outside the function and focus on the main loop:
a=0
b=2
N=1000000
dx = (b - a) / N

data = [ i for i in range(N)]

def f2(i):
    x = a+i*dx
    return x ** 2 - x

result = map( f2 , data )
## equivalent to
# for i in range(N):
#     result[i] = f2(data[i])


final_result = sum(result) *dx
print(final_result)

0.666664666668016


Now, everything is ready for us to use `multiprocessing`.

The simplest usage is to open up a pool of processes using the `with` keyword:

In [61]:
import multiprocessing as mp

with mp.Pool(2) as pool :
    
    result2 = pool.map(f2, data)
    
final_result2 = sum(result2) *dx
print(final_result2)

0.666664666668016


Ok so we get the same result when splitting the task on 2 processes, but does it perform faster?

In [68]:
%timeit -n 3 -r 7  list( map(f2, data) )
with mp.Pool(2) as pool :
    %timeit -n 3 -r 7  pool.map(f2, data)

146 ms ± 4.2 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
196 ms ± 7.74 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)


Hmm, what? The multiprocessing version is even slower...

This is because starting and stopping processes, and sending data to and from them, are costly operations.
In other words, the **overhead is great with multiprocessing**, and it tends to work better with a few long tasks than with a lot of very small ones (NB: each parallelization technique has a different overhead and reacts differently to this).
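
Part of this overhead comes from the fact that `multiprocessing` serializes (pickles) arguments and results to move them between processes. As a rough illustration, assuming the payload size is a fair proxy for the communication cost, the sketch below compares pickling a full list of indices with pickling only its bounds (the variable names are chosen for this example only):

```python
import pickle

N = 100_000

# Option 1: ship every index to the workers as an explicit list
payload_list = pickle.dumps(list(range(N)))

# Option 2: ship only the bounds and let the worker rebuild the range itself
payload_bounds = pickle.dumps((0, N))

print(len(payload_list), "bytes for the full list")
print(len(payload_bounds), "bytes for the bounds")
```

The second payload is orders of magnitude smaller, which is one reason why a few large tasks described by a handful of parameters tend to behave better than many tiny ones.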

For instance, let's try with a few long tasks:

In [93]:
# our "long" task will be the integrate_f_native function between 0 and i, with 400,000 points (4*10**5)
# which takes around 0.06 second
%timeit -n 3 -r 3 integrate_f_native(0,1,4*10**5)

61.8 ms ± 4.49 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)


In [114]:
from time import sleep

## around 0.06sec per task
def task(i):
    return integrate_f_native(0,i,4*10**5)

## 100 tasks to perform
data = list(range(1,101))


#serial execution: ~6seconds
%time _ = list(map(task,data))

CPU times: user 5.88 s, sys: 0 ns, total: 5.88 s
Wall time: 5.88 s


In [115]:
with mp.Pool(2) as pool :
    %time  pool.map(task, data)

CPU times: user 1.25 ms, sys: 158 µs, total: 1.41 ms
Wall time: 3.19 s


With such long tasks, the overhead is smaller than the time gained.

Indeed, on the basis of 0.06 seconds per task, we would expect 100 tasks on 2 processes to take ~3 seconds, so with 3.19 seconds measured we have roughly 0.2 seconds of overhead here.


In [116]:
#let's vary the number of processes
for NP in [1,2,4,6]:
    print(NP)
    with mp.Pool(NP) as pool :
        %time  pool.map(task, data)

1
CPU times: user 1.73 ms, sys: 218 µs, total: 1.95 ms
Wall time: 6.13 s
2
CPU times: user 29.3 ms, sys: 649 µs, total: 29.9 ms
Wall time: 3.33 s
4
CPU times: user 4.32 ms, sys: 546 µs, total: 4.87 ms
Wall time: 2.88 s
6
CPU times: user 7.69 ms, sys: 973 µs, total: 8.67 ms
Wall time: 2.94 s


As we increase the number of processes, the overhead increases, and beyond some point adding processes no longer helps: in the output above, 6 processes are no faster than 4.
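
A common way to summarize such a scan is to compute the speedup (time with 1 process divided by time with n processes) and the efficiency (speedup divided by n). The sketch below does this for the wall times reported in the output above; the dictionary names are only illustrative.

```python
# Wall times in seconds, copied from the output above,
# keyed by the number of processes printed before each timing
wall_time = {1: 6.13, 2: 3.33, 4: 2.88, 6: 2.94}

baseline = wall_time[1]

# speedup: how many times faster than the 1-process run
speedup = {n: baseline / t for n, t in wall_time.items()}

# efficiency: fraction of the ideal n-fold speedup actually obtained
efficiency = {n: speedup[n] / n for n in wall_time}

for n in wall_time:
    print(f"{n} processes: speedup x{speedup[n]:.2f}, efficiency {efficiency[n]:.0%}")
```

With these numbers the efficiency drops from about 92% with 2 processes to about 53% with 4, which is the diminishing return described above.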

<br> 

So, as we just saw, multiprocessing works better when the individual tasks are longer. 

## Exercise: 

 * re-think the `integrate_f_native` function so that it can be parallelized as a few large tasks (rather than a lot of small tasks, as we have done so far).
 * implement your chosen solution.



Solutions:

concept:

In [None]:
# %load -r -3 solutions/03_multiprocess_integrate.py

function definitions:

In [None]:
# %load -r 4-35 solutions/03_multiprocess_integrate.py

application:

In [133]:
# %load -r 36- solutions/03_multiprocess_integrate.py
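
For reference, here is one possible approach, sketched under the assumption that each task handles a contiguous block of grid points. It is not necessarily what `solutions/03_multiprocess_integrate.py` contains, and the helper names `integrate_chunk`, `make_chunks` and `integrate_parallel` are hypothetical.

```python
import multiprocessing as mp


def f(x):
    return x ** 2 - x


def integrate_chunk(args):
    """Left Riemann sum of f over the grid points start .. stop-1."""
    a, dx, start, stop = args
    s = 0.0
    for i in range(start, stop):
        s += f(a + i * dx)
    return s * dx


def make_chunks(a, b, N, n_chunks):
    """Split the N grid points into n_chunks contiguous index ranges."""
    dx = (b - a) / N
    bounds = [k * N // n_chunks for k in range(n_chunks + 1)]
    return [(a, dx, bounds[k], bounds[k + 1]) for k in range(n_chunks)]


def integrate_parallel(a, b, N, n_proc=2):
    """One large task per process: each worker integrates its own block."""
    chunks = make_chunks(a, b, N, n_proc)
    with mp.Pool(n_proc) as pool:
        return sum(pool.map(integrate_chunk, chunks))
```

Calling `integrate_parallel(0, 2, 1_000_000, n_proc=2)` should return a value close to the 0.6666 obtained earlier, while sending only 2 small tuples to the workers. When running this from a script rather than a notebook, the call is best placed under an `if __name__ == "__main__":` guard.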

<br>
<br>

[back to the toc](#toc)


## 2. numba and parallelization  <a id='9'></a>

It is possible to provide a `numba` function to `mp.Pool`, but `numba` already provides what's necessary to parallelize your code.

By setting `parallel=True` when calling `@jit` (in no-python mode), numba will attempt to automatically parallelize your code.

In particular, by default, it works on the array operations.

### 2.1 automatic parallelization <a id='2.1'></a>


In [177]:
import numpy as np
from numba import njit

def integrate_f(a, b, N):
    dx = (b - a) / N
    X = np.arange(a,b,dx)
    return ( X**2 - X ).sum() * dx

## njit is a shortcut for jit(nopython=True)
## the code does not change, so no need to re-write it; just give the function to njit
integrate_f_numba = njit(integrate_f)

integrate_f_numba_parallel = njit(integrate_f , parallel=True)

## check that we get similar results: (+, this lets numba do the compilation now)
print( "native         :", integrate_f(0,2,100) )
print( "numba          :", integrate_f_numba(0,2,100) )
print( "numba parallel :", integrate_f_numba_parallel(0,2,100) )

native         : 0.6468
numba          : 0.6467999999999999
numba parallel : 0.6468


In [179]:
# now let's time it
N = 10**7
print( "native         :")
%timeit integrate_f(0,2,N) 
print( "numba          :")
%timeit integrate_f_numba(0,2,N)
print( "numba parallel :")
%timeit integrate_f_numba_parallel(0,2,N)

native         :
48.6 ms ± 1.8 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
numba          :
70.9 ms ± 439 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
numba parallel :
5.85 ms ± 157 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


So the basic numba seems less efficient than numpy, but the parallel version is showing quite a speedup!

Here, numba was able to parallelize the `np.arange`, all the array operations, and the `sum()`, so actually almost all the code.


[back to the ToC](#toc)

### 2.2 explicit parallelization (prange) <a id='2.2'></a>

In [188]:
def integrate_f2(a, b, N):
    dx = (b - a) / N
    s =0
    for i in range(N):
        x = a+i*dx
        s += x**2-x
    return s * dx

integrate_f2_numba = njit(integrate_f2)

integrate_f2_numba_parallel = njit(integrate_f2,parallel=True)


## check that we get similar results: (+, this lets numba do the compilation now)
print( "native         :", integrate_f2(0,2,100) )
print( "numba          :", integrate_f2_numba(0,2,100) )
print( "numba parallel :", integrate_f2_numba_parallel(0,2,100) )

native         : 0.6467999999999999
numba          : 0.6467999999999999
numba parallel : 0.6467999999999999


The keyword argument 'parallel=True' was specified but no transformation for parallel execution was possible.

To find out why, try turning on parallel diagnostics, see https://numba.readthedocs.io/en/stable/user/parallel.html#diagnostics for help.

File "../../../../../../../tmp/ipykernel_4256/1462592780.py", line 1:
<source missing, REPL/exec in use?>

As the warning shows, numba does not parallelize a plain `for` loop over `range` on its own: the loop has to be marked explicitly as parallel by replacing `range` with `prange`.


In [189]:
from numba import prange 

@njit(parallel=True)
def integrate_f2_numba_parallel(a, b, N):
    dx = (b - a) / N
    s =0
    for i in prange(N):
        x = a+i*dx
        s += x**2-x
    return s * dx
integrate_f2_numba_parallel(0,2,100) 


0.6468
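
Note that this prints 0.6468 where the serial versions printed 0.6467999999999999. A likely explanation (not verified here) is that, inside a `prange` loop, numba treats `s += ...` as a reduction: partial sums are accumulated separately and then combined, which changes the order of the floating-point additions. The pure-Python snippet below only illustrates that the grouping of additions can change the last digits; it does not reproduce numba's internals.

```python
import math

values = [0.1, 0.2, 0.3]

# same three numbers, two different groupings of the additions
left_to_right = (values[0] + values[1]) + values[2]
regrouped = values[0] + (values[1] + values[2])

print(left_to_right == regrouped)              # False: the last digits differ
print(math.isclose(left_to_right, regrouped))  # True: equal up to rounding
```

This is why results from serial and parallel versions are better compared with a tolerance than with `==`.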

In [195]:
# now let's time it
N = 10**7
print( "numba from native         :")
%timeit -n 10 -r 7 integrate_f2_numba(0,2,N)
print( "numba parallel with prange:")
%timeit -n 10 -r 7 integrate_f2_numba_parallel(0,2,N)

numba from native         :
10 ms ± 877 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
numba parallel with prange:
6.56 ms ± 895 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


Let's use a bit more data to compare the two parallel versions (automatic and explicit):

In [198]:
N = 10**8
print( "numba parallel auto:")
%timeit -n 3 -r 7 integrate_f_numba_parallel(0,2,N)
print( "numba parallel with prange:")
%timeit -n 3 -r 7 integrate_f2_numba_parallel(0,2,N)

numba parallel auto:
54.5 ms ± 5.4 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
numba parallel with prange:
52.9 ms ± 4.32 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)


The two results are similar. The one you end up using will depend on the structure of your problem and the shape of your code.

<br>

[back to ToC](#toc)

### 2.3 controlling the number of threads used <a id="2.3"></a>

Up until now, we have let numba use its default number of threads.

In [200]:
import numba
numba.config.NUMBA_DEFAULT_NUM_THREADS

8

To control the number of threads, just use the `set_num_threads` function:

In [202]:
from numba import set_num_threads

N = 10**8

## max number of threads
print("default number of threads")
%timeit -n 3 -r 7 integrate_f2_numba_parallel(0,2,N)

for num_thread in range(2,8):
    print(num_thread)
    set_num_threads(num_thread)
    %timeit -n 3 -r 7 integrate_f2_numba_parallel(0,2,N)

default number of threads
55.1 ms ± 5.73 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
2
65 ms ± 16 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
3
61.2 ms ± 7.26 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
4
50.9 ms ± 1.66 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
5
62.9 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
6
55.3 ms ± 3.39 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
7
50.8 ms ± 1.38 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)


To go further, we recommend you have a look at [numba documentation on parallelization](https://numba.pydata.org/numba-doc/latest/user/parallel.html) which explains what can, and what cannot be parallelized, and how to diagnose the automatic parallelization process.


<br>

[back to the ToC](#toc)

# Annex A - parallelization of pairwise distance computation with multiprocess <a id='annexa'></a>

In [3]:
import numpy as np 

def pairwise_distance_numpy(X):

    num_vectors = X.shape[0]
    num_measurements = X.shape[1] 
    D = np.empty((num_vectors, num_vectors), dtype=np.float64)
    
    for i in range(num_vectors):
        for j in range(num_vectors):
            d = np.square( np.subtract(X[i], X[j]) )
            D[i, j] = np.sqrt(np.sum(d))
    return(D)

Right now, this function operates on a whole array.

Ideally, we would like to reduce this to something that looks like:

```python

for i in range(len(data)):
    result[i] = function(data[i])

```
equivalent to:
```python
map(function,data)
```

So, we apply a `function` to each element (`data[i]`) of `data`.

**Question:** how can we go from the `pairwise_distance_numpy` function to this? what would be `function`? `data`? 

<br>

<br>

<br>

<br>

<br>

 ... don't scroll - spoilers ahead ...

<br>

<br>

<br>

<br>

<br>

<br>

<br>

So, my proposal to solve this (not the only one possible, maybe not even the best) is that:
 1. the `function` computes the distance between 2 vectors
 2. `data[i]` is a pair of vectors
 3. consequently, `data` is a list of pairs of vectors.


I will even go one (small) step further: rather than keeping the whole vectors in `data`, I will keep only the vector indexes.
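
As a side note, the distance is symmetric and the distance of a vector to itself is 0, so in principle only the pairs with i < j are needed. The cells below keep all pairs for simplicity; the following sketch, using only `itertools`, just counts how many tasks each option generates for 200 vectors:

```python
from itertools import combinations, product

n_vectors = 200

# every (i, j) pair, as a double loop over the indexes would produce
all_pairs = list(product(range(n_vectors), repeat=2))

# only the pairs with i < j: enough for a symmetric distance matrix
unique_pairs = list(combinations(range(n_vectors), 2))

print(len(all_pairs))     # 40000
print(len(unique_pairs))  # 19900
```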

In [145]:
## generate 200 vectors with 100 measurements each 
data = np.random.uniform(size=(200,100))

In [146]:
def pairwise_list_I(X):
    """ create a list of the pairs of vector index we have to compute distances for (ie. all possible pair of indexes)"""
    list_of_tuples = list()
    
    num_vectors = X.shape[0]
    num_measurements = X.shape[1] 
    
    for i in range(num_vectors):
        for j in range(num_vectors):
            list_of_tuples.append((i,j))
            
    return list_of_tuples

def pairwise_distance_from_indexes(indexes ):
    """takes a tuple containing a pair of  indexes, and computes the distance between the 2"""
    assert(len(indexes) == 2)
    X1 = data[indexes[0]]
    X2 = data[indexes[1]]
    
    return np.sqrt( np.sum( np.square( X1-X2 ) ) )


list_of_tuples_I = pairwise_list_I(data)

%timeit -n 1 -r 3  result = list(map(pairwise_distance_from_indexes,  list_of_tuples_I))

import multiprocessing as mp

with mp.Pool(2) as pool :
    
    %timeit -n 1 -r 3  result2 = pool.map(pairwise_distance_from_indexes, list_of_tuples_I)

238 ms ± 3.51 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
237 ms ± 17.3 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


With 2 processes, there is essentially no speedup in this timing (237 ms against 238 ms).

Let's see how this changes with the number of processes:

In [147]:
for NP in [1,2,3,4,5,6]:
    print(NP)
    with mp.Pool(NP) as pool :
        %time result = pool.map(pairwise_distance_from_indexes, list_of_tuples_I)

1
CPU times: user 10.5 ms, sys: 11.8 ms, total: 22.3 ms
Wall time: 352 ms
2
CPU times: user 26.8 ms, sys: 11.3 ms, total: 38.1 ms
Wall time: 246 ms
3
CPU times: user 45 ms, sys: 0 ns, total: 45 ms
Wall time: 197 ms
4
CPU times: user 39.6 ms, sys: 4.51 ms, total: 44.1 ms
Wall time: 177 ms
5
CPU times: user 45.7 ms, sys: 4.82 ms, total: 50.6 ms
Wall time: 199 ms
6
CPU times: user 57.6 ms, sys: 4.79 ms, total: 62.4 ms
Wall time: 181 ms


In [165]:
## of course, we want to compare this with the original version of the function
%timeit pairwise_distance_numpy( data )

237 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


So there is some gain. 
Nothing tremendous, but still a ~1.3x speedup with 4 processes (177 ms against 237 ms), and it beats the alternative of having all these cores idle.

<br>

Going one step further, we know multiprocessing works better when the tasks are somewhat large. So, instead of saying that a task is "compute a single distance" and having NxN tasks, we could have the task be "compute a full row of the distance matrix", and then we only have N tasks.

So for the task, we compute the distance between one vector and all the others. You will see this is a very good idea, even in a non-multiprocessing framework, because this plays to some of numpy's strengths.

First, let's implement our "task":

In [167]:
## computing reference results for testing validity
toy_data = np.random.uniform(size=(10,100))

res = pairwise_distance_numpy( toy_data )
#we want our task to compute something like this : 
res[0]


array([0.        , 4.12395672, 4.00365722, 4.51227131, 4.02450461,
       3.73729676, 3.81267711, 4.19146637, 3.83317922, 3.91421242])

In [170]:
## we have seen that numpy makes operations between 2 vectors easy.
## but actually, operations between a matrix and a vector work as well (broadcasting)
## so, matrix - vector will perform the subtraction on each row independently.
## then, if we also compute the sum on each row independently we can get the distances we want!

def compute_distance_row( i ):
    ## Here, I presume that there exists a DATA_GLOB
    ##  variable in global memory with my data in it
    
    squared_diff = ( DATA_GLOB - DATA_GLOB[i])**2 ## squared differences between the matrix and a single vector 
    sums = np.sum( squared_diff , axis = 1) ## axis=1 --> to get 1 sum per row
    return np.sqrt( sums ) ## compute square root of all these sums

## of course we want to test this:
DATA_GLOB = toy_data
res_new = compute_distance_row( 0 )
print(res_new)
print(res_new == res[0])

[0.         4.12395672 4.00365722 4.51227131 4.02450461 3.73729676
 3.81267711 4.19146637 3.83317922 3.91421242]
[ True  True  True  True  True  True  True  True  True  True]


All good so far. How does it perform?

In [171]:
DATA_GLOB = data  # redefine the data used by the function as the bigger dataset

## we map this onto the list of possible indices : from 0 to N
%timeit -n 1 -r 3  result = list(map(compute_distance_row, range(DATA_GLOB.shape[0])))

12.5 ms ± 2.87 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)


So you can see how this performs much better, even with a single process (12.5 ms against 237 ms for the original function).

Actually, let's make the data larger.

In [174]:
big_data = np.random.uniform(size=(500,100))

In [175]:
DATA_GLOB=big_data
%timeit -n 3 -r 3  result = list(map(compute_distance_row, range(DATA_GLOB.shape[0])))
for NP in [1,2,3,4,5,6]:
    print(NP)
    with mp.Pool(NP) as pool :
        %time result = pool.map(compute_distance_row, range(DATA_GLOB.shape[0]))

40.9 ms ± 6.55 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
1
CPU times: user 5.87 ms, sys: 269 µs, total: 6.14 ms
Wall time: 46 ms
2
CPU times: user 2.86 ms, sys: 3.86 ms, total: 6.71 ms
Wall time: 28.5 ms
3
CPU times: user 0 ns, sys: 7.35 ms, total: 7.35 ms
Wall time: 22.5 ms
4
CPU times: user 0 ns, sys: 9.04 ms, total: 9.04 ms
Wall time: 19.7 ms
5
CPU times: user 13 ms, sys: 722 µs, total: 13.7 ms
Wall time: 23.9 ms
6
CPU times: user 4.7 ms, sys: 11.6 ms, total: 16.3 ms
Wall time: 28.1 ms


Same as before: with larger individual tasks we seem to get a better speedup in general (~2x speedup for 4 processes).
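
One caveat: `compute_distance_row` relies on the global `DATA_GLOB` being visible inside the workers. This works when worker processes are created by forking the parent (the usual default on Linux), but with the spawn start method (the default on Windows and macOS) workers start from a fresh interpreter and may not see it. A more portable pattern is to hand the data to each worker once through the `initializer` and `initargs` arguments of `mp.Pool`. The sketch below uses plain lists instead of numpy arrays to stay self-contained; `init_worker`, `distance_row` and `all_rows` are hypothetical names.

```python
import math
import multiprocessing as mp

_WORKER_DATA = None  # filled in once per worker process by init_worker


def init_worker(data):
    """Runs once in each worker: store the dataset in a module-level variable."""
    global _WORKER_DATA
    _WORKER_DATA = data


def distance_row(i):
    """Distances between vector i and every vector of the worker's dataset."""
    ref = _WORKER_DATA[i]
    return [math.sqrt(sum((x - y) ** 2 for x, y in zip(ref, row)))
            for row in _WORKER_DATA]


def all_rows(data, n_proc=2):
    """Compute the full distance matrix, one row per task."""
    with mp.Pool(n_proc, initializer=init_worker, initargs=(data,)) as pool:
        return pool.map(distance_row, range(len(data)))
```

The data is then sent once per worker rather than once per task, and the tasks themselves are just row indexes.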

[back to the ToC](#toc)

# Annex B - parallelization of pairwise distance computation with numba <a id='annexb'></a>

In [218]:
from numba import njit, prange

# njit -> no-python jit

@njit(parallel=True)
def pairwise_distance_numba_prange(X):

    num_vectors = X.shape[0]
    num_measurements = X.shape[1] 
    D = np.empty((num_vectors, num_vectors), dtype=np.float64)
    
    for i in prange(num_vectors): # note usage of prange
        for j in range(num_vectors):
            d = 0.
            for k in range(num_measurements):
                d += np.square( np.subtract(X[i][k], X[j][k])  )
            D[i, j] = np.sqrt(d)
    return(D)

toydata = np.random.uniform(size=(10,10)) # I make toy data to launch the function once and compile it
toyresult = pairwise_distance_numba_prange( toydata ) 

In [217]:
print("numba parallel=True")
%timeit -n 1 -r 3 result = pairwise_distance_numba_prange( big_data )

numba parallel=True
22.6 ms ± 5.43 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
