diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 9521b7b68..5571d27af 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -17,7 +17,7 @@ jobs: steps: # We need the full repo to avoid this issue # https://github.com/actions/checkout/issues/23 - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index 9449d9290..c71fc8913 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -22,7 +22,7 @@ jobs: outputs: triggered: ${{ steps.detect-trigger.outputs.trigger-found }} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 2 - uses: xarray-contrib/ci-trigger@v1.2 @@ -41,10 +41,10 @@ jobs: env: CONDA_ENV_FILE: ci/environment.yml - PYTHON_VERSION: "3.10" + PYTHON_VERSION: "3.11" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. @@ -92,10 +92,10 @@ jobs: shell: bash -l {0} env: CONDA_ENV_FILE: ci/environment.yml - PYTHON_VERSION: "3.10" + PYTHON_VERSION: "3.11" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 75aafc4b6..b963d671d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -25,9 +25,9 @@ jobs: fail-fast: false matrix: os: ["ubuntu-latest", "windows-latest"] - python-version: ["3.8", "3.10"] + python-version: ["3.9", "3.11"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set environment variables @@ -74,7 +74,7 @@ jobs: "minimal-requirements", ] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set up conda environment @@ -108,7 +108,7 @@ jobs: run: shell: bash -l {0} steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: repository: 'pydata/xarray' fetch-depth: 0 # Fetch all history for all branches and tags. diff --git a/.github/workflows/pypi.yaml b/.github/workflows/pypi.yaml index 7372dd0fa..93891fcd7 100644 --- a/.github/workflows/pypi.yaml +++ b/.github/workflows/pypi.yaml @@ -8,7 +8,7 @@ jobs: deploy: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python uses: actions/setup-python@v4 with: diff --git a/.github/workflows/testpypi-release.yaml b/.github/workflows/testpypi-release.yaml index 1ed0a4a56..3b5f98b17 100644 --- a/.github/workflows/testpypi-release.yaml +++ b/.github/workflows/testpypi-release.yaml @@ -17,14 +17,14 @@ jobs: if: ${{ contains( github.event.pull_request.labels.*.name, 'test-build') && github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }} runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 - uses: actions/setup-python@v4 name: Install Python with: - python-version: "3.10" + python-version: "3.11" - name: Install dependencies run: | @@ -65,7 +65,7 @@ jobs: - uses: actions/setup-python@v4 name: Install Python with: - python-version: "3.10" + python-version: "3.11" - uses: actions/download-artifact@v3 with: name: releases diff --git a/.github/workflows/upstream-dev-ci.yaml b/.github/workflows/upstream-dev-ci.yaml index 0de92037f..80ad81ac6 100644 --- a/.github/workflows/upstream-dev-ci.yaml +++ b/.github/workflows/upstream-dev-ci.yaml @@ -26,9 +26,9 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.10"] + python-version: ["3.11"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set environment variables diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e9b1e9d6b..c36fa38cf 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ ci: repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: 'v0.0.276' + rev: 'v0.0.292' hooks: - id: ruff args: ["--fix"] @@ -18,12 +18,12 @@ repos: - id: check-docstring-first - repo: https://github.com/psf/black - rev: 23.3.0 + rev: 23.9.1 hooks: - id: black - repo: https://github.com/executablebooks/mdformat - rev: 0.7.16 + rev: 0.7.17 hooks: - id: mdformat additional_dependencies: @@ -44,13 +44,13 @@ repos: args: [--extra-keys=metadata.kernelspec metadata.language_info.version] - repo: https://github.com/codespell-project/codespell - rev: v2.2.5 + rev: v2.2.6 hooks: - id: codespell additional_dependencies: - tomli - repo: https://github.com/abravalheri/validate-pyproject - rev: v0.13 + rev: v0.14 hooks: - id: validate-pyproject diff --git a/ci/environment.yml b/ci/environment.yml index 93f0e891f..9d5aa6d01 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -22,3 +22,4 @@ dependencies: - pooch - toolz - numba + - scipy diff --git a/docs/source/aggregations.md b/docs/source/aggregations.md index e6c10e4ba..d3591d2dc 100644 --- a/docs/source/aggregations.md +++ b/docs/source/aggregations.md @@ -11,8 +11,11 @@ the `func` kwarg: - `"std"`, `"nanstd"` - `"argmin"` - `"argmax"` -- `"first"` -- `"last"` +- `"first"`, `"nanfirst"` +- `"last"`, `"nanlast"` +- `"median"`, `"nanmedian"` +- `"mode"`, `"nanmode"` +- `"quantile"`, `"nanquantile"` ```{tip} We would like to add support for `cumsum`, `cumprod` ([issue](https://github.com/xarray-contrib/flox/issues/91)). Contributions are welcome! diff --git a/docs/source/implementation.md b/docs/source/implementation.md index f3a2a87f7..29d9faf46 100644 --- a/docs/source/implementation.md +++ b/docs/source/implementation.md @@ -199,7 +199,7 @@ width: 100% 1. Group labels must be known at graph construction time, so this only works for numpy arrays. 1. This does require more tasks and a more complicated graph, but the communication overhead can be significantly lower. 1. The detection of "cohorts" is currently slow but could be improved. -1. The extra effort of detecting cohorts and mul;tiple copying of intermediate blocks may be worthwhile only if the chunk sizes are small +1. The extra effort of detecting cohorts and multiple copying of intermediate blocks may be worthwhile only if the chunk sizes are small relative to the approximate period of group labels, or small relative to the size of spatially localized groups. ### Example : sensitivity to chunking diff --git a/docs/source/user-stories.md b/docs/source/user-stories.md index 22b37939e..0241e01dc 100644 --- a/docs/source/user-stories.md +++ b/docs/source/user-stories.md @@ -8,4 +8,5 @@ user-stories/climatology.ipynb user-stories/climatology-hourly.ipynb user-stories/custom-aggregations.ipynb + user-stories/nD-bins.ipynb ``` diff --git a/docs/source/user-stories/custom-aggregations.ipynb b/docs/source/user-stories/custom-aggregations.ipynb index 7b4167b98..8b9be09e9 100644 --- a/docs/source/user-stories/custom-aggregations.ipynb +++ b/docs/source/user-stories/custom-aggregations.ipynb @@ -15,8 +15,13 @@ ">\n", "> A = da.groupby(['lon_bins', 'lat_bins']).mode()\n", "\n", - "This notebook will describe how to accomplish this using a custom `Aggregation`\n", - "since `mode` and `median` aren't supported by flox yet.\n" + "This notebook will describe how to accomplish this using a custom `Aggregation`.\n", + "\n", + "\n", + "```{tip}\n", + "flox now supports `mode`, `nanmode`, `quantile`, `nanquantile`, `median`, `nanmedian` using exactly the same \n", + "approach as shown below\n", + "```\n" ] }, { @@ -135,7 +140,7 @@ " # The next are for dask inputs and describe how to reduce\n", " # the data in parallel\n", " chunk=(\"sum\", \"nanlen\"), # first compute these blockwise : (grouped_sum, grouped_count)\n", - " combine=(\"sum\", \"sum\"), # reduce intermediate reuslts (sum the sums, sum the counts)\n", + " combine=(\"sum\", \"sum\"), # reduce intermediate results (sum the sums, sum the counts)\n", " finalize=lambda sum_, count: sum_ / count, # final mean value (divide sum by count)\n", "\n", " fill_value=(0, 0), # fill value for intermediate sums and counts when groups have no members\n", diff --git a/docs/source/user-stories/nD-bins.ipynb b/docs/source/user-stories/nD-bins.ipynb new file mode 100644 index 000000000..87ef942bf --- /dev/null +++ b/docs/source/user-stories/nD-bins.ipynb @@ -0,0 +1,373 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "e970d800-c612-482a-bb3a-b1eb7ad53d88", + "metadata": { + "tags": [], + "user_expressions": [] + }, + "source": [ + "# Binning with multi-dimensional bins\n", + "\n", + "```{warning}\n", + "This post is a proof-of-concept for discussion. Expect APIs to change to enable this use case.\n", + "```\n", + "\n", + "Here we explore a binning problem where the bins are multidimensional\n", + "([xhistogram issue](https://github.com/xgcm/xhistogram/issues/28))\n", + "\n", + "> One of such multi-dim bin applications is the ranked probability score rps we\n", + "> use in `xskillscore.rps`, where we want to know how many forecasts fell into\n", + "> which bins. Bins are often defined as terciles of the forecast distribution\n", + "> and the bins for these terciles\n", + "> (`forecast_with_lon_lat_time_dims.quantile(q=[.33,.66],dim='time')`) depend on\n", + "> `lon` and `lat`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01f1a2ef-de62-45d0-a04e-343cd78debc5", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import math\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xarray as xr\n", + "\n", + "import flox\n", + "import flox.xarray" + ] + }, + { + "cell_type": "markdown", + "id": "0be3e214-0cf0-426f-8ebb-669cc5322310", + "metadata": { + "user_expressions": [] + }, + "source": [ + "## Create test data\n" + ] + }, + { + "cell_type": "markdown", + "id": "ce239000-e053-4fc3-ad14-e9e0160da869", + "metadata": { + "user_expressions": [] + }, + "source": [ + "Data to be reduced\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7659c24e-f5a1-4e59-84c0-5ec965ef92d2", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "array = xr.DataArray(\n", + " np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]),\n", + " dims=(\"space\", \"time\"),\n", + " name=\"array\",\n", + ")\n", + "array" + ] + }, + { + "cell_type": "markdown", + "id": "da0c0ac9-ad75-42cd-a1ea-99069f5bef00", + "metadata": { + "user_expressions": [] + }, + "source": [ + "Array to group by\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4601e744-5d22-447e-97ce-9644198d485e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "by = xr.DataArray(\n", + " np.array([[1, 2, 3], [3, 4, 5], [5, 6, 7], [6, 7, 9]]),\n", + " dims=(\"space\", \"time\"),\n", + " name=\"by\",\n", + ")\n", + "by" + ] + }, + { + "cell_type": "markdown", + "id": "61c21c94-7b6e-46a6-b9c2-59d7b2d40c81", + "metadata": { + "tags": [], + "user_expressions": [] + }, + "source": [ + "Multidimensional bins:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "863a1991-ab8d-47c0-aa48-22b422fcea8c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "bins = by + 0.5\n", + "bins = xr.DataArray(\n", + " np.concatenate([bins, bins[:, [-1]] + 1], axis=-1)[:, :-1].T,\n", + " dims=(\"time\", \"nbins\"),\n", + " name=\"bins\",\n", + ")\n", + "bins" + ] + }, + { + "cell_type": "markdown", + "id": "e65ecaba-d1cc-4485-ae58-c390cb2ebfab", + "metadata": { + "user_expressions": [] + }, + "source": [ + "## Concept\n", + "\n", + "The key idea is that GroupBy is two steps:\n", + "\n", + "1. Factorize (a.k.a \"digitize\") : convert the `by` data to a set of integer\n", + " codes representing the bins.\n", + "2. Apply the reduction.\n", + "\n", + "We treat multi-dimensional binning as a slightly complicated factorization\n", + "problem. Assume that bins are a function of `time`. So we\n", + "\n", + "1. generate a set of appropriate integer codes by:\n", + " 1. Loop over \"time\" and factorize the data appropriately.\n", + " 2. Add an offset to these codes so that \"bin 0\" for `time=0` is different\n", + " from \"bin 0\" for `time=1`\n", + "2. apply the groupby reduction to the \"offset codes\"\n", + "3. reshape the output to the right shape\n", + "\n", + "We will work at the xarray level, so its easy to keep track of the different\n", + "dimensions.\n", + "\n", + "### Factorizing\n", + "\n", + "The core `factorize_` function (which wraps `pd.cut`) only handles 1D bins, so\n", + "we use `xr.apply_ufunc` to vectorize it for us.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aa33ab2c-0ecf-4198-a033-2a77f5d83c99", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "factorize_loop_dim = \"time\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "afcddcc1-dd57-461e-a649-1f8bcd30342f", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def factorize_nd_bins_core(by, bins):\n", + " group_idx, *_, props = flox.core.factorize_(\n", + " (by,),\n", + " axes=(-1,),\n", + " expected_groups=(pd.IntervalIndex.from_breaks(bins),),\n", + " )\n", + " # Use -1 as the NaN sentinel value\n", + " group_idx[props.nanmask] = -1\n", + " return group_idx\n", + "\n", + "\n", + "codes = xr.apply_ufunc(\n", + " factorize_nd_bins_core,\n", + " by,\n", + " bins,\n", + " # TODO: avoid hardcoded dim names\n", + " input_core_dims=[[\"space\"], [\"nbins\"]],\n", + " output_core_dims=[[\"space\"]],\n", + " vectorize=True,\n", + ")\n", + "codes" + ] + }, + { + "cell_type": "markdown", + "id": "1661312a-dc61-4a26-bfd8-12c2dc01eb15", + "metadata": { + "user_expressions": [] + }, + "source": [ + "### Offset the codes\n", + "\n", + "These are integer codes appropriate for a single timestep.\n", + "\n", + "We now add an offset that changes in time, to make sure \"bin 0\" for `time=0` is\n", + "different from \"bin 0\" for `time=1` (taken from\n", + "[this StackOverflow thread](https://stackoverflow.com/questions/46256279/bin-elements-per-row-vectorized-2d-bincount-for-numpy)).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0e5801cb-a79c-4670-ad10-36bb19f1a6ff", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "N = math.prod([codes.sizes[d] for d in codes.dims if d != factorize_loop_dim])\n", + "offset = xr.DataArray(np.arange(codes.sizes[factorize_loop_dim]), dims=factorize_loop_dim)\n", + "# TODO: think about N-1 here\n", + "offset_codes = (codes + offset * (N - 1)).rename(by.name)\n", + "offset_codes.data[codes == -1] = -1\n", + "offset_codes" + ] + }, + { + "cell_type": "markdown", + "id": "6c06c48b-316b-4a33-9bc3-921acd10bcba", + "metadata": { + "user_expressions": [] + }, + "source": [ + "### Reduce\n", + "\n", + "Now that we have appropriate codes, let's apply the reduction\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2cf1295e-4585-48b9-ac2b-9e00d03b2b9a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "interim = flox.xarray.xarray_reduce(\n", + " array,\n", + " offset_codes,\n", + " func=\"sum\",\n", + " # We use RangeIndex to indicate that `-1` code can be safely ignored\n", + " # (it indicates values outside the bins)\n", + " # TODO: Avoid hardcoding 9 = sizes[\"time\"] x (sizes[\"nbins\"] - 1)\n", + " expected_groups=pd.RangeIndex(9),\n", + ")\n", + "interim" + ] + }, + { + "cell_type": "markdown", + "id": "3539509b-d9b4-4342-a679-6ada6f285dfb", + "metadata": { + "user_expressions": [] + }, + "source": [ + "## Make final result\n", + "\n", + "Now reshape that 1D result appropriately.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b1389d37-d76d-4a50-9dfb-8710258de3fd", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "final = (\n", + " interim.coarsen(by=3)\n", + " # bin_number dimension is last, this makes sense since it is the core dimension\n", + " # and we vectorize over the loop dims.\n", + " # So the first (Nbins-1) elements are for the first index of the loop dim\n", + " .construct({\"by\": (factorize_loop_dim, \"bin_number\")})\n", + " .transpose(..., factorize_loop_dim)\n", + " .drop_vars(\"by\")\n", + ")\n", + "final" + ] + }, + { + "cell_type": "markdown", + "id": "a98b5e60-94af-45ae-be1b-4cb47e2d77ba", + "metadata": { + "user_expressions": [] + }, + "source": [ + "I think this is the expected answer.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "053a8643-f6d9-4fd1-b014-230fa716449c", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "array.isel(space=slice(1, None)).rename({\"space\": \"bin_number\"}).identical(final)" + ] + }, + { + "cell_type": "markdown", + "id": "619ba4c4-7c87-459a-ab86-c187d3a86c67", + "metadata": { + "tags": [], + "user_expressions": [] + }, + "source": [ + "## TODO\n", + "\n", + "This could be extended to:\n", + "\n", + "1. handle multiple `factorize_loop_dim`\n", + "2. avoid hard coded dimension names in the `apply_ufunc` call for factorizing\n", + "3. avoid hard coded number of output elements in the `xarray_reduce` call.\n", + "4. Somehow propagate the bin edges to the final output.\n" + ] + } + ], + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/flox/aggregate_npg.py b/flox/aggregate_npg.py index 30e0eb257..966bd43b8 100644 --- a/flox/aggregate_npg.py +++ b/flox/aggregate_npg.py @@ -100,3 +100,84 @@ def _len(group_idx, array, engine, *, func, axis=-1, size=None, fill_value=None, len = partial(_len, func="len") nanlen = partial(_len, func="nanlen") + + +def median(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=np.median, + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def nanmedian(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=np.nanmedian, + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def quantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(np.quantile, q=q), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def nanquantile(group_idx, array, engine, *, q, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(np.nanquantile, q=q), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def mode_(array, nan_policy, dtype): + from scipy.stats import mode + + # npg splits `array` into object arrays for each group + # scipy.stats.mode does not like that + # here we cast back + return mode(array.astype(dtype, copy=False), nan_policy=nan_policy, axis=-1).mode + + +def mode(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(mode_, nan_policy="propagate", dtype=array.dtype), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) + + +def nanmode(group_idx, array, engine, *, axis=-1, size=None, fill_value=None, dtype=None): + return npg.aggregate_numpy.aggregate( + group_idx, + array, + func=partial(mode_, nan_policy="omit", dtype=array.dtype), + axis=axis, + size=size, + fill_value=fill_value, + dtype=dtype, + ) diff --git a/flox/aggregations.py b/flox/aggregations.py index e5013032a..c06ef3509 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -6,7 +6,6 @@ from typing import TYPE_CHECKING, Any, Callable, TypedDict import numpy as np -import numpy_groupies as npg from numpy.typing import DTypeLike from . import aggregate_flox, aggregate_npg, xrutils @@ -35,6 +34,16 @@ class AggDtype(TypedDict): intermediate: tuple[np.dtype | type[np.intp], ...] +def get_npg_aggregation(func, *, engine): + try: + method_ = getattr(aggregate_npg, func) + method = partial(method_, engine=engine) + except AttributeError: + aggregate = aggregate_npg._get_aggregate(engine).aggregate + method = partial(aggregate, func=func) + return method + + def generic_aggregate( group_idx, array, @@ -51,14 +60,11 @@ def generic_aggregate( try: method = getattr(aggregate_flox, func) except AttributeError: - method = partial(npg.aggregate_numpy.aggregate, func=func) + method = get_npg_aggregation(func, engine="numpy") + elif engine in ["numpy", "numba"]: - try: - method_ = getattr(aggregate_npg, func) - method = partial(method_, engine=engine) - except AttributeError: - aggregate = aggregate_npg._get_aggregate(engine).aggregate - method = partial(aggregate, func=func) + method = get_npg_aggregation(func, engine=engine) + else: raise ValueError( f"Expected engine to be one of ['flox', 'numpy', 'numba']. Received {engine} instead." @@ -465,10 +471,22 @@ def _pick_second(*x): final_dtype=bool, ) -# numpy_groupies does not support median -# And the dask version is really hard! -# median = Aggregation("median", chunk=None, combine=None, fill_value=None) -# nanmedian = Aggregation("nanmedian", chunk=None, combine=None, fill_value=None) +# Support statistical quantities only blockwise +# The parallel versions will be approximate and are hard to implement! +median = Aggregation( + name="median", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +nanmedian = Aggregation( + name="nanmedian", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +quantile = Aggregation( + name="quantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +nanquantile = Aggregation( + name="nanquantile", fill_value=dtypes.NA, chunk=None, combine=None, final_dtype=np.float64 +) +mode = Aggregation(name="mode", fill_value=dtypes.NA, chunk=None, combine=None) +nanmode = Aggregation(name="nanmode", fill_value=dtypes.NA, chunk=None, combine=None) aggregations = { "any": any_, @@ -496,6 +514,12 @@ def _pick_second(*x): "nanfirst": nanfirst, "last": last, "nanlast": nanlast, + "median": median, + "nanmedian": nanmedian, + "quantile": quantile, + "nanquantile": nanquantile, + "mode": mode, + "nanmode": nanmode, } diff --git a/flox/core.py b/flox/core.py index 107092c75..ee7fa1bb2 100644 --- a/flox/core.py +++ b/flox/core.py @@ -7,16 +7,14 @@ import sys import warnings from collections import namedtuple +from collections.abc import Mapping, Sequence from functools import partial, reduce from numbers import Integral from typing import ( TYPE_CHECKING, Any, Callable, - Dict, Literal, - Mapping, - Sequence, Union, overload, ) @@ -77,8 +75,8 @@ T_IsBins = Union[bool | Sequence[bool]] -IntermediateDict = Dict[Union[str, Callable], Any] -FinalResultsDict = Dict[str, Union["DaskArray", np.ndarray]] +IntermediateDict = dict[Union[str, Callable], Any] +FinalResultsDict = dict[str, Union["DaskArray", np.ndarray]] FactorProps = namedtuple("FactorProps", "offset_group nan_sentinel nanmask") # This dummy axis is inserted using np.expand_dims @@ -1316,15 +1314,14 @@ def dask_groupby_agg( assert isinstance(axis, Sequence) assert all(ax >= 0 for ax in axis) - if method == "blockwise" and not isinstance(by, np.ndarray): - raise NotImplementedError - inds = tuple(range(array.ndim)) name = f"groupby_{agg.name}" token = dask.base.tokenize(array, by, agg, expected_groups, axis) if expected_groups is None and reindex: expected_groups = _get_expected_groups(by, sort=sort) + if method == "cohorts": + assert reindex is False by_input = by @@ -1358,7 +1355,6 @@ def dask_groupby_agg( # b. "_grouped_combine": A more general solution where we tree-reduce the groupby reduction. # This allows us to discover groups at compute time, support argreductions, lower intermediate # memory usage (but method="cohorts" would also work to reduce memory in some cases) - do_simple_combine = not _is_arg_reduction(agg) if method == "blockwise": @@ -1384,7 +1380,7 @@ def dask_groupby_agg( partial( blockwise_method, axis=axis, - expected_groups=None if method == "cohorts" else expected_groups, + expected_groups=expected_groups if reindex else None, engine=engine, sort=sort, ), @@ -1477,14 +1473,24 @@ def dask_groupby_agg( elif method == "blockwise": reduced = intermediate - # Here one input chunk → one output chunks - # find number of groups in each chunk, this is needed for output chunks - # along the reduced axis - slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis)) - groups_in_block = tuple(_unique(by_input[slc]) for slc in slices) - groups = (np.concatenate(groups_in_block),) - ngroups_per_block = tuple(len(grp) for grp in groups_in_block) - group_chunks = (ngroups_per_block,) + if reindex: + if TYPE_CHECKING: + assert expected_groups is not None + # TODO: we could have `expected_groups` be a dask array with appropriate chunks + # for now, we have a numpy array that is interpreted as listing all group labels + # that are present in every chunk + groups = (expected_groups,) + group_chunks = ((len(expected_groups),),) + else: + # Here one input chunk → one output chunks + # find number of groups in each chunk, this is needed for output chunks + # along the reduced axis + # TODO: this logic is very specialized for the resampling case + slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis)) + groups_in_block = tuple(_unique(by_input[slc]) for slc in slices) + groups = (np.concatenate(groups_in_block),) + ngroups_per_block = tuple(len(grp) for grp in groups_in_block) + group_chunks = (ngroups_per_block,) else: raise ValueError(f"Unknown method={method}.") @@ -1556,7 +1562,7 @@ def _validate_reindex( if reindex is True and not all_numpy: if _is_arg_reduction(func): raise NotImplementedError - if method in ["blockwise", "cohorts"]: + if method == "cohorts" or (method == "blockwise" and not any_by_dask): raise ValueError( "reindex=True is not a valid choice for method='blockwise' or method='cohorts'." ) @@ -1571,7 +1577,11 @@ def _validate_reindex( # have to do the grouped_combine since there's no good fill_value reindex = False - if method == "blockwise" or _is_arg_reduction(func): + if method == "blockwise": + # for grouping by dask arrays, we set reindex=True + reindex = any_by_dask + + elif _is_arg_reduction(func): reindex = False elif method == "cohorts": @@ -1795,7 +1805,10 @@ def groupby_reduce( *by : ndarray or DaskArray Array of labels to group over. Must be aligned with ``array`` so that ``array.shape[-by.ndim :] == by.shape`` - func : str or Aggregation + func : {"all", "any", "count", "sum", "nansum", "mean", "nanmean", \ + "max", "nanmax", "min", "nanmin", "argmax", "nanargmax", "argmin", "nanargmin", \ + "quantile", "nanquantile", "median", "nanmedian", "mode", "nanmode", \ + "first", "nanfirst", "last", "nanlast"} or Aggregation Single function name or an Aggregation instance expected_groups : (optional) Sequence Expected unique labels. @@ -1863,7 +1876,7 @@ def groupby_reduce( boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the original block size. Avoid that by using ``method="cohorts"``. By default, it is turned off for argreductions. finalize_kwargs : dict, optional - Kwargs passed to finalize the reduction such as ``ddof`` for var, std. + Kwargs passed to finalize the reduction such as ``ddof`` for var, std or ``q`` for quantile. Returns ------- @@ -1877,10 +1890,10 @@ def groupby_reduce( xarray.xarray_reduce """ + if func == "quantile" and (finalize_kwargs is None or "q" not in finalize_kwargs): + raise ValueError("Please pass `q` for quantile calculations.") + bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) - nby = len(bys) - by_is_dask = tuple(is_duck_dask_array(b) for b in bys) - any_by_dask = any(by_is_dask) if engine is None: engine = _choose_engine(bys, func) @@ -1891,6 +1904,10 @@ def groupby_reduce( "Try engine='numpy' or engine='numba' instead." ) + nby = len(bys) + by_is_dask = tuple(is_duck_dask_array(b) for b in bys) + any_by_dask = any(by_is_dask) + if method in ["split-reduce", "cohorts"] and any_by_dask: raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") @@ -2054,7 +2071,7 @@ def groupby_reduce( result, groups = partial_agg( array, by_, - expected_groups=None if method == "blockwise" else expected_groups, + expected_groups=expected_groups, agg=agg, reindex=reindex, method=method, diff --git a/flox/xarray.py b/flox/xarray.py index a335a6f35..397e1d9b5 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Hashable, Iterable, Sequence, Union +from collections.abc import Hashable, Iterable, Sequence +from typing import TYPE_CHECKING, Any, Union import numpy as np import pandas as pd @@ -88,8 +89,11 @@ def xarray_reduce( Xarray object to reduce *by : DataArray or iterable of str or iterable of DataArray Variables with which to group by ``obj`` - func : str or Aggregation - Reduction method + func : {"all", "any", "count", "sum", "nansum", "mean", "nanmean", \ + "max", "nanmax", "min", "nanmin", "argmax", "nanargmax", "argmin", "nanargmin", \ + "quantile", "nanquantile", "median", "nanmedian", "mode", "nanmode", \ + "first", "nanfirst", "last", "nanlast"} or Aggregation + Single function name or an Aggregation instance expected_groups : str or sequence expected group labels corresponding to each `by` variable isbin : iterable of bool @@ -164,7 +168,7 @@ def xarray_reduce( boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the original block size. Avoid that by using method="cohorts". By default, it is turned off for arg reductions. **finalize_kwargs - kwargs passed to the finalize function, like ``ddof`` for var, std. + kwargs passed to the finalize function, like ``ddof`` for var, std or ``q`` for quantile. Returns ------- diff --git a/flox/xrutils.py b/flox/xrutils.py index 2204dc0c4..50ccffa30 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -3,7 +3,8 @@ import datetime import importlib -from typing import Any, Iterable +from collections.abc import Iterable +from typing import Any import numpy as np import pandas as pd diff --git a/pyproject.toml b/pyproject.toml index fb27ee761..e387657d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "flox" description = "GroupBy operations for dask.array" license = {file = "LICENSE"} readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.9" keywords = ["xarray", "dask", "groupby"] classifiers = [ "Development Status :: 4 - Beta", @@ -11,7 +11,6 @@ classifiers = [ "Natural Language :: English", "Operating System :: OS Independent", "Programming Language :: Python", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", @@ -60,10 +59,10 @@ write_to_template= '__version__ = "{version}"' [tool.black] line-length = 100 -target-version = ["py38"] +target-version = ["py39"] [tool.ruff] -target-version = "py38" +target-version = "py39" builtins = ["ellipsis"] exclude = [ ".eggs", @@ -119,7 +118,8 @@ module=[ "matplotlib.*", "pandas", "setuptools", - "toolz" + "scipy.*", + "toolz", ] ignore_missing_imports = true diff --git a/tests/__init__.py b/tests/__init__.py index 4c04a0fc8..f1c8ec6bf 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,5 +1,5 @@ import importlib -from contextlib import contextmanager +from contextlib import nullcontext import numpy as np import packaging.version @@ -46,6 +46,8 @@ def LooseVersion(vstring): has_dask, requires_dask = _importorskip("dask") +has_numba, requires_numba = _importorskip("numba") +has_scipy, requires_scipy = _importorskip("scipy") has_xarray, requires_xarray = _importorskip("xarray") @@ -67,15 +69,10 @@ def __call__(self, dsk, keys, **kwargs): return dask.get(dsk, keys, **kwargs) -@contextmanager -def dummy_context(): - yield None - - def raise_if_dask_computes(max_computes=0): # return a dummy context manager so that this can be used for non-dask objects if not has_dask: - return dummy_context() + return nullcontext() scheduler = CountingScheduler(max_computes) return dask.config.set(scheduler=scheduler) diff --git a/tests/conftest.py b/tests/conftest.py index 37956aaf8..eb5971784 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,10 @@ import pytest +from . import requires_numba -@pytest.fixture(scope="module", params=[None, "flox", "numpy", "numba"]) + +@pytest.fixture( + scope="module", params=["flox", "numpy", pytest.param("numba", marks=requires_numba)] +) def engine(request): - if request.param == "numba": - try: - import numba # noqa - except ImportError: - pytest.xfail() return request.param diff --git a/tests/test_core.py b/tests/test_core.py index 376ddc054..f6cece01c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -33,6 +33,7 @@ # has_numbagg, raise_if_dask_computes, requires_dask, + requires_scipy, ) labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0]) @@ -52,6 +53,9 @@ def dask_array_ones(*args): return None +DEFAULT_QUANTILE = 0.9 +SCIPY_STATS_FUNCS = ("mode", "nanmode") +BLOCKWISE_FUNCS = ("median", "nanmedian", "quantile", "nanquantile") + SCIPY_STATS_FUNCS ALL_FUNCS = ( "sum", "nansum", @@ -75,9 +79,11 @@ def dask_array_ones(*args): "any", "all", "nanlast", - pytest.param("median", marks=(pytest.mark.skip,)), - pytest.param("nanmedian", marks=(pytest.mark.skip,)), -) + "median", + "nanmedian", + "quantile", + "nanquantile", +) + tuple(pytest.param(func, marks=requires_scipy) for func in SCIPY_STATS_FUNCS) if TYPE_CHECKING: from flox.core import T_Agg, T_Engine, T_ExpectedGroupsOpt, T_Method @@ -86,12 +92,26 @@ def dask_array_ones(*args): def _get_array_func(func: str) -> Callable: if func == "count": - def npfunc(x): + def npfunc(x, **kwargs): x = np.asarray(x) return (~np.isnan(x)).sum() elif func in ["nanfirst", "nanlast"]: npfunc = getattr(xrutils, func) + + elif func in SCIPY_STATS_FUNCS: + import scipy.stats + + if "nan" in func: + func = func[3:] + nan_policy = "omit" + else: + nan_policy = "propagate" + + def npfunc(x, **kwargs): + spfunc = partial(getattr(scipy.stats, func), nan_policy=nan_policy) + return getattr(spfunc(x, **kwargs), func) + else: npfunc = getattr(np, func) @@ -107,7 +127,7 @@ def test_alignment_error(): @pytest.mark.parametrize("dtype", (float, int)) -@pytest.mark.parametrize("chunk", [False, True]) +@pytest.mark.parametrize("chunk", [False, pytest.param(True, marks=requires_dask)]) # TODO: make this intp when python 3.8 is dropped @pytest.mark.parametrize("expected_groups", [None, [0, 1, 2], np.array([0, 1, 2], dtype=np.int64)]) @pytest.mark.parametrize( @@ -147,7 +167,7 @@ def test_groupby_reduce( ) -> None: array = array.astype(dtype) if chunk: - if not has_dask or expected_groups is None: + if expected_groups is None: pytest.skip() array = da.from_array(array, chunks=(3,) if array.ndim == 1 else (1, 3)) by = da.from_array(by, chunks=(3,) if by.ndim == 1 else (1, 3)) @@ -168,7 +188,7 @@ def test_groupby_reduce( engine=engine, ) # we use pd.Index(expected_groups).to_numpy() which is always int64 - # for the values in this tests + # for the values in this test if expected_groups is None: g_dtype = by.dtype elif isinstance(expected_groups, np.ndarray): @@ -193,15 +213,21 @@ def gen_array_by(size, func): return array, by -@pytest.mark.parametrize("chunks", [None, -1, 3, 4]) +@pytest.mark.parametrize( + "chunks", + [ + None, + pytest.param(-1, marks=requires_dask), + pytest.param(3, marks=requires_dask), + pytest.param(4, marks=requires_dask), + ], +) @pytest.mark.parametrize("nby", [1, 2, 3]) @pytest.mark.parametrize("size", ((12,), (12, 9))) @pytest.mark.parametrize("add_nan_by", [True, False]) @pytest.mark.parametrize("func", ALL_FUNCS) def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): - if chunks is not None and not has_dask: - pytest.skip() - if "arg" in func and engine == "flox": + if ("arg" in func and engine == "flox") or (func in BLOCKWISE_FUNCS and chunks != -1): pytest.skip() array, by = gen_array_by(size, func) @@ -220,6 +246,10 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): finalize_kwargs = finalize_kwargs + [{"ddof": 1}, {"ddof": 0}] fill_value = np.nan tolerance = {"rtol": 1e-14, "atol": 1e-16} + elif "quantile" in func: + finalize_kwargs = [{"q": DEFAULT_QUANTILE}] + fill_value = None + tolerance = None else: fill_value = None tolerance = None @@ -242,15 +272,16 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): func_ = f"nan{func}" if "nan" not in func else func array_[..., nanmask] = np.nan expected = getattr(np, func_)(array_, axis=-1, **kwargs) - # elif func in ["first", "last"]: - # expected = getattr(xrutils, f"nan{func}")(array_[..., ~nanmask], axis=-1, **kwargs) - elif func in ["nanfirst", "nanlast"]: - expected = getattr(xrutils, func)(array_[..., ~nanmask], axis=-1, **kwargs) else: - expected = getattr(np, func)(array_[..., ~nanmask], axis=-1, **kwargs) + array_func = _get_array_func(func) + expected = array_func(array_[..., ~nanmask], axis=-1, **kwargs) for _ in range(nby): expected = np.expand_dims(expected, -1) + if func in BLOCKWISE_FUNCS: + assert chunks == -1 + flox_kwargs["method"] = "blockwise" + actual, *groups = groupby_reduce(array, *by, **flox_kwargs) assert actual.ndim == (array.ndim + nby - 1) assert expected.ndim == (array.ndim + nby - 1) @@ -261,7 +292,7 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): assert actual.dtype.kind == "i" assert_equal(actual, expected, tolerance) - if not has_dask or chunks is None: + if not has_dask or chunks is None or func in BLOCKWISE_FUNCS: continue params = list(itertools.product(["map-reduce"], [True, False, None])) @@ -392,16 +423,16 @@ def test_numpy_reduce_nd_md(): def test_groupby_agg_dask(func, shape, array_chunks, group_chunks, add_nan, dtype, engine, reindex): """Tests groupby_reduce with dask arrays against groupby_reduce with numpy arrays""" - rng = np.random.default_rng(12345) - array = dask.array.from_array(rng.random(shape), chunks=array_chunks).astype(dtype) - array = dask.array.ones(shape, chunks=array_chunks) - - if func in ["first", "last"]: + if func in ["first", "last"] or func in BLOCKWISE_FUNCS: pytest.skip() if "arg" in func and (engine == "flox" or reindex): pytest.skip() + rng = np.random.default_rng(12345) + array = dask.array.from_array(rng.random(shape), chunks=array_chunks).astype(dtype) + array = dask.array.ones(shape, chunks=array_chunks) + labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0]) if add_nan: labels = labels.astype(float) @@ -547,7 +578,7 @@ def test_first_last_disallowed_dask(func): "axis", [None, (0, 1, 2), (0, 1), (0, 2), (1, 2), 0, 1, 2, (0,), (1,), (2,)] ) def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): - if "arg" in func and engine == "flox": + if ("arg" in func and engine == "flox") or func in BLOCKWISE_FUNCS: pytest.skip() if not isinstance(axis, int): @@ -614,7 +645,14 @@ def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): assert_equal(actual, expected, tolerance) -@pytest.mark.parametrize("reindex,chunks", [(None, None), (False, (2, 2, 3)), (True, (2, 2, 3))]) +@pytest.mark.parametrize( + "reindex, chunks", + [ + (None, None), + pytest.param(False, (2, 2, 3), marks=requires_dask), + pytest.param(True, (2, 2, 3), marks=requires_dask), + ], +) @pytest.mark.parametrize( "axis, groups, expected_shape", [ @@ -626,8 +664,6 @@ def test_groupby_reduce_axis_subset_against_numpy(func, axis, engine): def test_groupby_reduce_nans(reindex, chunks, axis, groups, expected_shape, engine): def _maybe_chunk(arr): if chunks: - if not has_dask: - pytest.skip() return da.from_array(arr, chunks=chunks) else: return arr @@ -741,7 +777,14 @@ def test_npg_nanarg_bug(func): ) @pytest.mark.parametrize("method", ["cohorts", "map-reduce"]) @pytest.mark.parametrize("chunk_labels", [False, True]) -@pytest.mark.parametrize("chunks", ((), (1,), (2,))) +@pytest.mark.parametrize( + "chunks", + ( + (), + pytest.param((1,), marks=requires_dask), + pytest.param((2,), marks=requires_dask), + ), +) def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None: array = [1, 1, 1, 1, 1, 1] labels = [0.2, 1.5, 1.9, 2, 3, 20] @@ -750,8 +793,6 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None: pytest.xfail() if chunks: - if not has_dask: - pytest.skip() array = dask.array.from_array(array, chunks=chunks) if chunk_labels: labels = dask.array.from_array(labels, chunks=chunks) @@ -827,15 +868,13 @@ def test_rechunk_for_cohorts(chunk_at, expected): assert rechunked.chunks == expected -@pytest.mark.parametrize("chunks", [None, 3]) +@pytest.mark.parametrize("chunks", [None, pytest.param(3, marks=requires_dask)]) @pytest.mark.parametrize("fill_value", [123, np.nan]) @pytest.mark.parametrize("func", ALL_FUNCS) def test_fill_value_behaviour(func, chunks, fill_value, engine): # fill_value = np.nan tests promotion of int counts to float # This is used by xarray - if func in ["all", "any"] or "arg" in func: - pytest.skip() - if chunks is not None and not has_dask: + if (func in ["all", "any"] or "arg" in func) or func in BLOCKWISE_FUNCS: pytest.skip() npfunc = _get_array_func(func) @@ -891,8 +930,17 @@ def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype): @requires_dask @pytest.mark.parametrize("func", ALL_FUNCS) @pytest.mark.parametrize("axis", (-1, None)) -@pytest.mark.parametrize("method", ["blockwise", "cohorts", "map-reduce", "split-reduce"]) +@pytest.mark.parametrize("method", ["blockwise", "cohorts", "map-reduce"]) def test_cohorts_nd_by(func, method, axis, engine): + if ( + ("arg" in func and (axis is None or engine == "flox")) + or (method != "blockwise" and func in BLOCKWISE_FUNCS) + or (axis is None and ("first" in func or "last" in func)) + ): + pytest.skip() + if axis is not None and method != "map-reduce": + pytest.xfail() + o = dask.array.ones((3,), chunks=-1) o2 = dask.array.ones((2, 3), chunks=-1) @@ -903,20 +951,14 @@ def test_cohorts_nd_by(func, method, axis, engine): by[0, 4] = 31 array = np.broadcast_to(array, (2, 3) + array.shape) - if "arg" in func and (axis is None or engine == "flox"): - pytest.skip() - if func in ["any", "all"]: fill_value = False else: fill_value = -123 - if axis is not None and method != "map-reduce": - pytest.xfail() - if axis is None and ("first" in func or "last" in func): - pytest.skip() - kwargs = dict(func=func, engine=engine, method=method, axis=axis, fill_value=fill_value) + if "quantile" in func: + kwargs["finalize_kwargs"] = {"q": DEFAULT_QUANTILE} actual, groups = groupby_reduce(array, by, **kwargs) expected, sorted_groups = groupby_reduce(array.compute(), by, **kwargs) assert_equal(groups, sorted_groups) @@ -978,6 +1020,8 @@ def test_datetime_binning(): def test_bool_reductions(func, engine): if "arg" in func and engine == "flox": pytest.skip() + if "quantile" in func or "mode" in func: + pytest.skip() groups = np.array([1, 1, 1]) data = np.array([True, True, False]) npfunc = _get_array_func(func) @@ -1052,11 +1096,8 @@ def test_factorize_values_outside_bins(): assert_equal(expected, actual) -@pytest.mark.parametrize("chunk", [True, False]) +@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False]) def test_multiple_groupers_bins(chunk) -> None: - if chunk and not has_dask: - pytest.skip() - xp = dask.array if chunk else np array_kwargs = {"chunks": 2} if chunk else {} array = xp.ones((5, 2), **array_kwargs, dtype=np.int64) @@ -1089,9 +1130,9 @@ def test_multiple_groupers_bins(chunk) -> None: np.arange(2, 4).reshape(1, 2), ], ) -@pytest.mark.parametrize("chunk", [True, False]) +@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False]) def test_multiple_groupers(chunk, by1, by2, expected_groups) -> None: - if chunk and (not has_dask or expected_groups is None): + if chunk and expected_groups is None: pytest.skip() xp = dask.array if chunk else np @@ -1233,9 +1274,14 @@ def grouped_median(group_idx, array, *, axis=-1, size=None, fill_value=None, dty def test_dtype(func, dtype, engine): if "arg" in func or func in ["any", "all"]: pytest.skip() + + finalize_kwargs = {"q": DEFAULT_QUANTILE} if "quantile" in func else {} + arr = np.ones((4, 12), dtype=dtype) labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"]) - actual, _ = groupby_reduce(arr, labels, func=func, dtype=np.float64) + actual, _ = groupby_reduce( + arr, labels, func=func, dtype=np.float64, engine=engine, finalize_kwargs=finalize_kwargs + ) assert actual.dtype == np.dtype("float64") @@ -1378,6 +1424,33 @@ def test_validate_reindex() -> None: ) assert actual is False + with pytest.raises(ValueError): + _validate_reindex( + True, + "sum", + method="blockwise", + expected_groups=np.array([1, 2, 3]), + any_by_dask=False, + is_dask_array=True, + ) + + assert _validate_reindex( + True, + "sum", + method="blockwise", + expected_groups=np.array([1, 2, 3]), + any_by_dask=True, + is_dask_array=True, + ) + assert _validate_reindex( + None, + "sum", + method="blockwise", + expected_groups=np.array([1, 2, 3]), + any_by_dask=True, + is_dask_array=True, + ) + @requires_dask def test_1d_blockwise_sort_optimization(): diff --git a/tests/test_xarray.py b/tests/test_xarray.py index 7a343d962..8f006e5f3 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -16,7 +16,7 @@ dask.config.set(scheduler="sync") try: - # Should test against legacy xarray implementation + # test against legacy xarray implementation xr.set_options(use_flox=False) except ValueError: pass @@ -31,15 +31,15 @@ @pytest.mark.parametrize("add_nan", [True, False]) @pytest.mark.parametrize("skipna", [True, False]) def test_xarray_reduce(skipna, add_nan, min_count, engine, reindex): + if skipna is False and min_count is not None: + pytest.skip() + arr = np.ones((4, 12)) if add_nan: arr[1, ...] = np.nan arr[[0, 2], [3, 4]] = np.nan - if skipna is False and min_count is not None: - pytest.skip() - labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"]) labels = np.array(labels) labels2 = np.array([1, 2, 2, 1]) @@ -77,11 +77,8 @@ def test_xarray_reduce(skipna, add_nan, min_count, engine, reindex): # TODO: sort @pytest.mark.parametrize("pass_expected_groups", [True, False]) -@pytest.mark.parametrize("chunk", (True, False)) +@pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False)) def test_xarray_reduce_multiple_groupers(pass_expected_groups, chunk, engine): - if not has_dask and chunk: - pytest.skip() - if chunk and pass_expected_groups is False: pytest.skip() @@ -126,11 +123,8 @@ def test_xarray_reduce_multiple_groupers(pass_expected_groups, chunk, engine): @pytest.mark.parametrize("pass_expected_groups", [True, False]) -@pytest.mark.parametrize("chunk", (True, False)) +@pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False)) def test_xarray_reduce_multiple_groupers_2(pass_expected_groups, chunk, engine): - if not has_dask and chunk: - pytest.skip() - if chunk and pass_expected_groups is False: pytest.skip() @@ -317,14 +311,12 @@ def test_multi_index_groupby_sum(engine): assert_equal(expected, actual) -@pytest.mark.parametrize("chunks", (None, 2)) +@pytest.mark.parametrize("chunks", (None, pytest.param(2, marks=requires_dask))) def test_xarray_groupby_bins(chunks, engine): array = xr.DataArray([1, 1, 1, 1, 1], dims="x") labels = xr.DataArray([1, 1.5, 1.9, 2, 3], dims="x", name="labels") if chunks: - if not has_dask: - pytest.skip() array = array.chunk({"x": chunks}) labels = labels.chunk({"x": chunks}) @@ -472,11 +464,8 @@ def test_alignment_error(): @pytest.mark.parametrize("add_nan", [True, False]) @pytest.mark.parametrize("dtype_out", [np.float64, "float64", np.dtype("float64")]) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) -@pytest.mark.parametrize("chunk", (True, False)) +@pytest.mark.parametrize("chunk", (pytest.param(True, marks=requires_dask), False)) def test_dtype(add_nan, chunk, dtype, dtype_out, engine): - if chunk and not has_dask: - pytest.skip() - xp = dask.array if chunk else np data = xp.linspace(0, 1, 48, dtype=dtype).reshape((4, 12)) @@ -508,12 +497,9 @@ def test_dtype(add_nan, chunk, dtype, dtype_out, engine): xr.testing.assert_allclose(expected, actual.transpose("labels", ...), **tolerance64) -@pytest.mark.parametrize("chunk", [True, False]) +@pytest.mark.parametrize("chunk", [pytest.param(True, marks=requires_dask), False]) @pytest.mark.parametrize("use_flox", [True, False]) def test_dtype_accumulation(use_flox, chunk): - if chunk and not has_dask: - pytest.skip() - datetimes = pd.date_range("2010-01", "2015-01", freq="6H", inclusive="left") samples = 10 + np.cos(2 * np.pi * 0.001 * np.arange(len(datetimes))) * 1 samples += np.random.randn(len(datetimes))