# Advanced MPI Programming with mpi4py

This notebook is designed for advanced users and researchers. We cover the following topics:

1. Basic MPI Environment: Initialization, Rank, Size
2. Point-to-Point Communication: Blocking Send/Recv
3. Non-blocking Communication: Isend/Irecv with Wait/Waitall
4. Collective Communication: Broadcast, Scatter, Gather, Reduce, Allreduce, Alltoall
5. Derived Datatypes and NumPy Integration
6. Subcommunicators and Communicator Splitting
7. Topology Mapping: Cartesian Grids
8. Performance Measurement: MPI_Wtime
9. Final example: Calculating PI

Make sure you have an MPI installation (OpenMPI/MPICH) and `mpi4py` installed:
```bash
pip install mpi4py
```

To run the examples using 4 processes, for instance:
```bash
mpiexec -n 4 python example_file.py
```

## 1. Basic MPI Environment

Initialize MPI, determine each process's rank and the total number of processes. This is the foundation of all MPI programs.

`comm = MPI.COMM_WORLD` is the default communicator that includes all processes. `rank` is the process ID, and `size` is the total number of processes.

`comm.Get_rank()`: Get the rank of the current process.

`comm.Get_size()`: Get the total number of processes.


Code example:
```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print(f"[Basic] Process {rank} of {size}")
``` 

In [2]:
%%bash
# Run script
mpirun -n 4 python3 ./py-codes/1_hello_basic.py

[Basic] Process 0 of 4
[Basic] Process 1 of 4
[Basic] Process 2 of 4
[Basic] Process 3 of 4


## 2. Point-to-Point Communication

Demonstrate blocking communication with `Send` and `Recv`. In this example, process 0 sends a message to process 1.

`comm.Send([data, datatype], dest=1, tag=0)`: Send data to a process.

`comm.Recv([data, datatype], source=0, tag=0)`: Receive data from a process.

Code example:
```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size < 2:
    raise Exception("This example requires at least 2 processes.")

if rank == 0:
    data = np.array([1, 2, 3], dtype=np.int32)
    comm.Send([data, MPI.INT], dest=1, tag=0)
    print(f"[P2P] Process {rank} sent {data} to process 1")
elif rank == 1:
    data = np.empty(3, dtype=np.int32)
    comm.Recv([data, MPI.INT], source=0, tag=0)
    print(f"[P2P] Process {rank} received {data} from process 0")
```

In [None]:
%%bash
# Run script
mpirun -n 2 python3 ./py-codes/2_blocking_comm.py

Process 0 sent 1 to process 1
Process 1 received [1 2 3] from process 0


## 3. Non-blocking Communication

Use `Isend` and `Irecv` for nonblocking communications. Here we demonstrate how a process can initiate a send or receive and continue with computation.

```comm.Isend([data, datatype], dest=1, tag=0)```: Initiate a non-blocking send.

```comm.Irecv([data, datatype], source=0, tag=0)```: Initiate a non-blocking receive.

```req.Wait()```: Wait for the non-blocking operation to complete.

Code example:

```python
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size < 2:
    raise Exception("This example requires at least 2 processes.")

if rank == 0:
    data = np.array([123], dtype=np.int32)
    # Simulate computation
    print(f"[Nonblocking] Process {rank} is sending {data[0]}")
    req = comm.Isend([data, MPI.INT], dest=1, tag=0)
    # Simulate computation
    print(f"[Nonblocking] Process {rank} is doing some computation")
    time.sleep(4)
    req.Wait()  # Ensure the send has completed
    print(f"[Nonblocking] Process {rank} completed send of {data[0]} after req.Wait")
elif rank == 1:
    data = np.empty(1, dtype=np.int32)
    req = comm.Irecv([data, MPI.INT], source=0, tag=0)
    # Simulate computation
    print(f"[Nonblocking] Process {rank} is doing some computation")
    time.sleep(2)
    req.Wait()  # Ensure the receive has completed
    print(f"[Nonblocking] Process {rank} completed receive with {data[0]} after req.Wait")
```

