Pierre Navaro - [Institut de Recherche Mathématique de Rennes](https://irmar.univ-rennes1.fr) - [CNRS](http://www.cnrs.fr/)

# References
* [multiprocessing basics](https://pymotw.com/2/multiprocessing/basics.html)
* [Python 201: A multiprocessing tutorial](https://www.blog.pythonlibrary.org/2016/08/02/python-201-a-multiprocessing-tutorial/)
* [Multithread - Konrad Hinsen](http://calcul.math.cnrs.fr/Documents/Ecoles/2013/python/Multiprocessing.pdf)

# Multiprocessing

- The `multiprocessing` module allows the programmer to fully leverage multiple processors.
- It runs on both Unix and Windows.
- The `Pool` object parallelizes the execution of a function across multiple input values.
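- The `if __name__ == '__main__'` guard is necessary: with the *spawn* start method the worker processes import the main module, and the guard keeps them from re-executing the code that creates the pool.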

In [8]:
from multiprocessing import Pool

def f(x): return x*x+1  # Function executed on worker processes.

if __name__ == '__main__': # Executed only on main process.
    with Pool(4) as p:
        print(p.map(f, list(range(8))))

[1, 2, 5, 10, 17, 26, 37, 50]


In [13]:
def g(x, y): return x+y  # Function executed on worker processes.

if __name__ == '__main__': # Executed only on main process.
    with Pool(4) as p:
        print(p.starmap(g, zip(range(8), range(8))))

[0, 2, 4, 6, 8, 10, 12, 14]


## Asynchronous Apply

In [6]:
from multiprocessing import Pool
import numpy

if __name__ == '__main__':
    # The with-statement shuts the pool down once the results are collected.
    with Pool() as pool:
        results = [pool.apply_async(numpy.sqrt, (x,))
                   for x in range(10)]
        roots = [r.get() for r in results]
    print(roots)

[0.0, 1.0, 1.4142135623730951, 1.7320508075688772, 2.0, 2.2360679774997898, 2.4494897427831779, 2.6457513110645907, 2.8284271247461903, 3.0]


How it works:
- `pool.apply_async` returns a proxy object (an `AsyncResult`) immediately
- `proxy.get()` waits for task completion and returns the result

Use for:
- launching different tasks in parallel
- launching tasks with more than one argument
- better control of task distribution
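
As a minimal sketch of the use cases listed above, the following submits three unrelated tasks to the same pool, two of which take two arguments. The helper name `run_tasks` and the choice of functions are illustrative assumptions, not part of the original notebook.

```python
import math
import operator
from multiprocessing import Pool

def run_tasks():
    """Submit three different functions and collect their results in order."""
    with Pool(2) as pool:
        # Each call returns an AsyncResult immediately; args is always a tuple.
        pending = [
            pool.apply_async(pow, (2, 10)),          # two arguments
            pool.apply_async(operator.add, (3, 4)),  # two arguments
            pool.apply_async(math.sqrt, (16,)),      # one argument
        ]
        # get() blocks until the corresponding task has finished.
        return [r.get(timeout=30) for r in pending]

if __name__ == '__main__':
    print(run_tasks())
```

Because every task is submitted separately, the functions and argument lists do not need to have anything in common, unlike with `map`.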

# The Process class

In `multiprocessing`, processes are spawned by creating a `Process` object and then calling its `start()` method.

In [17]:
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

hello bob


# Contexts and start methods
Depending on the platform, `multiprocessing` supports three ways to start a process:
- *spawn*: The parent process starts a fresh Python interpreter process. Available on Unix and Windows. The default on Windows, and on macOS since Python 3.8.
- *fork*: The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Available on Unix only. Historically the default on Unix; Python 3.14 makes *forkserver* the default on POSIX platforms other than macOS.
- *forkserver*: A server process is started. When a new process is needed, the parent process connects to the server and requests that it fork a new process. Available on Unix.

To select a start method, call `set_start_method()` inside the `if __name__ == '__main__'` clause of the main module. It should be called at most once per program.
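
As a hedged sketch, a start method can also be chosen for one pool only through `multiprocessing.get_context()`, which leaves the global default untouched. The function names `square` and `squares_with_spawn` are illustrative, and the example assumes it is run as a script file so that the spawned workers can import the main module.

```python
import multiprocessing as mp

def square(x):
    return x * x

def squares_with_spawn(values):
    """Run square() in a pool whose workers are started with 'spawn'."""
    ctx = mp.get_context('spawn')   # context object bound to one start method
    with ctx.Pool(2) as pool:
        return pool.map(square, values)

if __name__ == '__main__':
    # Start methods available on this platform, default first.
    print(mp.get_all_start_methods())
    print(squares_with_spawn(range(5)))
```

Using a context rather than `set_start_method()` is convenient in library code, which should not change the start method of the program that imports it.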


# Exchanging objects between processes

## Queues

In [18]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()


[42, None, 'hello']
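
The same mechanism extends to a stream of items. The sketch below is an illustration under assumed names (`worker`, `square_all`): the main process feeds a task queue, a worker consumes it until it sees a `None` sentinel, and results come back on a second queue.

```python
from multiprocessing import Process, Queue

def worker(tasks, results):
    # iter(callable, sentinel) keeps calling tasks.get() until it returns None.
    for x in iter(tasks.get, None):
        results.put(x * x)

def square_all(values):
    """Square each value in a single worker process, preserving order."""
    tasks, results = Queue(), Queue()
    p = Process(target=worker, args=(tasks, results))
    p.start()
    for v in values:
        tasks.put(v)
    tasks.put(None)                  # sentinel: tells the worker to stop
    # Drain the result queue before join() so the worker is never blocked
    # trying to flush buffered items.
    out = [results.get() for _ in values]
    p.join()
    return out

if __name__ == '__main__':
    print(square_all([0, 1, 2, 3]))
```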


## Pipes
`Pipe()` returns a pair of connection objects, one for each end of the pipe. By default the pipe is duplex (two-way). Each connection object has `send()` and `recv()` methods.

In [19]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']


# Synchronization between processes

In [20]:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print( i, end = ' ')
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

0 1 2 3 4 5 6 7 8 9 

Without the lock, output from the different processes is liable to get mixed up; with prints this short, the difference is often not visible:

In [21]:
def f( i):
    print( i, end = ' ')

if __name__ == '__main__':
    for num in range(10):
        Process(target=f, args=(num,)).start()

0 1 2 3 4 5 6 7 8 9 
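
A case where the lock visibly matters is a read-modify-write on shared state. The sketch below is an illustration, not part of the original notebook: several processes increment a shared `Value` (introduced in the next section), and the lock attached to it, obtained with `get_lock()`, makes each increment atomic. The names `add_many` and `count_in_parallel` are assumptions.

```python
from multiprocessing import Process, Value

def add_many(counter, n):
    for _ in range(n):
        # counter.value += 1 is a read followed by a write; the lock keeps
        # another process from interleaving between the two steps.
        with counter.get_lock():
            counter.value += 1

def count_in_parallel(nprocs=4, n=1000):
    counter = Value('i', 0)          # shared C int, created with its own lock
    procs = [Process(target=add_many, args=(counter, n))
             for _ in range(nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == '__main__':
    print(count_in_parallel())       # nprocs * n when every increment is locked
```

Removing the `with counter.get_lock():` line can make the final count fall short of `nprocs * n`, because concurrent increments may overwrite each other.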

# Shared memory between processes

In [5]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]


- Shared memory eliminates the serialization overhead of sending data between processes.
- A NumPy extension adds shared NumPy arrays (numpy-sharedmem).
- Don’t modify shared memory contents in the worker processes.
- Use shared memory only to transfer data from the main process to the workers!
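
A minimal sketch of this read-only pattern using only the standard library, under the assumption that a `Pool` initializer is an acceptable way to hand the shared array to the workers. The names `init_worker`, `partial_sum` and `parallel_sum` are illustrative.

```python
from multiprocessing import Array, Pool

_shared = None   # set once in every worker by init_worker()

def init_worker(arr):
    """Store the shared array in a module-level variable of the worker."""
    global _shared
    _shared = arr

def partial_sum(bounds):
    imin, imax = bounds
    return sum(_shared[imin:imax])   # workers only read the shared data

def parallel_sum(values, nprocs=2):
    values = list(values)
    # lock=False gives a raw shared array; no lock is needed for read-only use.
    arr = Array('d', values, lock=False)
    n = len(values)
    step = (n + nprocs - 1) // nprocs
    slices = [(i, min(n, i + step)) for i in range(0, n, step)]
    # Only the small (imin, imax) tuples are pickled for each task;
    # the array itself is handed over once, when the workers start.
    with Pool(nprocs, initializer=init_worker, initargs=(arr,)) as pool:
        return sum(pool.map(partial_sum, slices))

if __name__ == '__main__':
    print(parallel_sum(range(100)))
```

Shared ctypes objects are passed through `initargs` rather than as task arguments because they can only be handed to a process at the time it is created.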

In [23]:
from multiprocessing import Pool, cpu_count
import numpy as np
import sharedmem

def distribute(nitems, nprocs):
    """ Distributes a sequence equally (as much as possible) 
    over the available processors. Returns a list of index pairs 
    (imin, imax) that delimit the slice to give to one task."""
    nitems_per_proc = (nitems+nprocs-1)//nprocs
    return [(i, min(nitems, i+nitems_per_proc))
            for i in range(0, nitems, nitems_per_proc)]

def apply_sqrt(a, imin, imax):
    return np.sqrt(a[imin:imax])

if __name__ == '__main__':
    nprocs = 4

    # Allocate the shared array before creating the pool.
    # np.float was removed in NumPy 1.24; the builtin float is equivalent.
    data = sharedmem.empty((100,), float)
    data[:] = np.arange(100)
    slices = distribute(len(data), nprocs)

    with Pool(processes=nprocs) as pool:
        # args must be a tuple: passing `data` itself unpacks the array into
        # 100 separate arguments, which np.sqrt rejects with a TypeError.
        results = [pool.apply_async(apply_sqrt, (data, imin, imax))
                   for (imin, imax) in slices]
        for r, (imin, imax) in zip(results, slices):
            data[imin:imax] = r.get()
    print(slices)

## Parallel Pi calculation

In [12]:
import time
import random
from multiprocessing import Pool, cpu_count

def compute_pi(n):
    count = 0
    for i in range(n):
        x=random.random()
        y=random.random()
        if x*x + y*y <= 1: count+=1
    return count

if __name__=='__main__':

    n = 10000000
    for nprocs in range(1,5):   # avoid the name `np`, usually reserved for NumPy
        assert nprocs <= cpu_count()
        start_time = time.time()

        part_count=[n//nprocs for i in range(nprocs)]
        with Pool(processes=nprocs) as pool:
            count=pool.map(compute_pi, part_count)
        # Divide by the number of points actually drawn (n//nprocs rounds down).
        print ("Number of cores {0}, Estimated value of Pi : {1:.8f}"
               " time : {2:.8f}".format(nprocs, 4*sum(count)/sum(part_count),
                                        time.time()-start_time))

Number of cores 1, Estimated value of Pi : 3.14164440 time : 3.61585712
Number of cores 2, Estimated value of Pi : 3.14194720 time : 1.68908000
Number of cores 3, Estimated value of Pi : 3.14078200 time : 1.13751817
Number of cores 4, Estimated value of Pi : 3.14018080 time : 0.90940595


# Joblib

[Joblib](http://pythonhosted.org/joblib/) provides a simple helper class to write parallel for loops using multiprocessing. 

In [25]:
def f(x):
    return x*x

[f(x) for x in range(10)]

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [26]:
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(f)(x) for x in range(10))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Exercise

Write a program in which two processes send packets of information back and forth 100 times, and record the amount of time required.
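
One possible starting point, offered as a sketch rather than a reference solution: a child process echoes back whatever it receives over a `Pipe`, and the parent times the round trips. The names `echo` and `ping_pong` and the 1 KiB payload are assumptions made for illustration.

```python
import time
from multiprocessing import Pipe, Process

def echo(conn, n_round_trips):
    """Child side: send every received packet straight back."""
    for _ in range(n_round_trips):
        conn.send(conn.recv())
    conn.close()

def ping_pong(n_round_trips=100, payload=b'x' * 1024):
    parent_conn, child_conn = Pipe()
    p = Process(target=echo, args=(child_conn, n_round_trips))
    p.start()
    reply = None
    start = time.perf_counter()
    for _ in range(n_round_trips):
        parent_conn.send(payload)    # ping
        reply = parent_conn.recv()   # pong
    elapsed = time.perf_counter() - start
    p.join()
    return elapsed, reply

if __name__ == '__main__':
    elapsed, _ = ping_pong()
    print('100 round trips took {:.6f} s'.format(elapsed))
```

Varying the payload size shows how much of the time is fixed per-message latency and how much is serialization and copying.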

In [1]:
import os
 
from multiprocessing import Process, current_process
 
 
def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
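    # Created but never started; it still takes the name Process-1,
    # which is why the names in the output start at Process-2.
    proc = Process(target=doubler, args=(5,))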
 
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()
 
    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)
 
    for proc in procs:
        proc.join()

5 doubled to 10 by: Process-2
10 doubled to 20 by: Process-3
15 doubled to 30 by: Process-4
20 doubled to 40 by: Process-5
25 doubled to 50 by: Process-6
2 doubled to 4 by: Test


# sharedmem

http://rainwoodman.github.io/sharedmem/

In [24]:
import sharedmem
counter = sharedmem.empty(1)
counter[:] = 0
with sharedmem.MapReduce() as pool:
    def work(i):
         with pool.critical:
             counter[:] += i
    pool.map(work, range(10))
print(counter)

[ 45.]


In [25]:
import numpy as np
input = np.arange(1024 * 1024 * 128, dtype='f8')
output = sharedmem.empty(1024 * 1024 * 128, dtype='f8')
with sharedmem.MapReduce() as pool:
    chunksize = 1024 * 1024
    def work(i):
        s = slice (i, i + chunksize)
        output[s] = input[s]
        return i, np.sum(input[s])
    def reduce(i, r):
        print('chunk', i, 'done')
        return r
    r = pool.map(work, range(0, len(input), chunksize), reduce=reduce)
print (np.sum(r))



chunk 0 done
chunk 1048576 done
chunk 2097152 done
chunk 3145728 done
chunk 4194304 done
chunk 5242880 done
chunk 6291456 done
chunk 7340032 done
chunk 8388608 done
chunk 9437184 done
chunk 10485760 done
chunk 11534336 done
chunk 12582912 done
chunk 13631488 done
chunk 14680064 done
chunk 15728640 done
chunk 16777216 done
chunk 17825792 done
chunk 18874368 done
chunk 19922944 done
chunk 24117248 done
chunk 20971520 done
chunk 22020096 done
chunk 23068672 done
chunk 25165824 done
chunk 26214400 done
chunk 27262976 done
chunk 29360128 done
chunk 28311552 done
chunk 33554432 done
chunk 30408704 done
chunk 31457280 done
chunk 32505856 done
chunk 34603008 done
chunk 36700160 done
chunk 35651584 done
chunk 37748736 done
chunk 39845888 done
chunk 38797312 done
chunk 40894464 done
chunk 41943040 done
chunk 42991616 done
chunk 44040192 done
chunk 50331648 done
chunk 45088768 done
chunk 46137344 done
chunk 47185920 done
chunk 49283072 done
chunk 48234496 done
chunk 51380224 done
chunk 52428800 d