In [None]:
#|default_exp parallel

In [None]:
#|export
import time
from threading import Thread
from multiprocessing import pool,Process,Queue,set_start_method,get_context

from fastcore.imports import *
from fastcore.basics import *
from fastcore.foundation import *
from fastcore.meta import *
from fastcore.xtras import *
from functools import wraps

In [None]:
# |export
# On macOS inside a notebook, switch the default multiprocessing start method to "fork"
# (presumably because "spawn" cannot re-import functions defined in the notebook's `__main__`).
# `set_start_method` raises RuntimeError when a start method has already been fixed; that case is
# deliberately ignored, but other errors are no longer hidden by a bare `except`.
try:
    if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
except RuntimeError: pass

In [None]:
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *

# Parallel

> Threading and multiprocessing functions

In [None]:
#|export
def threaded(f):
    "Decorator: each call of the wrapped `f` runs it in a new, already-started `Thread`, which is returned"
    def _launch(*args, **kwargs):
        worker = Thread(target=f, args=args, kwargs=kwargs)
        worker.start()
        return worker
    # copy `f`'s name/docstring onto the launcher
    return wraps(f)(_launch)

In [None]:
@threaded
def _1():
    time.sleep(0.05)
    print("second")

@threaded
def _2():
    time.sleep(0.01)
    print("first")

# Each call returns the started `Thread`; join both rather than sleeping for a guessed duration,
# so the cell always waits exactly until both messages are printed
t1,t2 = _1(),_2()
t1.join(); t2.join()

first
second


In [None]:
#|export
def startthread(f):
    "Like `threaded`, but start thread immediately, returning the started `Thread`"
    # Return the thread (previously discarded) so callers can `join` it.
    # When used as a decorator, the decorated name is therefore bound to the `Thread`, not to `f`.
    return threaded(f)()

In [None]:
# `startthread` launches each decorated function in a background thread as soon as it is defined,
# so no explicit call is needed. The shorter sleep makes "first" print before "second".
@startthread
def _():
    time.sleep(0.05)
    print("second")

@startthread
def _():
    time.sleep(0.01)
    print("first")

# Give both background threads time to finish printing before the cell ends
time.sleep(0.1)

first
second


In [None]:
#|export
def parallelable(param_name, num_workers, f=None):
    "Return False (after printing a warning) when multiprocessing would get stuck: Windows + notebook + `f` from `__main__`"
    # `f=None` is treated as "defined in `__main__`" (the conservative assumption).
    # Resolve the module defensively: some callables have no `__module__` (e.g. `str.upper`),
    # or name a module that is not in `sys.modules`; both used to raise here.
    mod_name = getattr(f, '__module__', None)
    mod = sys.modules.get(mod_name) if mod_name is not None else None
    f_in_main = f is None or getattr(mod, '__name__', mod_name) == "__main__"
    # `num_workers=None` means "use the default number of processes", so it also counts as > 0
    if sys.platform == "win32" and IN_NOTEBOOK and (num_workers is None or num_workers > 0) and f_in_main:
        print("Due to IPython and Windows limitation, python multiprocessing isn't available now.")
        print(f"So `{param_name}` has to be changed to 0 to avoid getting stuck")
        return False
    return True

In [None]:
#|export
class NoDaemonProcess(Process):
    "`Process` whose `daemon` flag always reads `False`; assignments to it are silently ignored"
    # See https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
    # presumably so that pool workers are themselves allowed to start child processes
    daemon = property(lambda self: False, lambda self, value: None)

