# Multiprocessing

Readings:
- https://docs.python.org/3/library/multiprocessing.html

## Basic Setup

In [1]:
import concurrent.futures
import ctypes
import io
import logging
import math
import multiprocessing
import os
import queue
import signal
import sys
import tempfile
import time
from multiprocessing import context
from multiprocessing import managers

import pandas as pd

original_stdin = sys.stdin

# Every record is prefixed with the name of the process that emitted it, so the
# output of the parent and of each child process can be told apart.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(fmt='[%(processName)s]: %(message)s'))

logger = logging.getLogger()  # root logger
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

## First taste

In [2]:
fifo = multiprocessing.Queue(maxsize=0)  # unbounded queue shared with the workers below


def worker(container, item):
    """Log this process' parent PID and own PID, then put *item* into *container*."""
    parent_pid, own_pid = os.getppid(), os.getpid()
    logger.info('parent process: %(parent_id)s', {'parent_id': parent_pid})
    logger.info('process id: %(process_id)s', {'process_id': own_pid})
    container.put(item)


# 1) A process created through the module-level API (default start method).
process = multiprocessing.Process(args=(fifo, 'process'), target=worker)
process.start()
process.join()

# 2) A process created through an explicitly requested 'fork' context.
fork = multiprocessing.get_context(method='fork')
fork_process = fork.Process(args=(fifo, 'fork'), target=worker)
fork_process.start()
fork_process.join()

# Each worker put exactly one item, so the (approximate) size should be 2.
logger.info(fifo.qsize())

[Process-1]: parent process: 275985
[Process-1]: process id: 276004
[ForkProcess-2]: parent process: 275985
[ForkProcess-2]: process id: 276010
[MainProcess]: 2


## Process and exceptions

In [3]:
class Context:
    """Minimal context manager that logs ``'exit'`` when its block is left.

    ``work()`` sleeps inside it; the recorded output shows that no ``'exit'``
    line appears when that process is terminated.
    """

    def __enter__(self, *args, **kwargs):
        # Must ``return``, not ``yield``: a ``yield`` here turns the method into a
        # generator function, and ``with Context() as f`` would bind ``f`` to an
        # un-started generator object instead of ``None``.
        return None

    def __exit__(self, *args, **kwargs):
        logger.info('exit')


def work(file_object=None):
    """Sleep for ten seconds inside a ``Context`` block.

    ``file_object`` is accepted but not used.
    """
    pause_seconds = 10
    with Context():
        time.sleep(pause_seconds)
    

def print_info(process):
    """Log liveness, sentinel and exit code of *process*.

    Reading ``sentinel`` before ``start()`` raises ValueError; that case is
    logged as "Process not started yet" instead.
    """
    alive = process.is_alive()
    logger.info('Is alive?: %(alive)s', {'alive': alive})
    try:
        sentinel = process.sentinel
    except ValueError:
        logger.info('Process not started yet')
    else:
        logger.info('Sentinel: %(sentinel)s', {'sentinel': sentinel})
    logger.info('Exitcode: %(exitcode)s', {'exitcode': process.exitcode})
    

process = multiprocessing.Process(target=work, args=())

logger.info('== before start ==')
print_info(process)  # not started yet: no sentinel, exitcode is None

logger.info('== start ==')
process.start()
print_info(process)

logger.info('== terminate ==')
process.terminate()
# terminate() only sends the signal; the child is typically still alive here.
logger.info('Is alive? (before join): %(alive)s', {'alive': process.is_alive()})
# Wait for the child to actually exit instead of sleeping a fixed second and
# hoping it has: after join() the exitcode (-15 in the recorded run, SIGTERM)
# is final, and the process is no longer running when close() is called.
process.join()
print_info(process)

logger.info('== close ==')
process.close()
try:
    # Accessing a closed Process raises ValueError.
    print_info(process)
except ValueError:
    logger.info('The process is closed')

[MainProcess]: == before start ==
[MainProcess]: Is alive?: False
[MainProcess]: Process not started yet
[MainProcess]: Exitcode: None
[MainProcess]: == start ==
[MainProcess]: Is alive?: True
[MainProcess]: Sentinel: 59
[MainProcess]: Exitcode: None
[MainProcess]: == terminate ==
[MainProcess]: Is alive? (before sleep): True
[MainProcess]: Is alive?: False
[MainProcess]: Sentinel: 59
[MainProcess]: Exitcode: -15
[MainProcess]: == close ==
[MainProcess]: The process is closed


