In [2]:
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt

from time import sleep
import sys
sys.path.append('../scripts')

# Parallel processing with python

## A brief introduction

#### Tor Nordam, SeaLab Python User Groups, May 31, 2022

## Outline (not in order)

* A pedagogical example of parallelisation: Coffee brewing
* Types of parallelisation:
 * Trivial parallelisation
 * Shared memory parallelisation
 * Distributed memory parallelisation
* Relevant tools:
 * GNU parallel
 * The `multiprocessing` and `concurrent.futures` modules
 * Numba
 * MPI for Python

## A pedagogical example: Coffee brewing

A serial approach to coffee brewing:

<div style="width:100%;align:center">
<img src="coffee_serial.png" alt="Drawing" style="width: 500px;"/>
</div>

Can this be sped up by parallelisation?

Yes!

<div style="width:100%;align:center">
<img src="coffee_parallel1.png" alt="Drawing" style="width: 500px;"/>
</div>

Can we speed up up even more?

## A pedagogical example: Coffee brewing

<div style="width:100%;align:center">
<img src="coffee_parallel1.png" alt="Drawing" style="width: 500px;"/>
</div>

Can we speed up up even more? ...maybe not?

<div style="width:100%;align:center">
<img src="coffee_parallel2.png" alt="Drawing" style="width: 620px;"/>
</div>

