Skip to content

Commit

Permalink
Use lambdas for cumulative reductions
Browse files Browse the repository at this point in the history
Turns out that the pandas.Series.cum* methods are very hard to
serialize.  This reverts back to lambdas.
  • Loading branch information
mrocklin committed Jan 4, 2016
1 parent 25fe0b0 commit 7c8e679
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions dask/dataframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,16 +733,18 @@ def _cum_agg(self, token, chunk, aggregate, axis, skipna=True,

# Diff view of `cumsum`: this rendering has no +/- markers, so the line removed
# by the commit and the line that replaces it both appear below.
@derived_from(pd.DataFrame)
def cumsum(self, axis=None, skipna=True):
# Added by this commit: the per-partition step is a plain lambda that calls
# x.cumsum and forwards whatever keyword arguments it receives.
cumsum = lambda x, **kwargs: x.cumsum(**kwargs)
return self._cum_agg('cumsum',
# Removed by this commit: per the commit message, the pandas cum* methods
# proved very hard to serialize.
chunk=self._partition_type.cumsum,
# Added by this commit: pass the lambda defined above instead.
chunk=cumsum,
# operator.add is handed to _cum_agg as `aggregate`; _cum_agg is not shown
# here -- presumably it combines results across partitions (TODO confirm).
aggregate=operator.add,
axis=axis, skipna=skipna,
# The same axis/skipna values are also supplied as chunk_kwargs.
chunk_kwargs=dict(axis=axis, skipna=skipna))

# Diff view of `cumprod`: this rendering has no +/- markers, so the line removed
# by the commit and the line that replaces it both appear below.
@derived_from(pd.DataFrame)
def cumprod(self, axis=None, skipna=True):
# Added by this commit: the per-partition step is a plain lambda that calls
# x.cumprod and forwards whatever keyword arguments it receives.
cumprod = lambda x, **kwargs: x.cumprod(**kwargs)
return self._cum_agg('cumprod',
# Removed by this commit: per the commit message, the pandas cum* methods
# proved very hard to serialize.
chunk=self._partition_type.cumprod,
# Added by this commit: pass the lambda defined above instead.
chunk=cumprod,
# operator.mul is handed to _cum_agg as `aggregate`; _cum_agg is not shown
# here -- presumably it combines results across partitions (TODO confirm).
aggregate=operator.mul,
axis=axis, skipna=skipna,
# The same axis/skipna values are also supplied as chunk_kwargs.
chunk_kwargs=dict(axis=axis, skipna=skipna))
Expand All @@ -754,8 +756,9 @@ def aggregate(x, y):
return x.where((x > y) | x.isnull(), y, axis=x.ndim - 1)
else: # scalar
return x if x > y else y
cummax = lambda x, **kwargs: x.cummax(**kwargs)
return self._cum_agg('cummax',
chunk=self._partition_type.cummax,
chunk=cummax,
aggregate=aggregate,
axis=axis, skipna=skipna,
chunk_kwargs=dict(axis=axis, skipna=skipna))
Expand All @@ -767,8 +770,9 @@ def aggregate(x, y):
return x.where((x < y) | x.isnull(), y, axis=x.ndim - 1)
else: # scalar
return x if x < y else y
cummin = lambda x, **kwargs: x.cummin(**kwargs)
return self._cum_agg('cummin',
chunk=self._partition_type.cummin,
chunk=cummin,
aggregate=aggregate,
axis=axis, skipna=skipna,
chunk_kwargs=dict(axis=axis, skipna=skipna))
Expand Down

0 comments on commit 7c8e679

Please sign in to comment.