In [10]:
%%file Q2.py
import numpy as np
from scipy.integrate import quad
from numpy import sqrt, sin, cos, pi, tan
from collections import deque
from mpi4py import MPI
from  time import time
import matplotlib.pyplot as plt
import math
import sys 
"""
This code uses the 1-D parallelized adaptive algorithm to calculate the integral of a funciton f.
a, b is the integration inteval.
tol is the tolerance.
f is the function of integrand.
result is the approximate result of the integration.
errorEstimate is the error estimated.
finestLevel is the finest grid in the interval.

Some part of the code could be referred to the following link. 
https://github.com/stefantaylor/Parallel-Programming-Languages-and-Systems/blob/master/cw2/aquadPartA.c

A stack is used to store the sub-interval of the domain and sub-tolerance. It is initilized with [a, b, tol].

A farmer cpu is used to monitor the other worker cpu, which are used to calculate the integral or divide the interval.

The main idea of the algorithm is as follows:
1.  A worker cpu calculate the integral I1 and I2 by using Simpson rule and composite Simpson rule
        I1 = h*(fleft + fmid*4 + fright )/6
        I2 = h*(fleft + 4*f((3*left+right)/4) + fmid*2 + 4*f((left+3*right)/4) + fright )/12
2.  If the difference between them is smaller than the tolrance, it will return I2 to the farmer cpu.
    If not, it will divide the tomain by 2, and send the interval to farmer cpu. This worker cpu will be idle.
3.  The farmer cpu pop a interval from the stack, and sends it to a worker cpu which is idle.
4.  After receiving the sub-interval from the worker cpus, the farmer cpu will add them to the stack.  

"""

comm = MPI.COMM_WORLD
rank = comm.rank
numprocs = comm.size

# set a, b tol, maxLevels in the following part
global a, b, tol, maxLevels
a = 0
b = 5
tol = 0.000001
maxLevels = 20000

def f(x):
    return math.exp(-x*x)*sin(12*x)+1
    
def farmer(numprocs, maxLevels):
    myStack = deque()
    points = [a, b, tol]
    myStack.append(points)   # initialize myStack
    worker = []
    newdata =[]
    grid = [a, b]
    status = MPI.Status()
    numberLevels = 0
    finestLevel = b-a
    for i in range(numprocs):
        worker.append(0)
    total =0
    errorEstimate = 0

    while numberLevels <= maxLevels:   # while numberLevels <= maxLevels, do the loop
        numberLevels = 1+numberLevels
        for i in range(1, numprocs):
            # if the worker cpu is idle and the stack is not empty, pop it from the stack and send it to the worker cpu
            if worker[i] == 0 and len(myStack) > 0:   
                data = myStack.pop()
                worker[i] = 1;
                comm.send(data, dest = i, tag = 1 )
        
        newdata = comm.recv(source = MPI.ANY_SOURCE, tag = MPI.ANY_TAG, status=status)
        source = status.Get_source()
        tag = status.Get_tag()
        
        # tag =1 means the interval is too big and the error of integration is larger than tolerance. 
        # we need to divide the interval by two
        if tag == 1:
            numberLevels = numberLevels-1
            myStack.append([newdata[0], newdata[1], newdata[3]])
            myStack.append([newdata[1], newdata[2], newdata[3]])
            grid.append(newdata[1])
            if (newdata[1]-newdata[0]) < finestLevel:
                finestLevel = newdata[1]-newdata[0]
        # error of integration is smaller than tolerance. We can add the integral to the total
        else:
            total = total + newdata[0]
            errorEstimate = errorEstimate+ newdata[1]
        worker[source] = 0

        flag = 0
        for i in range(1, numprocs):
            if worker[i] ==1:
                flag =1
                break
        # flag=0, and stack is empty. The calculation will end
        if flag == 0  and len(myStack) == 0:
            break

    # set all the tag=9, which means to end the calculation 
    for i in range(1, numprocs):
        comm.send(data, dest = i, tag = 9 )
    return total, errorEstimate, finestLevel, grid, numberLevels

def worker(rank):
    status = MPI.Status()
    newdata = [0, 0, 0, 0]
    data = [0, 0, 0]
    while 1:
        data = comm.recv(source=0, tag= MPI.ANY_TAG, status=status) 
        tag = status.Get_tag()
        if tag == 9 :
            break
        else:
            left = data[0]
            right = data[1]
            tol = data[2]
            h = data[1]-data[0]
            fleft = f(left)
            fright = f(right)
            mid = (left + right) / 2
            fmid = f(mid)
            I1 = h*(fleft + fmid*4 + fright )/6  # Simpson rule
            I2 = h*(fleft + 4*f((3*left+right)/4) + fmid*2 + 4*f((left+3*right)/4) + fright )/12 #Compsite Simpson rule
            
            if abs(I2-I1 ) >= 15*tol :
                newdata[0] = left
                newdata[1] = mid
                newdata[2] = right
                newdata[3] = tol/2
                comm.send(newdata, dest = 0, tag = 1 )
            else:
                newdata[0] = I2
                newdata[1] = (I2-I1)/15
                comm.send(newdata, dest = 0, tag = 2 )

if numprocs < 2:
    sys.exit("ERROR: Must have at least 2 processes to run") 

if rank == 0:
    result, errorEstimate, finestLevel, grid, numberLevels = farmer(numprocs, maxLevels)
else:
    worker(rank)

if rank == 0:
    print('#################### Input ###################')
    print("a:         " + str(a))
    print("b:         " + str(b))
    print("tol:       " + str(tol))
    print("maxLevels: " + str(maxLevels))
    print('#################### Output ###################')
    print("result:          " + str(result))
    print("errorEstimate:   " + str(errorEstimate))
    print("numberLevels:    " + str(numberLevels))
    print("finestLevel:     " + '1/'+  str((b-a)/finestLevel) + '*(b-a)')
    print('The function evaluation points could be seen in Q2_Adaptive.png')
    
    # plot the function evaluation points
    grid = sorted(grid)
    fy = []
    for i in grid:
        fy.append(f(i))
    fig = plt.figure(figsize=(12, 8))
    plt.plot(grid, fy,  'b', alpha = 0.8)
    plt.plot(grid, fy,  '*', color='black', alpha = 0.8)
    for i in range(len(grid)):
        plt.plot([grid[i], grid[i]], [0, 0.1],  'b', alpha = 0.8)
    plt.savefig("Q2_Adaptive.png")
  
 

Overwriting Q2.py


In [11]:
!mpiexec -np 6 python Q2.py

#################### Input ###################
a:         0
b:         5
tol:       1e-06
maxLevels: 20000
#################### Output ###################
result:          5.084542656189076
errorEstimate:   3.867412674823702e-08
numberLevels:    91
finestLevel:     1/256.0*(b-a)
The function evaluation points could be seen in Q2_Adaptive.png