## Pipes and Queues

In [4]:
def send_register(conn, context, message):
    """Send *message* over *conn*, then record ``(timestamp, 'SEND', message)`` in *context*."""
    conn.send(message)
    entry = (time.time(), 'SEND', message)
    context.put(entry)

    
def recived_msg_register(conn, context):
    """Receive one message from *conn* and record ``(timestamp, 'GET', message)`` in *context*.

    (The misspelled name is kept because callers use it.)
    """
    # Receive first, then timestamp: recv() may block, and the GET time must be
    # when the message arrived, not when we started waiting for it.
    received = conn.recv()
    context.put((time.time(), 'GET', received))
    

def taxi(conn, context, event):
    """Taxi side of the conversation.

    Waits for the user's request, answers with three status messages spaced
    one second apart, then sets *event* and closes its end of the pipe.
    """
    recived_msg_register(conn, context)
    replies = ('Ok, going!', 'I\'m around', 'I arrived')
    for position, reply in enumerate(replies):
        if position:
            time.sleep(1)
        send_register(conn, context, reply)
    event.set()
    conn.close()

def user(conn, context, event):
    """User side of the conversation.

    Sends the taxi request, then records every reply until *event* is set
    and nothing is left to read.
    """
    send_register(conn, context, 'Hello, I need a taxi near the Park')
    while not event.is_set():
        if conn.poll(0.5):
            recived_msg_register(conn, context)
    # The taxi sends its last message *before* setting the event, so that
    # message can still be sitting in the pipe when the loop above exits
    # (e.g. the event was set right after a poll() timed out). Drain it;
    # otherwise the user log is one entry short of the taxi log.
    try:
        while conn.poll():
            recived_msg_register(conn, context)
    except EOFError:
        pass  # the other end was closed and nothing is left to read


event = multiprocessing.Event()
taxi_context = multiprocessing.Queue()
user_context = multiprocessing.Queue()
taxi_connection, user_connection = multiprocessing.Pipe()
# Keep the Process objects under their own names: rebinding ``taxi``/``user``
# would shadow the target functions, and a shadowed function cannot be pickled
# by reference under the 'spawn'/'forkserver' start methods.
taxi_process = multiprocessing.Process(name='taxi', target=taxi, args=(taxi_connection, taxi_context, event))
user_process = multiprocessing.Process(name='user', target=user, args=(user_connection, user_context, event))
taxi_process.start()
user_process.start()
taxi_process.join()
user_process.join()

taxi_log = []
while not taxi_context.empty():
    taxi_log.append(taxi_context.get())
user_log = []
while not user_context.empty():
    user_log.append(user_context.get())
# Pair the i-th taxi record with the i-th user record. zip() stops at the
# shorter log instead of blocking forever on get() if one side recorded fewer
# events.
messages_context_data = [[*taxi_row, *user_row] for taxi_row, user_row in zip(taxi_log, user_log)]
columns = pd.MultiIndex.from_product([['Taxi POV', 'User POV'], ['Time', 'Action', 'Message']])
messages_context = pd.DataFrame(messages_context_data, columns=columns)
messages_context.head()

Unnamed: 0_level_0,Taxi POV,Taxi POV,Taxi POV,User POV,User POV,User POV
Unnamed: 0_level_1,Time,Action,Message,Time,Action,Message
0,1610905000.0,GET,"Hello, I need a taxi near the Park",1610905000.0,SEND,"Hello, I need a taxi near the Park"
1,1610905000.0,SEND,"Ok, going!",1610905000.0,GET,"Ok, going!"
2,1610905000.0,SEND,I'm around,1610905000.0,GET,I'm around
3,1610905000.0,SEND,I arrived,1610905000.0,GET,I arrived


In [5]:
def fill(content):
    """Put a single ``'water'`` item into *content*."""
    item = 'water'
    content.put(item)


