Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

numpy_groupies #4540

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions xarray/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from html import escape
from textwrap import dedent
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -27,6 +28,12 @@
from .rolling_exp import RollingExp
from .utils import Frozen, either_dict_or_kwargs, is_scalar

if TYPE_CHECKING:
from xarray.core.variable import IndexVariable

from .dataarray import DataArray


# Used as a sentinel value to indicate all dimensions
ALL_DIMS = ...

Expand Down Expand Up @@ -577,13 +584,13 @@ def pipe(

>>> def adder(data, arg):
... return data + arg
...

>>> def div(data, arg):
... return data / arg
...

>>> def sub_mult(data, sub_arg, mult_arg):
... return (data * mult_arg) - sub_arg
...

>>> x.pipe(adder, 2)
<xarray.Dataset>
Dimensions: (lat: 2, lon: 2)
Expand Down Expand Up @@ -633,7 +640,12 @@ def pipe(
else:
return func(self, *args, **kwargs)

def groupby(self, group, squeeze: bool = True, restore_coord_dims: bool = None):
def groupby(
self,
group: Union[Hashable, "DataArray", "IndexVariable"],
squeeze: bool = True,
restore_coord_dims: bool = None,
):
"""Returns a GroupBy object for performing grouped operations.

Parameters
Expand Down
138 changes: 125 additions & 13 deletions xarray/core/groupby.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import functools
import warnings
from typing import TYPE_CHECKING, Hashable, Iterator, Union

import numpy as np
import pandas as pd
Expand All @@ -11,7 +12,7 @@
from .concat import concat
from .formatting import format_array_flat
from .indexes import propagate_indexes
from .options import _get_keep_attrs
from .options import OPTIONS, _get_keep_attrs
from .pycompat import integer_types
from .utils import (
either_dict_or_kwargs,
Expand All @@ -23,6 +24,9 @@
)
from .variable import IndexVariable, Variable, as_variable

if TYPE_CHECKING:
from .dataarray import DataArray


def check_reduce_dims(reduce_dims, dimensions):

Expand Down Expand Up @@ -55,12 +59,12 @@ def unique_value_groups(ar, sort=True):
the corresponding value in `unique_values`.
"""
inverse, values = pd.factorize(ar, sort=sort)
groups = [[] for _ in range(len(values))]
indices = [[] for _ in range(len(values))]
for n, g in enumerate(inverse):
if g >= 0:
# pandas uses -1 to mark NaN, but doesn't include them in values
groups[g].append(n)
return values, groups
indices[g].append(n)
return values, indices


def _dummy_copy(xarray_obj):
Expand Down Expand Up @@ -267,7 +271,7 @@ class GroupBy(SupportsArithmetic):
def __init__(
self,
obj,
group,
group_or_name: Union[Hashable, "DataArray", IndexVariable],
squeeze=False,
grouper=None,
bins=None,
Expand All @@ -280,7 +284,7 @@ def __init__(
----------
obj : Dataset or DataArray
Object to group.
group : DataArray
group_or_name : str, DataArray, IndexVariable
Array with the group values.
squeeze : bool, optional
If "group" is a coordinate of object, `squeeze` controls whether
Expand All @@ -305,20 +309,22 @@ def __init__(
if grouper is not None and bins is not None:
raise TypeError("can't specify both `grouper` and `bins`")

if not isinstance(group, (DataArray, IndexVariable)):
if not hashable(group):
if not isinstance(group_or_name, (DataArray, IndexVariable)):
if not hashable(group_or_name):
raise TypeError(
"`group` must be an xarray.DataArray or the "
"name of an xarray variable or dimension."
f"Received {group!r} instead."
f"Received {group_or_name!r} instead."
)
group = obj[group]
group = obj[group_or_name]
if len(group) == 0:
raise ValueError(f"{group.name} must not be empty")

if group.name not in obj.coords and group.name in obj.dims:
# DummyGroups should not appear on groupby results
group = _DummyGroup(obj, group.name, group.coords)
else:
group = group_or_name

if getattr(group, "name", None) is None:
group.name = "group"
Expand Down Expand Up @@ -361,7 +367,8 @@ def __init__(
if not squeeze:
# use slices to do views instead of fancy indexing
# equivalent to: group_indices = group_indices.reshape(-1, 1)
group_indices = [slice(i, i + 1) for i in group_indices]
# TODO: fix type error `Unsupported operand types for + ("slice" and "int")`
group_indices = [slice(i, i + 1) for i in group_indices] # type: ignore
unique_coord = group
else:
if group.isnull().any():
Expand Down Expand Up @@ -392,8 +399,15 @@ def __init__(
# specification for the groupby operation
self._obj = obj
self._group = group
# The dimension over which to group. Where the group is a
# non-index coord, this will differ from the group name.
self._group_dim = group_dim
# TODO: reword!
#
# A list containing a list for each group; each inner list contains the
# indices of the points that belong to the respective group.
self._group_indices = group_indices
# IndexVariable of unique values (labels?)
self._unique_coord = unique_coord
self._stacked_dim = stacked_dim
self._inserted_dims = inserted_dims
Expand All @@ -404,6 +418,8 @@ def __init__(
self._groups = None
self._dims = None

# TODO: is this correct? Should we be returning the dims of the result? This
# will use the original dim where we're grouping by a coord.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the existing code for this property correct? Currently x.groupby(foo).dims != x.groupby(foo).sum(...).dims when we're grouping by a non-indexed coord

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It returns the dims for the first group, so you can decide what dim values can be passed to .mean. But I agree it is confusing. Maybe we should deprecate and remove. This use-case is also served by implementing GroupBy.__getitem__

@property
def dims(self):
if self._dims is None:
Expand Down Expand Up @@ -882,9 +898,105 @@ def reduce_array(ar):

return self.map(reduce_array, shortcut=shortcut)

def dims_(self) -> Iterator[Hashable]:
    """The dims of the resulting object (before any further reduction)."""
    grouping_on_dim = self._group_dim == self._group.name
    for dim in self.dims:
        if dim == self._group_dim and not grouping_on_dim:
            # Grouping on a coord that isn't a dimension: the group's name
            # replaces the dimension it spans in the result.
            yield self._group.name
        else:
            # Either a dim unrelated to the grouping, or we grouped
            # directly on this dimension itself.
            yield dim

def _npg_groupby(self, func: str):

# `.values` seems to be required for datetimes
indices, _ = pd.factorize(self._group.values)

array = npg_aggregate(self._obj, self._group_dim, func, indices)

# FIXME: Currently we're trying to use as much of the existing
# infrastructure as possible, but I'm struggling to fit it in — it may
# be easier to start from scratch.
#
# The existing model checks a single result `applied_example`; which in
# this case we don't have — we generate the whole array at once.

applied = applied_example = type(self._obj)(
data=array,
dims=tuple(self.dims_()),
Copy link
Contributor

@dcherian dcherian Mar 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to assign coordinate variables from self._obj here or maybe the apply_ufunc version will solve that

)

# The remainder is mostly copied from `_combine`

# FIXME: this part seems broken at the moment — the `_infer_concat_args`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since npg returns a full array, the concat bit isn't needed any more so combined = applied. I think you could just delete all the concat code.

# doesn't return the correct result when the group isn't a dimensioned
# coordinate

coord = self._unique_coord
coord, dim, positions = self._infer_concat_args(applied_example)
# NB: These are commented out for simplicity.
# if shortcut:
# combined = self._concat_shortcut(applied, dim, positions)
# else:
combined = concat(applied, dim)
combined = _maybe_reorder(combined, dim, positions)

if isinstance(combined, type(self._obj)):
# only restore dimension order for arrays
combined = self._restore_dim_order(combined)
# assign coord when the applied function does not return that coord
if coord is not None: # and dim not in applied_example.dims:
# if shortcut:
# coord_var = as_variable(coord)
# combined._coords[coord.name] = coord_var
# else:
combined.coords[coord.name] = coord
combined = self._maybe_restore_empty_groups(combined)
combined = self._maybe_unstack(combined)

return combined

if OPTIONS["numpy_groupies"]:

def sum(self, dim=None):
    """Grouped sum computed via numpy_groupies.

    If ``dim`` is given, the grouped result is further summed over that
    dimension; otherwise the grouped result is returned as-is.
    """
    result = self._npg_groupby(func="sum")
    return result.sum(dim) if dim else result

def count(self, dim=None):
    """Grouped count computed via numpy_groupies.

    If ``dim`` is given, the grouped result is further counted over that
    dimension; otherwise the grouped result is returned as-is.
    """
    result = self._npg_groupby(func="count")
    return result.count(dim) if dim else result

def mean(self, dim=None):
    """Grouped mean computed via numpy_groupies.

    If ``dim`` is given, the grouped result is further averaged over that
    dimension; otherwise the grouped result is returned as-is.

    NOTE(review): a mean-of-means is not the overall mean when groups have
    unequal sizes — flagged in review; TODO confirm intended semantics.
    """
    result = self._npg_groupby(func="mean")
    return result.mean(dim) if dim else result


def npg_aggregate(
    da: "DataArray", dim: str, func: str, group_idx: IndexVariable
) -> np.array:
    """Apply a numpy_groupies aggregation ``func`` to ``da`` along ``dim``.

    ``group_idx`` assigns each position along ``dim`` to a group; the actual
    aggregation is delegated to numpy_groupies' numba backend.

    NOTE(review): this passes the DataArray itself to npg; reviewers
    suggested passing ``da.data`` instead — TODO confirm.
    """
    from numpy_groupies.aggregate_numba import aggregate

    return aggregate(
        group_idx=group_idx, a=da, func=func, axis=da.get_axis_num(dim)
    )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return aggregate(group_idx=group_idx, a=da, func=func, axis=axis)
return aggregate(group_idx=group_idx, a=da.data, func=func, axis=axis)

Could make this the following (from @shoyer's notebook) or do that later...

def _binned_agg(
    array: np.ndarray,
    indices: np.ndarray,
    num_bins: int,
    *,
    func,
    fill_value,
    dtype,
) -> np.ndarray:
    """NumPy helper function for aggregating over bins."""
    # NaN entries in `indices` mark points outside every bin; exclude them.
    mask = np.logical_not(np.isnan(indices))
    int_indices = indices[mask].astype(int)
    # NOTE(review): `shape` is computed but never used below — dead code?
    shape = array.shape[: -indices.ndim] + (num_bins,)
    # Aggregate the masked points into `num_bins` bins along the last axis;
    # bins with no points receive `fill_value`.
    result = numpy_groupies.aggregate_numpy.aggregate(
        int_indices,
        array[..., mask],
        func=func,
        size=num_bins,
        fill_value=fill_value,
        dtype=dtype,
        axis=-1,
    )
    return result


def groupby_bins_agg(
    array: xarray.DataArray,
    group: xarray.DataArray,
    bins,
    func="sum",
    fill_value=0,
    dtype=None,
    **cut_kwargs,
) -> xarray.DataArray:
    """Faster equivalent of Xarray's groupby_bins(...).sum().

    ``group`` is binned with ``pd.cut`` (extra ``cut_kwargs`` are forwarded),
    then ``array`` is aggregated per bin with ``func`` via ``_binned_agg``.
    Returns a DataArray with a new ``<group.name>_bins`` dimension whose
    coordinate holds the bin categories.
    """
    # TODO: implement this upstream in xarray:
    # https://github.com/pydata/xarray/issues/4473
    binned = pd.cut(np.ravel(group), bins, **cut_kwargs)
    new_dim_name = group.name + "_bins"
    # Per-point integer bin codes, reshaped back to the group's shape
    # (-1 marks points outside all bins, per pandas' convention).
    indices = group.copy(data=binned.codes.reshape(group.shape))

    result = xarray.apply_ufunc(
        _binned_agg,
        array,
        indices,
        input_core_dims=[indices.dims, indices.dims],
        output_core_dims=[[new_dim_name]],
        output_dtypes=[array.dtype],
        dask_gufunc_kwargs=dict(
            output_sizes={new_dim_name: binned.categories.size},
        ),
        kwargs={
            "num_bins": binned.categories.size,
            "func": func,
            "fill_value": fill_value,
            "dtype": dtype,
        },
        dask="parallelized",
    )
    result.coords[new_dim_name] = binned.categories
    return result



if not OPTIONS["numpy_groupies"]:

ops.inject_reduce_methods(DataArrayGroupBy)
ops.inject_binary_ops(DataArrayGroupBy)
ops.inject_reduce_methods(DataArrayGroupBy)
ops.inject_binary_ops(DataArrayGroupBy)


class DatasetGroupBy(GroupBy, ImplementsDatasetReduce):
Expand Down
4 changes: 3 additions & 1 deletion xarray/core/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
CMAP_DIVERGENT = "cmap_divergent"
KEEP_ATTRS = "keep_attrs"
DISPLAY_STYLE = "display_style"

NUMPY_GROUPIES = "numpy_groupies"

OPTIONS = {
DISPLAY_WIDTH: 80,
Expand All @@ -21,6 +21,7 @@
CMAP_DIVERGENT: "RdBu_r",
KEEP_ATTRS: "default",
DISPLAY_STYLE: "html",
NUMPY_GROUPIES: True,
}

_JOIN_OPTIONS = frozenset(["inner", "outer", "left", "right", "exact"])
Expand Down Expand Up @@ -104,6 +105,7 @@ class set_options:
Default: ``'default'``.
- ``display_style``: display style to use in jupyter for xarray objects.
Default: ``'text'``. Other options are ``'html'``.
- ``numpy_groupies``: whether to use the ``numpy_groupies`` package for grouped aggregations. Default: ``True``.


You can use ``set_options`` either as a context manager:
Expand Down