# Introduction to MPI on Amazon SageMaker

Message Passing Interface (MPI) is the standard communication protocol for parallel programs whose processes cooperate by exchanging messages (see its [wiki page](https://en.wikipedia.org/wiki/Message_Passing_Interface)). [Open MPI](https://www.open-mpi.org/projects/user-docs/) is an open-source implementation of MPI that serves as a basic building block for distributed training systems.

In Python programs, you can call the Open MPI APIs through [mpi4py](https://mpi4py.readthedocs.io/en/stable/overview.html) and easily convert a single-process Python program into a parallel one.

Parallel processes can run on one host (e.g., one EC2 instance) or on multiple hosts (e.g., many EC2 instances). It is trivial to set up a parallel cluster (a comm world, in MPI parlance) on one host with Open MPI, but it is less straightforward to set up an MPI comm world across multiple instances.

SageMaker does it for you. In this tutorial, you will go through a few basic (but exceedingly important) [MPI communications](https://mpi4py.readthedocs.io/en/stable/tutorial.html) on SageMaker with **multiple instances**, and you will verify that parallel processes across instances are indeed talking to each other. These basic communications are the fundamental building blocks for distributed training.
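To make "basic communications" concrete, here is a minimal sketch of three common collective operations in mpi4py. It is an illustration only, not the contents of `mpi_demo.py`. The names `expected_rank_sum` and `run_collectives` are hypothetical, and the environment-variable guard assumes the Open MPI launcher, which exports `OMPI_COMM_WORLD_SIZE` to the processes it starts.

```python
import os


def expected_rank_sum(world_size):
    """Sum of ranks 0..world_size-1, the value an allreduce(SUM) of ranks should yield."""
    return world_size * (world_size - 1) // 2


def run_collectives():
    # Imported lazily so the helper above stays usable without MPI installed.
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Broadcast: rank 0 holds a Python object, every rank receives a copy.
    payload = {"msg": "hello"} if rank == 0 else None
    payload = comm.bcast(payload, root=0)

    # Allreduce: every rank contributes its rank, every rank gets the total.
    total = comm.allreduce(rank, op=MPI.SUM)

    # Gather: rank 0 collects one value per rank, the other ranks get None.
    squares = comm.gather(rank * rank, root=0)

    assert payload == {"msg": "hello"}
    assert total == expected_rank_sum(size)
    if rank == 0:
        assert squares == [r * r for r in range(size)]
        print("bcast, allreduce and gather agree across", size, "ranks")


# Assumption: only run the MPI part when started by Open MPI's launcher.
if "OMPI_COMM_WORLD_SIZE" in os.environ:
    run_collectives()
```

Saved under a file name of your choice (say, `collectives_sketch.py`), it could be launched with `mpirun -np 2 python collectives_sketch.py`. Run with plain `python`, it skips the MPI part.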

## Environment 
We assume Open MPI and mpi4py are installed in your environment. This is the case on SageMaker Notebook Instances and in SageMaker Studio.

## Inspect the Python Program

In [2]:
!pygmentize mpi_demo.py

from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    print("Number of MPI processes that will talk to each other:", size)


def point_to_point():
    """Point to point communication
    Send a numpy array (buffer like object) from rank 0 to rank 1
    """
    if rank == 0:
        print("point to point")
        data = np.array([0, 1, 2], dtype=np.intc)  # int in C

        # remember the difference bet

See the program in action with 2 parallel processes in your current environment. Make sure you have at least 2 cores.

In [3]:
!mpirun -np 2 python mpi_demo.py

/bin/bash: mpirun: command not found


## Scale it on SageMaker
You can run the above program with $n$ processes per host across $N$ hosts on SageMaker (and get a comm world of size $n\times N$). In the remainder of this notebook, you will use the SageMaker TensorFlow deep learning container (DLC) to run the above program. There is no particular reason for this choice: all SageMaker deep learning containers have Open MPI installed, so feel free to replace it with your favorite DLC.
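The arithmetic above can be sketched in a few lines. The helper names `comm_world_size` and `rank_layout` are hypothetical, and the layout assumes ranks are assigned host by host (all slots on one host are filled before moving to the next). The real placement depends on the launcher's mapping policy.

```python
def comm_world_size(processes_per_host, instance_count):
    """Size of the comm world: n processes on each of N hosts."""
    if processes_per_host < 1 or instance_count < 1:
        raise ValueError("processes_per_host and instance_count must be >= 1")
    return processes_per_host * instance_count


def rank_layout(processes_per_host, instance_count):
    """Map each global rank to (host_index, local_rank).

    Assumption: ranks are filled host by host; this is illustrative only.
    """
    world = comm_world_size(processes_per_host, instance_count)
    return {rank: divmod(rank, processes_per_host) for rank in range(world)}


print(comm_world_size(processes_per_host=2, instance_count=3))  # 6
for rank, (host, local_rank) in rank_layout(2, 3).items():
    print(f"rank {rank}: host {host}, local rank {local_rank}")
```

With 2 processes per host on 3 hosts, the sketch yields ranks 0 through 5, two per host.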

Check out the [SageMaker Python SDK Docs](https://sagemaker.readthedocs.io/en/stable/api/training/smd_model_parallel_general.html?highlight=mpi%20paramters#mpi-parameters) for the parameters needed to set up a distributed training job with MPI. 

In [7]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow

role = get_execution_role()

# Running 2 processes per host
# if we use 3 instances,
# then we should see 6 MPI processes

distribution = {"mpi": {"enabled": True, "processes_per_host": 2}}

tfest = TensorFlow(
    entry_point="mpi_demo.py",
    role=role,
    framework_version="2.3.0",
    distribution=distribution,
    py_version="py37",
    instance_count=3,
    instance_type="ml.c5.xlarge",  # 4 cores
    output_path="s3://" + sagemaker.Session().default_bucket() + "/" + "mpi",
)

In [8]:
tfest.fit()

2022-06-16 16:53:26 Starting - Starting the training job...
2022-06-16 16:53:55 Starting - Preparing the instances for training
ProfilerReport-1655398406: InProgress
.........
2022-06-16 16:55:22 Downloading - Downloading input data...
2022-06-16 16:55:42 Training - Downloading the training image..
2022-06-16 16:56:02,651 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training
2022-06-16 16:56:02,658 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2022-06-16 16:56:02,498 sagemaker-training-toolkit INFO     Imported framework sagemaker_tensorflow_container.training
2022-06-16 16:56:02,505 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2022-06-16 16:56:02,892 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)
2022-06-16 16:56:02,907 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gp

The stdout line "Number of MPI processes that will talk to each other: 6" indicates that the processes on all 3 hosts (2 per host) are included in the comm world.
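The check described above can also be done programmatically. The following is a minimal sketch that assumes you have the job's log text available as a string. The helper names `reported_world_size` and `world_size_matches` are hypothetical, and the pattern simply mirrors the message printed by rank 0 in `mpi_demo.py`.

```python
import re

# Matches the message that rank 0 prints in mpi_demo.py.
WORLD_SIZE_PATTERN = re.compile(
    r"Number of MPI processes that will talk to each other:\s*(\d+)"
)


def reported_world_size(log_text):
    """Return the comm world size found in the log text, or None if absent."""
    match = WORLD_SIZE_PATTERN.search(log_text)
    return int(match.group(1)) if match else None


def world_size_matches(log_text, processes_per_host, instance_count):
    """True if the logged world size equals processes_per_host * instance_count."""
    return reported_world_size(log_text) == processes_per_host * instance_count


sample = "Number of MPI processes that will talk to each other: 6"
print(world_size_matches(sample, processes_per_host=2, instance_count=3))  # True
```

Using `search` rather than `match` lets the pattern be found even if the log line carries a prefix.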

## Conclusion
In this notebook, you went through some fundamental MPI operations, which underlie the inner workings of many distributed training frameworks. You did that on SageMaker with multiple instances. You can scale up this setup to include more instances in a real ML project.