In [1]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import string
from psutil import cpu_count

In [2]:
SIZE = 10000000
df = pd.DataFrame({
    "groups": np.random.choice(np.random.choice(range(100000)), size=SIZE),
    "normal": np.random.normal(size=SIZE),
    "exponential": np.random.exponential(size=SIZE),
    "letters": np.random.choice(list(string.ascii_uppercase), size=SIZE)
})
df.head()

Unnamed: 0,groups,normal,exponential,letters
0,65801,-0.316303,2.019591,F
1,34036,-0.254399,0.165559,M
2,21164,-1.311322,2.019238,A
3,30952,0.211413,0.663089,Z
4,21440,-0.861639,1.28842,O


In [3]:
def numeric_func(x):
    return x["normal"].mean() / x["exponential"].var()

In [4]:
def str_func(x):
    return "-".join(x["letters"]).lower()

In [5]:
%time df.groupby("groups").apply(numeric_func)

CPU times: user 13.5 s, sys: 164 ms, total: 13.6 s
Wall time: 13.6 s


groups
0       -0.208227
1        0.016416
2        0.049437
3       -0.039399
4        0.067989
           ...   
46661    0.052638
46662    0.074521
46663   -0.150120
46664    0.039322
46665   -0.022067
Length: 46666, dtype: float64

In [6]:
%time df.groupby("groups").apply(str_func)

CPU times: user 5.17 s, sys: 170 ms, total: 5.34 s
Wall time: 5.33 s


groups
0        t-e-k-n-y-l-n-h-q-w-p-m-n-j-a-c-u-y-p-d-p-p-u-...
1        r-w-j-j-s-u-x-s-c-v-i-c-t-u-s-j-c-f-o-c-a-m-z-...
2        x-u-h-a-k-k-c-w-s-k-j-a-h-o-r-p-b-y-v-s-y-p-x-...
3        j-f-r-j-d-y-y-n-p-g-y-v-g-g-a-k-r-t-c-d-g-b-s-...
4        q-y-r-f-l-s-l-l-e-q-h-b-k-g-g-z-r-d-h-i-n-g-l-...
                               ...                        
46661    m-a-b-g-z-h-r-e-v-u-l-c-l-l-i-o-v-s-c-t-q-k-o-...
46662    m-m-k-t-c-j-o-t-w-q-x-u-c-i-l-b-k-m-l-a-x-s-i-...
46663    z-i-l-a-e-u-u-a-r-p-i-p-q-r-m-v-w-q-f-i-p-e-i-...
46664    b-g-z-u-v-c-o-f-v-y-e-g-y-z-k-g-i-p-r-q-h-t-v-...
46665    g-e-q-w-z-c-m-t-r-c-d-r-k-e-c-n-u-n-i-g-y-o-i-...
Length: 46666, dtype: object

In [5]:
!pip install -U ray

