In [22]:
import pandas as pd
from beakerx import *
from beakerx.object import beakerx
import numpy as np
from multiprocess import Pool, cpu_count
%load_ext line_profiler
%load_ext memory_profiler

import sys
sys.path.append('../Code/')
from utils import *

In [37]:
def parallel_apply(df, group_list, f, num_cores, print_every_n=1000):
    """
    A lightweight, multi-process version of ``df.groupby(group_list).apply(f)``
    built on the multiprocess library.

    Rows are split into at most ``num_cores`` chunks along the group numbering
    (``ngroup``). Chunk boundaries sit at evenly spaced row quantiles, so chunks
    hold similar numbers of rows and no group is split across two chunks. Each
    chunk is grouped and applied in its own worker process, and the partial
    results are concatenated in group order. The caller's ``df`` is not modified.

    :param df: the pandas dataframe that needs to be grouped
    :param group_list: a list of variable names to group by
    :param f: the function that operates on dataframes to apply to each grouped dataframe.
        Each group it receives carries an extra helper column ``_Group`` (the group's
        ``ngroup`` number); that column is dropped from a 2-D result.
    :param num_cores: the number of cores to use (must be >= 1)
    :param print_every_n: print a progress message for every group whose group number
        is a multiple of print_every_n. If print_every_n is None (or 0), no messages
        are printed

    :return: a dataframe (or a series, when ``f`` returns scalars) that has had the
        apply function done on it

    :raises ValueError: if ``num_cores`` is less than 1

    Usage:

    NUM_GROUPS = 10000
    SIZE_OF_GROUP = 4
    N = NUM_GROUPS * SIZE_OF_GROUP
    df_one_group = pd.DataFrame({'g1': np.random.randint(low = 1, high = NUM_GROUPS, size = N),
                                'data': np.random.random(N) - 0.5})

    def slow_max(d):
        ret = d.sort_values(['data'], ascending = False)
        return ret.iloc[0, :]

    par = parallel_apply(df_one_group, ['g1'], slow_max, 4).head()
    serial = df_one_group.groupby(['g1']).apply(slow_max).head()
    assert(np.all(par['data'].values == serial['data'].values))
    """
    num_cores = int(num_cores)
    if num_cores < 1:
        raise ValueError('num_cores must be at least 1, got ' + str(num_cores))
    verbose = bool(print_every_n)  # None and 0 both mean "silent" (0 would divide by zero)

    grouped = df.groupby(by=group_list)
    group_numbers = grouped.ngroup()

    if verbose:
        # ngroups is the real count; the largest group number is ngroups - 1.
        print('Total of ' + str(grouped.ngroups) + ' groups')

    if len(df) == 0:
        # Nothing to hand out to workers; let pandas build the empty result.
        return grouped.apply(f)

    # Chunk boundaries are the group numbers found at evenly spaced row quantiles.
    cut_points = group_numbers.quantile(np.linspace(0, 1, num=num_cores + 1),
                                        interpolation='nearest').values
    # Plain arrays avoid index alignment (the index may contain duplicates).
    numbers = group_numbers.values

    # Shallow copy: the helper column is added to the copy only, so the caller's
    # frame is untouched even if a worker raises.
    work = df.copy(deep=False)
    work['_Group'] = numbers

    chunks = []
    for ind in range(num_cores):
        lower, upper = cut_points[ind], cut_points[ind + 1]
        if ind < num_cores - 1:
            in_chunk = (numbers >= lower) & (numbers < upper)
        else:
            # The last chunk is closed on the right so the largest group is kept.
            in_chunk = (numbers >= lower) & (numbers <= upper)
        # Repeated cut points (few groups, many cores) yield empty chunks; skip them.
        if in_chunk.any():
            chunks.append(work.loc[in_chunk].groupby(group_list))

    def verbose_function(dataframe):
        curr = dataframe['_Group'].values[0]
        if curr % print_every_n == 0:
            print('Group: ' + str(curr))
        return f(dataframe)

    per_group = verbose_function if verbose else f

    def apply_chunk(group_by_object):
        return group_by_object.apply(per_group)

    if len(chunks) == 1:
        # A single chunk gains nothing from a pool; skip process start-up and pickling.
        results = [apply_chunk(chunks[0])]
    else:
        with Pool(len(chunks)) as p:
            results = p.map(apply_chunk, chunks)

    ret = pd.concat(results)
    if ret.ndim > 1:
        # 2-D result (f returned Series/DataFrames): the helper column may have leaked in.
        ret = ret.drop(columns='_Group', errors='ignore')

    return ret