The example illustrates two important points in parallelisation:
* Dependencies: ome task must be complete before others can start)
* Overhead (sometimes the extra work related to parallelisation

## Types of parallelisation -- Trivial parallelisation

* Typical use case: You have a large number of independent tasks
 * Run a script on many different inputs (e.g. analysing many images)
 * Monte Carlo simulation with many random inputs
 * ...
 
* Can be parallelised by simply running several copies of the script at once
* Also known as embarrassingly parallel problems

* Caveats: Will not work so well if each copy of the script uses a lot of memory and/or disk/network access


## Relevant tools -- GNU parallel

Trivial parallelisation is often best done without modifying your python code.

* Assumptions:
 * A script, which reads some input (or creates random input)
 * Output is written to a unique file for each copy


* The command line tool `parallel` is helpful here:
 * Makes it easy to run the script once for each input
 * Can automatically keep $N$ copies running simultaneously
 * Also has advanced features that can run scripts across several machines on a network

## A small digression: `jot`

`jot` is a handy command line tool to, for example, 
* generate sequences of integers

```shell
> jot 2
1
2
```
* generate random integers

```shell
> jot -r 2
34
82
```

* generate random 17-digit floats between 0 and 1

```bash
> jot -r -p17 1 0 1
0.86810522642917931
0.61719403299503028
```


## Relevant tools -- GNU parallel

<div style="width:100%;align:center">
<img src="parallel.png" alt="Drawing" style="width: 500px;"/>
</div>

https://www.gnu.org/software/parallel/

## Relevant tools -- GNU Parallel

#### Example 1: Processing a folder full of images

Assumptions:
* The script `analyse.py` takes one filename as argument
* This is running on a machine with 8 cores

```shell
> ls *.png | parallel -n1 -P8 python analyse.py
```

## Relevant tools -- GNU Parallel

#### Example 2: Running 1000 ensemble members in a Monte Carlo simulation

Assumptions:
* The script `montecarlo.py` uses different random numbers each time
* Output is written to a unique location each time
* This is running on a machine with 8 cores

```shell
> jot 1000 | parallel -n1 -P8 python montecarlo.py
```

## Relevant tools -- The multiprocessing module

Typical use case: You have a large number of independent tasks
* Run a function on many different inputs (e.g. analysing many images)
* Monte Carlo simulation with many random inputs
* Often quite similar to the cases where you would use GNU parallel

Properties:
* Runs each copy of a function in a separate process
* No shared memory between processes (all data must be input)

Use `multiprocessing.pool` to iterate over a list of inputs, keeping $N$ processes running until all inputs have been processed.

In [8]:
from multiprocessing import Pool

# For some reason, multiprocessing does not work with functions
# defined inside a notebook, therefore importing this from a file
from examples import wait_function

# List of inputs for which to run simulation
parameters = np.arange(4)

# Using a pool of 2 processes will keep
# 2 copies of the function running at all times,
# until the function has been run for each parameter
with Pool(2) as pool:
    output = pool.map(wait_function, parameters)
    
print(output)

[0, 1, 2, 3]


## Relevant tools -- The `concurrent.futures` module

* Introduced in Python 3.2
* Can do essentially the same things as multiprocessing
* Provides a common interface for asyncronous execution with either threads or processes

In [9]:
from concurrent.futures import ProcessPoolExecutor

# For some reason, multiprocessing does not work with functions
# defined inside a notebook, therefore importing this from a file
from examples import wait_function

# List of inputs for which to run simulation
parameters = np.arange(4)

# Using a pool of 2 processes will keep
# 2 copies of the function running at all times,
# until the function has been run for each parameter
with ProcessPoolExecutor(2) as pool:
    output = pool.map(wait_function, parameters)
    
# output is a generator containing the return values
for o in output:
    print(o)

0
1
2
3


## Types of parallelisation -- Shared memory parallelisation

Typical use case:
* You want to run one large simulation
* The simulation fits in memory on one machine


Shared memory parallelisation means:
* Several CPU cores work on a common, shared memory
* Typically each CPU works on a part of a large array

## Types of parallelisation -- Shared memory parallelisation


In C and Fortran, OpenMP is the typical example of shared memory parallelisation

```Fortran
do it = 2, Nt
    !$OMP PARALLEL DO
    do i = 2, Nx-1
        U(i, it) = U(i, it-1) + alpha*(U(i+1, it-1) - 2*U(i, it-1) + U(i-1, it-1))
    end do
    !$OMP END PARALLEL DO
end do
```

## Relevant tools -- Numba

* In Python, shared memory parallelisation is not completely straightforward due to the GIL (Global Interpreter Lock).
* Numba provides `prange`, which is quite similar to `$OMP PARALLEL DO`.
* https://numba.pydata.org/numba-doc/latest/user/parallel.html



```python
@jit(nopython=True, parallel=True)
def diffusion():
    ...
    for i in prange(1, Nx-1):
        C_ = C[i] + alpha*(C[i+1] - 2*C[i] + C[i-1])
```

In [12]:
from numba import jit, prange

@jit(nopython=True, parallel=True)
def diffusion(Nt):
    alpha = 0.49
    x = np.linspace(0, 1, 10000000)
    # Initial condition
    C = 1/(0.25*np.sqrt(2*np.pi)) * np.exp(-0.5*((x-0.5)/0.25)**2)
    # Temporary work array
    C_ = np.zeros_like(C)
    # Loop over time (normal for-loop)
    for j in range(Nt):
        # Loop over array elements (space, parallel for-loop)
        for i in prange(1, len(C)-1):
            C_[i] = C[i] + alpha*(C[i+1] - 2*C[i] + C[i-1])
        # A neat trick below, see link for details
        # https://stackoverflow.com/questions/72431361/poor-performance-from-numba-prange#comment127966065_72433087
        C, C_ = C_, C
    return C

# Run once to just-in-time compile
C = diffusion(1)

# Check timing
%timeit C = diffusion(10)

KeyboardInterrupt: 

## Types of parallelisation -- Distributed memory parallelisation

* Typical use case:
 * You want to run one large simulation
 * The problem does not fit in memory on one machine
 * (or it takes so long to run that you want to use additional CPUs)
 
* This type of parallel computing is usually what is meant by HPC (High-Perfomance Computing)
* Often implemented with MPI (Message Passing Interface)
* (but see also coarrays in Fortran, and Unified Parallel C)

## Relevant tools -- MPI for Python

* The `mpi4py` module provides a Python wrapper for MPI
* Syntax is somewhat similar to MPI in Fortran or C
* Designed to work efficiently with numpy arrays


* In MPI, there is no shared memory
* Use `mpirun` to launch $N$ copies of your program
* Communication between processes must be handled explicitly

## Trivial example: Parallel hello world

```python
from mpi4py import MPI

# Initialise MPI communicator
comm = MPI.COMM_WORLD
MPI_rank = comm.Get_rank()
MPI_size = comm.Get_size()

# Print rank number, and exit
print(f'Hello, this is rank {MPI_rank} of {MPI_size}')
```

## Less trivial example: Parallel 2D diffusion solver

* Split domain into smaller pieces
* One process (rank) will handle each piece
* Must write explicit communication to exchange boundaries (halo swap)
* This is an example of the SPMD paradigm (Single Program, Multiple Data)


<div style="width:100%;align:center">
<img src="2D_diffusion.png" alt="Drawing" style="width: 500px;"/>
</div>

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from mpi4py import MPI


def FTCS(C, D, dt, dx):
    # One iteration with the FTCS method
    # Interior points only
    alpha = D*dt/dx**2
    C[1:-1, 1:-1] = C[1:-1, 1:-1] + alpha*(
         C[2:  , 1:-1] + C[ :-2, 1:-1]
        +C[1:-1, 2:  ] + C[1:-1,  :-2]
        -4*C[1:-1, 1:-1]
    )
    return C


# Initialise MPI communicator
comm = MPI.COMM_WORLD
MPI_rank = comm.Get_rank()
MPI_size = comm.Get_size()

if MPI_rank == 0:
    tic = time()

# Numerical parameters
Nx, Ny = 1024, 512
Xmin, Xmax = 0, 2
Ymin, Ymax = 0, 1
Tmax = 5
dt = 0.001

# Diffusivity
D = 0.0009

# Global coordinates
xc, dx = np.linspace(Xmin, Xmax, Nx, retstep = True)
yc, dy = np.linspace(Ymin, Ymax, Ny, retstep = True)

# Check stability condition for this method
assert D*dt/dx**2 < 0.25

# Infer local coordinates from rank, splitting the array along the x-axis only
cells_per_rank = int(Nx/MPI_size)

if MPI_size > 1:
    # Treat first and last rank separately
    if MPI_rank == 0:
        x_local = xc[ : cells_per_rank + 1]
    elif MPI_rank == MPI_size - 1:
        x_local = xc[MPI_rank * cells_per_rank - 1 : ]
    else:
        x_local = xc[MPI_rank * cells_per_rank - 1 : (1+MPI_rank)*cells_per_rank + 1]
else:
    x_local = xc

y_local = yc

print(f'This is rank: {MPI_rank} of {MPI_size}. Local x-axis is from {x_local[0]} to {x_local[-1]}')
sys.stdout.flush()



# Initial concentration (gaussian)
x0, y0 = 1.0, 0.5
sx, sy = 0.05, 0.05

# local grid points
gridy, gridx = np.meshgrid(y_local, x_local)
# Create concentration field for local cells only
C0 = np.exp(-(gridx - x0)**2/(2*sx**2) - (gridy-y0)**2/(2*sy**2))

# Work array
C = C0.copy()


# Loop over time
Nt = int(Tmax/dt)
for i in range(Nt):
    # Update interior points
    C = FTCS(C, D, dt, dx)

    # Only communicate if there is more than one rank
    if MPI_size > 1:
        # Exchange halo with neighbours
        # First set up non-blocking receives
        if MPI_rank == 0:
            # Set up non-blocking receive from the right
            comm.Irecv(C[-1,:], source = MPI_rank + 1)
        elif MPI_rank == MPI_size - 1:
            # Set up non-blocking receive from the left
            comm.Irecv(C[ 0,:], source = MPI_rank - 1)
        else:
            # Set up non-blocking receive from both the left and the right
            comm.Irecv(C[ 0,:], source = MPI_rank - 1)
            comm.Irecv(C[-1,:], source = MPI_rank + 1)

        # Then, set up blocking sends
        if MPI_rank == 0:
            # Set up blocking send to the right
            comm.Send(C[-2,:], dest = MPI_rank + 1)
        elif MPI_rank == MPI_size - 1:
            # Set up blocking send to the left
            comm.Send(C[ 1,:], dest = MPI_rank - 1)
        else:
            # Set up non-blocking send to both the left and the right
            comm.Send(C[ 1,:], dest = MPI_rank - 1)
            comm.Send(C[-2,:], dest = MPI_rank + 1)

        comm.Barrier()

print(f'This is rank: {MPI_rank} of {MPI_size}, finished computations')
sys.stdout.flush()

# Finally, gather results on rank 0, and store to file
if MPI_rank == 0:
    C0_global = np.zeros((MPI_size, cells_per_rank, Ny))
    C_global = np.zeros((MPI_size, cells_per_rank, Ny))
else:
    C0_global = None
    C_global = None

# Create a view of the local array that excludes boundary cells
if MPI_rank == 0:
    C0_local = C0[:-1, :]
    C_local = C[:-1, :]
elif MPI_rank == MPI_size -1:
    C0_local = C0[1:, :]
    C_local = C[1:, :]
else:
    C0_local = C0[1:-1, :]
    C_local = C[1:-1, :]

# Only communicate if there is more than one rank
if MPI_size > 1:
    # Collective communication
    comm.Gather(C_local, C_global, root = 0)
    comm.Gather(C0_local, C0_global, root = 0)
else:
    # If there is only one rank, store the entire array
    C0_global = C0
    C_global = C

if MPI_rank == 0:
    np.save('C0_MPI.npy', C0_global.reshape(Nx, Ny))
    np.save('C_MPI.npy', C_global.reshape(Nx, Ny))
    toc = time()
    print(f'Simulation took {toc - tic:.5f} seconds')
```