# multiprocessing


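Note: use `multiprocess` instead of `multiprocessing` for better compatibility with Jupyter notebooks. It works the same as `multiprocessing` but has some minor additional features. Most importantly, it serializes with `dill` instead of `pickle`, so functions defined interactively in a notebook cell can be sent to worker processes.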

In a pool, each worker process dynamically takes new tasks from a queue, and the main process waits for all workers to finish their tasks.

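Tasks are handed out in chunks, and each chunk is executed by a single worker process. Which worker gets which chunk depends on which one becomes free first. Pick a reasonable chunk size (the `chunksize` argument) so that the load stays well balanced across workers while the per-chunk scheduling and communication overhead stays small compared to the actual work.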

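Pool functions:

- `map` - apply a function to each element of a list. Blocks until _all_ tasks are done and returns the results in input order.
- `starmap` - same as `map`, but each element is a tuple of arguments that is unpacked into the function call.
- `imap` - lazy version of `map` that returns an iterator. Each `next()` blocks only until the _next_ result (in input order) is ready.
- `apply_async` - submit a single call and immediately get back an `AsyncResult`. Its `.get()` blocks until that result is ready.
- ...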

In [44]:
%pip install multiprocess

Note: you may need to restart the kernel to use updated packages.


In [45]:
import os
import multiprocess as mp

# Eight toy tasks to be spread over four worker processes.
num_cores = 4
inputs = [*range(8)]
print('inputs:', inputs)
print('num_cores:', num_cores)

def square(n):
    """Return ``n * n``, printing the pid of the process that handled the task."""
    print(f'pid {os.getpid()} gets task n={n}\n')
    return n * n

# `with` guarantees the worker processes are shut down even if `map` raises
# (e.g. when `square` fails inside a worker). `map` blocks until every result
# is ready, so leaving the block right afterwards loses no work.
with mp.Pool(processes=num_cores) as pool:
    results = pool.map(square, inputs)
print('outputs:', results)

inputs: [0, 1, 2, 3, 4, 5, 6, 7]
num_cores: 4
pid 53322 gets task n=0
pid 53325 gets task n=3
pid 53324 gets task n=2
pid 53323 gets task n=1




pid 53324 gets task n=4
pid 53323 gets task n=5
pid 53325 gets task n=6

pid 53322 gets task n=7



outputs: [0, 1, 4, 9, 16, 25, 36, 49]


In [46]:
import os
import multiprocess as mp

# Same toy setup as the previous cell: eight inputs, four worker processes.
num_cores = 4
inputs = [*range(8)]
print('inputs:', inputs)
print('num_cores:', num_cores)

def square(n):
    """Return ``n * n``, printing the pid of the process that handled the task."""
    print(f'pid {os.getpid()} gets task n={n}\n')
    return n * n

# Submit tasks one at a time with apply_async. Each call returns an AsyncResult
# immediately, so the four tasks can run concurrently in the pool.
# NOTE: the tasks are n = 1..4, not the `inputs` list printed above.
with mp.Pool(processes=num_cores) as pool:
    task_handles = []
    for n in range(1, 5):
        print("spawn task: n=", n)
        task_handles.append(pool.apply_async(square, (n,)))

    # .get() blocks until that task's result is ready (re-raising any worker
    # exception). This, not close(), is what waits for the tasks; the results
    # keep submission order. Leaving the `with` block then shuts the pool down.
    results = [t.get() for t in task_handles]
print('outputs:', results)

inputs: [0, 1, 2, 3, 4, 5, 6, 7]
num_cores: 4
pid 53326 gets task n=1
pid 53328 gets task n=3
pid 53327 gets task n=2
pid 53329 gets task n=4




spawn task: n= 1
spawn task: n= 2
spawn task: n= 3
spawn task: n= 4
outputs: [1, 4, 9, 16]


# inter-process communication

In [86]:
import multiprocess as mp

def concurrent_update(n, r, nprocs=2):
    """Have `nprocs` processes each add ``sum([1] * n)`` to a shared counter `r` times.

    The counter lives in shared memory (`mp.Value`), and every
    read-modify-write of it is guarded by an explicit `mp.Lock`, so no
    increments are lost.

    Args:
        n: length of the dummy list summed on each iteration.
        r: number of increments each process performs.
        nprocs: number of worker processes (default 2, as before).

    Returns:
        The final counter value, expected to be ``n * r * nprocs`` for n >= 0.
    """
    # 'q' (C long long) instead of 'i' (C int): a 32-bit counter would silently
    # wrap once the total exceeds 2**31 - 1.
    globsum = mp.Value('q', 0)  # shared memory
    lock = mp.Lock()

    def worker_function(n, r, global_val, global_lock):
        for _ in range(r):
            inc = sum([1] * n)  # simulated work, done outside the lock

            # `with` releases the mutex even if the update raises; a bare
            # acquire()/release() pair would leave it held and hang the others.
            with global_lock:
                global_val.value += inc

    workers = [
        mp.Process(target=worker_function, args=(n, r, globsum, lock))
        for _ in range(nprocs)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return globsum.value

# Two processes, each adding sum([1] * n) to the shared counter r times.
n, r = 10, 10_000
expected = 2 * n * r
print("expected:", expected)
print("observed:", concurrent_update(n, r))

expected: 200000
observed: 200000


In [87]:
import multiprocess as mp
import numpy as np
import time

def write_ordered_function(n, cid, fh, turnid, nchilds, jitter_ms=(10, 100), poll_s=0.01):
    """Compute a small sum, then write one line to `fh` only when it is child `cid`'s turn.

    Children take turns in order 0, 1, ..., nchilds - 1 via the shared counter
    `turnid`. A child polls `turnid` until it equals its own `cid`, writes its
    whole line, then hands the turn to ``(cid + 1) % nchilds``.

    Args:
        n: length of the dummy list whose sum is reported.
        cid: this child's id, i.e. its position in the write order.
        fh: writable text file object that all children write to.
        turnid: shared integer value exposing ``get_lock()`` and ``.value``,
            e.g. ``mp.Value('i', 0, lock=True)``.
        nchilds: total number of children taking turns.
        jitter_ms: ``(low, high)`` range in milliseconds for the random pause
            after each character; `high` is exclusive.
        poll_s: seconds to sleep between checks of `turnid`.
    """
    # do work
    inc = sum([1] * n)
    out_str = f"child {cid} sum {inc}\n"

    # wait for turn
    while True:
        with turnid.get_lock():
            if turnid.value == cid:
                break
        time.sleep(poll_s)

    # Fresh OS-seeded generator per call. Forked children inherit identical
    # copies of numpy's global RNG state, so np.random.* would give every child
    # the same "random" delays.
    rng = np.random.default_rng()

    # Write one character at a time with random pauses, to show that no other
    # child interleaves its output while this child holds the turn.
    for ch in out_str:
        fh.write(ch)
        fh.flush()
        time.sleep(rng.integers(*jitter_ms) / 1e3)

    # pass the turn to the next child (wrapping around)
    with turnid.get_lock():
        turnid.value = (turnid.value + 1) % nchilds

import os  # used for os.remove below; this cell did not import it itself

n = 10
nchilds = 10
childs = []

turnid = mp.Value('i', 0, lock=True)  # whose turn it is to write; child 0 goes first
# The children receive `fh` as an argument, so keep it open until all of them
# have been joined. `with` still closes it if starting or joining fails.
with open("out.txt", "w") as fh:
    for i in range(nchilds):
        ch = mp.Process(target=write_ordered_function, args=(n, i, fh, turnid, nchilds))
        childs.append(ch)
        ch.start()

    for ch in childs:
        ch.join()

# Show and delete the file from Python instead of shelling out to `cat` and
# `rm -rf`, which is non-portable and overkill for a single file.
with open("out.txt") as f:
    print(f.read(), end="")
os.remove("out.txt")

child 0 sum 10
child 1 sum 10
child 2 sum 10
child 3 sum 10
child 4 sum 10
child 5 sum 10
child 6 sum 10
child 7 sum 10
child 8 sum 10
child 9 sum 10


0

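_Queue types_

- `Queue` - process- and thread-safe FIFO queue.
- `JoinableQueue` - a `Queue` with additional `task_done()` and `join()` methods. `join()` blocks until `task_done()` has been called for every item put on the queue.
- `Pipe` - returns a pair of connection objects linking exactly two endpoints (duplex by default). It is lower-level than `Queue` and similar to a Unix pipe.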

In [88]:
import multiprocess as mp


def worker_function(taskq, resq):
    """Consume ``[n, r]`` tasks from `taskq` until the 'EXIT' sentinel arrives.

    For each task, put ``r * sum([1] * n)`` on `resq`. `task_done()` is called
    for every item taken from `taskq`, including the sentinel and tasks that
    raise. A ``taskq.join()`` is therefore never left waiting on an item this
    worker already took.
    """
    while True:
        t = taskq.get()  # use taskq.get(timeout=1) if you don't know how many tasks you have
        try:
            if t == 'EXIT':
                break
            n, r = t
            partial_sum = 0
            for _ in range(r):
                partial_sum += sum([1] * n)
            resq.put(partial_sum)
        finally:
            # runs on normal completion, on `break`, and when the task raises
            taskq.task_done()

ncores = 4
n = 10
r = 10000
task_queue = mp.JoinableQueue()
result_queue = mp.Queue()

workers = []
for _ in range(ncores):
    w = mp.Process(target=worker_function, args=(task_queue, result_queue))
    w.start()
    workers.append(w)

# Enqueue ALL tasks first, then wait once. Calling join() inside this loop
# waits for each task to finish before the next one is queued, so only one
# worker is ever busy and the work runs serially.
for _ in range(ncores):
    task_queue.put([n, r])  # send tasks through queue
task_queue.join()  # wait until task_done() was called for each task
for _ in range(ncores):
    task_queue.put("EXIT")  # exit while loop in worker
task_queue.close()

# Exactly one result per task: read them with blocking get() calls instead of
# polling empty(), which the docs describe as unreliable. Drain the queue BEFORE
# joining the workers: a process that still has buffered items on a queue may
# not terminate, so joining first can deadlock.
globsum = sum(result_queue.get() for _ in range(ncores))

for w in workers:
    w.join()

print("expected: ", n*r*ncores)
print("observed: ", globsum)

expected:  400000
observed:  400000
