## Parallel Programming

Once all the options in the “serial (or sequential) processing” paradigm have been exhausted and we
still need further speed-up, “parallel processing” is the next step.


### Shared memory - multiprocessing

All processors share access to the same memory. OpenMP is a typical example: it runs multiple
threads concurrently, and the runtime environment assigns those threads to different
processors.

CPython has a Global Interpreter Lock (GIL), which prevents multiple native threads from
executing Python bytecode at the same time. As a result, threads cannot speed up CPU-bound pure-Python code, and Python offers no direct equivalent of OpenMP for pure-Python code.

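Python's standard `multiprocessing` module works around the GIL by running work in separate processes instead of threads. It targets shared-memory (single-machine) parallelism: each process has its own interpreter, and data is exchanged through queues, pipes, or explicitly shared memory objects.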
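By default each process has its own private memory, so ordinary Python objects are copied rather than shared. As a minimal sketch (the helper names `fill_square` and `squares_in_shared_memory` are hypothetical), `mp.Array` allocates a block of shared memory that several child processes can write into directly:

```python
import multiprocessing as mp

def fill_square(shared_array, index):
    # Each worker writes straight into the shared buffer; nothing is copied back
    shared_array[index] = index * index

def squares_in_shared_memory(n):
    # 'i' means C int; the array lives in memory visible to every child process
    shared_array = mp.Array('i', n)
    workers = [mp.Process(target=fill_square, args=(shared_array, i)) for i in range(n)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # Slicing copies the shared values into an ordinary Python list
    return shared_array[:]

if __name__ == "__main__":
    print(squares_in_shared_memory(5))
```

Each worker writes a different index, so no extra lock is needed here; concurrent updates to the same element would need synchronization.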


### How to spawn a process

The term "spawn" means the creation of a process by a parent process. The parent process can continue its execution asynchronously or wait until the child process ends its execution. 

The `multiprocessing` library spawns a process in three steps:

1. Create a `Process` object.
2. Call its `start()` method, which starts the process's activity.
3. Call its `join()` method, which waits until the process has completed its work and exited.

In [None]:
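from multiprocessing import Process

def call(i):
    print('called function in process: %s' % i)

if __name__ == '__main__':
    Process_jobs = []
    for i in range(5):
        p = Process(target=call, args=(i,))
        Process_jobs.append(p)
        p.start()
        # join() inside the loop waits for each child before starting the next,
        # so these five processes run one after another, not concurrently
        p.join()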

called function in process: 0
called function in process: 1
called function in process: 2
called function in process: 3
called function in process: 4


In [71]:
import multiprocessing as mp
from multiprocessing import Process

# dummy function
def f(id):
    #This is a dummy function taking a parameter
    return

if __name__ == '__main__':

    # get the number of CPUs
    np = mp.cpu_count()
    print('You have %d CPUs' %(np))

    # Create the processes
    p_list=[]
    for i in range(1,np+1):
        p = Process(target=f, name='Process'+str(i), args=(i,))
        p_list.append(p)
        print('Process:: ', p.name)
        p.start()
        print('Was assigned PID:: ', p.pid)

    # Wait for all the processes to finish
    for p in p_list:
        p.join()

You have 8 CPUs
Process::  Process1
Was assigned PID::  21892
Process::  Process2
Was assigned PID::  21893
Process::  Process3
Was assigned PID::  21894
Process::  Process4
Was assigned PID::  21895
Process::  Process5
Was assigned PID::  21896
Process::  Process6
Was assigned PID::  21897
Process::  Process7
Was assigned PID::  21898
Process::  Process8
Was assigned PID::  21899


Instantiating the `Process` object inside the `if __name__ == '__main__':` block is important. With the "spawn" start method (the default on Windows and, since Python 3.8, on macOS), each child process imports the script file that contains the target function. If processes were created at module level, every child would, on import, try to create children of its own, recursively; the guard ensures that only the original parent creates processes.
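The current start method can be inspected with `mp.get_start_method()`, and a specific one can be selected locally with `mp.get_context()`. The sketch below (the helpers `greet` and `run_with_spawn` are hypothetical) forces the "spawn" method, under which children re-import the script, so all process creation stays behind the `__main__` guard:

```python
import multiprocessing as mp

def greet(name):
    # Runs in a child process; must be a top-level function so it can be pickled
    return f"hello from {name}"

def run_with_spawn():
    # A context object selects "spawn" without changing the global default
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        return pool.map(greet, ["a", "b"])

if __name__ == "__main__":
    # Only the original parent reaches this block; re-imported children do not
    print("default start method:", mp.get_start_method())
    print(run_with_spawn())
```

Using a context object rather than `mp.set_start_method()` keeps the choice local to this code instead of fixing it for the whole program.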

### How to name a process

It is useful to give processes names, because debugging an application requires the processes to be clearly marked and identifiable.



In [25]:
import multiprocessing as mp
import time

def call():
    name = mp.current_process().name
    print("Starting %s \n" %name)
    time.sleep(3)
    print("Exiting %s \n" %name)

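if __name__ == '__main__':
    process_with_name = mp.Process(name='call_process', target=call)
    # A daemon process is terminated automatically when its parent exits
    process_with_name.daemon = True
    process_with_default_name = mp.Process(target=call)
    process_with_name.start()
    process_with_default_name.start()
    # Wait for both; otherwise the daemon may be killed before it prints "Exiting"
    process_with_name.join()
    process_with_default_name.join()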

Starting call_process 

Starting Process-28 

Exiting call_process 

Exiting Process-28 



### How to use a process in a subclass

* Define a new subclass of the `Process` class.
* Override the `__init__(self[, args])` method to add additional arguments; it must call the base-class `__init__()` first.
* Override the `run(self)` method to implement what the process should do when it is started.
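A minimal sketch that overrides both methods, passing an argument through `__init__` and sending the result back to the parent through a `multiprocessing.Queue` (the class `SquareProcess` and the helper `run_square_processes` are hypothetical):

```python
import multiprocessing as mp

class SquareProcess(mp.Process):
    def __init__(self, number, results):
        # Call the base-class constructor before touching the process
        super().__init__()
        self.number = number
        self.results = results  # a multiprocessing.Queue shared with the parent

    def run(self):
        # run() executes in the child process once start() is called
        self.results.put((self.number, self.number ** 2))

def run_square_processes(numbers):
    results = mp.Queue()
    procs = [SquareProcess(n, results) for n in numbers]
    for p in procs:
        p.start()
    # Collect one result per process before joining
    collected = [results.get() for _ in procs]
    for p in procs:
        p.join()
    # Children may finish in any order, so sort by input number
    return sorted(collected)

if __name__ == "__main__":
    print(run_square_processes([1, 2, 3]))
```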

In [26]:
import multiprocessing as mp

class MyProcess(mp.Process):
    def run(self):
        print('called run method in process: %s' %self.name)
        return
    
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = MyProcess()
        jobs.append(p)
        p.start()
        p.join()

called run method in process: MyProcess-29
called run method in process: MyProcess-30
called run method in process: MyProcess-31
called run method in process: MyProcess-32
called run method in process: MyProcess-33


### How to exchange objects between processes

The `multiprocessing` library provides two communication channels for exchanging objects between processes: queues and pipes.

#### Using a queue to exchange objects


In [32]:
import multiprocessing as mp
import random
import time

class producer(mp.Process):
    def __init__(self, queue):
        mp.Process.__init__(self)
        self.queue = queue
        
    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            print("Process Producer : item %d appended to queue %s"\
                  % (item, self.name))
            time.sleep(1)
            #print("The size of queue is %s" % self.queue.qsize()) # qsize() raises NotImplementedError on macOS
            
class consumer(mp.Process):
    def __init__(self, queue):
        mp.Process.__init__(self)
        self.queue = queue
        
    def run(self):
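        while True:
            # Caution: empty() is not reliable across processes; if the consumer
            # checks before the producer's first put(), it stops immediately
            if(self.queue.empty()):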
                print("the queue is empty")
                break
            else:
                time.sleep(2)
                item = self.queue.get()
                print('Processor Consumer: item %d popped from by %s \n'\
                     % (item, self.name))
                time.sleep(1)

if __name__ == '__main__':
    queue = mp.Queue()
    process_producer = producer(queue)
    process_consumer = consumer(queue)
    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()

Process Producer : item 138 appended to queue producer-44
Process Producer : item 88 appended to queue producer-44
Processor Consumer: item 138 popped from by consumer-45 
Process Producer : item 56 appended to queue producer-44

Process Producer : item 159 appended to queue producer-44
Process Producer : item 130 appended to queue producer-44
Processor Consumer: item 88 popped from by consumer-45 

Process Producer : item 156 appended to queue producer-44
Process Producer : item 195 appended to queue producer-44
Process Producer : item 56 appended to queue producer-44
Processor Consumer: item 56 popped from by consumer-45 

Process Producer : item 205 appended to queue producer-44
Process Producer : item 242 appended to queue producer-44
Processor Consumer: item 159 popped from by consumer-45 

Processor Consumer: item 130 popped from by consumer-45 

Processor Consumer: item 156 popped from by consumer-45 

Processor Consumer: item 195 popped from by consumer-45 

Processor Consumer:
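The consumer above stops as soon as it sees an empty queue, which can happen before the producer has finished (`Queue.empty()` is not reliable across processes). A common alternative, sketched here with hypothetical helper names, is to have the producer put a sentinel value (`None`) at the end, so the consumer simply blocks on `get()` until it receives the sentinel:

```python
import multiprocessing as mp

SENTINEL = None  # marker meaning "no more items will arrive"

def produce(queue, items):
    for item in items:
        queue.put(item)
    queue.put(SENTINEL)  # tell the consumer to stop

def consume(queue, results):
    # get() blocks until an item is available, so there is no empty() race
    while True:
        item = queue.get()
        if item is SENTINEL:
            break
        results.put(item * 2)
    results.put(SENTINEL)  # tell the parent that the consumer is done

def run_pipeline(items):
    queue, results = mp.Queue(), mp.Queue()
    producer = mp.Process(target=produce, args=(queue, items))
    consumer = mp.Process(target=consume, args=(queue, results))
    producer.start()
    consumer.start()
    collected = []
    # Drain the results queue before joining the consumer
    while True:
        value = results.get()
        if value is SENTINEL:
            break
        collected.append(value)
    producer.join()
    consumer.join()
    return collected

if __name__ == "__main__":
    print(run_pipeline([1, 2, 3]))
```

The parent drains `results` before calling `join()` because a process that has put items on a queue does not terminate until those items have been flushed to the underlying pipe; joining first could deadlock.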

#### Using pipes to exchange objects

`mp.Pipe()` does the following:
* It returns a pair of connection objects connected by a pipe; with `duplex=True` (the default) the pipe is bidirectional.
* Each connection object has `send()` and `recv()` methods for communicating between processes.
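A minimal sketch of a two-way exchange over a single duplex pipe (the helpers `echo_upper` and `round_trip` are hypothetical): the parent sends a string, and the child replies on the same connection.

```python
import multiprocessing as mp

def echo_upper(conn):
    # Child side: receive one message, reply on the same connection, then close it
    message = conn.recv()
    conn.send(message.upper())
    conn.close()

def round_trip(text):
    # duplex=True lets both ends send and receive
    parent_conn, child_conn = mp.Pipe(duplex=True)
    child = mp.Process(target=echo_upper, args=(child_conn,))
    child.start()
    parent_conn.send(text)
    reply = parent_conn.recv()  # blocks until the child answers
    child.join()
    return reply

if __name__ == "__main__":
    print(round_trip("hello"))  # prints HELLO
```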

In [34]:
import multiprocessing as mp

def create_items(pipe):
    output_pipe, _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()
    
def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item * item)
    except EOFError:
        output_pipe.close()
        
if __name__ == '__main__':

    # First process pipe with numbers from 0 to 9
    pipe_1 = mp.Pipe(True)
    process_pipe_1 = mp.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()
    
    # Second pipe
    pipe_2 = mp.Pipe(True)
    process_pipe_2 = mp.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()
    
    pipe_1[0].close()
    pipe_2[0].close()
    
    try:
        while True:
            print(pipe_2[1].recv())
            
    except EOFError:
        print("End")

0
1
4
9
16
25
36
49
64
81
End


### How to synchronize processes

Multiple processes can work together to perform a given task, and they usually share data. It is important that concurrent access to shared data by several processes does not leave the data in an inconsistent state.

Processes that cooperate by sharing data must therefore access that data in an orderly manner. The `multiprocessing` module provides the following synchronization primitives:

* Lock: This object is in one of two states, locked or unlocked. A lock object has two methods, `acquire()` and `release()`, to manage access to a shared resource.
* Event: This realizes simple communication between processes: one process signals an event and the other processes wait for it. An `Event` object manages an internal flag with `set()` and `clear()`, and `wait()` blocks until the flag is set.
* Condition: This object is used to synchronize parts of a workflow, in sequential or parallel processes. Its basic methods are `wait()`, which waits for a condition, and `notify_all()`, which tells the waiting processes that the condition has changed.
* Semaphore: This is used to share a common resource, for example, to support a fixed number of simultaneous connections.
* RLock: This defines a recursive lock, which the process already holding it can acquire again without blocking. Its methods and behavior are the same as those of `threading.RLock`.
* Barrier: This divides a program into phases, because it requires all of the processes to reach it before any of them proceeds. Code executed after a barrier cannot run concurrently with code executed before the barrier.
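As a small sketch of the Lock entry above (the helpers `add_many` and `locked_count` are hypothetical), several processes increment one shared integer. `counter.value += 1` is a read-modify-write, so without the lock some increments can be lost:

```python
import multiprocessing as mp

def add_many(counter, lock, times):
    for _ in range(times):
        # Reading, adding and writing back must happen as one step
        with lock:
            counter.value += 1

def locked_count(workers=4, times=1000):
    # lock=False gives a raw shared int; the explicit Lock below guards it
    counter = mp.Value('i', 0, lock=False)
    lock = mp.Lock()
    procs = [mp.Process(target=add_many, args=(counter, lock, times))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    print(locked_count())  # prints 4000
```

`mp.Value('i', 0)` also comes with its own lock, available through `get_lock()`; `lock=False` is used here so that the explicit `mp.Lock()` is the only synchronization in play.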

Here we show how to use a `Barrier` to synchronize two processes. We start four processes: process1 and process2 wait on a barrier, while process3 and process4 have no synchronization directives:

In [35]:
import multiprocessing as mp
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    name = mp.current_process().name
    # Block until both barrier participants have arrived
    synchronizer.wait()
    now = time()
    # The lock keeps the two print statements from interleaving
    with serializer:
        print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

def test_without_barrier():
    name = mp.current_process().name
    now = time()
    print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

if __name__ == '__main__':
    synchronizer = Barrier(2)  # 2 = number of processes that must reach the barrier
    serializer = Lock()
    Process(name='p1 - test_with_barrier', target=test_with_barrier,
            args=(synchronizer, serializer)).start()
    Process(name='p2 - test_with_barrier', target=test_with_barrier,
            args=(synchronizer, serializer)).start()
    Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    Process(name='p4 - test_without_barrier', target=test_without_barrier).start()

process p2 - test_with_barrier ----> 2018-01-19 13:07:40.408318
process p1 - test_with_barrier ----> 2018-01-19 13:07:40.408509
process p4 - test_without_barrier ----> 2018-01-19 13:07:40.421257
process p3 - test_without_barrier ----> 2018-01-19 13:07:40.421350


We can see that process1 and process2 print nearly identical timestamps (within a fraction of a millisecond), because the barrier releases them at the same moment.

### How to manage a state between processes
Python's `multiprocessing` module provides a manager to coordinate shared information between all its users. A manager object controls a server process that holds Python objects and allows other processes to manipulate them.

A manager has the following properties:
* It controls the server process that manages the shared objects.
* Other processes access a shared object through a proxy, so a modification made by one process is visible to all the others.

In [38]:
import multiprocessing as mp

def worker(dictionary, key, item):
    dictionary[key] = item
    print("key = %d, value = %d" % (key, item))

if __name__ == '__main__':
    mgr = mp.Manager()
    dictionary = mgr.dict()
    jobs = [mp.Process(target=worker, args=(dictionary, i, i**2)) for i in range(10)]
    
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    
    print('Results:', dictionary)

key = 0, value = 0
key = 1, value = 1
key = 2, value = 4
key = 3, value = 9
key = 4, value = 16
key = 5, value = 25
key = 6, value = 36
key = 7, value = 49
key = 8, value = 64
key = 9, value = 81
Results: {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}


### How to use a process pool
The `multiprocessing` library provides the `Pool` class for simple parallel processing tasks. The `Pool` class has the following methods:
* `apply()`: It runs a single function call in a worker and blocks until the result is ready.
* `apply_async()`: A variant of `apply()` that returns an `AsyncResult` object immediately instead of blocking; call its `get()` method to wait for the result.
* `map()`: It blocks until the result is ready. This method chops the iterable into a number of chunks, which it submits to the process pool as separate tasks.
* `map_async()`: A variant of `map()` that returns an `AsyncResult` object. If a callback is specified, it should be a callable that accepts a single argument; when the result becomes ready, the callback is applied to it (unless the call failed). The callback should complete immediately, because otherwise the thread that handles the results is blocked.
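A short sketch of the two asynchronous methods (the functions `cube` and `async_demo` are hypothetical): `apply_async()` returns an `AsyncResult` at once, and `map_async()` delivers the whole result list to a callback in the parent process.

```python
import multiprocessing as mp

def cube(x):
    return x ** 3

def async_demo():
    collected = []
    with mp.Pool(processes=2) as pool:
        # Returns an AsyncResult immediately; the work runs in the background
        single = pool.apply_async(cube, (3,))
        # The callback receives the full, ordered result list when it is ready
        many = pool.map_async(cube, range(5), callback=collected.extend)
        many.wait()                    # the callback has run once wait() returns
        first = single.get(timeout=10) # blocks until this result is ready
    return first, collected

if __name__ == "__main__":
    print(async_demo())  # prints (27, [0, 1, 8, 27, 64])
```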

In [55]:
import multiprocessing as mp

def square(data):
    result = data*data
    return result

if __name__ == '__main__':
    inputs = list(range(100))
    pool = mp.Pool(processes=4)
    pool_outputs = pool.map(square, inputs)
    pool.close()
    pool.join()
    print('Pool:', pool_outputs)

Pool: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


In [56]:
import multiprocessing as mp
from multiprocessing import Pool

def square(data):
    result = data*data
    return result

if __name__ == '__main__':
    inputs = list(range(100))
    with Pool(4) as p:
        print('Pool:', p.map(square, inputs))

Pool: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


### Serial vs. parallel programming

Let's revisit the case of computing Pi using numerical integration. This time we compute Pi several times, each with a different number of steps.

In [97]:
import time

def Pi(num_steps):

    step = 1.0/num_steps
    sum = 0
    for i in range(num_steps):
        x= (i+0.5)*step
        sum = sum + 4.0/(1.0+x*x)
        pi = step * sum
    
    return pi

if __name__ == '__main__':
    start = time.time()
    pi = []
    for steps in range(1000, 9000, 1000):
        pi.append(Pi(steps))

    print(pi)
    print("Time taken %s second" %(time.time() - start))

[3.1415927369231227, 3.14159267442312, 3.141592662849059, 3.141592658798146, 3.1415926569231227, 3.1415926559046072, 3.1415926552904714, 3.1415926548918773]
Time taken 0.00751805305480957 second


In [98]:
import multiprocessing
import time
from multiprocessing import Pool

def Pi(num_steps):

    step = 1.0/num_steps
    sum = 0
    for i in range(num_steps):
        x= (i+0.5)*step
        sum = sum + 4.0/(1.0+x*x)
        pi = step * sum
        
    return pi
        
if __name__ == '__main__':
    np = multiprocessing.cpu_count()
    print('You have {0:1d} CPUs'.format(np))
    start = time.time()
    with Pool(np) as p:
        print(p.map(Pi, list(range(1000, 9000, 1000))))
    print("Time taken %s second" %(time.time() - start))

You have 8 CPUs
[3.1415927369231227, 3.14159267442312, 3.141592662849059, 3.141592658798146, 3.1415926569231227, 3.1415926559046072, 3.1415926552904714, 3.1415926548918773]
Time taken 0.17608308792114258 second


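The parallel version actually takes much longer! How could this happen? Each task here is tiny (at most 8,000 loop iterations), so the time spent starting the worker processes and sending arguments and results between processes far exceeds the time saved by computing in parallel.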

Now let's make the task more CPU-intensive by increasing the number of steps by three orders of magnitude when estimating the value of Pi.

In [99]:
import time

def Pi(num_steps):

    step = 1.0/num_steps
    sum = 0
    for i in range(num_steps):
        x= (i+0.5)*step
        sum = sum + 4.0/(1.0+x*x)
        pi = step * sum
    
    return pi

if __name__ == '__main__':
    start = time.time()
    pi = []
    for steps in range(1000000, 9000000, 1000000):
        pi.append(Pi(steps))

    print(pi)
    print("Time taken %s second" %(time.time() - start))

[3.1415926535897643, 3.141592653589994, 3.1415926535896097, 3.141592653589683, 3.141592653589587, 3.1415926535895613, 3.1415926535900187, 3.1415926535900724]
Time taken 6.243754863739014 second


In [100]:
import multiprocessing
import time
from multiprocessing import Pool

def Pi(num_steps):

    step = 1.0/num_steps
    sum = 0
    for i in range(num_steps):
        x= (i+0.5)*step
        sum = sum + 4.0/(1.0+x*x)
        pi = step * sum
        
    return pi
        
if __name__ == '__main__':
    np = multiprocessing.cpu_count()
    print('You have {0:1d} CPUs'.format(np))
    start = time.time()
    with Pool(np) as p:
        print(p.map(Pi, list(range(1000000, 9000000, 1000000))))
    print("Time taken %s second" %(time.time() - start))

You have 8 CPUs
[3.1415926535897643, 3.141592653589994, 3.1415926535896097, 3.141592653589683, 3.141592653589587, 3.1415926535895613, 3.1415926535900187, 3.1415926535900724]
Time taken 1.9258959293365479 second


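Now the parallel version is faster, because the tasks run on multiple cores at the same time. The speed-up (6.24 s / 1.93 s ≈ 3.2) is well below 8 because the tasks have unequal sizes: the pool cannot finish before its largest task (8,000,000 steps) does, and process start-up and communication still add some overhead.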
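In the comparison above each task is a whole integral, so one large task can dominate. An alternative, sketched here under the assumption that a single integral is what we want to speed up (the helpers `partial_sum` and `parallel_pi` are hypothetical), splits the index range of one midpoint-rule sum into equal chunks, sums each chunk in a worker, and adds the partial sums in the parent:

```python
import math
import multiprocessing as mp

def partial_sum(task):
    # Midpoint-rule sum of 4/(1+x^2) over the index range [start, stop)
    start, stop, step = task
    total = 0.0
    for i in range(start, stop):
        x = (i + 0.5) * step
        total += 4.0 / (1.0 + x * x)
    return total

def parallel_pi(num_steps, workers=4):
    step = 1.0 / num_steps
    # Boundaries of `workers` contiguous chunks of (almost) equal size
    bounds = [num_steps * k // workers for k in range(workers + 1)]
    tasks = [(bounds[k], bounds[k + 1], step) for k in range(workers)]
    with mp.Pool(processes=workers) as pool:
        partials = pool.map(partial_sum, tasks)
    return step * sum(partials)

if __name__ == "__main__":
    estimate = parallel_pi(1_000_000)
    print(estimate, abs(estimate - math.pi))
```

Every chunk now costs about the same, so the workers finish at roughly the same time. Because floating-point addition is not associative, the result can differ from the serial `Pi()` in the last few digits.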

What if we keep increasing the computation intensity?

In [93]:
import time

def Pi(num_steps):

    step = 1.0/num_steps
    sum = 0
    for i in range(num_steps):
        x= (i+0.5)*step
        sum = sum + 4.0/(1.0+x*x)
        pi = step * sum
    
    return pi

if __name__ == '__main__':
    start = time.time()
    pi = []
    for steps in range(100000000, 900000000, 100000000):
        pi.append(Pi(steps))

    print(pi)
    print("Time taken %s second" %(time.time() - start))

[3.1415926535904264, 3.141592653590478, 3.141592653589358]
Time taken 107.65656399726868 second


In [106]:
import multiprocessing
import time
from multiprocessing import Pool

def Pi(num_steps):

    step = 1.0/num_steps
    sum = 0
    for i in range(num_steps):
        x= (i+0.5)*step
        sum = sum + 4.0/(1.0+x*x)
        pi = step * sum
        
    return pi
        
if __name__ == '__main__':
    np = multiprocessing.cpu_count()
    print('You have {0:1d} CPUs'.format(np))
    start = time.time()
    with Pool(np) as p:
        print(p.map(Pi, list(range(1000000, 9000000, 1000000))))
    print("Time taken %s second" %(time.time() - start))

You have 8 CPUs
[3.1415926535897643, 3.141592653589994, 3.1415926535896097, 3.141592653589683, 3.141592653589587, 3.1415926535895613, 3.1415926535900187, 3.1415926535900724]
Time taken 2.1180169582366943 second


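Note that the parallel cell above still uses the 1,000,000 to 8,000,000 step range, so its timing is not directly comparable with the 100,000,000-step serial run. We now run a benchmark to understand how multi-core parallel computing can benefit compute-intensive tasks.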

In [52]:
import multiprocessing as mp
import time

width = [10 ** n for n in range(0,5)]

def Pi(num_steps):

    step = 1.0/num_steps
    sum = 0
    start = time.time()
    for i in range(num_steps):
        x= (i+0.5)*step
        sum = sum + 4.0/(1.0+x*x)
        pi = step * sum
    end = time.time()

    return (num_steps, pi, end-start)

def serial():
    return [Pi(num_steps) for num_steps in width]

def multiprocess(processes):
    # map() returns results in input order, so no sorting is needed;
    # the with-block shuts the pool down once the results are in
    with mp.Pool(processes=processes) as pool:
        results = pool.map(Pi, width)
    return results

In [53]:
multiprocess(4)


In [57]:
multiprocess(16)


In [58]:
serial()


In [143]:
import random
import multiprocessing
from multiprocessing import Pool

# Count how many of n random points in the unit square
# fall inside the unit circle
def monte_carlo_pi_part(n):
    count = 0
    for i in range(n):
        x = random.random()
        y = random.random()

        # the point (x, y) lies within the unit circle
        if x*x + y*y <= 1:
            count = count + 1
    return count

if __name__ == '__main__':

    np = multiprocessing.cpu_count()
    print('You have {0:1d} CPUs'.format(np))

    # Number of points to use for the Pi estimation
    n = 10000000

    # Each worker process gets n // np points; range() needs an int,
    # so integer division is required (n / np would be a float)
    part_count = [n // np for i in range(np)]

    # parallel map; the with-block cleans up the pool afterwards
    with Pool(processes=np) as pool:
        count = pool.map(monte_carlo_pi_part, part_count)

    # Divide by the number of points actually generated (n may not split evenly)
    print("Estimated value of Pi:: ", sum(count) / sum(part_count) * 4)

You have 8 CPUs

#### Exercise

Use `multiprocessing` to calculate the sum of the prime numbers up to n.

In [131]:
from multiprocessing import Pool
import time

def sum_prime(num):
    
    sum_of_primes = 0

    ix = 2

    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1

    return sum_of_primes

def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True

if __name__ == '__main__':
    start = time.time()
    # Pool(1) has a single worker process, so the three tasks run one after another
    with Pool(1) as p:
        print(p.map(sum_prime, [1000000, 2000000, 3000000]))
    print("Time taken = {0:.5f}".format(time.time() - start))

[37550402023, 142913828922, 312471072265]
Time taken = 23.71322
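One possible solution sketch for the exercise (the helpers `sum_primes_in_range` and `parallel_sum_primes` are hypothetical, and `is_prime` is repeated from the cell above): split [2, n] into contiguous ranges, sum the primes of each range in a separate worker, and add the partial sums.

```python
import multiprocessing as mp

def is_prime(num):
    # 6k +/- 1 trial division, as in the exercise cell
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num % 2 == 0 or num % 3 == 0:
        return False
    i = 5
    while i * i <= num:
        if num % i == 0 or num % (i + 2) == 0:
            return False
        i += 6
    return True

def sum_primes_in_range(bounds):
    # Sum of the primes in the half-open range [lo, hi)
    lo, hi = bounds
    return sum(k for k in range(lo, hi) if is_prime(k))

def parallel_sum_primes(n, workers=4):
    # Split [2, n] into `workers` contiguous ranges; the last one ends at n + 1,
    # so n itself is included
    edges = [2 + (n - 1) * k // workers for k in range(workers + 1)]
    chunks = list(zip(edges[:-1], edges[1:]))
    with mp.Pool(processes=workers) as pool:
        return sum(pool.map(sum_primes_in_range, chunks))

if __name__ == "__main__":
    print(parallel_sum_primes(100_000))
```

Equal-width ranges are not equal work: testing larger numbers costs more, so the last chunk tends to finish last. Splitting the range into more chunks than there are workers usually balances the load better.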


### Distributed Memory – mpi4py

Each processor (CPU or core) accesses its own memory and processes a job. If a processor needs data that resides in memory owned by another processor, the two processors must exchange “messages”. Python supports MPI (Message Passing Interface) through the `mpi4py` module, which provides open-source Python bindings to most of the functionality of the MPI-2 standard.

In [133]:
#helloWorld_MPI.py

from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
print("hello world from process ", rank)

hello world from process  0


In MPI, the processes involved in the execution of a parallel program are identified by a sequence of non-negative integers called ranks.

If p processes run a program, their ranks go from 0 to `p-1`. A process obtains its own rank with the following call:

```python
rank = comm.Get_rank()
```

This function returns the rank of the process that calls it. Here `comm` is a communicator: it defines the group of processes that can communicate with one another. `MPI.COMM_WORLD` is the communicator that contains all the processes started by `mpiexec`:

```python
comm = MPI.COMM_WORLD
```

In [4]:
!mpiexec -n 5 python helloWorld_MPI.py

hello world from process  0
hello world from process  4
hello world from process  1
hello world from process  2
hello world from process  3


Because all the processes write to the screen at the same time, the operating system determines the order of the output lines, and that order can change from run to run.

Every process involved in an MPI execution runs the same program (here, the same Python script), so each process receives the same instructions; processes behave differently only by branching on their rank.

All the communication functions (point-to-point or collective) refer to a group of processes. `MPI_COMM_WORLD` assigns each of the `n` processes that belong to it a rank from `0` to `n-1`.

### Point-to-point communication

One of the most important features provided by MPI is point-to-point communication, a mechanism that enables data transmission between two processes: a **sender** process and a **receiver** process.

The Python module mpi4py enables point-to-point communication via two methods:
* `comm.send(data, dest=process_destination)`: This sends data to the destination process, identified by its rank in the communicator group.
* `comm.recv(source=process_source)`: This receives and returns data from the source process, which is also identified by its rank in the communicator group.

These lowercase methods communicate generic Python objects, which mpi4py pickles; the uppercase variants `comm.Send()` and `comm.Recv()` work on buffer-like objects such as NumPy arrays.

In [None]:
#pointToPointCommunication.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is : ", rank)

if rank == 0:
    data = 10000000
    destination_process = 4
    comm.send(data, dest=destination_process)
    print("sending data %s to process %d" % (data, destination_process))

if rank == 1:
    destination_process = 8
    data = "hello"
    comm.send(data, dest=destination_process)
    print("sending data %s to process %d" % (data, destination_process))

if rank == 4:
    data = comm.recv(source=0)
    print("data received is = %s" % data)

if rank == 8:
    data1 = comm.recv(source=1)
    print("data1 received is = %s" % data1)


In [151]:
!mpiexec -n 9 python pointToPointCommunication.py

my rank is :  0
sending data 10000000 to process 4
my rank is :  3
my rank is :  4
data received is = 10000000
my rank is :  8
my rank is :  5
my rank is :  6
my rank is :  1
sending data hello to process 8
data1 received is = hello
my rank is :  2
my rank is :  7


The `comm.send()` and `comm.recv()` functions are blocking: they block the caller until the data involved can safely be used. MPI also provides two modes for sending messages:
* The buffered mode
* The synchronous mode

In the buffered mode, control returns to the program as soon as the data to be sent has been copied to a buffer. This does not mean that the message has been sent or received. In the synchronous mode, however, the send call completes only when the matching receive has started receiving the message.

### Deadlock problem

A common problem is deadlock, in which two (or more) processes block each other: each waits for the other to perform an action that it needs, so none of them can proceed.

In [152]:
# deadLockProblems.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is : ", rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5

    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)

    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1

    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)
    
    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)


my rank is :  0


In [154]:
!mpiexec -n 9 python deadLockProblems.py

my rank is :  5
my rank is :  7
my rank is :  0
my rank is :  1
my rank is :  2
my rank is :  4
my rank is :  6
my rank is :  3
my rank is :  8
^C
[mpiexec@Qiweis-MBP.lan] Sending Ctrl-C to processes as requested
[mpiexec@Qiweis-MBP.lan] Press Ctrl-C again to force abort


Both processes first wait to receive a message from the other, so both get stuck there and neither reaches its `comm.send()`. This happens because `comm.recv()` and `comm.send()` are blocking: the calling process waits for the call to complete. For `comm.send()`, completion means the data has been sent and the send buffer may be overwritten without modifying the message. For `comm.recv()`, completion means the data has been received and can be used.

One way to avoid the deadlock is to swap the send and receive calls in one of the processes, making the exchange asymmetric:

In [157]:
# deadLockProblems2.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is : ", rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5

    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)

    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1

    comm.send(data_send, dest=destination_process)
    data_received = comm.recv(source=source_process)

    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

my rank is :  0


In [155]:
!mpiexec -n 9 python deadLockProblems2.py

my rank is :  1
my rank is :  2
my rank is :  3
my rank is :  4
my rank is :  7
my rank is :  5
sending data a to process 5
data received is = b
sending data b to process 1
data received is = a
my rank is :  6
my rank is :  0
my rank is :  8


Swapping the calls is not the only solution. MPI provides a function, `sendrecv()`, that combines in a single call sending a message to one process and receiving a message from another.

The call still blocks, but compared with the two separate calls seen previously, it has the advantage of leaving the communication subsystem responsible for checking the dependencies between sending and receiving, thus avoiding the deadlock:

In [158]:
# deadLockProblems3.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is : ", rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5

    data_received = comm.sendrecv(data_send, dest=destination_process,
                                  source=source_process)

    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1

    data_received = comm.sendrecv(data_send, dest=destination_process,
                                  source=source_process)

    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

my rank is :  0


In [6]:
!mpiexec -n 9 python deadLockProblems3.py

my rank is :  3
my rank is :  5
my rank is :  0
my rank is :  8
my rank is :  7
my rank is :  1
my rank is :  2
sending data b to process 1
data received is = a
my rank is :  6
sending data a to process 5
data received is = b
my rank is :  4



### Collective communication using broadcast

During the development of parallel code, we often need to share the value of a variable among multiple processes at runtime, or to combine, through some operation, variables that each process provides (presumably with different values).

One way to handle these situations is to use communication trees (for example, process 0 sends the data to processes 1 and 2, which in turn forward it to processes 3, 4, 5, and 6, and so on).
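The tree in this example can be sketched as a forwarding schedule. This is an illustrative model only, assuming a binary tree in which process `i` forwards the data to processes `2*i + 1` and `2*i + 2`; real MPI implementations choose their own (often binomial) trees, and the function name `tree_broadcast_schedule` is hypothetical. The point it shows is that the number of tree levels grows roughly with log2 of the number of processes rather than linearly.

```python
def tree_broadcast_schedule(size, root_value):
    """Binary-tree broadcast from process 0 (illustrative model, not MPI's algorithm).

    Process i forwards the data to processes 2*i + 1 and 2*i + 2. Returns the
    value each process ends up with and, per tree level, the (sender, receiver)
    pairs active at that level.
    """
    values = [None] * size
    values[0] = root_value
    levels = []
    frontier = [0]  # processes that already hold the data and must forward it
    while frontier:
        sends, next_frontier = [], []
        for sender in frontier:
            for child in (2 * sender + 1, 2 * sender + 2):
                if child < size:
                    values[child] = values[sender]
                    sends.append((sender, child))
                    next_frontier.append(child)
        if sends:
            levels.append(sends)
        frontier = next_frontier
    return values, levels


if __name__ == "__main__":
    values, levels = tree_broadcast_schedule(7, 100)
    for depth, sends in enumerate(levels, start=1):
        print("level %d: %s" % (depth, sends))
    # level 1: [(0, 1), (0, 2)]
    # level 2: [(1, 3), (1, 4), (2, 5), (2, 6)]
    print(values)  # [100, 100, 100, 100, 100, 100, 100]
```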

Instead of building such trees by hand, we can use the collective functions that MPI libraries provide for exchanging information among multiple processes; these are optimized for the machine on which they run.

A communication method that involves all the processes belonging to a communicator is called a collective communication. Consequently, a collective communication generally involves more than two processes. The first collective communication we consider is broadcast, in which a single process sends the same data to every other process.

In mpi4py, broadcast is provided by the following method:

```python
buf = comm.bcast(data_to_share, rank_of_root_process)
```
In the following example, the root process (rank 0) shares its own data, `variable_to_share`, with the other processes in the communicator group:

In [159]:
# broadcast.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    variable_to_share = 100
else:
    variable_to_share = None

variable_to_share = comm.bcast(variable_to_share, root=0)
print("process = %d variable shared = %d" %(rank, variable_to_share))

process = 0 variable shared = 100


The root process (rank 0) sets `variable_to_share` to 100, and `comm.bcast` shares this value with the other processes of the communication group.

In our case, the communication group has nine processes (`-n 9`), and every one of them prints `variable shared = 100`.

In [5]:
!mpiexec -n 9 python broadcast.py

process = 2 variable shared = 100
process = 3 variable shared = 100
process = 4 variable shared = 100
process = 5 variable shared = 100
process = 6 variable shared = 100
process = 7 variable shared = 100
process = 0 variable shared = 100
process = 1 variable shared = 100
process = 8 variable shared = 100


### Collective communication using scatter

The scatter functionality is similar to broadcast, with one major difference: while `comm.bcast` sends the same data to all listening processes, `comm.scatter` sends different chunks of an array to different processes.

The `comm.scatter` function takes the elements of the array and distributes them to the processes according to their rank: the first element is sent to process 0, the second element to process 1, and so on.
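The rank-to-element mapping can be mimicked in plain Python. The helper `simulate_scatter` below is hypothetical and runs in a single process; it only models which rank receives which element, and it raises the same kind of `ValueError` that mpi4py's `comm.scatter` raises when the number of items differs from the number of processes.

```python
def simulate_scatter(sendobj, size):
    """Map each rank to the item it would receive from comm.scatter (serial sketch).

    Rank i gets sendobj[i]. As with mpi4py's object-based comm.scatter, the
    sequence must hold exactly one item per process.
    """
    if len(sendobj) != size:
        raise ValueError("expecting %d items, got %d" % (size, len(sendobj)))
    return {rank: sendobj[rank] for rank in range(size)}


if __name__ == "__main__":
    array_to_share = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print(simulate_scatter(array_to_share, 10)[2])  # 3
    try:
        simulate_scatter(array_to_share, 5)
    except ValueError as err:
        print("ValueError:", err)  # ValueError: expecting 5 items, got 10
```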

In [None]:
# scatter.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    array_to_share = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
else:
    array_to_share = None

recvbuf = comm.scatter(array_to_share, root=0)
print("process = %d, recvbuf = %d " % (rank, recvbuf))


The process of rank 0 distributes the `array_to_share` list to all the processes in the communicator, itself included.

`recvbuf` is the value returned by `comm.scatter`: on the ith process, it holds the ith element of `array_to_share`.

In [25]:
!mpiexec -n 10 python scatter.py

process = 2, recvbuf = 3 
process = 3, recvbuf = 4 
process = 4, recvbuf = 5 
process = 5, recvbuf = 6 
process = 0, recvbuf = 1 
process = 8, recvbuf = 9 
process = 6, recvbuf = 7 
process = 7, recvbuf = 8 
process = 1, recvbuf = 2 
process = 9, recvbuf = 10 


One restriction of `comm.scatter` is that the number of elements scattered must equal the number of processes specified in the execution statement. With 5 processes and a 10-element list, the call fails:

In [26]:
!mpiexec -n 5 python scatter.py

Traceback (most recent call last):
  File "scatter.py", line 11, in <module>
    recvbuf = comm.scatter(array_to_share, root=0)
  File "mpi4py/MPI/Comm.pyx", line 1267, in mpi4py.MPI.Comm.scatter
  File "mpi4py/MPI/msgpickle.pxi", line 731, in mpi4py.MPI.PyMPI_scatter
  File "mpi4py/MPI/msgpickle.pxi", line 120, in mpi4py.MPI.Pickle.dumpv
ValueError: expecting 5 items, got 10
^C
[mpiexec@Qiweis-MacBook-Pro.local] Sending Ctrl-C to processes as requested
[mpiexec@Qiweis-MacBook-Pro.local] Press Ctrl-C again to force abort
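A common way around this restriction is to split the data on the root into exactly one chunk per process and scatter the chunks, so that each process receives a sub-list. The helper `split_into_chunks` below is a hypothetical sketch of that split (contiguous chunks whose lengths differ by at most one). On the root you would pass `split_into_chunks(array_to_share, comm.Get_size())` to `comm.scatter` instead of the flat list and, because each process then receives a list, print `recvbuf` with `%s` instead of `%d`.

```python
def split_into_chunks(seq, n):
    """Split seq into n contiguous chunks whose lengths differ by at most one."""
    base, extra = divmod(len(seq), n)
    chunks = []
    start = 0
    for i in range(n):
        # The first `extra` chunks take one additional element.
        end = start + base + (1 if i < extra else 0)
        chunks.append(seq[start:end])
        start = end
    return chunks


if __name__ == "__main__":
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print(split_into_chunks(data, 5))  # [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
    print(split_into_chunks(data, 3))  # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
```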


### Collective communication using gather

The `gather` function performs the inverse of the scatter functionality. In this case, all processes send data to a root process that collects the data received.

```python
recvbuf = comm.gather(sendbuf, rank_of_root_process)
```
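To make the return value explicit, here is a serial sketch (hypothetical helper `simulate_gather`, not part of mpi4py) of what `comm.gather` gives back on each process: the root receives a list ordered by rank and, in mpi4py's object-based `gather`, every other process receives `None`. This is also why gather.py reads `data` only inside `if rank == 0`.

```python
def simulate_gather(local_values, root=0):
    """Return what comm.gather hands back on each rank (serial sketch).

    local_values[r] is the object contributed by rank r. The root receives a
    list ordered by rank; every other rank receives None.
    """
    gathered = list(local_values)
    return [gathered if rank == root else None for rank in range(len(local_values))]


if __name__ == "__main__":
    contributions = [(rank + 1) ** 2 for rank in range(5)]  # as in gather.py
    results = simulate_gather(contributions)
    print(results[0])  # [1, 4, 9, 16, 25]
    print(results[1])  # None
```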

In [None]:
# gather.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
data = (rank + 1) ** 2

data = comm.gather(data, root=0)

if rank == 0:
    print("rank = %s ...receiving data to other process" % rank)

    # data is now the list gathered from all ranks, ordered by rank
    for i in range(1, size):
        value = data[i]
        print(" process %s receiving %s from process %s" % (rank, value, i))


In [29]:
!mpiexec -n 5 python gather.py

rank = 0 ...receiving data to other process
 process 0 receiving 4 from process 1
 process 0 receiving 9 from process 2
 process 0 receiving 16 from process 3
 process 0 receiving 25 from process 4


### Collective communication using Alltoall

The `Alltoall` collective communication combines the `scatter` and `gather` functionality. In `mpi4py`, there are three types of `Alltoall` collective communication:
* `comm.Alltoall(sendbuf, recvbuf)`: The all-to-all scatter/gather, in which every process in a group sends an equally sized block of data to every process in the group, itself included
* `comm.Alltoallv(sendbuf, recvbuf)`: The all-to-all scatter/gather vector, which allows a different amount of data and a different displacement for each partner
* `comm.Alltoallw(sendbuf, recvbuf)`: Generalized all-to-all communication, which allows different counts, displacements, and datatypes for each partner
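The counts-and-displacements idea behind `Alltoallv` can be sketched in plain Python. This is a hypothetical single-process model, not mpi4py's buffer interface: `sendcounts[j][i]` is how many elements rank `j` sends to rank `i`, and the displacement of each chunk in rank `j`'s flat send buffer is the running total of the preceding counts (an exclusive prefix sum).

```python
def displacements(counts):
    """Exclusive prefix sum: starting offset of each chunk in a flat buffer."""
    displs, offset = [], 0
    for count in counts:
        displs.append(offset)
        offset += count
    return displs


def simulate_alltoallv(sendbufs, sendcounts):
    """Rank i receives, in rank order, the chunk every rank j addressed to it."""
    size = len(sendbufs)
    recvbufs = []
    for i in range(size):
        received = []
        for j in range(size):
            start = displacements(sendcounts[j])[i]
            received.extend(sendbufs[j][start:start + sendcounts[j][i]])
        recvbufs.append(received)
    return recvbufs


if __name__ == "__main__":
    size = 3
    # Every rank sends i + 1 elements to rank i.
    sendcounts = [[i + 1 for i in range(size)] for _ in range(size)]
    # Rank j's buffer holds 10 * j + k so each element's origin is visible.
    sendbufs = [[10 * j + k for k in range(sum(sendcounts[j]))] for j in range(size)]
    for rank, buf in enumerate(simulate_alltoallv(sendbufs, sendcounts)):
        print("process %d receiving %s" % (rank, buf))
    # process 0 receiving [0, 10, 20]
    # process 1 receiving [1, 2, 11, 12, 21, 22]
    # process 2 receiving [3, 4, 5, 13, 14, 15, 23, 24, 25]
```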

We consider a communicator group of processes in which each process sends an array of numerical data to, and receives an array from, every process in the group:

In [None]:
# alltoall.py

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

a_size = 1
senddata = (rank + 1) * np.arange(size, dtype=int)
recvdata = np.empty(size * a_size, dtype=int)
comm.Alltoall(senddata, recvdata)

print(" process %s sending %s receiving %s" % (rank, senddata, recvdata))

In [31]:
!mpiexec -n 5 python alltoall.py

 process 4 sending [ 0  5 10 15 20] receiving [ 4  8 12 16 20]
 process 3 sending [ 0  4  8 12 16] receiving [ 3  6  9 12 15]
 process 0 sending [0 1 2 3 4] receiving [0 0 0 0 0]
 process 1 sending [0 2 4 6 8] receiving [1 2 3 4 5]
 process 2 sending [ 0  3  6  9 12] receiving [ 2  4  6  8 10]


The `comm.Alltoall` method takes the ith object from the `sendbuf` of task j and copies it into the jth object of the `recvbuf` argument of task i.
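Since each task sends one element to every task, this rule amounts to transposing a matrix whose row j is the send buffer of task j. The serial sketch below (hypothetical helper `simulate_alltoall`, using plain lists rather than NumPy buffers) reproduces the same values as the `receiving` arrays printed above for 5 processes.

```python
def simulate_alltoall(sendbufs):
    """recv[i][j] = send[j][i]: item i of rank j's buffer goes to slot j on rank i."""
    size = len(sendbufs)
    return [[sendbufs[j][i] for j in range(size)] for i in range(size)]


if __name__ == "__main__":
    size = 5
    # Same send data as alltoall.py: (rank + 1) * [0, 1, ..., size - 1]
    sendbufs = [[(rank + 1) * k for k in range(size)] for rank in range(size)]
    for rank, recv in enumerate(simulate_alltoall(sendbufs)):
        print(" process %s sending %s receiving %s" % (rank, sendbufs[rank], recv))
```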

![alt text](image/alltoall.png "Alltoall collective communication")



### The reduction operation
Similar to `comm.gather`, `comm.Reduce` takes an array of input elements on each process and returns an array of output elements to the root process. The output elements contain the reduced result.
```python
comm.Reduce(sendbuf, recvbuf, root=rank_of_root_process, op=type_of_reduction_operation)
```
The `op` parameter selects the reduction operation, for example:
* MPI.MAX: returns the maximum element
* MPI.MIN: returns the minimum element
* MPI.SUM: sums the elements
* MPI.PROD: multiplies all elements
* MPI.LAND: performs a logical AND across elements
* MPI.MAXLOC: returns the maximum value and the rank of the process that owns it
* MPI.MINLOC: returns the minimum value and the rank of the process that owns it

In [None]:
# reduction.py

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.size
rank = comm.rank

array_size = 3
recvdata = np.zeros(array_size, dtype=int)  # only filled on the root
senddata = (rank + 1) * np.arange(array_size, dtype=int)

print(" process %s sending %s " % (rank, senddata))
comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)
print("on task %d after Reduce: data = %s" % (rank, recvdata))

In [37]:
!mpiexec -n 3 python reduction.py

 process 0 sending [0 1 2] 
 process 1 sending [0 2 4] 
 process 2 sending [0 3 6] 
on task 2 after Reduce: data = [0 0 0]
on task 1 after Reduce: data = [0 0 0]
on task 0 after Reduce: data = [ 0  6 12]


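With the `op=MPI.SUM` option, the reduction operation sums the ith elements of each task and puts the result in the ith element of the array in the root process P0. The `recvdata` arrays on the other processes are left untouched, which is why tasks 1 and 2 still print `[0 0 0]`.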
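The element-wise rule can be checked with a serial sketch. The helpers `simulate_reduce` and `maxloc` are hypothetical and operate on plain lists; Python's built-in `sum` and `max` stand in for `MPI.SUM` and `MPI.MAX`, and `maxloc` assumes, as is customary, that each process pairs its value with its own rank, with ties going to the lowest rank.

```python
def simulate_reduce(sendbufs, op):
    """Element-wise reduction across ranks: result[i] = op(ith element of every rank)."""
    return [op(column) for column in zip(*sendbufs)]


def maxloc(column):
    """(max value, lowest rank holding it), assuming each value is paired with its rank."""
    best_rank = max(range(len(column)), key=lambda r: (column[r], -r))
    return (column[best_rank], best_rank)


if __name__ == "__main__":
    # Same send data as reduction.py with 3 processes
    sendbufs = [[(rank + 1) * k for k in range(3)] for rank in range(3)]
    print(simulate_reduce(sendbufs, sum))     # [0, 6, 12]
    print(simulate_reduce(sendbufs, max))     # [0, 3, 6]
    print(simulate_reduce(sendbufs, maxloc))  # [(0, 0), (3, 2), (6, 2)]
```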

![alt text](image/reduction.png "The reduction collective communication")

What will the output be if we use `op=MPI.PROD` instead?