From 11d5083e8149af1f48f6d7d97e74c1d0d762fef8 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 1 Oct 2023 19:00:53 -0600 Subject: [PATCH 1/6] Bump actions/checkout from 3 to 4 (#267) Bumps [actions/checkout](https://github.com/actions/checkout) from 3 to 4. - [Release notes](https://github.com/actions/checkout/releases) - [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md) - [Commits](https://github.com/actions/checkout/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/checkout dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/benchmarks.yml | 2 +- .github/workflows/ci-additional.yaml | 6 +++--- .github/workflows/ci.yaml | 6 +++--- .github/workflows/pypi.yaml | 2 +- .github/workflows/testpypi-release.yaml | 2 +- .github/workflows/upstream-dev-ci.yaml | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 9521b7b68..5571d27af 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -17,7 +17,7 @@ jobs: steps: # We need the full repo to avoid this issue # https://github.com/actions/checkout/issues/23 - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index 9449d9290..790b751ee 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -22,7 +22,7 @@ jobs: outputs: triggered: ${{ steps.detect-trigger.outputs.trigger-found }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 2 - uses: xarray-contrib/ci-trigger@v1.2 @@ -44,7 +44,7 @@ jobs: PYTHON_VERSION: "3.10" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. @@ -95,7 +95,7 @@ jobs: PYTHON_VERSION: "3.10" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 75aafc4b6..43985f508 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -27,7 +27,7 @@ jobs: os: ["ubuntu-latest", "windows-latest"] python-version: ["3.8", "3.10"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set environment variables @@ -74,7 +74,7 @@ jobs: "minimal-requirements", ] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set up conda environment @@ -108,7 +108,7 @@ jobs: run: shell: bash -l {0} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: repository: 'pydata/xarray' fetch-depth: 0 # Fetch all history for all branches and tags. diff --git a/.github/workflows/pypi.yaml b/.github/workflows/pypi.yaml index 7372dd0fa..93891fcd7 100644 --- a/.github/workflows/pypi.yaml +++ b/.github/workflows/pypi.yaml @@ -8,7 +8,7 @@ jobs: deploy: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python uses: actions/setup-python@v4 with: diff --git a/.github/workflows/testpypi-release.yaml b/.github/workflows/testpypi-release.yaml index 1ed0a4a56..858348e6d 100644 --- a/.github/workflows/testpypi-release.yaml +++ b/.github/workflows/testpypi-release.yaml @@ -17,7 +17,7 @@ jobs: if: ${{ contains( github.event.pull_request.labels.*.name, 'test-build') && github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }} runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 diff --git a/.github/workflows/upstream-dev-ci.yaml b/.github/workflows/upstream-dev-ci.yaml index 0de92037f..7d87be16a 100644 --- a/.github/workflows/upstream-dev-ci.yaml +++ b/.github/workflows/upstream-dev-ci.yaml @@ -28,7 +28,7 @@ jobs: matrix: python-version: ["3.10"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set environment variables From 528a64549874963b4a5a4f381539307c1193402d Mon Sep 17 00:00:00 2001 From: Mathias Hauser Date: Mon, 2 Oct 2023 23:27:06 +0200 Subject: [PATCH 2/6] tests: move xfail out of functions (#265) * tests: move xfail out of functions * remove stray print * Update tests/test_core.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * add engine in test_dtype --------- Co-authored-by: Deepak Cherian Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- tests/__init__.py | 10 +++---- tests/conftest.py | 11 ++++---- tests/test_core.py | 63 ++++++++++++++++++++++++++------------------ tests/test_xarray.py | 32 +++++++--------------- 4 files changed, 54 insertions(+), 62 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index 4c04a0fc8..3e43e94d6 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,5 +1,5 @@ import importlib -from contextlib import contextmanager +from contextlib import nullcontext import numpy as np import packaging.version @@ -46,6 +46,7 @@ def LooseVersion(vstring): has_dask, requires_dask = _importorskip("dask") +has_numba, requires_numba = _importorskip("numba") has_xarray, requires_xarray = _importorskip("xarray") @@ -67,15 +68,10 @@ def __call__(self, dsk, keys, **kwargs): return dask.get(dsk, keys, **kwargs) -@contextmanager -def dummy_context(): - yield None - - def raise_if_dask_computes(max_computes=0): # return a dummy context manager so that this can be used for non-dask objects if not has_dask: - return dummy_context() + return nullcontext() scheduler = CountingScheduler(max_computes) return dask.config.set(scheduler=scheduler) diff --git a/tests/conftest.py b/tests/conftest.py index 5c3bb81f6..eb5971784 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,10 @@ import pytest +from . import requires_numba -@pytest.fixture(scope="module", params=["flox", "numpy", "numba"]) + +@pytest.fixture( + scope="module", params=["flox", "numpy", pytest.param("numba", marks=requires_numba)] +) def engine(request): - if request.param == "numba": - try: - import numba # noqa - except ImportError: - pytest.xfail() return request.param diff --git a/tests/test_core.py b/tests/test_core.py index 83b823b07..453958b2d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -105,7 +105,7 @@ def test_alignment_error(): @pytest.mark.parametrize("dtype", (float, int)) -@pytest.mark.parametrize("chunk", [False, True]) +@pytest.mark.parametrize("chunk", [False, pytest.param(True, marks=requires_dask)]) # TODO: make this intp when python 3.8 is dropped @pytest.mark.parametrize("expected_groups", [None, [0, 1, 2], np.array([0, 1, 2], dtype=np.int64)]) @pytest.mark.parametrize( @@ -145,7 +145,7 @@ def test_groupby_reduce( ) -> None: array = array.astype(dtype) if chunk: - if not has_dask or expected_groups is None: + if expected_groups is None: pytest.skip() array = da.from_array(array, chunks=(3,) if array.ndim == 1 else (1, 3)) by = da.from_array(by, chunks=(3,) if by.ndim == 1 else (1, 3)) @@ -166,7 +166,7 @@ def test_groupby_reduce( engine=engine, ) # we use pd.Index(expected_groups).to_numpy() which is always int64 - # for the values in this tests + # for the values in this test if expected_groups is None: g_dtype = by.dtype elif isinstance(expected_groups, np.ndarray): @@ -191,14 +191,20 @@ def gen_array_by(size, func): return array, by -@pytest.mark.parametrize("chunks", [None, -1, 3, 4]) +@pytest.mark.parametrize( + "chunks", + [ + None, + pytest.param(-1, marks=requires_dask), + pytest.param(3, marks=requires_dask), + pytest.param(4, marks=requires_dask), + ], +) @pytest.mark.parametrize("nby", [1, 2, 3]) @pytest.mark.parametrize("size", ((12,), (12, 9))) @pytest.mark.parametrize("add_nan_by", [True, False]) @pytest.mark.parametrize("func", ALL_FUNCS) def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): - if chunks is not None and not has_dask: - pytest.skip() if "arg" in func and engine == "flox": pytest.skip() @@ -390,16 +396,16 @@ def test_numpy_reduce_nd_md(): def test_groupby_agg_dask(func, shape, array_chunks, group_chunks, add_nan, dtype, engine, reindex): """Tests groupby_reduce with dask arrays against groupby_reduce with numpy arrays""" - rng = np.random.default_rng(12345) - array = dask.array.from_array(rng.random(shape), chunks=array_chunks).astype(dtype) - array = dask.array.ones(shape, chunks=array_chunks) - if func in ["first", "last"]: pytest.skip() if "arg" in func and (engine == "flox" or reindex): pytest.skip() + rng = np.random.default_rng(12345) + array = dask.array.from_array(rng.random(shape), chunks=array_chunks).astype(dtype) + array = dask.array.ones(shape, chunks=array_chunks) + labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0]) if add_nan: labels = labels.astype(float) @@ -612,7 +618,14 @@ def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): assert_equal(actual, expected, tolerance) -@pytest.mark.parametrize("reindex,chunks", [(None, None), (False, (2, 2, 3)), (True, (2, 2, 3))]) +@pytest.mark.parametrize( + "reindex, chunks", + [ + (None, None), + pytest.param(False, (2, 2, 3), marks=requires_dask), + pytest.param(True, (2, 2, 3), marks=requires_dask), + ], +) @pytest.mark.parametrize( "axis, groups, expected_shape", [ @@ -624,8 +637,6 @@ def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): def test_groupby_reduce_nans(reindex, chunks, axis, groups, expected_shape, engine): def _maybe_chunk(arr): if chunks: - if not has_dask: - pytest.skip() return da.from_array(arr, chunks=chunks) else: return arr @@ -739,7 +750,14 @@ def test_npg_nanarg_bug(func): ) @pytest.mark.parametrize("method", ["cohorts", "map-reduce"]) @pytest.mark.parametrize("chunk_labels", [False, True]) -@pytest.mark.parametrize("chunks", ((), (1,), (2,))) +@pytest.mark.parametrize( + "chunks", + ( + (), + pytest.param((1,), marks=requires_dask), + pytest.param((2,), marks=requires_dask), + ), +) def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None: array = [1, 1, 1, 1, 1, 1] labels = [0.2, 1.5, 1.9, 2, 3, 20] @@ -748,8 +766,6 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None: pytest.xfail() if chunks: - if not has_dask: - pytest.skip() array = dask.array.from_array(array, chunks=chunks) if chunk_labels: labels = dask.array.from_array(labels, chunks=chunks) @@ -825,7 +841,7 @@ def test_rechunk_for_cohorts(chunk_at, expected): assert rechunked.chunks == expected -@pytest.mark.parametrize("chunks", [None, 3]) +@pytest.mark.parametrize("chunks", [None, pytest.param(3, marks=requires_dask)]) @pytest.mark.parametrize("fill_value", [123, np.nan]) @pytest.mark.parametrize("func", ALL_FUNCS) def test_fill_value_behaviour(func, chunks, fill_value, engine): @@ -833,8 +849,6 @@ def test_fill_value_behaviour(func, chunks, fill_value, engine): # This is used by xarray if func in ["all", "any"] or "arg" in func: pytest.skip() - if chunks is not None and not has_dask: - pytest.skip() npfunc = _get_array_func(func) by = np.array([1, 2, 3, 1, 2, 3]) @@ -1050,11 +1064,8 @@ def test_factorize_values_outside_bins(): assert_equal(expected, actual) -@pytest.mark.parametrize("chunk", [True, False]) +@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False]) def test_multiple_groupers_bins(chunk) -> None: - if chunk and not has_dask: - pytest.skip() - xp = dask.array if chunk else np array_kwargs = {"chunks": 2} if chunk else {} array = xp.ones((5, 2), **array_kwargs, dtype=np.int64) @@ -1087,9 +1098,9 @@ def test_multiple_groupers_bins(chunk) -> None: np.arange(2, 4).reshape(1, 2), ], ) -@pytest.mark.parametrize("chunk", [True, False]) +@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False]) def test_multiple_groupers(chunk, by1, by2, expected_groups) -> None: - if chunk and (not has_dask or expected_groups is None): + if chunk and expected_groups is None: pytest.skip() xp = dask.array if chunk else np @@ -1233,7 +1244,7 @@ def test_dtype(func, dtype, engine): pytest.skip() arr = np.ones((4, 12), dtype=dtype) labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"]) - actual, _ = groupby_reduce(arr, labels, func=func, dtype=np.float64) + actual, _ = groupby_reduce(arr, labels, func=func, dtype=np.float64, engine=engine) assert actual.dtype == np.dtype("float64") diff --git a/tests/test_xarray.py b/tests/test_xarray.py index 7a343d962..8f006e5f3 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -16,7 +16,7 @@ dask.config.set(scheduler="sync") try: - # Should test against legacy xarray implementation + # test against legacy xarray implementation xr.set_options(use_flox=False) except ValueError: pass @@ -31,15 +31,15 @@ @pytest.mark.parametrize("add_nan", [True, False]) @pytest.mark.parametrize("skipna", [True, False]) def test_xarray_reduce(skipna, add_nan, min_count, engine, reindex): + if skipna is False and min_count is not None: + pytest.skip() + arr = np.ones((4, 12)) if add_nan: arr[1, ...] = np.nan arr[[0, 2], [3, 4]] = np.nan - if skipna is False and min_count is not None: - pytest.skip() - labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"]) labels = np.array(labels) labels2 = np.array([1, 2, 2, 1]) @@ -77,11 +77,8 @@ def test_xarray_reduce(skipna, add_nan, min_count, engine, reindex): # TODO: sort @pytest.mark.parametrize("pass_expected_groups", [True, False]) -@pytest.mark.parametrize("chunk", (True, False)) +@pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False)) def test_xarray_reduce_multiple_groupers(pass_expected_groups, chunk, engine): - if not has_dask and chunk: - pytest.skip() - if chunk and pass_expected_groups is False: pytest.skip() @@ -126,11 +123,8 @@ def test_xarray_reduce_multiple_groupers(pass_expected_groups, chunk, engine): @pytest.mark.parametrize("pass_expected_groups", [True, False]) -@pytest.mark.parametrize("chunk", (True, False)) +@pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False)) def test_xarray_reduce_multiple_groupers_2(pass_expected_groups, chunk, engine): - if not has_dask and chunk: - pytest.skip() - if chunk and pass_expected_groups is False: pytest.skip() @@ -317,14 +311,12 @@ def test_multi_index_groupby_sum(engine): assert_equal(expected, actual) -@pytest.mark.parametrize("chunks", (None, 2)) +@pytest.mark.parametrize("chunks", (None, pytest.param(2, marks=requires_dask))) def test_xarray_groupby_bins(chunks, engine): array = xr.DataArray([1, 1, 1, 1, 1], dims="x") labels = xr.DataArray([1, 1.5, 1.9, 2, 3], dims="x", name="labels") if chunks: - if not has_dask: - pytest.skip() array = array.chunk({"x": chunks}) labels = labels.chunk({"x": chunks}) @@ -472,11 +464,8 @@ def test_alignment_error(): @pytest.mark.parametrize("add_nan", [True, False]) @pytest.mark.parametrize("dtype_out", [np.float64, "float64", np.dtype("float64")]) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) -@pytest.mark.parametrize("chunk", (True, False)) +@pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False)) def test_dtype(add_nan, chunk, dtype, dtype_out, engine): - if chunk and not has_dask: - pytest.skip() - xp = dask.array if chunk else np data = xp.linspace(0, 1, 48, dtype=dtype).reshape((4, 12)) @@ -508,12 +497,9 @@ def test_dtype(add_nan, chunk, dtype, dtype_out, engine): xr.testing.assert_allclose(expected, actual.transpose("labels", ...), **tolerance64) -@pytest.mark.parametrize("chunk", [True, False]) +@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False]) @pytest.mark.parametrize("use_flox", [True, False]) def test_dtype_accumulation(use_flox, chunk): - if chunk and not has_dask: - pytest.skip() - datetimes = pd.date_range("2010-01", "2015-01", freq="6H", inclusive="left") samples = 10 + np.cos(2 * np.pi * 0.001 * np.arange(len(datetimes))) * 1 samples += np.random.randn(len(datetimes)) From 486b7c5fb4324ddad48e9a00c0b88638f3e65471 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Mon, 2 Oct 2023 17:56:38 -0600 Subject: [PATCH 3/6] Drop python 3.8, test python 3.11 (#209) * Drop py38 * bump to py 3.11 * more bump * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * revert bump for xarray tests --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .github/workflows/ci-additional.yaml | 4 ++-- .github/workflows/ci.yaml | 2 +- .github/workflows/testpypi-release.yaml | 4 ++-- .github/workflows/upstream-dev-ci.yaml | 2 +- flox/core.py | 8 +++----- flox/xarray.py | 3 ++- flox/xrutils.py | 3 ++- pyproject.toml | 7 +++---- 8 files changed, 16 insertions(+), 17 deletions(-) diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index 790b751ee..c71fc8913 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -41,7 +41,7 @@ jobs: env: CONDA_ENV_FILE: ci/environment.yml - PYTHON_VERSION: "3.10" + PYTHON_VERSION: "3.11" steps: - uses: actions/checkout@v4 @@ -92,7 +92,7 @@ jobs: shell: bash -l {0} env: CONDA_ENV_FILE: ci/environment.yml - PYTHON_VERSION: "3.10" + PYTHON_VERSION: "3.11" steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 43985f508..b963d671d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -25,7 +25,7 @@ jobs: fail-fast: false matrix: os: ["ubuntu-latest", "windows-latest"] - python-version: ["3.8", "3.10"] + python-version: ["3.9", "3.11"] steps: - uses: actions/checkout@v4 with: diff --git a/.github/workflows/testpypi-release.yaml b/.github/workflows/testpypi-release.yaml index 858348e6d..3b5f98b17 100644 --- a/.github/workflows/testpypi-release.yaml +++ b/.github/workflows/testpypi-release.yaml @@ -24,7 +24,7 @@ jobs: - uses: actions/setup-python@v4 name: Install Python with: - python-version: "3.10" + python-version: "3.11" - name: Install dependencies run: | @@ -65,7 +65,7 @@ jobs: - uses: actions/setup-python@v4 name: Install Python with: - python-version: "3.10" + python-version: "3.11" - uses: actions/download-artifact@v3 with: name: releases diff --git a/.github/workflows/upstream-dev-ci.yaml b/.github/workflows/upstream-dev-ci.yaml index 7d87be16a..80ad81ac6 100644 --- a/.github/workflows/upstream-dev-ci.yaml +++ b/.github/workflows/upstream-dev-ci.yaml @@ -26,7 +26,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.10"] + python-version: ["3.11"] steps: - uses: actions/checkout@v4 with: diff --git a/flox/core.py b/flox/core.py index f8f700f99..98c37bc6f 100644 --- a/flox/core.py +++ b/flox/core.py @@ -7,16 +7,14 @@ import sys import warnings from collections import namedtuple +from collections.abc import Mapping, Sequence from functools import partial, reduce from numbers import Integral from typing import ( TYPE_CHECKING, Any, Callable, - Dict, Literal, - Mapping, - Sequence, Union, overload, ) @@ -74,8 +72,8 @@ T_IsBins = Union[bool | Sequence[bool]] -IntermediateDict = Dict[Union[str, Callable], Any] -FinalResultsDict = Dict[str, Union["DaskArray", np.ndarray]] +IntermediateDict = dict[Union[str, Callable], Any] +FinalResultsDict = dict[str, Union["DaskArray", np.ndarray]] FactorProps = namedtuple("FactorProps", "offset_group nan_sentinel nanmask") # This dummy axis is inserted using np.expand_dims diff --git a/flox/xarray.py b/flox/xarray.py index 487850ca0..c85ad7113 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Hashable, Iterable, Sequence, Union +from collections.abc import Hashable, Iterable, Sequence +from typing import TYPE_CHECKING, Any, Union import numpy as np import pandas as pd diff --git a/flox/xrutils.py b/flox/xrutils.py index 958bd3976..4f80ec8c8 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -2,7 +2,8 @@ # defined in xarray import datetime -from typing import Any, Iterable +from collections.abc import Iterable +from typing import Any import numpy as np import pandas as pd diff --git a/pyproject.toml b/pyproject.toml index fb27ee761..e41fb17eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "flox" description = "GroupBy operations for dask.array" license = {file = "LICENSE"} readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.9" keywords = ["xarray", "dask", "groupby"] classifiers = [ "Development Status :: 4 - Beta", @@ -11,7 +11,6 @@ classifiers = [ "Natural Language :: English", "Operating System :: OS Independent", "Programming Language :: Python", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -60,10 +59,10 @@ write_to_template= '__version__ = "{version}"' [tool.black] line-length = 100 -target-version = ["py38"] +target-version = ["py39"] [tool.ruff] -target-version = "py38" +target-version = "py39" builtins = ["ellipsis"] exclude = [ ".eggs", From f2945f0312339b3ad7b63b670bfd60965e804ee9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 3 Oct 2023 17:10:07 -0600 Subject: [PATCH 4/6] [pre-commit.ci] pre-commit autoupdate (#268) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [pre-commit.ci] pre-commit autoupdate updates: - [github.com/astral-sh/ruff-pre-commit: v0.0.276 → v0.0.292](https://github.com/astral-sh/ruff-pre-commit/compare/v0.0.276...v0.0.292) - [github.com/psf/black: 23.3.0 → 23.9.1](https://github.com/psf/black/compare/23.3.0...23.9.1) - [github.com/executablebooks/mdformat: 0.7.16 → 0.7.17](https://github.com/executablebooks/mdformat/compare/0.7.16...0.7.17) - [github.com/codespell-project/codespell: v2.2.5 → v2.2.6](https://github.com/codespell-project/codespell/compare/v2.2.5...v2.2.6) - [github.com/abravalheri/validate-pyproject: v0.13 → v0.14](https://github.com/abravalheri/validate-pyproject/compare/v0.13...v0.14) * fix typos --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Deepak Cherian --- .pre-commit-config.yaml | 10 +++++----- docs/source/implementation.md | 2 +- docs/source/user-stories/custom-aggregations.ipynb | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e9b1e9d6b..c36fa38cf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ ci: repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: 'v0.0.276' + rev: 'v0.0.292' hooks: - id: ruff args: ["--fix"] @@ -18,12 +18,12 @@ repos: - id: check-docstring-first - repo: https://github.com/psf/black - rev: 23.3.0 + rev: 23.9.1 hooks: - id: black - repo: https://github.com/executablebooks/mdformat - rev: 0.7.16 + rev: 0.7.17 hooks: - id: mdformat additional_dependencies: @@ -44,13 +44,13 @@ repos: args: [--extra-keys=metadata.kernelspec metadata.language_info.version] - repo: https://github.com/codespell-project/codespell - rev: v2.2.5 + rev: v2.2.6 hooks: - id: codespell additional_dependencies: - tomli - repo: https://github.com/abravalheri/validate-pyproject - rev: v0.13 + rev: v0.14 hooks: - id: validate-pyproject diff --git a/docs/source/implementation.md b/docs/source/implementation.md index f3a2a87f7..29d9faf46 100644 --- a/docs/source/implementation.md +++ b/docs/source/implementation.md @@ -199,7 +199,7 @@ width: 100% 1. Group labels must be known at graph construction time, so this only works for numpy arrays. 1. This does require more tasks and a more complicated graph, but the communication overhead can be significantly lower. 1. The detection of "cohorts" is currently slow but could be improved. -1. The extra effort of detecting cohorts and mul;tiple copying of intermediate blocks may be worthwhile only if the chunk sizes are small +1. The extra effort of detecting cohorts and multiple copying of intermediate blocks may be worthwhile only if the chunk sizes are small relative to the approximate period of group labels, or small relative to the size of spatially localized groups. ### Example : sensitivity to chunking diff --git a/docs/source/user-stories/custom-aggregations.ipynb b/docs/source/user-stories/custom-aggregations.ipynb index 7b4167b98..f191c77e0 100644 --- a/docs/source/user-stories/custom-aggregations.ipynb +++ b/docs/source/user-stories/custom-aggregations.ipynb @@ -135,7 +135,7 @@ " # The next are for dask inputs and describe how to reduce\n", " # the data in parallel\n", " chunk=(\"sum\", \"nanlen\"), # first compute these blockwise : (grouped_sum, grouped_count)\n", - " combine=(\"sum\", \"sum\"), # reduce intermediate reuslts (sum the sums, sum the counts)\n", + " combine=(\"sum\", \"sum\"), # reduce intermediate results (sum the sums, sum the counts)\n", " finalize=lambda sum_, count: sum_ / count, # final mean value (divide sum by count)\n", "\n", " fill_value=(0, 0), # fill value for intermediate sums and counts when groups have no members\n", From 30d522dbaaeb84245f0496304f73272f3183bb2e Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 4 Oct 2023 22:04:36 -0600 Subject: [PATCH 5/6] Add multidimensional binning demo (#203) * Add nD binning notebook * fix. --- docs/source/user-stories.md | 1 + docs/source/user-stories/nD-bins.ipynb | 373 +++++++++++++++++++++++++ 2 files changed, 374 insertions(+) create mode 100644 docs/source/user-stories/nD-bins.ipynb diff --git a/docs/source/user-stories.md b/docs/source/user-stories.md index 22b37939e..0241e01dc 100644 --- a/docs/source/user-stories.md +++ b/docs/source/user-stories.md @@ -8,4 +8,5 @@ user-stories/climatology.ipynb user-stories/climatology-hourly.ipynb user-stories/custom-aggregations.ipynb + user-stories/nD-bins.ipynb ``` diff --git a/docs/source/user-stories/nD-bins.ipynb b/docs/source/user-stories/nD-bins.ipynb new file mode 100644 index 000000000..87ef942bf --- /dev/null +++ b/docs/source/user-stories/nD-bins.ipynb @@ -0,0 +1,373 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "e970d800-c612-482a-bb3a-b1eb7ad53d88", + "metadata": { + "tags": [], + "user_expressions": [] + }, + "source": [ + "# Binning with multi-dimensional bins\n", + "\n", + "```{warning}\n", + "This post is a proof-of-concept for discussion. Expect APIs to change to enable this use case.\n", + "```\n", + "\n", + "Here we explore a binning problem where the bins are multidimensional\n", + "([xhistogram issue](https://github.com/xgcm/xhistogram/issues/28))\n", + "\n", + "> One of such multi-dim bin applications is the ranked probability score rps we\n", + "> use in `xskillscore.rps`, where we want to know how many forecasts fell into\n", + "> which bins. Bins are often defined as terciles of the forecast distribution\n", + "> and the bins for these terciles\n", + "> (`forecast_with_lon_lat_time_dims.quantile(q=[.33,.66],dim='time')`) depend on\n", + "> `lon` and `lat`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01f1a2ef-de62-45d0-a04e-343cd78debc5", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import math\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xarray as xr\n", + "\n", + "import flox\n", + "import flox.xarray" + ] + }, + { + "cell_type": "markdown", + "id": "0be3e214-0cf0-426f-8ebb-669cc5322310", + "metadata": { + "user_expressions": [] + }, + "source": [ + "## Create test data\n" + ] + }, + { + "cell_type": "markdown", + "id": "ce239000-e053-4fc3-ad14-e9e0160da869", + "metadata": { + "user_expressions": [] + }, + "source": [ + "Data to be reduced\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7659c24e-f5a1-4e59-84c0-5ec965ef92d2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "array = xr.DataArray(\n", + " np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]),\n", + " dims=(\"space\", \"time\"),\n", + " name=\"array\",\n", + ")\n", + "array" + ] + }, + { + "cell_type": "markdown", + "id": "da0c0ac9-ad75-42cd-a1ea-99069f5bef00", + "metadata": { + "user_expressions": [] + }, + "source": [ + "Array to group by\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4601e744-5d22-447e-97ce-9644198d485e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "by = xr.DataArray(\n", + " np.array([[1, 2, 3], [3, 4, 5], [5, 6, 7], [6, 7, 9]]),\n", + " dims=(\"space\", \"time\"),\n", + " name=\"by\",\n", + ")\n", + "by" + ] + }, + { + "cell_type": "markdown", + "id": "61c21c94-7b6e-46a6-b9c2-59d7b2d40c81", + "metadata": { + "tags": [], + "user_expressions": [] + }, + "source": [ + "Multidimensional bins:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "863a1991-ab8d-47c0-aa48-22b422fcea8c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "bins = by + 0.5\n", + "bins = xr.DataArray(\n", + " np.concatenate([bins, bins[:, [-1]] + 1], axis=-1)[:, :-1].T,\n", + " dims=(\"time\", \"nbins\"),\n", + " name=\"bins\",\n", + ")\n", + "bins" + ] + }, + { + "cell_type": "markdown", + "id": "e65ecaba-d1cc-4485-ae58-c390cb2ebfab", + "metadata": { + "user_expressions": [] + }, + "source": [ + "## Concept\n", + "\n", + "The key idea is that GroupBy is two steps:\n", + "\n", + "1. Factorize (a.k.a \"digitize\") : convert the `by` data to a set of integer\n", + " codes representing the bins.\n", + "2. Apply the reduction.\n", + "\n", + "We treat multi-dimensional binning as a slightly complicated factorization\n", + "problem. Assume that bins are a function of `time`. So we\n", + "\n", + "1. generate a set of appropriate integer codes by:\n", + " 1. Loop over \"time\" and factorize the data appropriately.\n", + " 2. Add an offset to these codes so that \"bin 0\" for `time=0` is different\n", + " from \"bin 0\" for `time=1`\n", + "2. apply the groupby reduction to the \"offset codes\"\n", + "3. reshape the output to the right shape\n", + "\n", + "We will work at the xarray level, so its easy to keep track of the different\n", + "dimensions.\n", + "\n", + "### Factorizing\n", + "\n", + "The core `factorize_` function (which wraps `pd.cut`) only handles 1D bins, so\n", + "we use `xr.apply_ufunc` to vectorize it for us.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aa33ab2c-0ecf-4198-a033-2a77f5d83c99", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "factorize_loop_dim = \"time\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "afcddcc1-dd57-461e-a649-1f8bcd30342f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def factorize_nd_bins_core(by, bins):\n", + " group_idx, *_, props = flox.core.factorize_(\n", + " (by,),\n", + " axes=(-1,),\n", + " expected_groups=(pd.IntervalIndex.from_breaks(bins),),\n", + " )\n", + " # Use -1 as the NaN sentinel value\n", + " group_idx[props.nanmask] = -1\n", + " return group_idx\n", + "\n", + "\n", + "codes = xr.apply_ufunc(\n", + " factorize_nd_bins_core,\n", + " by,\n", + " bins,\n", + " # TODO: avoid hardcoded dim names\n", + " input_core_dims=[[\"space\"], [\"nbins\"]],\n", + " output_core_dims=[[\"space\"]],\n", + " vectorize=True,\n", + ")\n", + "codes" + ] + }, + { + "cell_type": "markdown", + "id": "1661312a-dc61-4a26-bfd8-12c2dc01eb15", + "metadata": { + "user_expressions": [] + }, + "source": [ + "### Offset the codes\n", + "\n", + "These are integer codes appropriate for a single timestep.\n", + "\n", + "We now add an offset that changes in time, to make sure \"bin 0\" for `time=0` is\n", + "different from \"bin 0\" for `time=1` (taken from\n", + "[this StackOverflow thread](https://stackoverflow.com/questions/46256279/bin-elements-per-row-vectorized-2d-bincount-for-numpy)).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0e5801cb-a79c-4670-ad10-36bb19f1a6ff", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "N = math.prod([codes.sizes[d] for d in codes.dims if d != factorize_loop_dim])\n", + "offset = xr.DataArray(np.arange(codes.sizes[factorize_loop_dim]), dims=factorize_loop_dim)\n", + "# TODO: think about N-1 here\n", + "offset_codes = (codes + offset * (N - 1)).rename(by.name)\n", + "offset_codes.data[codes == -1] = -1\n", + "offset_codes" + ] + }, + { + "cell_type": "markdown", + "id": "6c06c48b-316b-4a33-9bc3-921acd10bcba", + "metadata": { + "user_expressions": [] + }, + "source": [ + "### Reduce\n", + "\n", + "Now that we have appropriate codes, let's apply the reduction\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2cf1295e-4585-48b9-ac2b-9e00d03b2b9a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "interim = flox.xarray.xarray_reduce(\n", + " array,\n", + " offset_codes,\n", + " func=\"sum\",\n", + " # We use RangeIndex to indicate that `-1` code can be safely ignored\n", + " # (it indicates values outside the bins)\n", + " # TODO: Avoid hardcoding 9 = sizes[\"time\"] x (sizes[\"nbins\"] - 1)\n", + " expected_groups=pd.RangeIndex(9),\n", + ")\n", + "interim" + ] + }, + { + "cell_type": "markdown", + "id": "3539509b-d9b4-4342-a679-6ada6f285dfb", + "metadata": { + "user_expressions": [] + }, + "source": [ + "## Make final result\n", + "\n", + "Now reshape that 1D result appropriately.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1389d37-d76d-4a50-9dfb-8710258de3fd", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "final = (\n", + " interim.coarsen(by=3)\n", + " # bin_number dimension is last, this makes sense since it is the core dimension\n", + " # and we vectorize over the loop dims.\n", + " # So the first (Nbins-1) elements are for the first index of the loop dim\n", + " .construct({\"by\": (factorize_loop_dim, \"bin_number\")})\n", + " .transpose(..., factorize_loop_dim)\n", + " .drop_vars(\"by\")\n", + ")\n", + "final" + ] + }, + { + "cell_type": "markdown", + "id": "a98b5e60-94af-45ae-be1b-4cb47e2d77ba", + "metadata": { + "user_expressions": [] + }, + "source": [ + "I think this is the expected answer.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "053a8643-f6d9-4fd1-b014-230fa716449c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "array.isel(space=slice(1, None)).rename({\"space\": \"bin_number\"}).identical(final)" + ] + }, + { + "cell_type": "markdown", + "id": "619ba4c4-7c87-459a-ab86-c187d3a86c67", + "metadata": { + "tags": [], + "user_expressions": [] + }, + "source": [ + "## TODO\n", + "\n", + "This could be extended to:\n", + "\n", + "1. handle multiple `factorize_loop_dim`\n", + "2. avoid hard coded dimension names in the `apply_ufunc` call for factorizing\n", + "3. avoid hard coded number of output elements in the `xarray_reduce` call.\n", + "4. Somehow propagate the bin edges to the final output.\n" + ] + } + ], + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 68b122e441d2f7fb1b6d70a4a2805b1f5226fe6f Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 5 Oct 2023 16:39:52 -0600 Subject: [PATCH 6/6] Support quantile, median, mode with method="blockwise". (#269) * Support quantile, median with method="blockwise". We allow method="blockwise" when grouping by a dask array. This can only work if we have expected_groups, and set reindex=True. * Update flox/core.py * fix comment * Update validate_reindex test * Fix * Fix * Raise early if `q` is not provided for quantile * WIP test * narrow type * fix type * Mode + Tests * limit tests * cleanup tests * fix bool tests * Revert "limit tests" This reverts commit d46c3aef3b9c5d9dadd6555c0e7b1b628c0633e6. * Small cleanup * more cleanup * fix test * ignore scipy typing * update docs --- ci/environment.yml | 1 + docs/source/aggregations.md | 7 +- .../user-stories/custom-aggregations.ipynb | 9 +- flox/aggregate_npg.py | 81 +++++++++++++ flox/aggregations.py | 48 ++++++-- flox/core.py | 54 ++++++--- flox/xarray.py | 9 +- pyproject.toml | 3 +- tests/__init__.py | 1 + tests/test_core.py | 110 ++++++++++++++---- 10 files changed, 261 insertions(+), 62 deletions(-) diff --git a/ci/environment.yml b/ci/environment.yml index 93f0e891f..9d5aa6d01 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -22,3 +22,4 @@ dependencies: - pooch - toolz - numba + - scipy diff --git a/docs/source/aggregations.md b/docs/source/aggregations.md index e6c10e4ba..d3591d2dc 100644 --- a/docs/source/aggregations.md +++ b/docs/source/aggregations.md @@ -11,8 +11,11 @@ the `func` kwarg: - `"std"`, `"nanstd"` - `"argmin"` - `"argmax"` -- `"first"` -- `"last"` +- `"first"`, `"nanfirst"` +- `"last"`, `"nanlast"` +- `"median"`, `"nanmedian"` +- `"mode"`, `"nanmode"` +- `"quantile"`, `"nanquantile"` ```{tip} We would like to add support for `cumsum`, `cumprod` ([issue](https://github.com/xarray-contrib/flox/issues/91)). Contributions are welcome! diff --git a/docs/source/user-stories/custom-aggregations.ipynb b/docs/source/user-stories/custom-aggregations.ipynb index f191c77e0..8b9be09e9 100644 --- a/docs/source/user-stories/custom-aggregations.ipynb +++ b/docs/source/user-stories/custom-aggregations.ipynb @@ -15,8 +15,13 @@ ">\n", "> A = da.groupby(['lon_bins', 'lat_bins']).mode()\n", "\n", - "This notebook will describe how to accomplish this using a custom `Aggregation`\n", - "since `mode` and `median` aren't supported by flox yet.\n" + "This notebook will describe how to accomplish this using a custom `Aggregation`.\n", + "\n", + "\n", + "```{tip}\n", + "flox now supports `mode`, `nanmode`, `quantile`, `nanquantile`, `median`, `nanmedian` using exactly the same \n", + "approach as shown below\n", + "```\n" ] }, { diff --git a/flox/aggregate_npg.py b/flox/aggregate_npg.py index 30e0eb257..966bd43b8 100644 --- a/flox/aggregate_npg.py +++ b/flox/aggregate_npg.py @@ -100,3 +100,84 @@ def _len(group_idx, array, engine, *, func, axis=-1, size=None, fill_value=None, len = partial(_len, func="len") nanlen = partial(_len, func="nanlen") + + +def median(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=np.median, + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def nanmedian(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=np.nanmedian, + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def quantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(np.quantile, q=q), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def nanquantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(np.nanquantile, q=q), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def mode_(array, nan_policy, dtype): + from scipy.stats import mode + + # npg splits `array` into object arrays for each group + # scipy.stats.mode does not like that + # here we cast back + return mode(array.astype(dtype, copy=False), nan_policy=nan_policy, axis=-1).mode + + +def mode(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(mode_, nan_policy="propagate", dtype=array.dtype), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def nanmode(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(mode_, nan_policy="omit", dtype=array.dtype), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) diff --git a/flox/aggregations.py b/flox/aggregations.py index e5013032a..c06ef3509 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -6,7 +6,6 @@ from typing import TYPE_CHECKING, Any, Callable, TypedDict import numpy as np -import numpy_groupies as npg from numpy.typing import DTypeLike from . import aggregate_flox, aggregate_npg, xrutils @@ -35,6 +34,16 @@ class AggDtype(TypedDict): intermediate: tuple[np.dtype | type[np.intp], ...] +def get_npg_aggregation(func, *, engine): + try: + method_ = getattr(aggregate_npg, func) + method = partial(method_, engine=engine) + except AttributeError: + aggregate = aggregate_npg._get_aggregate(engine).aggregate + method = partial(aggregate, func=func) + return method + + def generic_aggregate( group_idx, array, @@ -51,14 +60,11 @@ def generic_aggregate( try: method = getattr(aggregate_flox, func) except AttributeError: - method = partial(npg.aggregate_numpy.aggregate, func=func) + method = get_npg_aggregation(func, engine="numpy") + elif engine in ["numpy", "numba"]: - try: - method_ = getattr(aggregate_npg, func) - method = partial(method_, engine=engine) - except AttributeError: - aggregate = aggregate_npg._get_aggregate(engine).aggregate - method = partial(aggregate, func=func) + method = get_npg_aggregation(func, engine=engine) + else: raise ValueError( f"Expected engine to be one of ['flox', 'numpy', 'numba']. Received {engine} instead." @@ -465,10 +471,22 @@ def _pick_second(*x): final_dtype=bool, ) -# numpy_groupies does not support median -# And the dask version is really hard! -# median = Aggregation("median", chunk=None, combine=None, fill_value=None) -# nanmedian = Aggregation("nanmedian", chunk=None, combine=None, fill_value=None) +# Support statistical quantities only blockwise +# The parallel versions will be approximate and are hard to implement! +median = Aggregation( + name="median", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +nanmedian = Aggregation( + name="nanmedian", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +quantile = Aggregation( + name="quantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +nanquantile = Aggregation( + name="nanquantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +mode = Aggregation(name="mode", fill_value=dtypes.NA, chunk=None, combine=None) +nanmode = Aggregation(name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None) aggregations = { "any": any_, @@ -496,6 +514,12 @@ def _pick_second(*x): "nanfirst": nanfirst, "last": last, "nanlast": nanlast, + "median": median, + "nanmedian": nanmedian, + "quantile": quantile, + "nanquantile": nanquantile, + "mode": mode, + "nanmode": nanmode, } diff --git a/flox/core.py b/flox/core.py index 98c37bc6f..484303295 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1307,15 +1307,14 @@ def dask_groupby_agg( assert isinstance(axis, Sequence) assert all(ax >= 0 for ax in axis) - if method == "blockwise" and not isinstance(by, np.ndarray): - raise NotImplementedError - inds = tuple(range(array.ndim)) name = f"groupby_{agg.name}" token = dask.base.tokenize(array, by, agg, expected_groups, axis) if expected_groups is None and reindex: expected_groups = _get_expected_groups(by, sort=sort) + if method == "cohorts": + assert reindex is False by_input = by @@ -1349,7 +1348,6 @@ def dask_groupby_agg( # b. "_grouped_combine": A more general solution where we tree-reduce the groupby reduction. # This allows us to discover groups at compute time, support argreductions, lower intermediate # memory usage (but method="cohorts" would also work to reduce memory in some cases) - do_simple_combine = not _is_arg_reduction(agg) if method == "blockwise": @@ -1375,7 +1373,7 @@ def dask_groupby_agg( partial( blockwise_method, axis=axis, - expected_groups=None if method == "cohorts" else expected_groups, + expected_groups=expected_groups if reindex else None, engine=engine, sort=sort, ), @@ -1468,14 +1466,24 @@ def dask_groupby_agg( elif method == "blockwise": reduced = intermediate - # Here one input chunk → one output chunks - # find number of groups in each chunk, this is needed for output chunks - # along the reduced axis - slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis)) - groups_in_block = tuple(_unique(by_input[slc]) for slc in slices) - groups = (np.concatenate(groups_in_block),) - ngroups_per_block = tuple(len(grp) for grp in groups_in_block) - group_chunks = (ngroups_per_block,) + if reindex: + if TYPE_CHECKING: + assert expected_groups is not None + # TODO: we could have `expected_groups` be a dask array with appropriate chunks + # for now, we have a numpy array that is interpreted as listing all group labels + # that are present in every chunk + groups = (expected_groups,) + group_chunks = ((len(expected_groups),),) + else: + # Here one input chunk → one output chunks + # find number of groups in each chunk, this is needed for output chunks + # along the reduced axis + # TODO: this logic is very specialized for the resampling case + slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis)) + groups_in_block = tuple(_unique(by_input[slc]) for slc in slices) + groups = (np.concatenate(groups_in_block),) + ngroups_per_block = tuple(len(grp) for grp in groups_in_block) + group_chunks = (ngroups_per_block,) else: raise ValueError(f"Unknown method={method}.") @@ -1547,7 +1555,7 @@ def _validate_reindex( if reindex is True and not all_numpy: if _is_arg_reduction(func): raise NotImplementedError - if method in ["blockwise", "cohorts"]: + if method == "cohorts" or (method == "blockwise" and not any_by_dask): raise ValueError( "reindex=True is not a valid choice for method='blockwise' or method='cohorts'." ) @@ -1562,7 +1570,11 @@ def _validate_reindex( # have to do the grouped_combine since there's no good fill_value reindex = False - if method == "blockwise" or _is_arg_reduction(func): + if method == "blockwise": + # for grouping by dask arrays, we set reindex=True + reindex = any_by_dask + + elif _is_arg_reduction(func): reindex = False elif method == "cohorts": @@ -1767,7 +1779,10 @@ def groupby_reduce( *by : ndarray or DaskArray Array of labels to group over. Must be aligned with ``array`` so that ``array.shape[-by.ndim :] == by.shape`` - func : str or Aggregation + func : {"all", "any", "count", "sum", "nansum", "mean", "nanmean", \ + "max", "nanmax", "min", "nanmin", "argmax", "nanargmax", "argmin", "nanargmin", \ + "quantile", "nanquantile", "median", "nanmedian", "mode", "nanmode", \ + "first", "nanfirst", "last", "nanlast"} or Aggregation Single function name or an Aggregation instance expected_groups : (optional) Sequence Expected unique labels. @@ -1835,7 +1850,7 @@ def groupby_reduce( boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the original block size. Avoid that by using ``method="cohorts"``. By default, it is turned off for argreductions. finalize_kwargs : dict, optional - Kwargs passed to finalize the reduction such as ``ddof`` for var, std. + Kwargs passed to finalize the reduction such as ``ddof`` for var, std or ``q`` for quantile. Returns ------- @@ -1855,6 +1870,9 @@ def groupby_reduce( "Try engine='numpy' or engine='numba' instead." ) + if func == "quantile" and (finalize_kwargs is None or "q" not in finalize_kwargs): + raise ValueError("Please pass `q` for quantile calculations.") + bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) nby = len(bys) by_is_dask = tuple(is_duck_dask_array(b) for b in bys) @@ -2023,7 +2041,7 @@ def groupby_reduce( result, groups = partial_agg( array, by_, - expected_groups=None if method == "blockwise" else expected_groups, + expected_groups=expected_groups, agg=agg, reindex=reindex, method=method, diff --git a/flox/xarray.py b/flox/xarray.py index c85ad7113..eb35da387 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -88,8 +88,11 @@ def xarray_reduce( Xarray object to reduce *by : DataArray or iterable of str or iterable of DataArray Variables with which to group by ``obj`` - func : str or Aggregation - Reduction method + func : {"all", "any", "count", "sum", "nansum", "mean", "nanmean", \ + "max", "nanmax", "min", "nanmin", "argmax", "nanargmax", "argmin", "nanargmin", \ + "quantile", "nanquantile", "median", "nanmedian", "mode", "nanmode", \ + "first", "nanfirst", "last", "nanlast"} or Aggregation + Single function name or an Aggregation instance expected_groups : str or sequence expected group labels corresponding to each `by` variable isbin : iterable of bool @@ -164,7 +167,7 @@ def xarray_reduce( boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the original block size. Avoid that by using method="cohorts". By default, it is turned off for arg reductions. **finalize_kwargs - kwargs passed to the finalize function, like ``ddof`` for var, std. + kwargs passed to the finalize function, like ``ddof`` for var, std or ``q`` for quantile. Returns ------- diff --git a/pyproject.toml b/pyproject.toml index e41fb17eb..e387657d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -118,7 +118,8 @@ module=[ "matplotlib.*", "pandas", "setuptools", - "toolz" + "scipy.*", + "toolz", ] ignore_missing_imports = true diff --git a/tests/__init__.py b/tests/__init__.py index 3e43e94d6..f1c8ec6bf 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -47,6 +47,7 @@ def LooseVersion(vstring): has_dask, requires_dask = _importorskip("dask") has_numba, requires_numba = _importorskip("numba") +has_scipy, requires_scipy = _importorskip("scipy") has_xarray, requires_xarray = _importorskip("xarray") diff --git a/tests/test_core.py b/tests/test_core.py index 453958b2d..440431304 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -31,6 +31,7 @@ has_dask, raise_if_dask_computes, requires_dask, + requires_scipy, ) labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0]) @@ -50,6 +51,9 @@ def dask_array_ones(*args): return None +DEFAULT_QUANTILE = 0.9 +SCIPY_STATS_FUNCS = ("mode", "nanmode") +BLOCKWISE_FUNCS = ("median", "nanmedian", "quantile", "nanquantile") + SCIPY_STATS_FUNCS ALL_FUNCS = ( "sum", "nansum", @@ -73,9 +77,11 @@ def dask_array_ones(*args): "any", "all", "nanlast", - pytest.param("median", marks=(pytest.mark.skip,)), - pytest.param("nanmedian", marks=(pytest.mark.skip,)), -) + "median", + "nanmedian", + "quantile", + "nanquantile", +) + tuple(pytest.param(func, marks=requires_scipy) for func in SCIPY_STATS_FUNCS) if TYPE_CHECKING: from flox.core import T_Agg, T_Engine, T_ExpectedGroupsOpt, T_Method @@ -84,12 +90,26 @@ def dask_array_ones(*args): def _get_array_func(func: str) -> Callable: if func == "count": - def npfunc(x): + def npfunc(x, **kwargs): x = np.asarray(x) return (~np.isnan(x)).sum() elif func in ["nanfirst", "nanlast"]: npfunc = getattr(xrutils, func) + + elif func in SCIPY_STATS_FUNCS: + import scipy.stats + + if "nan" in func: + func = func[3:] + nan_policy = "omit" + else: + nan_policy = "propagate" + + def npfunc(x, **kwargs): + spfunc = partial(getattr(scipy.stats, func), nan_policy=nan_policy) + return getattr(spfunc(x, **kwargs), func) + else: npfunc = getattr(np, func) @@ -205,7 +225,7 @@ def gen_array_by(size, func): @pytest.mark.parametrize("add_nan_by", [True, False]) @pytest.mark.parametrize("func", ALL_FUNCS) def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): - if "arg" in func and engine == "flox": + if ("arg" in func and engine == "flox") or (func in BLOCKWISE_FUNCS and chunks != -1): pytest.skip() array, by = gen_array_by(size, func) @@ -224,6 +244,10 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): finalize_kwargs = finalize_kwargs + [{"ddof": 1}, {"ddof": 0}] fill_value = np.nan tolerance = {"rtol": 1e-14, "atol": 1e-16} + elif "quantile" in func: + finalize_kwargs = [{"q": DEFAULT_QUANTILE}] + fill_value = None + tolerance = None else: fill_value = None tolerance = None @@ -246,15 +270,16 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): func_ = f"nan{func}" if "nan" not in func else func array_[..., nanmask] = np.nan expected = getattr(np, func_)(array_, axis=-1, **kwargs) - # elif func in ["first", "last"]: - # expected = getattr(xrutils, f"nan{func}")(array_[..., ~nanmask], axis=-1, **kwargs) - elif func in ["nanfirst", "nanlast"]: - expected = getattr(xrutils, func)(array_[..., ~nanmask], axis=-1, **kwargs) else: - expected = getattr(np, func)(array_[..., ~nanmask], axis=-1, **kwargs) + array_func = _get_array_func(func) + expected = array_func(array_[..., ~nanmask], axis=-1, **kwargs) for _ in range(nby): expected = np.expand_dims(expected, -1) + if func in BLOCKWISE_FUNCS: + assert chunks == -1 + flox_kwargs["method"] = "blockwise" + actual, *groups = groupby_reduce(array, *by, **flox_kwargs) assert actual.ndim == (array.ndim + nby - 1) assert expected.ndim == (array.ndim + nby - 1) @@ -265,7 +290,7 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): assert actual.dtype.kind == "i" assert_equal(actual, expected, tolerance) - if not has_dask or chunks is None: + if not has_dask or chunks is None or func in BLOCKWISE_FUNCS: continue params = list(itertools.product(["map-reduce"], [True, False, None])) @@ -396,7 +421,7 @@ def test_numpy_reduce_nd_md(): def test_groupby_agg_dask(func, shape, array_chunks, group_chunks, add_nan, dtype, engine, reindex): """Tests groupby_reduce with dask arrays against groupby_reduce with numpy arrays""" - if func in ["first", "last"]: + if func in ["first", "last"] or func in BLOCKWISE_FUNCS: pytest.skip() if "arg" in func and (engine == "flox" or reindex): @@ -551,7 +576,7 @@ def test_first_last_disallowed_dask(func): "axis", [None, (0, 1, 2), (0, 1), (0, 2), (1, 2), 0, 1, 2, (0,), (1,), (2,)] ) def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): - if "arg" in func and engine == "flox": + if ("arg" in func and engine == "flox") or func in BLOCKWISE_FUNCS: pytest.skip() if not isinstance(axis, int): @@ -847,7 +872,7 @@ def test_rechunk_for_cohorts(chunk_at, expected): def test_fill_value_behaviour(func, chunks, fill_value, engine): # fill_value = np.nan tests promotion of int counts to float # This is used by xarray - if func in ["all", "any"] or "arg" in func: + if (func in ["all", "any"] or "arg" in func) or func in BLOCKWISE_FUNCS: pytest.skip() npfunc = _get_array_func(func) @@ -903,8 +928,17 @@ def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype): @requires_dask @pytest.mark.parametrize("func", ALL_FUNCS) @pytest.mark.parametrize("axis", (-1, None)) -@pytest.mark.parametrize("method", ["blockwise", "cohorts", "map-reduce", "split-reduce"]) +@pytest.mark.parametrize("method", ["blockwise", "cohorts", "map-reduce"]) def test_cohorts_nd_by(func, method, axis, engine): + if ( + ("arg" in func and (axis is None or engine == "flox")) + or (method != "blockwise" and func in BLOCKWISE_FUNCS) + or (axis is None and ("first" in func or "last" in func)) + ): + pytest.skip() + if axis is not None and method != "map-reduce": + pytest.xfail() + o = dask.array.ones((3,), chunks=-1) o2 = dask.array.ones((2, 3), chunks=-1) @@ -915,20 +949,14 @@ def test_cohorts_nd_by(func, method, axis, engine): by[0, 4] = 31 array = np.broadcast_to(array, (2, 3) + array.shape) - if "arg" in func and (axis is None or engine == "flox"): - pytest.skip() - if func in ["any", "all"]: fill_value = False else: fill_value = -123 - if axis is not None and method != "map-reduce": - pytest.xfail() - if axis is None and ("first" in func or "last" in func): - pytest.skip() - kwargs = dict(func=func, engine=engine, method=method, axis=axis, fill_value=fill_value) + if "quantile" in func: + kwargs["finalize_kwargs"] = {"q": DEFAULT_QUANTILE} actual, groups = groupby_reduce(array, by, **kwargs) expected, sorted_groups = groupby_reduce(array.compute(), by, **kwargs) assert_equal(groups, sorted_groups) @@ -990,6 +1018,8 @@ def test_datetime_binning(): def test_bool_reductions(func, engine): if "arg" in func and engine == "flox": pytest.skip() + if "quantile" in func or "mode" in func: + pytest.skip() groups = np.array([1, 1, 1]) data = np.array([True, True, False]) npfunc = _get_array_func(func) @@ -1242,9 +1272,14 @@ def grouped_median(group_idx, array, *, axis=-1, size=None, fill_value=None, dty def test_dtype(func, dtype, engine): if "arg" in func or func in ["any", "all"]: pytest.skip() + + finalize_kwargs = {"q": DEFAULT_QUANTILE} if "quantile" in func else {} + arr = np.ones((4, 12), dtype=dtype) labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"]) - actual, _ = groupby_reduce(arr, labels, func=func, dtype=np.float64, engine=engine) + actual, _ = groupby_reduce( + arr, labels, func=func, dtype=np.float64, engine=engine, finalize_kwargs=finalize_kwargs + ) assert actual.dtype == np.dtype("float64") @@ -1387,6 +1422,33 @@ def test_validate_reindex() -> None: ) assert actual is False + with pytest.raises(ValueError): + _validate_reindex( + True, + "sum", + method="blockwise", + expected_groups=np.array([1, 2, 3]), + any_by_dask=False, + is_dask_array=True, + ) + + assert _validate_reindex( + True, + "sum", + method="blockwise", + expected_groups=np.array([1, 2, 3]), + any_by_dask=True, + is_dask_array=True, + ) + assert _validate_reindex( + None, + "sum", + method="blockwise", + expected_groups=np.array([1, 2, 3]), + any_by_dask=True, + is_dask_array=True, + ) + @requires_dask def test_1d_blockwise_sort_optimization():