# Multiprocessing

# Setup

In [18]:
import os
import multiprocessing
from multiprocessing import Process
from multiprocessing import Pipe
from multiprocessing import Pool
from multiprocessing import Queue
from multiprocessing import Manager
from multiprocessing import Barrier
from multiprocessing import BoundedSemaphore
import random
import time

In [2]:
import logging
import sys
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger()
#logger.setLevel(logging.DEBUG)
logger.debug('Hello')

DEBUG:root:Hello


# Ping Pong Pipe
* We'll use a Process as a client
* We'll have our parent & child communicate over a Pipe object
* Pipe creates two Connection objects which can facilitate communication

In [9]:
def play(connection):
    logger.debug('Child Sending Ping')
    connection.send('Ping')
    data = connection.recv()
    logger.debug(f'Child Received {data}')
    
parent_connection, child_connection = Pipe()
child = Process(target=play, args=(child_connection, ))
child.start()

data = parent_connection.recv()
logger.debug(f'Parent Received {data}')
logger.debug('Parent Sending Pong')
parent_connection.send('Pong')

child.join()

DEBUG:root:Child Sending Ping
DEBUG:root:Parent Received Ping
DEBUG:root:Parent Sending Pong
DEBUG:root:Child Received Pong


# Ping Pong Queue
* We continue to use a Process as a client
* We'll swap out the Pipe for a Queue

In [13]:
def play(q):
    logger.debug('Child Sending Ping')
    q.put('Ping')
    q.get()
    logger.debug(f'Child Received {data}')
    
q = Queue()
child = Process(target=play, args=(q, ))
child.start()

data = q.get()
logger.debug(f'Parent Received {data}')
logger.debug('Parent Sending Pong')
q.put('Pong')

child.join()

DEBUG:root:Child Sending Ping
DEBUG:root:Parent Received Ping
DEBUG:root:Parent Sending Pong
DEBUG:root:Child Received Ping


# Pools
* Here we have several examples of a pool of processes to handle tasks

## Send Multiple Jobs

In [25]:
def work(data):
    return data * data

pool = Pool(processes=2)
for item in pool.imap_unordered(work, range(5)):
     logger.debug(item)
        
pool.close()
pool.join()

DEBUG:root:0
DEBUG:root:1
DEBUG:root:4
DEBUG:root:9
DEBUG:root:16


## Send Single Job

In [29]:
def work(data):
    return data * data

pool = Pool(processes=2)
reply = pool.apply_async(work, (20,))    
logger.debug(reply.get(timeout=1))
        
pool.close()
pool.join()

DEBUG:root:400


## Get PID

In [40]:
pool = Pool(processes=3)

for i in range(10):
    reply = pool.apply_async(os.getpid, ())
    pid = reply.get(timeout=1)
    logger.debug(f'Handle {i} PID {pid}')

pool.close()
pool.join()

DEBUG:root:Handle 0 PID 3489
DEBUG:root:Handle 1 PID 3490
DEBUG:root:Handle 2 PID 3489
DEBUG:root:Handle 3 PID 3491
DEBUG:root:Handle 4 PID 3490
DEBUG:root:Handle 5 PID 3489
DEBUG:root:Handle 6 PID 3491
DEBUG:root:Handle 7 PID 3490
DEBUG:root:Handle 8 PID 3489
DEBUG:root:Handle 9 PID 3491


## With Context
* Safely reclaim resources in a context block

In [51]:
def work(items):
    pid = os.getpid()
    for i in range(items):
        logger.debug(f'PID {pid} Count {i + 1} of {items}')
        time.sleep(1)
     
with Pool(3) as p:
    p.map(work, [1, 2, 3, 4, 5])

DEBUG:root:PID 3691 Count 1 of 1
DEBUG:root:PID 3692 Count 1 of 2
DEBUG:root:PID 3693 Count 1 of 3
DEBUG:root:PID 3692 Count 2 of 2
DEBUG:root:PID 3693 Count 2 of 3
DEBUG:root:PID 3691 Count 1 of 4
DEBUG:root:PID 3692 Count 1 of 5
DEBUG:root:PID 3693 Count 3 of 3
DEBUG:root:PID 3691 Count 2 of 4
DEBUG:root:PID 3692 Count 2 of 5
DEBUG:root:PID 3691 Count 3 of 4
DEBUG:root:PID 3692 Count 3 of 5
DEBUG:root:PID 3691 Count 4 of 4
DEBUG:root:PID 3692 Count 4 of 5
DEBUG:root:PID 3692 Count 5 of 5


