## <center>Introduction to Message Passing Interface (MPI)</center>
### <center> Linh B. Ngo </center>
### <center> CPSC 3620 </center>

#### <center>Message Passing

- Processes communicate via messages
- Messages can be
    - Raw data to be used in actual calculations
    - Signals and acknowledgements for the receiving processes regarding the workflow

#### <center>History of MPI

**1980s to early 1990s:**
- Various message passing environments were developed
- They shared many similar fundamental concepts
- Examples: N-cube (Caltech), P4 (Argonne), PICL and PVM (Oak Ridge), LAM (Ohio Supercomputer Center)

**1992:**
- More than 80 researchers from different institutions in the US and Europe agreed to develop and implement a common standard for message passing
- The first meeting was co-located with Supercomputing 1992

**After finalization:**
- MPI became the *de facto* standard for distributed-memory parallel programming
- Implementations are available on every popular operating system and architecture
- Interconnect manufacturers commonly provide MPI implementations optimized for their hardware
- The MPI standard defines interfaces for C, C++, and Fortran (the C++ bindings were deprecated in MPI-2.2 and removed in MPI-3.0)
    - Language bindings are available for many other popular languages (quality varies)
    - mpi4py: bindings for Python

**1994: MPI-1**
- Communicators
    - Information about the runtime environments
    - Creation of customized topologies
- Point-to-point communication
    - Send and receive messages
    - Blocking and non-blocking variations
- Collectives
    - Broadcast and reduce
    - Gather and scatter

**1998: MPI-2**
- One-sided communication (non-blocking)
    - Get & Put (remote memory access)
- Dynamic process management
    - Spawn
- Parallel I/O
    - Multiple readers and writers for a single file
    - Performs best with parallel file system support (e.g., Lustre, PVFS)

**2012: MPI-3**
- Revised remote memory access (RMA) semantics
- Fault tolerance (proposed during MPI-3 development, but not included in MPI-3.0)
- Non-blocking collective communication
- Access to internal variables, states, and counters for performance evaluation (the MPI_T tools interface)

#### <center> Set up MPI on Palmetto for C/C++

**Interactive mode:**

`qsub -I -l select=1:ncpus=8:mpiprocs=8:mem=10gb,walltime=01:00:00`

`module load gcc/5.3.0 openmpi/1.10.3`

*To automate module loading, put the module load command in a separate script and source that script from .bashrc. Calling module load directly inside .bashrc is not recommended.*

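- Create a file named **hello.c** inside directory **cpsc3620** with the following content:
```c
#include <stdio.h>
#include <sys/utsname.h>
#include <mpi.h>

int main(int argc, char *argv[]){
    /* Initialize the MPI runtime before any other MPI call */
    MPI_Init(&argc, &argv);

    /* Look up the host name of the node running this process */
    struct utsname uts;
    uname(&uts);
    printf("My process is on node %s.\n", uts.nodename);

    /* Shut down the MPI runtime before exiting */
    MPI_Finalize();
    return 0;
}
```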

- Compile hello.c
```
mpicc hello.c -o hello
```
- Run the compiled program with 2 processes
```
mpirun -np 2 ./hello
```

#### <center> Set up MPI on Palmetto for Python (Interactive via Jupyter Notebook)

**Before launching JupyterHub**
- Make sure that the command ``module load gcc/5.3.0 openmpi/1.10.3`` is in your .jhubrc file. If you edit this file from within JupyterHub, stop and restart the server so that the change takes effect.

**Before launching a Jupyter notebook (only needs to be done once)**
- Install mpi4py by executing ``pip install --user mpi4py`` from a terminal. This needs to be done prior to launching a Jupyter notebook. 

**Before launching a Jupyter notebook (any time you wish to run interactive MPI inside Jupyter Notebook)**
- Inside a terminal (that must be kept open), execute the following command:
```
ipcluster start --n <Number of total possible cores> --profile=mpicluster
```

- In the first cell of your Jupyter notebook, type the following:
```
import ipyparallel
c = ipyparallel.Client(profile="mpicluster")
```

- Test the attached cluster by running the following in a cell:
```
print(c.ids)
```

- Any cell that contains MPI code must start with the ``%%px`` cell magic, which runs the cell on every engine of the cluster

```python
import ipyparallel
c = ipyparallel.Client(profile="mpicluster")
print(c.ids)
```

```
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
```


```python
%%px
from mpi4py import MPI
import socket
print ("My process is on node %s" % (socket.gethostname()))
```

```
[stdout:0] My process is on node node1140
[stdout:1] My process is on node node1140
[stdout:2] My process is on node node1140
[stdout:3] My process is on node node1140
[stdout:4] My process is on node node1140
[stdout:5] My process is on node node1140
[stdout:6] My process is on node node1140
[stdout:7] My process is on node node1140
[stdout:8] My process is on node node1140
[stdout:9] My process is on node node1140
[stdout:10] My process is on node node1140
[stdout:11] My process is on node node1140
[stdout:12] My process is on node node1140
[stdout:13] My process is on node node1140
[stdout:14] My process is on node node1140
[stdout:15] My process is on node node1140
```


#### <center> The working of MPI in a nutshell

- All processes are launched at the beginning of program execution
    - The number of processes is user-specified
    - Typically, this number matches the total number of cores allocated to the job, which may span several nodes
- Each process has its own private memory space, and typically every process runs the same program

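**Basic parameters available to individual processes:**
```python
from mpi4py import MPI

comm = MPI.COMM_WORLD            # communicator containing all processes
rank = comm.Get_rank()           # this process's ID within comm: 0 .. size-1
size = comm.Get_size()           # total number of processes in comm
name = MPI.Get_processor_name()  # name of the node running this process
```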

- MPI defines **communicator** groups for point-to-point and collective communications
    - Unique IDs (**rank**) are defined for individual processes within a communicator group
    - Communications are performed based on these IDs
    - The default **global communicator** (COMM_WORLD) contains all processes
    - For $N$ processes, ranks go from $0$ to $N-1$

```python
%%px
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
print ("Hello world from process %s running on host %s out of %s processes" % 
       (rank, name, size))
```

```
[stdout:0] Hello world from process 0 running on host node1140 out of 16 processes
[stdout:1] Hello world from process 13 running on host node1140 out of 16 processes
[stdout:2] Hello world from process 4 running on host node1140 out of 16 processes
[stdout:3] Hello world from process 12 running on host node1140 out of 16 processes
[stdout:4] Hello world from process 3 running on host node1140 out of 16 processes
[stdout:5] Hello world from process 6 running on host node1140 out of 16 processes
[stdout:6] Hello world from process 8 running on host node1140 out of 16 processes
[stdout:7] Hello world from process 5 running on host node1140 out of 16 processes
[stdout:8] Hello world from process 14 running on host node1140 out of 16 processes
[stdout:9] Hello world from process 15 running on host node1140 out of 16 processes
[stdout:10] Hello world from process 2 running on host node1140 out of 16 processes
[stdout:11] Hello world from process 10 running on host node1140 out of 16 process
```

Note that the number in `[stdout:k]` is the ipyparallel engine ID, not the MPI rank; here, for example, engine 1 runs rank 13.

- Ranks can be used to control which code segments each process executes or skips

```python
%%px
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
if (rank % 2 == 0):
    print ("Process %s is even" % (rank))
else:
    print ("Process %s is odd" % (rank))
```

```
[stdout:0] Process 0 is even
[stdout:1] Process 13 is odd
[stdout:2] Process 4 is even
[stdout:3] Process 12 is even
[stdout:4] Process 3 is odd
[stdout:5] Process 6 is even
[stdout:6] Process 8 is even
[stdout:7] Process 5 is odd
[stdout:8] Process 14 is even
[stdout:9] Process 15 is odd
[stdout:10] Process 2 is even
[stdout:11] Process 10 is even
[stdout:12] Process 7 is odd
[stdout:13] Process 1 is odd
[stdout:14] Process 9 is odd
[stdout:15] Process 11 is odd
```


- Ranks can be used to compute and distribute the workload (data) among the processes

```python
%%px
from mpi4py import MPI
import random
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
name = MPI.Get_processor_name()
A = [2,13,4,3,5,1,0,12,10,8,7,9,11,6,15,14]
print ("Elements %s and %s are assigned to process %s" % (A[rank%15], A[1+rank%15], rank))
```

```
[stdout:0] Elements 2 and 13 are assigned to process 0
[stdout:1] Elements 6 and 15 are assigned to process 13
[stdout:2] Elements 5 and 1 are assigned to process 4
[stdout:3] Elements 11 and 6 are assigned to process 12
[stdout:4] Elements 3 and 5 are assigned to process 3
[stdout:5] Elements 0 and 12 are assigned to process 6
[stdout:6] Elements 10 and 8 are assigned to process 8
[stdout:7] Elements 1 and 0 are assigned to process 5
[stdout:8] Elements 15 and 14 are assigned to process 14
[stdout:9] Elements 2 and 13 are assigned to process 15
[stdout:10] Elements 4 and 3 are assigned to process 2
[stdout:11] Elements 7 and 9 are assigned to process 10
[stdout:12] Elements 12 and 10 are assigned to process 7
[stdout:13] Elements 13 and 4 are assigned to process 1
[stdout:14] Elements 8 and 7 are assigned to process 9
[stdout:15] Elements 9 and 11 are assigned to process 11
```
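In the cell above, consecutive ranks share an element (rank r takes A[r % 15] and A[1 + r % 15]), and rank 15 wraps around to the same pair as rank 0. A common alternative is a contiguous block decomposition, where every element goes to exactly one rank. The sketch below uses a hypothetical helper `block_range` and runs in plain Python, simulating 5 ranks with a loop instead of MPI:

```python
def block_range(rank, size, n):
    """Hypothetical helper: half-open index range [start, stop) owned by
    `rank` when n items are split into `size` nearly equal contiguous blocks."""
    base, extra = divmod(n, size)
    # The first `extra` ranks each take one additional item.
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return start, stop


A = [2, 13, 4, 3, 5, 1, 0, 12, 10, 8, 7, 9, 11, 6, 15, 14]
size = 5  # pretend there are 5 MPI processes
for rank in range(size):
    start, stop = block_range(rank, size, len(A))
    print("Process %d gets A[%d:%d] = %s" % (rank, start, stop, A[start:stop]))
```

Inside an MPI program, each process would instead call `block_range(comm.Get_rank(), comm.Get_size(), len(A))` and work only on its own slice.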


- Individual processes coordinate the workflow through communication (message passing)
    - Point-to-point Communication
    - Collective Communication

#### <center> Point-to-Point: Send and Receive

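```
from mpi4py import MPI
comm = MPI.COMM_WORLD
```
- Sender process:
```
comm.send(data, dest=dest_rank)
```
- Receiver process:
```
data = comm.recv(source=source_rank)
```
- In mpi4py, pass *source* by keyword: the first positional parameter of `recv` is an optional receive buffer, so `comm.recv(source_rank)` would not select the sender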

**Original MPI C Syntax: MPI_Send**
```
int MPI_Send(void *buf, 
	int count, 
	MPI_Datatype datatype, 
	int dest, 
	int tag, 
	MPI_Comm comm)
```

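- *count* is the number of elements in *buf*, each of type *datatype*; predefined MPI_Datatype values include MPI_BYTE, MPI_PACKED, MPI_CHAR, MPI_SHORT, MPI_INT, MPI_LONG, MPI_FLOAT, MPI_DOUBLE, MPI_LONG_DOUBLE, and MPI_UNSIGNED_CHAR
- *dest* is the rank of the process the message is sent to
- *tag* is an integer that identifies the message; the programmer is responsible for choosing tags consistently on the sending and receiving sides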


**Original MPI C Syntax: MPI_Recv**
```
int MPI_Recv(
	void *buf, 
	int count, 
	MPI_Datatype datatype, 
	int source, 
	int tag, 
	MPI_Comm comm,
	MPI_Status *status)
```

- *count* and *datatype* are as for MPI_Send; for MPI_Recv, *count* is the maximum number of elements *buf* can hold
- *source* is the rank of the process from which the message was sent
- *tag* is an integer that identifies the message. MPI_Recv only matches a message whose source and tag equal the values given; the constants MPI_ANY_SOURCE and MPI_ANY_TAG can be used as wildcards when the sender or tag is unknown or unimportant
- *status* receives information about the matched message, such as its actual source and tag
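To make the matching rule concrete, here is a toy, pure-Python model (not MPI) of the messages waiting for one receiver. The `Mailbox` class and the `ANY_SOURCE`/`ANY_TAG` constants are hypothetical stand-ins; the real MPI_ANY_SOURCE and MPI_ANY_TAG values are implementation-defined. A receive takes the earliest message whose source and tag both match, skipping non-matching ones:

```python
ANY_SOURCE = -1  # hypothetical stand-ins for MPI_ANY_SOURCE / MPI_ANY_TAG
ANY_TAG = -1


class Mailbox:
    """Toy model of the messages waiting to be received by one process.

    Not real MPI: it only illustrates how a receive selects a message by
    (source, tag), with wildcards, in arrival order.
    """

    def __init__(self):
        self.pending = []  # (source, tag, data) tuples in arrival order

    def deliver(self, source, tag, data):
        self.pending.append((source, tag, data))

    def recv(self, source=ANY_SOURCE, tag=ANY_TAG):
        # Take the earliest pending message whose source and tag both match.
        for i, (src, tg, data) in enumerate(self.pending):
            if source in (ANY_SOURCE, src) and tag in (ANY_TAG, tg):
                del self.pending[i]
                return src, tg, data
        # A blocking MPI_Recv would wait here, possibly forever.
        raise LookupError("no matching message; a blocking receive would hang")


box = Mailbox()
box.deliver(source=0, tag=1, data="first")
box.deliver(source=0, tag=2, data="second")
print(box.recv(source=0, tag=2))  # skips the tag-1 message: (0, 2, 'second')
print(box.recv(tag=ANY_TAG))      # (0, 1, 'first')
```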


```python
%%px
from mpi4py import MPI
import random
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if (rank == 0):
    send_pkg = random.random()
    print (send_pkg)
    comm.send(send_pkg, dest = 1, tag = 1)
if (rank == 1):
    recv_pkg = 0
    recv_pkg = comm.recv(source = 0, tag = 1)
    print (recv_pkg)
```

```
[stdout:0] 0.15986945642629524
[stdout:13] 0.15986945642629524
```


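**Blocking risks**
- Blocking send: if a message is too large for the MPI library to buffer, the send does not return until the matching receive is posted; two processes that both send to each other before receiving can deadlock
- Blocking receive: if the expected message never arrives (lost data, a missing sender, or a mismatched source or tag), the receiver hangs indefinitely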
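The send-send deadlock and its usual fix can be sketched in plain Python, with two threads standing in for ranks 0 and 1 (this is not MPI). `SyncChannel`, `exchange`, and `run` are hypothetical names; `SyncChannel.send` deliberately returns only after the receiver takes the message, mimicking a blocking send that cannot be buffered. Timeouts replace the infinite hang so the demo terminates:

```python
import queue
import threading


class SyncChannel:
    """Hypothetical one-way channel with no buffering: send() returns only
    after the receiver has taken the message."""

    def __init__(self):
        self._data = queue.Queue(maxsize=1)
        self._ack = queue.Queue(maxsize=1)

    def send(self, obj, timeout):
        self._data.put(obj)
        self._ack.get(timeout=timeout)  # wait for the receiver; queue.Empty on timeout

    def recv(self, timeout):
        obj = self._data.get(timeout=timeout)
        self._ack.put(None)  # let the sender continue
        return obj


def exchange(rank, outbox, inbox, value, results, safe, timeout):
    """Rank `rank` sends `value` to its partner and receives the partner's value."""
    try:
        if safe and rank % 2 == 1:
            got = inbox.recv(timeout)    # odd ranks receive first ...
            outbox.send(value, timeout)  # ... then send
        else:
            outbox.send(value, timeout)  # even ranks (or all ranks, if unsafe) send first
            got = inbox.recv(timeout)
        results[rank] = got
    except queue.Empty:
        results[rank] = "deadlock (timed out)"


def run(safe, timeout=1.0):
    to0, to1 = SyncChannel(), SyncChannel()  # messages addressed to rank 0 / rank 1
    results = {}
    threads = [
        threading.Thread(target=exchange, args=(0, to1, to0, "from 0", results, safe, timeout)),
        threading.Thread(target=exchange, args=(1, to0, to1, "from 1", results, safe, timeout)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print("both send first:", run(safe=False))
print("even/odd ordering:", run(safe=True))
```

Real MPI implementations often buffer small messages, so the unsafe ordering may appear to work until messages grow. In mpi4py, `comm.sendrecv` or the non-blocking `comm.isend`/`comm.irecv` are alternatives to ordering the calls by hand.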

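**Data types**
- mpi4py supports MPI's predefined data types
- mpi4py's lowercase methods (`send`, `recv`, `bcast`, ...) use *pickle* to send and receive arbitrary Python objects such as dictionaries and lists; the uppercase methods (`Send`, `Recv`, `Bcast`, ...) transfer buffer-like objects such as NumPy arrays directly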

```python
%%px
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    data = {'class': 'cpsc3620', 'semester': 'Fall 2016', 'enrollments': 40}
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
    print(data)
```

```
[stdout:13] {'enrollments': 40, 'semester': 'Fall 2016', 'class': 'cpsc3620'}
```


```python
%%px
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    data = [1,2,3,4]
    comm.send(data, dest=1, tag=11)
elif rank == 1:
    data = comm.recv(source=0, tag=11)
    print(data)
```

```
[stdout:13] [1, 2, 3, 4]
```
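The pickling step behind these examples can be seen in isolation with plain Python. This sketch only shows what serialization does conceptually (object to bytes on the sender, bytes to a new object on the receiver); it is not mpi4py's internal code:

```python
import pickle

# The dictionary sent in the earlier example.
data = {'class': 'cpsc3620', 'semester': 'Fall 2016', 'enrollments': 40}

payload = pickle.dumps(data)      # sender side: object -> bytes
received = pickle.loads(payload)  # receiver side: bytes -> new object

print(type(payload).__name__)  # bytes
print(received == data)        # True: same contents
print(received is data)        # False: the receiver gets its own copy
```

Because the receiver rebuilds a separate object, modifying it never affects the sender's copy, which matches the separate memory spaces of MPI processes.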


#### <center> Collective Communication

- Must involve ALL processes within the scope of a communicator
- If even one process in the communicator does not make the call, the program can hang or fail unpredictably
- Types of collective communications:
    - Synchronization: barrier
    - Data movement: broadcast, scatter/gather, all-to-all
    - Collective computation (aggregate data to perform computation): Reduce
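The synchronization semantics of a barrier can be sketched with Python's `threading.Barrier`, using threads in place of ranks (not MPI). No thread continues past `wait()` until all of them have arrived, which is what `comm.Barrier()` (or `comm.barrier()`) guarantees across the processes of a communicator in mpi4py:

```python
import threading

size = 4                           # pretend there are 4 ranks (threads here)
barrier = threading.Barrier(size)  # plays the role of comm.Barrier()
lock = threading.Lock()
log = []                           # (phase, rank) in the order events happen


def worker(rank):
    with lock:
        log.append(("before", rank))
    barrier.wait()  # blocks until all `size` workers have arrived
    with lock:
        log.append(("after", rank))


threads = [threading.Thread(target=worker, args=(r,)) for r in range(size)]
for t in threads:
    t.start()
for t in threads:
    t.join()

phases = [phase for phase, _ in log]
print(phases)  # every "before" precedes every "after"
```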

**broadcast:**
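Broadcast copies one value from the root process to every process in the communicator. Below is a serial, pure-Python model (not MPI): a list indexed by rank stands for what each process holds, and the hypothetical `bcast_model` returns what each holds after the call. The real mpi4py call is shown in a comment:

```python
import copy


def bcast_model(values, root=0):
    """Serial model of broadcast (not real MPI).

    values[r] is what rank r holds before the call; the result is what each
    rank holds afterwards. Each rank gets its own copy of the root's object,
    since MPI processes have separate memory.
    """
    return [copy.deepcopy(values[root]) for _ in values]


# Before: only rank 0 has the configuration; the other ranks hold None.
before = [{"n": 100, "tol": 1e-6}, None, None, None]
after = bcast_model(before, root=0)
print(after)

# In mpi4py (under mpirun), every rank would call:
#   config = comm.bcast(config, root=0)
```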

**scatter:**
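Scatter splits a sequence held by the root and hands item r to rank r. A serial, pure-Python model (not MPI) with a hypothetical `scatter_model` helper; the mpi4py call appears in a comment, where non-root ranks may pass `None` because only the root's argument is used:

```python
def scatter_model(sendobj, size):
    """Serial model of scatter (not real MPI).

    sendobj is the root's sequence; rank r receives sendobj[r]. Returns the
    item received by each rank, indexed by rank.
    """
    if len(sendobj) != size:
        raise ValueError("the root must supply exactly one item per rank")
    return [sendobj[r] for r in range(size)]


A = list(range(8))
size = 4
# The root splits A into one chunk per rank before scattering.
chunks = [A[2 * r:2 * r + 2] for r in range(size)]
received = scatter_model(chunks, size)
for rank, chunk in enumerate(received):
    print("rank %d receives %s" % (rank, chunk))

# mpi4py equivalent (under mpirun):
#   chunk = comm.scatter(chunks if rank == 0 else None, root=0)
```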

**gather:**
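Gather is the reverse of scatter: every rank contributes one value and the root collects them in rank order. In this serial, pure-Python model (not MPI), the hypothetical `gather_model` returns what each rank gets back, mirroring mpi4py's `comm.gather`, which returns the list on the root and `None` elsewhere:

```python
def gather_model(values, root=0):
    """Serial model of gather (not real MPI).

    values[r] is rank r's contribution. Returns what each rank gets back:
    the root receives all contributions in rank order, other ranks get None.
    """
    return [list(values) if r == root else None for r in range(len(values))]


partial_sums = [1, 5, 9, 13]  # hypothetical per-rank results
print(gather_model(partial_sums, root=0))
# [[1, 5, 9, 13], None, None, None]

# mpi4py equivalent (under mpirun):
#   all_sums = comm.gather(my_partial_sum, root=0)
```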

**all-to-all:**
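In all-to-all, every rank sends a separate item to every other rank (including itself), so item j of rank i's send list ends up as item i of rank j's receive list: the "matrix" of messages is transposed. A serial, pure-Python model (not MPI) using a hypothetical `alltoall_model` helper:

```python
def alltoall_model(sendbufs):
    """Serial model of all-to-all (not real MPI).

    sendbufs[i][j] is the item rank i sends to rank j. Returns recvbufs where
    recvbufs[j][i] is the item rank j received from rank i.
    """
    size = len(sendbufs)
    return [[sendbufs[i][j] for i in range(size)] for j in range(size)]


size = 3
sendbufs = [["%d->%d" % (i, j) for j in range(size)] for i in range(size)]
recvbufs = alltoall_model(sendbufs)
for rank, items in enumerate(recvbufs):
    print("rank %d receives %s" % (rank, items))

# mpi4py equivalent (under mpirun), with len(send_list) == comm.Get_size():
#   recv_list = comm.alltoall(send_list)
```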

**reduce:**
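Reduce combines one value from every rank with an operation such as sum or max and delivers the result to the root. A serial, pure-Python model (not MPI) with a hypothetical `reduce_model` helper, using `functools.reduce` to fold the contributions; the mpi4py calls are shown in comments:

```python
import operator
from functools import reduce


def reduce_model(values, op=operator.add, root=0):
    """Serial model of reduce (not real MPI).

    values[r] is rank r's contribution and op combines two values. The root
    receives the combined result; every other rank gets None.
    """
    result = reduce(op, values)
    return [result if r == root else None for r in range(len(values))]


contributions = [r * r for r in range(4)]   # rank r contributes r**2
print(reduce_model(contributions))          # [14, None, None, None]
print(reduce_model(contributions, op=max))  # [9, None, None, None]

# mpi4py equivalents (under mpirun):
#   total = comm.reduce(rank * rank, op=MPI.SUM, root=0)  # result on rank 0, None elsewhere
#   total = comm.allreduce(rank * rank, op=MPI.SUM)       # result on every rank
```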