Skip to content

Commit

Permalink
Implement basic groupby apply
Browse files Browse the repository at this point in the history
  • Loading branch information
sklam committed Dec 18, 2017
1 parent 0e3f774 commit e70deb2
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 5 deletions.
49 changes: 46 additions & 3 deletions pygdf/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from numba import cuda

from .dataframe import DataFrame, Series
from .multi import concat
from . import _gdf, cudautils
from .column import Column
from .buffer import Buffer
Expand Down Expand Up @@ -97,8 +98,9 @@ def as_df(self):
"""
return self._group_dataframe(self._df, self._by)

def _form_groups(self, functors):
"""
def _agg_groups(self, functors):
"""Aggregate the groups
Parameters
----------
functors: dict
Expand Down Expand Up @@ -321,6 +323,47 @@ def _get_function(x):
else [_get_function(x) for x in v])
else:
return self.agg([args])
return self._form_groups(functors)
return self._agg_groups(functors)

def _combine(self, functors, chunks):
outdf = concat([chk[:1].loc[:, list(self._by)] for chk in chunks])
outdf = outdf.reset_index()

for col, funclist in functors.items():
namer = ((lambda k, f: '{}_{}'.format(k, f.__name__))
if len(funclist) > 1
else (lambda k, f: k))

for aggfn in funclist:
aggvals = np.asarray([aggfn(chk[col]) for chk in chunks])
outdf[namer(col, aggfn)] = aggvals

return outdf

_auto_generate_grouper_agg(locals())

def apply(self, function):
"""Apply a transformation function over the grouped chunk.
"""
if not callable(function):
raise TypeError("type {!r} is not callable", type(function))
transform = SerialTransform(function)
chunks = transform(self._get_chunks())
return concat(chunks)

def _get_chunks(self):
"""Group chunks and return them
"""
df, segs = self._group_dataframe(self._df, self._by)
ends = chain(segs[1:], [None])
chunks = [df[s:e] for s, e in zip(segs, ends)]
return chunks


class SerialTransform(object):
def __init__(self, functor):
self._function = functor

def __call__(self, chunks):
return [self._function(chk) for chk in chunks]

4 changes: 2 additions & 2 deletions pygdf/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def __init__(self, start, stop=None):
"""
if stop is None:
start, stop = 0, start
self._start = start
self._stop = stop
self._start = int(start)
self._stop = int(stop)

def __repr__(self):
return "{}(start={}, stop={})".format(self.__class__.__name__,
Expand Down
22 changes: 22 additions & 0 deletions pygdf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,25 @@ def assert_values_equal(arr):
assert_values_equal(pddf[k].values)


def test_groupby_apply():
np.random.seed(0)
df = DataFrame()
nelem = 20
df['key1'] = np.random.randint(0, 3, nelem)
df['key2'] = np.random.randint(0, 2, nelem)
df['val1'] = np.random.random(nelem)
df['val2'] = np.random.random(nelem)

expect_grpby = df.to_pandas().groupby(['key1', 'key2'],
as_index=False)
got_grpby = df.groupby(['key1', 'key2'])

def foo(df):
df['out'] = df['val1'] + df['val2']
return df

expect = expect_grpby.apply(foo)
expect = expect.sort_values(['key1', 'key2']).reset_index(drop=True)

got = got_grpby.apply(foo).to_pandas()
pd.util.testing.assert_frame_equal(expect, got)

0 comments on commit e70deb2

Please sign in to comment.