# DS-GA 1019

# Lab 11: Collectives 
## Apr. 13, 2023

### Collectives
Communication patterns that involve data exchange across multiple ranks.

### Broadcast 

Root rank sends the same single message to multiple receivers.

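- In practice the root does not send the message to everyone at once. Sending p - 1 messages one after another from a single rank is slow and makes the root a bottleneck, so implementations use a tree-like pattern instead: ranks that have already received the message forward it to other ranks.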

<img   src="images/im1.png" style="width: 500px;"/>




Naive implementation (8 ranks): the root sends the message to the other 7 ranks one at a time, so 7 time steps are needed.

<img   src="images/im3.png" style="width: 500px;"/>


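Parallel (tree) implementation: in each step, every rank that already has the message forwards it to a rank that does not, doubling the number of ranks holding it, so log2(8) = 3 time steps are needed.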

<img   src="images/im4.png" style="width: 500px;"/>
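To make the step counts concrete, here is a small pure-Python sketch (no MPI involved) that only computes who sends to whom in each step of a binomial-tree broadcast, one common way to realize the tree pattern. The function names are hypothetical, and real MPI libraries may choose a different broadcast algorithm depending on message size and number of ranks.

```python
def tree_broadcast_schedule(p, root=0):
    """Return a list of rounds; each round is a list of (sender, receiver) pairs.

    Ranks are relabelled relative to the root, so any root works.
    """
    rounds = []
    have = 1  # ranks with relative id 0 .. have-1 already hold the message
    while have < p:
        this_round = []
        for rel_src in range(have):
            rel_dst = rel_src + have  # each holder forwards to one new rank
            if rel_dst < p:
                this_round.append(((rel_src + root) % p, (rel_dst + root) % p))
        rounds.append(this_round)
        have *= 2  # the number of holders doubles every round
    return rounds


def naive_broadcast_schedule(p, root=0):
    """The root sends to every other rank, one message per time step."""
    return [[(root, dst)] for dst in range(p) if dst != root]


if __name__ == "__main__":
    tree = tree_broadcast_schedule(8)
    for step, pairs in enumerate(tree, start=1):
        print(f"step {step}: {pairs}")
    print("tree steps:", len(tree), "naive steps:", len(naive_broadcast_schedule(8)))
```

For 8 ranks the tree schedule has three steps carrying 1, 2 and 4 messages, versus 7 single-message steps for the naive version.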

More info on the implementation of MPI Broadcast: https://mpitutorial.com/tutorials/mpi-broadcast-and-collective-communication/#broadcasting-with-mpi_send-and-mpi_recv


### Scatter 

Root rank sends a different message to each receiver.

<img   src="images/im2.png" style="width: 500px;"/>
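A serial sketch of what scatter computes: the root's buffer is cut into p equal, contiguous chunks and chunk i ends up on rank i. This mirrors how the lab code calls `comm.Scatter` on a `(p, n/p)` array (the buffer-based Scatter sends the same count to every rank). The helper `simulate_scatter` is hypothetical and only models the data movement.

```python
def simulate_scatter(sendbuf, p):
    """Return a list where element i is the chunk that rank i would receive."""
    if len(sendbuf) % p != 0:
        # Equal chunk sizes are required, as in the lab's comm.Scatter usage
        raise ValueError("buffer length must be divisible by the number of ranks")
    size = len(sendbuf) // p
    return [sendbuf[i * size:(i + 1) * size] for i in range(p)]


if __name__ == "__main__":
    chunks = simulate_scatter(list(range(8)), 4)
    for rank, chunk in enumerate(chunks):
        print(f"rank {rank} receives {chunk}")
```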


### Gather 

Root rank receives a different message from each rank (the inverse of scatter).

<img   src="images/im5.png" style="width: 500px;"/>
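A serial model of gather's semantics. It follows the return convention of mpi4py's lowercase `comm.gather`: the root gets a list of all contributions in rank order and every other rank gets `None`. The helper name is hypothetical; no communication happens here.

```python
def simulate_gather(values_by_rank, root=0):
    """Model gather: return what each rank ends up with.

    values_by_rank[i] is the value contributed by rank i. Only the root
    receives the list of all contributions (in rank order); the other
    ranks get None.
    """
    p = len(values_by_rank)
    gathered = list(values_by_rank)
    return [gathered if rank == root else None for rank in range(p)]


if __name__ == "__main__":
    for rank, result in enumerate(simulate_gather([10, 20, 30, 40])):
        print(f"rank {rank} gets {result}")
```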

**All gather**: Gather + each rank obtains the gathered result (equivalent to one gather per rank, with every rank acting as root once)

<img   src="images/im6.png" style="width: 500px;"/>
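All gather can be done as a gather followed by a broadcast, but a common alternative is the ring algorithm sketched below: with p ranks it takes p - 1 steps, and in every step each rank sends one block to its right neighbour, so no single rank is a bottleneck. This is a serial simulation with a hypothetical helper; it is not necessarily how any particular MPI library implements all gather.

```python
def ring_allgather(values_by_rank):
    """Simulate a ring allgather and return each rank's final list of blocks.

    known[r][i] is rank r's copy of rank i's value (None = not received yet).
    """
    p = len(values_by_rank)
    known = [[None] * p for _ in range(p)]
    for r in range(p):
        known[r][r] = values_by_rank[r]  # each rank starts with its own block

    for step in range(p - 1):
        # Rank r forwards the block it obtained in the previous step, i.e. the
        # block that originated on rank (r - step) mod p, to rank (r + 1) mod p.
        messages = [((r + 1) % p, (r - step) % p, known[r][(r - step) % p])
                    for r in range(p)]
        # Build all messages first, then deliver them, so every send in a
        # step uses the state from the start of that step.
        for dst, idx, value in messages:
            known[dst][idx] = value
    return known


if __name__ == "__main__":
    for rank, blocks in enumerate(ring_allgather(["a", "b", "c", "d"])):
        print(f"rank {rank}: {blocks}")
```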


### Average with gather and scatter

Compute the average of the first 100 whole numbers (0 to 99).
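The scatter / local average / gather pipeline can be checked serially first. The sketch below repeats the same arithmetic with plain Python lists for p = 4 (an assumption matching the `mpiexec -n 4` run): each "rank" averages its 25 numbers and the root averages the partial results. Averaging the averages only equals the true mean because every chunk has the same size.

```python
n, p = 100, 4
numbers = list(range(n))   # 0, 1, ..., 99 (what np.arange(n) produces)
chunk = n // p             # 25 numbers per rank

# What each rank computes after the scatter
sub_avgs = [sum(numbers[r * chunk:(r + 1) * chunk]) / chunk for r in range(p)]

# What the root computes after the gather
overall = sum(sub_avgs) / p

print(sub_avgs)   # [12.0, 37.0, 62.0, 87.0]
print(overall)    # 49.5
```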

```python
%%writefile avg.py

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

my_rank = comm.Get_rank()
p = comm.Get_size()

n = 100
assert n % p == 0, "n must be divisible by the number of processes"

if my_rank == 0:
    data = np.arange(n, dtype='f')       # shape: (100,)
    data = data.reshape(p, n // p)       # shape: (4, 25) with 4 processes; each process gets 25
else:
    data = None

recvbuf = np.empty(n // p, dtype='f')

# Scatter the numbers to all processes.
# Why not call Scatter on rank 0 only? Scatter is a collective: every rank
# in the communicator must call it, because the non-root ranks call it to
# receive their chunk. root=0 specifies that rank 0's `data` is the buffer
# that gets split and distributed.
comm.Scatter(data, recvbuf, root=0)

# Compute the average of the local subset
sub_avg = np.mean(recvbuf)

print('Process {}, sub array average is {}'.format(my_rank, sub_avg))

sub_avgs = None

if my_rank == 0:
    sub_avgs = np.empty(p, dtype='f')

# Gather the partial averages to the root process
# Signature: comm.Gather(sendbuf, recvbuf, root=0)
# The scalar is wrapped in a 1-element float32 array to give an explicit send buffer.
comm.Gather(np.array([sub_avg], dtype='f'), sub_avgs, root=0)

# Compute the total average of all numbers (valid because all chunks have equal size)
if my_rank == 0:
    avg = np.mean(sub_avgs)
    print('Average is', avg)
```


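```
!mpiexec -n 4 python avg.py
```

```
Process 1, sub array average is 0.0
Process 2, sub array average is 0.0
Process 3, sub array average is 0.0
Process 0, sub array average is 12.0
Average is 3.0
```

Note: this recorded output is not what the code should produce. With 4 processes the ranks receive 0 to 24, 25 to 49, 50 to 74 and 75 to 99, so the sub-array averages should be 12.0, 37.0, 62.0 and 87.0, and the overall average 49.5. The zeros reported by ranks 1 to 3 suggest their receive buffers were not filled by the scatter in this run.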


### Reduce 

Combines the values held by all ranks into a smaller set of values (e.g. a single sum) on the root rank, using a reduction operation such as sum, max, or min.

<img   src="images/im7.png" style="width: 500px;"/>
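Like broadcast, reduce is usually done along a tree rather than by having the root receive p - 1 messages in a row. The serial sketch below simulates a binomial-tree reduction to rank 0 with any associative binary operation (e.g. `operator.add` in the role of `MPI.SUM`, or `max` for `MPI.MAX`). The helper name is hypothetical, and real MPI libraries may pick other algorithms.

```python
import operator


def tree_reduce(values_by_rank, op=operator.add):
    """Simulate a binomial-tree reduce to rank 0.

    In each step, every rank at an odd multiple of `distance` sends its
    partial result to rank (id - distance), which combines it with its own.
    Returns (result_on_rank_0, number_of_steps); steps = ceil(log2(p)).
    """
    partial = list(values_by_rank)
    p = len(partial)
    distance, steps = 1, 0
    while distance < p:
        for src in range(distance, p, 2 * distance):
            dst = src - distance
            # Lower rank's value first, so rank order is preserved
            partial[dst] = op(partial[dst], partial[src])
        distance *= 2
        steps += 1
    return partial[0], steps


if __name__ == "__main__":
    print(tree_reduce(list(range(1, 9))))   # sum of 1..8 over 8 ranks
    print(tree_reduce([3, 9, 2, 7], max))
```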

**All reduce**: Reduce + each rank obtains the result

<img   src="images/im8.png" style="width: 500px;"/>
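One well-known way to get the reduced result on every rank without a separate broadcast is recursive doubling (a "butterfly" exchange), sketched serially below. It assumes the number of ranks is a power of two; the helper name is hypothetical, and this is only one of several algorithms an MPI library might use for all reduce.

```python
import operator


def recursive_doubling_allreduce(values_by_rank, op=operator.add):
    """Simulate allreduce by recursive doubling (p must be a power of two).

    In each step, rank r exchanges its partial result with partner r XOR mask
    and both combine the two values, so after log2(p) steps every rank holds
    the full reduction. Returns (results_per_rank, number_of_steps).
    """
    p = len(values_by_rank)
    if p == 0 or p & (p - 1):
        raise ValueError("this sketch assumes the number of ranks is a power of two")
    partial = list(values_by_rank)
    mask, steps = 1, 0
    while mask < p:
        # New values are computed from the old list (simultaneous exchange).
        # The lower rank's value goes first so rank order is preserved.
        partial = [op(partial[min(r, r ^ mask)], partial[max(r, r ^ mask)])
                   for r in range(p)]
        mask <<= 1
        steps += 1
    return partial, steps


if __name__ == "__main__":
    print(recursive_doubling_allreduce([1, 2, 3, 4]))
```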


### StdDev with All reduce

Using all reduce to compute the standard deviation of a set of numbers
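The std.py script computes a population standard deviation in two passes: an all reduce so that every rank knows the global mean, then a reduce of the squared deviations to the root. The serial sketch below performs the same two passes over a list of per-rank chunks (a hypothetical stand-in for the scattered buffers) and can be checked against `statistics.pstdev`, which also divides by n.

```python
import math
import random
import statistics


def distributed_std(chunks):
    """Two-pass population standard deviation over per-rank chunks.

    Pass 1 plays the role of allreduce(local_sum): every rank learns the mean.
    Pass 2 plays the role of reduce(local_sq_diff): the root sums squared deviations.
    Returns (mean, standard_deviation).
    """
    n = sum(len(c) for c in chunks)
    global_sum = sum(sum(c) for c in chunks)   # allreduce of the local sums
    mean = global_sum / n                      # known on every rank
    global_sq_diff = sum(sum((x - mean) ** 2 for x in c) for c in chunks)  # reduce to root
    return mean, math.sqrt(global_sq_diff / n)


if __name__ == "__main__":
    random.seed(0)
    data = [random.random() for _ in range(100)]
    p = 4
    size = len(data) // p
    chunks = [data[r * size:(r + 1) * size] for r in range(p)]
    mean, std = distributed_std(chunks)
    print(f"mean={mean:.4f}, std={std:.4f}, pstdev={statistics.pstdev(data):.4f}")
```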

```python
import numpy as np
n=100
p=4
np.empty(int(n/p),dtype='f').shape
```

```
(25,)
```

```python
%%writefile std.py

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

my_rank = comm.Get_rank()
p = comm.Get_size()

n = 100

if my_rank == 0:
    data = np.random.random(n).astype('f')     # shape: (100,)
    data = data.reshape(p, n // p)             # shape: (4, 25) with 4 processes
else:
    data = None

# (25,) buffer that receives this rank's chunk from the scatter.
# Its dtype must match the sender's ('f' = float32); a float64 buffer
# would not match the float32 data sent by rank 0.
recvbuf = np.empty(n // p, dtype='f')

# Scatter the numbers: rank 0 sends one row to each of ranks 0, 1, 2, 3
comm.Scatter(data, recvbuf, root=0)

local_sum = float(np.sum(recvbuf))
print('Process {}, local sum = {:.4f}'.format(my_rank, local_sum))

# Allreduce the local sums so every rank gets the global sum and can compute the mean
global_sum = comm.allreduce(local_sum, op=MPI.SUM)
mean = global_sum / n

# Compute the local sum of squared differences from the mean
local_sq_diff = float(np.sum((recvbuf - mean) ** 2))

# Reduce the squared differences into a global sum on the root process only
global_sq_diff = comm.reduce(local_sq_diff, op=MPI.SUM, root=0)

if my_rank == 0:
    stddev = np.sqrt(global_sq_diff / n)   # population standard deviation (divides by n)
    print('\nMean: {:.4f}, Standard deviation: {:.4f}'.format(mean, stddev))
    print('Standard deviation using numpy: {:.4f}'.format(np.std(data)))
```


```
!mpiexec -n 4 python std.py
```

```
Process 0, local sum = 11.7849
Process 2, local sum = 13.4773
Process 3, local sum = 11.7496
Process 1, local sum = 12.2990

Mean: 0.4931, Standard deviation: 0.2585
Standard deviation using numpy: 0.2585
```


### Parallel merge sort *

*Inefficient implementation: the sorted chunks are gathered on rank 0, which then performs all of the merging serially.
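A common improvement is to merge along a tree, so that in each round different ranks merge different pairs of sorted chunks at the same time and the work finishes in about log2(p) rounds. The serial sketch below only models that merge schedule on a list of sorted runs, using the standard library's `heapq.merge` in place of the lab's `merge` function; the helper name is hypothetical and no MPI communication is shown.

```python
import heapq
import random


def tree_merge(sorted_runs):
    """Merge sorted runs pairwise in rounds, like a binomial-tree reduction.

    In each round, run i (i a multiple of 2 * distance) absorbs run
    i + distance. With p runs this takes ceil(log2(p)) rounds; in an MPI
    program the merges within one round could run on different ranks.
    Returns (merged_list, number_of_rounds).
    """
    runs = [list(r) for r in sorted_runs]
    p = len(runs)
    distance, rounds = 1, 0
    while distance < p:
        for i in range(0, p - distance, 2 * distance):
            runs[i] = list(heapq.merge(runs[i], runs[i + distance]))
        distance *= 2
        rounds += 1
    return (runs[0] if runs else []), rounds


if __name__ == "__main__":
    random.seed(1)
    data = [random.randint(0, 99) for _ in range(40)]
    runs = [sorted(data[i * 10:(i + 1) * 10]) for i in range(4)]  # one sorted chunk per "rank"
    merged, rounds = tree_merge(runs)
    print(rounds, merged == sorted(data))
```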

```python
%%writefile mergesort.py

from mpi4py import MPI
import numpy as np
import timeit

comm = MPI.COMM_WORLD

my_rank = comm.Get_rank()
p = comm.Get_size()

def merge(L, R):
    """Merge two sorted sequences into one sorted list."""
    f = []
    i = j = 0

    # Repeatedly take the smaller front element of L and R
    while i < len(L) and j < len(R):
        if L[i] < R[j]:
            f.append(L[i])
            i += 1
        else:
            f.append(R[j])
            j += 1

    # Append whatever is left (at most one of L, R still has elements)
    while i < len(L):
        f.append(L[i])
        i += 1

    while j < len(R):
        f.append(R[j])
        j += 1

    return f

def merge_sort(l):
    """Recursive top-down merge sort."""
    if len(l) <= 1:
        return l

    left = merge_sort(l[: len(l)//2])
    right = merge_sort(l[len(l)//2 :])

    return merge(left, right)


n = 10000


def serial_mergesort():
    data = np.random.randint(0, n, n).astype('f')
    sorted_data = merge_sort(data)


def par_mergesort():
    if my_rank == 0:
        data = np.random.randint(0, n, n).astype('f')
        data = data.reshape(p, n // p)
    else:
        data = None

    sub_array = np.empty(n // p, dtype='f')
    # Send one subarray to each process
    comm.Scatter(data, sub_array, root=0)

    # Each process sorts its own chunk
    sorted_arr = merge_sort(sub_array)

    # Collect the sorted chunks on rank 0 (a list on rank 0, None elsewhere)
    data = comm.gather(sorted_arr, root=0)

    if my_rank == 0:
        # Merge chunks pairwise (first with last, second with second-to-last, ...)
        # until one sorted list remains in data[0]; all merging runs serially on rank 0
        j = len(data) - 1

        while j != 0:
            i = 0
            while i < j:
                data[i] = merge(data[i], data[j])
                i += 1
                j -= 1

        l = data[0]
        assert all(l[i] <= l[i+1] for i in range(len(l)-1))


tp = timeit.Timer("par_mergesort()", "from __main__ import par_mergesort")
ts = timeit.Timer("serial_mergesort()", "from __main__ import serial_mergesort")

# Every rank must run the parallel version, since it contains collective calls
parallel_time = tp.timeit(number=10)

if my_rank == 0:
    serial_time = ts.timeit(number=10)

    print('Parallel time: {:.4f} sec'.format(parallel_time))
    print('Serial time: {:.4f} sec'.format(serial_time))
```


```
!mpiexec -n 4 python mergesort.py
```

```
Parallel time: 0.2361 sec
Serial time: 0.4283 sec
```