Collecting ray
  Downloading ray-1.11.0-cp39-cp39-manylinux2014_x86_64.whl (52.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.4/52.4 MB[0m [31m31.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting grpcio<=1.43.0,>=1.28.1
  Downloading grpcio-1.43.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/4.1 MB[0m [31m30.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting protobuf>=3.15.3
  Downloading protobuf-3.20.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m34.3 MB/s[0m eta [36m0:00:00[0m
Collecting redis>=3.5.0
  Downloading redis-4.2.2-py3-none-any.whl (226 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m226.4/226.4 KB[0m [31m29.7 MB/s[0m eta [36m0:00:00[0m
Collecting filelock
  Downloading filelock-3.6.0-py3-none-any.whl (10.0 kB

In [9]:
import ray
from typing import List
def parallel_grpby_apply(df: pd.DataFrame, grpby_cols: List[str], threads: int = cpu_count() * 2) -> pd.DataFrame:
    """
    A parallelized version of _create_sales_ewm_feats_day,
    uses Ray on the dataframe splitted into N chunks, N = the number of physical cores.
    The split is made considering item_id range splitted linearly over chunks - which guarantees the
    equal amount of items in each chunk, but not the equal amount of rows(a room for improvement)
    :param df: dataframe to work with
    :param gpby_cols: columns names to group by before calculations
    :param target_col: column name to work with
    :param alpha: coefficient of smoothing of expon. weighted mean
    :param shift: base shift backwards, equals the maximum not-rolling prediction depth
    :param lags: lags on top of the shift
    :param day: a specific day for which we calculate lags
    :return: dataframe with added features
    :param threads: num of processes to use
    """
    splits = np.array_split(df[grpby_cols[0]].unique(), threads)
    chunks = [df.loc[df[grpby_cols[0]].isin(splits[x])] for x in range(threads)]

    chunk_id = [ray.put(ch) for ch in chunks]
    dtf = pd.concat(ray.get([my_func.remote(chunk_id[i], gpby_cols)
                   for i in range(threads)]))

    # the split messes everything up, so we need to resort it unfortunately
    dtf.sort_values(by=grpby_cols, axis=0, inplace=True)
    return dtf


@ray.remote
def my_func(df: pd.DataFrame, gpby_cols: List[str]) -> pd.DataFrame:
    """
    Creating sales exponentially weighted mean features for one day
    :param df: dataframe to work with
    :param gpby_cols: columns names to group by before calculations
    :param target_col: column name to work with
    :param alpha: coefficient of smoothing of expon. weighted mean
    :param shift: base shift backwards, equals the maximum not-rolling prediction depth
    :param lags: lags on top of the shift
    :param day: a specific day for which we calculate lags
    :return: dataframe with added features
    """
    # Ray makes data immutable when stored in its memory.
    # This approach prevents state sharing among processes, but we have a separate chunk for each process
    # to get rid of copying data, we make it mutable in-place again by this hack
    for d in range(len(df._data.blocks)):
        try:
            df._data.blocks[d].values.flags.writeable = True
        except Exception:
            pass

    return numeric_func(df)

In [10]:
parallel_grpby_apply(df,  grpby_cols=["groups"])



Exception: The current node has not been updated within 30 seconds, this could happen because of some of the Ray processes failed to startup.

In [5]:
from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pd.DataFrame(ret_list, index=dfGrouped.groups)

In [6]:
%time applyParallel(df.groupby("groups"), numeric_func)

CPU times: user 8.86 s, sys: 589 ms, total: 9.45 s
Wall time: 10.6 s


Unnamed: 0,0
0,0.055296
1,0.000427
2,-0.055434
3,-0.026073
4,0.050453
...,...
69191,-0.015054
69192,-0.055151
69193,-0.012558
69194,-0.021996


In [7]:
%time applyParallel(df.groupby("groups"), str_func)

CPU times: user 8.94 s, sys: 734 ms, total: 9.67 s
Wall time: 10.7 s


Unnamed: 0,0
0,m-l-q-r-f-h-b-m-l-e-h-c-a-x-d-l-r-m-m-y-j-a-l-...
1,u-a-h-x-f-w-u-b-b-k-b-f-x-d-m-o-i-i-c-b-q-d-i-...
2,z-l-f-o-b-f-o-h-g-g-k-p-s-o-z-g-m-t-i-h-c-v-a-...
3,j-q-d-m-q-z-p-n-t-r-h-h-o-u-q-v-t-h-h-g-r-z-q-...
4,j-v-o-i-z-n-h-d-c-o-z-b-i-s-h-b-z-i-q-x-q-w-i-...
...,...
69191,r-l-i-h-b-a-g-x-s-c-n-c-o-y-x-h-i-d-i-l-w-l-a-...
69192,n-e-r-z-y-f-h-p-t-w-c-e-h-r-a-p-g-l-y-a-x-i-s-...
69193,m-u-n-u-h-s-y-g-y-h-q-o-y-y-u-b-x-j-y-d-l-g-c-...
69194,o-n-x-k-f-b-p-j-x-w-y-c-j-s-s-v-c-h-b-y-n-b-g-...
