From e70deb27d8f5409b4cb9c640f18a238c86383fe7 Mon Sep 17 00:00:00 2001
From: Siu Kwan Lam
Date: Mon, 18 Dec 2017 17:28:04 -0600
Subject: [PATCH] Implement basic groupby apply

---
 pygdf/groupby.py            | 49 ++++++++++++++++++++++++++++++++++---
 pygdf/index.py              |  4 +--
 pygdf/tests/test_groupby.py | 22 +++++++++++++++++
 3 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/pygdf/groupby.py b/pygdf/groupby.py
index 11cb97ad4ba..81ea629e75d 100644
--- a/pygdf/groupby.py
+++ b/pygdf/groupby.py
@@ -6,6 +6,7 @@
 from numba import cuda
 
 from .dataframe import DataFrame, Series
+from .multi import concat
 from . import _gdf, cudautils
 from .column import Column
 from .buffer import Buffer
@@ -97,8 +98,9 @@ def as_df(self):
         """
         return self._group_dataframe(self._df, self._by)
 
-    def _form_groups(self, functors):
-        """
+    def _agg_groups(self, functors):
+        """Aggregate the groups
+
         Parameters
         ----------
         functors: dict
@@ -321,6 +323,47 @@ def _get_function(x):
                                else [_get_function(x) for x in v])
         else:
             return self.agg([args])
-        return self._form_groups(functors)
+        return self._agg_groups(functors)
+
+    def _combine(self, functors, chunks):
+        outdf = concat([chk[:1].loc[:, list(self._by)] for chk in chunks])
+        outdf = outdf.reset_index()
+
+        for col, funclist in functors.items():
+            namer = ((lambda k, f: '{}_{}'.format(k, f.__name__))
+                     if len(funclist) > 1
+                     else (lambda k, f: k))
+
+            for aggfn in funclist:
+                aggvals = np.asarray([aggfn(chk[col]) for chk in chunks])
+                outdf[namer(col, aggfn)] = aggvals
+
+        return outdf
 
     _auto_generate_grouper_agg(locals())
+
+    def apply(self, function):
+        """Apply a transformation function over the grouped chunk.
+
+        """
+        if not callable(function):
+            raise TypeError("type {!r} is not callable".format(type(function)))
+        transform = SerialTransform(function)
+        chunks = transform(self._get_chunks())
+        return concat(chunks)
+
+    def _get_chunks(self):
+        """Group chunks and return them
+        """
+        df, segs = self._group_dataframe(self._df, self._by)
+        ends = chain(segs[1:], [None])
+        chunks = [df[s:e] for s, e in zip(segs, ends)]
+        return chunks
+
+
+class SerialTransform(object):
+    def __init__(self, functor):
+        self._function = functor
+
+    def __call__(self, chunks):
+        return [self._function(chk) for chk in chunks]
+
diff --git a/pygdf/index.py b/pygdf/index.py
index bb85791b5f1..e1e6f4deb62 100644
--- a/pygdf/index.py
+++ b/pygdf/index.py
@@ -101,8 +101,8 @@ def __init__(self, start, stop=None):
         """
         if stop is None:
             start, stop = 0, start
-        self._start = start
-        self._stop = stop
+        self._start = int(start)
+        self._stop = int(stop)
 
     def __repr__(self):
         return "{}(start={}, stop={})".format(self.__class__.__name__,
diff --git a/pygdf/tests/test_groupby.py b/pygdf/tests/test_groupby.py
index bb6b9127410..eaedcc0542d 100644
--- a/pygdf/tests/test_groupby.py
+++ b/pygdf/tests/test_groupby.py
@@ -118,3 +118,25 @@ def assert_values_equal(arr):
         assert_values_equal(pddf[k].values)
 
 
+def test_groupby_apply():
+    np.random.seed(0)
+    df = DataFrame()
+    nelem = 20
+    df['key1'] = np.random.randint(0, 3, nelem)
+    df['key2'] = np.random.randint(0, 2, nelem)
+    df['val1'] = np.random.random(nelem)
+    df['val2'] = np.random.random(nelem)
+
+    expect_grpby = df.to_pandas().groupby(['key1', 'key2'],
+                                          as_index=False)
+    got_grpby = df.groupby(['key1', 'key2'])
+
+    def foo(df):
+        df['out'] = df['val1'] + df['val2']
+        return df
+
+    expect = expect_grpby.apply(foo)
+    expect = expect.sort_values(['key1', 'key2']).reset_index(drop=True)
+
+    got = got_grpby.apply(foo).to_pandas()
+    pd.util.testing.assert_frame_equal(expect, got)