In [18]:
%%bash
# Run script
mpirun -n 2 python3 ./py-codes/3_non_blocking_comm.py

[Nonblocking] Process 0 is sending 123
[Nonblocking] Process 0 is doing some computation
[Nonblocking] Process 1 is doing some computation
[Nonblocking] Process 1 completed receive with 123 after req.Wait
[Nonblocking] Process 0 completed send of 123 after req.Wait


## 4. Collective Communication

Collective operations involve all processes in a communicator. We cover broadcast, scatter, gather, reduce, allreduce, and all-to-all.

`comm.bcast(data, root=0)`: Broadcast data from one process to all others.

`comm.scatter(sendbuf, recvbuf, root=0)`: Scatter an array from one process to all others.

`comm.gather(sendbuf, recvbuf, root=0)`: Gather an array from all processes to one.

`comm.reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)`: Reduce data from all processes to one.

`comm.allreduce(sendbuf, recvbuf, op=MPI.SUM)`: Reduce data from all processes and distribute the result to all.

`comm.alltoall(sendbuf, recvbuf)`: Exchange data between all processes.

Code example:
```python
# Import mpi4py's MPI module
import time
from mpi4py import MPI

# Set up MPI communicator and get parameters
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# MPI_Bcast: Broadcast from process 0 to all processes
if rank == 0:
    bcast_data = 100
else:
    bcast_data = None
bcast_data = comm.bcast(bcast_data, root=0)
print(f"[Collective] Rank {rank} received bcast_data: {bcast_data}")
time.sleep(1)

# MPI_Scatter: Scatter an array from process 0 to all processes
if rank == 0:
    scatter_data = [i * 10 for i in range(size)]
else:
    scatter_data = None
scatter_recv = comm.scatter(scatter_data, root=0)
print(f"[Collective] Rank {rank} received scatter data: {scatter_recv}")
time.sleep(1)

# MPI_Gather: Gather data from all processes to process 0
send_val = rank
gather_data = comm.gather(send_val, root=0)
if rank == 0:
    print("[Collective] Gathered data:", gather_data)
time.sleep(1)

# MPI_Reduce: Reduce data from all processes (sum) and send result to process 0
reduce_sum = comm.reduce(send_val, op=MPI.SUM, root=0)
if rank == 0:
    print("[Collective] Sum of ranks (reduce):", reduce_sum)
time.sleep(1)

# MPI_Allreduce: Reduce data (sum) and distribute the result to all processes
allreduce_sum = comm.allreduce(send_val, op=MPI.SUM)
print(f"[Collective] Rank {rank} has allreduce sum: {allreduce_sum}")
time.sleep(1)

# MPI_Alltoall: Exchange data between all processes
alltoall_send = [rank * 100 + i for i in range(size)]
alltoall_recv = comm.alltoall(alltoall_send)
print(f"[Collective] Rank {rank} received alltoall data: {alltoall_recv}")
```


In [21]:
%%bash
# Run script
mpirun -n 4 python3 ./py-codes/4_collective.py

[Collective] Rank 0 received bcast_data: 100
[Collective] Rank 2 received bcast_data: 100
[Collective] Rank 1 received bcast_data: 100
[Collective] Rank 3 received bcast_data: 100
[Collective] Rank 0 received scatter data: 0
[Collective] Rank 2 received scatter data: 20
[Collective] Rank 1 received scatter data: 10
[Collective] Rank 3 received scatter data: 30
[Collective] Gathered data: [0, 1, 2, 3]
[Collective] Sum of ranks (reduce): 6
[Collective] Rank 0 has allreduce sum: 6
[Collective] Rank 1 has allreduce sum: 6
[Collective] Rank 2 has allreduce sum: 6
[Collective] Rank 3 has allreduce sum: 6
[Collective] Rank 0 received alltoall data: [0, 100, 200, 300]
[Collective] Rank 1 received alltoall data: [1, 101, 201, 301]
[Collective] Rank 2 received alltoall data: [2, 102, 202, 302]
[Collective] Rank 3 received alltoall data: [3, 103, 203, 303]


