# Multiprocessing module

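__WARNING__: functions defined inside the notebook cannot be passed as arguments to multiprocessing functions or objects (such as the `Process` constructor) when child processes are started with the `spawn` or `forkserver` start methods (`spawn` is the default on Windows and macOS), because each child must be able to import the function by its module name.
The code of such functions must therefore be written to a file (here with `%%writefile`) and that file imported as a module.
Note that Python caches imported modules: after overwriting an already-imported module with `%%writefile`, reload it with `importlib.reload` (or restart the kernel), otherwise the previously imported version keeps being used.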

In [None]:
%%writefile task.py
def task(value):
    """Simulate one unit of work and report that ``value`` has been processed."""
    # the real work for ``value`` would go here
    message = f'.done {value}'
    # flush so the line shows up as soon as the child prints it
    print(message, flush=True)

In [None]:
# example of a parallel for loop with the Process class
from multiprocessing import Process
from task import task

# protect the entry point
# (the `if __name__ == '__main__':` guard is left out here because this cell
# runs directly in the notebook kernel)
#if __name__ == '__main__':
# build one Process per index before launching any of them
processes = []
for i in range(20):
    processes.append(Process(target=task, args=(i,)))
# launch every child first, so they all run concurrently ...
for process in processes:
    process.start()
# ... and only then block until each one has finished
for process in processes:
    process.join()
print('Done')

## Pool

In [None]:
%%writefile sum_product.py
# function to be applied for each element
def sum_product(e):
    """Return ``e`` multiplied by the weighted sum ``sum(x * i for i in a)``.

    ``x`` and ``a`` are module globals that ``pool_initializer`` installs in
    each worker process; calling this before the initializer has run raises
    ``NameError``.
    """
    # generator expression: no throwaway list is built on every call
    w_sum = sum(x * i for i in a)
    return w_sum * e
# pool initializer function
def pool_initializer(X, A):
    """Publish the shared inputs as module globals of the current process.

    Used as the Pool ``initializer`` so each worker receives ``X`` and ``A``
    once, instead of with every task; ``sum_product`` reads them as ``x``
    and ``a``.
    """
    global x, a
    x, a = X, A

In [None]:
from multiprocessing import Pool, cpu_count
from sum_product import sum_product, pool_initializer

n = 100000
X = 3
A = [2, 4, 6, 8, 10, 12]

# one worker per CPU; every worker gets X and A once, through the initializer
workers = cpu_count()
with Pool(processes=workers, initializer=pool_initializer, initargs=(X, A)) as pool:
    # results come back in the same order as range(n)
    res = pool.map(sum_product, range(n))
print(res)

## ProcessPoolExecutor

In [None]:
%%writefile task.py
def task(i: int):
    """Trivial unit of work: hand the input straight back."""
    result = i
    return result

In [None]:
from concurrent.futures import ProcessPoolExecutor, as_completed
import importlib

import task as task_module

# `%%writefile` overwrote task.py, but a plain `import` returns the module
# cached by an earlier cell; reload it so this cell submits the current task().
task = importlib.reload(task_module).task

#if __name__ == '__main__':
# create the pool with the default number of workers
with ProcessPoolExecutor() as exe:
    # issue some tasks and collect futures
    futures = [exe.submit(task, i) for i in range(50)]
    # process results as tasks complete (completion order, not submission order)
    for future in as_completed(futures):
        print(f'>got {future.result()}')
    # issue one task for each call to the function; results arrive in input order
    for result in exe.map(task, range(50)):
        print(f'>got {result}')
# report that all tasks are completed
print('Done')

## Extend Process class

In [None]:
%%writefile custom_process.py
# custom process class
from multiprocessing import Process
from time import sleep
class CustomProcess(Process):
    """Process subclass whose ``run`` pauses briefly, then prints a message."""

    def run(self):
        # ``run`` is what executes in the child once ``start()`` is called
        sleep(1)
        print('This is coming from another process')

In [None]:
# SuperFastPython.com
# example of extending the Process class
from time import sleep
from multiprocessing import Process
from custom_process import CustomProcess
 
# entry point
if __name__ == '__main__':
    # the child executes CustomProcess.run() once started
    child = CustomProcess()
    child.start()
    print('Waiting for the process to finish')
    # block until the child's run() has returned
    child.join()

### Return values

Share values and arrays across processes with the `multiprocessing.Value` and `multiprocessing.Array` types.

In [None]:
%%writefile custom_process.py
# example of extending the Process class and adding shared attributes
from time import sleep
from multiprocessing import Process
from multiprocessing import Value
 
# custom process class
class CustomProcess(Process):
    """Process subclass that carries an integer in a ``multiprocessing.Value``.

    Because ``self.data`` is a shared ``Value``, the parent can still read
    ``p.data.value`` after the child has been joined.
    """

    def __init__(self, value=99):
        """Create the process with ``value`` as the initial shared integer."""
        # cooperative base-class initialisation
        super().__init__()
        # 'i' = C int
        self.data = Value('i', value)

    def run(self):
        # block for a moment
        sleep(1)
        # report the shared value as seen from the child process
        print(f'Child stored: {self.data.value}')
 

In [None]:
import importlib

import custom_process

# custom_process.py was overwritten by `%%writefile`, but a plain `import`
# returns the module cached by the earlier import (whose CustomProcess takes no
# `value`); reload it so CustomProcess(88) uses the new constructor.
CustomProcess = importlib.reload(custom_process).CustomProcess

# entry point
if __name__ == '__main__':
    # create the processes, each with its own shared integer
    processes = [CustomProcess(88), CustomProcess(77)]
    # start the processes
    for p in processes:
        p.start()
    print('Waiting for the child processes to finish')
    # block until child processes are terminated
    for p in processes:
        p.join()
    # the shared Value is still readable from the parent after the child exits
    for p in processes:
        print(f'Parent got: {p.data.value}')

## Process attributes

In [None]:
import multiprocessing
# the start methods this platform supports; the notebook displays the value of
# the cell's last expression
supported_methods = multiprocessing.get_all_start_methods()
supported_methods

In [None]:
from multiprocessing import Process
# a Process without a target: run() does nothing, so it exits immediately
proc = Process()
print(f"Exit code before start: {proc.exitcode}")
print(proc.name)
proc.start()
# the pid is assigned once the process has been started
print(proc.pid)
proc.join()
print(f"Exit code after join: {proc.exitcode}")

In [None]:
import multiprocessing as mp

# print this process's multiprocessing parent, then the Process object that
# describes the current process
for proc_info in (mp.parent_process(), mp.current_process()):
    print(proc_info)

## Locks

### Lock

In [None]:
%%writefile task.py
# example of a mutual exclusion (mutex) lock for processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Lock
 
# work function
def task(lock, identifier, value):
    """Hold ``lock`` while reporting and sleeping for ``value`` seconds."""
    lock.acquire()
    try:
        print(f'>process {identifier} got the lock, sleeping for {value}')
        sleep(value)
    finally:
        # always hand the lock back, even if the work raised
        lock.release()

In [None]:
# example of a mutual exclusion (mutex) lock for processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Lock
import importlib

import task as task_module

# task.py was overwritten by `%%writefile`; reload it so the lock version of
# task() is used instead of a version cached by an earlier import
task = importlib.reload(task_module).task

# entry point
if __name__ == '__main__':
    # create the shared lock
    lock = Lock()
    # create a number of processes with different sleep times
    processes = [Process(target=task, args=(lock, i, random())) for i in range(10)]
    # start the processes
    for process in processes:
        process.start()
    # wait for all processes to finish
    for process in processes:
        process.join()

### Re-entrant Lock

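A re-entrant lock can be acquired several times by the process that already holds it (for example, by a nested function call) without deadlocking; it is released only after the same number of releases.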

In [None]:
%%writefile task.py
# example of a reentrant lock for processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import RLock
 
# reporting function
def report(lock, identifier):
    """Print a completion message for ``identifier`` while holding ``lock``."""
    lock.acquire()
    try:
        print(f'>process {identifier} done')
    finally:
        lock.release()
 
# work function
def task(lock, identifier, value):
    """Sleep for ``value`` seconds while holding ``lock``, then report.

    ``report`` acquires the same lock again while it is still held here, so
    the lock passed in must be re-entrant (the caller uses an ``RLock``).
    """
    lock.acquire()
    try:
        print(f'>process {identifier} sleeping for {value}')
        sleep(value)
        # nested acquisition of the same lock
        report(lock, identifier)
    finally:
        lock.release()
 

In [None]:
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import RLock
import importlib

import task as task_module

# task.py was overwritten by `%%writefile`; reload it so the re-entrant-lock
# version of task() is used instead of a version cached by an earlier import
task = importlib.reload(task_module).task

# entry point
if __name__ == '__main__':
    # create a shared reentrant lock
    lock = RLock()
    # create processes
    processes = [Process(target=task, args=(lock, i, random())) for i in range(10)]
    # start child processes
    for process in processes:
        process.start()
    # wait for child processes to finish
    for process in processes:
        process.join()

## Condition Variable

In [None]:
%%writefile task.py
# example of wait/notify with a condition for processes
from time import sleep
from multiprocessing import Process
from multiprocessing import Condition
 
# target function to prepare some work
def task(condition):
    """Pause, wake one process waiting on ``condition``, then keep working."""
    sleep(1)
    print('Child process sending notification...', flush=True)
    # notify() must be called while holding the condition's lock
    condition.acquire()
    try:
        condition.notify()
    finally:
        condition.release()
    # do something else...
    sleep(1)

In [None]:
# example of wait/notify with a condition for processes
from time import sleep
from multiprocessing import Process
from multiprocessing import Condition
import importlib

import task as task_module

# this cell previously used `task` without importing it; load the condition
# version just written to task.py (reloaded, because an earlier cell may have
# cached a different version of the module)
task = importlib.reload(task_module).task

# entry point
if __name__ == '__main__':
    # create a condition
    condition = Condition()
    # wait to be notified that the data is ready
    print('Main process waiting for data...')
    # hold the condition before starting the worker: its notify() cannot run
    # until wait() below releases the lock, so the notification is not lost
    with condition:
        worker = Process(target=task, args=(condition,))
        worker.start()
        condition.wait()
    # we know the data is ready
    print('Main process all done')
    # the worker keeps running briefly after notifying; wait for it to exit
    worker.join()

## Semaphore

A semaphore grants access to at most a fixed number of processes at a time; the other processes wait until one of them releases it.

In [None]:
%%writefile task.py
# example of using a semaphore
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Semaphore
 
# target function
def task(semaphore, number):
    """Do simulated work while holding one slot of ``semaphore``, then report it."""
    semaphore.acquire()
    try:
        # simulate computational effort
        value = random()
        sleep(value)
        print(f'Process {number} got {value}')
    finally:
        semaphore.release()

In [None]:
# example of using a semaphore
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Semaphore
import importlib

import task as task_module

# task.py was overwritten by `%%writefile`; reload it so the semaphore version
# of task() is used instead of a version cached by an earlier import
task = importlib.reload(task_module).task

# entry point
if __name__ == '__main__':
    # create the shared semaphore
    semaphore = Semaphore(2) # grant access to only two processes at a time
    # create processes
    processes = [Process(target=task, args=(semaphore, i)) for i in range(10)]
    # start child processes
    for process in processes:
        process.start()
    # wait for child processes to finish
    for process in processes:
        process.join()


## Event

In [None]:
%%writefile task.py
# example of using an event object with processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Event
 
# target task function
def task(event, number):
    """Block until ``event`` is set, then do simulated work and report it."""
    print(f'Process {number} waiting...', flush=True)
    # every waiting process is released by a single event.set()
    event.wait()
    duration = random()
    sleep(duration)
    print(f'Process {number} got {duration}', flush=True)

In [None]:
# example of using an event object with processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Event
import importlib

import task as task_module

# task.py was overwritten by `%%writefile`; reload it so the event version of
# task() is used instead of a version cached by an earlier import
task = importlib.reload(task_module).task

# entry point
if __name__ == '__main__':
    # create a shared event object
    event = Event()
    # create a suite of processes
    processes = [Process(target=task, args=(event, i)) for i in range(5)]
    # start all processes
    for process in processes:
        process.start()
    # block for a moment
    print('Main process blocking...')
    sleep(2)
    # trigger all child processes
    event.set()
    # wait for all child processes to terminate
    for process in processes:
        process.join()

## Barrier

In [None]:
%%writefile task.py
# example of using a barrier with processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Barrier
 
# target function to prepare some work
def task(barrier, number):
    """Work for a random time (up to 10 s), report it, then wait on ``barrier``."""
    duration = random() * 10
    sleep(duration)
    print(f'Process {number} done, got: {duration}', flush=True)
    # returns only once every party of the barrier has arrived
    barrier.wait()

In [None]:
# example of using a barrier with processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Barrier
import importlib

import task as task_module

# task.py was overwritten by `%%writefile`; reload it so the barrier version of
# task() is used instead of a version cached by an earlier import
task = importlib.reload(task_module).task

# entry point
if __name__ == '__main__':
    # all the processes waiting on barrier: 5 children and 1 parent (main)
    barrier = Barrier(5 + 1)
    # create and start the worker processes
    workers = []
    for i in range(5):
        worker = Process(target=task, args=(barrier, i))
        worker.start()
        workers.append(worker)
    # wait for all processes to finish
    print('Main process waiting on all results...')
    barrier.wait()
    # report once all processes are done
    print('All processes have their result')
    # the children return right after the barrier; reap them
    for worker in workers:
        worker.join()


## Shared ctypes

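Shared ctypes objects are allocated in shared memory, so several processes can read and write the same data. `Value` and `Array` are wrapped with a lock by default (use `get_lock()` to hold it across several operations), while `RawValue` and `RawArray` provide no synchronization.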

### Shared Array

In [None]:
%%writefile task.py
import multiprocessing as mp
def task(shared_array, barrier, task_id, num_tasks):
    """Fill this task's contiguous slice of ``shared_array`` with ``task_id``.

    The array is split into chunks of ``ceil(len(shared_array) / num_tasks)``
    elements, one chunk per task in ``task_id`` order; the last chunk may be
    shorter, or empty when there are more tasks than elements. Afterwards the
    function waits on ``barrier`` so the caller knows the slice is written.
    """
    size = len(shared_array)
    chunk = (size + num_tasks - 1) // num_tasks  # ceiling division
    # the start offset must use the full chunk size (not the possibly shorter
    # length of the last chunk); both bounds are clamped to the array size
    start = min(task_id * chunk, size)
    stop = min(start + chunk, size)
    for i in range(start, stop):
        shared_array[i] = task_id
    barrier.wait()

In [None]:
import importlib
import multiprocessing as mp

import task as task_module

# task.py was overwritten by `%%writefile`; reload it so the shared-array
# version of task() is used instead of a version cached by an earlier import
task = importlib.reload(task_module).task

if __name__ == '__main__':
    NUM_TASKS = 4
    ARRAY_SIZE = 16
    # one party per worker plus this (main) process
    barrier = mp.Barrier(NUM_TASKS + 1)
    # shared array of C ints, zero-initialised and guarded by a lock
    shared_array = mp.Array('i', ARRAY_SIZE)
    workers = [
        mp.Process(target=task, args=(shared_array, barrier, i, NUM_TASKS))
        for i in range(NUM_TASKS)
    ]
    for worker in workers:
        worker.start()
    # returns once every worker has written its slice
    barrier.wait()
    with shared_array.get_lock():
        for e in shared_array:
            print(e)
    # the workers exit right after the barrier; reap them
    for worker in workers:
        worker.join()

In [None]:
import importlib
import multiprocessing as mp
import numpy as np

import task as task_module

# task.py was overwritten by `%%writefile`; reload it so the shared-array
# version of task() is used instead of a version cached by an earlier import
task = importlib.reload(task_module).task

if __name__ == '__main__':
    NUM_TASKS = 4
    ARRAY_SIZE = 16
    # one party per worker plus this (main) process
    barrier = mp.Barrier(NUM_TASKS + 1)
    # RawArray has no lock: each worker is meant to write only its own slice,
    # and the parent reads only after the barrier
    shared_array = mp.RawArray('i', ARRAY_SIZE)
    workers = [
        mp.Process(target=task, args=(shared_array, barrier, i, NUM_TASKS))
        for i in range(NUM_TASKS)
    ]
    for worker in workers:
        worker.start()
    barrier.wait()
    # zero-copy NumPy view of the shared buffer; np.intc is exactly C int ('i').
    # Use v.reshape((rows, cols)) for a 2-D view.
    v = np.frombuffer(shared_array, dtype=np.intc)
    print(v)
    # the workers exit right after the barrier; reap them
    for worker in workers:
        worker.join()

## Pipes and Queues

### Queue

In [None]:
%%writefile square.py
from multiprocessing import Queue
def square(li, q):
    """Put the square of each element of ``li`` onto queue ``q``, in order."""
    for item in li:
        q.put(item * item)

In [None]:
from multiprocessing import Process, Queue
from square import square

if __name__ == "__main__":
    li = range(0, 5)
    q = Queue()
    process = Process(target=square, args=(li, q))
    process.start()
    # read exactly one result per input before joining: Queue.empty() is not
    # reliable across processes, and joining a child that still has queued
    # data to flush can deadlock
    for _ in li:
        print(q.get())
    process.join()

### Pipe

In [None]:
%%writefile square.py
from multiprocessing import Pipe
def square(li, con2):
    """Send the square of each element of ``li`` through connection ``con2``."""
    for number in li:
        con2.send(number * number)

In [None]:
import importlib
from multiprocessing import Process, Pipe

import square as square_module

# square.py was overwritten by `%%writefile`, but a plain `import` returns the
# Queue-based square() cached by the previous section (it calls put() on the
# Connection and fails in the child); reload it to get the Pipe version.
square = importlib.reload(square_module).square

if __name__ == "__main__":
    li = range(0, 5)
    con1, con2 = Pipe()
    process = Process(target=square, args=(li, con2))
    process.start()
    # the child holds its own copy of con2 now; closing the parent's copy lets
    # con1.recv() raise EOFError instead of hanging if the child dies early
    con2.close()

    # the child sends one message per input element
    for _ in li:
        print(con1.recv())
    process.join()