**DeapSECURE module 6: Parallel Programming**

# Session 1: Message Passing Interface (MPI)

Welcome to the DeapSECURE online training program!
This is a Jupyter notebook for the hands-on learning activities of the
["Parallel and High-Performance Programming" module](https://deapsecure.gitlab.io/deapsecure-lesson06-par/),
episodes 3: ["Introduction to MPI: Distributed Memory Programming in Python using mpi4py"](https://deapsecure.gitlab.io/deapsecure-lesson06-par/10-mpi-intro/index.html) and 4: ["Communicating Data with MPI"](https://deapsecure.gitlab.io/deapsecure-lesson06-par/11-communicate-data/index.html).
Please visit the [DeapSECURE](https://deapsecure.gitlab.io/) website to learn more about our training program.

<a id="TOC"></a>
**Quick Links** (sections of this notebook):

* 1 [Setup](#sec-Setup)
* 2 [Introduction to MPI Functionalities](#sec-Intro_mpi)
* 3 [More Advanced MPI](#sec-Adv_mpi)



<!--
* 4 [Scaling of a Parallel Program](#sec-par)
-->

> **CAUTIONS**
>
> <!-- FIXME FIXME FIXME
> In this session, we will use this notebook as partly Python and partly UNIX shell to invoke MPI programs directly from the Jupyter notebook in order to capture the results in the same notebook.
> This notebook was designed with ODU Wahab cluster and Open OnDemand in mind, in which it is possible to launch MPI programs within the notebook environment.
> Sufficient number of CPU cores must be requested when starting this Jupyter session to run MPI programs interactively from the notebook.
> Otherwise, the program must be invoked through your cluster's job scheduler.
> -->
>
> **FOR THE TIME BEING, THIS NOTEBOOK IS ONLY USED FOR REFERENCE.**
>
> The trick to launch MPI programs from within a notebook is still broken and still canoot be done.
> Please launch MPI jobs directly from the login node's terminal using an appropriate job script and the `sbatch` command, not from Jupyter terminal.
> Please follow your instructor regarding the specific way of running MPI programs on your HPC site.
>
> There is a lot of variabilities regarding how one must run MPI programs on a particular HPC site, as well as the timing results of the MPI programs.
> Running the programs on different clusters will definitely result in different timing as well.
> For this reason, this notebook is not intended to be strictly reproducible across different environments.

<a id="sec-Setup"></a>
## 1. Setup Instructions

If you are opening this notebook from Wahab cluster's OnDemand interface, you're all set.

If you see this notebook elsewhere and want to perform the exercises on Wahab cluster, please follow the steps outlined in our setup procedure.

1. Make sure you have activated your HPC service.
2. Point your web browser to https://ondemand.wahab.hpc.odu.edu/ and sign in with your MIDAS ID and password.
3. Create a new Jupyter session with the following parameters: Python version **3.7**, Python suite `tensorflow 2.6 + pytorch 1.10`, Number of Cores **1**, Number of GPU **0**, Partition `main`, and Number of Hours at least **4**. (See <a href="https://wiki.hpc.odu.edu/en/ood-jupyter" target="_blank">ODU HPC wiki</a> for more detailed help.)

4. Get the necessary files using commands below within Jupyter:

       mkdir -p ~/CItraining/module-par
       cp -pr /shared/DeapSECURE/module-par/. ~/CItraining/module-par

Using the file manager on the left sidebar, now change the working directory to `~/CItraining/module-par`.
The file name of this notebook is `Par-session-1-MPI.ipynb`.

<!--
> **VERY IMPORTANT:** Make sure that you start the Jupyter session with at least **4 cores**. We will use this notebook to launch real MPI programs, so multiple CPU cores are needed.
-->

### 1.1 Reminder

* Throughout this notebook, `#TODO` is used as a placeholder where you need to fill in with something appropriate. 
* To run a code in a cell, press `Shift+Enter`.
* Use `ls` to view the contents of a directory.

### 1.2 Loading Python Libraries

<!--
Now we need to **import** the required libraries into this Jupyter notebook:`numpy`.
-->
<!--
**Important**: On Wahab HPC, software packages, including Python libraries, are managed and deployed via *environment modules*.
Before we can import the Python libraries in our current notebook, we have to load the corresponding environment modules.
We have setup a custom environment "DeapSECURE" which will load all the required libraries for this workshop. Please load "DeapSECURE" module.

* Load the modules above using the `module("load", "MODULE")` or `module("load", "MODULE1", "MODULE2", "MODULE n")` statement.
* Next, invoke `module("list")` to confirm that these modules are loaded.
* In this module, we have setup a custom environment "DeapSECURE" including all the required libraries. Please load "DeapSECURE" module. (You can also setup your [custom environment](https://wiki.hpc.odu.edu/Software/Python#install-additional-python-modules))
-->

Now we can import the following standard Python libraries:
`os`, `re`, `sys`.

In [None]:
"""Uncomment, edit, and run code below to import libraries""";
#import #TODO

Now load the functions from DeapSECURE's special module, `parallel_prog_env`, to make MPI programs available & invocable from your notebook:

In [None]:
"""Uncomment and run these commands""";

#import parallel_prog_env
#from parallel_prog_env import *

In [None]:
# Makes MPI-related programs and modules available to this notebook:
load_parallel_prog_env(module)

> **NOTE**: `parallel_prog_env` is a DeapSECURE-specific module.

### 1.3 Running Shell Commands inside the Jupyter Notebook

In code cells, lines that start with `!` are passed directly to the system shell.
For example, `!ls -F` will run the `ls` command with the `-F` flag in the current directory.

Let's practice running some shell commands to find out the following bits of information about the machine:

* Invoke `hostname` to print the name of the compute node you're running in.
* Invoke `date` command to print the current date/time.
* Invoke `pwd` command to print your current directory.
* Invoke `ls` command to print the contents of your current directory.

After issuing `ls`, do you see files such as `bcast.py`, `hello_world.py`, etc?
If not, you will have to `%cd` into `mpi4py` subdirectory:

In [None]:
"""Issue a '%cd mpi4py' command to get to the working directory of this MPI notebook.""";

#%cd mpi4py

<a id="sec-Intro_mpi"></a>
## 2. Introduction to MPI Functionalities

In this section, we will cover and demonstrate the basic functionalities of MPI.
Since MPI is a library, we will first explore several MPI functions to perform basic communication and synchronization.
But before doing this, we first need to know how to launch an MPI program.
An MPI program must be run externally using the `mpiexec` or `mpirun` command *from the shell*, as demonstrated below.

First, check if the MPI program is available:

In [None]:
! which mpiexec

If you have an error message, please consult with your instructor or TA to resolve the problem.

### 2.1 Running an MPI Program

Let us now launch our first MPI program using the `mpiexec` command:

```bash
! mpiexec -n 4 python3 hello_world.py
```

> **WORKAROUND**: To run successfully on ODU Wahab cluster *inside* this Jupyter session, you will need to insert the `--oversubscribe` flag:
>   ```bash
>   ! mpiexec --oversubscribe -n 4 python3 hello_world.py
>   ```
> The `--oversubscribe` flag is used only to work around limitations of running MPI codes under a Jupyter session. Under normal circumstances, *this should not be used in a batch job!*

In [None]:
"""Use this cell to run hello_world.py with 4 processes, as shown above""";
#TODO

> **NOTES**:
>
> * `mpiexec` is a external program, it will have to be invoked with the `!` prefix within the Jupyter notebook environment.
> * The `mpiexec -n N` command runs `N` copies of the same program. We will call each running program a *process*.
> * Older MPI implementations may use `mpirun` command instead of `mpiexec`, which is the standardized name introduced only recently.

**QUESTION**: Take a look at the contents of `hello_world.py`. How many `print` statements exist in the code? How many got printed? Can you explain the apparent discrepancy?

> **CAUTION**:
> Unlike other topics in this workshop, where we can interactively try out parts and pieces of a program using a Jupyter notebook, a part of an MPI program cannot be tried out in an *a la carte* manner.
> We cannot just place a few lines of code in this notebook and expect that they will be running "in the MPI way".
> We have to run the entire program to obtain the result of the program.
> Even if we want to try a part of a program, it still has to be coded as a complete program.


#### Running MPI Jobs on HPC

In this workshop we will explore both the traditional way of running MPI program via job scheduler, as well as an unorthodox way to launch the MPI program directly from the Jupyter session.
Under most circumstances, you need to ***submit the MPI-based programs to the HPC job scheduler***, as they tend to consume a lot of resources (memory and CPU).

Let's create a SLURM job script for the `hello_world.py` program:

```bash
#!/bin/bash
#SBATCH --job-name=hello_world
#SBATCH --ntasks=4
#SBATCH --output=hello_world.out
#SBATCH --time=0:10:00

source parallel-prog-env
mpiexec python3 hello_world.py
```

Several interesting things here:

* The `source parallel-prog-env` reads additional commands from `parallel-prog-env`, which loads the necessary environment modules for `mpiexec`, `python3` and other tools needed later on.

* There is no need to explicitly mention `-n 4` because the `mpiexec` knows to get that information from the job scheduler.

**EXERCISE**: Try submitting the `hello_world` program to the batch scheduler.

* Copy and paste the job script above to `hello_world.job` located in the same path as `hello_world.py`
* Submit the job script from the terminal: `sbatch hello_world.job`
* Program's output will be saved in `hello_world.out`

#### Running MPI Jobs Interactively

In this training workshop, for the convenience of making this notebook a record of your learning, we will guide you to try some small examples of MPI code interactively from this notebook.
Please understand that this is fine for learning, testing, and development; but this is *not* how we typically run real MPI programs.
Interactive MPI jobs must be run on the compute node, with sufficient resources (for example, to run 8 tasks you need to have 8 cores allocated).

### 2.2 Communicator, Size, and Rank

In MPI, a parallel program consists of a set of processes (independently-running *serial* programs) that use MPI library functions to communicate with one another.
In an MPI program, a group of interacting processes is represented by a *communicator* object.
The number of processes in that group is the *size* of the communicator.
Each process in this group is distinctly labeled by a *rank*, which is an integer from 0, 1, ..., N-1 (where N is the total number of processes in the execution).

Within a communicator, each process will be able to inquire the number of processes in that group, as well as its own rank:

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
```

> **IMPORTANT**: The code snippet above is a standard preamble for any MPI program written in Python.

Please take a look at the code of `rank_size.py` and run this program now with 4 processes to see its output:

In [None]:
#TODO

The key approach of a parallel program is to divide up the work into parts that can be executed out *concurrently* (independently and simultaneously) so that the entire computation would take shorter amount of time.
Parallel programming, therefore, involves:

1. **Problem decomposition** -- dividing up work into a set of tasks that can execute independently, and assigning these tasks to the workers;

2. **Concurrency** -- making workers carry out their respective tasks at the same time;

3. **Communication** -- coordinating and exchanging data with among the workers as necessary.

Different workers will be assigned to work on different parts of the same data (known as **data parallelism**) or to work on (completely) different tasks that can be processed concurrently (**task parallelism**).

### 2.3 How to Make Processes Do Different Things?

As shown earlier, we launch `N` copies of processes using MPI, each executing the same program (i.e. the same set of instructions).
How can then we make these programs do different things?
The secret lies in the MPI-defined `rank` of a process: we use the `if (rank == SOMETHING) ... else` construct throughout an MPI program to decide what needs to be done by each process.
In the following subsections we will see how this constract is being used in two settings:
(1) to set up the so-called "master process" to handle things on behalf of all the processes;
(2) to send and receive messages.

### 2.4 Master Process (Master Rank)

In an MPI program, process rank 0 has a special role to coordinate the work for and results from the other processes. This is called the *master rank* or *master process*. The master process typically also reads the input file(s), saves the results to the output file(s), and prints messages while the program is running.
Depending on the nature of the computation, rank 0 may participate in the computation as one of the workers, or simply coordinating the other processes (which can be the case if this role already makes the processor very busy).

Here is a Python ***pseudocode*** example of the beginning of an MPI program, where the master process takes the special roles of reading input data and broadcasts the input data and parameters to the rest of the ranks:

```python
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    PROCESS_INPUT_DATA()

# Broadcast common parameters so that all workers have their parameters initialized
# All ranks must participate in the broadcast step
BROADCAST_COMMON_PARAMETERS()

if rank == 0:
    DOMAIN_DECOMPOSITION(num_domains=size)
    for r in range(1, size):
        SEND_WORK_TO_RANK(dest=r)
    # Typically, work for rank 0 is copied locally without send/receive here
else:
    RECEIVE_WORK_FROM_RANK(source=0)
```

The first `if rank == 0: ...` statement encloses statements that will be executed by only master rank.

In the second conditional block (`if rank == 0: ... else: ...`), the information bits about the work for each rank was distributed by the master rank to each worker (except the master rank itself).
The master itself transfers its portion of the work by simple variable assignments.

**EXERCISE**:
Please carefully read the `master.py` program in your hands-on package and try to comprehend the sequence of the "tasks" in this program.
This `master.py` is a generic template for a complete MPI program; the "tasks" are just the print statements.
The all-caps pseudo-functions above (such as `READ_INPUT_DATA`) will appear commented because these are just placeholders.
When using this as a basis to write your own program, you will replace these "tasks" with your own tasks.

Now, run `master.py` now with 4 processes to observe the sequence of events.
Take a close look how the tasks from various ranks unfold--as evidenced by the printouts:

In [None]:
"""Run the `master.py` with 4 processes""";
#TODO

**QUESTION**: Try to comprehend the output of the program above!
Remember there are four processes printing four streams of messages.
Are they synchronized -- or, how synchronized are the outputs from four processes?

> ***Pause here before reading the next paragraph.*** If possible, have discussion with your friend or group members. Compare the output obtained by different people (or different runs, if you are by yourself).

-----

The sequence of the messages printing emulates the sequence of tasks occuring during the parallel program's execution.
However, the output again underscores the asynchronous nature of the parallel (concurrent) processes.
Because there is no real MPI communications in the program, the relative ordering of the events among ranks does not look logical.

For an improved printout, try running `master_demo.py`, which tries to simulate the relative order of events across ranks.
We try to synchronize the printouts using `barrier()` function calls throughout the program in order to simulate the collective actions.

> The use of `barrier()` to synchronize printing is a nonstandard hack, but it seems to work.
> The MPI standard does not guarantee this.

In [None]:
"""Try running `master_demo.py` and comment on the result""";
#TODO

**QUESTION**: Try to comprehend the output now. Does it make sense?

### 2.5 Sending and Receiving Data: Point-to-Point

Here is a common example of sending and receiving data among two ranks, utilizing the `if...else...` construct.

```python
message_to_send = "SENT_FROM_RANK_" + str(rank)

if rank == 0:
    comm.send(message_to_send, dest=1)
    received_message = comm.recv(source=1)
else:
    received_message = comm.recv(source=0)
    comm.send(message_to_send, dest=0)
```

This example sends one mesage from rank 0 to rank 1, and sends another message from rank 1 to rank 0.
(Here, rank 0 is just like any other rank in the MPI process world, not representing nothing special such as the "master" rank.)
Notice the reversal of send and receive on the two respective ranks, because of the direction of the two messages.

**EXERCISE**: Please inspect and run `send_recv.py` the sample code for sending and receiving messages between two ranks. **Run this program with exactly 2 processes.**

### 2.6 Broadcast

Quite frequently, data from one rank needs to be *broadcast* to all the other ranks so they all have this data as well.
This is accomplished using the `bcast` function.
The `bcast` function propagates data from the one rank (often called the "root") to all the ranks, including the root itself.

A common usage is to broadcast simulation parameters (e.g. number of grid points, number of particles, etc.) from the master rank to the others.
Another usage is to perform global update of the state of the simulation (e.g. "change the temperature now").
While it is possible to accomplish this with a series of `send` and `recv` function calls, the use of broadcast results in shorter code and opens the opportunity for MPI library to perform its own optimized version of broadcast algorithm.

**EXERCISE**: Please inspect and run `bcast.py` the sample code for broadcasting messages. Run this program with 4 processes.

In [None]:
#TODO


After the broadcast, the variable `message` in all ranks contains the same value as that in rank 0.

> ### Point-to-Point and Collective Communications
>
> Sending message from one rank to another is a type of the [*point-to-point communication*](https://mpitutorial.com/tutorials/mpi-send-and-receive/).
> Broadcast is an example of the [*collective communications*](https://mpitutorial.com/tutorials/mpi-broadcast-and-collective-communication/) in MPI.
>
> For point-to-point communications, only the sending and receiving processes must participate.
> For collective communications, all processes (ranks) in the communicator (group) *must* participate, or else the program will deadlock (or exit with an error).
> Other collective communications include:
>
> * barrier (synchronization)
> * global data reduction (e.g. summation, maximum or minimum values)
> * data scattering or gathering

### 2.7 Barrier (Synchronization)

In MPI, all ranks are for the most part working independently until they have to communicate (either point-to-point or collective).
We can force all the ranks to synchronize their execution through a *barrier*:

```python
# ... prior part of program executing in parallel

comm.barrier()

# ... next part of program executing in parallel
```

The `master_demo.py` program we tried earlier is a demonstration of how barrier works.
Ranks that have reached the barrier earlier will wait until all the ranks have reached this point before continuing computation.
This is sometimes necessary to ensure a consistent global state before proceeding.

Aside from `barrier` itself, all other collective communications imply a barrier, since all the processes must communicate at that same point.

> **CAVEAT on MPI program outputs**: The MPI standard explicitly states that there is *no* guarantee on the order of the standard output streams from all the MPI processes.
> In our example above, we were lucky that the `barrier` function led to the the synchronized look to the output: the messages from all ranks were flushed before proceeding, thereby providing a consistent state of output, in line with what we expect from the program.
> But please do not rely on this on a real program.
> Usually it is best to let only the master process print things out.
> The exceptions are diagnostic messages, which do not need to appear in order.
> If there are many diagnostic messages to print, then (multiple) log files are appropriate.

### 2.8 Reduction

Another very common collective operation is data reduction, through the `reduce` function.
Consider this program:

```python
my_number = rank * rank
SUM_NUMBER = comm.reduce(my_number, op=MPI.SUM, root=0)
```

The `reduce` function will gather the individual `my_number` values from all the ranks and perform the desired operation (in this case, a sum): `SUM_NUMBER` $= 0 + 1 + 4 + .. + (N-1)^2$.

> The sum total (`SUM_NUMBER`) is defined only on the master rank.

**EXERCISE**: Please inspect and run `reduce.py`, an example of using data reduction. Run this program with 4 processes.

In [None]:
#TODO


### 2.9 Scatter

The `scatter` function takes $N$ messages and scatters them to $N$ processes so that rank number $i$ will receive the $i$-th message.
Take for example a list or an array (called `data` in the program below): If we scatter the list, the $i$-th element will be sent to the $i$-th rank.
Note that the sending process (root) also keeps a chunk for itself.
This function is very useful to **distribute** chunks of data to many processes.
*Scatter* differs from broadcast is that receiving process only get a part of the original message, this is, one chunk that has been assigned to them.

**EXERCISE**:
Please inspect and run `scatter.py` sample code for scattering message. Run this program with 4 processes:

In [None]:
#TODO

### 2.10 Gather

The `gather` function is the converse of the `scatter`: It *gathers* individual messages from all the ranks to the root rank.
Each rank sends a single piece of data back to the root;
the receiving side (root rank, which is also one of the sending ranks) arranges the received element in ordered manner into a list (or an array), where the $i$-th element comes from the $i$-th rank.

**EXERCISE**: Please inspect and run `gather.py`  sample code for gathering messages. Run this program with 4 processes:

In [None]:
#TODO


<a id="sec-Adv_mpi"></a>
## 3. More Advanced MPI

### 3.1 Blocking and Non-blocking Communication

There are the *blocking* and *non-blocking* variants for every function: the former variant will wait until the message is sent or received, whereas the latter will return immediately, while MPI library will process the request in the background, i.e., in an asynchronous manner.
In this introductory workshop, we only focus on the blocking variant.
For writing real codes, you may need to use the non-blocking variants in order to achieve the highest level of performance.

### 3.2 Using the Right Tools

Use the highest level of MPI functions that achieve your desired objective. For example, in lieu of a series of MPI `send`'s to all the other workers, use the `scatter` function.


### 3.3 More Resources

The following tutorial is highly recommended to learn more about MPI. More complete list of resources is given in the next notebook.

"**Message Passing Interface (MPI) -- A Tutorial by LLNL**"

https://hpc-tutorials.llnl.gov/mpi/

Written for C/C++/Fortran languages.
Has more complete coverage than basic functionalities, to include derived data types and topology.

## What's Next?

This notebook is only concerned with the basics of MPI library.
In our next session, we will make use of these functionalities to create a real parallel program.
The next notebook will guide you through the required additional ingredients to create such a program.