# Python Multiprocessing with Progress Bar and Timeout

## Setup

In [1]:
import time
import tqdm
import pebble
import multiprocessing
from tqdm import notebook
from tqdm.contrib.concurrent import process_map, thread_map
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool 
from pebble import ProcessPool
from concurrent.futures import TimeoutError

In [2]:
# Since Python 3.8 macOS defaults to the 'spawn' start method, under which
# worker processes cannot import functions defined in the notebook's __main__,
# so the pools below would fail. 'fork' avoids that (an alternative is the
# third-party 'multiprocess' library).
# NOTE(review): the Python docs consider 'fork' on macOS potentially unsafe.
# force=True keeps the cell re-runnable: without it set_start_method raises
# RuntimeError once a start method has already been set.
# 'fork' is not available on every platform (e.g. Windows), so only request
# it where it exists.
if 'fork' in multiprocessing.get_all_start_methods():
    multiprocessing.set_start_method('fork', force=True)

In [3]:
# worker count for every pool below: the number of CPUs in the system
# (may exceed the CPUs this process is actually allowed to use)
cpu = multiprocessing.cpu_count()

## No Multiprocessing

In [4]:
# sleep durations in seconds, 0 through 8 (36 s of work when run serially)
inputs = list(range(9))

In [5]:
def function(x):
    """Stand-in workload: block for ``x`` seconds, then hand ``x`` back unchanged.

    A call with input ``n`` takes about ``n`` seconds, which keeps the
    timings of the serial and parallel variants below easy to compare.
    """
    time.sleep(x)
    return x

In [6]:
%%timeit -n1 -r1
# standard loop: call `function` on each input one after another
# (%%timeit runs the cell inside a function, so `results` is not kept afterwards)
results = []
for each_input in inputs:
    output = function(each_input)
    results.append(output)

36 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [7]:
%%timeit -n1 -r1
# list comprehension: the same serial work as the loop, written as one expression
results = [function(value) for value in inputs]

36 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Progress Bars

In [8]:
# text progress bar: tqdm.tqdm wraps the iterable and advances the bar
# each time an item is consumed
results = [function(each_input) for each_input in tqdm.tqdm(inputs)]

100%|██████████| 9/9 [00:36<00:00,  4.01s/it]


In [9]:
# notebook progress bar: same as tqdm.tqdm, but rendered as a Jupyter widget
results = [function(each_input) for each_input in notebook.tqdm(inputs)]

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9.0), HTML(value='')))




## Multiprocessing

In [10]:
%%timeit -n1 -r1
# Pool.map turns the whole iterable into a list before handing out work;
# suited to CPU-bound tasks
with Pool(cpu) as process_pool:
    results = process_pool.map(function, inputs)

12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [11]:
%%timeit -n1 -r1
# Pool.imap does not build a list of the inputs first; it yields results one
# at a time, in input order, as they become available (helps when memory is
# tight); suited to CPU-bound tasks
with Pool(cpu) as process_pool:
    pool_results = process_pool.imap(function, inputs)
    results = list(pool_results)

12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [12]:
%%timeit -n1 -r1
# apply_async submits one call per input up front (any positional args; it
# also accepts callback/error_callback), returning one AsyncResult per call.
# starmap is the alternative when every call takes several arguments.
# Tasks may *finish* in any order, but reading the AsyncResults back in
# submission order keeps `results` aligned with `inputs`.
# Suited to CPU-bound tasks.
# `global` keeps `results` after %%timeit, which runs the cell inside a function.
global results
with Pool(processes=cpu) as pool:
    pool_results = [
        pool.apply_async(function, args=(each_input,))
        for each_input in inputs
    ]
    results = [p.get() for p in pool_results]

12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Multithreading

In [13]:
%%timeit -n1 -r1
# ThreadPool (multiprocessing.dummy) runs the calls in threads of this process;
# suited to I/O-bound tasks, where threads spend their time waiting
with ThreadPool(cpu) as thread_pool:
    results = thread_pool.map(function, inputs)

12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [14]:
%%timeit -n1 -r1
# imap over threads: results are yielded one at a time, in input order;
# suited to I/O-bound tasks
with ThreadPool(cpu) as thread_pool:
    pool_results = thread_pool.imap(function, inputs)
    results = list(pool_results)

12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [15]:
%%timeit -n1 -r1
# apply_async over threads: one AsyncResult per input, read back in submission
# order; suited to I/O-bound tasks
# `global` keeps `results` after %%timeit, which runs the cell inside a function
global results
with ThreadPool(cpu) as thread_pool:
    pool_results = [
        thread_pool.apply_async(function, args=(each_input,))
        for each_input in inputs
    ]
    results = [async_result.get() for async_result in pool_results]

