From ef6bbc40257e04994bd4854fdba9a7b3e6c2ba69 Mon Sep 17 00:00:00 2001 From: Mathias Hauser Date: Fri, 29 Sep 2023 04:16:15 +0200 Subject: [PATCH 01/27] use engine flox for ordered groups --- flox/core.py | 22 ++++++++++++++++------ flox/xarray.py | 2 +- tests/conftest.py | 2 +- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/flox/core.py b/flox/core.py index f8f700f99..f7ce72f69 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1755,7 +1755,7 @@ def groupby_reduce( dtype: np.typing.DTypeLike = None, min_count: int | None = None, method: T_Method = "map-reduce", - engine: T_Engine = "numpy", + engine: T_Engine = None, reindex: bool | None = None, finalize_kwargs: dict[Any, Any] | None = None, ) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy yet @@ -1851,17 +1851,27 @@ def groupby_reduce( xarray.xarray_reduce """ + bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) + nby = len(bys) + by_is_dask = tuple(is_duck_dask_array(b) for b in bys) + any_by_dask = any(by_is_dask) + + if engine is None: + # choose numpy per default + engine = "numpy" + + if nby == 1 and not any_by_dask and bys[0].ndim == 1: + # maybe move to helper function + issorted = lambda arr: (arr[:-1] <= arr[1:]).all() + if not _is_arg_reduction(func) and issorted(bys[0]): + engine = "flox" + if engine == "flox" and _is_arg_reduction(func): raise NotImplementedError( "argreductions not supported for engine='flox' yet." "Try engine='numpy' or engine='numba' instead." ) - bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) - nby = len(bys) - by_is_dask = tuple(is_duck_dask_array(b) for b in bys) - any_by_dask = any(by_is_dask) - if method in ["split-reduce", "cohorts"] and any_by_dask: raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") diff --git a/flox/xarray.py b/flox/xarray.py index 487850ca0..bd0e6891f 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -72,7 +72,7 @@ def xarray_reduce( fill_value=None, dtype: np.typing.DTypeLike = None, method: str = "map-reduce", - engine: str = "numpy", + engine: str = None, keep_attrs: bool | None = True, skipna: bool | None = None, min_count: int | None = None, diff --git a/tests/conftest.py b/tests/conftest.py index 5c3bb81f6..37956aaf8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,7 @@ import pytest -@pytest.fixture(scope="module", params=["flox", "numpy", "numba"]) +@pytest.fixture(scope="module", params=[None, "flox", "numpy", "numba"]) def engine(request): if request.param == "numba": try: From e0ea5695135934b416c47876e0e326f06b009fea Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 6 Oct 2023 21:49:58 -0600 Subject: [PATCH 02/27] Add issorted helper func --- flox/core.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index f7ce72f69..5539feb46 100644 --- a/flox/core.py +++ b/flox/core.py @@ -84,6 +84,10 @@ DUMMY_AXIS = -2 +def _issorted(arr): + return (arr[:-1] <= arr[1:]).all() + + def _is_arg_reduction(func: T_Agg) -> bool: if isinstance(func, str) and func in ["argmin", "argmax", "nanargmax", "nanargmin"]: return True @@ -1861,9 +1865,7 @@ def groupby_reduce( engine = "numpy" if nby == 1 and not any_by_dask and bys[0].ndim == 1: - # maybe move to helper function - issorted = lambda arr: (arr[:-1] <= arr[1:]).all() - if not _is_arg_reduction(func) and issorted(bys[0]): + if not _is_arg_reduction(func) and _issorted(bys[0]): engine = "flox" if engine == "flox" and _is_arg_reduction(func): @@ -2046,7 +2048,7 @@ def groupby_reduce( assert len(groups) == 1 sorted_idx = np.argsort(groups[0]) # This optimization helps specifically with resampling - if not (sorted_idx[:-1] <= sorted_idx[1:]).all(): + if not _issorted(sorted_idx).all(): result = result[..., sorted_idx] groups = (groups[0][sorted_idx],) From d4e30d86a20bb830d7c81fdfa1f06480ff616e21 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 6 Oct 2023 22:06:20 -0600 Subject: [PATCH 03/27] Some fixes --- asv_bench/benchmarks/reduce.py | 2 +- flox/core.py | 35 +++++++++++++++++++++++++--------- flox/xrutils.py | 19 ++++++++++++++++++ tests/test_core.py | 29 ++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 10 deletions(-) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index 985658785..4ccf2fdd1 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -8,7 +8,7 @@ N = 1000 funcs = ["sum", "nansum", "mean", "nanmean", "max", "var", "nanvar", "count"] -engines = ["flox", "numpy"] +engines = [None, "flox", "numpy"] expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])] diff --git a/flox/core.py b/flox/core.py index 5539feb46..107092c75 100644 --- a/flox/core.py +++ b/flox/core.py @@ -35,7 +35,9 @@ generic_aggregate, ) from .cache import memoize -from .xrutils import is_duck_array, is_duck_dask_array, isnull +from .xrutils import is_duck_array, is_duck_dask_array, isnull, module_available + +HAS_NUMBAGG = module_available("numbagg") if TYPE_CHECKING: try: @@ -70,6 +72,7 @@ T_Dtypes = Union[np.typing.DTypeLike, Sequence[np.typing.DTypeLike], None] T_FillValues = Union[np.typing.ArrayLike, Sequence[np.typing.ArrayLike], None] T_Engine = Literal["flox", "numpy", "numba"] + T_EngineOpt = None | T_Engine T_Method = Literal["map-reduce", "blockwise", "cohorts"] T_IsBins = Union[bool | Sequence[bool]] @@ -1747,6 +1750,25 @@ def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) -> return expected_groups +def _choose_engine(bys, func): + # choose numpy per default + HAS_NUMBAGG = False # TODO: delete + if HAS_NUMBAGG and not _is_arg_reduction(func): + engine = "numbagg" + else: + engine = "numpy" + + nby = len(bys) + by_is_dask = tuple(is_duck_dask_array(b) for b in bys) + any_by_dask = any(by_is_dask) + + if nby == 1 and not any_by_dask and bys[0].ndim == 1: + if not _is_arg_reduction(func) and _issorted(bys[0]): + engine = "flox" + + return engine + + def groupby_reduce( array: np.ndarray | DaskArray, *by: T_By, @@ -1759,7 +1781,7 @@ def groupby_reduce( dtype: np.typing.DTypeLike = None, min_count: int | None = None, method: T_Method = "map-reduce", - engine: T_Engine = None, + engine: T_EngineOpt = None, reindex: bool | None = None, finalize_kwargs: dict[Any, Any] | None = None, ) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]: # type: ignore[misc] # Unpack not in mypy yet @@ -1822,7 +1844,7 @@ def groupby_reduce( (for 1D ``by`` only). * ``"split-reduce"``: Same as "cohorts" and will be removed soon. - engine : {"flox", "numpy", "numba"}, optional + engine : {None, "flox", "numpy", "numba"}, optional Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk: * ``"numpy"``: Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``. @@ -1861,12 +1883,7 @@ def groupby_reduce( any_by_dask = any(by_is_dask) if engine is None: - # choose numpy per default - engine = "numpy" - - if nby == 1 and not any_by_dask and bys[0].ndim == 1: - if not _is_arg_reduction(func) and _issorted(bys[0]): - engine = "flox" + engine = _choose_engine(bys, func) if engine == "flox" and _is_arg_reduction(func): raise NotImplementedError( diff --git a/flox/xrutils.py b/flox/xrutils.py index 958bd3976..2204dc0c4 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -2,6 +2,7 @@ # defined in xarray import datetime +import importlib from typing import Any, Iterable import numpy as np @@ -316,3 +317,21 @@ def nanlast(values, axis, keepdims=False): return np.expand_dims(result, axis=axis) else: return result + + +def module_available(module: str) -> bool: + """Checks whether a module is installed without importing it. + + Use this for a lightweight check and lazy imports. + + Parameters + ---------- + module : str + Name of the module. + + Returns + ------- + available : bool + Whether the module is installed. + """ + return importlib.util.find_spec(module) is not None diff --git a/tests/test_core.py b/tests/test_core.py index 83b823b07..376ddc054 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -13,6 +13,7 @@ from flox import xrutils from flox.aggregations import Aggregation from flox.core import ( + _choose_engine, _convert_expected_groups_to_index, _get_optimal_chunks_for_groups, _normalize_indexes, @@ -29,6 +30,7 @@ assert_equal, assert_equal_tuple, has_dask, + # has_numbagg, raise_if_dask_computes, requires_dask, ) @@ -1465,3 +1467,30 @@ def test_method_check_numpy(): ] ) assert_equal(actual, expected) + + +def test_choose_engine(): + has_numbagg = False # TODO: delete + default = "numbagg" if has_numbagg else "numpy" + # sorted by -> flox + assert _choose_engine(bys=(np.array([1, 1, 2, 2]),), func="mean") == "flox" + # unsorted by -> numpy + assert _choose_engine(bys=(np.array([3, 1, 1]),), func="mean") == default + # by is dask, not flox + assert ( + _choose_engine( + bys=( + dask.array.ones( + 3, + ), + ), + func="mean", + ) + == default + ) + # nD by + assert _choose_engine(bys=(np.ones((2, 2)),), func="mean") == default + # nby == ` + assert _choose_engine(bys=(np.ones((2,)), np.ones((2,))), func="mean") == default + # argmax does not give engine="flox" + assert _choose_engine(bys=(np.array([1, 1, 2, 2]),), func="argmax") == "numpy" From c8000e380a9265fc44bc91615330890223768dba Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 6 Oct 2023 22:09:55 -0600 Subject: [PATCH 04/27] In xarray too --- flox/xarray.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flox/xarray.py b/flox/xarray.py index bd0e6891f..a335a6f35 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -10,6 +10,7 @@ from .aggregations import Aggregation, _atleast_1d from .core import ( + _choose_engine, _convert_expected_groups_to_index, _get_expected_groups, _validate_expected_groups, @@ -72,7 +73,7 @@ def xarray_reduce( fill_value=None, dtype: np.typing.DTypeLike = None, method: str = "map-reduce", - engine: str = None, + engine: str | None = None, keep_attrs: bool | None = True, skipna: bool | None = None, min_count: int | None = None, @@ -362,9 +363,12 @@ def wrapper(array, *by, func, skipna, core_dims, **kwargs): if "nan" not in func and func not in ["all", "any", "count"]: func = f"nan{func}" + if kwargs.get("engine", None) is None: + kwargs["engine"] = _choose_engine(by, func) + # Flox's count works with non-numeric and its faster than converting. requires_numeric = func not in ["count", "any", "all"] or ( - func == "count" and engine != "flox" + func == "count" and kwargs["engine"] != "flox" ) if requires_numeric: is_npdatetime = array.dtype.kind in "Mm" From c483a1abe1f7e94bdc96875326d358a6bda0426a Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 6 Oct 2023 22:13:39 -0600 Subject: [PATCH 05/27] formatting --- tests/test_core.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index f6cece01c..e4fb4c05e 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1550,17 +1550,7 @@ def test_choose_engine(): # unsorted by -> numpy assert _choose_engine(bys=(np.array([3, 1, 1]),), func="mean") == default # by is dask, not flox - assert ( - _choose_engine( - bys=( - dask.array.ones( - 3, - ), - ), - func="mean", - ) - == default - ) + assert _choose_engine((dask.array.ones(3),), func="mean") == default # nD by assert _choose_engine(bys=(np.ones((2, 2)),), func="mean") == default # nby == ` From 3e4bae904fa269228794979fbae0f4f5ac296899 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Sat, 7 Oct 2023 08:23:26 -0600 Subject: [PATCH 06/27] simplify --- flox/core.py | 18 +++++++----------- flox/xarray.py | 4 ---- tests/test_core.py | 12 +++--------- 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/flox/core.py b/flox/core.py index ee7fa1bb2..d2fe34819 100644 --- a/flox/core.py +++ b/flox/core.py @@ -782,6 +782,10 @@ def chunk_reduce( group_idx = group_idx.reshape(-1) assert group_idx.ndim == 1 + + if engine is None: + engine = _choose_engine(by, func) + empty = np.all(props.nanmask) results: IntermediateDict = {"groups": [], "intermediates": []} @@ -1760,7 +1764,7 @@ def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) -> return expected_groups -def _choose_engine(bys, func): +def _choose_engine(by, func): # choose numpy per default HAS_NUMBAGG = False # TODO: delete if HAS_NUMBAGG and not _is_arg_reduction(func): @@ -1768,13 +1772,8 @@ def _choose_engine(bys, func): else: engine = "numpy" - nby = len(bys) - by_is_dask = tuple(is_duck_dask_array(b) for b in bys) - any_by_dask = any(by_is_dask) - - if nby == 1 and not any_by_dask and bys[0].ndim == 1: - if not _is_arg_reduction(func) and _issorted(bys[0]): - engine = "flox" + if not _is_arg_reduction(func) and _issorted(by): + engine = "flox" return engine @@ -1895,9 +1894,6 @@ def groupby_reduce( bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) - if engine is None: - engine = _choose_engine(bys, func) - if engine == "flox" and _is_arg_reduction(func): raise NotImplementedError( "argreductions not supported for engine='flox' yet." diff --git a/flox/xarray.py b/flox/xarray.py index 397e1d9b5..b2524d9ce 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -11,7 +11,6 @@ from .aggregations import Aggregation, _atleast_1d from .core import ( - _choose_engine, _convert_expected_groups_to_index, _get_expected_groups, _validate_expected_groups, @@ -367,9 +366,6 @@ def wrapper(array, *by, func, skipna, core_dims, **kwargs): if "nan" not in func and func not in ["all", "any", "count"]: func = f"nan{func}" - if kwargs.get("engine", None) is None: - kwargs["engine"] = _choose_engine(by, func) - # Flox's count works with non-numeric and its faster than converting. requires_numeric = func not in ["count", "any", "all"] or ( func == "count" and kwargs["engine"] != "flox" diff --git a/tests/test_core.py b/tests/test_core.py index e4fb4c05e..19b89edc9 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1546,14 +1546,8 @@ def test_choose_engine(): has_numbagg = False # TODO: delete default = "numbagg" if has_numbagg else "numpy" # sorted by -> flox - assert _choose_engine(bys=(np.array([1, 1, 2, 2]),), func="mean") == "flox" + assert _choose_engine(np.array([1, 1, 2, 2]), func="mean") == "flox" # unsorted by -> numpy - assert _choose_engine(bys=(np.array([3, 1, 1]),), func="mean") == default - # by is dask, not flox - assert _choose_engine((dask.array.ones(3),), func="mean") == default - # nD by - assert _choose_engine(bys=(np.ones((2, 2)),), func="mean") == default - # nby == ` - assert _choose_engine(bys=(np.ones((2,)), np.ones((2,))), func="mean") == default + assert _choose_engine(np.array([3, 1, 1]), func="mean") == default # argmax does not give engine="flox" - assert _choose_engine(bys=(np.array([1, 1, 2, 2]),), func="argmax") == "numpy" + assert _choose_engine(np.array([1, 1, 2, 2]), func="argmax") == "numpy" From 92eaf2c08bae91c1d97e81bd30869068c89da26a Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 9 Oct 2023 18:20:33 -0600 Subject: [PATCH 07/27] retry --- flox/aggregations.py | 2 ++ flox/core.py | 32 ++++++++++++++++++++++---------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index ec696da53..7a89dd888 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -29,6 +29,7 @@ class AggDtypeInit(TypedDict): class AggDtype(TypedDict): + user: DTypeLike | None final: np.dtype numpy: tuple[np.dtype | type[np.intp], ...] intermediate: tuple[np.dtype | type[np.intp], ...] @@ -569,6 +570,7 @@ def _initialize_aggregation( final_dtype = _normalize_dtype(dtype_ or agg.dtype_init["final"], array_dtype, fill_value) agg.dtype = { + "user": dtype, # Save to automatically choose an engine "final": final_dtype, "numpy": (final_dtype,), "intermediate": tuple( diff --git a/flox/core.py b/flox/core.py index 60cbf96d8..1d312932f 100644 --- a/flox/core.py +++ b/flox/core.py @@ -629,6 +629,7 @@ def chunk_argreduce( reindex: bool = False, engine: T_Engine = "numpy", sort: bool = True, + user_dtype=None, ) -> IntermediateDict: """ Per-chunk arg reduction. @@ -649,6 +650,7 @@ def chunk_argreduce( dtype=dtype, engine=engine, sort=sort, + user_dtype=user_dtype, ) if not isnull(results["groups"]).all(): idx = np.broadcast_to(idx, array.shape) @@ -682,6 +684,7 @@ def chunk_reduce( engine: T_Engine = "numpy", kwargs: Sequence[dict] | None = None, sort: bool = True, + user_dtype=None, ) -> IntermediateDict: """ Wrapper for numpy_groupies aggregate that supports nD ``array`` and @@ -783,9 +786,6 @@ def chunk_reduce( assert group_idx.ndim == 1 - if engine is None: - engine = _choose_engine(by, func) - empty = np.all(props.nanmask) results: IntermediateDict = {"groups": [], "intermediates": []} @@ -817,6 +817,9 @@ def chunk_reduce( if empty: result = np.full(shape=final_array_shape, fill_value=fv) else: + if engine is None: + engine = _choose_engine(by, reduction, user_dtype) + if is_nanlen(reduction) and is_nanlen(previous_reduction): result = results["intermediates"][-1] @@ -1101,6 +1104,7 @@ def _grouped_combine( dtype=(np.intp,), engine=engine, sort=sort, + user_dtype=agg.dtype["user"], )["intermediates"][0] ) @@ -1130,6 +1134,7 @@ def _grouped_combine( dtype=(dtype,), engine=engine, sort=sort, + user_dtype=agg.dtype["user"], ) results["intermediates"].append(*_results["intermediates"]) results["groups"] = _results["groups"] @@ -1179,6 +1184,7 @@ def _reduce_blockwise( engine=engine, sort=sort, reindex=reindex, + user_dtype=agg.dtype["user"], ) if _is_arg_reduction(agg): @@ -1374,6 +1380,7 @@ def dask_groupby_agg( fill_value=agg.fill_value["intermediate"], dtype=agg.dtype["intermediate"], reindex=reindex, + user_dtype=agg.dtype["user"], ) if do_simple_combine: # Add a dummy dimension that then gets reduced over @@ -1764,16 +1771,21 @@ def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) -> return expected_groups -def _choose_engine(by, func): +def _choose_engine(by, func, dtype): # TODO: dtype for numbagg! # choose numpy per default - if HAS_NUMBAGG and not _is_arg_reduction(func): - engine = "numbagg" + if HAS_NUMBAGG: + if not _is_arg_reduction(func) and ( + dtype is None or (dtype is not None and func == "count") + ): + engine = "numbagg" + else: + engine = "numpy" else: - engine = "numpy" - - if not _is_arg_reduction(func) and _issorted(by): - engine = "flox" + if not _is_arg_reduction(func) and _issorted(by): + engine = "flox" + else: + engine = "numpy" return engine From 1a230eb98409713352caf554d40b2f20546e6f02 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 9 Oct 2023 23:39:40 -0600 Subject: [PATCH 08/27] flox --- flox/core.py | 14 +++++--------- tests/test_core.py | 13 ++++++++----- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/flox/core.py b/flox/core.py index 1d312932f..a553bf7ed 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1778,16 +1778,12 @@ def _choose_engine(by, func, dtype): if not _is_arg_reduction(func) and ( dtype is None or (dtype is not None and func == "count") ): - engine = "numbagg" - else: - engine = "numpy" - else: - if not _is_arg_reduction(func) and _issorted(by): - engine = "flox" - else: - engine = "numpy" + return "numbagg" - return engine + if not _is_arg_reduction(func) and _issorted(by): + return "flox" + else: + return "numpy" def groupby_reduce( diff --git a/tests/test_core.py b/tests/test_core.py index dac26f236..222d6cbe4 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1551,11 +1551,14 @@ def test_method_check_numpy(): assert_equal(actual, expected) -def test_choose_engine(): - default = "numbagg" if HAS_NUMBAGG else "numpy" +@pytest.mark.parametrize("dtype", [None, np.float64]) +def test_choose_engine(dtype): + numbagg_possible = HAS_NUMBAGG and dtype is None + default = "numbagg" if numbagg_possible else "numpy" # sorted by -> flox - assert _choose_engine(np.array([1, 1, 2, 2]), func="mean") == "flox" + sorted_engine = _choose_engine(np.array([1, 1, 2, 2]), func="mean", dtype=dtype) + assert sorted_engine == ("numbagg" if numbagg_possible else "flox") # unsorted by -> numpy - assert _choose_engine(np.array([3, 1, 1]), func="mean") == default + assert _choose_engine(np.array([3, 1, 1]), func="mean", dtype=dtype) == default # argmax does not give engine="flox" - assert _choose_engine(np.array([1, 1, 2, 2]), func="argmax") == "numpy" + assert _choose_engine(np.array([1, 1, 2, 2]), func="argmax", dtype=dtype) == "numpy" From 22bd20c1666ce453584f705c4170fd7c31c77ee7 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 09:28:33 -0600 Subject: [PATCH 09/27] minversion numabgg --- flox/core.py | 2 +- flox/xrutils.py | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index a553bf7ed..06c6b6493 100644 --- a/flox/core.py +++ b/flox/core.py @@ -35,7 +35,7 @@ from .cache import memoize from .xrutils import is_duck_array, is_duck_dask_array, isnull, module_available -HAS_NUMBAGG = module_available("numbagg") +HAS_NUMBAGG = module_available("numbagg", minversion="0.3.0") if TYPE_CHECKING: try: diff --git a/flox/xrutils.py b/flox/xrutils.py index 50ccffa30..1341b1e72 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd from numpy.core.multiarray import normalize_axis_index # type: ignore[attr-defined] +from packaging.version import Version try: import cftime @@ -320,7 +321,7 @@ def nanlast(values, axis, keepdims=False): return result -def module_available(module: str) -> bool: +def module_available(module: str, minversion: str | None = None) -> bool: """Checks whether a module is installed without importing it. Use this for a lightweight check and lazy imports. @@ -335,4 +336,9 @@ def module_available(module: str) -> bool: available : bool Whether the module is installed. """ - return importlib.util.find_spec(module) is not None + has = importlib.util.find_spec(module) is not None + if has: + mod = importlib.import_module(module) + return Version(mod.__version__) < Version(minversion) if minversion is not None else True + else: + return False From 699ecd9b67448440b621d75e22fc5553738955d8 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 09:29:19 -0600 Subject: [PATCH 10/27] cleanup --- flox/core.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index 06c6b6493..38367b9c9 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1900,9 +1900,6 @@ def groupby_reduce( xarray.xarray_reduce """ - if func == "quantile" and (finalize_kwargs is None or "q" not in finalize_kwargs): - raise ValueError("Please pass `q` for quantile calculations.") - if engine == "flox" and _is_arg_reduction(func): raise NotImplementedError( "argreductions not supported for engine='flox' yet." From 3d6413f22c61adc55fb8d46debb3e97bede37aa0 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 09:32:53 -0600 Subject: [PATCH 11/27] fix type --- flox/xrutils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flox/xrutils.py b/flox/xrutils.py index 1341b1e72..b523970c7 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -4,7 +4,7 @@ import datetime import importlib from collections.abc import Iterable -from typing import Any +from typing import Any, Optional import numpy as np import pandas as pd @@ -321,7 +321,7 @@ def nanlast(values, axis, keepdims=False): return result -def module_available(module: str, minversion: str | None = None) -> bool: +def module_available(module: str, minversion: Optional[str] = None) -> bool: """Checks whether a module is installed without importing it. Use this for a lightweight check and lazy imports. From 443fbc25c184843fdf3bf36fe4ad29177652384b Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 23:04:29 -0600 Subject: [PATCH 12/27] update gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index dbfcb2ef4..c14b72e67 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +asv_bench/pkgs/ docs/source/generated/ html/ .asv/ From eedeb8e82af19c67b44389cb23c34e0ed8ceabc7 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 23:11:20 -0600 Subject: [PATCH 13/27] add types --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 9ff9673f5..a777db31b 100644 --- a/flox/core.py +++ b/flox/core.py @@ -85,7 +85,7 @@ DUMMY_AXIS = -2 -def _issorted(arr): +def _issorted(arr: np.ndarray) -> bool: return (arr[:-1] <= arr[1:]).all() From cc9050921506456a211967e54433c4a10d980cf5 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 23:14:12 -0600 Subject: [PATCH 14/27] Fix env? --- ci/environment.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ci/environment.yml b/ci/environment.yml index 33e1b4661..0c2b636d6 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -22,6 +22,5 @@ dependencies: - pooch - toolz - numba + - numbagg>=0.3 - scipy - - pip: - - numbagg>=0.3 From 56957a140b5a733841932518fa85c661899671d6 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 23:18:41 -0600 Subject: [PATCH 15/27] fix --- flox/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index a777db31b..f2ba72627 100644 --- a/flox/core.py +++ b/flox/core.py @@ -86,7 +86,7 @@ def _issorted(arr: np.ndarray) -> bool: - return (arr[:-1] <= arr[1:]).all() + return bool((arr[:-1] <= arr[1:]).all()) def _is_arg_reduction(func: T_Agg) -> bool: @@ -2122,7 +2122,7 @@ def groupby_reduce( assert len(groups) == 1 sorted_idx = np.argsort(groups[0]) # This optimization helps specifically with resampling - if not _issorted(sorted_idx).all(): + if not _issorted(sorted_idx): result = result[..., sorted_idx] groups = (groups[0][sorted_idx],) From 16d43930c1b7865cc0f38161cfa9dae657888a44 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 10 Oct 2023 23:24:19 -0600 Subject: [PATCH 16/27] fix merge --- asv_bench/benchmarks/reduce.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index 657261fc3..d445081e0 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -7,7 +7,7 @@ N = 3000 funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "count", "all"] -engines = ["flox", "numpy", "numbagg"] +engines: list[str | None] = [None, "flox", "numpy", "numbagg"] expected_groups = { "None": None, "RangeIndex": pd.RangeIndex(5), @@ -15,11 +15,7 @@ } expected_names = tuple(expected_groups) -funcs = ["sum", "nansum", "mean", "nanmean", "max", "var", "nanvar", "count"] -engines = [None, "flox", "numpy"] -expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])] NUMBAGG_FUNCS = ["nansum", "nanmean", "nanmax", "count", "all"] - numbagg_skip = [ (func, expected_names[0], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS ] + [(func, expected_names[1], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS] From f6262d24cfd28e8ff24a0adbe7b26464714aca7a Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 11 Oct 2023 11:39:10 -0600 Subject: [PATCH 17/27] cleanup --- tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index 222d6cbe4..882ca7e68 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -31,7 +31,6 @@ assert_equal, assert_equal_tuple, has_dask, - # has_numbagg, raise_if_dask_computes, requires_dask, requires_scipy, From e13c2d9c6dcb08aefb50e99479d61ed5ec6be142 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 11 Oct 2023 11:57:06 -0600 Subject: [PATCH 18/27] [skip-ci] bench --- asv_bench/benchmarks/reduce.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asv_bench/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py index d445081e0..71f7f4d4b 100644 --- a/asv_bench/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -7,7 +7,7 @@ N = 3000 funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "count", "all"] -engines: list[str | None] = [None, "flox", "numpy", "numbagg"] +engines = [None, "flox", "numpy", "numbagg"] expected_groups = { "None": None, "RangeIndex": pd.RangeIndex(5), From 46262f03ca3c9e1258ac79a7e4ca20dfd79921de Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 11 Oct 2023 18:50:57 -0600 Subject: [PATCH 19/27] temporarily disable numbagg --- ci/environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/environment.yml b/ci/environment.yml index 0c2b636d6..6b05c5aa2 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -22,5 +22,5 @@ dependencies: - pooch - toolz - numba - - numbagg>=0.3 + # - numbagg>=0.3 - scipy From ed8bd5101eaed94c167e0ed0e25156f6b9eef3f1 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 11 Oct 2023 18:55:19 -0600 Subject: [PATCH 20/27] don't cache env --- .github/workflows/ci.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b963d671d..17369e4e3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -39,7 +39,7 @@ jobs: environment-file: ci/environment.yml environment-name: flox-tests init-shell: bash - cache-environment: true + # cache-environment: true create-args: | python=${{ matrix.python-version }} - name: Install flox @@ -83,7 +83,7 @@ jobs: environment-file: ci/environment.yml environment-name: flox-tests init-shell: bash - cache-environment: true + # cache-environment: true create-args: | python=${{ matrix.python-version }} - name: Install flox @@ -118,7 +118,7 @@ jobs: environment-file: ci/requirements/environment.yml environment-name: xarray-tests init-shell: bash - cache-environment: true + # cache-environment: true create-args: | python=3.10 - name: Install xarray From db0a6cc96b6115a202363ab3292f4310e5895f1b Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 11 Oct 2023 19:29:41 -0600 Subject: [PATCH 21/27] Finally! --- flox/core.py | 26 ++++++++++++++------------ tests/test_core.py | 15 +++++---------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/flox/core.py b/flox/core.py index f2ba72627..5d2aefba6 100644 --- a/flox/core.py +++ b/flox/core.py @@ -826,9 +826,6 @@ def chunk_reduce( if empty: result = np.full(shape=final_array_shape, fill_value=fv) else: - if engine is None: - engine = _choose_engine(by, reduction, user_dtype) - if is_nanlen(reduction) and is_nanlen(previous_reduction): result = results["intermediates"][-1] @@ -1780,16 +1777,15 @@ def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) -> return expected_groups -def _choose_engine(by, func, dtype): - # TODO: dtype for numbagg! - # choose numpy per default - if HAS_NUMBAGG: - if not _is_arg_reduction(func) and ( - dtype is None or (dtype is not None and func == "count") - ): +def _choose_engine(by, agg: Aggregation): + dtype = agg.dtype["user"] + + not_arg_reduce = not _is_arg_reduction(agg) + if HAS_NUMBAGG and "nan" in agg.name: + if not_arg_reduce and (dtype is None or (dtype is not None and agg.name == "count")): return "numbagg" - if not _is_arg_reduction(func) and _issorted(by): + if not_arg_reduce and _issorted(by): return "flox" else: return "numpy" @@ -2070,9 +2066,15 @@ def groupby_reduce( # overwrite than when min_count is set fill_value = np.nan - kwargs = dict(axis=axis_, fill_value=fill_value, engine=engine) + kwargs = dict(axis=axis_, fill_value=fill_value) agg = _initialize_aggregation(func, dtype, array.dtype, fill_value, min_count_, finalize_kwargs) + # Need to set this early using `agg` + # It cannot be done in the core loop of chunk_reduce + # since we "prepare" the data for flox. + if engine is None: + kwargs["engine"] = _choose_engine(by_, agg) + groups: tuple[np.ndarray | DaskArray, ...] if not has_dask: results = _reduce_blockwise( diff --git a/tests/test_core.py b/tests/test_core.py index 882ca7e68..1970bb37d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -602,12 +602,9 @@ def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): by = np.broadcast_to(labels2d, (3, *labels2d.shape)) rng = np.random.default_rng(12345) array = rng.random(by.shape) - kwargs = dict( - func=func, axis=axis, expected_groups=[0, 2], fill_value=fill_value, engine=engine - ) - expected, _ = groupby_reduce(array, by, **kwargs) + kwargs = dict(func=func, axis=axis, expected_groups=[0, 2], fill_value=fill_value) + expected, _ = groupby_reduce(array, by, engine=engine, **kwargs) if engine == "flox": - kwargs.pop("engine") expected_npg, _ = groupby_reduce(array, by, **kwargs, engine="numpy") assert_equal(expected_npg, expected) @@ -624,12 +621,9 @@ def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): by = np.broadcast_to(labels2d, (3, *labels2d.shape)) rng = np.random.default_rng(12345) array = rng.random(by.shape) - kwargs = dict( - func=func, axis=axis, expected_groups=[0, 2], fill_value=fill_value, engine=engine - ) - expected, _ = groupby_reduce(array, by, **kwargs) + kwargs = dict(func=func, axis=axis, expected_groups=[0, 2], fill_value=fill_value) + expected, _ = groupby_reduce(array, by, engine=engine, **kwargs) if engine == "flox": - kwargs.pop("engine") expected_npg, _ = groupby_reduce(array, by, **kwargs, engine="numpy") assert_equal(expected_npg, expected) @@ -642,6 +636,7 @@ def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): actual, _ = groupby_reduce( da.from_array(array, chunks=(-1, 2, 3)), da.from_array(by, chunks=(-1, 2, 2)), + engine=engine, **kwargs, ) assert_equal(actual, expected, tolerance) From 90cecf8f45639bb1f67012e75d75624d1fb2b8d4 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 13 Oct 2023 22:16:18 -0600 Subject: [PATCH 22/27] bugfix --- flox/core.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 5d2aefba6..4970fe09d 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2072,8 +2072,7 @@ def groupby_reduce( # Need to set this early using `agg` # It cannot be done in the core loop of chunk_reduce # since we "prepare" the data for flox. - if engine is None: - kwargs["engine"] = _choose_engine(by_, agg) + kwargs["engine"] = _choose_engine(by_, agg) if engine is None else engine groups: tuple[np.ndarray | DaskArray, ...] if not has_dask: From f2e0aa6440247f17176bc993998dcf63e3882439 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 13 Oct 2023 22:28:45 -0600 Subject: [PATCH 23/27] Fix doctest --- .github/workflows/ci-additional.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index c71fc8913..ed78d328c 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -72,7 +72,10 @@ jobs: conda list - name: Run doctests run: | - python -m pytest --doctest-modules flox --ignore flox/tests --cov=./ --cov-report=xml + python -m pytest --doctest-modules \ + flox/aggregations.py flox/core.py flox/xarray.py \ + --ignore flox/tests \ + --cov=./ --cov-report=xml - name: Upload code coverage to Codecov uses: codecov/codecov-action@v3.1.4 with: From 39c19b845415d6f520910d02e9c18c6e2e64bbf5 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 13 Oct 2023 22:33:12 -0600 Subject: [PATCH 24/27] more fixes --- flox/core.py | 2 +- tests/test_core.py | 25 +++++++++++++++++++++---- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/flox/core.py b/flox/core.py index 4970fe09d..ca8b29a51 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1785,7 +1785,7 @@ def _choose_engine(by, agg: Aggregation): if not_arg_reduce and (dtype is None or (dtype is not None and agg.name == "count")): return "numbagg" - if not_arg_reduce and _issorted(by): + if not_arg_reduce and (not is_duck_dask_array(by) and _issorted(by)): return "flox" else: return "numpy" diff --git a/tests/test_core.py b/tests/test_core.py index 1970bb37d..ad2c70624 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -11,7 +11,7 @@ from numpy_groupies.aggregate_numpy import aggregate from flox import xrutils -from flox.aggregations import Aggregation +from flox.aggregations import Aggregation, _initialize_aggregation from flox.core import ( HAS_NUMBAGG, _choose_engine, @@ -1549,10 +1549,27 @@ def test_method_check_numpy(): def test_choose_engine(dtype): numbagg_possible = HAS_NUMBAGG and dtype is None default = "numbagg" if numbagg_possible else "numpy" + mean = _initialize_aggregation( + "mean", + dtype=dtype, + array_dtype=np.dtype("int64"), + fill_value=0, + min_count=0, + finalize_kwargs=None, + ) + argmax = _initialize_aggregation( + "argmax", + dtype=dtype, + array_dtype=np.dtype("int64"), + fill_value=0, + min_count=0, + finalize_kwargs=None, + ) + # sorted by -> flox - sorted_engine = _choose_engine(np.array([1, 1, 2, 2]), func="mean", dtype=dtype) + sorted_engine = _choose_engine(np.array([1, 1, 2, 2]), agg=mean) assert sorted_engine == ("numbagg" if numbagg_possible else "flox") # unsorted by -> numpy - assert _choose_engine(np.array([3, 1, 1]), func="mean", dtype=dtype) == default + assert _choose_engine(np.array([3, 1, 1]), agg=mean) == default # argmax does not give engine="flox" - assert _choose_engine(np.array([1, 1, 2, 2]), func="argmax", dtype=dtype) == "numpy" + assert _choose_engine(np.array([1, 1, 2, 2]), agg=argmax) == "numpy" From 7126e604bcb61af9a68d6cb006e6c957c71f4232 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 13 Oct 2023 22:36:21 -0600 Subject: [PATCH 25/27] Fix CI --- .github/workflows/ci.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 17369e4e3..fcb42665f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -39,7 +39,7 @@ jobs: environment-file: ci/environment.yml environment-name: flox-tests init-shell: bash - # cache-environment: true + cache-environment: true create-args: | python=${{ matrix.python-version }} - name: Install flox @@ -80,10 +80,10 @@ jobs: - name: Set up conda environment uses: mamba-org/setup-micromamba@v1 with: - environment-file: ci/environment.yml + environment-file: ci/${{ matrix.env }}.yml environment-name: flox-tests init-shell: bash - # cache-environment: true + cache-environment: true create-args: | python=${{ matrix.python-version }} - name: Install flox @@ -118,7 +118,7 @@ jobs: environment-file: ci/requirements/environment.yml environment-name: xarray-tests init-shell: bash - # cache-environment: true + cache-environment: true create-args: | python=3.10 - name: Install xarray From 12439039eacefbc8cc98bd96bab17385945e6349 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 13 Oct 2023 22:37:09 -0600 Subject: [PATCH 26/27] readd numbagg --- ci/environment.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/environment.yml b/ci/environment.yml index 6b05c5aa2..0c2b636d6 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -22,5 +22,5 @@ dependencies: - pooch - toolz - numba - # - numbagg>=0.3 + - numbagg>=0.3 - scipy From 3534766f289448589fa23cbf2a4410324f529e3d Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Sat, 14 Oct 2023 18:43:03 -0600 Subject: [PATCH 27/27] Fix. --- flox/core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 5bac26777..66e39b6f1 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1776,8 +1776,11 @@ def _choose_engine(by, agg: Aggregation): dtype = agg.dtype["user"] not_arg_reduce = not _is_arg_reduction(agg) + + # numbagg only supports nan-skipping reductions + # without dtype specified if HAS_NUMBAGG and "nan" in agg.name: - if not_arg_reduce and (dtype is None or (dtype is not None and agg.name == "count")): + if not_arg_reduce and dtype is None: return "numbagg" if not_arg_reduce and (not is_duck_dask_array(by) and _issorted(by)):