<a href="https://colab.research.google.com/github/ombystoma-young/BI-ml-course/blob/main/lab1/MPI_2022_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MPI (Message-Passing Interface)


## Overview

* Mechanism of data exchange between processes
* Two types of communication: 
 * **point-to-point**: between two processes 
 * **collective**: between multiple processes
* Typically only one program, branching depending on the process 
* Using the mpi4py Python library 

An mpi4py tutorial: 
* https://mpi4py.readthedocs.io/en/stable/tutorial.html


Install mpi4py:




In [None]:
!pip install mpi4py

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mpi4py
  Downloading mpi4py-3.1.3.tar.gz (2.5 MB)
[K     |████████████████████████████████| 2.5 MB 8.4 MB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
Building wheels for collected packages: mpi4py
  Building wheel for mpi4py (PEP 517) ... [?25l[?25hdone
  Created wheel for mpi4py: filename=mpi4py-3.1.3-cp37-cp37m-linux_x86_64.whl size=2185257 sha256=76a3c725d312a466adbce67cc8e1f4e204395421d5de34eab475fae662600643
  Stored in directory: /root/.cache/pip/wheels/7a/07/14/6a0c63fa2c6e473c6edc40985b7d89f05c61ff25ee7f0ad9ac
Successfully built mpi4py
Installing collected packages: mpi4py
Successfully installed mpi4py-3.1.3


## A basic example (no data exchange)

Save as `mpi.py` and run with `mpirun -n 3 python mpi.py`.

In [None]:
# mpi.py
from mpi4py import MPI
comm = MPI.COMM_WORLD  # ?
rank = comm.Get_rank() # index of the current process 
print ("hello from process ", rank)

hello from process  0


No communication:  
`--allow-run-as-root` for colab, locally dont need it  
`-n` number of copies

In [None]:
!mpirun --allow-run-as-root -n 8 python mpi.py 

hello from process  4
hello from process  5
hello from process  7
hello from process  0
hello from process  1
hello from process  6
hello from process  2
hello from process  3


! No order between processes.

Basic example.

## Point-to-point communication of two processes

### Example: computing $\pi$ with MPI (*why this formula*)

$$\pi=\sqrt{6\sum_{n=1}^{\infty}\frac{1}{n^2}}$$

**Exercise:** Theoretically estimate the error resulting if we truncate the series at $N$ terms.  

Without MPI:

In [None]:
import numpy as np
a = np.arange(1,200000)
print (np.sqrt(6*np.sum(1./(a*a))))

3.1415878789259364


### Functions ``send()``, ``recv()`` (p2p) 

Save as `mpi.py` and run with `mpirun -n 2 python mpi.py`.

In [None]:
# Evaluate the sum of 2M terms by splitting into two groups of M terms.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 

M = 100
def getPartialSum(start, end):
    a = np.arange(start, end)
    return np.sum(1./(a*a))

s = getPartialSum(1+rank*M, 1+(rank+1)*M)
print ('Process', rank, 'found partial sum from term', 1+rank*M, 'to term', 1+(rank+1)*M-1, ': ', s )

# process 1 sends its partial sum to process 0
if rank == 1:
    comm.send(s, dest=0) 
    
# process 0 receives the partial sum from process 1, adds to its own partial sum
# and outputs the result    
elif rank == 0: 
    s_other = comm.recv(source=1)
    s_total = s+s_other
    print ('total partial sum =', s_total)
    print ('pi_approx =', np.sqrt(6*s_total))
    
print ('Process', rank, 'finished')

In [None]:
!mpirun --allow-run-as-root -n 2 python mpi.py 

Process 0 found partial sum from term 1 to term 100 :  1.6349839001848931
Process 1 found partial sum from term 101 to term 200 :  0.004962645830104402
Process 1 finished
total partial sum = 1.6399465460149976
pi_approx = 3.1368263063309683
Process 0 finished


**Exercise:** Perform the same computation in a "ping-pong" manner: one process sums only even terms, the other only odd terms; after adding a new term the process sends the current result to the other process.  

## Collective communication (many processes)

Perform efficient (fast, load-balanced) collective operations (e.g., summations) involving multiple processes. 

<img src='https://materials.jeremybejarano.com/MPIwithPython/_images/fastSum.png'>

In [None]:
#from IPython.display import Image
#Image(filename="fastSum.png")

### Function ``gather()``

Pass data from all processes to the chosen process.

Save as `mpi.py` and run with `mpirun -n 5 python mpi.py`.

In [None]:
# Evaluate the sum of MN terms by splitting into M groups of N terms.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size() # total number of processes
rank = comm.Get_rank()

M = 100
def getPartialSum(start, end):
    a = np.arange(start, end)
    return np.sum(1./(a*a))

s = getPartialSum(1+rank*M, 1+(rank+1)*M)

partialSums = comm.gather(s, root=0)
print ('partialSums gathered at process %d:' %(rank), partialSums) 

if rank == 0:
    print ('pi_approx =', np.sqrt(6*np.sum(partialSums)))

In [None]:
!mpirun --allow-run-as-root -n 5 python mpi.py 

partialSums gathered at process 4: None
partialSums gathered at process 1: None
partialSums gathered at process 3: None
partialSums gathered at process 2: None
partialSums gathered at process 0: [1.6349839001848931, 0.004962645830104402, 0.0016597368826256017, 0.0008309063464401552, 0.0004988762708311448]
pi_approx = 3.1396841231387222


### Function ``bcast()`` (broadcasting)

Pass data from the chosen process to all other processes.

Save as `mpi.py` and run with `mpirun -n 3 python mpi.py`.

In [None]:
# basic usage of bcast()
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    some_data = {0: 'abcd', 1:1234}
else:
    some_data = None
    
print ("I'm process", rank, '; data before broadcasting:', some_data)
data = comm.bcast(some_data, root=0)
print ("I'm process", rank, '; data after broadcasting:', data)

In [None]:
!mpirun --allow-run-as-root -n 3 python mpi.py 

I'm process 0 ; data before broadcasting: {0: 'abcd', 1: 1234}
I'm process 2 ; data before broadcasting: None
I'm process 0 ; data after broadcasting: {0: 'abcd', 1: 1234}
I'm process 1 ; data before broadcasting: None
I'm process 1 ; data after broadcasting: {0: 'abcd', 1: 1234}
I'm process 2 ; data after broadcasting: {0: 'abcd', 1: 1234}


### Functions ``scatter()``, ``reduce()``

* ``scatter()``: distribute data from one source to all processes
* ``reduce()``: combine data from all processes using a collective operation like `sum` or `max` (without order dependence)

In [None]:
# Evaluate the sum of N terms by scattering them to N processes.
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    data2scatter = [a*a for a in range(1,size+1)]
else:
    data2scatter = None
    
data = comm.scatter(data2scatter, root=0)

print ('Data at process', rank, ':', data)

b = 1./data

partialSum = comm.reduce(b, op = MPI.SUM, root = 0)

print ('Partial sum at process', rank, ':', partialSum)

if rank == 0:
    result = np.sqrt(6*partialSum)
    print ('Pi_approx:', result)

In [None]:
!mpirun --allow-run-as-root -n 8 python mpi.py 

## Example: parallel scalar product
* Generate two random vectors $\mathbf x$ and $\mathbf y$ at the root process. Goal: compute their scalar product $\langle\mathbf x,\mathbf y\rangle$ 
* Divide $\mathbf x$ and $\mathbf y$ into chunks and scatter them to the other processes
* Compute scalar products between chunks at each process
* Obtain $\langle\mathbf x,\mathbf y\rangle$ by reducing (summing) local scalar products

In [None]:
#"to run" syntax example: mpirun -n 10 python mpi.py 4000000
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#read from command line
N = int(sys.argv[1])    #length of vectors

#arbitrary example vectors, generated to be evenly divided by the number of
#processes for convenience

x = np.random.rand(N) if comm.rank == 0 else None
y = np.random.rand(N) if comm.rank == 0 else None

#initialize as numpy arrays
dot = np.array([0.])
local_N = np.array([0])

#test for conformability
if rank == 0:
                if (N != y.size):
                                print ("vector length mismatch")
                                comm.Abort()

                #currently, our program cannot handle sizes that are not evenly divided by
                #the number of processors
                if (N % size != 0):
                                print ("the number of processors must evenly divide n.")
                                comm.Abort()

                #length of each process's portion of the original vector
                local_N = np.array([N//size])

#communicate local array size to all processes
comm.Bcast(local_N, root=0)

#initialize as numpy arrays
local_x = np.zeros(local_N)
local_y = np.zeros(local_N)

#divide up vectors
comm.Scatter(x, local_x, root=0)
comm.Scatter(y, local_y, root=0)

#local computation of dot product
local_dot = np.array([np.dot(local_x, local_y)])

#sum the results of each
comm.Reduce(local_dot, dot, op = MPI.SUM)

if (rank == 0):
                print ("The dot product computed with MPI:", dot[0])
                print ("The dot product computed w/o  MPI:", np.dot(x,y))

In [None]:
!mpirun --allow-run-as-root -n 10 python mpi.py 4000000

The dot product computed with MPI: 999649.6724805026
The dot product computed w/o  MPI: 999649.672480504



**Exercise:** Why is the result $\approx$ 1E6? A bad RNG?

*2Q: Why near 1m, why not exact*  
BNL

$E[<x, y>]$  
$<x, y>/N - 1/4 \sim O(N^{-1/2})$  
цпт $\sum \frac{(x_ny_n - 1/4)}{\sqrt{N}} → Normal\ sh$

### Reduce-based computation of $\pi$

In [None]:
# run syntax example: mpirun -n 10 python mpi.py 4000000
from mpi4py import MPI
import numpy as np
import sys

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#read from command line
N = int(sys.argv[1])    #number of terms

#initialize as numpy array
s = np.array([0.])

#test for conformability
if (N % size != 0):
    print ("the number of processors must evenly divide n.")
    comm.Abort()

#length of each process's portion of the original vector
local_N = np.array([N/size])

def getPartialSum(start, end):
    a = np.arange(start, end)
    return np.sum(1./(a*a))

#local computation of partial sum
local_s = getPartialSum(1+rank*local_N, 1+(rank+1)*local_N)
local_s = np.array([local_s])

#sum the results of each local sum
comm.Reduce(local_s, s, op = MPI.SUM)

if (rank == 0):
    pi_approx = np.sqrt(6*s[0])
    print ("pi_approx:", pi_approx)

The program execution time can be measured with commands `time` or `/usr/bin/time -v`:

(`real`: wall clock time).



In [None]:
!time mpirun --allow-run-as-root -n 1 python mpi.py 100000000
!time mpirun --allow-run-as-root -n 2 python mpi.py 100000000
!time mpirun --allow-run-as-root -n 4 python mpi.py 100000000

pi_approx: 3.14159264404049

real	0m1.556s
user	0m0.605s
sys	0m0.905s
pi_approx: 3.1415926440404958

real	0m1.285s
user	0m1.159s
sys	0m1.054s
pi_approx: 3.1415926440404975

real	0m1.673s
user	0m1.801s
sys	0m1.165s


Speedup and efficiency of parallelization with 2 processes: 

In [None]:
Speedup = 1.821/1.325
print ('Speedup:', Speedup)

Efficiency = Speedup/2
print ('Efficiency:', Efficiency)

Speedup: 1.3743396226415094
Efficiency: 0.6871698113207547


| -------------------------------- | t1  
| -------------- |  
| -------------- | (n=3)  
| -------------- | t3 

$$Speedup =\frac{t_1}{t_3}$$
$$Efficiency =\frac{t_1}{t_3\cdot n}$$