Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use numbagg for rolling methods #8493

Merged
merged 13 commits into from Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 9 additions & 2 deletions doc/whats-new.rst
Expand Up @@ -23,6 +23,13 @@ v2023.11.1 (unreleased)
New Features
~~~~~~~~~~~~

- :py:meth:`rolling` uses `numbagg <https://github.com/numbagg/numbagg>`_ for
most of its computations by default. Numbagg is up to 5x faster than bottleneck
where parallelization is possible. Where parallelization isn't possible — for
example a 1D array — it's about the same speed as bottleneck, and 2-5x faster
than pandas' default functions. (:pull:`8493`). numbagg is an optional
dependency, so it requires installing separately.
By `Maximilian Roos <https://github.com/max-sixty>`_.
- Use a concise format when plotting datetime arrays. (:pull:`8449`).
By `Jimmy Westling <https://github.com/illviljan>`_.
- Avoid overwriting unchanged existing coordinate variables when appending by setting ``mode='a-'``.
Expand Down Expand Up @@ -77,13 +84,13 @@ Documentation
- Improved error message when attempting to get a variable which doesn't exist from a Dataset.
(:pull:`8474`)
By `Maximilian Roos <https://github.com/max-sixty>`_.
- Fix default value of ``combine_attrs `` in :py:func:`xarray.combine_by_coords` (:pull:`8471`)
- Fix default value of ``combine_attrs`` in :py:func:`xarray.combine_by_coords` (:pull:`8471`)
By `Gregorio L. Trevisan <https://github.com/gtrevisan>`_.

Internal Changes
~~~~~~~~~~~~~~~~

- :py:meth:`DataArray.bfill` & :py:meth:`DataArray.ffill` now use numbagg by
- :py:meth:`DataArray.bfill` & :py:meth:`DataArray.ffill` now use `numbagg <https://github.com/numbagg/numbagg>`_ by
default, which is up to 5x faster where parallelization is possible. (:pull:`8339`)
By `Maximilian Roos <https://github.com/max-sixty>`_.
- Update mypy version to 1.7 (:issue:`8448`, :pull:`8501`).
Expand Down
112 changes: 99 additions & 13 deletions xarray/core/rolling.py
Expand Up @@ -8,13 +8,14 @@
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar

import numpy as np
from packaging.version import Version

from xarray.core import dtypes, duck_array_ops, utils
from xarray.core import dtypes, duck_array_ops, pycompat, utils
from xarray.core.arithmetic import CoarsenArithmetic
from xarray.core.options import OPTIONS, _get_keep_attrs
from xarray.core.pycompat import is_duck_dask_array
from xarray.core.types import CoarsenBoundaryOptions, SideOptions, T_Xarray
from xarray.core.utils import either_dict_or_kwargs
from xarray.core.utils import either_dict_or_kwargs, module_available

try:
import bottleneck
Expand Down Expand Up @@ -145,22 +146,35 @@ def _reduce_method( # type: ignore[misc]
name: str, fillna: Any, rolling_agg_func: Callable | None = None
) -> Callable[..., T_Xarray]:
"""Constructs reduction methods built on a numpy reduction function (e.g. sum),
a bottleneck reduction function (e.g. move_sum), or a Rolling reduction (_mean).
a numbagg reduction function (e.g. move_sum), a bottleneck reduction function
(e.g. move_sum), or a Rolling reduction (_mean).

The logic here for which function to run is quite diffuse, across this method &
_array_reduce. Arguably we could refactor this. But one constraint is that we
need context of xarray options, of the functions each library offers, of
the array (e.g. dtype).
"""
if rolling_agg_func:
array_agg_func = None
else:
array_agg_func = getattr(duck_array_ops, name)

bottleneck_move_func = getattr(bottleneck, "move_" + name, None)
if module_available("numbagg"):
import numbagg

numbagg_move_func = getattr(numbagg, "move_" + name, None)
else:
numbagg_move_func = None

def method(self, keep_attrs=None, **kwargs):
keep_attrs = self._get_keep_attrs(keep_attrs)

