In this notebook we will explore how using multiple processors to our advantage can save time when working with large data sets. We will use the MPI for Python (mpi4py) package which provides a simple Python interface to the Message Passing Interface (MPI). MPI is designed to allow users to more easily perform distributed parallel processing across multiple processors on a single computer or even multiple nodes on an HPC system.

Lets first start by loading in a power outage data set using only one processor. The code for that is below.

In [1]:
import pandas as pd
import time as tp

# this line creates a variable to hold the time in which we began running the script
t1=tp.time()

# now we create a python list to hold the year of each file in the data set
file_nums = ['2014','2015','2016','2017','2018','2019','2020','2021','2022']

# we can also keep a count of the total blackouts as we read in the files using pandas
blackouts_total = 0

# this for loop iterates through each entry in our file_nums list we created above
for i in file_nums:
    # read the csv using pandas and assign the returned dataframe to our df variable
    df = pd.read_csv(f'../data/eaglei_outages/eaglei_outages_{i}.csv')

    # keep track of the number of blackouts in each file as we read them in
    blackouts = df['sum']
    blackouts_local = blackouts.sum()
    print(blackouts_local)

    blackouts_total+=blackouts_local

    print('Done with year', i)

# variable for the time after the script is completed so we can calculate the time the script took to run
t2 = tp.time()

print('The total time of the script was:',t2-t1, 'seconds')
print('The total blackouts from 2014-2022:', blackouts_total)

245259387
Done with year 2014
1565317037
Done with year 2015
2064484348
Done with year 2016
4378327281.0
Done with year 2017
3296135480.0
Done with year 2018
2824598528.0
Done with year 2019
5219277476.0
Done with year 2020
5325390318.0
Done with year 2021
4441543481.0
Done with year 2022
The total time of the script was: 67.16525363922119 seconds
The total blackouts from 2014-2022: 29360333336.0


Unnamed: 0,fips_code,county,state,sum,run_start_time
0,1005,Barbour,Alabama,4.0,2022-01-01 00:00:00
1,1009,Blount,Alabama,160.0,2022-01-01 00:00:00
2,1051,Elmore,Alabama,3.0,2022-01-01 00:00:00
3,1055,Etowah,Alabama,4.0,2022-01-01 00:00:00
4,1057,Fayette,Alabama,4.0,2022-01-01 00:00:00
...,...,...,...,...,...
22327828,72031,Carolina,Puerto Rico,1869.0,2022-11-12 18:30:00
22327829,72097,Mayagüez,Puerto Rico,420.0,2022-11-12 18:30:00
22327830,72113,Ponce,Puerto Rico,214.0,2022-11-12 18:30:00
22327831,72127,San Juan,Puerto Rico,8.0,2022-11-12 18:30:00


We can see that the script took a total of ~70 seconds to read in the files. Now lets try the same using MPI and multiple processors. In this example, we will use 9 processes each reading in a different file using its own processor all in parallel. To do this, we need to submit a job to Perlmutter using the sbatch script "eagle_mpi.sbatch"

A batch script is a file used to submit a job to a job scheduler/workload manager like Slurm. The batch script contains details of the job like the job name, max time the job can run, how many nodes are being requested for the job, which account the job should be charged to, etc.

Our batch script is very simple and looks like this:

```bash
#!/bin/bash
#SBATCH --constraint=cpu
#SBATCH --nodes=1
#SBATCH --time=10
#SBATCH --ntasks-per-node=9
#SBATCH --job-name eaglei_mpi-%j
#SBATCH -o eaglei_mpi-%j.out
#SBATCH -e eaglei_mpi-%j.err

cd ~/jupyter_bootcampproject_examples/eaglei_mpi_scripts

srun -n9 python3 eaglei_mpi.py
```

To run it, just execute the next shell which submits our script to Perlmutter.

In [27]:
ID=!sbatch --parsable eaglei_mpi.sbatch

Once you see a new file in your directory to the left, you can open and print it with the following cell.

In [None]:
with open(f"eaglei_mpi-{ID.n}.out") as f:
    print(f.read())

We can see there was quite a speedup from sharing the load of reading in the files using MPI. Lets look at another example, in this example we will use the 2014 EagleI power outage data set to calculate the total number of blackouts for each county.

In [97]:
import pandas as pd
import numpy as np
import time as tp

df = pd.read_csv(f'../data/eaglei_outages/eaglei_outages_2014.csv')

all_counties = df.fips_code.unique()

t1=tp.time()
counties = []
sums = []

for i in all_counties:
    #Find the position in the CSV of a specific county (i)
    specific_county_filter = df['fips_code'] == i

    #Filter out the data so that we only look at a specific county (i)
    specific_county_data  = df[specific_county_filter]['sum']

    #Add up the total blackouts of a specific county (i)
    specific_county_sum = specific_county_data.sum()
    
    counties.append(int(i))
    sums.append(int(specific_county_sum))
    

d = {'FIPS Code': counties, 'Blackout Sum' : sums}
blackouts_df = pd.DataFrame(data=d)  
    
t2=tp.time()

print('Total time of script', t2-t1)
blackouts_df.sort_values(by=['FIPS Code'])
blackouts_df.head()

Total time of script 2.6224021911621094


Unnamed: 0,FIPS Code,Blackout Sum
0,1037,970
1,1051,126098
2,1109,4131
3,1121,196
4,4017,2848


Now, lets try the same thing but use MPI 

In [122]:
ID=!sbatch --parsable eaglei_mpi_ex2.sbatch

In [107]:
with open(f"eaglei_mpi_ex2-{ID.n}.out") as f:
    print(f.read())
    
blackouts_df = pd.read_csv('county_blackouts.csv')
blackouts_df.sort_values(by=['FIPS Code'], ascending=True)
blackouts_df.head()

Total time of script 0.5700955390930176



Unnamed: 0,FIPS Code,Blackout Sum
0,17029,3348
1,37161,32891
2,40047,26920
3,40101,16712
4,45073,11271


We can definitely see a speedup in our calculation using MPI to run in parallel, and we even added a step that writes our dataframe to the filesystem in .csv format so that we could load it back up here in our Jupyter notebook. Do you think we would continue to see gains in performance if we add even more parallel processes?

Try editing the `my_ex2.sbatch` file and submit it with the cells below.

In [123]:
ID=!sbatch --parsable my_ex2.sbatch

In [None]:
with open(f"my_ex2-{ID.n}.out") as f:
    print(f.read())
    
blackouts_df = pd.read_csv('county_blackouts.csv')
blackouts_df.sort_values(by=['FIPS Code'], ascending=True)
blackouts_df.head()