## 5. Derived Datatypes and NumPy Integration

While Python is dynamically typed, using NumPy arrays with MPI can be considered analogous to using derived datatypes in C. In mpi4py, NumPy arrays can be sent directly.

`MPI.Datatype.Create_struct(blocklengths, offsets, types)`: Create a derived datatype.

`MPI.Datatype.Create_resized(lb, extent)`: Resize a datatype to match the size of a NumPy data type.

`datatype.Commit()`: Commit the datatype for use.

`datatype.Free()`: Free the datatype after use.



Code example:
```python
from mpi4py import MPI
import numpy as np

# Define a structured NumPy data type.
derived_dtype = np.dtype([
    ('id', np.int32),
    ('value',   np.float64),
    ('timestamp', np.float64)
])

# Compute block lengths, displacements (offsets), and MPI types.
blocklengths = [1, 1, 1]
offsets = [derived_dtype.fields[field][1] for field in derived_dtype.names]
types = [MPI.INT, MPI.DOUBLE, MPI.DOUBLE]

# Create the MPI datatype
derived_mpi_type = MPI.Datatype.Create_struct(blocklengths, offsets, types)
derived_mpi_type.Commit()

# Align it properly to match NumPy's structure size
aligned_derived_mpi_type = derived_mpi_type.Create_resized(0, derived_dtype.itemsize)
aligned_derived_mpi_type.Commit()

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if size < 2:
    raise Exception("This example requires at least 2 processes.")

if rank == 0:
    sensor_data = np.array([
        (101, 23.45, 1618033.98),
        (102, 27.89, 1618034.98)
    ], dtype=derived_dtype)
    
    comm.Send([sensor_data, aligned_derived_mpi_type], dest=1, tag=42)
    print(f"[Derived] Process {rank} sent sensor data:")
    print(sensor_data)
    
elif rank == 1:
    sensor_data_recv = np.empty(2, dtype=derived_dtype)
    comm.Recv([sensor_data_recv, aligned_derived_mpi_type], source=0, tag=42)
    print(f"[Derived] Process {rank} received sensor data:")
    print(sensor_data_recv)

aligned_derived_mpi_type.Free()
derived_mpi_type.Free()

```

In [36]:
%%bash
# Run script
mpirun -n 2 python3 ./py-codes/5_derived.py

[Derived] Process 0 sent sensor data:
[(101, 23.45, 1618033.98) (102, 27.89, 1618034.98)]
[Derived] Process 1 received sensor data:
[(101, 23.45, 1618033.98) (102, 27.89, 1618034.98)]


## 6. Subcommunicators and Communicator Splitting

Splitting communicators allows you to create groups of processes that can work on subproblems. In this example, we split processes based on even/odd rank.

`comm.Split(color, key)`: Split the communicator based on color and key.

Code example:
```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each process selects a color based on even/odd rank
color = rank % 2

# Split the communicator into subcommunicators based on the color.
subcomm = comm.Split(color, rank)

subrank = subcomm.Get_rank()
subsize = subcomm.Get_size()

print(f"[Subcomm] Global rank {rank} in group {color}, subrank {subrank} (subsize {subsize})")

subcomm.Free()
MPI.Finalize()
```

In [30]:
%%bash
# Run script
mpirun -n 4 python3 ./py-codes/6_subcommunicators.py

[Subcomm] Global rank 0 in group 0, subrank 0 (subsize 2)
[Subcomm] Global rank 1 in group 1, subrank 0 (subsize 2)
[Subcomm] Global rank 2 in group 0, subrank 1 (subsize 2)
[Subcomm] Global rank 3 in group 1, subrank 1 (subsize 2)