def draw(content):
    """Consume one item from the JoinableQueue *content*, mark it done, then linger.

    The one-second sleep after ``task_done()`` shows that the parent's
    ``content.join()`` returns while this process is still running.
    """
    pause = 1
    content.get()  # the item itself is discarded
    time.sleep(pause)
    logger.info('task done')
    content.task_done()
    logger.info('sleep')
    time.sleep(pause)


content = multiprocessing.JoinableQueue()
# Use distinct names for the Process objects so the target functions ``fill``
# and ``draw`` are not shadowed; a shadowed target cannot be pickled by
# reference under the 'spawn'/'forkserver' start methods.
fill_process = multiprocessing.Process(target=fill, args=(content,))
fill_process.start()
fill_process.join()
draw_process = multiprocessing.Process(target=draw, args=(content,))
draw_process.start()
logger.info('call queue join')
# Returns as soon as the consumer calls task_done() for the single item,
# before draw() finishes its second sleep.
content.join()
logger.info('queue join')
draw_process.join()
logger.info('process join')

[MainProcess]: call queue join
[Process-7]: task done
[MainProcess]: queue join
[Process-7]: sleep
[MainProcess]: process join


In [6]:
class dangerousClass:
    """Object whose unpickling runs an arbitrary call: a demo of why pickle is unsafe.

    ``__reduce__`` tells pickle to rebuild the object by calling
    ``logger.info('Hello!')``, so the *receiving* process executes that call
    inside ``conn.recv()``. Any callable could be returned here instead, which
    is why data from an untrusted peer must never be unpickled.
    """

    def __reduce__(self):
        rebuild = logger.info
        rebuild_args = ('Hello!',)
        return rebuild, rebuild_args


def send(conn):
    """Pickle a ``dangerousClass`` instance and send it through *conn*."""
    payload = dangerousClass()
    conn.send(payload)


def recv(conn):
    """Log, then receive (and thereby unpickle) one object from *conn*."""
    logger.info('recv the msg')
    _ = conn.recv()  # the value is irrelevant; unpickling it is the point

    
conn = multiprocessing.Pipe()
sending_end, receiving_end = conn
p1 = multiprocessing.Process(target=send, args=(sending_end,))
p2 = multiprocessing.Process(target=recv, args=(receiving_end,))
# Run the sender to completion first; the receiver then unpickles the message,
# which executes the callable returned by dangerousClass.__reduce__ inside p2.
for stage in (p1, p2):
    stage.start()
    stage.join()

[Process-9]: recv the msg
[Process-9]: Hello!


# Synchronization primitives

In [7]:
def queue_ready():
    """Barrier action: log that every party has enqueued its item."""
    message = 'The stack is ready!'
    logger.info(message)


def add_to_queue(barrier, queue_instance, item):
    """Log *item*, enqueue it, then wait at *barrier* for the other parties.

    The party that receives index 0 from ``barrier.wait()`` logs a message.
    """
    logger.info(item)
    queue_instance.put(item)
    arrival_index = barrier.wait()
    # NOTE(review): wait() hands each party a distinct index; the party that
    # gets 0 is not necessarily the one whose item was enqueued first.
    if arrival_index != 0:
        return
    logger.info('I\'m going to be the FO!, att: %(item)s', {'item': item})


amount_of_process = 10
queue_instance = multiprocessing.Queue()
# queue_ready runs once, in one of the workers, when all of them have arrived.
barrier = multiprocessing.Barrier(amount_of_process, action=queue_ready)
call_args = [(barrier, queue_instance, number) for number in range(amount_of_process)]
processes = [multiprocessing.Process(target=add_to_queue, args=arguments) for arguments in call_args]
for worker_process in processes:
    worker_process.start()
for worker_process in processes:
    worker_process.join()
# Only the first queued item is inspected; the rest stay in the queue.
logger.info('FO: %(item)s', {'item': queue_instance.get()})

[Process-10]: 0
[Process-11]: 1
[Process-12]: 2
[Process-14]: 4
[Process-15]: 5
[Process-16]: 6
[Process-17]: 7
[Process-13]: 3
[Process-18]: 8
[Process-19]: 9
[Process-19]: The stack is ready!
[Process-10]: I'm going to be the FO!, att: 0
[MainProcess]: FO: 0


