Skip to content

Commit

Permalink
Merge branch 'main' into optimize-broadcast
Browse files Browse the repository at this point in the history
* main:
  Handle min_count=0 (#238)
  Try and fix dtypes on 3.8,3.10 windows
  Preserve input dtypes now that pandas can do it.
  Add pytest-pretty to envs
  factorize early as much as possible
  Bump codecov/codecov-action from 3.1.1 to 3.1.3 (#239)
  [pre-commit.ci] pre-commit autoupdate (#229)
  • Loading branch information
dcherian committed May 4, 2023
2 parents 5527f7a + aa358a5 commit 9c3a810
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 38 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-additional.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
run: |
python -m pytest --doctest-modules flox --ignore flox/tests --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
uses: codecov/codecov-action@v3.1.1
uses: codecov/codecov-action@v3.1.3
with:
file: ./coverage.xml
flags: unittests
Expand Down Expand Up @@ -126,7 +126,7 @@ jobs:
python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report
- name: Upload mypy coverage to Codecov
uses: codecov/codecov-action@v3.1.1
uses: codecov/codecov-action@v3.1.3
with:
file: mypy_report/cobertura.xml
flags: mypy
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
run: |
pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
uses: codecov/codecov-action@v3.1.1
uses: codecov/codecov-action@v3.1.3
with:
file: ./coverage.xml
flags: unittests
Expand Down Expand Up @@ -91,7 +91,7 @@ jobs:
run: |
python -m pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
uses: codecov/codecov-action@v3.1.1
uses: codecov/codecov-action@v3.1.3
with:
file: ./coverage.xml
flags: unittests
Expand Down
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ci:
repos:
- repo: https://github.com/charliermarsh/ruff-pre-commit
# Ruff version.
rev: 'v0.0.246'
rev: 'v0.0.260'
hooks:
- id: ruff
args: ["--fix"]
Expand All @@ -18,7 +18,7 @@ repos:
- id: check-docstring-first

- repo: https://github.com/psf/black
rev: 23.1.0
rev: 23.3.0
hooks:
- id: black

Expand All @@ -31,7 +31,7 @@ repos:
- mdformat-myst

- repo: https://github.com/nbQA-dev/nbQA
rev: 1.6.1
rev: 1.7.0
hooks:
- id: nbqa-black
- id: nbqa-ruff
Expand All @@ -44,13 +44,13 @@ repos:
args: [--extra-keys=metadata.kernelspec metadata.language_info.version]

- repo: https://github.com/codespell-project/codespell
rev: v2.2.2
rev: v2.2.4
hooks:
- id: codespell
additional_dependencies:
- tomli

- repo: https://github.com/abravalheri/validate-pyproject
rev: v0.12.1
rev: v0.12.2
hooks:
- id: validate-pyproject
1 change: 1 addition & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies:
- pip
- pytest
- pytest-cov
- pytest-pretty
- pytest-xdist
- xarray
- pre-commit
Expand Down
1 change: 1 addition & 0 deletions ci/minimal-requirements.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies:
- pip
- pytest
- pytest-cov
- pytest-pretty
- pytest-xdist
- numpy==1.20
- numpy_groupies==0.9.19
Expand Down
1 change: 1 addition & 0 deletions ci/no-dask.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies:
- pip
- pytest
- pytest-cov
- pytest-pretty
- pytest-xdist
- xarray
- numpydoc
Expand Down
1 change: 1 addition & 0 deletions ci/no-xarray.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies:
- pip
- pytest
- pytest-cov
- pytest-pretty
- pytest-xdist
- dask-core
- numpydoc
Expand Down
1 change: 1 addition & 0 deletions ci/upstream-dev-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies:
- numba
- pytest
- pytest-cov
- pytest-pretty
- pytest-xdist
- pip
- pip:
Expand Down
7 changes: 6 additions & 1 deletion flox/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,12 +557,15 @@ def _initialize_aggregation(
assert isinstance(finalize_kwargs, dict)
agg.finalize_kwargs = finalize_kwargs

if min_count is None:
min_count = 0

# This is needed for the dask pathway.
# Because we use intermediate fill_value since a group could be
# absent in one block, but present in another block
# We set it for numpy to get nansum, nanprod tests to pass
# where the identity element is 0, 1
if min_count is not None:
if min_count > 0:
agg.min_count = min_count
agg.chunk += ("nanlen",)
agg.numpy += ("nanlen",)
Expand All @@ -571,5 +574,7 @@ def _initialize_aggregation(
agg.fill_value["numpy"] += (0,)
agg.dtype["intermediate"] += (np.intp,)
agg.dtype["numpy"] += (np.intp,)
else:
agg.min_count = 0

return agg
54 changes: 39 additions & 15 deletions flox/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ def rechunk_for_cohorts(
def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray:
"""
Rechunks array so that group boundaries line up with chunk boundaries, allowing
embarassingly parallel group reductions.
embarrassingly parallel group reductions.
This only works when the groups are sequential
(e.g. labels = ``[0,0,0,1,1,1,1,2,2]``).
Expand Down Expand Up @@ -849,7 +849,7 @@ def _finalize_results(
"""
squeezed = _squeeze_results(results, axis)

if agg.min_count is not None:
if agg.min_count > 0:
counts = squeezed["intermediates"][-1]
squeezed["intermediates"] = squeezed["intermediates"][:-1]

Expand All @@ -860,7 +860,7 @@ def _finalize_results(
else:
finalized[agg.name] = agg.finalize(*squeezed["intermediates"], **agg.finalize_kwargs)

if agg.min_count is not None:
if agg.min_count > 0:
count_mask = counts < agg.min_count
if count_mask.any():
# For one count_mask.any() prevents promoting bool to dtype(fill_value) unless
Expand Down Expand Up @@ -1598,7 +1598,11 @@ def _lazy_factorize_wrapper(*by: T_By, **kwargs) -> np.ndarray:


def _factorize_multiple(
by: T_Bys, expected_groups: T_ExpectIndexTuple, any_by_dask: bool, reindex: bool
by: T_Bys,
expected_groups: T_ExpectIndexTuple,
any_by_dask: bool,
reindex: bool,
sort: bool = True,
) -> tuple[tuple[np.ndarray], tuple[np.ndarray, ...], tuple[int, ...]]:
if any_by_dask:
import dask.array
Expand All @@ -1617,6 +1621,7 @@ def _factorize_multiple(
expected_groups=expected_groups,
fastpath=True,
reindex=reindex,
sort=sort,
)

fg, gs = [], []
Expand All @@ -1643,6 +1648,7 @@ def _factorize_multiple(
expected_groups=expected_groups,
fastpath=True,
reindex=reindex,
sort=sort,
)

return (group_idx,), found_groups, grp_shape
Expand All @@ -1653,10 +1659,16 @@ def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) ->
return (None,) * nby

if nby == 1 and not isinstance(expected_groups, tuple):
if isinstance(expected_groups, pd.Index):
if isinstance(expected_groups, (pd.Index, np.ndarray)):
return (expected_groups,)
else:
return (np.asarray(expected_groups),)
array = np.asarray(expected_groups)
if np.issubdtype(array.dtype, np.integer):
# preserve default dtypes
# on pandas 1.5/2, on windows
# when a list is passed
array = array.astype(np.int64)
return (array,)

if nby > 1 and not isinstance(expected_groups, tuple): # TODO: test for list
raise ValueError(
Expand Down Expand Up @@ -1833,21 +1845,28 @@ def groupby_reduce(
# (pd.IntervalIndex or not)
expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, sort)

is_binning = any([isinstance(e, pd.IntervalIndex) for e in expected_groups])

# TODO: could restrict this to dask-only
factorize_early = (nby > 1) or (
is_binning and method == "cohorts" and is_duck_dask_array(array)
# Don't factorize early only when
# grouping by dask arrays, and not having expected_groups
factorize_early = not (
# can't do it if we are grouping by dask array but don't have expected_groups
any(is_dask and ex_ is None for is_dask, ex_ in zip(by_is_dask, expected_groups))
)
if factorize_early:
bys, final_groups, grp_shape = _factorize_multiple(
bys, expected_groups, any_by_dask=any_by_dask, reindex=reindex
bys,
expected_groups,
any_by_dask=any_by_dask,
# This is the only way it makes sense I think.
# reindex controls what's actually allocated in chunk_reduce
# At this point, we care about an accurate conversion to codes.
reindex=True,
sort=sort,
)
expected_groups = (pd.RangeIndex(math.prod(grp_shape)),)

assert len(bys) == 1
by_ = bys[0]
expected_groups = expected_groups[0]
(by_,) = bys
(expected_groups,) = expected_groups

if axis is None:
axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0))
Expand Down Expand Up @@ -1898,7 +1917,12 @@ def groupby_reduce(
min_count = 1

# TODO: set in xarray?
if min_count is not None and func in ["nansum", "nanprod"] and fill_value is None:
if (
min_count is not None
and min_count > 0
and func in ["nansum", "nanprod"]
and fill_value is None
):
# nansum, nanprod have fill_value=0, 1
# overwrite that when min_count is set
fill_value = np.nan
Expand Down
2 changes: 1 addition & 1 deletion flox/xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ def rechunk_for_cohorts(
def rechunk_for_blockwise(obj: T_DataArray | T_Dataset, dim: str, labels: T_DataArray):
"""
Rechunks array so that group boundaries line up with chunk boundaries, allowing
embarassingly parallel group reductions.
embarrassingly parallel group reductions.
This only works when the groups are sequential
(e.g. labels = ``[0,0,0,1,1,1,1,2,2]``).
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest


@pytest.fixture(scope="module", params=["flox", "numpy", "numba"])
@pytest.fixture(scope="module", params=["flox"])
def engine(request):
if request.param == "numba":
try:
Expand Down
23 changes: 12 additions & 11 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def test_alignment_error():

@pytest.mark.parametrize("dtype", (float, int))
@pytest.mark.parametrize("chunk", [False, True])
@pytest.mark.parametrize("expected_groups", [None, [0, 1, 2], np.array([0, 1, 2])])
# TODO: make this intp when python 3.8 is dropped
@pytest.mark.parametrize("expected_groups", [None, [0, 1, 2], np.array([0, 1, 2], dtype=np.int64)])
@pytest.mark.parametrize(
"func, array, by, expected",
[
Expand Down Expand Up @@ -148,7 +149,12 @@ def test_groupby_reduce(
)
# we use pd.Index(expected_groups).to_numpy() which is always int64
# for the values in this test
g_dtype = by.dtype if expected_groups is None else np.int64
if expected_groups is None:
g_dtype = by.dtype
elif isinstance(expected_groups, np.ndarray):
g_dtype = expected_groups.dtype
else:
g_dtype = np.int64

assert_equal(groups, np.array([0, 1, 2], g_dtype))
assert_equal(expected_result, result)
Expand Down Expand Up @@ -653,7 +659,7 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None:
array = [1, 1, 1, 1, 1, 1]
labels = [0.2, 1.5, 1.9, 2, 3, 20]

if method in ["split-reduce", "cohorts"] and chunk_labels:
if method == "cohorts" and chunk_labels:
pytest.xfail()

if chunks:
Expand Down Expand Up @@ -784,10 +790,8 @@ def test_dtype_preservation(dtype, func, engine):


@requires_dask
@pytest.mark.parametrize("dtype", [np.int32, np.int64])
@pytest.mark.parametrize(
"labels_dtype", [pytest.param(np.int32, marks=pytest.mark.xfail), np.int64]
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64, np.int32, np.int64])
@pytest.mark.parametrize("labels_dtype", [np.float32, np.float64, np.int32, np.int64])
@pytest.mark.parametrize("method", ["map-reduce", "cohorts"])
def test_cohorts_map_reduce_consistent_dtypes(method, dtype, labels_dtype):
repeats = np.array([4, 4, 12, 2, 3, 4], dtype=np.int32)
Expand Down Expand Up @@ -836,10 +840,7 @@ def test_cohorts_nd_by(func, method, axis, engine):
assert_equal(actual, expected)

actual, groups = groupby_reduce(array, by, sort=False, **kwargs)
if method == "map-reduce":
assert_equal(groups, np.array([1, 30, 2, 31, 3, 4, 40], dtype=np.int64))
else:
assert_equal(groups, np.array([1, 30, 2, 31, 3, 40, 4], dtype=np.int64))
assert_equal(groups, np.array([1, 30, 2, 31, 3, 4, 40], dtype=np.int64))
reindexed = reindex_(actual, groups, pd.Index(sorted_groups))
assert_equal(reindexed, expected)

Expand Down

0 comments on commit 9c3a810

Please sign in to comment.