diff --git a/doc/whats-new.rst b/doc/whats-new.rst index bfe0a1b9be5..e05756401f7 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -23,6 +23,13 @@ v2023.11.1 (unreleased) New Features ~~~~~~~~~~~~ +- :py:meth:`rolling` uses numbagg `_ for + most of its computations by default. Numbagg is up to 5x faster than bottleneck + where parallelization is possible. Where parallelization isn't possible — for + example a 1D array — it's about the same speed as bottleneck, and 2-5x faster + than pandas' default functions. (:pull:`8493`). numbagg is an optional + dependency, so requires installing separately. + By `Maximilian Roos `_. - Use a concise format when plotting datetime arrays. (:pull:`8449`). By `Jimmy Westling `_. - Avoid overwriting unchanged existing coordinate variables when appending by setting ``mode='a-'``. @@ -77,13 +84,13 @@ Documentation - Improved error message when attempting to get a variable which doesn't exist from a Dataset. (:pull:`8474`) By `Maximilian Roos `_. -- Fix default value of ``combine_attrs `` in :py:func:`xarray.combine_by_coords` (:pull:`8471`) +- Fix default value of ``combine_attrs`` in :py:func:`xarray.combine_by_coords` (:pull:`8471`) By `Gregorio L. Trevisan `_. Internal Changes ~~~~~~~~~~~~~~~~ -- :py:meth:`DataArray.bfill` & :py:meth:`DataArray.ffill` now use numbagg by +- :py:meth:`DataArray.bfill` & :py:meth:`DataArray.ffill` now use numbagg `_ by default, which is up to 5x faster where parallelization is possible. (:pull:`8339`) By `Maximilian Roos `_. - Update mypy version to 1.7 (:issue:`8448`, :pull:`8501`). diff --git a/xarray/core/rolling.py b/xarray/core/rolling.py index 8f21fe37072..819c31642d0 100644 --- a/xarray/core/rolling.py +++ b/xarray/core/rolling.py @@ -8,13 +8,14 @@ from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar import numpy as np +from packaging.version import Version -from xarray.core import dtypes, duck_array_ops, utils +from xarray.core import dtypes, duck_array_ops, pycompat, utils from xarray.core.arithmetic import CoarsenArithmetic from xarray.core.options import OPTIONS, _get_keep_attrs from xarray.core.pycompat import is_duck_dask_array from xarray.core.types import CoarsenBoundaryOptions, SideOptions, T_Xarray -from xarray.core.utils import either_dict_or_kwargs +from xarray.core.utils import either_dict_or_kwargs, module_available try: import bottleneck @@ -145,7 +146,13 @@ def _reduce_method( # type: ignore[misc] name: str, fillna: Any, rolling_agg_func: Callable | None = None ) -> Callable[..., T_Xarray]: """Constructs reduction methods built on a numpy reduction function (e.g. sum), - a bottleneck reduction function (e.g. move_sum), or a Rolling reduction (_mean). + a numbagg reduction function (e.g. move_sum), a bottleneck reduction function + (e.g. move_sum), or a Rolling reduction (_mean). + + The logic here for which function to run is quite diffuse, across this method & + _array_reduce. Arguably we could refactor this. But one constraint is that we + need context of xarray options, of the functions each library offers, of + the array (e.g. dtype). """ if rolling_agg_func: array_agg_func = None @@ -153,14 +160,21 @@ def _reduce_method( # type: ignore[misc] array_agg_func = getattr(duck_array_ops, name) bottleneck_move_func = getattr(bottleneck, "move_" + name, None) + if module_available("numbagg"): + import numbagg + + numbagg_move_func = getattr(numbagg, "move_" + name, None) + else: + numbagg_move_func = None def method(self, keep_attrs=None, **kwargs): keep_attrs = self._get_keep_attrs(keep_attrs) - return self._numpy_or_bottleneck_reduce( - array_agg_func, - bottleneck_move_func, - rolling_agg_func, + return self._array_reduce( + array_agg_func=array_agg_func, + bottleneck_move_func=bottleneck_move_func, + numbagg_move_func=numbagg_move_func, + rolling_agg_func=rolling_agg_func, keep_attrs=keep_attrs, fillna=fillna, **kwargs, @@ -510,9 +524,47 @@ def _counts(self, keep_attrs: bool | None) -> DataArray: ) return counts - def _bottleneck_reduce(self, func, keep_attrs, **kwargs): - from xarray.core.dataarray import DataArray + def _numbagg_reduce(self, func, keep_attrs, **kwargs): + # Some of this is copied from `_bottleneck_reduce`, we could reduce this as part + # of a wider refactor. + + axis = self.obj.get_axis_num(self.dim[0]) + padded = self.obj.variable + if self.center[0]: + if is_duck_dask_array(padded.data): + # workaround to make the padded chunk size larger than + # self.window - 1 + shift = -(self.window[0] + 1) // 2 + offset = (self.window[0] - 1) // 2 + valid = (slice(None),) * axis + ( + slice(offset, offset + self.obj.shape[axis]), + ) + else: + shift = (-self.window[0] // 2) + 1 + valid = (slice(None),) * axis + (slice(-shift, None),) + padded = padded.pad({self.dim[0]: (0, -shift)}, mode="constant") + + if is_duck_dask_array(padded.data) and False: + raise AssertionError("should not be reachable") + else: + values = func( + padded.data, + window=self.window[0], + min_count=self.min_periods, + axis=axis, + ) + + if self.center[0]: + values = values[valid] + + attrs = self.obj.attrs if keep_attrs else {} + + return self.obj.__class__( + values, self.obj.coords, attrs=attrs, name=self.obj.name + ) + + def _bottleneck_reduce(self, func, keep_attrs, **kwargs): # bottleneck doesn't allow min_count to be 0, although it should # work the same as if min_count = 1 # Note bottleneck only works with 1d-rolling. @@ -550,12 +602,15 @@ def _bottleneck_reduce(self, func, keep_attrs, **kwargs): attrs = self.obj.attrs if keep_attrs else {} - return DataArray(values, self.obj.coords, attrs=attrs, name=self.obj.name) + return self.obj.__class__( + values, self.obj.coords, attrs=attrs, name=self.obj.name + ) - def _numpy_or_bottleneck_reduce( + def _array_reduce( self, array_agg_func, bottleneck_move_func, + numbagg_move_func, rolling_agg_func, keep_attrs, fillna, @@ -571,6 +626,35 @@ def _numpy_or_bottleneck_reduce( ) del kwargs["dim"] + if ( + OPTIONS["use_numbagg"] + and module_available("numbagg") + and pycompat.mod_version("numbagg") >= Version("0.6.3") + and numbagg_move_func is not None + # TODO: we could at least allow this for the equivalent of `apply_ufunc`'s + # "parallelized". `rolling_exp` does this, as an example (but rolling_exp is + # much simpler) + and not is_duck_dask_array(self.obj.data) + # Numbagg doesn't handle object arrays and generally has dtype consistency, + # so doesn't deal well with bool arrays which are expected to change type. + and self.obj.data.dtype.kind not in "ObMm" + # TODO: we could also allow this, probably as part of a refactoring of this + # module, so we can use the machinery in `self.reduce`. + and self.ndim == 1 + ): + import numbagg + + # Numbagg has a default ddof of 1. I (@max-sixty) think we should make + # this the default in xarray too, but until we do, don't use numbagg for + # std and var unless ddof is set to 1. + if ( + numbagg_move_func not in [numbagg.move_std, numbagg.move_var] + or kwargs.get("ddof") == 1 + ): + return self._numbagg_reduce( + numbagg_move_func, keep_attrs=keep_attrs, **kwargs + ) + if ( OPTIONS["use_bottleneck"] and bottleneck_move_func is not None @@ -583,8 +667,10 @@ def _numpy_or_bottleneck_reduce( return self._bottleneck_reduce( bottleneck_move_func, keep_attrs=keep_attrs, **kwargs ) + if rolling_agg_func: return rolling_agg_func(self, keep_attrs=self._get_keep_attrs(keep_attrs)) + if fillna is not None: if fillna is dtypes.INF: fillna = dtypes.get_pos_infinity(self.obj.dtype, max_for_int=True) @@ -705,7 +791,7 @@ def _counts(self, keep_attrs: bool | None) -> Dataset: DataArrayRolling._counts, keep_attrs=keep_attrs ) - def _numpy_or_bottleneck_reduce( + def _array_reduce( self, array_agg_func, bottleneck_move_func, @@ -715,7 +801,7 @@ def _numpy_or_bottleneck_reduce( ): return self._dataset_implementation( functools.partial( - DataArrayRolling._numpy_or_bottleneck_reduce, + DataArrayRolling._array_reduce, array_agg_func=array_agg_func, bottleneck_move_func=bottleneck_move_func, rolling_agg_func=rolling_agg_func, diff --git a/xarray/tests/test_rolling.py b/xarray/tests/test_rolling.py index cb7b723a208..0ea373e3ab0 100644 --- a/xarray/tests/test_rolling.py +++ b/xarray/tests/test_rolling.py @@ -10,7 +10,6 @@ from xarray import DataArray, Dataset, set_options from xarray.tests import ( assert_allclose, - assert_array_equal, assert_equal, assert_identical, has_dask, @@ -24,6 +23,19 @@ ] +@pytest.fixture(params=["numbagg", "bottleneck"]) +def compute_backend(request): + if request.param == "bottleneck": + options = dict(use_bottleneck=True, use_numbagg=False) + elif request.param == "numbagg": + options = dict(use_bottleneck=False, use_numbagg=True) + else: + raise ValueError + + with xr.set_options(**options): + yield request.param + + class TestDataArrayRolling: @pytest.mark.parametrize("da", (1, 2), indirect=True) @pytest.mark.parametrize("center", [True, False]) @@ -87,9 +99,10 @@ def test_rolling_properties(self, da) -> None: @pytest.mark.parametrize("center", (True, False, None)) @pytest.mark.parametrize("min_periods", (1, None)) @pytest.mark.parametrize("backend", ["numpy"], indirect=True) - def test_rolling_wrapped_bottleneck(self, da, name, center, min_periods) -> None: + def test_rolling_wrapped_bottleneck( + self, da, name, center, min_periods, compute_backend + ) -> None: bn = pytest.importorskip("bottleneck", minversion="1.1") - # Test all bottleneck functions rolling_obj = da.rolling(time=7, min_periods=min_periods) @@ -98,7 +111,9 @@ def test_rolling_wrapped_bottleneck(self, da, name, center, min_periods) -> None expected = getattr(bn, func_name)( da.values, window=7, axis=1, min_count=min_periods ) - assert_array_equal(actual.values, expected) + + # Using assert_allclose because we get tiny (1e-17) differences in numbagg. + np.testing.assert_allclose(actual.values, expected) with pytest.warns(DeprecationWarning, match="Reductions are applied"): getattr(rolling_obj, name)(dim="time") @@ -106,7 +121,8 @@ def test_rolling_wrapped_bottleneck(self, da, name, center, min_periods) -> None # Test center rolling_obj = da.rolling(time=7, center=center) actual = getattr(rolling_obj, name)()["time"] - assert_equal(actual, da["time"]) + # Using assert_allclose because we get tiny (1e-17) differences in numbagg. + assert_allclose(actual, da["time"]) @requires_dask @pytest.mark.parametrize("name", ("mean", "count")) @@ -153,7 +169,9 @@ def test_rolling_wrapped_dask_nochunk(self, center) -> None: @pytest.mark.parametrize("center", (True, False)) @pytest.mark.parametrize("min_periods", (None, 1, 2, 3)) @pytest.mark.parametrize("window", (1, 2, 3, 4)) - def test_rolling_pandas_compat(self, center, window, min_periods) -> None: + def test_rolling_pandas_compat( + self, center, window, min_periods, compute_backend + ) -> None: s = pd.Series(np.arange(10)) da = DataArray.from_series(s) @@ -203,7 +221,9 @@ def test_rolling_construct(self, center: bool, window: int) -> None: @pytest.mark.parametrize("min_periods", (None, 1, 2, 3)) @pytest.mark.parametrize("window", (1, 2, 3, 4)) @pytest.mark.parametrize("name", ("sum", "mean", "std", "max")) - def test_rolling_reduce(self, da, center, min_periods, window, name) -> None: + def test_rolling_reduce( + self, da, center, min_periods, window, name, compute_backend + ) -> None: if min_periods is not None and window < min_periods: min_periods = window @@ -223,7 +243,9 @@ def test_rolling_reduce(self, da, center, min_periods, window, name) -> None: @pytest.mark.parametrize("min_periods", (None, 1, 2, 3)) @pytest.mark.parametrize("window", (1, 2, 3, 4)) @pytest.mark.parametrize("name", ("sum", "max")) - def test_rolling_reduce_nonnumeric(self, center, min_periods, window, name) -> None: + def test_rolling_reduce_nonnumeric( + self, center, min_periods, window, name, compute_backend + ) -> None: da = DataArray( [0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], dims="time" ).isnull() @@ -239,7 +261,7 @@ def test_rolling_reduce_nonnumeric(self, center, min_periods, window, name) -> N assert_allclose(actual, expected) assert actual.dims == expected.dims - def test_rolling_count_correct(self) -> None: + def test_rolling_count_correct(self, compute_backend) -> None: da = DataArray([0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], dims="time") kwargs: list[dict[str, Any]] = [ @@ -279,7 +301,9 @@ def test_rolling_count_correct(self) -> None: @pytest.mark.parametrize("center", (True, False)) @pytest.mark.parametrize("min_periods", (None, 1)) @pytest.mark.parametrize("name", ("sum", "mean", "max")) - def test_ndrolling_reduce(self, da, center, min_periods, name) -> None: + def test_ndrolling_reduce( + self, da, center, min_periods, name, compute_backend + ) -> None: rolling_obj = da.rolling(time=3, x=2, center=center, min_periods=min_periods) actual = getattr(rolling_obj, name)() @@ -560,7 +584,7 @@ def test_rolling_properties(self, ds) -> None: @pytest.mark.parametrize("key", ("z1", "z2")) @pytest.mark.parametrize("backend", ["numpy"], indirect=True) def test_rolling_wrapped_bottleneck( - self, ds, name, center, min_periods, key + self, ds, name, center, min_periods, key, compute_backend ) -> None: bn = pytest.importorskip("bottleneck", minversion="1.1") @@ -577,12 +601,12 @@ def test_rolling_wrapped_bottleneck( ) else: raise ValueError - assert_array_equal(actual[key].values, expected) + np.testing.assert_allclose(actual[key].values, expected) # Test center rolling_obj = ds.rolling(time=7, center=center) actual = getattr(rolling_obj, name)()["time"] - assert_equal(actual, ds["time"]) + assert_allclose(actual, ds["time"]) @pytest.mark.parametrize("center", (True, False)) @pytest.mark.parametrize("min_periods", (None, 1, 2, 3))