return self._numpy_or_bottleneck_reduce(
array_agg_func,
bottleneck_move_func,
rolling_agg_func,
return self._array_reduce(
array_agg_func=array_agg_func,
bottleneck_move_func=bottleneck_move_func,
numbagg_move_func=numbagg_move_func,
rolling_agg_func=rolling_agg_func,
keep_attrs=keep_attrs,
fillna=fillna,
**kwargs,
Expand Down Expand Up @@ -510,9 +524,47 @@ def _counts(self, keep_attrs: bool | None) -> DataArray:
)
return counts

def _bottleneck_reduce(self, func, keep_attrs, **kwargs):
from xarray.core.dataarray import DataArray
def _numbagg_reduce(self, func, keep_attrs, **kwargs):
    """Apply a numbagg moving-window function along the single rolling dim.

    Parameters
    ----------
    func : callable
        A ``numbagg.move_*`` function taking ``(array, window, min_count, axis)``.
    keep_attrs : bool
        Whether to propagate ``self.obj.attrs`` onto the result.

    Returns
    -------
    Same type as ``self.obj`` with the reduction applied over ``self.dim[0]``.
    """
    # Some of this is copied from `_bottleneck_reduce`, we could reduce this as part
    # of a wider refactor.

    axis = self.obj.get_axis_num(self.dim[0])

    padded = self.obj.variable
    if self.center[0]:
        if is_duck_dask_array(padded.data):
            # workaround to make the padded chunk size larger than
            # self.window - 1
            shift = -(self.window[0] + 1) // 2
            offset = (self.window[0] - 1) // 2
            valid = (slice(None),) * axis + (
                slice(offset, offset + self.obj.shape[axis]),
            )
        else:
            shift = (-self.window[0] // 2) + 1
            valid = (slice(None),) * axis + (slice(-shift, None),)
        padded = padded.pad({self.dim[0]: (0, -shift)}, mode="constant")

    if is_duck_dask_array(padded.data):
        # `_array_reduce` filters out dask arrays before dispatching here, so
        # reaching this branch indicates a bug in the dispatch logic.
        # (Previously this guard was disabled by an `and False`, making the
        # assertion unreachable even when it should have fired.)
        raise AssertionError("should not be reachable")
    else:
        values = func(
            padded.data,
            window=self.window[0],
            min_count=self.min_periods,
            axis=axis,
        )

    if self.center[0]:
        # Trim the padding introduced above so the result aligns with the
        # original coordinates.
        values = values[valid]

    attrs = self.obj.attrs if keep_attrs else {}

    return self.obj.__class__(
        values, self.obj.coords, attrs=attrs, name=self.obj.name
    )

def _bottleneck_reduce(self, func, keep_attrs, **kwargs):
# bottleneck doesn't allow min_count to be 0, although it should
# work the same as if min_count = 1
# Note bottleneck only works with 1d-rolling.
Expand Down Expand Up @@ -550,12 +602,15 @@ def _bottleneck_reduce(self, func, keep_attrs, **kwargs):

attrs = self.obj.attrs if keep_attrs else {}

return DataArray(values, self.obj.coords, attrs=attrs, name=self.obj.name)
return self.obj.__class__(
values, self.obj.coords, attrs=attrs, name=self.obj.name
)

def _numpy_or_bottleneck_reduce(
def _array_reduce(
self,
array_agg_func,
bottleneck_move_func,
numbagg_move_func,
rolling_agg_func,
keep_attrs,
fillna,
Expand All @@ -571,6 +626,35 @@ def _numpy_or_bottleneck_reduce(
)
del kwargs["dim"]

if (
OPTIONS["use_numbagg"]
and module_available("numbagg")
and pycompat.mod_version("numbagg") >= Version("0.6.3")
and numbagg_move_func is not None
# TODO: we could at least allow this for the equivalent of `apply_ufunc`'s
# "parallelized". `rolling_exp` does this, as an example (but rolling_exp is
# much simpler)
and not is_duck_dask_array(self.obj.data)
# Numbagg doesn't handle object arrays and generally has dtype consistency,
# so doesn't deal well with bool arrays which are expected to change type.
and self.obj.data.dtype.kind not in "ObMm"
# TODO: we could also allow this, probably as part of a refactoring of this
# module, so we can use the machinery in `self.reduce`.
and self.ndim == 1
):
import numbagg

# Numbagg has a default ddof of 1. I (@max-sixty) think we should make
# this the default in xarray too, but until we do, don't use numbagg for
# std and var unless ddof is set to 1.
if (
numbagg_move_func not in [numbagg.move_std, numbagg.move_var]
or kwargs.get("ddof") == 1
):
return self._numbagg_reduce(
numbagg_move_func, keep_attrs=keep_attrs, **kwargs
)

if (
OPTIONS["use_bottleneck"]
and bottleneck_move_func is not None
Expand All @@ -583,8 +667,10 @@ def _numpy_or_bottleneck_reduce(
return self._bottleneck_reduce(
bottleneck_move_func, keep_attrs=keep_attrs, **kwargs
)

if rolling_agg_func:
return rolling_agg_func(self, keep_attrs=self._get_keep_attrs(keep_attrs))

if fillna is not None:
if fillna is dtypes.INF:
fillna = dtypes.get_pos_infinity(self.obj.dtype, max_for_int=True)
Expand Down Expand Up @@ -705,7 +791,7 @@ def _counts(self, keep_attrs: bool | None) -> Dataset:
DataArrayRolling._counts, keep_attrs=keep_attrs
)

def _numpy_or_bottleneck_reduce(
def _array_reduce(
self,
array_agg_func,
bottleneck_move_func,
Expand All @@ -715,7 +801,7 @@ def _numpy_or_bottleneck_reduce(
):
return self._dataset_implementation(
functools.partial(
DataArrayRolling._numpy_or_bottleneck_reduce,
DataArrayRolling._array_reduce,
array_agg_func=array_agg_func,
bottleneck_move_func=bottleneck_move_func,
rolling_agg_func=rolling_agg_func,
Expand Down
50 changes: 37 additions & 13 deletions xarray/tests/test_rolling.py
Expand Up @@ -10,7 +10,6 @@
from xarray import DataArray, Dataset, set_options
from xarray.tests import (
assert_allclose,
assert_array_equal,
assert_equal,
assert_identical,
has_dask,
Expand All @@ -24,6 +23,19 @@
]


@pytest.fixture(params=["numbagg", "bottleneck"])
def compute_backend(request):
    """Parametrized fixture selecting the rolling compute backend.

    Sets xarray options so exactly one of numbagg / bottleneck is enabled
    for the duration of the test, and yields the backend name.
    """
    if request.param == "bottleneck":
        options = dict(use_bottleneck=True, use_numbagg=False)
    elif request.param == "numbagg":
        options = dict(use_bottleneck=False, use_numbagg=True)
    else:
        raise ValueError

    with xr.set_options(**options):
        yield request.param

class TestDataArrayRolling:
@pytest.mark.parametrize("da", (1, 2), indirect=True)
@pytest.mark.parametrize("center", [True, False])
Expand Down Expand Up @@ -87,9 +99,10 @@ def test_rolling_properties(self, da) -> None:
@pytest.mark.parametrize("center", (True, False, None))
@pytest.mark.parametrize("min_periods", (1, None))
@pytest.mark.parametrize("backend", ["numpy"], indirect=True)
def test_rolling_wrapped_bottleneck(self, da, name, center, min_periods) -> None:
def test_rolling_wrapped_bottleneck(
self, da, name, center, min_periods, compute_backend
) -> None:
bn = pytest.importorskip("bottleneck", minversion="1.1")

# Test all bottleneck functions
rolling_obj = da.rolling(time=7, min_periods=min_periods)

Expand All @@ -98,15 +111,18 @@ def test_rolling_wrapped_bottleneck(self, da, name, center, min_periods) -> None
expected = getattr(bn, func_name)(
da.values, window=7, axis=1, min_count=min_periods
)
assert_array_equal(actual.values, expected)

# Using assert_allclose because we get tiny (1e-17) differences in numbagg.
np.testing.assert_allclose(actual.values, expected)

with pytest.warns(DeprecationWarning, match="Reductions are applied"):
getattr(rolling_obj, name)(dim="time")

# Test center
rolling_obj = da.rolling(time=7, center=center)
actual = getattr(rolling_obj, name)()["time"]
assert_equal(actual, da["time"])
# Using assert_allclose because we get tiny (1e-17) differences in numbagg.
assert_allclose(actual, da["time"])

@requires_dask
@pytest.mark.parametrize("name", ("mean", "count"))
Expand Down Expand Up @@ -153,7 +169,9 @@ def test_rolling_wrapped_dask_nochunk(self, center) -> None:
@pytest.mark.parametrize("center", (True, False))
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
@pytest.mark.parametrize("window", (1, 2, 3, 4))
def test_rolling_pandas_compat(self, center, window, min_periods) -> None:
def test_rolling_pandas_compat(
self, center, window, min_periods, compute_backend
) -> None:
s = pd.Series(np.arange(10))
da = DataArray.from_series(s)

Expand Down Expand Up @@ -203,7 +221,9 @@ def test_rolling_construct(self, center: bool, window: int) -> None:
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
@pytest.mark.parametrize("window", (1, 2, 3, 4))
@pytest.mark.parametrize("name", ("sum", "mean", "std", "max"))
def test_rolling_reduce(self, da, center, min_periods, window, name) -> None:
def test_rolling_reduce(
self, da, center, min_periods, window, name, compute_backend
) -> None:
if min_periods is not None and window < min_periods:
min_periods = window

Expand All @@ -223,7 +243,9 @@ def test_rolling_reduce(self, da, center, min_periods, window, name) -> None:
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
@pytest.mark.parametrize("window", (1, 2, 3, 4))
@pytest.mark.parametrize("name", ("sum", "max"))
def test_rolling_reduce_nonnumeric(self, center, min_periods, window, name) -> None:
def test_rolling_reduce_nonnumeric(
self, center, min_periods, window, name, compute_backend
) -> None:
da = DataArray(
[0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], dims="time"
).isnull()
Expand All @@ -239,7 +261,7 @@ def test_rolling_reduce_nonnumeric(self, center, min_periods, window, name) -> N
assert_allclose(actual, expected)
assert actual.dims == expected.dims

def test_rolling_count_correct(self) -> None:
def test_rolling_count_correct(self, compute_backend) -> None:
da = DataArray([0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], dims="time")

kwargs: list[dict[str, Any]] = [
Expand Down Expand Up @@ -279,7 +301,9 @@ def test_rolling_count_correct(self) -> None:
@pytest.mark.parametrize("center", (True, False))
@pytest.mark.parametrize("min_periods", (None, 1))
@pytest.mark.parametrize("name", ("sum", "mean", "max"))
def test_ndrolling_reduce(self, da, center, min_periods, name) -> None:
def test_ndrolling_reduce(
self, da, center, min_periods, name, compute_backend
) -> None:
rolling_obj = da.rolling(time=3, x=2, center=center, min_periods=min_periods)

actual = getattr(rolling_obj, name)()
Expand Down Expand Up @@ -560,7 +584,7 @@ def test_rolling_properties(self, ds) -> None:
@pytest.mark.parametrize("key", ("z1", "z2"))
@pytest.mark.parametrize("backend", ["numpy"], indirect=True)
def test_rolling_wrapped_bottleneck(
self, ds, name, center, min_periods, key
self, ds, name, center, min_periods, key, compute_backend
) -> None:
bn = pytest.importorskip("bottleneck", minversion="1.1")

Expand All @@ -577,12 +601,12 @@ def test_rolling_wrapped_bottleneck(
)
else:
raise ValueError
assert_array_equal(actual[key].values, expected)
np.testing.assert_allclose(actual[key].values, expected)

# Test center
rolling_obj = ds.rolling(time=7, center=center)
actual = getattr(rolling_obj, name)()["time"]
assert_equal(actual, ds["time"])
assert_allclose(actual, ds["time"])

@pytest.mark.parametrize("center", (True, False))
@pytest.mark.parametrize("min_periods", (None, 1, 2, 3))
Expand Down