Connect to the IPython cluster and print the engine ids to confirm the engines are available.

In [1]:
import ipyparallel as parallel

clients = parallel.Client()
clients.block = True  # use synchronous computations
print(clients.ids)
print("FINISHED")
dv = clients[:]

[0, 1, 2]
FINISHED


Importing mpi4py and numpy.

In [2]:
%%px
from mpi4py import MPI
import numpy as np

Implementing convolution. You don't need to modify this code.

In [4]:
%%px
def convolve_func(main,kernel,KERNEL_DIM,DIMx,DIMy,upper_pad,lower_pad):
	num_pads = int((KERNEL_DIM - 1) / 2)
	conv = np.zeros(main.shape,dtype=int)
	main = np.concatenate((upper_pad,main,lower_pad))
	for i in range(DIMy):
		for j in range(DIMx):
			for k in range(KERNEL_DIM):
				for l in range(KERNEL_DIM):
					if j+l <= DIMx+1 and i+k>=num_pads and i+k<=DIMy:
						conv[j*DIMy+i] += main[(j+l)*DIMy+i-num_pads+k]#*kernel[k][l]
	return conv
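
Reading the loop above, the `*kernel[k][l]` factor is commented out, so for a 3x3 kernel the function appears to return the plain sum of each element's 3x3 neighbourhood, with zeros beyond the left and right edges and the supplied pad rows above and below. A minimal NumPy sketch of that reading follows. `box_sum_2d` is a hypothetical helper, not part of the lab, and it assumes a 3x3 kernel with exactly one pad row on each side.

```python
import numpy as np

def box_sum_2d(block, upper_pad, lower_pad):
    """Sum of each element's 3x3 neighbourhood (hypothetical helper).

    block      -- 2D array of shape (rows, cols)
    upper_pad  -- 1D array of length cols, the row above the block
    lower_pad  -- 1D array of length cols, the row below the block
    """
    rows, cols = block.shape
    # Stack the pad rows above and below the block.
    padded = np.vstack([upper_pad[None, :], block, lower_pad[None, :]])
    # Add one column of zeros on the left and on the right.
    padded = np.pad(padded, ((0, 0), (1, 1)), mode="constant")
    out = np.zeros_like(block)
    # Accumulate the nine shifted views of the padded array.
    for dr in range(3):
        for dc in range(3):
            out += padded[dr:dr + rows, dc:dc + cols]
    return out

demo = np.array([[1, 2], [3, 4]])
no_pad = np.zeros(2, dtype=int)
demo_sum = box_sum_2d(demo, no_pad, no_pad)
```

In the 2x2 demo every element sees the whole block, so each entry of `demo_sum` is 1 + 2 + 3 + 4 = 10.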

5 points: 
Load the MPI communicator, then get the total number of processes and the rank of the current process.

In [5]:
%%px
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

5 points: 
Load or initialize the data array and kernel array only in process 0 (rank 0).

In [6]:
%%px
DIMx = 0
DIMy = 0
KERNEL_DIM = 0

#Add a condition so that the initializations below happen only in process 0
if rank == 0:
    img = np.array([[3, 9, 5, 9],[1, 7, 4, 3],[2, 1, 6, 5],
                    [3, 9, 5, 9],[1, 7, 4, 3],[2, 1, 6, 5],
                    [3, 9, 5, 9],[1, 7, 4, 3],[2, 1, 6, 5]])
    kernel = np.array([[0, 1, 0],[0, 0, 0],[0, -1, 0]])
    DIMx = img.shape[0]
    DIMy = img.shape[1]
    KERNEL_DIM = int(kernel.shape[0])

10 points: 
Broadcast the data and kernel array sizes from process 0 to all other processes.

In [7]:
%%px
#broadcast data and kernel array sizes (think why we are broadcasting sizes)
DIMx = comm.bcast(DIMx, root=0)
DIMy = comm.bcast(DIMy, root=0)
KERNEL_DIM = comm.bcast(KERNEL_DIM, root=0)

Initialize an empty kernel array for all processes except rank 0. Why are we not initializing the kernel array for rank 0?

Ans: The kernel array was already initialized on rank 0 in a previous cell. Initializing it again here would overwrite those values and lose the real data.

In [8]:
%%px
#initialize empty kernel array except for process 0(rank=0)
if rank != 0:
    kernel = np.empty((KERNEL_DIM, KERNEL_DIM), dtype=int)

10 points: 
Broadcast Kernel array from rank 0 to all other processes.                                                   

In [9]:
%%px
#broadcast kernel array from rank 0 to all other processes
kernel = comm.bcast(kernel, root=0)

25 points: 
Split the rows of the data array equally and scatter them from process 0 to all other processes. To split them 
equally, the number of rows in the data array must be an integer multiple of the number of processes. MPI has ways 
to send unequal chunks of data between processes, but equal chunks are enough here.

In [10]:
%%px
#split and send the data array to the corresponding processes (you need to initialize a buffer to receive data from 
#process 0, similar to the uninitialized buffer created for the kernel array)

rows = DIMx // size

localImg = np.empty((rows, DIMy), dtype=int)

if rank == 0:
    comm.Scatter(img, localImg, root=0)
else:
    comm.Scatter(None, localImg, root=0)
#Do we also initialize a buffer for process 0 here? If so, why? (Hint: because of the function we are using to send 
#and receive data)
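
The prompt mentions that MPI can also send unequal chunks. In mpi4py this is exposed as `Comm.Scatterv`, which needs a per-rank element count and a starting offset into the flattened send buffer. The sketch below only computes those two arrays with NumPy and makes no MPI calls. `row_partition` is a hypothetical helper, and giving the leftover rows to the lowest ranks is one common convention, not something the lab prescribes.

```python
import numpy as np

def row_partition(n_rows, n_cols, size):
    """Rows per rank, plus element counts and displacements for a flattened array."""
    base, extra = divmod(n_rows, size)
    # The first `extra` ranks each take one additional row.
    rows_per_rank = np.array([base + (1 if r < extra else 0) for r in range(size)])
    counts = rows_per_rank * n_cols                          # elements sent to each rank
    displs = np.concatenate(([0], np.cumsum(counts)[:-1]))   # start offset of each chunk
    return rows_per_rank, counts, displs

# 10 rows of 4 columns cannot be split evenly across 3 ranks.
rows_per_rank, counts, displs = row_partition(10, 4, 3)
```

Here the ranks get 4, 3 and 3 rows, so the counts are 16, 12 and 12 elements and the chunks start at offsets 0, 16 and 28.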

25 points: 
For the convolution of the kernel array and data array, you have to pass the kernel padding rows from one
process to another. Please see the objective for more details. Send and receive rows from one process 
to the other. Be careful: the data sizes and tags you send and receive must match, otherwise the
communicator will wait for them indefinitely.

In [11]:
%%px
#send padding rows from one process to other (carefully observe which process to send data to which process and
# which process receives the data)

upperPad = np.zeros(DIMy, dtype=int)
lowerPad = np.zeros(DIMy, dtype=int)

if rank == 0:
    comm.send(localImg[-1, :], dest=1, tag=0)
    lowerPad = comm.recv(source=1, tag=1)

elif rank == 1:
    upperPad = comm.recv(source=0, tag=0)
    comm.send(localImg[0, :], dest=0, tag=1)

    comm.send(localImg[-1, :], dest=2, tag=2)
    lowerPad = comm.recv(source=2, tag=3)

elif rank == 2:
    upperPad = comm.recv(source=1, tag=2)
    comm.send(localImg[0, :], dest=1, tag=3)
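
The exchange above is written out for exactly three ranks. The pattern generalizes: each rank's upper pad is the last row of rank - 1, its lower pad is the first row of rank + 1, and the first and last ranks keep zeros at the image border. The sketch below simulates that rule in a single process with plain NumPy and makes no MPI calls. `halo_rows` is a hypothetical helper, and the 9x4 test image is made up for illustration.

```python
import numpy as np

def halo_rows(blocks):
    """Return a list of (upper_pad, lower_pad) pairs, one per simulated rank."""
    n_cols = blocks[0].shape[1]
    zeros = np.zeros(n_cols, dtype=blocks[0].dtype)
    pads = []
    last = len(blocks) - 1
    for rank, block in enumerate(blocks):
        # Upper pad: last row of the rank above, or zeros for rank 0.
        upper = blocks[rank - 1][-1, :].copy() if rank > 0 else zeros.copy()
        # Lower pad: first row of the rank below, or zeros for the last rank.
        lower = blocks[rank + 1][0, :].copy() if rank < last else zeros.copy()
        pads.append((upper, lower))
    return pads

image = np.arange(36).reshape(9, 4)   # illustrative data only
blocks = np.split(image, 3)           # three simulated ranks, three rows each
pads = halo_rows(blocks)
```

For the middle rank, `pads[1]` holds rows 2 and 6 of the full image, which is what the tagged send and receive pairs deliver in the MPI version.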
    

Why are we loading data into process 0 and sending the input data from there to all other processes? Are there any other methods to load data into all processes? (not for evaluation)

Ans: The scatter operation needs a source array to split, and process 0 holds the original data. We could use Bcast to send the data instead, but that would send the entire array to every process rather than only the rows each process needs, which wastes memory and bandwidth.
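
As a rough illustration of that trade-off, the arithmetic below counts the array elements each process ends up holding for the 9x4 image and three processes used in this notebook. It ignores message overhead and is only a sketch.

```python
DIMx, DIMy, size = 9, 4, 3           # image shape and process count from this notebook
rows = DIMx // size                  # rows per process after an even split

scatter_per_rank = rows * DIMy       # elements each process receives from Scatter
halo_per_rank = 2 * DIMy             # at most two padding rows per process
bcast_per_rank = DIMx * DIMy         # elements each process would hold after a full broadcast
```

Scatter plus padding rows gives each process at most 12 + 8 = 20 elements, against 36 for a full broadcast, and the gap widens as the image grows.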

5 points: 
Perform Convolution operation by calling convolve_func() provided for each of the process with 
corresponding rows as arguments.                                                                             

In [12]:
%%px
#convolution function arguments
#main - data array (flattened array), only the part of the data array that is processed for each process
#kernel - kernel array
#DIMy - ColumnSize
#Dimx - RowSize
#upper_pad = upper padding row
#lower_pad = lower padding row
main = localImg.flatten()

convResult = convolve_func(main, kernel, KERNEL_DIM, rows, DIMy, upperPad, lowerPad)

10 points: 
Gather the computed convolutional matrix rows to process 0.                                                 

In [13]:
%%px
#To receive data from all processes, process 0 should have a buffer
if rank == 0:
    recvbuf = np.empty((DIMx * DIMy), dtype=int)
else:
    recvbuf = None

comm.Gather(convResult, recvbuf, root=0)

Reshape the flattened array to match input dimensions

In [14]:
%%px
#Reshape the collected array to the input image dimensions
if rank == 0:
    recvbuf = recvbuf.reshape(DIMx, DIMy)
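
Gather stores each rank's contribution in the receive buffer in rank order, so reshaping the flat buffer to (rows, columns) puts every row block back in its original position. The sketch below imitates this in one process, with `np.concatenate` standing in for the gather. The data is made up and no MPI calls are involved.

```python
import numpy as np

DIMx, DIMy, size = 9, 4, 3
image = np.arange(DIMx * DIMy).reshape(DIMx, DIMy)   # illustrative data only

# Each simulated rank holds a flattened block of rows.
local_results = [block.flatten() for block in np.split(image, size)]

# Stand-in for Gather: concatenate the flat pieces in rank order.
flat = np.concatenate(local_results)

restored = flat.reshape(DIMx, DIMy)      # rows first, then columns
transposed_shape = flat.reshape(-1, DIMx).shape
```

Because `DIMx` is the row count in this notebook, `reshape(DIMx, DIMy)` recovers the 9x4 layout, while `reshape(-1, DIMx)` produces a 4x9 array that will not compare equal to it.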

5 points: 
Test to check sequential convolution and MPI based parallel convolution outputs                               

In [15]:
#I could not resolve the LookupError that occurred when trying to run the parallel 
#execution, so I added this check instead. It uses zero padding in place of the upper/lower pad rows. 
#Based on this output I believe I implemented the lab correctly.


def _compare_on_rank0():
    try:
        main_grid = img.flatten()
        conv1 = convolve_func(main_grid, kernel, KERNEL_DIM, DIMx, DIMy, np.zeros(DIMy, dtype=int), np.zeros(DIMy, dtype=int))
        conv1 = conv1.reshape(DIMx, DIMy)
        return np.array_equal(conv1, recvbuf)
    except Exception as e:
        return f"error: {e}"

result = clients[0].apply_sync(_compare_on_rank0)
print("Compare result:", result)

Compare result: True
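
The same comparison can be reproduced without a cluster by simulating the three ranks in one process. The sketch below copies `convolve_func` from the cell above (re-indented, with the commented-out kernel factor left out), splits the image into row blocks, supplies the neighbouring rows as padding, and compares the stitched result with the sequential call. It checks only the block-wise logic and says nothing about whether the MPI calls themselves ran correctly.

```python
import numpy as np

def convolve_func(main, kernel, KERNEL_DIM, DIMx, DIMy, upper_pad, lower_pad):
    # Copied from the provided cell; the kernel weights are not applied there either.
    num_pads = int((KERNEL_DIM - 1) / 2)
    conv = np.zeros(main.shape, dtype=int)
    main = np.concatenate((upper_pad, main, lower_pad))
    for i in range(DIMy):
        for j in range(DIMx):
            for k in range(KERNEL_DIM):
                for l in range(KERNEL_DIM):
                    if j + l <= DIMx + 1 and i + k >= num_pads and i + k <= DIMy:
                        conv[j * DIMy + i] += main[(j + l) * DIMy + i - num_pads + k]
    return conv

img = np.array([[3, 9, 5, 9], [1, 7, 4, 3], [2, 1, 6, 5]] * 3)   # same 9x4 image
kernel = np.array([[0, 1, 0], [0, 0, 0], [0, -1, 0]])
DIMx, DIMy = img.shape
size = 3                          # number of simulated ranks
rows = DIMx // size
zeros = np.zeros(DIMy, dtype=int)

# Sequential reference: whole image, zero padding above and below.
sequential = convolve_func(img.flatten(), kernel, 3, DIMx, DIMy, zeros, zeros)
sequential = sequential.reshape(DIMx, DIMy)

# Simulated parallel run: one block per rank, neighbour rows as padding.
blocks = np.split(img, size)
pieces = []
for rank, block in enumerate(blocks):
    upper = blocks[rank - 1][-1] if rank > 0 else zeros
    lower = blocks[rank + 1][0] if rank < size - 1 else zeros
    pieces.append(convolve_func(block.flatten(), kernel, 3, rows, DIMy, upper, lower))

stitched = np.concatenate(pieces).reshape(DIMx, DIMy)
matches = np.array_equal(sequential, stitched)
```

If the padding rows were left as zeros on every simulated rank, the rows at the block boundaries would differ from the sequential result, which makes this a reasonable check of the exchange logic.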


In [16]:
%%px
if rank == 0:
    #main_grid is the actual input image array that is flattened
    #convolution function arguments
    #main_grid - data array (flattened array)
    #kernel - kernel array
    #DIMy - ColumnSize
    #Dimx - RowSize
    #upper_pad = upper padding row
    #lower_pad = lower padding row
    
    #rename the below arguments according to your variable names
    main_grid = img.flatten()
    
    #Entire convolution in a single process
    conv1 = convolve_func(main_grid,kernel,KERNEL_DIM,DIMx,DIMy,upperPad,upperPad)
    conv1 = np.reshape(conv1, (DIMx, DIMy))  # DIMx is the row count here, so this matches recvbuf
    #recvbuf is the convolution computed by parallel processes and gathered in process 0, 
    #if you named it different, modify that name below
    
    #Checking with parallel convolution output
    print(np.array_equal(conv1, recvbuf))

Exception in callback BaseAsyncIOLoop._handle_events()
handle: <Handle BaseAsyncIOLoop._handle_events()>
Traceback (most recent call last):
  File "C:\Users\joeno\anaconda3\envs\myenv\Lib\asyncio\events.py", line 89, in _run
    self._context.run(self._callback, *self._args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\joeno\anaconda3\envs\myenv\Lib\site-packages\tornado\platform\asyncio.py", line 208, in _handle_events
    handler_func(fileobj, events)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "C:\Users\joeno\anaconda3\envs\myenv\Lib\site-packages\zmq\eventloop\zmqstream.py", line 600, in _handle_events
    self._handle_recv()
    ~~~~~~~~~~~~~~~~~^^
  File "C:\Users\joeno\anaconda3\envs\myenv\Lib\site-packages\zmq\eventloop\zmqstream.py", line 629, in _handle_recv
    self._run_callback(callback, msg)
    ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^
  File "C:\Users\joeno\anaconda3\envs\myenv\Lib\site-packages\zmq\eventloop\zmqstream.py", line 550, in _run_callback
    f =