In [8]:
def connect(bounded_semaphore):
    """Hold one slot of *bounded_semaphore* for three seconds."""
    bounded_semaphore.acquire()
    try:
        logger.info('semaphore acquired')
        time.sleep(3)
        logger.info('semaphore released')
    finally:
        bounded_semaphore.release()


amount_of_instances = 3
bounded_semaphore = multiprocessing.BoundedSemaphore(amount_of_instances)
# One worker per slot, so every slot is taken while the workers sleep.
processes = [multiprocessing.Process(target=connect, args=(bounded_semaphore,)) for _ in range(amount_of_instances)]
for holder in processes:
    holder.start()

time.sleep(1)
logger.info('Try to acquire new semaphore')
bounded_semaphore.acquire()  # blocks until one worker releases its slot
logger.info('New semaphore acquired')

for holder in processes:
    holder.join()

# Give our own slot back: the counter returns to its initial value...
bounded_semaphore.release()
try:
    # ...so one more release exceeds the bound, which BoundedSemaphore rejects.
    bounded_semaphore.release()
except ValueError:
    logger.info('ValueError caught')

[Process-20]: semaphore acquired
[Process-21]: semaphore acquired
[Process-22]: semaphore acquired
[MainProcess]: Try to acquire new semaphore
[Process-20]: semaphore released
[MainProcess]: New semaphore acquired
[Process-21]: semaphore released
[Process-22]: semaphore released
[MainProcess]: ValueError caught


In [9]:
def recursive(condition, recursivity_left):
    """Build a chain of processes that all wait on *condition*, then wake each other.

    Each level acquires *condition*, starts a child one level deeper while
    ``recursivity_left`` is positive, waits to be notified, sleeps, notifies
    one further waiter and releases the lock. The child is joined before
    returning so the process tree is reaped explicitly.
    """
    time.sleep(0.5)
    recursion = None
    with condition:  # released even if wait()/notify() raises
        # ``> 0`` rather than truthiness, so a negative depth cannot fork forever.
        if recursivity_left > 0:
            recursion = multiprocessing.Process(target=recursive, args=(condition, recursivity_left - 1))
            recursion.start()
        logger.info('waiting')
        condition.wait()
        time.sleep(0.5)
        condition.notify()
        logger.info('release')
    if recursion is not None:
        recursion.join()


r_lock = multiprocessing.RLock()
# Both conditions share one RLock.
cond_1 = multiprocessing.Condition(r_lock)
cond_2 = multiprocessing.Condition(r_lock)
recursion_1 = multiprocessing.Process(target=recursive, args=(cond_1, 3,))
recursion_1.start()
recursion_2 = multiprocessing.Process(target=recursive, args=(cond_2, 3,))
recursion_2.start()
# Give every level of both chains time to reach condition.wait() before notifying.
time.sleep(3)
with cond_1:
    logger.info('notify')
    cond_1.notify()
with cond_2:
    logger.info('notify')
    cond_2.notify()
recursion_1.join()
recursion_2.join()  # the second chain must be reaped too

[Process-23]: waiting
[Process-24]: waiting
[Process-23:1]: waiting
[Process-24:1]: waiting
[Process-23:1:1]: waiting
[Process-24:1:1]: waiting
[Process-23:1:1:1]: waiting
[Process-24:1:1:1]: waiting
[MainProcess]: notify
[Process-23]: release
[MainProcess]: notify
[Process-23:1]: release
[Process-24]: release
[Process-23:1:1]: release
[Process-24:1]: release
[Process-23:1:1:1]: release
[Process-24:1:1]: release
[Process-24:1:1:1]: release


