## Message Passing Interface (MPI) Exercises

High-Performance Computing is a field that leverages parallel processing to solve complex problems efficiently. 

A popular approach to parallel programming is the `Message Passing Interface (MPI)`.

In this tutorial, we will use Python and MPI to analyze power outage data from EAGLE-i, which contains information about the number of power outages, aggregated by county, in 15-minute time intervals. 

We will explore how using multiple processors to our advantage can save time when working with large datasets, and use the MPI for Python (`mpi4py`) package, which provides a simple Python interface to the MPI.

MPI is designed to allows users to easily perform distributed parallel processing across multiple processors on a single computer or even multiple nodes on an HPC system.

### Pre-requisites

- The eaglei dataset is already present in the data folder. So that's all we'll be using
- Also make sure that for just this exercise, you're using the login node jupyter server. If you've started your server in a reservation according to the instructions given to you, you can start a login node server by clicking on File->Hub Control Panel and clicking the Start button under the login node.

(2014-2022 dataset available at: https://drive.google.com/drive/u/0/folders/15skV7CWnMBLkrGIPmLrh1GgNPA00iUuB)

### Getting Started
1. Load and pre-process the power outage data
2. Divide the data into chunks to distribute among MPI processes
3. Implement parallel processing using MPI
4. Analyze and interpret the results

### Additional Resources: 
* Commonly Used HPC Terms: https://researchcomputing.princeton.edu/learn/glossary
* MPI Learning Resources: https://researchcomputing.princeton.edu/education/external-online-resources/mpi
  * MPI Tutorial by LBNL: https://hpc-tutorials.llnl.gov/mpi/
* Additional Research Computing Resources: https://researchcomputing.princeton.edu/learn/tutorials/external-resources-learning


#### 1.  Load and pre-process the power outage data
Let's first start by loading in a power outage data set using only one processor. The code for that is below.

In [1]:
import pandas as pd
import time as tp

# this line creates a variable to hold the time in which we began running the script
t1=tp.time()

# now we create a python list to hold the year of each file in the data set
file_nums = ['2014','2015','2016','2017','2018','2019','2020','2021','2022']

# we can also keep a count of the total blackouts as we read in the files using pandas
blackouts_total = 0

# this for loop iterates through each entry in our file_nums list we created above
for i in file_nums:
    # read the csv using pandas and assign the returned dataframe to our df variable
    df = pd.read_csv(f'../data/eaglei_outages/eaglei_outages_{i}.csv')

    # keep track of the number of blackouts in each file as we read them in
    blackouts = df['sum']
    blackouts_local = blackouts.sum()
    print(blackouts_local)

    blackouts_total+=blackouts_local

    print('Done with year', i)

# variable for the time after the script is completed so we can calculate the time the script took to run
t2 = tp.time()

print('The total time of the script was:',t2-t1, 'seconds')
print('The total blackouts from 2014-2022:', blackouts_total)

245259387
Done with year 2014
1565317037
Done with year 2015
2064484348
Done with year 2016
4378327281.0
Done with year 2017
3296135480.0
Done with year 2018
2824598528.0
Done with year 2019
5219277476.0
Done with year 2020
5325390318.0
Done with year 2021
4441543481.0
Done with year 2022
The total time of the script was: 64.9508786201477 seconds
The total blackouts from 2014-2022: 29360333336.0


We can see that the script took a total of ~70 seconds to read in the files. 

#### 2. Divide the data into chunks to distribute among MPI processes

Now let's try the same using MPI and multiple processors. In this example, we will use 9 processes each reading in a different file using its own processor all in parallel. To do this, we need to submit a job to Perlmutter using the sbatch script "eagle_mpi.sbatch"
Now let's try the same using MPI and multiple processors. In this example, we will use 9 processes each reading in a different file using its own processor all in parallel. To do this, we need to submit a job to Perlmutter using the sbatch script "eagle_mpi.sbatch"


But let's first look at the small changes we made to the code and some MPI terminology to help us understand what the changes did.

Here is the MPI code:

```
import pandas as pd
from mpi4py import MPI
import time as tp


# running with 9 threads because 9 years of data, so assuming something like `mpirun -np 9 ...`
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

t1 = tp.time()
file_nums = ['2014','2015','2016','2017','2018','2019','2020','2021','2022']

df = pd.read_csv(f'../data/eaglei_outages/eaglei_outages_**{file_nums[rank]}.csv'**)

blackouts = df['sum']
blackouts_local = blackouts.sum()

**blackouts_total = comm.reduce(blackouts_local, op=MPI.SUM, root=0)**

t2 = tp.time()

if rank==0:
    print('The total time of the script was:',t2-t1, 'seconds')
    print("Rank 0 got the sum, the total blackouts from 2014-2022:", int(blackouts_total))

```

The first small change is that we are using mpi4py, which is a python package that uses MPI.
```
from mpi4py import MPI
```
To use this on your laptop or cluster, there must be a regular MPI implementation installed, like OpenMPI, in addition to the mpi4py. For the bootcamp we will use the default MPI that loaded on the Perlmutter supercomputer, which is where the Jupyer hub will send this exercise to run.



# MPI Terminology

To handle the parallelization, MPI sets different ranks that serve as IDs to track each of the parallel tasks. These ranks are tracked in something called a Communicator.

**Communicator**   An object that represents a group of processes than can communicate with each other.

**Rank** Within a communicator each process is given a unique integer ID. Ranks start at 0 and are incremented contiguously. Ranks can be mapped to hardware processing elements like CPU cores.

For a typical MPI program, the number of ranks is set by the programmer in the command used to run the program. This allows the programmer to try different numbers of processes per task without needing to change the code.



In the code above, the next small change is setting up the communicator and ranks with the lines:
```
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
```
You can think of a communicator as a package that holds all the needed organizational information for its MPI region in the code. Inside the communicator each process is given a rank.

All MPI functions within the same MPI region will get each process’s rank from the communicator. Each rank has copies of the same code, so programmer must use logic, based on the MPI rank's ID to differentiate what that code processes. 

In the code above, the logic we use is to tie the rank number to the file number that gets opened, so that each rank is opening a different set of outage data.

Look at the difference in the loop code and MPI code:

Here is the loop from the serial code, which opens each file one after the other:
```
for i in file_nums:
    # read the csv using pandas and assign the returned dataframe to our df variable
    df = pd.read_csv(f'../data/eaglei_outages/eaglei_outages_{i}.csv')

```
Here is the MPI Code line that will open all the files at once, each one a different processor based on the rank ID.

```
df = pd.read_csv(f'../data/eaglei_outages/eaglei_outages_**{file_nums[rank]}.csv'**)

```
Note that the loop is gone and its index, i, has been replaced by th MPI rank ID. 

## Controlling the number of MPI ranks from a batch script 

The last piece to see is the fact that we can control the number of ranks from outside the code. This is done in the batch script that runs the code.


A `batch script` is a file used to submit a job to a job scheduler/workload manager like Slurm. 
- The batch script contains details of the job like:
  * the job name, 
  * max time the job can run, 
  * how many nodes are being requested for the job, 
  * which account the job should be charged to, etc.

Our batch script looks like this:

---
```bash
#!/bin/bash
#SBATCH --constraint=cpu
#SBATCH --nodes=1
#SBATCH --time=10
#SBATCH --ntasks-per-node=9
#SBATCH --job-name eaglei_mpi-%j
#SBATCH -o eaglei_mpi-%j.out
#SBATCH -e eaglei_mpi-%j.err

module load evp-patch
module load python

srun -n9 python3 eaglei_mpi.py

```

---
- The `#!/bin/bash` line specifies that the script is a Bash shell script, and the commands within it will be interpreted and executed using the Bash shell.

- The `#SBATCH` terms are Slurm directives, which specifies the 



Let's break down the components of the job execution command `srun -n9 python3 eaglei_mpi.py`:

| Terms | Meaning |
| :------- | :------- |
| `srun` | command used to submit and execute parallel jobs on a cluster |
| `-n9`| the `-n` flag specifies the number of processes to run in parallel, which will each execute the `eaglei_mpi.py` script independently, processing a portion of the dataset| 
|`python3` | interpreter that will be used to execute the script |
| `eaglei_mpi.py` | python script containing the MPI code to distribute the data and tasks, that will be executed in the MPI parallel environment |

#### 3. Implement parallel processing using MPI

To run it, just execute the next shell which submits our script to Perlmutter.

In [2]:
ID=!sbatch --parsable eaglei_mpi.sbatch

Once you see a new file in your directory to the left, you can open and print it with the following cell.

In [4]:
with open(f"eaglei_mpi-{ID.n}.out") as f:
    print(f.read())

The total time of the script was: 12.58487057685852 seconds
Rank 0 got the sum, the total blackouts from 2014-2022: 29360333336



We can see there was quite a speedup from sharing the load of reading in the files using MPI. 

Let's look at another example, in this example we will use the 2014 Eagle-I power outage data set to calculate the total number of blackouts for each county.

In [5]:
import pandas as pd
import numpy as np
import time as tp

df = pd.read_csv(f'../data/eaglei_outages/eaglei_outages_2014.csv')

all_counties = df.fips_code.unique()

t1=tp.time()
counties = []
sums = []

for i in all_counties:
    #Find the position in the CSV of a specific county (i)
    specific_county_filter = df['fips_code'] == i

    #Filter out the data so that we only look at a specific county (i)
    specific_county_data  = df[specific_county_filter]['sum']

    #Add up the total blackouts of a specific county (i)
    specific_county_sum = specific_county_data.sum()
    
    counties.append(int(i))
    sums.append(int(specific_county_sum))
    

d = {'FIPS Code': counties, 'Blackout Sum' : sums}
blackouts_df = pd.DataFrame(data=d)  
    
t2=tp.time()

print('Total time of script', t2-t1)
blackouts_df.sort_values(by=['FIPS Code'])
blackouts_df.head()

Total time of script 2.512744188308716


Unnamed: 0,FIPS Code,Blackout Sum
0,1037,970
1,1051,126098
2,1109,4131
3,1121,196
4,4017,2848


Now, let's try the same thing but use MPI. 

In [6]:
ID=!sbatch --parsable eaglei_mpi_ex2.sbatch

In [7]:
with open(f"eaglei_mpi_ex2-{ID.n}.out") as f:
    print(f.read())
    
blackouts_df.sort_values(by=['FIPS Code'], ascending=True)
blackouts_df.head()

# Saves dataframe to .csv file  
blackouts_df.to_csv('county_blackouts.csv')

Total time of script 0.6176290512084961



#### 4. Analyze and interpret the results
To see the results, look at the column on the left and click on county_blackouts.csv.

We can definitely see a speedup in our calculation using MPI to run in parallel, and we even added a step that writes our dataframe to the filesystem in .csv format so that we could load it back up here in our Jupyter notebook. Do you think we would continue to see gains in performance if we add even more parallel processes?

Try editing the `my_ex2.sbatch` file and submit it with the cells below.

In [8]:
ID=!sbatch --parsable my_ex2.sbatch

In [9]:
with open(f"my_ex2-{ID.n}.out") as f:
    print(f.read())
    
blackouts_df = pd.read_csv('county_blackouts.csv')
blackouts_df.sort_values(by=['FIPS Code'], ascending=True)
blackouts_df.head()

FileNotFoundError: [Errno 2] No such file or directory: 'my_ex2-sbatch: error: Invalid numeric value "<CHANGE" for --ntasks-per-node..out'