From 59ed451bb621d22db4f26bf8777787cfe08e750b Mon Sep 17 00:00:00 2001 From: dcherian Date: Mon, 15 Aug 2022 20:46:16 -0600 Subject: [PATCH 01/16] [FIX] intp -> uintp for cupy This will need to handle -ve fill value for count --- flox/aggregations.py | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 13b23fafe..635d5a74e 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -252,8 +252,8 @@ def __repr__(self) -> str: combine="sum", fill_value=0, final_fill_value=0, - dtypes=np.intp, - final_dtype=np.intp, + dtypes=np.uintp, + final_dtype=np.uintp, ) # note that the fill values are the result of np.func([np.nan, np.nan]) @@ -281,7 +281,7 @@ def _mean_finalize(sum_, count): combine=("sum", "sum"), finalize=_mean_finalize, fill_value=(0, 0), - dtypes=(None, np.intp), + dtypes=(None, np.uintp), final_dtype=np.floating, ) nanmean = Aggregation( @@ -290,7 +290,7 @@ def _mean_finalize(sum_, count): combine=("sum", "sum"), finalize=_mean_finalize, fill_value=(0, 0), - dtypes=(None, np.intp), + dtypes=(None, np.uintp), final_dtype=np.floating, ) @@ -315,7 +315,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_var_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.intp), + dtypes=(None, None, np.uintp), final_dtype=np.floating, ) nanvar = Aggregation( @@ -325,7 +325,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_var_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.intp), + dtypes=(None, None, np.uintp), final_dtype=np.floating, ) std = Aggregation( @@ -335,7 +335,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_std_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.intp), + dtypes=(None, None, np.uintp), final_dtype=np.floating, ) nanstd = Aggregation( @@ -345,7 +345,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_std_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.intp), + dtypes=(None, None, np.uintp), final_dtype=np.floating, ) @@ -368,7 +368,7 @@ def argreduce_preprocess(array, axis): assert len(axis) == 1 axis = axis[0] - idx = dask.array.arange(array.shape[axis], chunks=array.chunks[axis], dtype=np.intp) + idx = dask.array.arange(array.shape[axis], chunks=array.chunks[axis], dtype=np.uintp) # broadcast (TODO: is this needed?) 
idx = idx[tuple(slice(None) if i == axis else np.newaxis for i in range(array.ndim))] @@ -398,8 +398,8 @@ def _pick_second(*x): fill_value=(dtypes.NINF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.intp), - final_dtype=np.intp, + dtypes=(None, np.uintp), + final_dtype=np.uintp, ) argmin = Aggregation( @@ -411,8 +411,8 @@ def _pick_second(*x): fill_value=(dtypes.INF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.intp), - final_dtype=np.intp, + dtypes=(None, np.uintp), + final_dtype=np.uintp, ) nanargmax = Aggregation( @@ -424,8 +424,8 @@ def _pick_second(*x): fill_value=(dtypes.NINF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.intp), - final_dtype=np.intp, + dtypes=(None, np.uintp), + final_dtype=np.uintp, ) nanargmin = Aggregation( @@ -437,8 +437,8 @@ def _pick_second(*x): fill_value=(dtypes.INF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.intp), - final_dtype=np.intp, + dtypes=(None, np.uintp), + final_dtype=np.uintp, ) first = Aggregation("first", chunk=None, combine=None, fill_value=0) @@ -574,8 +574,10 @@ def _initialize_aggregation( agg.combine += ("sum",) agg.fill_value["intermediate"] += (0,) agg.fill_value["numpy"] += (0,) - agg.dtype["intermediate"] += (np.intp,) - agg.dtype["numpy"] += (np.intp,) + # uintp is supported by cupy, intp is not + # Also count is >=0, so uint should be fine. + agg.dtype["intermediate"] += (np.uintp,) + agg.dtype["numpy"] += (np.uintp,) else: agg.min_count = 0 From bbdaa0c31400bb0cd8d11d6e1f3745e0585bfbaa Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 12 May 2023 16:37:43 -0600 Subject: [PATCH 02/16] WIP support for engine="flox" --- flox/aggregate_flox.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flox/aggregate_flox.py b/flox/aggregate_flox.py index 4df3f77a4..d7e9c85ad 100644 --- a/flox/aggregate_flox.py +++ b/flox/aggregate_flox.py @@ -25,7 +25,9 @@ def _np_grouped_op(group_idx, array, op, axis=-1, size=None, fill_value=None, dt most of this code is from shoyer's gist https://gist.github.com/shoyer/f538ac78ae904c936844 """ - # assumes input is sorted, which I do in core._prepare_for_flox + # For numpy arrays, assumes input is sorted, which I do in _prepare_for_flox + # For cupy arrays, sorting is not needed + aux = group_idx flag = np.concatenate((np.array([True], like=array), aux[1:] != aux[:-1])) @@ -38,7 +40,12 @@ def _np_grouped_op(group_idx, array, op, axis=-1, size=None, fill_value=None, dt dtype = array.dtype if out is None: - out = np.full(array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype) + out = np.full(array.shape[:-1] + (size,), fill_value=fill_value, dtype=dtype, like=array) + + # if isinstance(array, cupy_array_type): + # op = cupy_ops[op] + # op(out, group_idx, array) + # return out if (len(uniques) == size) and (uniques == np.arange(size, like=array)).all(): # The previous version of this if condition From a0740eb17f4f4b4a2a2549bce2c9308859834839 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 12 May 2023 16:37:56 -0600 Subject: [PATCH 03/16] Fix factorizing --- flox/core.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 57ea4556f..43706c6ee 100644 --- a/flox/core.py +++ b/flox/core.py @@ -522,7 +522,7 @@ def factorize_( # this is important in shared-memory parallelism with dask # TODO: figure out how to avoid this idx = flat.copy() - found_groups.append(np.array(expect)) + found_groups.append(np.array(expect, like=flat)) # 
TODO: fix by using masked integers idx[idx > expect[-1]] = -1 @@ -537,7 +537,7 @@ def factorize_( right = expect.closed_right idx = np.digitize( flat, - bins=bins.view(np.intp) if bins.dtype.kind == "M" else bins, + bins=np.array(bins.view(np.intp) if bins.dtype.kind == "M" else bins, like=flat), right=right, ) idx -= 1 @@ -560,9 +560,13 @@ def factorize_( idx = sorter[(idx,)] idx[mask] = -1 else: - idx, groups = pd.factorize(flat, sort=sort) + if isinstance(flat, np.ndarray): + idx, groups = pd.factorize(flat, sort=sort) + else: + assert sort + groups, idx = np.unique(flat, return_inverse=True) - found_groups.append(np.array(groups)) + found_groups.append(groups) factorized.append(idx.reshape(groupvar.shape)) grp_shape = tuple(len(grp) for grp in found_groups) From a5cf26ecaad782a91eb9790b8a9827ccf3c24871 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 12 May 2023 22:39:35 +0000 Subject: [PATCH 04/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- flox/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 43706c6ee..87f395e76 100644 --- a/flox/core.py +++ b/flox/core.py @@ -537,7 +537,9 @@ def factorize_( right = expect.closed_right idx = np.digitize( flat, - bins=np.array(bins.view(np.intp) if bins.dtype.kind == "M" else bins, like=flat), + bins=np.array( + bins.view(np.intp) if bins.dtype.kind == "M" else bins, like=flat + ), right=right, ) idx -= 1 From b6a7edc509f8f77c024f65a40fe911a11a94b884 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 12 May 2023 16:43:43 -0600 Subject: [PATCH 05/16] Avoid copies --- flox/core.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 87f395e76..21e7adcc4 100644 --- a/flox/core.py +++ b/flox/core.py @@ -522,7 +522,7 @@ def factorize_( # this is important in shared-memory parallelism with dask # TODO: figure out how to avoid this idx = flat.copy() - found_groups.append(np.array(expect, like=flat)) + found_groups.append(np.array(expect, like=flat, copy=False)) # TODO: fix by using masked integers idx[idx > expect[-1]] = -1 @@ -538,7 +538,9 @@ def factorize_( idx = np.digitize( flat, bins=np.array( - bins.view(np.intp) if bins.dtype.kind == "M" else bins, like=flat + bins.view(np.intp) if bins.dtype.kind == "M" else bins, + like=flat, + copy=False, ), right=right, ) From 5df4a75e2e980d8c7a043b868d0ed2c08f93a585 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 12 May 2023 16:45:15 -0600 Subject: [PATCH 06/16] Revert "[FIX] intp -> uintp for cupy" This reverts commit 59ed451bb621d22db4f26bf8777787cfe08e750b. 
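The caveat flagged back in PATCH 01 ("handle -ve fill value for count") is the
blocker: the argreductions use final_fill_value=-1 as their "no valid data in
this group" sentinel, and an unsigned dtype cannot represent it. A minimal
sketch of the failure mode, in plain numpy (nothing here is flox-specific):

    import numpy as np

    # argmax/argmin fill missing groups with -1; cast to np.uintp the
    # sentinel silently wraps around to the largest representable value
    sentinel = np.intp(-1)
    print(sentinel.astype(np.uintp))         # 18446744073709551615 on 64-bit
    # ... and any later check for the sentinel stops matching
    print(sentinel.astype(np.uintp) == -1)   # False

So intp comes back until negative fill values are handled properly.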
--- flox/aggregations.py | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/flox/aggregations.py b/flox/aggregations.py index 635d5a74e..13b23fafe 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -252,8 +252,8 @@ def __repr__(self) -> str: combine="sum", fill_value=0, final_fill_value=0, - dtypes=np.uintp, - final_dtype=np.uintp, + dtypes=np.intp, + final_dtype=np.intp, ) # note that the fill values are the result of np.func([np.nan, np.nan]) @@ -281,7 +281,7 @@ def _mean_finalize(sum_, count): combine=("sum", "sum"), finalize=_mean_finalize, fill_value=(0, 0), - dtypes=(None, np.uintp), + dtypes=(None, np.intp), final_dtype=np.floating, ) nanmean = Aggregation( @@ -290,7 +290,7 @@ def _mean_finalize(sum_, count): combine=("sum", "sum"), finalize=_mean_finalize, fill_value=(0, 0), - dtypes=(None, np.uintp), + dtypes=(None, np.intp), final_dtype=np.floating, ) @@ -315,7 +315,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_var_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.uintp), + dtypes=(None, None, np.intp), final_dtype=np.floating, ) nanvar = Aggregation( @@ -325,7 +325,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_var_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.uintp), + dtypes=(None, None, np.intp), final_dtype=np.floating, ) std = Aggregation( @@ -335,7 +335,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_std_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.uintp), + dtypes=(None, None, np.intp), final_dtype=np.floating, ) nanstd = Aggregation( @@ -345,7 +345,7 @@ def _std_finalize(sumsq, sum_, count, ddof=0): finalize=_std_finalize, fill_value=0, final_fill_value=np.nan, - dtypes=(None, None, np.uintp), + dtypes=(None, None, np.intp), final_dtype=np.floating, ) @@ -368,7 +368,7 @@ def argreduce_preprocess(array, axis): assert len(axis) == 1 axis = axis[0] - idx = dask.array.arange(array.shape[axis], chunks=array.chunks[axis], dtype=np.uintp) + idx = dask.array.arange(array.shape[axis], chunks=array.chunks[axis], dtype=np.intp) # broadcast (TODO: is this needed?) idx = idx[tuple(slice(None) if i == axis else np.newaxis for i in range(array.ndim))] @@ -398,8 +398,8 @@ def _pick_second(*x): fill_value=(dtypes.NINF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.uintp), - final_dtype=np.uintp, + dtypes=(None, np.intp), + final_dtype=np.intp, ) argmin = Aggregation( @@ -411,8 +411,8 @@ def _pick_second(*x): fill_value=(dtypes.INF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.uintp), - final_dtype=np.uintp, + dtypes=(None, np.intp), + final_dtype=np.intp, ) nanargmax = Aggregation( @@ -424,8 +424,8 @@ def _pick_second(*x): fill_value=(dtypes.NINF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.uintp), - final_dtype=np.uintp, + dtypes=(None, np.intp), + final_dtype=np.intp, ) nanargmin = Aggregation( @@ -437,8 +437,8 @@ def _pick_second(*x): fill_value=(dtypes.INF, 0), final_fill_value=-1, finalize=_pick_second, - dtypes=(None, np.uintp), - final_dtype=np.uintp, + dtypes=(None, np.intp), + final_dtype=np.intp, ) first = Aggregation("first", chunk=None, combine=None, fill_value=0) @@ -574,10 +574,8 @@ def _initialize_aggregation( agg.combine += ("sum",) agg.fill_value["intermediate"] += (0,) agg.fill_value["numpy"] += (0,) - # uintp is supported by cupy, intp is not - # Also count is >=0, so uint should be fine. 
- agg.dtype["intermediate"] += (np.uintp,) - agg.dtype["numpy"] += (np.uintp,) + agg.dtype["intermediate"] += (np.intp,) + agg.dtype["numpy"] += (np.intp,) else: agg.min_count = 0 From 1c2166074931175932c79ae67d0b0ce1be5a183d Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 12 May 2023 16:54:52 -0600 Subject: [PATCH 07/16] Fix bug --- flox/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 21e7adcc4..9fb420df0 100644 --- a/flox/core.py +++ b/flox/core.py @@ -554,7 +554,7 @@ def factorize_( else: if expect is not None and reindex: sorter = np.argsort(expect) - groups = expect[(sorter,)] if sort else expect + groups = np.array(expect[(sorter,)]) if sort else expect idx = np.searchsorted(expect, flat, sorter=sorter) mask = ~np.isin(flat, expect) | isnull(flat) | (idx == len(expect)) if not sort: @@ -566,6 +566,7 @@ def factorize_( else: if isinstance(flat, np.ndarray): idx, groups = pd.factorize(flat, sort=sort) + groups = np.array(groups) else: assert sort groups, idx = np.unique(flat, return_inverse=True) From 9d35bb2f999bfbb7510f6dc9b7f2052d9be889b1 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 12 May 2023 16:55:39 -0600 Subject: [PATCH 08/16] Add requirements --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 360be8a27..84629198e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ changelog = "https://github.com/xarray-contrib/flox/releases" [project.optional-dependencies] all = ["cachey", "dask", "numba", "xarray"] +cupy = ["cupy>=12.1"] test = ["netCDF4"] [build-system] From c2b83fd9d4795436b64be77846f2742ba431047a Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 2 Jun 2023 22:18:04 -0600 Subject: [PATCH 09/16] Begin testing --- flox/aggregate_flox.py | 4 ++- flox/core.py | 5 +++- tests/__init__.py | 24 +++++++++++++++-- tests/conftest.py | 15 +++++++++++ tests/test_core.py | 60 ++++++++++++++++++++++++++++++------------ 5 files changed, 87 insertions(+), 21 deletions(-) diff --git a/flox/aggregate_flox.py b/flox/aggregate_flox.py index d7e9c85ad..7380e70a3 100644 --- a/flox/aggregate_flox.py +++ b/flox/aggregate_flox.py @@ -14,7 +14,9 @@ def _prepare_for_flox(group_idx, array): if issorted: ordered_array = array else: - perm = group_idx.argsort(kind="stable") + kind = "stable" if isinstance(group_idx, np.ndarray) else None + + perm = np.argsort(group_idx, kind=kind) group_idx = group_idx[..., perm] ordered_array = array[..., perm] return group_idx, ordered_array diff --git a/flox/core.py b/flox/core.py index 9fb420df0..15dc24bd1 100644 --- a/flox/core.py +++ b/flox/core.py @@ -570,6 +570,8 @@ def factorize_( else: assert sort groups, idx = np.unique(flat, return_inverse=True) + idx[np.isnan(flat)] = -1 + groups = groups[~np.isnan(groups)] found_groups.append(groups) factorized.append(idx.reshape(groupvar.shape)) @@ -1261,7 +1263,7 @@ def subset_to_blocks( layer = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys} graph = HighLevelGraph.from_collections(name, layer, dependencies=[array]) - return dask.array.Array(graph, name, chunks, meta=array) + return dask.array.Array(graph, name, chunks, meta=array._meta) def _extract_unknown_groups(reduced, dtype) -> tuple[DaskArray]: @@ -1494,6 +1496,7 @@ def dask_groupby_agg( reduced, inds, adjust_chunks=dict(zip(out_inds, output_chunks)), + meta=array._meta, dtype=agg.dtype["final"], key=agg.name, name=f"{name}-{token}", diff --git a/tests/__init__.py b/tests/__init__.py index 4c04a0fc8..43aaa5d1c 
100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -24,6 +24,13 @@ except ImportError: xr_types = () # type: ignore +try: + import cupy as cp + + cp_types = (cp.ndarray,) +except ImportError: + cp_types = () # type: ignore + def _importorskip(modname, minversion=None): try: @@ -80,6 +87,15 @@ def raise_if_dask_computes(max_computes=0): return dask.config.set(scheduler=scheduler) +def to_numpy(a): + a_np = a + if isinstance(a_np, dask_array_type): + a_np = a_np.compute() + if isinstance(a_np, cp_types): + a_np = a_np.get() + return a_np + + def assert_equal(a, b, tolerance=None): __tracebackhide__ = True @@ -102,16 +118,20 @@ def assert_equal(a, b, tolerance=None): else: tolerance = {} - if has_dask and isinstance(a, dask_array_type) or isinstance(b, dask_array_type): + if has_dask and (isinstance(a, dask_array_type) or isinstance(b, dask_array_type)): # sometimes it's nice to see values and shapes # rather than being dropped into some file in dask - np.testing.assert_allclose(a, b, **tolerance) + np.testing.assert_allclose(to_numpy(a), to_numpy(b), **tolerance) # does some validation of the dask graph da.utils.assert_eq(a, b, equal_nan=True) else: if a.dtype != b.dtype: raise AssertionError(f"a and b have different dtypes: (a: {a.dtype}, b: {b.dtype})") + if isinstance(a, cp_types): + a = a.get() + if isinstance(b, cp_types): + b = b.get() np.testing.assert_allclose(a, b, equal_nan=True, **tolerance) diff --git a/tests/conftest.py b/tests/conftest.py index 5c3bb81f6..b0d1d11c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,3 +9,18 @@ def engine(request): except ImportError: pytest.xfail() return request.param + + +@pytest.fixture(scope="module", params=["numpy", "cupy"]) +def array_module(request): + if request.param == "cupy": + try: + import cupy # noqa + + return cupy + except ImportError: + pytest.xfail() + elif request.param == "numpy": + import numpy + + return numpy diff --git a/tests/test_core.py b/tests/test_core.py index 5c4db9248..9809d90fb 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -178,31 +178,53 @@ def test_groupby_reduce( assert_equal(expected_result, result) -def gen_array_by(size, func): - by = np.ones(size[-1]) - rng = np.random.default_rng(12345) +def maybe_skip_cupy(array_module, func, engine): + if array_module is np: + return + + import cupy + + assert array_module is cupy + + if engine == "numba": + pytest.skip() + + if engine == "numpy" and ("prod" in func or "first" in func or "last" in func): + pytest.xfail() + elif engine == "flox" and not ( + "sum" in func or "mean" in func or "std" in func or "var" in func + ): + pytest.xfail() + + +def gen_array_by(size, func, array_module): + xp = array_module + by = xp.ones(size[-1]) + rng = xp.random.default_rng(12345) array = rng.random(size) if "nan" in func and "nanarg" not in func: - array[[1, 4, 5], ...] = np.nan + array[[1, 4, 5], ...] 
= xp.nan elif "nanarg" in func and len(size) > 1: - array[[1, 4, 5], 1] = np.nan + array[[1, 4, 5], 1] = xp.nan if func in ["any", "all"]: array = array > 0.5 return array, by -@pytest.mark.parametrize("chunks", [None, -1, 3, 4]) @pytest.mark.parametrize("nby", [1, 2, 3]) @pytest.mark.parametrize("size", ((12,), (12, 9))) -@pytest.mark.parametrize("add_nan_by", [True, False]) +@pytest.mark.parametrize("chunks", [None, -1, 3, 4]) @pytest.mark.parametrize("func", ALL_FUNCS) -def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): +@pytest.mark.parametrize("add_nan_by", [True, False]) +def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine, array_module): if chunks is not None and not has_dask: pytest.skip() if "arg" in func and engine == "flox": pytest.skip() - array, by = gen_array_by(size, func) + maybe_skip_cupy(array_module, func, engine) + + array, by = gen_array_by(size, func, array_module) if chunks: array = dask.array.from_array(array, chunks=chunks) by = (by,) * nby @@ -254,10 +276,12 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): assert expected.ndim == (array.ndim + nby - 1) expected_groups = tuple(np.array([idx + 1.0]) for idx in range(nby)) for actual_group, expect in zip(groups, expected_groups): - assert_equal(actual_group, expect) + assert_equal(actual_group, array_module.asarray(expect)) if "arg" in func: assert actual.dtype.kind == "i" - assert_equal(actual, expected, tolerance) + if chunks is not None: + assert isinstance(actual._meta, type(array._meta)) + assert_equal(actual, array_module.asarray(expected), tolerance) if not has_dask or chunks is None: continue @@ -287,6 +311,8 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): assert_equal(actual_group, expect, tolerance) if "arg" in func: assert actual.dtype.kind == "i" + if chunks is not None: + assert isinstance(actual._meta, type(array._meta)) assert_equal(actual, expected, tolerance) @@ -313,18 +339,18 @@ def test_arg_reduction_dtype_is_int(size, func): assert actual.dtype.kind == "i" -def test_groupby_reduce_count(): - array = np.array([0, 0, np.nan, np.nan, np.nan, 1, 1]) - labels = np.array(["a", "b", "b", "b", "c", "c", "c"]) +def test_groupby_reduce_count(array_module): + array = array_module.array([0, 0, np.nan, np.nan, np.nan, 1, 1]) + labels = array_module.array(["a", "b", "b", "b", "c", "c", "c"]) result, _ = groupby_reduce(array, labels, func="count") assert_equal(result, np.array([1, 1, 2], dtype=np.intp)) -def test_func_is_aggregation(): +def test_func_is_aggregation(array_module): from flox.aggregations import mean - array = np.array([0, 0, np.nan, np.nan, np.nan, 1, 1]) - labels = np.array(["a", "b", "b", "b", "c", "c", "c"]) + array = array_module.array([0, 0, np.nan, np.nan, np.nan, 1, 1]) + labels = array_module.array(["a", "b", "b", "b", "c", "c", "c"]) expected, _ = groupby_reduce(array, labels, func="mean") actual, _ = groupby_reduce(array, labels, func=mean) assert_equal(actual, expected) From 2f1fb7fb239407f257144df28d1c531a713f9a88 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 2 Jun 2023 22:31:15 -0600 Subject: [PATCH 10/16] Some fixes --- flox/core.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 15dc24bd1..efe27a488 100644 --- a/flox/core.py +++ b/flox/core.py @@ -170,7 +170,10 @@ def _get_optimal_chunks_for_groups(chunks, labels): def _unique(a: np.ndarray) -> np.ndarray: """Much faster to use pandas unique and sort the results. 
    np.unique sorts before uniquifying and is slow."""
-    return np.sort(pd.unique(a.reshape(-1)))
+    if isinstance(a, np.ndarray):
+        return np.sort(pd.unique(a.reshape(-1)))
+    else:
+        return np.unique(a.reshape(-1))
 
 
 @memoize
@@ -927,7 +930,8 @@ def _find_unique_groups(x_chunk) -> np.ndarray:
     from dask.base import flatten
     from dask.utils import deepmap
 
-    unique_groups = _unique(np.asarray(tuple(flatten(deepmap(listify_groups, x_chunk)))))
+    tup = tuple(flatten(deepmap(listify_groups, x_chunk)))
+    unique_groups = _unique(np.asarray(tup, like=tup[0]))
     unique_groups = unique_groups[~isnull(unique_groups)]
 
     if len(unique_groups) == 0:

From f6cb33f7ac9b38e1d5f23e937cff2ceea1b37ef0 Mon Sep 17 00:00:00 2001
From: dcherian
Date: Fri, 2 Jun 2023 22:52:33 -0600
Subject: [PATCH 11/16] Fix cohorts

---
 flox/core.py    | 19 +++++++++++--------
 flox/xrutils.py | 17 +++++++++++++++++
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index efe27a488..814cb8bb7 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -35,7 +35,7 @@
     generic_aggregate,
 )
 from .cache import memoize
-from .xrutils import is_duck_array, is_duck_dask_array, isnull
+from .xrutils import is_duck_array, is_duck_dask_array, isnull, to_numpy
 
 if TYPE_CHECKING:
     try:
@@ -203,7 +203,9 @@ def find_group_cohorts(labels, chunks, merge: bool = True) -> dict:
     import dask
 
     # To do this, we must have values in memory so casting to numpy should be safe
-    labels = np.asarray(labels)
+    if not is_duck_array(labels):
+        labels = np.asarray(labels)
+    labels = to_numpy(labels)
 
     # Build an array with the shape of labels, but where every element is the "chunk number"
     # 1. First subset the array appropriately
@@ -412,7 +414,7 @@ def reindex_(
         reindexed = np.full(array.shape[:-1] + (len(to),), fill_value, dtype=array.dtype)
         return reindexed
 
-    from_ = pd.Index(from_)
+    from_ = pd.Index(to_numpy(from_))
     # short-circuit for trivial case
     if from_.equals(to):
         return array
@@ -931,7 +933,9 @@ def _find_unique_groups(x_chunk) -> np.ndarray:
     from dask.utils import deepmap
 
     tup = tuple(flatten(deepmap(listify_groups, x_chunk)))
-    unique_groups = _unique(np.asarray(tup, like=tup[0]))
+    # passing like=None raises.
Seems like a bug + kwargs = dict(like=tup[0]) if is_duck_array(tup[0]) else {} + unique_groups = _unique(np.asarray(tup, **kwargs)) unique_groups = unique_groups[~isnull(unique_groups)] if len(unique_groups) == 0: @@ -1003,12 +1007,11 @@ def _conc2(x_chunk, key1, key2=slice(None), axis: T_Axes | None = None) -> np.nd def reindex_intermediates(x: IntermediateDict, agg: Aggregation, unique_groups) -> IntermediateDict: + to = pd.Index(to_numpy(unique_groups)) new_shape = x["groups"].shape[:-1] + (len(unique_groups),) newx: IntermediateDict = {"groups": np.broadcast_to(unique_groups, new_shape)} newx["intermediates"] = tuple( - reindex_( - v, from_=np.atleast_1d(x["groups"].squeeze()), to=pd.Index(unique_groups), fill_value=f - ) + reindex_(v, from_=to_numpy(np.atleast_1d(x["groups"].squeeze())), to=to, fill_value=f) for v, f in zip(x["intermediates"], agg.fill_value["intermediate"]) ) return newx @@ -2025,7 +2028,7 @@ def groupby_reduce( # now we get rid of them by reindexing # This also handles bins with no data result = reindex_( - result, from_=groups[0], to=expected_groups, fill_value=fill_value + result, from_=to_numpy(groups[0]), to=expected_groups, fill_value=fill_value ).reshape(result.shape[:-1] + grp_shape) groups = final_groups diff --git a/flox/xrutils.py b/flox/xrutils.py index 45cf45eec..133fd35ed 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -316,3 +316,20 @@ def nanlast(values, axis, keepdims=False): return np.expand_dims(result, axis=axis) else: return result + + +try: + import cupy as cp + + cp_types = (cp.ndarray,) +except ImportError: + cp_types = () # type: ignore + + +def to_numpy(a): + a_np = a + if is_duck_dask_array(a_np): + a_np = a_np.compute() + if isinstance(a_np, cp_types): + a_np = a_np.get() + return a_np From 796dcd2ea2e7a23cf22582e642b71a8df978b0fc Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 2 Jun 2023 22:52:42 -0600 Subject: [PATCH 12/16] [WIP] Fix blockwise --- flox/core.py | 25 ++++++++++++++----------- tests/test_core.py | 4 ++-- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/flox/core.py b/flox/core.py index 814cb8bb7..decde6d8b 100644 --- a/flox/core.py +++ b/flox/core.py @@ -135,36 +135,42 @@ def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray: @memoize def _get_optimal_chunks_for_groups(chunks, labels): - chunkidx = np.cumsum(chunks) - 1 + chunks_array = np.asarray(chunks, like=labels) + chunkidx = np.cumsum(chunks_array) - 1 # what are the groups at chunk boundaries labels_at_chunk_bounds = _unique(labels[chunkidx]) # what's the last index of all groups - last_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="last") + last_indexes = npg.aggregate_numpy.aggregate( + labels, np.arange(len(labels), like=labels), func="last" + ) # what's the last index of groups at the chunk boundaries. 
lastidx = last_indexes[labels_at_chunk_bounds] if len(chunkidx) == len(lastidx) and (chunkidx == lastidx).all(): return chunks - first_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="first") + first_indexes = npg.aggregate_numpy.aggregate( + labels, np.arange(len(labels), like=labels), func="first" + ) firstidx = first_indexes[labels_at_chunk_bounds] - newchunkidx = [0] + newchunkidx = np.array([0], like=labels) for c, f, l in zip(chunkidx, firstidx, lastidx): # noqa Δf = abs(c - f) Δl = abs(c - l) if c == 0 or newchunkidx[-1] > l: continue if Δf < Δl and f > newchunkidx[-1]: - newchunkidx.append(f) + newchunkidx = np.append(newchunkidx, f) else: - newchunkidx.append(l + 1) + newchunkidx = np.append(newchunkidx, l + 1) if newchunkidx[-1] != chunkidx[-1] + 1: - newchunkidx.append(chunkidx[-1] + 1) + newchunkidx = np.append(newchunkidx, chunkidx[-1] + 1) newchunks = np.diff(newchunkidx) assert sum(newchunks) == sum(chunks) - return tuple(newchunks) + # workaround cupy bug with tuple(array) + return tuple(newchunks.tolist()) def _unique(a: np.ndarray) -> np.ndarray: @@ -1317,9 +1323,6 @@ def dask_groupby_agg( assert isinstance(axis, Sequence) assert all(ax >= 0 for ax in axis) - if method == "blockwise" and not isinstance(by, np.ndarray): - raise NotImplementedError - inds = tuple(range(array.ndim)) name = f"groupby_{agg.name}" token = dask.base.tokenize(array, by, agg, expected_groups, axis) diff --git a/tests/test_core.py b/tests/test_core.py index 9809d90fb..e8d826350 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -807,8 +807,8 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None: [(10,), (10,)], ], ) -def test_rechunk_for_blockwise(inchunks, expected): - labels = np.array([1, 1, 1, 2, 2, 3, 3, 5, 5, 5]) +def test_rechunk_for_blockwise(inchunks, expected, array_module): + labels = array_module.array([1, 1, 1, 2, 2, 3, 3, 5, 5, 5]) assert _get_optimal_chunks_for_groups(inchunks, labels) == expected From f93f5750ef2c4b8ebfc7cbaa914c1cb174623288 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 2 Nov 2023 21:56:16 -0600 Subject: [PATCH 13/16] ignore types for cupy --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index e3b7afcf9..0a7b1dc46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -118,6 +118,7 @@ module=[ "asv_runner.*", "cachey", "cftime", + "cupy", "dask.*", "importlib_metadata", "numba", From 4abcfa67c79965c28cbc386a1caded6e791734fd Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 2 Nov 2023 21:59:05 -0600 Subject: [PATCH 14/16] More type fixes. --- flox/core.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 313863333..ac69c9cdd 100644 --- a/flox/core.py +++ b/flox/core.py @@ -189,7 +189,7 @@ def _get_optimal_chunks_for_groups(chunks, labels): return tuple(newchunks.tolist()) -def _unique(a: np.ndarray) -> np.ndarray: +def _unique(a): """Much faster to use pandas unique and sort the results. 
np.unique sorts before uniquifying and is slow.""" if isinstance(a, np.ndarray): @@ -606,13 +606,13 @@ def factorize_( idx[mask] = -1 else: if isinstance(flat, np.ndarray): - idx, groups = pd.factorize(flat, sort=sort) # type: ignore[arg-type] + idx, groups = pd.factorize(flat, sort=sort) groups = np.array(groups) else: assert sort - groups, idx = np.unique(flat, return_inverse=True) + groups, idx = np.unique(flat, return_inverse=True) # type: ignore[call-overload] idx[np.isnan(flat)] = -1 - groups = groups[~np.isnan(groups)] + groups = groups[~np.isnan(groups)] # type: ignore[index] found_groups.append(groups) factorized.append(idx.reshape(groupvar.shape)) From 9391873337b05708afbcac75553eab57b597d012 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 2 Nov 2023 22:05:39 -0600 Subject: [PATCH 15/16] More type ignores --- flox/core.py | 8 ++++---- tests/__init__.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flox/core.py b/flox/core.py index ac69c9cdd..2990d11dc 100644 --- a/flox/core.py +++ b/flox/core.py @@ -606,15 +606,15 @@ def factorize_( idx[mask] = -1 else: if isinstance(flat, np.ndarray): - idx, groups = pd.factorize(flat, sort=sort) + idx, groups = pd.factorize(flat, sort=sort) # type: ignore[call-overload] groups = np.array(groups) else: assert sort - groups, idx = np.unique(flat, return_inverse=True) # type: ignore[call-overload] + groups, idx = np.unique(flat, return_inverse=True) idx[np.isnan(flat)] = -1 - groups = groups[~np.isnan(groups)] # type: ignore[index] + groups = groups[~np.isnan(groups)] # type: ignore[call-overload] - found_groups.append(groups) + found_groups.append(groups) # type: ignore[arg-type] factorized.append(idx.reshape(groupvar.shape)) grp_shape = tuple(len(grp) for grp in found_groups) diff --git a/tests/__init__.py b/tests/__init__.py index 38bff5038..7e216e615 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -29,7 +29,7 @@ cp_types = (cp.ndarray,) except ImportError: - cp_types = () # type: ignore + cp_types = () # type: ignore[assignment] def _importorskip(modname, minversion=None): From 16e09b0111a63b8d811d36f6c70954989568534e Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 2 Nov 2023 22:11:38 -0600 Subject: [PATCH 16/16] one last ignore --- flox/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 2990d11dc..27e2c58f7 100644 --- a/flox/core.py +++ b/flox/core.py @@ -612,7 +612,7 @@ def factorize_( assert sort groups, idx = np.unique(flat, return_inverse=True) idx[np.isnan(flat)] = -1 - groups = groups[~np.isnan(groups)] # type: ignore[call-overload] + groups = groups[~np.isnan(groups)] # type: ignore[call-overload,index] found_groups.append(groups) # type: ignore[arg-type] factorized.append(idx.reshape(groupvar.shape))
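
A closing note on the conventions this series settles on: array creation
inside flox goes through NEP-35 like= (np.full, np.arange and np.array all
accept it since numpy 1.20), so the same call returns a cupy array for cupy
inputs; pandas-only steps are either given a device-side fallback (np.unique
instead of pd.factorize) or routed through the new to_numpy helper first.
A minimal sketch of the like= dispatch pattern -- the helper name here is
hypothetical, not part of the series:

    import numpy as np

    def index_like(reference, n):
        # NEP-35: like= hands creation off to whichever library
        # implements `reference`, so a cupy reference yields a cupy
        # result with no host-device transfer
        return np.arange(n, dtype=np.intp, like=reference)

    idx = index_like(np.empty(3), 10)  # np.ndarray
    # with cupy installed:
    #   import cupy
    #   idx = index_like(cupy.empty(3), 10)  # cupy.ndarray, created on device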