In [10]:
def rhombo(condition, recursion_reached_condition, dimensions, recursivity_left):
    """Print one row of the rhombus's upper half, recurse, then print the mirrored lower row.

    Each level spawns a child with ``recursivity_left - 2`` and waits on
    *condition*. The level that reaches the bottom notifies
    *recursion_reached_condition* instead of printing, so the parent can
    start the lower half.
    """
    # ``<= 0`` rather than ``not recursivity_left``: the depth decreases by 2,
    # so an odd (or negative) start would skip 0 and keep forking forever.
    if recursivity_left <= 0:
        with recursion_reached_condition:
            recursion_reached_condition.notify()
        return
    name = multiprocessing.current_process().name
    locked_print(f'||{" " * dimensions}{"⧫" * (dimensions - recursivity_left + 2): ^{dimensions}}{" " * dimensions}|| {name}')
    with condition:  # released even if wait()/notify() raises
        subprocess = multiprocessing.Process(target=rhombo, args=(condition, recursion_reached_condition, dimensions, recursivity_left - 2))
        subprocess.start()
        condition.wait()
        locked_print(f'||{" " * dimensions}{"⧫" * recursivity_left: ^{dimensions}}{" " * dimensions}|| {name}')
        condition.notify()
    subprocess.join()

    
print_lock = multiprocessing.Lock()


def locked_print(msg):
    """Print *msg* while holding ``print_lock`` so lines from different processes do not interleave.

    The 0.1 s pause inside the critical section spaces the lines out.
    """
    print_lock.acquire()
    try:
        time.sleep(0.1)
        print(msg)
    finally:
        print_lock.release()


dimension = 10


r_lock = multiprocessing.RLock()
lock = multiprocessing.Lock()
# ``condition`` sequences the rows between levels; ``recursion_reached_condition``
# is notified by the bottom level once the upper half has been printed.
condition = multiprocessing.Condition(r_lock)
recursion_reached_condition = multiprocessing.Condition(lock)
rhombo_process = multiprocessing.Process(target=rhombo, args=(condition, recursion_reached_condition, dimension, dimension))
locked_print(f'OO{"=" * dimension * 3}OO')
locked_print(f'||{" " * dimension * 3}||')
# Take ``lock`` *before* starting the tree. The bottom level can only notify
# while holding ``lock``, i.e. once we are already inside wait(). Starting the
# process first would let its notify() fire before wait(), which would then
# block forever (lost wakeup).
with lock:
    rhombo_process.start()
    recursion_reached_condition.wait()
    with r_lock:
        # Wake one waiting level to start printing the lower half.
        condition.notify()
rhombo_process.join()
locked_print(f'||{" " * dimension * 3}||')
locked_print(f'OO{"=" * dimension * 3}OO')

||                              ||
||              ⧫⧫              || Process-25
||             ⧫⧫⧫⧫             || Process-25:1
||            ⧫⧫⧫⧫⧫⧫            || Process-25:1:1
||           ⧫⧫⧫⧫⧫⧫⧫⧫           || Process-25:1:1:1
||          ⧫⧫⧫⧫⧫⧫⧫⧫⧫⧫          || Process-25:1:1:1:1
||          ⧫⧫⧫⧫⧫⧫⧫⧫⧫⧫          || Process-25
||           ⧫⧫⧫⧫⧫⧫⧫⧫           || Process-25:1
||            ⧫⧫⧫⧫⧫⧫            || Process-25:1:1
||             ⧫⧫⧫⧫             || Process-25:1:1:1
||              ⧫⧫              || Process-25:1:1:1:1
||                              ||


In [11]:
def random_print(condition, dimension, recursivity_left):
    """Spawn a chain of processes that each print a centred row of ``#`` once woken.

    Each level holds *condition* while starting a child one level shallower,
    waits to be notified, prints ``recursivity_left`` hashes centred in
    *dimension* columns, releases, and finally joins the child.
    """
    # ``<= 0`` rather than ``not recursivity_left`` so a negative depth cannot fork forever.
    if recursivity_left <= 0:
        return
    with condition:  # released even if wait() raises
        subprocess = multiprocessing.Process(target=random_print, args=(condition, dimension, recursivity_left - 1))
        subprocess.start()
        condition.wait()
        print(f'{"#" * (recursivity_left): ^{dimension}}')
    subprocess.join()


r_lock = multiprocessing.RLock()
condition = multiprocessing.Condition(r_lock)
# 20 nested levels, each printing its row of '#' (centred in 20 columns) once woken.
process = multiprocessing.Process(target=random_print, args=(condition, 20, 20))
process.start()
# NOTE(review): fixed 1 s grace period for every level to reach condition.wait().
# A level that only starts waiting after the notify_all() below is never woken
# (nothing notifies again), and process.join() would then block forever.
time.sleep(1)
with r_lock:
    # Wakes all current waiters at once; they then compete to re-acquire r_lock,
    # so the order in which rows are printed is up to the scheduler.
    condition.notify_all()