# Multiprocessing Helpers

## CPU Count

In [46]:
logger.debug(multiprocessing.cpu_count())

DEBUG:root:4


## Child Processes

In [45]:
pool = Pool(processes=3)
for child in multiprocessing.active_children():
    logger.debug(child)
pool.close()
pool.join()

DEBUG:root:<ForkProcess(ForkPoolWorker-34, started daemon)>
DEBUG:root:<ForkProcess(ForkPoolWorker-33, started daemon)>
DEBUG:root:<ForkProcess(ForkPoolWorker-32, started daemon)>


# Synchronous Map & Apply

## Map
* Map serializes (pickles) the current process and sends it to another core

In [54]:
def double(input):
    val = input ** 2
    if input % 10000 == 0:
        logger.debug(f'PID: {os.getpid()} {input} --> {val}')
        
numbers = [x for x in range(0, 100000)]
with Pool(multiprocessing.cpu_count()) as pool:
    pool.map(double, numbers)

DEBUG:root:PID: 3955 0 --> 0
DEBUG:root:PID: 3957 30000 --> 900000000
DEBUG:root:PID: 3956 10000 --> 100000000
DEBUG:root:PID: 3958 20000 --> 400000000
DEBUG:root:PID: 3955 40000 --> 1600000000
DEBUG:root:PID: 3956 60000 --> 3600000000
DEBUG:root:PID: 3957 50000 --> 2500000000
DEBUG:root:PID: 3958 70000 --> 4900000000
DEBUG:root:PID: 3956 80000 --> 6400000000
DEBUG:root:PID: 3955 90000 --> 8100000000