In [48]:
# Synthetic benchmark data: N rows whose 'g1' keys are drawn from 1..NUM_GROUPS-1,
# so most keys get roughly SIZE_OF_GROUP rows.
SEED = 0
NUM_GROUPS = 20000
SIZE_OF_GROUP = 4
N = NUM_GROUPS * SIZE_OF_GROUP
np.random.seed(SEED)  # reproducible data, so reruns time the same workload
df_one_group = pd.DataFrame({'g1': np.random.randint(low = 1, high = NUM_GROUPS, size = N),
                            'data': np.random.random(N) - 0.5})

def slow_max(d):
    """Return the row of ``d`` with the largest 'data' value (deliberately slow: full sort)."""
    ret = d.sort_values(['data'], ascending = False)
    return ret.iloc[0, :]

# Check the complete parallel result against plain pandas, not just the first rows.
par = parallel_apply(df_one_group, ['g1'], slow_max, 4)
serial = df_one_group.groupby(['g1']).apply(slow_max)
pd.testing.assert_frame_equal(par, serial)

Total of 19648 groups
Group: 0
Group: 0
Group: 5000
Group: 10000
Group: 15000
Group: 1000
Group: 6000
Group: 11000
Group: 16000
Group: 2000
Group: 7000
Group: 12000
Group: 17000
Group: 3000
Group: 8000
Group: 13000
Group: 18000
Group: 4000
Group: 9000
Group: 14000
Group: 19000


In [12]:
# Number of CPUs reported by multiprocess.cpu_count(); compare with the num_cores
# values passed to parallel_apply in the timing cells below.
cpu_count()

4

In [18]:
# Presumably moves 'g1' into the index. safe_index is not defined in this notebook; it
# likely comes from one of the star imports (utils or beakerx) -- confirm.
# NOTE(review): the cell reassigns df_one_group from itself, so re-running it depends on
# safe_index tolerating an already-indexed frame -- confirm.
# NOTE(review): this ran as In [18], i.e. before In [48] rebuilt df_one_group, so the
# recorded timings (In [49], [52], [53]) appear to use the un-indexed frame.
df_one_group = df_one_group.safe_index(['g1'])

In [19]:
# Peek at the first five rows of the benchmark frame.
df_one_group.iloc[:5]

In [49]:
# Time the verbose path on 2 cores. The progress prints are included in the measured
# time, and %timeit makes several calls, so the progress output repeats once per call.
%timeit parallel_apply(df_one_group, ['g1'], slow_max, 2, print_every_n = 5000).head()

Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
Total of 19648 groups
Group: 0
Group: 0
Group: 10000
Group: 5000
Group: 15000
6 s ± 192 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [53]:
# Same workload with progress printing disabled; the gap to the cell above is the cost of the prints.
%timeit parallel_apply(df_one_group, ['g1'], slow_max, 2, print_every_n = None).head()

4.14 s ± 49.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [52]:
# Serial baseline: plain pandas groupby-apply in a single process.
%timeit df_one_group.groupby(['g1']).apply(slow_max).head()

10.2 s ± 464 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
# Line-by-line timing of parallel_apply on 4 cores, silent mode.
# NOTE(review): presumably only the parent process is profiled, so work done inside the
# Pool workers shows up as time on the p.map line rather than per line -- confirm.
%lprun -f parallel_apply parallel_apply(df_one_group, ['g1'], slow_max, 4, print_every_n = None).head()

In [28]:
%mprun -f parallel_apply parallel_apply(df_one_group, ['g1'], slow_max, 4, print_every_n = None).head()