process.join()
logger.info('Look this like a random thing?')  # Maybe not

################### 
####################
 ################## 
 #################  
   ##############   
  ###############   
    ###########     
  ################  
    ############    
      ########      
   #############    
     ##########     
       ######       
     #########      
       #####        
        ####        
      #######       
         ##         
         #          
        ###         


[MainProcess]: Look this like a random thing?


In [12]:
# Module-level so that the forked children below see the same objects.
WAITING_CONDITION = multiprocessing.Condition(lock=None)
QUEUE_INSTANCE = multiprocessing.Queue(maxsize=0)
QUEUE_INSTANCE.put(1)  # the waiter keeps waiting until this item is taken


def notify(condition):
    """Acquire *condition*, wake one waiter, and release it again."""
    condition.acquire()
    try:
        condition.notify()
    finally:
        condition.release()



def wait_for():
    """Predicate for ``Condition.wait_for``: report whether QUEUE_INSTANCE is empty.

    Every evaluation also starts a helper process that notifies
    WAITING_CONDITION, so the waiter is woken again to re-check the predicate.
    """
    logger.info('Waiting')
    queue_is_empty = QUEUE_INSTANCE.empty()
    logger.info('Is empty? %(is_empty)s', {'is_empty': queue_is_empty})
    time.sleep(0.5)
    multiprocessing.Process(target=notify, args=(WAITING_CONDITION,)).start()
    return queue_is_empty


def waiting(waiting_condition):
    """Block on *waiting_condition* until ``wait_for()`` reports an empty queue, then log it."""
    waiting_condition.acquire()
    try:
        waiting_condition.wait_for(wait_for)
        logger.info('Queue is empty!')
    finally:
        waiting_condition.release()


process = multiprocessing.Process(args=(WAITING_CONDITION,), target=waiting)
process.start()
# Let the waiter re-evaluate its predicate a few times before emptying the queue.
time.sleep(2)
data = QUEUE_INSTANCE.get(block=True)
process.join()
logger.info(data)

[Process-27]: Waiting
[Process-27]: Is empty? False
[Process-27]: Waiting
[Process-27]: Is empty? False
[Process-27]: Waiting
[Process-27]: Is empty? False
[Process-27]: Waiting
[Process-27]: Is empty? False
[Process-27]: Waiting
[Process-27]: Is empty? True
[Process-27]: Queue is empty!
[MainProcess]: 1


In [13]:
def stay_home(rain_stopped):
    """Report whether *rain_stopped* is already set, then wait up to 2 s for it."""
    already_stopped = rain_stopped.is_set()
    answer = 'Yes' if already_stopped else 'No'
    logger.info('Can leave home?: %(status)s', {'status': answer})
    if not already_stopped:
        logger.info('Wait for the rain to stop')
    outcome = 'Leave home' if rain_stopped.wait(2) else 'The wait was very long...'
    logger.info(outcome)


def _stay_home_group(title, event, before_start=None, after_sleep=None):
    """Run three ``stay_home`` processes on *event* and return them once joined.

    *before_start* runs after the processes are created but before they start;
    *after_sleep* runs after the 1 s pause but before joining.
    """
    logger.info(title)
    group = [multiprocessing.Process(target=stay_home, args=(event,)) for _ in range(3)]
    if before_start is not None:
        before_start()
    for member in group:
        member.start()
    time.sleep(1)
    if after_sleep is not None:
        after_sleep()
    for member in group:
        member.join()
    return group


event = multiprocessing.Event()
# 1) The event is set while the processes wait, so they all leave.
processes = _stay_home_group('== First group ==', event, after_sleep=event.set)
# 2) The event is still set, so nobody has to wait.
processes = _stay_home_group('== Second group ==', event)
# 3) The event is cleared and never set again, so every wait times out.
processes = _stay_home_group('== Third group ==', event, before_start=event.clear)