## IMap
* IMap is more efficient.  
* Rather than copying the memory space of the process, it sends an iterator
* Note:  We no longer get the logger (it's in a different process)

In [57]:
def double(input):
    val = input ** 2
    if input % 10000 == 0:
        logger.debug(f'PID: {os.getpid()} {input} --> {val}')
        
numbers = [x for x in range(0, 100000)]
with Pool(multiprocessing.cpu_count()) as pool:
    pool.imap(double, numbers)

## StarMap
* The final mapper is starmap
* It behvaes like map, but can handle multiple arguments

In [60]:
def double(input, power):
    val = input ** power
    if input % 10000 == 0:
        logger.debug(f'PID: {os.getpid()} {input} --> {val}')
        
size = 100000
numbers = [(x, 2) for x in range(0, size)]
with Pool(multiprocessing.cpu_count()) as pool:
    pool.starmap(double, numbers)

DEBUG:root:PID: 3984 0 --> 0
DEBUG:root:PID: 3985 10000 --> 100000000
DEBUG:root:PID: 3984 30000 --> 900000000
DEBUG:root:PID: 3985 40000 --> 1600000000
DEBUG:root:PID: 3987 20000 --> 400000000
DEBUG:root:PID: 3986 50000 --> 2500000000
DEBUG:root:PID: 3987 60000 --> 3600000000
DEBUG:root:PID: 3985 70000 --> 4900000000
DEBUG:root:PID: 3987 80000 --> 6400000000
DEBUG:root:PID: 3986 90000 --> 8100000000


## Apply

In [83]:
def sum_between(numbers, start, end):
    total = sum([i for i in range(numbers) if i >= start and i <= end])
    if numbers % 10 == 0:
        logger.debug(f'PID: {os.getpid()} {numbers} {start} {end} = {total}')
    return total


size = 1000
with Pool(multiprocessing.cpu_count()) as pool:
    start = random.randint(1, size)
    end   = random.randint(start, size)
    results = [pool.apply(sum_between, args=(x, start, end)) for x in range(size)]

DEBUG:root:PID: 4324 0 900 961 = 0
DEBUG:root:PID: 4326 10 900 961 = 0
DEBUG:root:PID: 4324 20 900 961 = 0
DEBUG:root:PID: 4326 30 900 961 = 0
DEBUG:root:PID: 4324 40 900 961 = 0
DEBUG:root:PID: 4326 50 900 961 = 0
DEBUG:root:PID: 4324 60 900 961 = 0
DEBUG:root:PID: 4326 70 900 961 = 0
DEBUG:root:PID: 4324 80 900 961 = 0
DEBUG:root:PID: 4326 90 900 961 = 0
DEBUG:root:PID: 4324 100 900 961 = 0
DEBUG:root:PID: 4326 110 900 961 = 0
DEBUG:root:PID: 4324 120 900 961 = 0
DEBUG:root:PID: 4326 130 900 961 = 0
DEBUG:root:PID: 4324 140 900 961 = 0
DEBUG:root:PID: 4326 150 900 961 = 0
DEBUG:root:PID: 4324 160 900 961 = 0
DEBUG:root:PID: 4326 170 900 961 = 0
DEBUG:root:PID: 4324 180 900 961 = 0
DEBUG:root:PID: 4326 190 900 961 = 0
DEBUG:root:PID: 4324 200 900 961 = 0
DEBUG:root:PID: 4326 210 900 961 = 0
DEBUG:root:PID: 4324 220 900 961 = 0
DEBUG:root:PID: 4326 230 900 961 = 0
DEBUG:root:PID: 4324 240 900 961 = 0
DEBUG:root:PID: 4326 250 900 961 = 0
DEBUG:root:PID: 4324 260 900 961 = 0
DEBUG:root:P

# Asynchronous Map & Apply
* We can run map, imap, starmap and apply asynchonrously
* We can send jobs without locking
* The results can come back out of order, but the work may complete faster

## Map_async
* Note:  imap_async is not an available implementation

In [105]:
def double(input):
    val = input ** 2
    if input % 10000 == 0:
        logger.debug(f'PID: {os.getpid()} {input} --> {val}')
        
numbers = [x for x in range(0, 100000)]
with Pool(multiprocessing.cpu_count()) as pool:
    pool.map_async(double, numbers)
    pool.close()
    pool.join()

DEBUG:root:PID: 4423 0 --> 0
DEBUG:root:PID: 4424 10000 --> 100000000
DEBUG:root:PID: 4425 30000 --> 900000000
DEBUG:root:PID: 4426 20000 --> 400000000
DEBUG:root:PID: 4423 40000 --> 1600000000
DEBUG:root:PID: 4424 50000 --> 2500000000
DEBUG:root:PID: 4425 60000 --> 3600000000
DEBUG:root:PID: 4423 70000 --> 4900000000
DEBUG:root:PID: 4426 90000 --> 8100000000
DEBUG:root:PID: 4425 80000 --> 6400000000


## Starmap

In [107]:
def double(input, power):
    val = input ** power
    if input % 10000 == 0:
        logger.debug(f'PID: {os.getpid()} {input} --> {val}')
        
size = 100000
numbers = [(x, 2) for x in range(0, size)]
with Pool(multiprocessing.cpu_count()) as pool:
    pool.starmap(double, numbers)
    pool.close()
    pool.join()

DEBUG:root:PID: 4457 0 --> 0
DEBUG:root:PID: 4457 40000 --> 1600000000
DEBUG:root:PID: 4457 50000 --> 2500000000
DEBUG:root:PID: 4457 60000 --> 3600000000
DEBUG:root:PID: 4457 70000 --> 4900000000
DEBUG:root:PID: 4457 80000 --> 6400000000
DEBUG:root:PID: 4460 20000 --> 400000000
DEBUG:root:PID: 4458 10000 --> 100000000
DEBUG:root:PID: 4459 30000 --> 900000000
DEBUG:root:PID: 4459 90000 --> 8100000000


## Apply_async
* We call sum_between function and wait for the results to be handled in our gather callback
* Note, because we're running asynchronously, we need to wait for the processes in the pool to complete
* If we allow the context manager to exit, it's possible it will leave without some of the work having finished

In [37]:
def sum_between(numbers, start, end):
    total = sum([i for i in range(numbers) if i >= start and i <= end])
    if numbers % 10 == 0:
        logger.debug(f'PID: {os.getpid()} {numbers} {start} {end} = {total}')
    return total

results = []
def gather(result):
    results.append(result)
    
with Pool(multiprocessing.cpu_count()) as pool:
    size = 1000
    start = random.randint(1, size)
    end   = random.randint(start, size)
    for x in range(size): pool.apply_async(sum_between, args=(x, start, end), callback=gather)
    pool.close()
    pool.join()

DEBUG:root:PID: 5937 0 288 475 = 0
DEBUG:root:PID: 5939 100 288 475 = 0
DEBUG:root:PID: 5938 10 288 475 = 0
DEBUG:root:PID: 5938 20 288 475 = 0
DEBUG:root:PID: 5939 30 288 475 = 0
DEBUG:root:PID: 5939 50 288 475 = 0
DEBUG:root:PID: 5940 40 288 475 = 0
DEBUG:root:PID: 5938 60 288 475 = 0
DEBUG:root:PID: 5939 70 288 475 = 0
DEBUG:root:PID: 5940 80 288 475 = 0
DEBUG:root:PID: 5938 90 288 475 = 0
DEBUG:root:PID: 5940 110 288 475 = 0
DEBUG:root:PID: 5939 120 288 475 = 0
DEBUG:root:PID: 5939 130 288 475 = 0
DEBUG:root:PID: 5937 140 288 475 = 0
DEBUG:root:PID: 5939 150 288 475 = 0
DEBUG:root:PID: 5939 160 288 475 = 0
DEBUG:root:PID: 5937 170 288 475 = 0
DEBUG:root:PID: 5939 180 288 475 = 0
DEBUG:root:PID: 5938 190 288 475 = 0
DEBUG:root:PID: 5940 200 288 475 = 0
DEBUG:root:PID: 5939 220 288 475 = 0
DEBUG:root:PID: 5939 230 288 475 = 0
DEBUG:root:PID: 5938 210 288 475 = 0
DEBUG:root:PID: 5940 240 288 475 = 0
DEBUG:root:PID: 5940 260 288 475 = 0
DEBUG:root:PID: 5939 270 288 475 = 0
DEBUG:root:P

# Synchronizing Access

## Barrier
* We create 3 processes that need to wait for all three to complete
* When the third reaches the barrier, we continue

In [36]:
barrier = Barrier(3)

def work():
    pid = os.getpid()
    logger.debug(f'PID: {pid} - Waiting')
    barrier.wait()
    logger.debug(f'PID: {pid} - Continuing')
  
def startup(p):
    logger.debug(f'MAIN - Starting Worker')
    p.start()
    time.sleep(2)
    
def cleanup(p):
    logger.debug(f'MAIN - Closing Worker')
    p.join()

processes = [Process(target=work, args=()) for p in range(3)]
for p in processes: startup(p) 
for p in processes: cleanup(p)

DEBUG:root:MAIN - Starting Worker
DEBUG:root:PID: 5934 - Waiting
DEBUG:root:MAIN - Starting Worker
DEBUG:root:PID: 5935 - Waiting
DEBUG:root:MAIN - Starting Worker
DEBUG:root:PID: 5936 - Waiting
DEBUG:root:PID: 5934 - Continuing
DEBUG:root:PID: 5935 - Continuing
DEBUG:root:PID: 5936 - Continuing
DEBUG:root:MAIN - Closing Worker
DEBUG:root:MAIN - Closing Worker
DEBUG:root:MAIN - Closing Worker


# Managers
* A manager allows the coordinates state across processes
* In this example we have a tracking dict
* We would like each process to update the job / pid to tracking

In [15]:
manager = multiprocessing.Manager()
shared = manager.Namespace()
shared.tracker = manager.dict()

In [17]:
def work(job_number, state):
    pid = os.getpid()
    state.tracker[job_number] = pid

jobs = [Process(target=work, args=(i, shared)) for i in range(5)]

print(f'Job Worked By: {shared.tracker}')

for job in jobs:
    job.start()
    
for job in jobs:
    job.join()

print(f'Job Worked By: {shared.tracker}')

Job Worked By: {}
Job Worked By: {0: 5085, 1: 5086, 2: 5087, 3: 5088, 4: 5089}
