Commit
Merge pull request dask#889 from sinhrks/skipna
Add skipna option for DataFrame aggregations
mrocklin committed Dec 27, 2015
2 parents 12df1fe + 062bfd8 commit 09a75c1
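
In practice the new keyword mirrors pandas. A minimal usage sketch of the added behavior (the small frame below is hypothetical, not taken from the test suite):

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'a': [1.0, None, 3.0], 'b': [4.0, 5.0, None]})
ddf = dd.from_pandas(df, npartitions=2)

ddf.sum().compute()              # skipna=True by default: NaNs are ignored
ddf.sum(skipna=False).compute()  # NaNs propagate, as in pandas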
Showing 2 changed files with 170 additions and 65 deletions.
170 changes: 105 additions & 65 deletions dask/dataframe/core.py
@@ -528,34 +528,34 @@ def _aca_agg(self, token, func, aggfunc=None):
raise NotImplementedError

@derived_from(pd.DataFrame)
def sum(self, axis=None):
def sum(self, axis=None, skipna=True):
axis = self._validate_axis(axis)
if axis == 1:
f = lambda x: x.sum(axis=1)
f = lambda x: x.sum(axis=1, skipna=skipna)
name = '{0}sum(axis=1)'.format(self._token_prefix)
return map_partitions(f, None, self, token=name)
else:
return self._aca_agg(token='sum', func=lambda x: x.sum())
return self._aca_agg(token='sum', func=lambda x: x.sum(skipna=skipna))

@derived_from(pd.DataFrame)
def max(self, axis=None):
def max(self, axis=None, skipna=True):
axis = self._validate_axis(axis)
if axis == 1:
f = lambda x: x.max(axis=1)
f = lambda x: x.max(axis=1, skipna=skipna)
name = '{0}max(axis=1)'.format(self._token_prefix)
return map_partitions(f, None, self, token=name)
else:
return self._aca_agg(token='max', func=lambda x: x.max())
return self._aca_agg(token='max', func=lambda x: x.max(skipna=skipna))

@derived_from(pd.DataFrame)
def min(self, axis=None):
def min(self, axis=None, skipna=True):
axis = self._validate_axis(axis)
if axis == 1:
f = lambda x: x.min(axis=1)
f = lambda x: x.min(axis=1, skipna=skipna)
name = '{0}min(axis=1)'.format(self._token_prefix)
return map_partitions(f, None, self, token=name)
else:
return self._aca_agg(token='min', func=lambda x: x.min())
return self._aca_agg(token='min', func=lambda x: x.min(skipna=skipna))

@derived_from(pd.DataFrame)
def count(self, axis=None):
@@ -569,15 +569,15 @@ def count(self, axis=None):
aggfunc=lambda x: x.sum())
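
All of these axis=0 reductions funnel through _aca_agg, an apply-concat-apply: reduce every partition with chunk, concatenate the per-partition results (equal labels now repeat), then combine the repeats with the aggregate step. A self-contained sketch of that pattern, using illustrative names and data rather than dask's internals:

import pandas as pd

def aca_sketch(partitions, chunk, aggfunc):
    per_part = [chunk(p) for p in partitions]        # apply: reduce each partition
    stacked = pd.concat(per_part)                    # concat: equal labels repeat
    return stacked.groupby(level=0).apply(aggfunc)   # apply: combine repeated labels

parts = [pd.DataFrame({'x': [1.0, 2.0]}),
         pd.DataFrame({'x': [3.0, None]})]
aca_sketch(parts, chunk=lambda df: df.sum(skipna=True),
           aggfunc=lambda s: s.sum())
# x    6.0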

@derived_from(pd.DataFrame)
def mean(self, axis=None):
def mean(self, axis=None, skipna=True):
axis = self._validate_axis(axis)
if axis == 1:
f = lambda x: x.mean(axis=1)
f = lambda x: x.mean(axis=1, skipna=skipna)
name = '{0}mean(axis=1)'.format(self._token_prefix)
return map_partitions(f, None, self, token=name)
else:
num = self._get_numeric_data()
s = num.sum()
s = num.sum(skipna=skipna)
n = num.count()

def f(s, n):
@@ -589,16 +589,16 @@ def f(s, n):
return map_partitions(f, no_default, s, n, token=name)
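
The body of f is elided by the diff view, but given s = num.sum(skipna=skipna) and n = num.count(), the axis=0 mean presumably reduces to the quotient of two independent reductions, with NaNs dropping out of both numerator and denominator when skipna=True:

$$\bar{x} \;=\; \frac{\sum_{i \in \text{valid}} x_i}{n_{\text{valid}}}$$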

@derived_from(pd.DataFrame)
def var(self, axis=None, ddof=1):
def var(self, axis=None, skipna=True, ddof=1):
axis = self._validate_axis(axis)
if axis == 1:
f = lambda x, ddof=ddof: x.var(axis=1, ddof=ddof)
f = lambda x: x.var(axis=1, skipna=skipna, ddof=ddof)
name = '{0}var(axis=1, ddof={1})'.format(self._token_prefix, ddof)
return map_partitions(f, None, self, token=name)
else:
num = self._get_numeric_data()
x = 1.0 * num.sum()
x2 = 1.0 * (num ** 2).sum()
x = 1.0 * num.sum(skipna=skipna)
x2 = 1.0 * (num ** 2).sum(skipna=skipna)
n = num.count()

def f(x2, x, n):
@@ -613,14 +613,14 @@ def f(x2, x, n):
return map_partitions(f, no_default, x2, x, n, token=name)
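
The body of f is likewise collapsed here; the three reductions it receives are the inputs of the standard one-pass moment identity, which is presumably what f evaluates, together with pandas' ddof correction:

$$\operatorname{Var}(x) \;=\; \frac{n}{n - \text{ddof}} \left( \frac{x_2}{n} - \left( \frac{x_1}{n} \right)^{2} \right), \qquad x_1 = \sum_i x_i, \quad x_2 = \sum_i x_i^{2}$$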

@derived_from(pd.DataFrame)
def std(self, axis=None, ddof=1):
def std(self, axis=None, skipna=True, ddof=1):
axis = self._validate_axis(axis)
if axis == 1:
f = lambda x, ddof=ddof: x.std(axis=1, ddof=ddof)
f = lambda x: x.std(axis=1, skipna=skipna, ddof=ddof)
name = '{0}std(axis=1, ddof={1})'.format(self._token_prefix, ddof)
return map_partitions(f, None, self, token=name)
else:
v = self.var(ddof=ddof)
v = self.var(skipna=skipna, ddof=ddof)
name = '{0}std(ddof={1})'.format(self._token_prefix, ddof)
return map_partitions(np.sqrt, no_default, v, token=name)

@@ -689,70 +689,75 @@ def build_partition(values):
return self._constructor(dsk, name, num.column_info,
divisions=[None, None])

def _cum_agg(self, token, chunk, aggregate, agginit, axis):
def _cum_agg(self, token, chunk, aggregate, axis, skipna=True):
""" Wrapper for cumulative operation """

axis = self._validate_axis(axis)

if axis == 1:
name = '{0}{1}(axis=1)'.format(self._token_prefix, token)
return map_partitions(chunk, self.column_info, self, 1, token=name)

return map_partitions(chunk, self.column_info, self, token=name)
else:
# run the cumulative operation within each partition
name1 = '{0}{1}-map'.format(self._token_prefix, token)
cumpart = map_partitions(chunk, self.column_info, self, token=name1)
# take the last element of each cumulated partition

name2 = '{0}{1}-take-last'.format(self._token_prefix, token)
cumlast = map_partitions(lambda x: x.iloc[-1],
self.column_info, cumpart, token=name2)
cumlast = map_partitions(_take_last, self.column_info, cumpart,
skipna, token=name2)

name = '{0}{1}'.format(self._token_prefix, token)
cname = '{0}{1}-cum-last'.format(self._token_prefix, token)

# aggregate each cumulated partition with the previous partitions' last elements
dask = {}
if isinstance(self, DataFrame):
agginit = pd.Series(agginit, index=self.column_info)
dask[(cname, 0)] = agginit
dask[(name, 0)] = (cumpart._name, 0)

for i in range(1, self.npartitions):
# store each cumulative step in the graph to avoid recomputation
dask[(cname, i)] = (aggregate, (cname, i - 1),
(cumlast._name, i - 1))
if i == 1:
dask[(cname, i)] = (cumlast._name, i - 1)
else:
# aggregate with the previous cumulative results
dask[(cname, i)] = (aggregate, (cname, i - 1),
(cumlast._name, i - 1))
dask[(name, i)] = (aggregate, (cumpart._name, i), (cname, i))
return self._constructor(merge(dask, cumpart.dask, cumlast.dask),
name, self.column_info, self.divisions)
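
The graph this builds keeps the per-partition work parallel and serializes only the carry: partition i's result is its local cumulation combined with the running aggregate of every earlier partition's last value. A plain-Python sketch of the same dependency chain for cumsum (a list of Series stands in for dask's partitions; the comments map onto the task keys built above):

import operator
import numpy as np
import pandas as pd

parts = [pd.Series([1.0, 2.0]), pd.Series([3.0, np.nan]), pd.Series([5.0])]

cumparts = [p.cumsum() for p in parts]           # the '-map' tasks (name1)
lasts = [c.dropna().iloc[-1] for c in cumparts]  # the '-take-last' tasks (name2);
                                                 # dropna mimics _take_last for a Series
results = [cumparts[0]]                          # (name, 0): first partition, no carry
carry = lasts[0]                                 # (cname, 1): last value of partition 0
for i in range(1, len(parts)):
    results.append(operator.add(cumparts[i], carry))  # (name, i)
    carry = operator.add(carry, lasts[i])             # (cname, i + 1)

# results: [1.0, 3.0], [6.0, NaN], [11.0] -- matching a global skipna cumsum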

@derived_from(pd.DataFrame)
def cumsum(self, axis=None):
return self._cum_agg('cumsum', self._partition_type.cumsum,
operator.add, 0, axis=axis)
def cumsum(self, axis=None, skipna=True):
chunk = lambda x: self._partition_type.cumsum(x, axis=axis, skipna=skipna)
return self._cum_agg('cumsum', chunk=chunk, aggregate=operator.add,
axis=axis, skipna=skipna)

@derived_from(pd.DataFrame)
def cumprod(self, axis=None):
return self._cum_agg('cumprod', self._partition_type.cumprod,
operator.mul, 1, axis=axis)
def cumprod(self, axis=None, skipna=True):
chunk = lambda x: self._partition_type.cumprod(x, axis=axis, skipna=skipna)
return self._cum_agg('cumprod', chunk=chunk, aggregate=operator.mul,
axis=axis, skipna=skipna)

@derived_from(pd.DataFrame)
def cummax(self, axis=None):
def cummax(self, axis=None, skipna=True):
def aggregate(x, y):
if isinstance(x, (pd.Series, pd.DataFrame)):
return x.where(x > y, y, axis=x.ndim - 1)
return x.where((x > y) | x.isnull(), y, axis=x.ndim - 1)
else: # scalar
return x if x > y else y
return self._cum_agg('cummax', self._partition_type.cummax,
aggregate, np.nan, axis=axis)
chunk = lambda x: self._partition_type.cummax(x, axis=axis, skipna=skipna)
return self._cum_agg('cummax', chunk=chunk, aggregate=aggregate,
axis=axis, skipna=skipna)

@derived_from(pd.DataFrame)
def cummin(self, axis=None):
def cummin(self, axis=None, skipna=True):
def aggregate(x, y):
if isinstance(x, (pd.Series, pd.DataFrame)):
return x.where(x < y, y, axis=x.ndim - 1)
return x.where((x < y) | x.isnull(), y, axis=x.ndim - 1)
else: # scalar
return x if x < y else y
return self._cum_agg('cummin', self._partition_type.cummin,
aggregate, np.nan, axis=axis)
chunk = lambda x: self._partition_type.cummin(x, axis=axis, skipna=skipna)
return self._cum_agg('cummin', chunk=chunk, aggregate=aggregate,
axis=axis, skipna=skipna)
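
The added | x.isnull() guard in both aggregate functions keeps NaNs produced by the per-partition cumulation from being overwritten by the carried running extremum. A small illustration with hypothetical values:

import numpy as np
import pandas as pd

x = pd.Series([np.nan, 5.0])  # partial cummax of one partition (skipna=True keeps the NaN)
y = 3.0                       # running max carried in from earlier partitions

x.where(x > y, y)                 # 0: 3.0, 1: 5.0 -- NaN wrongly replaced by the carry
x.where((x > y) | x.isnull(), y)  # 0: NaN, 1: 5.0 -- NaN preserved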

@derived_from(pd.DataFrame)
def where(self, cond, other=np.nan):
@@ -1014,48 +1019,48 @@ def groupby(self, index, **kwargs):
return SeriesGroupBy(self, index, **kwargs)

@derived_from(pd.Series)
def sum(self, axis=None):
return super(Series, self).sum(axis=axis)
def sum(self, axis=None, skipna=True):
return super(Series, self).sum(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def max(self, axis=None):
return super(Series, self).max(axis=axis)
def max(self, axis=None, skipna=True):
return super(Series, self).max(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def min(self, axis=None):
return super(Series, self).min(axis=axis)
def min(self, axis=None, skipna=True):
return super(Series, self).min(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def count(self):
return super(Series, self).count()

@derived_from(pd.Series)
def mean(self, axis=None):
return super(Series, self).mean(axis=axis)
def mean(self, axis=None, skipna=True):
return super(Series, self).mean(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def var(self, axis=None, ddof=1):
return super(Series, self).var(axis=axis, ddof=ddof)
def var(self, axis=None, ddof=1, skipna=True):
return super(Series, self).var(axis=axis, ddof=ddof, skipna=skipna)

@derived_from(pd.Series)
def std(self, axis=None, ddof=1):
return super(Series, self).std(axis=axis, ddof=ddof)
def std(self, axis=None, ddof=1, skipna=True):
return super(Series, self).std(axis=axis, ddof=ddof, skipna=skipna)

@derived_from(pd.Series)
def cumsum(self, axis=None):
return super(Series, self).cumsum(axis=axis)
def cumsum(self, axis=None, skipna=True):
return super(Series, self).cumsum(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def cumprod(self, axis=None):
return super(Series, self).cumprod(axis=axis)
def cumprod(self, axis=None, skipna=True):
return super(Series, self).cumprod(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def cummax(self, axis=None):
return super(Series, self).cummax(axis=axis)
def cummax(self, axis=None, skipna=True):
return super(Series, self).cummax(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def cummin(self, axis=None):
return super(Series, self).cummin(axis=axis)
def cummin(self, axis=None, skipna=True):
return super(Series, self).cummin(axis=axis, skipna=skipna)

@derived_from(pd.Series)
def nunique(self):
@@ -1157,6 +1162,15 @@ def _constructor(self):
def nunique(self):
return self.drop_duplicates().count()

@derived_from(pd.Index)
def max(self):
# pd.Index doesn't support the axis and skipna keywords
return self._aca_agg(token='max', func=lambda x: x.max())

@derived_from(pd.Index)
def min(self):
return self._aca_agg(token='min', func=lambda x: x.min())

def count(self):
f = lambda x: pd.notnull(x).sum()
return reduction(self, f, np.sum, token='index-count')
@@ -1420,8 +1434,10 @@ def _aca_agg(self, token, func, aggfunc=None):
if aggfunc is None:
aggfunc = func

# groupby aggregation doesn't support skipna;
# using groupby.apply(aggfunc) is a workaround to handle each group as a DataFrame
return aca([self], chunk=func,
aggregate=lambda x: aggfunc(x.groupby(level=0)),
aggregate=lambda x: x.groupby(level=0).apply(aggfunc),
columns=None, token=self._token_prefix + token)
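
The workaround matters because pandas groupby aggregations such as .sum() take no skipna argument, whereas .apply() hands each group to the closed-over aggfunc as an ordinary pandas object on which skipna works. For instance (hypothetical data standing in for two per-partition results):

import numpy as np
import pandas as pd

stacked = pd.concat([pd.Series([1.0, np.nan], index=['a', 'b']),
                     pd.Series([2.0, np.nan], index=['a', 'b'])])
stacked.groupby(level=0).apply(lambda x: x.sum(skipna=False))
# a    3.0
# b    NaN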

@derived_from(pd.DataFrame)
@@ -2279,6 +2295,30 @@ def _resample_bin_and_out_divs(divisions, rule, closed, label):

return tuple(map(pd.Timestamp, newdivs)), tuple(map(pd.Timestamp, outdivs))

def _take_last(a, skipna=True):
"""
take last row (Series) of DataFrame / last value of Seriese
considerning NaN.
Parameters
----------
a : pd.DataFrame or pd.Series
skipna : bool, default True
Whether to exclude NaN
"""
if skipna is False:
return a.iloc[-1]
else:
# take the last valid value excluding NaN; NaN locations may differ
# across columns
group_dummy = np.ones(len(a.index))
last_row = a.groupby(group_dummy).last()
if isinstance(a, pd.DataFrame):
return pd.Series(last_row.values[0], index=a.columns)
else:
return last_row.values[0]
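
A quick behavior sketch for the function above, with the last valid values deliberately sitting in different rows per column (hypothetical data):

import numpy as np
import pandas as pd

df = pd.DataFrame({'a': [1.0, 2.0, np.nan],
                   'b': [4.0, np.nan, 6.0]})
_take_last(df)                # a: 2.0, b: 6.0 -- last non-NaN per column
_take_last(df, skipna=False)  # a: NaN, b: 6.0 -- plain df.iloc[-1]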


def repartition_divisions(a, b, name, out1, out2, force=False):
""" dask graph to repartition dataframe by new divisions