[MainProcess]: == First group ==
[Process-28]: Can leave home?: No
[Process-29]: Can leave home?: No
[Process-29]: Wait for the rain to stop
[Process-28]: Wait for the rain to stop
[Process-30]: Can leave home?: No
[Process-30]: Wait for the rain to stop
[Process-30]: Leave home
[Process-28]: Leave home
[Process-29]: Leave home
[MainProcess]: == Second group ==
[Process-31]: Can leave home?: Yes
[Process-31]: Leave home
[Process-32]: Can leave home?: Yes
[Process-32]: Leave home
[Process-33]: Can leave home?: Yes
[Process-33]: Leave home
[MainProcess]: == Third group ==
[Process-34]: Can leave home?: No
[Process-34]: Wait for the rain to stop
[Process-35]: Can leave home?: No
[Process-35]: Wait for the rain to stop
[Process-36]: Can leave home?: No
[Process-36]: Wait for the rain to stop
[Process-34]: The wait was very long...
[Process-35]: The wait was very long...
[Process-36]: The wait was very long...


# Shared ctypes Objects

In [14]:
def increment(counter):
    """Add 1 to ``counter.value`` 100 000 times without holding its lock (racy across processes)."""
    remaining = 100_000
    while remaining:
        counter.value += 1
        remaining -= 1


def increment_with_lock(counter):
    """Add 1 to ``counter.value`` 100 000 times, holding the counter's lock for each update."""
    steps = range(100_000)
    for _ in steps:
        # ``with counter:`` is the same as ``with counter.get_lock():``
        # https://github.com/python/cpython/blob/3.9/Lib/multiprocessing/sharedctypes.py#L180
        with counter:
            counter.value = counter.value + 1


