In [4]:
import numpy as np
import pandas as pd
import multiprocessing as mp


def lin_parts(num_atoms, num_threads):
    parts = np.linspace(0, num_atoms, min(num_threads, num_atoms) + 1)
    parts = np.ceil(parts).astype(int)
    return parts


def nested_parts(num_atoms, num_threads, descend=False):
    parts = [0]
    num_threads = min(num_threads, num_atoms)
    for num in range(num_threads):
        part = 1 + 4 * (parts[-1] ** 2 + parts[-1] + num_atoms * (num_atoms + 1.) / num_threads)
        part = 0.5 * (-1 + np.sqrt(part))
        parts.append(part)
    if descend:
        # Computational decreases as index increases
        parts = np.cumsum(np.diff(parts)[::-1])
        parts = np.append(np.array([0]), parts)
    parts = np.round(parts).astype(int)
    return parts 

In [5]:
nested_parts(20, 4)

array([ 0, 10, 14, 17, 20])

In [6]:
nested_parts(20, 4, True)

array([ 0,  3,  6, 10, 20])

In [7]:
lin_parts(20, 4)

array([ 0,  5, 10, 15, 20])

# 20.3

In [8]:
def barrier_tourch(r, width=.5):
    t = dict()
    p = np.log((1 + r).cumprod(axis=0))
    for j in range(r.shape[1]):
        for i in range(r.shape[0]):
            if np.abs(p[i][j]) >= width:
                t[j] = i
                continue
    return t

In [9]:
%%time

r = np.random.normal(0, 0.01, size=(1000, 10000))
num_threads = 24
parts = lin_parts(len(r), num_threads)
print(parts)
jobs = []
for i in range(1, len(parts)):
    jobs.append(r[:, parts[i-1]:parts[i]])
pool = mp.Pool(processes=num_threads)
outputs = pool.imap_unordered(barrier_tourch, jobs)
out = []
for out_ in outputs:
    out.append(out_)
pool.close()
pool.join()

[   0   42   84  125  167  209  250  292  334  375  417  459  500  542
  584  625  667  709  750  792  834  875  917  959 1000]
CPU times: user 349 ms, sys: 69.2 ms, total: 418 ms
Wall time: 880 ms


In [10]:
%%time

r = np.random.normal(0, 0.01, size=(1000, 10000))
num_threads = 24
parts = lin_parts(len(r), num_threads)
parts = nested_parts(len(r), num_threads)
print(parts)
jobs = []
for i in range(1, len(parts)):
    jobs.append(r[:, parts[i-1]:parts[i]])
pool = mp.Pool(processes=num_threads)
outputs = pool.imap_unordered(barrier_tourch, jobs)
out = []
for out_ in outputs:
    out.append(out_)
pool.close()
pool.join()

[   0  204  288  353  408  456  500  540  577  612  645  677  707  736
  764  790  816  842  866  890  913  935  957  979 1000]
CPU times: user 413 ms, sys: 62.2 ms, total: 476 ms
Wall time: 870 ms


# 20.5

In [11]:
import time
from datetime import datetime
import sys

def expand_call(kwargs):
    func = kwargs['func']
    del kwargs['func']
    out = func(**kwargs)
    return out

def report_progress(job_idx, num_jobs, time0, task):
    msg = [float(job_idx) / num_jobs, (time.time() - time0) / 60.]
    msg.append(msg[1] * (1/msg[0] - 1))
    time_stamp = str(datetime.fromtimestamp(time.time()))
    msg_ = time_stamp + ' ' +str(round(msg[0]*100, 2))+ '% ' + task + ' done after ' + \
        str(round(msg[1], 2)) + ' minutes. Remaining ' + str(round(msg[2],2)) + ' minutes.'
    if job_idx < num_jobs:
        sys.stderr.write(msg_ + '\r')
    else:
        sys.stderr.write(msg_ + '\n')

def process_jobs(jobs, task=None, num_threads=24):
    if task is None:
        task = jobs[0]['func'].__name__
    pool = mp.Pool(processes=num_threads)
    outputs = pool.imap_unordered(expand_call, jobs)
    out = []
    time0 = time.time()
    # Execute programs here
    for i, out_ in enumerate(outputs, 1):
        out.append(out_)
        report_progress(i, len(jobs), time0, task)
    pool.close()
    pool.join()
    return out

def mp_pandas_obj(func, pd_obj, num_threads=24, mp_batches=1, lin_mols=True, descend=False, **kwargs):
    if lin_mols:
        parts = lin_parts(len(pd_obj[1]), num_threads * mp_batches)
    else:
        parts = nested_parts(len(pd_obj[1]), num_threads * mp_batches, descend)
    jobs = []
    for i in range(1, len(parts)):
        job = {pd_obj[0]: pd_obj[1][parts[i-1]: parts[i]], 'func': func}
        job.update(kwargs)
        jobs.append(job)
    if num_threads == 1:
        out = process_jobs(jobs)
    else:
        out = process_jobs(jobs, num_threads=num_threads)
        
    if isinstance(out[0], pd.Series):
        df0 = pd.Series()
    elif isinstance(out[0], pd.DataFrame):
        df0 = pd.DataFrame()
    else:
        return out
    
    for i in out:
        df0 = df0.append(i)
    df0 = df0.sort_index()
    return df0

In [13]:
%%time

r = np.random.normal(0, 0.01, size=(1000, 10000))
num_threads = 24
pd_obj =  ["r", r]
out = mp_pandas_obj(barrier_tourch, pd_obj, num_threads)

2018-06-30 10:08:03.486936 100.0% barrier_tourch done after 0.05 minutes. Remaining 0.0 minutes..


CPU times: user 376 ms, sys: 132 ms, total: 508 ms
Wall time: 3.44 s


In [14]:
%%time

r = np.random.normal(0, 0.01, size=(1000, 10000))
num_threads = 24
pd_obj =  ["r", r]
out = mp_pandas_obj(barrier_tourch, pd_obj, num_threads=1)

CPU times: user 344 ms, sys: 147 ms, total: 491 ms
Wall time: 12.6 s


2018-06-30 10:08:23.220720 100.0% barrier_tourch done after 0.2 minutes. Remaining 0.0 minutes.
