# MPI for Python.

MPI is a standardized and portable message-passing system designed to function on a wide variety of parallel computers. The standard defines the syntax and semantics of library routines. MPI for Python provides an object oriented approach to message passing which grounds on the standard MPI-2 C++ bindings. The interface was desined with focus in translating MPI syntax and semantics of standard MPI-2 bindings for C++ to Python. [1]

## mpi4py basics.

In MPI for Python, `Comm` is the base class of communicators. The two available predefined intracommunicator instances are `COMM_SELF` and `COMM_WORLD`. The number of processes in a communicator and the calling process rank can be respectively obtained with methods `Get_size()` and `Get_rank()`.

In [None]:
!cat communicator.py

To run an MPI-enabled Python application, one can use command **`mpirun -np .. python3 myprog.py`**, where users can specify how many processes MPI should start. The `mpirun` command below starts five-processes to run the `communicator.py` script. Each process gets the total number of processes and its own rank number.

In [None]:
!mpirun -np 5 python3 communicator.py

To look up the communication function definition, one can use `help(...)` as shown below.

In [None]:
from mpi4py import MPI
help(MPI.COMM_WORLD.Get_rank)

## Point-to-Point Communications.

Point to point communication enables the transmission of data between a pair of processes, one side sending, the other reciving. MPI provides a set of *send* and *receive* functions allowing the communication of *typed* data with an associated *tag*.

### Blocking Communications.

Blocking functions in MPI block the caller until the data buffers involved in the communication can be safely reused by the application program.

In [None]:
!cat p2psendrecv.py

In [None]:
!mpirun -np 2 python3 p2psendrecv.py

### Nonblocking Communications.

Nonblocking send and receive functions return immediately after *send/receive* operation. This means the process can continue to do something else, e.g. computation and check the status of the *send/receive* operation later.
This gives the possibility of overlapping communication and computation, such that the performance of the program can be increased.

In [None]:
!cat p2pisendirecv.py

In [None]:
!mpirun -np 2 python3 p2pisendirecv.py

MPI for Python can communicate any built-in or user-defined Python object by using the Python pickle module under the hood.
It also supports direct communication of any object exporting the single-segment buffer interface (e.g. Numpy arrays) with negligible overhead.
As seen in the above examples, communication of generic Python objects makes use of **all-lowercase** methods of the `Comm` class, i.e. `send()`, `recv()`, `isend()`, etc.
To communicate buffer-like objects, one has to use method names starting with an **upper-case** letter, like `Send()`, `Recv()`, `Bcast()`, etc.

In [None]:
!cat p2pnumpysendrecv.py

In [None]:
!mpirun -np 2 python3 p2pnumpysendrecv.py

**Exercise:**

Modify `p2psendrecv.py` to communicate 1000 integers. How long does the communication take?Compare the results with the ones obtained from `p2pnumpysendrecv.py` (the example above).
Which one is faster?

## Collective Communications.

Collective communications allow the communication of data between multiple processes of a group simultaneously. Collective functions come in blocking versions only.

![Alt text](collective_comm.gif)

The *bcast* collective communication function broadcasts data from one member to all members of a group.

In [None]:
!cat bcastnumpy.py

In [None]:
!mpirun -np 2 python3 bcastnumpy.py

The *gather* collective communication function gathers data from all members to one member of a group.

In [None]:
!cat gathernumpy.py

In [None]:
!mpirun -np 2 python3 gathernumpy.py

In [None]:
!cat scatternumpy.py

In [None]:
!mpirun -np 2 python3 scatternumpy.py

What do you think the reduction collective communication function does? Can you figure it out from the image shown above?

In [None]:
help(MPI.COMM_WORLD.Reduce)

**Exercise**:

Let each MPI process create a 10-elements numpy array, initialized with its own rank number.
Let process 0 calculate the total sum of all numpy arrays element-wise.
You can use the hints and template given below.

