In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import string
from psutil import cpu_count

In [49]:
SIZE = 10000000
df = pd.DataFrame({
    "groups": np.random.choice(np.random.choice(range(100000)), size=SIZE),
    "normal": np.random.normal(size=SIZE),
    "exponential": np.random.exponential(size=SIZE),
    "letters": np.random.choice(list(string.ascii_uppercase), size=SIZE)
})
df.head()

Unnamed: 0,groups,normal,exponential,letters
0,40784,-0.236014,0.378062,J
1,38084,0.345262,0.490875,L
2,19279,-0.056556,0.817486,Y
3,38445,0.178465,0.684911,D
4,36573,0.424478,2.465018,R


In [3]:
def numeric_func(x):
    return x["normal"].mean() / x["exponential"].var()

In [4]:
def str_func(x):
    return "-".join(x["letters"]).lower()

In [54]:
%time df.groupby("groups").apply(numeric_func)

CPU times: user 13.8 s, sys: 0 ns, total: 13.8 s
Wall time: 13.8 s


groups
0        0.039295
1       -0.008579
2        0.091604
3       -0.084005
4       -0.070328
           ...   
42009    0.121669
42010   -0.070045
42011   -0.029437
42012    0.101081
42013    0.078654
Length: 42014, dtype: float64

In [55]:
%time df.groupby("groups").apply(str_func)

CPU times: user 5.79 s, sys: 0 ns, total: 5.79 s
Wall time: 5.78 s


groups
0        p-i-k-i-d-q-g-t-s-v-y-d-b-o-h-x-i-k-f-z-g-e-a-...
1        v-v-j-z-e-r-e-j-r-h-e-j-r-q-b-x-r-z-a-m-w-d-l-...
2        e-n-a-o-r-n-g-l-c-b-i-w-a-j-k-i-y-i-u-l-g-n-u-...
3        v-o-i-n-m-c-d-i-m-a-j-x-r-k-z-l-p-g-i-s-l-y-a-...
4        b-d-t-o-r-c-u-r-r-o-z-r-a-b-r-w-p-w-e-h-s-d-v-...
                               ...                        
42009    p-p-i-u-u-v-d-l-k-m-h-q-n-u-v-x-t-x-d-m-u-f-i-...
42010    d-j-i-a-y-j-r-f-c-l-s-x-p-h-u-b-d-d-k-v-n-s-o-...
42011    d-w-d-j-g-s-u-y-n-i-v-f-t-u-z-b-s-y-a-z-k-k-f-...
42012    y-g-u-r-e-r-r-b-s-i-p-i-u-k-i-a-q-n-o-a-t-h-j-...
42013    s-y-f-l-v-b-k-w-u-h-v-x-t-y-k-v-q-a-l-h-p-a-e-...
Length: 42014, dtype: object

In [56]:
!pip install -U ray