12 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


## Multiprocessing with Progress Bar

In [16]:
# process_map: list(map(function, inputs)) driven by a ProcessPoolExecutor,
# with a tqdm progress bar
results = process_map(
    function,
    inputs,
    max_workers=cpu,
)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9.0), HTML(value='')))




In [17]:
# thread_map: same interface as process_map, driven by a ThreadPoolExecutor
results = thread_map(
    function,
    inputs,
    max_workers=cpu,
)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9.0), HTML(value='')))




In [18]:
# custom callback functions: apply_async calls `callback` with the result of a
# successful task and `error_callback` with the exception of a failed one, so
# every finished task advances the bar. The callbacks run in the parent
# process, not in the workers.
def call_back(_):
    """Advance the progress bar by one finished task (the outcome is ignored)."""
    progress_bar.update(1)

# the `with` closes the bar once every result has been collected (or on error)
with notebook.tqdm(total=len(inputs)) as progress_bar:
    with Pool(processes=cpu) as pool:
        pool_results = [
            pool.apply_async(
                function,
                args=(each_input,),
                callback=call_back,
                error_callback=call_back,
            )
            for each_input in inputs
        ]
        results = [p.get() for p in pool_results]

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9.0), HTML(value='')))

## Multiprocessing with Timeout

In [19]:
# pool map with a per-task timeout: each next() on the result iterator returns
# the next result, or raises the exception that task ended with; iteration can
# continue afterwards, so a timed-out task simply becomes None in `results`.
# The outer `with` closes the progress bar when the cell finishes.
with notebook.tqdm(total=len(inputs)) as progress_bar:
    with ProcessPool(max_workers=cpu) as pool:
        future = pool.map(function, inputs, timeout=5)
        iterator = future.result()
        results = []
        while True:
            try:
                each_result = next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                # pebble's timeout error carries the limit as args[1]; a
                # TimeoutError raised by the task itself (Python 3.11+, where
                # this is the builtin) may not, so don't index blindly
                if len(error.args) > 1:
                    print("function took longer than %d seconds" % error.args[1])
                else:
                    print("function raised %s" % error)
                each_result = None
            results.append(each_result)
            # one tick per task outcome; the final StopIteration is not counted
            progress_bar.update(1)

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9.0), HTML(value='')))

function took longer than 5 seconds
function took longer than 5 seconds
function took longer than 5 seconds


In [20]:
# pool schedule and callback function 
def task_done(future):
    """Done-callback for a scheduled future: report how it ended, then tick the bar.

    Prints a message for a timeout or any other exception. Always advances
    the module-level ``progress_bar`` by one, whatever the outcome.

    Args:
        future: a finished future, as passed by ``add_done_callback``.
    """
    try:
        future.result()
    except TimeoutError as error:
        # pebble's timeout error carries the limit as args[1]; a TimeoutError
        # raised by the task itself (Python 3.11+, where this is the builtin)
        # may not, so don't index blindly
        if len(error.args) > 1:
            print("function took longer than %d seconds" % error.args[1])
        else:
            print("function raised %s" % error)
    except Exception as error:
        print("function raised %s" % error)
        # pebble attaches the worker-side traceback as `error.traceback`;
        # exceptions that did not come from a worker may lack it
        remote_traceback = getattr(error, "traceback", None)
        if remote_traceback is not None:
            print(remote_traceback)
    finally:
        progress_bar.update(1)
# Submit each call as its own future with a 5 s timeout; task_done reports
# every outcome and advances `progress_bar`. That is a module-level name,
# bound here by the `with` statement, so no `global` declaration is needed.
# The pool's `with` block exits before the bar's, so the bar is closed only
# after the pool has shut down.
# NOTE(review): presumably pebble's close/join on exit means all done-callbacks
# have run by then — confirm.
with notebook.tqdm(total=len(inputs)) as progress_bar:
    with ProcessPool(max_workers=cpu) as pool:
        pool_results = []
        for each_input in inputs:
            future = pool.schedule(function, args=[each_input], timeout=5)
            future.add_done_callback(task_done)
            pool_results.append(future)
        # a future that ended in an exception (e.g. a timeout) maps to None
        results = [
            p.result() if p.exception() is None else None
            for p in pool_results
        ]

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9.0), HTML(value='')))

function took longer than 5 seconds
function took longer than 5 seconds
function took longer than 5 seconds