```
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank =                 # get process rank
size =                 # get total number of processes

sendbuf = np.zeros(10, dtype='i') + rank
recvbuf = None
if rank == 0:
  recvbuf = np.zeros(10, dtype='i')
comm.Reduce(, , op= , root=0)    # What should be reduced? And which operation is used?

if rank == 0:
  sum = sum(range(size))
  assert (recvbuf[:]==sum).all()
  print(recvbuf)
```

The result of the exercise should look like:

```
$mpirun -np 4 python3 reducenumpy.py 
[6 6 6 6 6 6 6 6 6 6]
```

```
$mpirun -np 5 python3 reducenumpy.py 
[10 10 10 10 10 10 10 10 10 10]
```

## Example: Kmeans clustering.

k-means clustering is a method of vector quantization, originally from signal processing, that is popular for cluster analysis in data mining. k-means clustering aims to partition n observations into k clusters in which each observation belongs to the cluster with the nearest mean, serving as a prototype of the cluster. [2]

Below is a serial implementation of Kmeans clustering, where we try the different values of K ranging from 1 to 40. We can see from the result that, the returned score of the different K values does not change much anymore after K=3. This code is a modification of the original code shown at [3].

In [None]:
import pandas
import pylab as pl
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import time
 
variables = pandas.read_csv('sample_stocks.csv')

Y = variables[['returns']]

X = variables[['dividendyield']]

start = time.time()

Nc = range(1, 40)
kmeans = [KMeans(n_clusters=i) for i in Nc]
kmeans

score = [kmeans[i].fit(Y).score(Y) for i in range(len(kmeans))]
score
stop = time.time()

elapsed = stop - start

print('Elapsed time: %f seconds' % elapsed)

pl.plot(Nc,score)

pl.xlabel('Number of Clusters')

pl.ylabel('Score')

pl.title('Elbow Curve')

pl.show()

Below is a parallel version of Kmeans clustering code presented above.

In [None]:
!cat kmeans.py

In [None]:
!mpirun -np 2 python3 kmeans.py

In [None]:
import pandas
import numpy as np
import pylab as pl
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
 
variables = pandas.read_csv('sample_stocks.csv')
Y = variables[['returns']]
X = variables[['dividendyield']]

pca = PCA(n_components=1).fit(Y)
pca_d = pca.transform(Y)
pca_c = pca.transform(X)

kmeans=KMeans(n_clusters=3)
kmeansoutput=kmeans.fit(Y)

pl.figure('3 Cluster K-Means')
pl.scatter(pca_c[:, 0], pca_d[:, 0], c=kmeansoutput.labels_)
pl.xlabel('Dividend Yield')
pl.ylabel('Returns')
pl.title('3 Cluster K-Means')

pl.show()

## Example: Matrix Vector multiplication.

Serial version of Matrix Vector multiplication.

In [None]:
"""
Serial version of Matrix-Vector Multiplication.
This code will run *iter* iterations of
  v(t+1) = M * v(t)
where v is a vector of length *size* and M a dense size*size
matrix. 
"""

import numpy as np
from numpy.fft import fft2, ifft2
from math import ceil, fabs
import time

size = 10000          # lengt of vector v
iter = 50             # number of iterations to run

# This is the complete vector
vec = np.zeros(size)            # Every element zero...
vec[0] = 1.0                    #  ... besides vec[0]

mat =np.zeros([size, size] , dtype='f')
mat[:,0] = 1.0
start = time.time()

for t in range(iter):
  result = np.inner(mat, vec)

stop = time.time()
elapsed = stop - start    ### Stop stopwatch ###

if fabs(result[iter]-1.0) > 0.01:
    print("!! Error: Wrong result!")

print(" %d iterations of size %d in %5.2fs: %5.2f iterations per second" %
    (iter, size, elapsed, iter/elapsed) 
)
print("============================================================================")

Below is a parallel version of Matrix Vector multiplication implemented with mpi4py and numpy arrays. It is a modification of the original code provided at [4].

Try different number of processes and see whether you get any speed up. Can you explain the result you get?

In [None]:
!cat matvec.py

In [None]:
!mpirun -np 4 python3 matvec.py

## Example: Nbody simulation.