Collecting ray
  Downloading ray-1.13.0-cp39-cp39-manylinux2014_x86_64.whl (54.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.3/54.3 MB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hCollecting grpcio<=1.43.0,>=1.28.1
  Downloading grpcio-1.43.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/4.1 MB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting filelock
  Downloading filelock-3.7.1-py3-none-any.whl (10 kB)
Collecting frozenlist
  Downloading frozenlist-1.3.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (156 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.2/156.2 KB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
Collecting aiosignal
  Downloading aiosignal-1.2.0-py3-none-any.whl (8.2 kB)
Collecting protobuf<4.0.0,>=3.15.3
  Downloading protobuf-3.20.1-cp39-cp39

In [59]:
import ray
from typing import List
def parallel_grpby_apply(df: pd.DataFrame, grpby_cols: List[str], threads: int = cpu_count() * 2) -> pd.DataFrame:
    """
    A parallelized version of _create_sales_ewm_feats_day,
    uses Ray on the dataframe splitted into N chunks, N = the number of physical cores.
    The split is made considering item_id range splitted linearly over chunks - which guarantees the
    equal amount of items in each chunk, but not the equal amount of rows(a room for improvement)
    :param df: dataframe to work with
    :param gpby_cols: columns names to group by before calculations
    :param target_col: column name to work with
    :param alpha: coefficient of smoothing of expon. weighted mean
    :param shift: base shift backwards, equals the maximum not-rolling prediction depth
    :param lags: lags on top of the shift
    :param day: a specific day for which we calculate lags
    :return: dataframe with added features
    :param threads: num of processes to use
    """
    splits = np.array_split(df[grpby_cols[0]].unique(), threads)
    chunks = [df.loc[df[grpby_cols[0]].isin(splits[x])] for x in range(threads)]

    chunk_id = [ray.put(ch) for ch in chunks]
    dtf = pd.concat(ray.get([my_func.remote(chunk_id[i], grpby_cols)
                   for i in range(threads)]))

    # the split messes everything up, so we need to resort it unfortunately
    dtf.sort_values(by=grpby_cols, axis=0, inplace=True)
    return dtf


@ray.remote
def my_func(df: pd.DataFrame, gpby_cols: List[str]) -> pd.DataFrame:
    """
    Creating sales exponentially weighted mean features for one day
    :param df: dataframe to work with
    :param gpby_cols: columns names to group by before calculations
    :param target_col: column name to work with
    :param alpha: coefficient of smoothing of expon. weighted mean
    :param shift: base shift backwards, equals the maximum not-rolling prediction depth
    :param lags: lags on top of the shift
    :param day: a specific day for which we calculate lags
    :return: dataframe with added features
    """
    # Ray makes data immutable when stored in its memory.
    # This approach prevents state sharing among processes, but we have a separate chunk for each process
    # to get rid of copying data, we make it mutable in-place again by this hack
    for d in range(len(df._data.blocks)):
        try:
            df._data.blocks[d].values.flags.writeable = True
        except Exception:
            pass

    return numeric_func(df)

In [60]:
%time parallel_grpby_apply(df,  grpby_cols=["groups"])

TypeError: cannot concatenate object of type '<class 'numpy.float64'>'; only Series and DataFrame objs are valid

In [50]:
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    df = pd.DataFrame(ret_list, index=dfGrouped.groups)
    df.index.rename("groups", inplace=True)
    return df

In [52]:
%time applyParallel(df.groupby("groups"), numeric_func)

CPU times: user 10.4 s, sys: 3.39 s, total: 13.7 s
Wall time: 16 s


Unnamed: 0_level_0,0
groups,Unnamed: 1_level_1
0,0.039295
1,-0.008579
2,0.091604
3,-0.084005
4,-0.070328
...,...
42009,0.121669
42010,-0.070045
42011,-0.029437
42012,0.101081


In [53]:
%time applyParallel(df.groupby("groups"), str_func)

CPU times: user 8.77 s, sys: 1.41 s, total: 10.2 s
Wall time: 11.5 s


Unnamed: 0_level_0,0
groups,Unnamed: 1_level_1
0,p-i-k-i-d-q-g-t-s-v-y-d-b-o-h-x-i-k-f-z-g-e-a-...
1,v-v-j-z-e-r-e-j-r-h-e-j-r-q-b-x-r-z-a-m-w-d-l-...
2,e-n-a-o-r-n-g-l-c-b-i-w-a-j-k-i-y-i-u-l-g-n-u-...
3,v-o-i-n-m-c-d-i-m-a-j-x-r-k-z-l-p-g-i-s-l-y-a-...
4,b-d-t-o-r-c-u-r-r-o-z-r-a-b-r-w-p-w-e-h-s-d-v-...
...,...
42009,p-p-i-u-u-v-d-l-k-m-h-q-n-u-v-x-t-x-d-m-u-f-i-...
42010,d-j-i-a-y-j-r-f-c-l-s-x-p-h-u-b-d-d-k-v-n-s-o-...
42011,d-w-d-j-g-s-u-y-n-i-v-f-t-u-z-b-s-y-a-z-k-k-f-...
42012,y-g-u-r-e-r-r-b-s-i-p-i-u-k-i-a-q-n-o-a-t-h-j-...


In [10]:
import dask.dataframe as dd

In [12]:
ddf = dd.from_pandas(df, npartitions=2*cpu_count())

In [17]:
%time ddf2 = ddf.groupby("groups").apply(numeric_func)

CPU times: user 18.3 ms, sys: 106 µs, total: 18.4 ms
Wall time: 17.5 ms


  return x["normal"].mean() / x["exponential"].var()
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


In [19]:
%time ddf2.compute()

CPU times: user 22.9 s, sys: 16.3 s, total: 39.2 s
Wall time: 35.7 s


groups
33      -0.029004
78      -0.103418
85      -0.066543
88       0.024882
98       0.014877
           ...   
14050   -0.051895
14139   -0.067025
14215   -0.010648
14229   -0.025734
14237    0.027759
Length: 14240, dtype: float64

In [21]:
%time dd.from_pandas(df.set_index("groups").sort_index(), npartitions=2*cpu_count()).groupby("groups").apply(numeric_func).compute()

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


CPU times: user 13.8 s, sys: 8.21 s, total: 22 s
Wall time: 22.8 s


groups
0        0.010586
1       -0.026953
2        0.041916
3       -0.008480
4        0.051525
           ...   
14235    0.023825
14236    0.068291
14237    0.027759
14238   -0.047525
14239   -0.014852
Length: 14240, dtype: float64

In [42]:
%time dd.from_pandas(df.set_index("groups").sort_index(), npartitions=2*cpu_count()).groupby("groups").apply(numeric_func).compute()

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


CPU times: user 53.1 s, sys: 18 s, total: 1min 11s
Wall time: 31 s


groups
0        0.010586
1       -0.026953
2        0.041916
3       -0.008480
4        0.051525
           ...   
14235    0.023825
14236    0.068291
14237    0.027759
14238   -0.047525
14239   -0.014852
Length: 14240, dtype: float64

In [43]:
%time dd.from_pandas(df, npartitions=2*cpu_count()).set_index("groups").groupby("groups").apply(numeric_func).compute()

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


CPU times: user 1min 18s, sys: 14.8 s, total: 1min 33s
Wall time: 51.3 s


groups
0        0.010586
1       -0.026953
2        0.041916
3       -0.008480
4        0.051525
           ...   
14235    0.023825
14236    0.068291
14237    0.027759
14238   -0.047525
14239   -0.014852
Length: 14240, dtype: float64

In [None]:
dd.from_pandas(df).groupby()

In [40]:
def dask_map_index(df, func):
    return dd.from_pandas(df, npartitions=2*cpu_count()).set_index("groups").groupby("groups", apply(numeric_func).compute()

In [41]:
%time dask_map_index(df, numeric_func)


KeyboardInterrupt

