# Message Passing using MPI



Let's first start up the IPyParallel cluster so that we can use MPI in this Jupyter notebook. For this notebook, it is a good idea to start the cluster with 24 MPI ranks (check the output below). Once we have connected to the cluster, we issue `%autopx`, which means that the commands in all following code cells will be executed on the 24 parallel workers.

In [40]:
import numpy as np
import ipcmagic
import ipyparallel as ipp
%ipcluster start -n 24 --mpi
rc = ipp.Client()
rc.ids
dv = rc[:]
dv.activate()
dv.block = True
print("Running IPython Parallel on {0} MPI engines".format(len(rc.ids)))
print("Commands in the following cells will be executed on the workers in parallel (disable with %autopx)")
%autopx

IPCluster is ready! (7 seconds)
Running IPython Parallel on 24 MPI engines
Commands in the following cells will be executed on the workers in parallel (disable with %autopx)
%autopx enabled


We are going to use MPI for this notebook. `mpi4py` is a Python interface to MPI. There are interfaces to MPI in almost all programming languages (Fortran, C/C++, Python, Julia, ...). The [documentation](https://mpi4py.readthedocs.io/en/stable/) is a good resource for an introduction and further information.

In [58]:
import time
import timeit
import numpy as np
import matplotlib.pyplot as plt 
from mpi4py import MPI

## Welcome to COMM_WORLD!

With the commands above we have 24 parallel processes (workers) running on our compute node. For those familiar with the `top` command, you can head over to `File` &rarr; `New` &rarr; `Terminal` and you will find 24 instances of `ipengine` running, which are the 24 workers.

MPI works with groups of these workers called communicators. Each MPI command accepts a communicator and the command will only apply to the processes in this communicator. There is a base communicator called `COMM_WORLD` which contains *all* processes that are available.

In [42]:
comm = MPI.COMM_WORLD

Since we have enabled `%autopx` above, the previous cell and all following cells are being executed on *all* ranks simultaneously and in parallel. So now there is a variable named `comm` defined on all 24 Python processes running on the node and it points to the `COMM_WORLD` communicator.

We can quickly validate that this is true by defining and printing a variable on all ranks. JupyterHub automatically gathers the standard output from the connected ranks and prints all 24 messages as the result of the cell execution.

In [43]:
a = np.random.rand(1)
print(a)

[stdout:0] [0.52436392]
[stdout:1] [0.3150981]
[stdout:2] [0.14236342]
[stdout:3] [0.43183621]
[stdout:4] [0.20246559]
[stdout:5] [0.24915352]
[stdout:6] [0.85668804]
[stdout:7] [0.46433847]
[stdout:8] [0.73912341]
[stdout:9] [0.062132]
[stdout:10] [0.14578772]
[stdout:11] [0.10527085]
[stdout:12] [0.91758823]
[stdout:13] [0.4902655]
[stdout:14] [0.77023704]
[stdout:15] [0.23535369]
[stdout:16] [0.79037411]
[stdout:17] [0.38931935]
[stdout:18] [0.16488362]
[stdout:19] [0.94837535]
[stdout:20] [0.67645961]
[stdout:21] [0.88278475]
[stdout:22] [0.85071377]
[stdout:23] [0.05138395]


Now we can query our communicator in order to figure out how many ranks it contains. Also, we can query the rank number of each of the processes running. MPI will automatically assign each rank a unique integer increasing from 0.

In [44]:
size = comm.Get_size()
rank = comm.Get_rank()
print("I am rank {} of a total of {} ranks.".format(rank, size))

[stdout:0] I am rank 1 of a total of 24 ranks.
[stdout:1] I am rank 8 of a total of 24 ranks.
[stdout:2] I am rank 19 of a total of 24 ranks.
[stdout:3] I am rank 16 of a total of 24 ranks.
[stdout:4] I am rank 9 of a total of 24 ranks.
[stdout:5] I am rank 11 of a total of 24 ranks.
[stdout:6] I am rank 4 of a total of 24 ranks.
[stdout:7] I am rank 15 of a total of 24 ranks.
[stdout:8] I am rank 20 of a total of 24 ranks.
[stdout:9] I am rank 22 of a total of 24 ranks.
[stdout:10] I am rank 23 of a total of 24 ranks.
[stdout:11] I am rank 7 of a total of 24 ranks.
[stdout:12] I am rank 21 of a total of 24 ranks.
[stdout:13] I am rank 10 of a total of 24 ranks.
[stdout:14] I am rank 3 of a total of 24 ranks.
[stdout:15] I am rank 14 of a total of 24 ranks.
[stdout:16] I am rank 2 of a total of 24 ranks.
[stdout:17] I am rank 18 of a total of 24 ranks.
[stdout:18] I am rank 12 of a total of 24 ranks.
[stdout:19] I am rank 5 of a total of 24 ranks.
[stdout:20] I am rank 17 of a total of
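
One common reason to query `rank` and `size` is to decide which part of a problem each rank works on. The sketch below is not part of the original notebook: `block_range` is a hypothetical helper that splits `n` items into contiguous blocks and gives the first `n % size` ranks one extra item. It is plain Python, so it can be tried without MPI.

```python
def block_range(rank, size, n):
    """Return the half-open index range [start, stop) owned by `rank`.

    Hypothetical helper (not an MPI call): splits n items over `size`
    ranks as evenly as possible; the first n % size ranks get one extra.
    """
    base, extra = divmod(n, size)
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return start, stop


# Simulate what each of 4 ranks would compute for 10 items.
ranges = [block_range(r, 4, 10) for r in range(4)]
print(ranges)
```

In a real run one would call `block_range(rank, size, n)` with the values returned by `comm.Get_rank()` and `comm.Get_size()`.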

Similar to shared memory parallelism with OpenMP, sometimes there are things that cannot be done in parallel. Good examples are writing data to disk or plotting the result of a computation. For this purpose, we define a *master rank* responsible for executing these parts of the code. By default the master rank is typically rank 0. This is due to the simple fact that rank 0 always exists, even if an MPI-parallel program is executed with a single worker.

In [58]:
# this runs in parallel
a = 1

# now execute I/O only on master rank
if rank == 0:
    print(a)
    
# this runs in parallel again
b = 1

[stdout:23] 1


Note that one of the main differences to the OpenMP programming model is that with MPI the default is to run things in parallel and sequential regions have to be programmed explicitly (using an `if rank == 0` statement in the example above), whereas in the OpenMP programming model parallel regions have to be indicated with directives and the default is sequential execution.

## Point-to-Point Communication

The main functionality that the MPI API has to provide is a means of exchanging data (messages) between the workers running in parallel. Since the workers do not share the same address space, they cannot access each other's variables.

In [59]:
if rank == 0:
    c = 42
    
if rank == 1:
    print(c)

CompositeError: one or more exceptions from call to method: execute
[7:execute]: NameError: name 'c' is not defined

So how can we send the variable `c` from rank 0 to rank 1? For this purpose, MPI provides point-to-point communication semantics. These are basically a set of methods that can be used to send information from a single rank to another single rank.

In [60]:
if rank == 0:
    c = 42
    comm.send(c, dest=1, tag=1001)

if rank == 1:
    c = comm.recv(source=0, tag=1001)
    print(c)

[stdout:7] 42


Note that the MPI API allows messages to be tagged with an ID, which helps avoid errors when many messages from many senders arrive at the same time, or when several messages of different types are received from the same sender. When the receiver requests a message with a specific tag number, messages with different tags will be buffered until the receiver is ready for them. Let's make a simple example.

There is no way of knowing in which order the messages from rank 2 and rank 1 will arrive at rank 0. If we had not specified a tag, rank 0 might end up waiting indefinitely for the message from rank 1 because the message from rank 2 arrived first (deadlock).

You can try switching around the order of the receives on rank 0 and re-running. You can also try removing the tags, although this might put your MPI library in an undefined state and you might have to restart the JupyterHub in order to start with a fresh environment. (Make sure to save your notebook beforehand.)

In [11]:
if rank == 1:
    c = 1
    comm.send(c, dest=0, tag=1001)

if rank == 2:
    c = 2
    comm.send(c, dest=0, tag=1002)

if rank == 0:
    c1 = comm.recv(source=1, tag=1001)
    c2 = comm.recv(source=2, tag=1002)
    print(c1, c2)

[stdout:23] 1 2
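
The buffering behaviour described above can be imitated with a toy model in plain Python. This is only a sketch under simplifying assumptions, not how an MPI library is implemented: `ToyMailbox` is a hypothetical class that stores incoming messages in arrival order and hands out the first one that matches the requested source and tag.

```python
class ToyMailbox:
    """Toy model of one rank's incoming-message buffer (not real MPI)."""

    def __init__(self):
        self._pending = []  # (source, tag, payload) in arrival order

    def deliver(self, source, tag, payload):
        """A message arrives and is buffered until someone asks for it."""
        self._pending.append((source, tag, payload))

    def recv(self, source, tag):
        """Return the first buffered message matching source and tag."""
        for i, (src, tg, payload) in enumerate(self._pending):
            if src == source and tg == tag:
                del self._pending[i]
                return payload
        # A real blocking receive would wait here instead of raising.
        raise LookupError("no matching message has arrived yet")


mailbox = ToyMailbox()
mailbox.deliver(source=2, tag=1002, payload=2)  # rank 2's message arrives first
mailbox.deliver(source=1, tag=1001, payload=1)

c1 = mailbox.recv(source=1, tag=1001)  # skips the buffered message from rank 2
c2 = mailbox.recv(source=2, tag=1002)
print(c1, c2)
```

Even though the message from rank 2 arrives first in this model, the receives return the values in the order the receiver asked for them.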


Most MPI methods come in two versions, one starting with a capital letter (`comm.Send()`) and one starting with a lowercase letter (`comm.send()`). The main difference is that the latter (which we used above) can be used to send any arbitrary Python object. Internally, mpi4py uses the `pickle` package to serialize the object into a byte stream, which is then unpickled on the receiver side. This type of communication comes with an overhead.

Often we simply want to send a raw data array using the MPI API. For this purpose, we can use the methods which start with an uppercase letter.

In [61]:
if rank == 0:
    c = np.array( [42], dtype=np.float64 )
    comm.Send(c, dest=1, tag=1002)
    
if rank == 1:
    c = np.empty( (1,), dtype=np.float64 )
    comm.Recv(c, source=0, tag=1002)
    print(c)

[stdout:7] [42.]
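
To get a feeling for the overhead of the lowercase methods, one can compare the size of the raw data buffer with the size of the pickled object. The sketch below runs on a single process and uses a plain `pickle.dumps()` call, which is an assumption made for illustration and may differ in detail from the pickle settings that mpi4py uses internally.

```python
import pickle

import numpy as np

c = np.array([42.0], dtype=np.float64)

raw_size = c.nbytes               # bytes in the raw data buffer
pickled = pickle.dumps(c)         # byte stream describing the whole object
restored = pickle.loads(pickled)  # what a receiver would reconstruct

print("raw buffer: {} bytes, pickled object: {} bytes".format(raw_size, len(pickled)))
```

The pickled stream carries the type, shape and dtype in addition to the data, which is why the receiver of `comm.recv()` does not need to pre-allocate anything.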


If we are sending around raw data buffers, we need to pre-allocate the storage on the receiver. The receiver has to know in advance the type and amount of data that it will receive from the sender. If there is a mismatch between the data type or the number of data elements, your program will crash and may be in an undefined state.

In [37]:
if rank == 0:
    c = np.array( [42, 43], dtype=np.float64 )
    comm.Send(c, dest=1, tag=1002)
    
if rank == 1:
    c = np.empty( (1,), dtype=np.float64 )  # deliberately too small for the 2 values sent
    comm.Recv(c, source=0, tag=1002)
    print(c)

KeyboardInterrupt: 

You can also get silent errors, which are very hard to debug. The MPI library is only really concerned with the size of the send and receive buffers. The necessary and sufficient condition for a send/recv operation to complete successfully is that the receive buffer is at least as large in number of bytes as the send buffer.

Try changing the sending side to sending `float32` in the above example. Now the sender will send 2 values of type `float32`, which corresponds to a total of 8 bytes. The receiver is expecting one value of type `float64`, which also corresponds to a total of 8 bytes. The bytes have been transferred correctly; it's just that the interpretation of the data on the receiving side is incorrect.

Change the type back to `float64`, try increasing the number of elements in the receive buffer from 1 to 4, and rerun the example above. You can see that as long as the number of elements received is smaller than or equal to the buffer size, MPI does not issue an error. The values `c[2]` and `c[3]` are undefined, because `c` has been allocated using `np.empty()`, which does not initialize the values of `c`.
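
The `float32` experiment above can be reproduced on a single process with NumPy alone, without sending anything. This is a minimal sketch that assumes the transfer simply copies the bytes: it takes the 8 bytes of two `float32` values and reinterprets them as one `float64`, as the receiver would.

```python
import numpy as np

sent = np.array([42, 43], dtype=np.float32)      # 2 x 4 bytes = 8 bytes
raw = sent.tobytes()                             # the bytes that would travel

received = np.frombuffer(raw, dtype=np.float64)  # receiver reads 1 x 8 bytes

print("bytes sent:", len(raw))
print("receiver sees:", received)
print("same bytes viewed as float32:", received.view(np.float32))
```

The byte counts match, so nothing complains, but the single `float64` value has nothing to do with 42 or 43. Viewing the same bytes as `float32` again recovers the original values.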

The main advantage of using the raw buffer API of `mpi4py` is speed. In the example below we transfer a numpy data array of considerable size multiple times, once using the raw buffer API and once using the pickle/unpickle API of mpi4py. You can see that there is a considerable time difference. If speed is of concern, the raw buffer interface is preferable, although it provides less safety.

In [17]:
num_bytes = 1024 * 1024
num_iter = 2048
c = np.random.rand(num_bytes // 8)

if rank == 0:
    tic = timeit.default_timer()
    for _ in range(num_iter):
        comm.Send(c, dest=1, tag=1003)
    toc = timeit.default_timer()
    print("Rank 0 spent {:.4f}s sending {} GB using raw buffers".format(toc-tic, num_bytes * num_iter / 1024**3))

    tic = timeit.default_timer()
    for _ in range(num_iter):
        comm.send(c, dest=1, tag=1004)
    toc = timeit.default_timer()
    print("Rank 0 spent {:.4f}s sending {} GB using pickle/unpickle".format(toc-tic, num_bytes * num_iter / 1024**3))

if rank == 1:
    tic = timeit.default_timer()
    for _ in range(num_iter):
        comm.Recv(c, source=0, tag=1003)
    toc = timeit.default_timer()
    print("Rank 1 spent {:.4f}s receiving {} GB using raw buffers".format(toc-tic, num_bytes * num_iter / 1024**3))

    tic = timeit.default_timer()
    for _ in range(num_iter):
        c = comm.recv(source=0, tag=1004)
    toc = timeit.default_timer()
    print("Rank 1 spent {:.4f}s receiving {} GB using pickle/unpickle".format(toc-tic, num_bytes * num_iter / 1024**3))

[stdout:14] 
Rank 1 spent 0.1822s receiving 2.0 GB using raw buffers
Rank 1 spent 1.8997s receiving 2.0 GB using pickle/unpickle
[stdout:23] 
Rank 0 spent 0.1595s sending 2.0 GB using raw buffers
Rank 0 spent 1.8997s sending 2.0 GB using pickle/unpickle


## Deadlock

Deadlock is a situation where an MPI rank (worker) is waiting for an operation to complete which never does because of an error in the program logic. Such errors can be hard to debug, and careful design and checking of the user code may be more efficient than trial and error when parallelizing a sequential code with MPI.

A classical example is if a rank is trying to receive a message that has never been sent. Try commenting out the `Send()` after having run the example below for a first time. If there is no `Send()` the `Recv()` will simply hang and wait for a message to arrive.

You have to choose the `Kernel` &rarr; `Interrupt Kernel` menu option in order to stop the kernel. Unfortunately, the state of the workers is undefined after such an error (see [Stopping the IPyParallel Cluster](#restarting) at the end of this notebook).

In [30]:
a = np.array([42.], dtype=np.float64)

if rank == 0:
    print("Sending message on rank 0")
    comm.Send(a, dest=1)

if rank == 1:
    print("Receiving message on rank 1")
    comm.Recv(a, source=0)
    print(a)

NameError: name 'rank' is not defined

Another classical way to produce a deadlock is the situation when two ranks want to exchange a piece of information (data from a numpy array in our case). Do you understand why the code below deadlocks?

*Warning: you will have to abort the kernel and possibly restart the ipcluster once more, sorry!*

The reason for the deadlock is that MPI internally often uses a handshake protocol. Normally, in a deadlock-free situation, the conversation between rank 0 and rank 1 proceeds in the following way:

- Rank 0: Hey, I would like to send you 128 MB of data (Request to send, RTS)
- Rank 1: Ok, I have a matching receive (Clear to send, CTS)
- Rank 0: I'm sending you the data (RDMA)
- Rank 0: Done, the data is in your memory (Finished transmission, FIN)

This is called the rendezvous or handshake protocol. In the situation below, both ranks request to send data and then wait for the other rank, so neither of them ever posts a matching receive.

An easy way to fix the program in the cell below is to switch the order of the `Send()` and `Recv()` around on rank 1.

In [46]:
num_elements = 16 * 1024 * 1024
a = np.random.rand(num_elements)
b = np.empty(num_elements, dtype=np.float64)

if rank == 0:
    comm.Send(a, dest=1, tag=100)
    comm.Recv(b, source=1, tag=101)
    print('a has been received in b on rank 0')

if rank == 1:
    comm.Send(a, dest=0, tag=101)
    comm.Recv(b, source=0, tag=100)
    print('a has been received in b on rank 1')

[stdout:0] a has been received on rank 1
[stdout:23] a has been received on rank 0
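
The handshake described above can be imitated without MPI, using two Python threads as stand-ins for rank 0 and rank 1. This is a toy model under stated assumptions, not a description of any MPI implementation: `RendezvousChannel` and `run_exchange` are hypothetical names, every send always uses the handshake, and a timeout replaces the infinite wait so that the stuck case can be observed safely.

```python
import queue
import threading


class RendezvousChannel:
    """Toy one-way channel: send() returns only after recv() took the item."""

    def __init__(self):
        self._slot = queue.Queue(maxsize=1)
        self._taken = threading.Event()

    def send(self, item, timeout):
        self._slot.put(item)                 # "request to send"
        if not self._taken.wait(timeout):    # wait for the matching receive
            raise TimeoutError("no matching receive was posted")
        self._taken.clear()

    def recv(self, timeout):
        item = self._slot.get(timeout=timeout)
        self._taken.set()                    # tell the sender it may continue
        return item


def run_exchange(rank1_receives_first, timeout):
    """Exchange one value between two toy ranks; report what each one got."""
    ch01, ch10 = RendezvousChannel(), RendezvousChannel()
    outcome = {}

    def rank0():
        try:
            ch01.send("a0", timeout)
            outcome[0] = ch10.recv(timeout)
        except (TimeoutError, queue.Empty):
            outcome[0] = "stuck"

    def rank1():
        try:
            if rank1_receives_first:
                got = ch01.recv(timeout)
                ch10.send("a1", timeout)
            else:
                ch10.send("a1", timeout)
                got = ch01.recv(timeout)
            outcome[1] = got
        except (TimeoutError, queue.Empty):
            outcome[1] = "stuck"

    threads = [threading.Thread(target=f) for f in (rank0, rank1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcome


deadlocked = run_exchange(rank1_receives_first=False, timeout=0.3)
fixed = run_exchange(rank1_receives_first=True, timeout=5.0)
print("send/send ordering:", deadlocked)
print("send/recv ordering:", fixed)
```

With both toy ranks sending first, each one waits for a receive that is never posted. Switching the order on rank 1 lets the exchange complete, which mirrors the fix suggested for the cell above.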


Since this is such a common situation, a special MPI API method called `Sendrecv()` is provided exactly for this use case.

In [51]:
num_elements = 16 * 1024 * 1024
a = np.random.rand(num_elements)
b = np.empty(num_elements, dtype=np.float64)

if rank == 0:
    comm.Sendrecv(sendbuf=a, dest=1, sendtag=100, recvbuf=b, source=1, recvtag=101)
    print('a has been received in b on rank 0')

if rank == 1:
    comm.Sendrecv(sendbuf=a, dest=0, sendtag=101, recvbuf=b, source=0, recvtag=100)
    print('a has been received in b on rank 1')

[stdout:0] a has been received in b on rank 1
[stdout:23] a has been received in b on rank 0


## Non-blocking Communication

The send and receive methods introduced above are *blocking*, in the sense that they do not return until the communication has been executed. On many systems, performance can be significantly increased by overlapping communication and computation. This is particularly true on systems where communication can be executed autonomously by an intelligent, dedicated communication controller.

For this purpose, MPI provides *non-blocking* methods. The general pattern for a non-blocking operation is `req = comm.Isomething()`, which initiates the communication, followed by a `req.wait()` later in the code, which waits until the communication operation has completed (if it has not already done so). This makes it possible to overlap communication and computation by doing independent work between the two calls.

Let's revisit the deadlock problem above. We can retain the same order of send/receive operations on all ranks but use a non-blocking send instead.

In [67]:
num_elements = 16 * 1024 * 1024
a = np.random.rand(num_elements)
b = np.empty(num_elements, dtype=np.float64)

if rank == 0:
    req = comm.Isend(a, dest=1, tag=100)
    comm.Recv(b, source=1, tag=101)
    req.wait()
    print('a has been received in b on rank 0')

if rank == 1:
    req = comm.Isend(a, dest=0, tag=101)
    comm.Recv(b, source=0, tag=100)
    req.wait()
    print('a has been received in b on rank 1')

[stdout:0] a has been received in b on rank 1
[stdout:23] a has been received in b on rank 0
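
The cell above uses the non-blocking send only to avoid the deadlock. The initiate, compute, wait pattern itself can be sketched by analogy with Python threads. This is not MPI code: the `Future` returned by `pool.submit()` merely plays the role of the request object, and `slow_transfer` and `local_work` are hypothetical stand-ins for a message transfer and for independent computation.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_transfer(data):
    """Stand-in for a message transfer: waits a little, then delivers a copy."""
    time.sleep(0.2)
    return list(data)


def local_work(n):
    """Computation that does not depend on the data in flight."""
    return sum(i * i for i in range(n))


with ThreadPoolExecutor(max_workers=1) as pool:
    req = pool.submit(slow_transfer, [1, 2, 3])  # initiate, like comm.Isomething()
    partial = local_work(1000)                   # compute while the transfer runs
    received = req.result()                      # wait, like req.wait()

print(partial, received)
```

The important design rule carries over to MPI: the work done between initiating and waiting must not read a receive buffer or modify a send buffer that is still in flight.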


## Synchronisation

In a distributed memory system, individual workers can progress at their own speed and may be at completely different places in the program execution. A simple example is given below, where ranks 0, 1, and 2 reach the same points in the program at times that differ by about 2 s.

In [66]:
t1 = timeit.default_timer()

if rank == 0:
    time.sleep(1)

# Point 2
t2 = timeit.default_timer()

if rank == 1:
    time.sleep(2)

# Point 3
t3 = timeit.default_timer()

if rank in [0, 1, 2]:
    print("Rank {} reached Point 2 after {:.5f}s and Point 3 after {:.5f}s".format(rank, t2-t1, t3-t1))

[stdout:0] Rank 1 reached Point 2 after 0.00010s and Point 3 after 2.00231s
[stdout:16] Rank 2 reached Point 2 after 0.00010s and Point 3 after 0.00018s
[stdout:23] Rank 0 reached Point 2 after 1.00131s and Point 3 after 1.00147s


MPI provides the `comm.Barrier()` method which synchronizes all of the ranks (workers). The "barrier" opens only once all of the ranks have reached the barrier. Insert a barrier before the timer at Point 2 and another barrier before the timer at Point 3. Rerun and compare the results.

Note that barriers can also very easily lead to deadlock situations. If a `comm.Barrier()` is put inside an `if`-statement, it can happen that some ranks never reach the barrier and MPI will wait indefinitely. If you insert a `comm.Barrier()` in the `if rank == 1` body, you will experience yet another deadlock and will have to restart the IPyParallel cluster.
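
The effect of a barrier can be tried out safely on a single process with `threading.Barrier` from the Python standard library, which behaves analogously for threads. The sketch below is an analogy, not MPI: three threads stand in for three ranks, and the `SLEEP` delays are made-up values.

```python
import threading
import time

NUM_WORKERS = 3
SLEEP = {0: 0.1, 1: 0.2, 2: 0.0}   # made-up delay in seconds per toy "rank"

barrier = threading.Barrier(NUM_WORKERS)
start = time.monotonic()
before, after = {}, {}


def worker(rank):
    time.sleep(SLEEP[rank])
    before[rank] = time.monotonic() - start   # time of arrival at the barrier
    barrier.wait()                            # blocks until all workers arrived
    after[rank] = time.monotonic() - start    # time of leaving the barrier


threads = [threading.Thread(target=worker, args=(r,)) for r in range(NUM_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

for r in range(NUM_WORKERS):
    print("worker {} arrived after {:.2f}s and left after {:.2f}s".format(r, before[r], after[r]))
```

No worker leaves the barrier before the slowest one has arrived. If one of the threads never called `barrier.wait()`, the others would wait forever, which is the thread-level counterpart of the deadlock described above.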

## Collective Communication

Collective communications are communication patterns that involve all ranks in a communicator. If `MPI.COMM_WORLD` is used as the communicator, collective communication involves all available ranks. In principle, collective communication is only a convenience, since all possible communication patterns can be implemented using point-to-point communication. But programs are much easier to read because the communication patterns are expressed on a higher level and in a consistent manner.

Common collective communication patterns are one-to-all (*broadcast*, *scatter*), all-to-one (*gather*) and all-to-all (*allgather*, *alltoall*). We will only cover the most basic variants here. 

#### Broadcast (same data on all ranks)

A typical use case is that a configuration file is read on the root rank and the configuration is then distributed to all other ranks. For example, in weather and climate models, the namelist parameters are often only read on the root rank and then broadcast to all the other ranks.

In [70]:
if rank == 0:
    data = {'alpha'  : 0.01,
            'active' : True}
else:
    data = None
data = comm.bcast(data, root=0)
print(data)

[stdout:0] {'alpha': 0.01, 'active': True}
[stdout:1] {'alpha': 0.01, 'active': True}
[stdout:2] {'alpha': 0.01, 'active': True}
[stdout:3] {'alpha': 0.01, 'active': True}
[stdout:4] {'alpha': 0.01, 'active': True}
[stdout:5] {'alpha': 0.01, 'active': True}
[stdout:6] {'alpha': 0.01, 'active': True}
[stdout:7] {'alpha': 0.01, 'active': True}
[stdout:8] {'alpha': 0.01, 'active': True}
[stdout:9] {'alpha': 0.01, 'active': True}
[stdout:10] {'alpha': 0.01, 'active': True}
[stdout:11] {'alpha': 0.01, 'active': True}
[stdout:12] {'alpha': 0.01, 'active': True}
[stdout:13] {'alpha': 0.01, 'active': True}
[stdout:14] {'alpha': 0.01, 'active': True}
[stdout:15] {'alpha': 0.01, 'active': True}
[stdout:16] {'alpha': 0.01, 'active': True}
[stdout:17] {'alpha': 0.01, 'active': True}
[stdout:18] {'alpha': 0.01, 'active': True}
[stdout:19] {'alpha': 0.01, 'active': True}
[stdout:20] {'alpha': 0.01, 'active': True}
[stdout:21] {'alpha': 0.01, 'active': True}
[stdout:22] {'alpha': 0.01, 'active': True

#### Scatter (distribute data to ranks)

Scatter takes a data array on a given root rank and distributes it across the ranks in equally sized chunks. A typical use case for a scatter operation is when data is read from disk and then distributed to the ranks in order to work on the data in parallel. For weather and climate models, very often the initial condition is read as entire model levels (often called *global fields*), which are then scattered to the different subdomains on the different ranks according to the domain decomposition in the horizontal. (This is further discussed in the next notebook.)

In [79]:
num_elements = 6 * size

global_a = None
if rank == 0:
    global_a = np.linspace(0., num_elements - 1., num_elements)

a = np.empty(num_elements // size, dtype=np.float64)

comm.Scatter(global_a, a, root=0)

print("Rank {} has a = {}".format(rank, a))

[stdout:0] Rank 1 has a = [ 6.  7.  8.  9. 10. 11.]
[stdout:1] Rank 8 has a = [48. 49. 50. 51. 52. 53.]
[stdout:2] Rank 19 has a = [114. 115. 116. 117. 118. 119.]
[stdout:3] Rank 16 has a = [ 96.  97.  98.  99. 100. 101.]
[stdout:4] Rank 9 has a = [54. 55. 56. 57. 58. 59.]
[stdout:5] Rank 11 has a = [66. 67. 68. 69. 70. 71.]
[stdout:6] Rank 4 has a = [24. 25. 26. 27. 28. 29.]
[stdout:7] Rank 15 has a = [90. 91. 92. 93. 94. 95.]
[stdout:8] Rank 20 has a = [120. 121. 122. 123. 124. 125.]
[stdout:9] Rank 22 has a = [132. 133. 134. 135. 136. 137.]
[stdout:10] Rank 23 has a = [138. 139. 140. 141. 142. 143.]
[stdout:11] Rank 7 has a = [42. 43. 44. 45. 46. 47.]
[stdout:12] Rank 21 has a = [126. 127. 128. 129. 130. 131.]
[stdout:13] Rank 10 has a = [60. 61. 62. 63. 64. 65.]
[stdout:14] Rank 3 has a = [18. 19. 20. 21. 22. 23.]
[stdout:15] Rank 14 has a = [84. 85. 86. 87. 88. 89.]
[stdout:16] Rank 2 has a = [12. 13. 14. 15. 16. 17.]
[stdout:17] Rank 18 has a = [108. 109. 110. 111. 112. 113.]
[st

#### Gather (assemble data from ranks)


Gather is the inverse operation of scatter. It assembles equally sized chunks from the ranks back into a single data array on a specified root rank.

In [80]:
global_b = None
if rank == 0:
    global_b = global_a.copy()

comm.Gather(a, global_b, root=0)

if rank == 0:
    if np.all(global_a == global_b):
        print("Everything assembled back together on rank 0")

[stdout:23] Everything assembled back together on rank 0
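
The data layout of the scatter and gather cells above can be checked on a single process with NumPy alone. This sketch assumes the chunking described in the text (equally sized, contiguous chunks assigned in rank order) and uses `np.split` and `np.concatenate` as stand-ins for the two collectives; no communication takes place.

```python
import numpy as np

size = 24                                   # number of ranks, as in this notebook
num_elements = 6 * size
global_a = np.linspace(0., num_elements - 1., num_elements)

# Scatter: rank r ends up with the r-th of `size` equally sized chunks.
chunks = np.split(global_a, size)

# Gather: concatenating the chunks in rank order restores the global array.
global_b = np.concatenate(chunks)

print("chunk for rank 1:", chunks[1])
print("round trip ok:", np.array_equal(global_a, global_b))
```

Note that `np.split` raises an error if the array cannot be divided into equal parts, which matches the requirement that `Scatter` works with equally sized chunks.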


<a id='restarting'></a>
## Stopping the IPyParallel Cluster

Sometimes, the state of the workers is undefined, for example after a deadlock and having to interrupt and restart the kernel. When this happens, it is best to exit `%autopx` and stop the IPyParallel cluster using `%ipcluster stop`. Then one can restart the kernel and start executing the notebook from the beginning again.

If this also doesn't help, you have to restart your JupyterHub Server via `File` &rarr; `Hub Control Panel` &rarr; `Stop Server` and start over again.

In [38]:
%autopx

%autopx disabled


In [39]:
%ipcluster stop