API: provide Rolling/Expanding/EWM objects for deferred rolling type calculations, xref #10702
jreback committed Dec 19, 2015
1 parent 1357321 commit 3c23dc9
Showing 8 changed files with 1,945 additions and 761 deletions.
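
For orientation, a minimal sketch of the deferred API this commit introduces, based on the method signatures added to generic.py below (illustrative only; exact output depends on the pandas version):

import pandas as pd

df = pd.DataFrame({'A': [1, 2, 3, 4, 5]})

# Instead of the old module-level pd.rolling_mean(df, window=3), first build a
# deferred Rolling object, then call the aggregation on it.
r = df.rolling(window=3)     # Rolling object; no computation happens yet
r.mean()                     # the rolling mean is computed here

# The same deferred pattern applies to expanding and exponentially weighted windows.
df.expanding(min_periods=1).sum()
df.ewm(span=5).mean()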
261 changes: 261 additions & 0 deletions pandas/core/base.py
@@ -2,6 +2,7 @@
Base and utility classes for pandas objects.
"""
from pandas import compat
from pandas.compat import builtins
import numpy as np
from pandas.core import common as com
import pandas.core.nanops as nanops
@@ -218,6 +219,266 @@ def __delete__(self, instance):
raise AttributeError("can't delete attribute")


class GroupByError(Exception):
pass


class DataError(GroupByError):
pass


class SpecificationError(GroupByError):
pass


class SelectionMixin(object):
"""
Mixin implementing the selection & aggregation interface on a group-like object.
Sub-classes need to define: obj, exclusions
"""
_selection = None
_internal_names = ['_cache']
_internal_names_set = set(_internal_names)
_builtin_table = {
builtins.sum: np.sum,
builtins.max: np.max,
builtins.min: np.min,
}
_cython_table = {
builtins.sum: 'sum',
builtins.max: 'max',
builtins.min: 'min',
np.sum: 'sum',
np.mean: 'mean',
np.prod: 'prod',
np.std: 'std',
np.var: 'var',
np.median: 'median',
np.max: 'max',
np.min: 'min',
np.cumprod: 'cumprod',
np.cumsum: 'cumsum'
}

@property
def name(self):
if self._selection is None:
return None # 'result'
else:
return self._selection

@property
def _selection_list(self):
if not isinstance(self._selection, (list, tuple, com.ABCSeries, com.ABCIndex, np.ndarray)):
return [self._selection]
return self._selection

@cache_readonly
def _selected_obj(self):

if self._selection is None or isinstance(self.obj, com.ABCSeries):
return self.obj
else:
return self.obj[self._selection]

@cache_readonly
def _obj_with_exclusions(self):
if self._selection is not None and isinstance(self.obj, com.ABCDataFrame):
return self.obj.reindex(columns=self._selection_list)

if len(self.exclusions) > 0:
return self.obj.drop(self.exclusions, axis=1)
else:
return self.obj

def __getitem__(self, key):
if self._selection is not None:
raise Exception('Column(s) %s already selected' % self._selection)

if isinstance(key, (list, tuple, com.ABCSeries, com.ABCIndex, np.ndarray)):
if len(self.obj.columns.intersection(key)) != len(key):
bad_keys = list(set(key).difference(self.obj.columns))
raise KeyError("Columns not found: %s"
% str(bad_keys)[1:-1])
return self._gotitem(list(key), ndim=2)

elif not getattr(self,'as_index',False):
if key not in self.obj.columns:
raise KeyError("Column not found: %s" % key)
return self._gotitem(key, ndim=2)

else:
if key not in self.obj:
raise KeyError("Column not found: %s" % key)
return self._gotitem(key, ndim=1)

def _gotitem(self, key, ndim, subset=None):
"""
sub-classes to define
return a sliced object
Parameters
----------
key : string / list of selections
ndim : 1,2
requested ndim of result
subset : object, default None
subset to act on
"""
raise AbstractMethodError(self)

_agg_doc = """Aggregate using input function or dict of {column -> function}
Parameters
----------
arg : function or dict
Function to use for aggregating groups. If a function, must either
work when passed a DataFrame or when passed to DataFrame.apply. If
passed a dict, the keys must be DataFrame column names.
Accepted Combinations are:
- string cythonized function name
- function
- list of functions
- dict of columns -> functions
- nested dict of names -> dicts of functions
Notes
-----
Numpy functions mean/median/prod/sum/std/var are special cased so the
default behavior is applying the function along axis=0
(e.g., np.mean(arr_2d, axis=0)) as opposed to
mimicking the default Numpy behavior (e.g., np.mean(arr_2d)).
Returns
-------
aggregated : DataFrame
"""

@Appender(_agg_doc)
def agg(self, func, *args, **kwargs):
return self.aggregate(func, *args, **kwargs)

@Appender(_agg_doc)
def aggregate(self, func, *args, **kwargs):
raise AbstractMethodError(self)

def _aggregate(self, arg, *args, **kwargs):
"""
provide an implementation for the aggregators
Returns
-------
tuple of result, how
Notes
-----
how can be a string describing the required post-processing, or
None if not required
"""

if isinstance(arg, compat.string_types):
return getattr(self, arg)(*args, **kwargs), None

result = compat.OrderedDict()
if isinstance(arg, dict):
if self.axis != 0: # pragma: no cover
raise ValueError('Can only pass dict with axis=0')

obj = self._selected_obj

if any(isinstance(x, (list, tuple, dict)) for x in arg.values()):
new_arg = compat.OrderedDict()
for k, v in compat.iteritems(arg):
if not isinstance(v, (tuple, list, dict)):
new_arg[k] = [v]
else:
new_arg[k] = v
arg = new_arg

keys = []
if self._selection is not None:
subset = obj

for fname, agg_how in compat.iteritems(arg):
colg = self._gotitem(self._selection, ndim=1, subset=subset)
result[fname] = colg.aggregate(agg_how)
keys.append(fname)
else:
for col, agg_how in compat.iteritems(arg):
colg = self._gotitem(col, ndim=1)
result[col] = colg.aggregate(agg_how)
keys.append(col)

if isinstance(list(result.values())[0], com.ABCDataFrame):
from pandas.tools.merge import concat
result = concat([result[k] for k in keys], keys=keys, axis=1)
else:
from pandas import DataFrame
result = DataFrame(result)

return result, True
elif hasattr(arg, '__iter__'):
return self._aggregate_multiple_funcs(arg), None
else:
result = None

cy_func = self._is_cython_func(arg)
if cy_func and not args and not kwargs:
return getattr(self, cy_func)(), None

# caller can react
return result, True

def _aggregate_multiple_funcs(self, arg):
from pandas.tools.merge import concat

if self.axis != 0:
raise NotImplementedError("axis other than 0 is not supported")

obj = self._obj_with_exclusions
results = []
keys = []

# degenerate case
if obj.ndim == 1:
for a in arg:
try:
colg = self._gotitem(obj.name, ndim=1, subset=obj)
results.append(colg.aggregate(a))
keys.append(getattr(a,'name',a))
except (TypeError, DataError):
pass
except SpecificationError:
raise

# multiples
else:
for col in obj:
try:
colg = self._gotitem(col, ndim=1, subset=obj[col])
results.append(colg.aggregate(arg))
keys.append(col)
except (TypeError, DataError):
pass
except SpecificationError:
raise
result = concat(results, keys=keys, axis=1)

return result

def _is_cython_func(self, arg):
""" if we define an internal function for this argument, return it """
return self._cython_table.get(arg)

def _is_builtin_func(self, arg):
"""
if we define a builtin function for this argument, return it;
otherwise return the arg
"""
return self._builtin_table.get(arg, arg)

class FrozenList(PandasObject, list):

"""
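The _aggregate machinery above dispatches on the type of its argument: a plain string is resolved to a method on the object, a dict maps column names to per-column aggregations, any other iterable is routed through _aggregate_multiple_funcs, and recognised NumPy/builtin callables are mapped to their cython-backed names via _cython_table. A hedged sketch of how that surfaces on the new window objects defined later in this commit:

import pandas as pd

df = pd.DataFrame({'A': [1.0, 2.0, 3.0], 'B': [4.0, 5.0, 6.0]})
r = df.rolling(window=2)

r.aggregate('mean')                     # string: dispatched to getattr(r, 'mean')()
r.aggregate(['mean', 'sum'])            # list: _aggregate_multiple_funcs, hierarchical result columns
r.aggregate({'A': 'mean', 'B': 'sum'})  # dict: per-column _gotitem(...) + aggregate
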
1 change: 1 addition & 0 deletions pandas/core/frame.py
@@ -5149,6 +5149,7 @@ def combineMult(self, other):
DataFrame._setup_axes(['index', 'columns'], info_axis=1, stat_axis=0,
axes_are_reversed=True, aliases={'rows': 0})
DataFrame._add_numeric_operations()
DataFrame._add_series_or_dataframe_operations()

_EMPTY_SERIES = Series([])

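This one-line change runs the new classmethod when frame.py is imported, so DataFrame instances immediately expose the deferred constructors. A quick, hedged check (the pandas.core.window module path refers to the new file added in this commit):

import pandas as pd

df = pd.DataFrame({'A': range(5)})
hasattr(df, 'rolling')        # True once _add_series_or_dataframe_operations has run
type(df.rolling(window=2))    # e.g. <class 'pandas.core.window.Rolling'>
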
33 changes: 31 additions & 2 deletions pandas/core/generic.py
@@ -29,7 +29,6 @@
from pandas.util.decorators import Appender, Substitution, deprecate_kwarg
from pandas.core import config


# goal is to be able to define the docs close to function, while still being
# able to share
_shared_docs = dict()
@@ -4734,6 +4733,36 @@ def nanptp(values, axis=0, skipna=True):
method ``ptp``.""", nanptp)


@classmethod
def _add_series_or_dataframe_operations(cls):
""" add the series or dataframe only operations to the cls; evaluate the doc strings again """

[Review comment from @jbrockmendel (Member), Jul 22, 2017:]

Trying to get a handle on this. What does "evaluate the doc strings again" mean in this context? I thought it referred to _doc_parms, but that isn't called here. Is Panel the only thing preventing these methods from being defined directly within NDFrame?


from pandas.core import window as rwindow

@Appender(rwindow.rolling.__doc__)
def rolling(self, window, min_periods=None, freq=None, center=False,
how=None, win_type=None, axis=0):
axis = self._get_axis_number(axis)
return rwindow.rolling(self, window=window, min_periods=min_periods, freq=freq, center=center,
how=how, win_type=win_type, axis=axis)
cls.rolling = rolling

@Appender(rwindow.expanding.__doc__)
def expanding(self, min_periods=None, freq=None, center=False,
how=None, axis=0):
axis = self._get_axis_number(axis)
return rwindow.expanding(self, min_periods=min_periods, freq=freq, center=center,
how=how, axis=axis)
cls.expanding = expanding

@Appender(rwindow.ewm.__doc__)
def ewm(self, com=None, span=None, halflife=None, min_periods=0, freq=None,
adjust=True, how=None, ignore_na=False, axis=0):
axis = self._get_axis_number(axis)
return rwindow.ewm(self, com=com, span=span, halflife=halflife, min_periods=min_periods,
freq=freq, adjust=adjust, how=how, ignore_na=ignore_na, axis=axis)
cls.ewm = ewm

def _doc_parms(cls):
""" return a tuple of the doc parms """
axis_descr = "{%s}" % ', '.join([
@@ -4916,6 +4945,6 @@ def logical_func(self, axis=None, bool_only=None, skipna=None,
logical_func.__name__ = name
return logical_func

# install the indexerse
# install the indexes
for _name, _indexer in indexing.get_indexers_list():
NDFrame._create_indexer(_name, _indexer)
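
On the question raised in the review comment above: the classmethod defines the rolling/expanding/ewm closures at attach time, which keeps the import of pandas.core.window deferred and lets @Appender pick up that module's docstrings when the methods are installed rather than when NDFrame itself is defined. A minimal, self-contained sketch of the same injection pattern (all names here are illustrative, not pandas internals):

class _Window(object):
    """Stand-in for the deferred window object returned by the constructor."""
    def __init__(self, obj, window):
        self.obj = obj
        self.window = window

class Frame(object):
    @classmethod
    def _add_window_operations(cls):
        # The method is defined inside the classmethod so any heavyweight
        # window machinery is only touched when this is called, and its
        # docstring can be copied over at attach time.
        def rolling(self, window):
            return _Window(self, window)
        rolling.__doc__ = _Window.__doc__
        cls.rolling = rolling

Frame._add_window_operations()
f = Frame()
isinstance(f.rolling(3), _Window)   # True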