In [None]:
#|export
@delegates()
class ProcessPool(pool.Pool):
    "Same as Python's `pool.Pool`, except not daemonic and can pass reuse_workers=False"
    def __init__(self, max_workers=defaults.cpus, context=None, reuse_workers=True, **kwargs):
        # `max_workers=None` falls back to the default cpu count
        if max_workers is None: max_workers=defaults.cpus
        if context is None: context = get_context()
        # Subclass the chosen context so the pool builds its workers with `NoDaemonProcess`.
        # NOTE(review): `NoDaemonProcess` derives from `multiprocessing.Process`, which starts processes
        # with the *default* start method, so `context`'s own start method may not be honoured — confirm.
        class NoDaemonContext(type(context)): Process=NoDaemonProcess
        # reuse_workers=False -> maxtasksperchild=1, i.e. each worker is replaced after one task
        maxtasksperchild = kwargs.pop('maxtasksperchild', None)
        if not reuse_workers: maxtasksperchild = 1
        # Forward remaining `pool.Pool` arguments (e.g. `initializer`, `initargs`) instead of dropping them
        super().__init__(max_workers, context=NoDaemonContext(), maxtasksperchild=maxtasksperchild, **kwargs)

In [None]:
#|export
# `fastprogress` is optional: without it `progress_bar` is None and callers skip progress display
try: from fastprogress import progress_bar
except ImportError: progress_bar = None

In [None]:
# |export
def _gen(items, pause):
    for item in items:
        time.sleep(pause)
        yield item

In [None]:
#|export
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
            method=None, chunksize=1, reuse_workers=True, **kwargs):
    "Applies `f` in parallel to `items`, using `n_workers`"
    if not method and sys.platform == 'darwin': method='fork' # Is this really a good idea?
    # `args`/`kwargs` are bound up front, so each call is `f(*args, item, **kwargs)`
    g = partial(f, *args, **kwargs)
    # Fall back to serial execution where multiprocessing would get stuck (see `parallelable`)
    if not parallelable('n_workers', n_workers, f): n_workers=0
    if n_workers==0: return L(map(g, items))
    with ProcessPool(n_workers, context=get_context(method), reuse_workers=reuse_workers) as ex:
        # `_gen` sleeps `pause` seconds before releasing each item to the pool
        r = ex.imap(g, _gen(items, pause), chunksize=chunksize)
        if progress and progress_bar:
            # Honour a caller-supplied `total` (needed when `items` has no `len`)
            if total is None: total = len(items)
            r = progress_bar(r, total=total, leave=False)
        return L(r) # Collect the items from the iterator before we leave the pool context

In [None]:
#|export
def add_one(x, a=1):
    "Return `x+a` after a short random delay (test helper for `parallel`)"
    # `random` is imported locally: required for multiprocessing from a notebook on Windows
    import random
    delay = random.random()/80
    time.sleep(delay)
    return x+a

In [None]:
inp,exp = range(50),range(1,51)

# Multiprocess path: 2 workers, a progress bar (if `fastprogress` is installed), 0.01s pause per item
test_eq(parallel(add_one, inp, n_workers=2, progress=True, pause=0.01), exp)
# Extra keyword arguments are forwarded to `add_one`
test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))
# `n_workers=0` runs serially in this process
test_eq(parallel(add_one, inp, n_workers=0), exp)
test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))

Use the `pause` parameter to insert a pause of `pause` seconds before each item is handed to the worker processes. This helps when there are race conditions in starting some processes, or when you want to stagger the time each process starts, for example when making many requests to a webserver. Note that this `parallel` always uses a process pool: there is no `threadpool` option, and any extra keyword argument (such as `threadpool=True`) is forwarded to `f`.

In [None]:
from datetime import datetime

In [None]:
def print_time(_):
    "Sleep for a random 0-0.1s, then return the current timestamp (the argument is ignored)"
    delay = random.random()/10
    time.sleep(delay)
    return datetime.now()

# `pause=0.11` exceeds the longest sleep in `print_time` (under 0.1s), so each task should finish
# before the next one is released
res = parallel(print_time, range(20), n_workers=2, pause=0.11)
test_eq(len(res), 20)
test_eq(res, sorted(res)) # Confirm that task end time increases monotonically

