# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Supplementary Notebook: Implementation of send and receive operations on a dataset using MPI

## Learning Objectives

At the end of the mini-project, you will be able to:

* implement point-to-point (send and receive) and collective (broadcast, scatter, gather) communication operations on a dataset using MPI

### Dataset

Here, we will be using the Iris dataset. It contains 50 samples from each of 3 species of iris (150 samples in total).

The columns in this dataset are:

- SepalLength (cm)
- SepalWidth (cm)
- PetalLength (cm)
- PetalWidth (cm)
- Species

**Note:** We will be using the mpi4py Python package for the MPI-based code implementation.

**Run the code below to install the mpi4py package**

In [1]:
# !pip install mpi4py


#### Importing Necessary Packages

In [2]:
# Importing pandas
import pandas as pd

# Importing Numpy
import numpy as np

# Importing MPI from mpi4py package
from mpi4py import MPI

# Importing the sqrt function from the math module
from math import sqrt

# Importing the Decimal class and the ROUND_HALF_UP rounding mode from the decimal module
from decimal import Decimal, ROUND_HALF_UP
import time
from sklearn import datasets


In [3]:
# @title Downloading the data
iris = datasets.load_iris()
dataset = pd.DataFrame(iris.data, columns=iris.feature_names)
dataset["species"] = iris.target
dataset["species"] = dataset["species"].apply(lambda x: iris.target_names[x])
dataset.to_csv("iris_dataset.csv", index=False)
print("Dataset downloaded successfully!")


Dataset downloaded successfully!


### Load data

Write a function that takes the filename as input and loads the data in a pandas dataframe.

In [5]:
# FILENAME = "/content/iris_dataset.csv"  # Storing File path
FILENAME = "iris_dataset.csv"  # Storing File path


# Defining a function to load the data
def loadData(filename):
    # Loading the dataset; column names are taken from the CSV header row
    data = pd.read_csv(filename)
    # Returning the dataframe
    return data


# Calling the function loadData and storing the dataframe in a variable named df
df = loadData(FILENAME)
df.head()


|   | sepal length (cm) | sepal width (cm) | petal length (cm) | petal width (cm) | species |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |


### Point-to-point Blocking Communication

**Passing the entire Dataframe**

In [None]:
%%writefile passing_dataframe.py
from mpi4py import MPI  # Importing the MPI module from the mpi4py package
import pandas as pd

FILENAME = "/content/iris_dataset.csv"  # Storing File path


# Defining a function to load the data
def loadData(filename):
    # Loading the dataset; column names are taken from the CSV header row
    data = pd.read_csv(filename)
    # Returning the dataframe
    return data


def main():
    # Creating a communicator
    comm = MPI.COMM_WORLD
    # rank (id) of the process running the code
    rank = comm.Get_rank()
    # total number of processes running
    size = comm.Get_size()
    # master process
    if rank == 0:
        # Only the master process loads the dataframe from disk
        data = loadData(FILENAME)
        # master process sends data to worker processes by
        # going through the ranks of all worker processes
        for i in range(1, size):
            # Sending the dataframe; the tag matches the rank of the receiving worker
            comm.send(data, dest=i, tag=i)
            # Displaying the results
            print('Process {} sent data to process {}:'.format(rank, i), data)
    # worker processes
    else:
        # each worker process receives data from master process
        data = comm.recv(source=0, tag=rank)
        # Displaying the results
        print('Process {} received data:'.format(rank), data)


# Calling the function
main()


In [None]:
!mpirun --allow-run-as-root -np 4 python passing_dataframe.py


### Collective Communication

In MPI for Python, the `Comm.Bcast`, `Comm.Scatter`, `Comm.Gather`, `Comm.Allgather`, `Comm.Alltoall` methods provide support for collective communications of memory buffers. The lower-case variants `Comm.bcast`, `Comm.scatter`, `Comm.gather`, `Comm.allgather` and `Comm.alltoall` can communicate general Python objects.

#### **Broadcasting the entire Dataframe**

In [None]:
%%writefile BroadcastingDataframe.py
from mpi4py import MPI  # Importing the MPI module from the mpi4py package
import pandas as pd

FILENAME = "/content/iris_dataset.csv"  # Storing File path


# Defining a function to load the data
def loadData(filename):
    # Loading the dataset; column names are taken from the CSV header row
    data = pd.read_csv(filename)
    # Returning the dataframe
    return data


# Defining a function
def main():
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()          # rank (id) of the process running the code
    numProcesses = comm.Get_size()  # total number of processes running
    if rank == 0:
        # Only the root process loads the dataframe from disk
        data = loadData(FILENAME)
    else:
        # start with empty data
        data = None
    # Broadcasting the data from the root to every process
    data = comm.bcast(data, root=0)
    # Printing the data along with the rank
    print('Rank: ', rank, ', received data: ', data, '\n')


# Calling the function
main()


In [None]:
! mpirun --allow-run-as-root -np 4 python BroadcastingDataframe.py


#### **Scatter Operation on the Dataframe**

- Create a function to divide the dataframe as evenly as possible among the processes.
- Perform the scatter operation.
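To see what the division produces for the Iris data, the row bounds can be computed without MPI. The sketch below reuses the rounding rule from the script that follows (round half up, with the last process taking the remainder); the helper name `chunk_bounds` is hypothetical and only serves this illustration.

```python
from decimal import Decimal, ROUND_HALF_UP


def chunk_bounds(n_rows, n_procs):
    """Return one (start, stop) row range per process."""
    # Rows per process, rounded half up (150 / 4 = 37.5 becomes 38)
    step = int(Decimal(n_rows / n_procs).quantize(Decimal("1."), rounding=ROUND_HALF_UP))
    bounds = []
    for i in range(n_procs):
        # Clamp to n_rows so the ranges behave like Python slices
        start = min(step * i, n_rows)
        # The last process takes whatever rows are left
        stop = min(step * (i + 1), n_rows) if i < n_procs - 1 else n_rows
        bounds.append((start, stop))
    return bounds


bounds = chunk_bounds(150, 4)
sizes = [stop - start for start, stop in bounds]
print(bounds)  # [(0, 38), (38, 76), (76, 114), (114, 150)]
print(sizes)   # [38, 38, 38, 36]
```

With 150 rows and 4 processes the chunks are therefore not exactly equal: the first three ranks get 38 rows and the last one gets 36.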

In [None]:
%%writefile ScatteringDataframe.py
from mpi4py import MPI  # Importing the MPI module from the mpi4py package
import pandas as pd
# Importing the Decimal class and the ROUND_HALF_UP rounding mode from the decimal module
from decimal import Decimal, ROUND_HALF_UP

FILENAME = "/content/iris_dataset.csv"  # Storing File path


# Defining a function to load the data
def loadData(filename):
    # Loading the dataset; column names are taken from the CSV header row
    data = pd.read_csv(filename)
    # Returning the dataframe
    return data


def dividing_data(dataset, size_of_workers):
    # Divide the rows among the workers; the slice size is rounded half up
    slice_for_each_worker = int(Decimal(dataset.shape[0]/size_of_workers).quantize(Decimal('1.'), rounding=ROUND_HALF_UP))
    print('Slice of data for each worker: {}'.format(slice_for_each_worker))
    data_for_worker = []
    for i in range(0, size_of_workers):
        if i < size_of_workers - 1:
            data_for_worker.append(dataset[slice_for_each_worker*i:slice_for_each_worker*(i+1)])
        else:
            # The last worker takes all remaining rows
            data_for_worker.append(dataset[slice_for_each_worker*i:])
    return data_for_worker


# Defining a function
def main():
    # communicator
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()   # rank (id) of the process running the code
    size = comm.Get_size()   # total number of processes running
    data = None  # Non-root processes have nothing to send
    if rank == 0:
        # Only the root loads the dataframe and splits it into one slice per process
        data = dividing_data(loadData(FILENAME), size)
    # scatter operation: process i receives the i-th slice of the list
    received_data = comm.scatter(data, root=0)
    # Displaying the result
    print('Rank: ', rank, ', recvbuf: ', received_data)


# Calling the main function
main()


In [None]:
! mpirun --allow-run-as-root -np 4 python ScatteringDataframe.py


#### **Gather Operation on the Dataframe**

In [None]:
%%writefile GatherringDataframe.py
from mpi4py import MPI  # Importing the MPI module from the mpi4py package
import pandas as pd
# Importing the Decimal class and the ROUND_HALF_UP rounding mode from the decimal module
from decimal import Decimal, ROUND_HALF_UP

FILENAME = "/content/iris_dataset.csv"  # Storing File path


# Defining a function to load the data
def loadData(filename):
    # Loading the dataset; column names are taken from the CSV header row
    data = pd.read_csv(filename)
    # Returning the dataframe
    return data


# Every process loads the dataframe, because each rank slices out its own part below
df = loadData(FILENAME)


# Defining a function
def main():
    # communicator
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()   # rank (id) of the process running the code
    size = comm.Get_size()   # total number of processes running
    # Number of rows for each rank (rounded half up; the last rank takes the remainder)
    slice_for_each_worker = int(Decimal(df.shape[0]/size).quantize(Decimal('1.'), rounding=ROUND_HALF_UP))
    # Creating the send buffer: the slice of rows owned by this rank
    if rank < size-1:
        sendbuf = df[slice_for_each_worker*rank:slice_for_each_worker*(rank+1)]
    else:
        sendbuf = df[slice_for_each_worker*rank:]
    # Printing the slice held by this rank
    print('Rank: ', rank, ', sendbuf: ', sendbuf)
    # Gathering the slices: the root receives a list of dataframes, other ranks receive None
    recvbuf = comm.gather(sendbuf, root=0)
    # Display the result
    if rank == 0:
        print('Rank: ', rank, ', recvbuf received: ', recvbuf)


# Calling the function
main()


In [None]:
! mpirun --allow-run-as-root -np 4 python GatherringDataframe.py