## 7. Topology Mapping: Cartesian Grids

Cartesian communicators are useful for structured grid applications. Here we create a 2D grid communicator.

`MPI.Compute_dims(size, ndims)`: Compute the dimensions of a Cartesian grid.

`comm.Create_cart(dims, periods, reorder)`: Create a Cartesian communicator.

`comm.Get_coords(rank)`: Get the coordinates of a process in the grid.


Code example:
```python
# python
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Create a 2D Cartesian grid communicator.
dims = MPI.Compute_dims(size, 2)
periods = [True, True]
reorder = True
cart_comm = comm.Create_cart(dims, periods=periods, reorder=reorder)

cart_rank = cart_comm.Get_rank()
coords = cart_comm.Get_coords(cart_rank)

print(f"[Cartesian] Global rank {rank} has coords: {coords}")

cart_comm.Free()
MPI.Finalize()
```

In [32]:
%%bash
# Run script
mpirun -n 4 python3 ./py-codes/7_cartesian.py

[Cartesian] Global rank 0 has coords: [0, 0]
[Cartesian] Global rank 1 has coords: [0, 1]
[Cartesian] Global rank 2 has coords: [1, 0]
[Cartesian] Global rank 3 has coords: [1, 1]


## 8. Performance Measurement

Use `MPI.Wtime()` to measure elapsed time. In performance-critical applications, time barriers and collective operations to optimize communication.

`MPI.Wtime()`: Get the current time in seconds.

Code example:
```python
# python
from mpi4py import MPI
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Synchronize processes.
comm.Barrier()
start = MPI.Wtime()

comm.Barrier()
time.sleep(1)  # Simulate work

end = MPI.Wtime()

if rank == 0:
    print(f"[Performance] Barrier took {end - start:.6f} seconds")

MPI.Finalize()
```


In [34]:
%%bash
# Run script
mpirun -n 2 python3 ./py-codes/8_performance_measure.py

[Performance] Barrier took 1.000600 seconds


## 9. Simple example for calculating PI using MPI


```python
from mpi4py import MPI
import sys
import math

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Process 0 reads the number of intervals from the command line.
if rank == 0:
    if len(sys.argv) != 2:
        print("Usage: python3 9_final.py <number_of_intervals>")
        comm.Abort(1)
    try:
        n = int(sys.argv[1])
    except ValueError:
        print("Please provide an integer for number_of_intervals.")
        comm.Abort(2)
else:
    n = None

# Broadcast the number of intervals to all processes
n = comm.bcast(n, root=0)

h = 1.0 / n
local_sum = 0.0

t_start = MPI.Wtime()

# Each process computes its portion of the sum.
# Process 'rank' handles i = rank, rank+size, rank+2*size, ...
for i in range(rank, n, size):
    x = i * h
    local_sum += 4.0 / (1.0 + x * x)
local_sum *= h

# Reduce all partial sums into the final value of pi on process 0.
pi = comm.reduce(local_sum, op=MPI.SUM, root=0)
t_end = MPI.Wtime()

if rank == 0:
    error = abs(pi - math.pi)
    print(f"Calculated Pi = {pi:.16f}")
    print(f"Elapsed time = {t_end - t_start:.6f} seconds")
    print(f"Error = {error:.16f}")

MPI.Finalize()
```

In [48]:
%%bash
# Run script
mpirun -n 4 python3 ./py-codes/9_final.py 100000000

Calculated Pi = 3.1415926635898743
Elapsed time = 2.942581 seconds
Error = 1.000008e-08


## Conclusion

We have covered advanced MPI topics in Python using mpi4py including process management, collective communication, derived datatypes, subcommunicators, topology mapping, performance measurement, and a final real-life example of parallel matrix multiplication. Use these building blocks to develop scalable and efficient HPC applications.

Happy HPC programming!