In [1]:
from io import BytesIO
from urllib.request import urlopen
from zipfile import ZipFile

import math
import time
import pytest
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import compute, delayed
import dask.threaded
import joblib

from sklearn.utils.testing import assert_array_equal
from sklearn.preprocessing import FunctionTransformer
from sklearn.compose import ColumnTransformer

def read_data(file):
    '''
    Read a whitespace-delimited numeric table from a binary file object.

    Parameters
    ----------
    file : binary file-like object
        Must provide ``readlines()`` returning UTF-8 encoded ``bytes``,
        one sample per line, with the target value in the first field.

    Returns
    -------
    X : pd.DataFrame
        Remaining fields as floats; column labels start at 1 because
        column 0 is popped off as the target.
    y : pd.Series
        First field of every line, as floats.
    '''
    decoded = (raw.decode('utf-8') for raw in file.readlines())
    # split() without an argument accepts any run of whitespace between
    # fields (the original required exactly two spaces); blank lines are
    # skipped so they cannot reach the float conversion as empty strings.
    rows = [line.split() for line in decoded if line.strip()]
    # Builtin float replaces np.float, an alias that NumPy removed in 1.24.
    X = pd.DataFrame(rows, dtype=float)
    y = X.pop(0)
    return X, y

In [2]:
# For simplicity, the classification labels are used as regression targets for testing
url = 'http://www.timeseriesclassification.com/Downloads/GunPoint.zip'
# `url` stays bound to the address string; the HTTP response is closed as
# soon as the archive bytes have been copied into memory.
with urlopen(url) as response:
    zipfile = ZipFile(BytesIO(response.read()))

with zipfile.open('GunPoint_TRAIN.txt') as train_file:
    X_train_pd, y_train_pd = read_data(train_file)

with zipfile.open('GunPoint_TEST.txt') as test_file:
    X_test_pd, y_test_pd = read_data(test_file)
# Nested layout: every element of the Series is one whole row of the table.
Xsf_test = pd.Series([row for _, row in X_test_pd.iterrows()])
Xdf_test = pd.DataFrame({'ts': Xsf_test, 'ts_copy': Xsf_test})

# Builtin int replaces np.int, an alias that NumPy removed in 1.24.
y_train = pd.Series(np.array(y_train_pd, dtype=int))
Xsf_train = pd.Series([row for _, row in X_train_pd.iterrows()])
Xdf_train = pd.DataFrame({'ts': Xsf_train, 'ts_copy': Xsf_train})

In [3]:
# Stack 200 copies of the training Series end to end under a fresh 0..n-1 index.
Xsf_loong = pd.concat([Xsf_train] * 200, ignore_index=True)

In [4]:
# Baseline: pandas' built-in Series.apply, calling np.mean on every element
# of Xsf_loong.
%timeit Xsf_loong.apply(np.mean)
# NOTE(review): pandas groupby does not look usable as a built-in route to
# parallelism here -- the elements in each cell are pd.Series objects, which
# are mutable and therefore cannot be hashed, and hashing appears to be a
# requirement (unverified).
target = Xsf_loong.apply(np.mean)  # reference result for comparing the other approaches

946 ms ± 18.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
# using np.apply_along_axis: the elements of Xsf_loong are stacked into one
# array and np.mean is applied along axis 1
%timeit pd.Series(np.apply_along_axis(np.mean, 1, np.array(Xsf_loong.to_list())))
# numpy is a little faster than Series.apply in the recorded run; note that
# the timed expression includes building the array from the list

895 ms ± 24.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
# using explicit for loop
function = lambda X: pd.DataFrame([np.mean(row) for row in X])
%timeit function(Xsf_loong)
# looks like pandas is currently using explicit for loop internally

1.03 s ± 28.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [7]:
# using joblib for parallel processing after splitting the dataframe
with joblib.Parallel(n_jobs=-1) as parallel:
    function = lambda Z: pd.concat(parallel(joblib.delayed(lambda X: X.apply(np.mean))(part) for part in np.array_split(Z, 2)))
    %timeit function(Xsf_loong)
# dataframe was split into two and processed in parallel
# the speed-up can easily be seen
got = function(Xsf_loong)  # for comparison
assert_array_equal(got, target)  # no difference in the final result

568 ms ± 73.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
# attempt nesting parallel operations (performing the same thing 4 times)
# using joblib for parallel processing after splitting the dataframe
def par_func(dummy):
    with joblib.Parallel(n_jobs=-1) as parallel:
        function = lambda Z: pd.concat(parallel(joblib.delayed(lambda X: X.apply(np.mean))(part) for part in np.array_split(Z, 2)))
        function(Xsf_loong)

with joblib.Parallel(n_jobs=-1) as parallel:
    %timeit parallel(joblib.delayed(par_func)(i) for i in range(4))
# note that the time is for performing the previous cell operation 4 times

2.29 s ± 46.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
# using Dask (a pandas replacement, with inherent parallel processing)
# it basically splits the data frame into partitions for parallel processing
# but is better managed and scales to clusters
# it can also work with very large datasets without loading everything
# into RAM
Dsf_train = dd.from_pandas(Xsf_train, npartitions=3)
# the output datatype of the applied function should be specified (meta)
%timeit Dsf_train.apply(np.mean, meta=float)
# This is by far the easiest and quickest option (but see the caveat below)
# But, this is not a drop-in replacement for pandas
# Please see dask-ml, which has a sklearn clone with dask compatibility
# CAVEAT: the time shown is only for building the task graph, not for the
# actual computation (.compute() is never called here), and the input is
# Xsf_train rather than the 200x longer Xsf_loong, so this number is not
# comparable with the timings of the other approaches

717 µs ± 32.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
