# Getting this notebook. 

Run: 

*git clone https://github.com/krishnaa423/parallel_tutorial.git*

# Helper code 

In [1]:
import os 

def write_string_to_file(filename, string):
    """Write ``string`` to ``filename``, replacing any existing content.

    The MPI examples below use this to dump each script to disk so it can be
    started with an MPI launcher instead of inside the notebook kernel.

    Args:
        filename: Path of the file to create or overwrite.
        string: Text to write.
    """
    # Explicit encoding so the generated scripts do not depend on the platform's default locale.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(string)

# Launcher used by the `!${MPI_RUN} ...` shell cells. setdefault keeps a value that is
# already exported (e.g. MPI_RUN=srun on a cluster) instead of overwriting it.
os.environ.setdefault('MPI_RUN', 'mpirun')

# Tutorial Overview

- Python
- MPI Hello World
- Send and Receive
- Race condition
- Broadcast
- Scatter
- Gather
- Reduce
- Computation time vs number of tasks for summing an array

# Python

## Variables, Operations, Printing. 

In [2]:
# Two integer operands, assigned in a single tuple-unpacking statement.
a, b = 2, 3

# `+` on plain Python ints is ordinary addition.
c = a + b

# An f-string substitutes the values of a, b and c into the message.
print(f'The sum of {a} and {b} is: {c}')

The sum of 2 and 3 is: 5


## Numpy. 

NumPy (short for Numerical Python) is a module for fast operations on whole arrays of numbers.

In [3]:
# Import the numpy module. 
import numpy as np 

# Two small integer vectors, built in one tuple-unpacking statement.
a, b = np.array([1, 2, 3]), np.array([4, 5, 6])

# `+` on NumPy arrays works elementwise: c[i] = a[i] + b[i].
c = a + b

# NumPy arrays print without commas, e.g. [5 7 9].
print(f'The sum of arrays {a} and {b} is: {c}')

The sum of arrays [1 2 3] and [4 5 6] is: [5 7 9]


# Parallel computing using MPI in Python

MPI stands for Message Passing Interface. It is a standard set of functions that separate processes use to exchange data, which lets us split a computation across many processors.

We will be using Python to demonstrate some parallel programming concepts. For this, we will use the Python module mpi4py, which provides Python bindings for MPI.

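Some terminology:
- Task: Each parallel worker (a separate process) is called a task.
- Communicator: A group of tasks that take part in a computation and can exchange messages with each other.
- Rank: The integer ID of a task within a communicator, running from 0 to (size of the communicator) - 1.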

It is a little tricky to run MPI code directly in the notebook, because MPI programs are usually started as several processes by a launcher (such as `mpirun`) or through a job scheduler. To get around that, we write each example out to a file and then run the file with an MPI launcher.

## MPI Hello World

This example prints out the rank of each task and the size of the communicator. 

In [4]:
filename = 'hello_world.py'

string = \
'''
from mpi4py import MPI

comm = MPI.COMM_WORLD

mpi_rank = comm.Get_rank()
mpi_size = comm.Get_size()

print(f'Hello world from rank {mpi_rank} of {mpi_size}')
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 4 python3 hello_world.py

Hello world from rank 1 of 4
Hello world from rank 0 of 4
Hello world from rank 2 of 4
Hello world from rank 3 of 4


## Send and Receive Arrays

We can send an array from one task to another task.

In the example below, we create an array in rank/task=0 and send it to rank/task=1.

In [5]:
filename = 'send_recv.py'

string = \
'''
from mpi4py import MPI
import numpy as np 

comm = MPI.COMM_WORLD

mpi_size = comm.Get_size()
mpi_rank = comm.Get_rank()

# Set array to zero initially. 
array_size = 4
a = np.zeros(shape=(array_size,))

# Set to some array before sending and receiving. 
if mpi_rank==0: 
    a = np.arange(array_size).astype(dtype='f8')
    

# Print arrays before the operation.  
print(f'Before send and receive. Rank: {mpi_rank}, a: {a}')

# Send and Receive. 
if mpi_rank==0:
    comm.Send([a, MPI.DOUBLE], dest=1, tag=10)
elif mpi_rank==1:
    comm.Recv([a, MPI.DOUBLE], source=0, tag=10)

comm.Barrier()
print('')

# Print arrays after the operation. 
print(f'After send and receive. Rank: {mpi_rank}, a: {a}') 
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 4 python3 send_recv.py

Before send and receive. Rank: 1, a: [0. 0. 0. 0.]
Before send and receive. Rank: 2, a: [0. 0. 0. 0.]
Before send and receive. Rank: 3, a: [0. 0. 0. 0.]
Before send and receive. Rank: 0, a: [0. 1. 2. 3.]




After send and receive. Rank: 2, a: [0. 0. 0. 0.]
After send and receive. Rank: 3, a: [0. 0. 0. 0.]
After send and receive. Rank: 0, a: [0. 1. 2. 3.]
After send and receive. Rank: 1, a: [0. 1. 2. 3.]


## Race Conditions

It is possible to send and receive without waiting for these operations to finish, in case we want to do other work in the meantime. But it is important to wait for the receive to complete before using the array.

The example below shows the pitfall of reading the receive array before waiting for the data to arrive. We call this a race condition: the result depends on whether the transfer happens to finish before the array is read.

In [6]:
filename = 'race_condition.py'

string = \
'''
from mpi4py import MPI
import numpy as np 

comm = MPI.COMM_WORLD

mpi_size = comm.Get_size()
mpi_rank = comm.Get_rank()

# Set array to zero initially. 
array_size = 4
a = np.zeros(shape=(array_size,))

# Set to some array before sending and receiving. 
if mpi_rank==0: 
    a = np.arange(array_size).astype(dtype='f8')
    

# Print arrays before the operation.  
print(f'Before send and receive. Rank: {mpi_rank}, a: {a}')

# Send and Receive. 
if mpi_rank==0:
    comm.Isend([a, MPI.DOUBLE], dest=1, tag=10)
elif mpi_rank==1:
    req = comm.Irecv([a, MPI.DOUBLE], source=0, tag=10)


# Print arrays after the operation. 
print(f'After send and receive. Rank: {mpi_rank}, a: {a}') 

# Print arrays after waiting. 
if mpi_rank==1: req.Wait()

comm.Barrier()
print('')
print(f'After send and receive and waiting. Rank: {mpi_rank}, a: {a}') 
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 2 python3 race_condition.py

Before send and receive. Rank: 1, a: [0. 0. 0. 0.]
Before send and receive. Rank: 0, a: [0. 1. 2. 3.]
After send and receive. Rank: 0, a: [0. 1. 2. 3.]
After send and receive. Rank: 1, a: [0. 0. 0. 0.]


After send and receive and waiting. Rank: 0, a: [0. 1. 2. 3.]
After send and receive and waiting. Rank: 1, a: [0. 1. 2. 3.]


## Broadcast Operation

We can send an array from one rank/task to all ranks/tasks. This is called a broadcast operation.

In [7]:
filename = 'broadcast.py'


string = \
'''
from mpi4py import MPI
import numpy as np 

comm = MPI.COMM_WORLD

mpi_size = comm.Get_size()
mpi_rank = comm.Get_rank()

# Set array to zero initially. 
array_size = 4
a = np.zeros(shape=(array_size,))

# Set to some array before broadcast. 
if mpi_rank==0: 
    a = np.arange(array_size).astype(dtype='f8')
    

# Print arrays before the operation.  
print(f'Before broadcast. Rank: {mpi_rank}, a: {a}')

# Broadcast. 
comm.Bcast([a, MPI.DOUBLE], root=0)

comm.Barrier()
print('')

# Print arrays after the operation. 
print(f'After broadcast. Rank: {mpi_rank}, a: {a}') 
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 4 python3 broadcast.py

Before broadcast. Rank: 1, a: [0. 0. 0. 0.]
Before broadcast. Rank: 2, a: [0. 0. 0. 0.]
Before broadcast. Rank: 3, a: [0. 0. 0. 0.]
Before broadcast. Rank: 0, a: [0. 1. 2. 3.]




After broadcast. Rank: 0, a: [0. 1. 2. 3.]
After broadcast. Rank: 1, a: [0. 1. 2. 3.]
After broadcast. Rank: 2, a: [0. 1. 2. 3.]
After broadcast. Rank: 3, a: [0. 1. 2. 3.]


## Scatter Operation

We can split an array held by one rank/task into equal chunks and send one chunk to each task in the communicator. This is done with the Scatter operation in MPI, as shown in the example below.

In [8]:
filename = 'scatter.py'

string = \
'''
from mpi4py import MPI
import numpy as np 

comm = MPI.COMM_WORLD

mpi_size = comm.Get_size()
mpi_rank = comm.Get_rank()

# Set to some array before scatter operation.
array_size = 8
if mpi_rank==0: 
    a = np.arange(array_size, dtype='f8')
else:
    a = np.zeros(array_size, dtype='f8')
b = np.zeros(int(array_size/mpi_size), dtype='f8')

# Print arrays before the operation.  
print(f'Before scatter. Rank: {mpi_rank}, a: {a}, b: {b}')

# Scatter. 
comm.Scatter(a, b, root=0)

comm.Barrier()
print('')

# Print arrays after the operation. 
print(f'After scatter. Rank: {mpi_rank}, a: {a}, b: {b}') 
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 4 python3 scatter.py

Before scatter. Rank: 0, a: [0. 1. 2. 3. 4. 5. 6. 7.], b: [0. 0.]
Before scatter. Rank: 3, a: [0. 0. 0. 0. 0. 0. 0. 0.], b: [0. 0.]
Before scatter. Rank: 2, a: [0. 0. 0. 0. 0. 0. 0. 0.], b: [0. 0.]
Before scatter. Rank: 1, a: [0. 0. 0. 0. 0. 0. 0. 0.], b: [0. 0.]




After scatter. Rank: 0, a: [0. 1. 2. 3. 4. 5. 6. 7.], b: [0. 1.]
After scatter. Rank: 1, a: [0. 0. 0. 0. 0. 0. 0. 0.], b: [2. 3.]
After scatter. Rank: 2, a: [0. 0. 0. 0. 0. 0. 0. 0.], b: [4. 5.]
After scatter. Rank: 3, a: [0. 0. 0. 0. 0. 0. 0. 0.], b: [6. 7.]


## Gather Operation


Sometimes, after scattering the array and letting each rank/task work on its chunk, we want to collect the chunks back into a single array on one task. This is done using the Gather operation, as shown below.

In [9]:
filename='gather.py'

string = \
'''
from mpi4py import MPI
import numpy as np 

comm = MPI.COMM_WORLD

mpi_size = comm.Get_size()
mpi_rank = comm.Get_rank()

# Set to some array before scatter operation.
array_size = 8
if mpi_rank==0: 
    a = np.arange(array_size, dtype='f8')
else:
    a = np.zeros(array_size, dtype='f8')
b = np.zeros(int(array_size/mpi_size), dtype='f8')
c = np.zeros(array_size, dtype='f8')
comm.Scatter(a, b, root=0)

# Print arrays before the operation.  
print(f'Before gather. Rank: {mpi_rank}, b: {b}, c: {c}')

# Gather. 
comm.Gather(b, c, root=0)

comm.Barrier()
print('')

# Print arrays after the operation. 
print(f'After gather. Rank: {mpi_rank}, b: {b}, c: {c}') 
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 4 python3 gather.py

Before gather. Rank: 0, b: [0. 1.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
Before gather. Rank: 1, b: [2. 3.], c: [0. 0. 0. 0. 0. 0. 0. 0.]

Before gather. Rank: 2, b: [4. 5.], c: [0. 0. 0. 0. 0. 0. 0. 0.]

Before gather. Rank: 3, b: [6. 7.], c: [0. 0. 0. 0. 0. 0. 0. 0.]


After gather. Rank: 1, b: [2. 3.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
After gather. Rank: 2, b: [4. 5.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
After gather. Rank: 3, b: [6. 7.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
After gather. Rank: 0, b: [0. 1.], c: [0. 1. 2. 3. 4. 5. 6. 7.]


## Reduce Operation

This example showcases a reduce operation, which sums the arrays held by all ranks/tasks elementwise and stores the result on the root rank/task, which is 0 in this case.

In [10]:
filename='reduce.py'

string = \
'''
from mpi4py import MPI
import numpy as np 

comm = MPI.COMM_WORLD

mpi_size = comm.Get_size()
mpi_rank = comm.Get_rank()

# Set to some array before scatter operation.
array_size = 8
if mpi_rank==0: 
    a = np.arange(array_size, dtype='f8')
else:
    a = np.zeros(array_size, dtype='f8')
b = np.zeros(array_size, dtype='f8')
c = np.zeros(array_size, dtype='f8')
comm.Scatterv(a, b, root=0)

# Print arrays before the operation.  
print(f'Before reduce. Rank: {mpi_rank}, b: {b}, c: {c}')

# Reduce. 
comm.Reduce(b, c, root=0)


comm.Barrier()
print('')

# Print arrays after the operation. 
print(f'After reduce. Rank: {mpi_rank}, b: {b}, c: {c}') 
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 4 python3 reduce.py

Before reduce. Rank: 0, b: [0. 1. 0. 0. 0. 0. 0. 0.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
Before reduce. Rank: 1, b: [2. 3. 0. 0. 0. 0. 0. 0.], c: [0. 0. 0. 0. 0. 0. 0. 0.]

Before reduce. Rank: 2, b: [4. 5. 0. 0. 0. 0. 0. 0.], c: [0. 0. 0. 0. 0. 0. 0. 0.]

Before reduce. Rank: 3, b: [6. 7. 0. 0. 0. 0. 0. 0.], c: [0. 0. 0. 0. 0. 0. 0. 0.]


After reduce. Rank: 1, b: [2. 3. 0. 0. 0. 0. 0. 0.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
After reduce. Rank: 2, b: [4. 5. 0. 0. 0. 0. 0. 0.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
After reduce. Rank: 3, b: [6. 7. 0. 0. 0. 0. 0. 0.], c: [0. 0. 0. 0. 0. 0. 0. 0.]
After reduce. Rank: 0, b: [0. 1. 0. 0. 0. 0. 0. 0.], c: [12. 16.  0.  0.  0.  0.  0.  0.]


## Time for summing an array vs number of MPI tasks

This example combines the operations above to sum an array efficiently. We create an array on rank/task 0 and scatter it so that each task sums only its own chunk. We then reduce the partial sums onto the root rank/task. We do this for a set of task counts and measure how long it takes.

Feel free to play around with the *array_size* and *task_sizes* variables in the code below to see how the timings change. 

What do you observe? Do you see any patterns when array_size gets smaller or larger?

You might notice that when the array size is small, the performance can get worse as the number of tasks grows, because the communication overhead outweighs the computational cost. We can say that the run time is dominated by communication overhead for small arrays and by the local summation work for large arrays.

In [11]:
filename='time_vs_tasks.py'

string = \
'''
# Some variables. 
array_size = 10000000
task_sizes = [1, 2, 3, 4, 5, 6, 7, 8]


from mpi4py import MPI 
import numpy as np 
import time 

def add_array(array_size, comm: MPI.Comm):
    
    if comm==MPI.COMM_NULL: return 0, 0
    
    mpi_size = comm.Get_size()
    mpi_rank = comm.Get_rank()
    
    if mpi_rank==0: 
        a = np.arange(array_size, dtype='f8')
    else:
        a = np.zeros(array_size, dtype='f8')
    b = np.zeros(array_size, dtype='f8')
    c = np.zeros(1, dtype='f8')
    sum = np.zeros(1, dtype='f8')
    
    start_time = MPI.Wtime()
    comm.Scatterv(a, b, root=0)
    c[0] = np.sum(b)
    comm.Barrier()
    comm.Reduce(c, sum, op=MPI.SUM, root=0)
    
    sum = sum[0]
    
    end_time = MPI.Wtime()
    elapsed_time = end_time - start_time
        
    return sum, elapsed_time

def get_comm_array(comm: MPI.Comm, task_sizes: list):
    
    comm_array = []
    
    for task_size in task_sizes:
        group = comm.Get_group()    
        group = group.Incl(range(task_size))
        comm_temp = comm.Create(group)
        comm_array.append(comm_temp)
        
    return comm_array
    
    
def main():
    global array_size
    global task_sizes

    comm = MPI.COMM_WORLD
    
    mpi_size = comm.Get_size()
    mpi_rank = comm.Get_rank()
    
    
    
    comm_array = get_comm_array(comm, task_sizes)
    
    ntasks = []
    sums = []
    times = []
    
    for comm_entry in comm_array:
        sum, elapsed_time = add_array(array_size, comm_entry)
        if mpi_rank==0: ntasks.append(comm_entry.Get_size())
        if mpi_rank==0: times.append(elapsed_time)
        if mpi_rank==0: sums.append(sum)
        
    # Plot the sums vs time. 
    if mpi_rank==0:
        for ntask, sum, time in zip(ntasks, sums, times):
            print(f'ntask: {ntask}, time: {time}, sum: {sum}')
    
    
if __name__=='__main__':
    main()
'''

write_string_to_file(filename, string)

!${MPI_RUN} -n 8 python3 time_vs_tasks.py

ntask: 1, time: 0.01741371700001082, sum: 49999995000000.0
ntask: 2, time: 0.01752307099999939, sum: 49999995000000.0
ntask: 3, time: 0.01763409800000204, sum: 49999995000000.0
ntask: 4, time: 0.018580310000004374, sum: 49999995000000.0
ntask: 5, time: 0.016705353000020295, sum: 49999995000000.0
ntask: 6, time: 0.017415216000017608, sum: 49999995000000.0
ntask: 7, time: 0.016154229000022724, sum: 49999995000000.0
ntask: 8, time: 0.017784209000012652, sum: 49999995000000.0