In physics and astronomy, an N-body simulation is a simulation of a dynamical system of particles, usually under the influence of physical forces, such as gravity. N-body simulations are widely used tools in astrophysics, from investigating the dynamics of few-body systems like the Earth-Moon-Sun system to understanding the evolution of the large-scale structure of the universe. [5]

Below is a serial version of the Nbody simulation implemented in Python.

In [None]:
import numpy as np

def remove_i(arr, i):
  """Drops the ith element of an array."""
  shape = (arr.shape[0]-1,) + arr.shape[1:]
  new_arr = np.empty(shape, dtype=float)
  new_arr[:i] = arr[:i]
  new_arr[i:] = arr[i+1:]
  return new_arr

def acceleration(i, position, G, mass):
  """The acceleration of the ith mass."""
  ith_pos = position[i]
  rest_pos = remove_i(position, i)
  rest_mass = remove_i(mass, i)
  diff = rest_pos - ith_pos
  mag3 = np.sum(diff**2, axis=1)**1.5
  result = G * np.sum(diff * (rest_mass / mag3)[:,np.newaxis], axis=0)
  return result

def timestep(position, velocity, G, mass, dt):
  """Computes the next position and velocity for all masses given
  initial conditions and a time step size.
  """
  N = len(position)
  new_pos = np.empty(position.shape, dtype=float)
  new_velocity = np.empty(velocity.shape, dtype=float)
  for i in range(N):
    acceleration_i = acceleration(i, position, G, mass)
    new_velocity[i] = acceleration_i * dt + velocity[i]
    new_pos[i] = acceleration_i * dt ** 2 + velocity[i] * dt + position[i]
  return new_pos, new_velocity

def initial_cond(N, Dim):
  """Generates initial conditions for N unity masses at rest
  starting at random positions in D-dimensional space.
  """
  position0 = np.random.rand(N, Dim)
  velocity0 = np.zeros((N, Dim), dtype=float)
  mass = np.ones(N, dtype=float)
  return position0, velocity0, mass 

def simulate(timesteps, G, dt, position0, velocity0, mass):
  """N-body simulation of certain timesteps."""
  position, velocity = position0, velocity0
  for step in range(timesteps):
    new_pos, new_velocity = timestep(position, velocity, G, mass, dt)
    position , velocity = new_pos, new_velocity
  return position, velocity

if __name__ == "__main__":
  import time
  import h5py

  N = 256
  # Initialize N-body conditions
  # Set gravitational constant to 1
  Dim = 3
  G=1.0
  dt = 1.0e-3
  timesteps = 600
  path = '/Users/zhengm/src/play/python/mpi4py/'
  name = path + 'data_' + str(N).zfill(4) + 'nbody_seq.h5'

  start = time.time()
  position0, velocity0, mass = initial_cond(N, Dim)
  position, velocity = simulate(timesteps, G, dt, position0, velocity0, mass)
  stop = time.time()
  elapsed = stop - start
  print('Elapsed time is: %f seconds' % elapsed)  

  with h5py.File(name, 'w') as hf:
    hf.create_dataset('position', data=position)
    hf.create_dataset('velocity', data=velocity)

The exampe below is from the book "Effective computation in Physics" [6]. The aim of this example is to show how to use mpi4py. It is a relatively slow parallel implementation of the Nbody simulation.

In [None]:
!cat nbody_mpi4py_slow.py

In [None]:
!mpirun -np 4 python3 nbody_mpi4py_slow.py

Here is a Parallel version of the Nbody simulation implemented with mpi4py and numpy.

In [None]:
!cat nbody.py

In [None]:
!mpirun -np 4 python3 nbody.py

# References
#### [1] http://mpi4py.scipy.org/docs/usrman/index.html
#### [2] https://en.wikipedia.org/wiki/K-means_clustering
#### [3] http://www.michaeljgrogan.com/k-means-clustering-python-sklearn/.
#### [4] https://github.com/jbornschein/mpi4py-examples/blob/master/07-matrix-vector-product
#### [5] https://en.wikipedia.org/wiki/N-body_simulation
#### [6] http://shop.oreilly.com/product/0636920033424.do