In [None]:
#|hide
def die_sometimes(x):
    "Double `x`; re-enable the commented check to make inputs 4 and 5 raise inside a worker"
    # if 3<x<6: raise Exception(f"exc: {x}")
    result = x*2
    return result

# Default `n_workers`; the output below shows results returned in input order
parallel(die_sometimes, range(8))

(#8) [0,2,4,6,8,10,12,14]

In [None]:
#|export
def run_procs(f, f_done, args):
    "Start one `Process(target=f, args=a)` per `a` in `args`, yield everything from `f_done()`, then join them"
    procs = [Process(target=f, args=a) for a in L(args)]
    for p in procs: p.start()
    yield from f_done()
    # Only reached once the consumer has exhausted `f_done()`
    for p in procs: p.join()

In [None]:
#|export
def _f_pg(obj, queue, batch, start_idx):
    for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))

def _done_pg(queue, items): return (queue.get() for _ in items)

In [None]:
#|export 
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
    "Run a `cls(**kwargs)` instance over `items` split across `n_workers` processes, yielding `(index, result)` pairs"
    n_workers = n_workers if parallelable('n_workers', n_workers) else 0
    if n_workers == 0:
        # Serial path: one instance handles every item here; results are indexed by position
        serial_results = list(cls(**kwargs)(items))
        yield from enumerate(serial_results)
        return
    # Contiguous batches (one per worker) and the global index at which each batch starts:
    # [0, len(b0), len(b0)+len(b1), ...] (one more entry than there are batches)
    batches = L(chunked(items, n_chunks=n_workers))
    offsets = L(itertools.accumulate(0 + batches.map(len)))
    results_q = Queue()
    # Progress is tracked over `items`: one tick per result pulled off the queue
    if progress_bar: items = progress_bar(items, leave=False)
    work = partial(_f_pg, cls(**kwargs), results_q)
    drain = partial(_done_pg, results_q, items)
    yield from run_procs(work, drain, L(batches, offsets).zip())

In [None]:
# class _C:
#     def __call__(self, o): return ((i+1) for i in o)

# items = range(5)

# res = L(parallel_gen(_C, items, n_workers=0))
# idxs,dat1 = zip(*res.sorted(itemgetter(0)))
# test_eq(dat1, range(1,6))

# res = L(parallel_gen(_C, items, n_workers=3))
# idxs,dat2 = zip(*res.sorted(itemgetter(0)))
# test_eq(dat2, dat1)

`cls` is any class with `__call__`. It will be passed `kwargs` when initialized. Note that `cls` is instantiated once in the calling process, and each of the `n_workers` processes works on its own copy of that instance. `items` are then split into `n_workers` batches, and one batch is sent to each process. The function returns a generator of `(item index, result)` tuples, in the order the results are produced (not necessarily the order of `items`).

In [None]:
class TestSleepyBatchFunc:
    "Test helper: adds 1 to each element of a batch after a random delay, so workers run at different speeds"
    def __init__(self):
        self.a = 1
    def __call__(self, batch):
        # Random 0-0.25s delay per element so each worker progresses at its own pace
        for item in batch:
            time.sleep(random.random()/4)
            yield item + self.a

# 20 floats split across 2 worker processes; every result should be `x+1`
x = np.linspace(0,0.99,20)

res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))
# Pairs arrive in whatever order the workers put them on the queue; sorting by index restores input order
test_eq(res.sorted().itemgot(1), x+1)

In [None]:
# #|hide
# from subprocess import Popen, PIPE
# # test num_workers > 0 in scripts works when python process start method is spawn
# process = Popen(["python", "parallel_test.py"], stdout=PIPE)
# _, err = process.communicate(timeout=10)
# exit_code = process.wait()
# test_eq(exit_code, 0)

# Export -

In [None]:
#|hide
# Run nbdev's export step (presumably writes the `#|export` cells to the module named by `#|default_exp`)
import nbdev; nbdev.nbdev_export()