# Parallel programming for CPU

## Process-based parallelism via the multiprocessing module

The [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) module mirrors threading, except that instead of a Thread class it provides a Process class. Each Process is a true system process without shared memory, but multiprocessing provides features for sharing data and passing messages between processes, so that in many cases converting from threads to processes is as simple as changing a few import statements.

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the `Global Interpreter Lock` by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

The multiprocessing module also introduces APIs which do not have analogs in the threading module. A prime example of this is the `Pool` object, which offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism). Functions used this way are commonly defined at module level so that child processes can successfully import them.
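A minimal sketch of this use of `Pool`, closely following the introductory example in the Python documentation. The function name `square` and the pool size are illustrative choices, not part of the original text:

```python
from multiprocessing import Pool


def square(x):
    """Defined at module level so that child processes can import it."""
    return x * x


if __name__ == '__main__':
    # map() blocks until every result is available; leaving the
    # with-block then shuts the pool down.
    with Pool(processes=3) as pool:
        print(pool.map(square, [1, 2, 3]))  # prints [1, 4, 9]
```

The `Pool` class is covered in more detail later in this section.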

### The Process class

Similar to the ```Thread``` class, the ```Process``` class is used to spawn an additional process that runs a target function.

In [11]:
import multiprocessing
import time

def worker(num):
    """ Worker function
    """
    result = num * 2
    my_name=multiprocessing.current_process().name
    print('My name: ',my_name)
    print('My result: ',result)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=worker, 
            daemon=False,
            args=[i],
            name="process-{}".format(i)
        )
        jobs.append(p)
        p.start()
    
    for j in jobs:
        j.join()

My name:  My name: My name:  process-0
 process-1My name: My name:  
 process-3My result: process-4 2

My result:  8
process-2
My result: My result:  6

My result:  0
 4


An important difference from ```threading``` is that child processes may need to import the script containing the target function; this is the case with the spawn start method, which is the default on Windows and macOS. It is therefore important to guard the main part of the application with ```if __name__ == '__main__':``` so that it is not executed again by every child process. Alternatively, the target function can be stored in a separate file and imported into the main script.
  
Starting from the example above, add a call to ```time.sleep(sec)``` (passing different values of sec) to each worker and time both sequential and concurrent execution.
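One possible way to set up this comparison is sketched below. The helper names `sleepy_worker`, `run_sequential` and `run_concurrent`, as well as the delay values, are made up for illustration:

```python
import multiprocessing
import time


def sleepy_worker(sec):
    """Stand-in for real work: sleep for sec seconds."""
    time.sleep(sec)


def run_sequential(delays):
    """Call the worker once per delay, one after the other; return elapsed seconds."""
    start = time.perf_counter()
    for sec in delays:
        sleepy_worker(sec)
    return time.perf_counter() - start


def run_concurrent(delays):
    """Run one process per delay at the same time; return elapsed seconds."""
    start = time.perf_counter()
    jobs = [multiprocessing.Process(target=sleepy_worker, args=(sec,))
            for sec in delays]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    return time.perf_counter() - start


if __name__ == '__main__':
    delays = [0.2, 0.4, 0.6]
    print('sequential: {:.2f} s'.format(run_sequential(delays)))
    print('concurrent: {:.2f} s'.format(run_concurrent(delays)))
```

The sequential time should be close to the sum of the delays, while the concurrent time should be close to the longest delay plus the overhead of starting the processes.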

#### Terminating a Process
If a process appears hung or deadlocked it can be useful to be able to kill it forcibly. Calling ```terminate()``` on a process object kills the child process.




In [16]:
import time
from multiprocessing import Process, current_process

def doubler(number):
    """
    A doubling function that can be used by a process
    """
    proc_name = current_process().name
    if proc_name == "Test":
        time.sleep(10)
        print("Process Test: Done")
    else:
        result = number * 2
        print('{0} doubled to {1} by: {2}'.format(
            number, result, proc_name))
 
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
 
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,),name='Normal_{}'.format(index))
        procs.append(proc)
        proc.start()
 
    proc = Process(target=doubler, name='Test', args=(None,))
    proc.start()
    procs.append(proc)
 
    for proc in procs:
        proc.join(4)
        if proc.is_alive():
            proc.terminate()
            print('Process {} terminated.'.format(proc.name))
            



5 doubled to 10 by: Normal_0
10 doubled to 20 by: Normal_1
20 doubled to 40 by: Normal_315 doubled to 30 by: Normal_2
25 doubled to 50 by: Normal_4

Process Test terminated.
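After ```terminate()``` it can be helpful to ```join()``` the process and inspect its ```exitcode```. The sketch below assumes a hypothetical `sleeper` task and a helper called `terminate_and_report`; on Unix, a negative exit code -N means the child was killed by signal N:

```python
import time
from multiprocessing import Process


def sleeper(sec):
    """Hypothetical long-running task: just sleeps."""
    time.sleep(sec)


def terminate_and_report(proc, timeout):
    """Wait up to timeout seconds, then terminate proc if it is still running.

    Returns a tuple (was_terminated, exitcode).
    """
    proc.join(timeout)
    was_terminated = proc.is_alive()
    if was_terminated:
        proc.terminate()
        proc.join()  # wait for the child to exit so that exitcode is set
    return was_terminated, proc.exitcode


if __name__ == '__main__':
    p = Process(target=sleeper, args=(30,), name='Sleeper')
    p.start()
    terminated, code = terminate_and_report(p, timeout=0.5)
    print('terminated:', terminated, 'exitcode:', code)
```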


---
#### Lock

The multiprocessing module supports locks in much the same way as the threading module does. 

All you need to do is import Lock, acquire it, do something and release it. Let’s take a look:

In [34]:
%%writefile p_lock.py
from multiprocessing import Process, Lock, current_process
import time

def printer(item, lock):
    """
    Prints out the item that was passed in
    """
    lock.acquire()
    try:
        print(current_process().name, item)
        time.sleep(1)
    finally:
        lock.release()
        
if __name__ == '__main__':
    lock = Lock()
    for item in range(5):
        p = Process(target=printer, name="Proc-{}".format(item), args=(item, lock))
        p.start()
        

Writing p_lock.py


In [35]:
!python p_lock.py

Proc-0 0
Proc-1 1
Proc-4 4
Proc-3 3
Proc-2 2
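The explicit acquire / try / finally / release sequence used in `printer` can also be written with a `with` statement, since `Lock` objects support the context manager protocol. A minimal sketch of the same idea (the sleep is dropped and the processes are joined, both choices made here for brevity):

```python
from multiprocessing import Process, Lock, current_process


def printer(item, lock):
    """Print the item while holding the lock."""
    # Entering the with-block acquires the lock; leaving it, even
    # through an exception, releases it.
    with lock:
        print(current_process().name, item)


if __name__ == '__main__':
    lock = Lock()
    procs = [Process(target=printer, name='Proc-{}'.format(i), args=(i, lock))
             for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```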


#### Event

Events are used in the same way as in ```threading```:

In [62]:
%%writefile p_event.py
from multiprocessing import Process, Event, current_process
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print(current_process().name,' waiting_for_event: starting')
    e.wait()
    print(current_process().name,' Done waiting : e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    #print('wait_for_event_timeout: starting')
    print(current_process().name,' waiting_for_event_timeout: starting')
    e.wait(t)
    print(current_process().name,' Done waiting : e.is_set()->', e.is_set())

        
        
if __name__ == '__main__':
    e = Event()
    w1 = Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 3),
    )
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(5)
    e.set()
    print('main: event is set')

Overwriting p_event.py


In [63]:
!python  p_event.py

main: waiting before calling Event.set()
block  waiting_for_event: starting
nonblock  waiting_for_event_timeout: starting
nonblock  Done waiting : e.is_set()-> False
main: event is set
block  Done waiting : e.is_set()-> True


Other constructs such as ```Semaphore``` and ```Condition``` from the ```threading``` module are also available in ```multiprocessing```, with very few differences between the two implementations.

Try some of the examples shown earlier for ```threading``` with ```multiprocessing```.
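As one such example, here is a minimal sketch of a `Semaphore` that limits how many processes may work at the same time. The names `limited_worker` and `run_limited` and the chosen numbers are illustrative assumptions:

```python
import time
from multiprocessing import Process, Semaphore, current_process


def limited_worker(sem, sec):
    """Do sec seconds of simulated work inside the semaphore-guarded section."""
    with sem:  # blocks while the maximum number of workers is already inside
        print(current_process().name, 'entered')
        time.sleep(sec)
        print(current_process().name, 'leaving')


def run_limited(n_workers, limit, sec):
    """Run n_workers processes, at most `limit` working at once; return elapsed seconds."""
    sem = Semaphore(limit)
    start = time.perf_counter()
    procs = [Process(target=limited_worker, args=(sem, sec))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.perf_counter() - start


if __name__ == '__main__':
    elapsed = run_limited(n_workers=4, limit=2, sec=0.2)
    print('elapsed: {:.2f} s'.format(elapsed))
```

With four workers and a limit of two, the workers pass through the guarded section in two rounds, so the elapsed time is at least twice the sleep time.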

### Sharing data between processes

Sharing state between processes is generally discouraged, but if it is needed (and the user manages it carefully) ```multiprocessing``` offers two approaches:

#### Shared memory

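```Value``` and ```Array``` store data in shared memory. Their first argument is a typecode of the kind used by the ```array``` module: in the example below, ```'d'``` indicates a double precision float and ```'i'``` a signed integer.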

In [68]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    print("Before:")    
    print(num.value)
    print(arr[:])

    
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print("After:")    
    print(num.value)
    print(arr[:])

Before:
0.0
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
After:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
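When several processes update the same `Value`, note that an operation such as `+=` is a read followed by a write and is therefore not atomic. A minimal sketch of guarding it with the lock returned by `get_lock()`; the function names `add_many` and `count_in_parallel` are hypothetical:

```python
from multiprocessing import Process, Value


def add_many(counter, n):
    """Increment the shared counter n times."""
    for _ in range(n):
        # += reads and then writes, so hold the lock around both steps
        with counter.get_lock():
            counter.value += 1


def count_in_parallel(n_procs, n):
    """Let n_procs processes each add n to a shared counter; return the total."""
    counter = Value('i', 0)
    procs = [Process(target=add_many, args=(counter, n))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == '__main__':
    print(count_in_parallel(n_procs=4, n=1000))  # prints 4000
```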


#### Server process

A manager object returned by ```Manager()``` controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A server process can also be shared by processes on different computers over a network.

In [70]:
from multiprocessing import Process, Manager


def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        print("Before")
        print(d)
        print(l)

        
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print("After")
        print(d)
        print(l)

Before
{}
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
After
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
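A managed dict is also a simple way to collect results from several processes. The sketch below is one way to do it under that assumption; `store_square` and `squares_via_manager` are names invented for this example:

```python
from multiprocessing import Process, Manager


def store_square(results, x):
    """Store x squared in the shared dict under the key x."""
    results[x] = x * x


def squares_via_manager(values):
    """Compute one square per process and collect them in a managed dict."""
    with Manager() as manager:
        results = manager.dict()
        procs = [Process(target=store_square, args=(results, x))
                 for x in values]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy into a plain dict before the manager shuts down
        return results.copy()


if __name__ == '__main__':
    print(squares_via_manager([1, 2, 3]))
```

The copy is taken inside the `with` block because the proxy can no longer reach the server process once the manager has been shut down.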


### The Pool class

The ```Pool``` class represents a pool of worker processes. It has methods that let you offload tasks to the worker processes. Let’s look at a really simple example:

In [7]:
%%writefile p_pool.py
import multiprocessing as mp
import time

def worker(sec):
    """ 
    Worker function: sleeps for sec seconds and returns sec squared
    """
    print('Start sleeping {} in {}'.format(sec,mp.current_process().name))
    time.sleep(sec)
    print('Done sleeping {} in {}'.format(sec,mp.current_process().name))
    return sec**2
    
def start_process():
    print('Starting', mp.current_process().name)

if __name__ == '__main__':

    input_data = [10,8,6,4,2,1]
    
    print("INPUT:",input_data)
    
    start_t = time.perf_counter()
    
    pool_size = 4
    pool = mp.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_out = pool.map(worker, input_data)
    pool.close()
    pool.join()
    
    end_t = time.perf_counter()
    
    print('Total time =', end_t - start_t)
    print('OUTPUT:', pool_out)

Overwriting p_pool.py


In [8]:
!python p_pool.py

INPUT: [10, 8, 6, 4, 2, 1]
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Start sleeping 10 in ForkPoolWorker-1
Start sleeping 8 in ForkPoolWorker-2
Start sleeping 6 in ForkPoolWorker-3
Start sleeping 4 in ForkPoolWorker-4
Done sleeping 4 in ForkPoolWorker-4
Start sleeping 2 in ForkPoolWorker-4
Done sleeping 6 in ForkPoolWorker-3
Done sleeping 2 in ForkPoolWorker-4
Start sleeping 1 in ForkPoolWorker-3
Done sleeping 1 in ForkPoolWorker-3
Done sleeping 8 in ForkPoolWorker-2
Done sleeping 10 in ForkPoolWorker-1
Total time = 10.041265523992479
OUTPUT: [100, 64, 36, 16, 4, 1]
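`map` returns the results in input order and only once all of them are ready. If the order does not matter, `imap_unordered` yields each result as soon as a worker finishes it. A minimal sketch, with the hypothetical helpers `slow_square` and `squares_as_completed`:

```python
import multiprocessing as mp
import time


def slow_square(sec):
    """Sleep for sec seconds, then return sec squared."""
    time.sleep(sec)
    return sec ** 2


def squares_as_completed(delays, pool_size):
    """Return the results in the order in which the workers finish them."""
    with mp.Pool(processes=pool_size) as pool:
        # The list is fully built before the with-block shuts the pool down
        return list(pool.imap_unordered(slow_square, delays))


if __name__ == '__main__':
    print(squares_as_completed([0.3, 0.2, 0.1], pool_size=3))
```

The order of the printed list depends on scheduling, so it may differ from run to run.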


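Alternatively, use the ```map_async``` method, which returns an ```AsyncResult``` immediately instead of blocking until all results are ready, so the main process can continue while the workers run. The results are retrieved later with ```get()```: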

In [28]:
%%writefile p_pool_async.py
import multiprocessing as mp
import time

def worker(sec):
    """ 
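    Worker function: sleeps for a fixed 2 seconds, whatever the value of sec
    """
    print('Start sleeping {} in {}'.format(sec,mp.current_process().name))
    time.sleep(2)  # fixed duration, so the total time is not comparable with p_pool.py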
    print('Done sleeping {} in {}'.format(sec,mp.current_process().name))
    return sec**2
    
def start_process():
    print('Starting', mp.current_process().name)

if __name__ == '__main__':

    input_data = [10,8,6,4,2,1]
    
    print("INPUT:",input_data)
    
    start_t = time.perf_counter()
    
    pool_size = 4
    pool = mp.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_out = pool.map_async(worker, input_data)
    pool.close()
    pool.join()
    
    end_t = time.perf_counter()
    
    print('Total time =', end_t - start_t)
    print('OUTPUT:', pool_out.get())

Overwriting p_pool_async.py


In [29]:
!python p_pool_async.py

INPUT: [10, 8, 6, 4, 2, 1]
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Start sleeping 8 in ForkPoolWorker-2
Start sleeping 10 in ForkPoolWorker-1
Start sleeping 6 in ForkPoolWorker-3
Start sleeping 4 in ForkPoolWorker-4
Done sleeping 8 in ForkPoolWorker-2
Done sleeping 10 in ForkPoolWorker-1
Start sleeping 2 in ForkPoolWorker-1
Done sleeping 6 in ForkPoolWorker-3
Done sleeping 4 in ForkPoolWorker-4
Start sleeping 1 in ForkPoolWorker-2
Done sleeping 2 in ForkPoolWorker-1
Done sleeping 1 in ForkPoolWorker-2
Total time = 4.04389403690584
OUTPUT: [100, 64, 36, 16, 4, 1]
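The object returned by `map_async` can be polled with `ready()` while the main process does other work. The following sketch illustrates this under assumed names (`slow_square`, `map_while_working`); the polling loop merely stands in for useful work:

```python
import multiprocessing as mp
import time


def slow_square(sec):
    """Sleep for sec seconds, then return sec squared."""
    time.sleep(sec)
    return sec ** 2


def map_while_working(delays, pool_size):
    """Submit the work with map_async and poll until the results are ready.

    Returns a tuple (results, number_of_polls).
    """
    with mp.Pool(processes=pool_size) as pool:
        async_result = pool.map_async(slow_square, delays)  # returns at once
        polls = 0
        while not async_result.ready():
            polls += 1        # the main process is free to do other work here
            time.sleep(0.05)
        return async_result.get(timeout=1), polls


if __name__ == '__main__':
    results, polls = map_while_working([0.2, 0.1], pool_size=2)
    print('results:', results, 'polls:', polls)
```

As with `map`, the list returned by `get()` is in input order.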


### Communication between processes

When it comes to communicating between processes, the multiprocessing module has two primary mechanisms: Queues and Pipes. The Queue implementation is both thread and process safe. Let’s take a look at a fairly simple example based on a Queue:

In [31]:
from multiprocessing import Process, Queue
import time 

sentinel = -1
 
def creator(data, q):
    """
    Puts each item of data on the queue for the consumer to process
    """
    print('Creating data and putting it on the queue')
    for item in data:
        q.put(item)
 
 
def my_consumer(q):
    """
    Consumes some data and works on it
 
    In this case, all it does is double the input
    """
    while True:
        data = q.get()
        print('data found to be processed: {}'.format(data))
        processed = data * 2
        time.sleep(1)
        print('data processed: {}'.format(processed))
 
        if data == sentinel:  # compare by value, not by identity
            break
 
 
if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    process_one.start()
    process_two.start()
 
    q.close()
    q.join_thread()
 
    process_one.join()
    process_two.join()
    print(data)


Creating data and putting it on the queue
data found to be processed: 5
data processed: 10
data found to be processed: 10
data processed: 20
data found to be processed: 13
data processed: 26
data found to be processed: -1
data processed: -2
[5, 10, 13, -1]
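Pipes, the second mechanism mentioned above, connect exactly two endpoints. A minimal sketch of the same doubling task over a `Pipe`, using `None` as the sentinel; the helper names `doubler` and `double_all` are illustrative:

```python
from multiprocessing import Process, Pipe


def doubler(conn):
    """Receive numbers from the pipe and send back their doubles until None arrives."""
    while True:
        item = conn.recv()
        if item is None:  # sentinel: no more data
            break
        conn.send(item * 2)
    conn.close()


def double_all(data):
    """Send each item to a child process and collect the doubled values."""
    parent_conn, child_conn = Pipe()  # two-way (duplex) by default
    p = Process(target=doubler, args=(child_conn,))
    p.start()
    results = []
    for item in data:
        parent_conn.send(item)
        results.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to stop
    p.join()
    return results


if __name__ == '__main__':
    print(double_all([5, 10, 13]))  # prints [10, 20, 26]
```

Unlike the Queue example, the sentinel is checked before any processing, so it is never doubled.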