# Same workload twice: first without the Value's lock (updates get lost),
# then with it (exactly 4 * 100_000 is expected).
for target in (increment, increment_with_lock):
    counter = multiprocessing.Value(ctypes.c_int, 0)
    processes = [multiprocessing.Process(target=target, args=(counter,)) for _ in range(4)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    logger.info(counter.value)

[MainProcess]: 142487
[MainProcess]: 400000


In [15]:
# play around with multiprocessing.Array
# For a c_char array, ``value`` stops at the first NUL byte while indexing and
# ``raw`` see every slot.
array = multiprocessing.Array(ctypes.c_char, (1, 2, 3, b'\x00', 4))
logger.info(array.value)
logger.info(len(array.value))
for index in (3, 4):
    logger.info(f'Index {index}: %(value)s', {'value': array[index]})
array.value = b'\x01'
logger.info(array.value)
array.value = array.value + b'\x01' * 4  # now fills all 5 slots
try:
    array.value = array.value + b'\x01'  # a 6th byte does not fit
except ValueError as exception:
    logger.info('ValueError caught: %(exception)s', {'exception': exception})
array[0] = b'\x02'
logger.info(array.value)
logger.info(array.raw)


def increment_with_lock(array):
    """Append a 0x01 byte to *array*'s NUL-terminated value 10 000 times, locking each append."""
    appends = 10_000
    while appends:
        with array:  # hold the array's lock across the read-modify-write
            array.value = array.value + b'\x01'
        appends -= 1


def increment(array):
    """Append a 0x01 byte to *array*'s value 10 000 times without locking (racy across processes)."""
    for _ in range(10_000):
        current = array.value
        array.value = current + b'\x01'


scenarios = (
    ('== test increment without lock ==', increment),
    ('== test increment with lock ==', increment_with_lock),
)
for title, target in scenarios:
    logger.info(title)
    # 4 processes x 10_000 appends: 40_000 bytes if no append is lost.
    array = multiprocessing.Array(ctypes.c_char, 40_000)
    processes = [multiprocessing.Process(target=target, args=(array,)) for _ in range(4)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    logger.info(len(array.value))

[MainProcess]: b'\x01\x02\x03'
[MainProcess]: 3
[MainProcess]: Index 3: b'\x00'
[MainProcess]: Index 4: b'\x04'
[MainProcess]: b'\x01'
[MainProcess]: ValueError caught: byte string too long
[MainProcess]: b'\x02\x01\x01\x01\x01'
[MainProcess]: b'\x02\x01\x01\x01\x01'
[MainProcess]: == test increment without lock ==
[MainProcess]: 16354
[MainProcess]: == test increment with lock ==
[MainProcess]: 40000


In [16]:
def replace_with_lock(lock, raw_array, number):
    """Write *number* into every slot of *raw_array*, taking *lock* separately for each write."""
    size = len(raw_array)
    index = 0
    while index < size:
        with lock:
            raw_array[index] = number
        index += 1


def replace_with_lock_v2(lock, raw_array, number):
    """Write *number* into every slot of *raw_array* while holding *lock* for the whole pass."""
    lock.acquire()
    try:
        for index in range(len(raw_array)):
            raw_array[index] = number
    finally:
        lock.release()


def replace(raw_array, number):
    """Write *number* into every slot of *raw_array* with no locking."""
    size = len(raw_array)
    for index in range(size):
        raw_array[index] = number
        

# Each scenario starts four processes that overwrite the same 1_000_000-slot
# shared array with their own number (0-3), then logs how many slots hold each
# number. Expected:
#   * no lock / a lock per write: the writes interleave, so the final contents
#     mix several numbers (random distribution);
#   * lock held for the whole pass (v2): the passes are serialised, so every
#     slot holds the number of whichever process took the lock last.
for title, target, uses_lock in (
    ('== replace without lock ==', replace, False),
    ('== replace with lock ==', replace_with_lock, True),
    ('== replace with lock v2 ==', replace_with_lock_v2, True),
):
    logger.info(title)
    # Public alias of multiprocessing.sharedctypes.RawArray: the ``sharedctypes``
    # attribute only exists once that submodule has been imported somewhere.
    raw_array = multiprocessing.RawArray(ctypes.c_int, 1_000_000)
    lock_args = ()
    if uses_lock:
        lock = multiprocessing.Lock()
        lock_args = (lock,)
    processes = [multiprocessing.Process(target=target, args=(*lock_args, raw_array, n)) for n in range(4)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    listed_array = raw_array[:]  # slicing copies the ctypes array into a list
    for number in range(4):
        logger.info(listed_array.count(number))

[MainProcess]: == replace without lock ==
[MainProcess]: 0
[MainProcess]: 0
[MainProcess]: 664867
[MainProcess]: 335133
[MainProcess]: == replace with lock ==
[MainProcess]: 0
[MainProcess]: 420338
[MainProcess]: 386528
[MainProcess]: 193134
[MainProcess]: == replace with lock v2 ==
[MainProcess]: 0
[MainProcess]: 0
[MainProcess]: 0
[MainProcess]: 1000000


# Pool

In [17]:
def heavy(n):
    """CPU-bound busy work: return ``int(log10(sum(x**y for 0 <= y < x < n)))``.

    That is roughly the number of decimal digits of the sum, minus one.

    Raises:
        ValueError: if ``n < 2``; the sum is then 0 and has no logarithm.
    """
    logger.info('Running heavy function')
    a = 0
    for x in range(n):
        for y in range(x):
            a += x**y
    if a == 0:
        raise ValueError(f'heavy({n}): nothing to sum, n must be >= 2')
    # math.log10 is more accurate than math.log(a, 10), which computes
    # log(a) / log(10) and can land just below an integer (e.g. for 1000).
    return int(math.log10(a))


# Four jobs of growing cost; the last one does not finish within 3 s.
with multiprocessing.Pool() as pool:
    result_1 = pool.apply_async(heavy, (10,))
    result_2 = pool.apply_async(heavy, (100,))
    result_3 = pool.apply_async(heavy, (1_000,))
    result_4 = pool.apply_async(heavy, (10_000,))
    logger.info(result_1.get())
    logger.info(result_2.get())
    logger.info(result_3.get())
    try:
        result_4.get(timeout=3)
    except multiprocessing.TimeoutError:  # public name, same class as multiprocessing.context.TimeoutError
        logger.info('TimeoutError caught')
# Leaving the ``with`` block terminates the pool, stopping the unfinished job.

[ForkPoolWorker-67]: Running heavy function
[ForkPoolWorker-68]: Running heavy function
[ForkPoolWorker-65]: Running heavy function
[MainProcess]: 7
[ForkPoolWorker-66]: Running heavy function
[MainProcess]: 195
[MainProcess]: 2993
[MainProcess]: TimeoutError caught
