
numpy_groupies #4540

Status: Closed. max-sixty wants to merge 4 commits from the npg branch.

Conversation

max-sixty (Collaborator):

Very early effort; I found this harder than I expected. I was trying to use the existing groupby infra, but I think I should maybe start afresh. The result of the numpy_groupies operation is a fully formed array, whereas we're used to handling an iterable of results which need to be concatenated.
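
A minimal sketch of that point, using numpy_groupies directly (toy data assumed):

import numpy as np
import numpy_groupies as npg

values = np.array([1.0, 2.0, 3.0, 4.0])
group_idx = np.array([0, 0, 1, 1])  # group labels, one per element

# A single call returns the fully formed result array, one entry per
# group, with nothing left to concatenate:
npg.aggregate(group_idx, values, func="sum")  # -> array([3., 7.])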

I also added some type signatures / notes while going through the existing code, mostly for my own understanding.

If anyone has any thoughts, feel free to comment; otherwise I'll resume this soon.

@@ -404,6 +418,8 @@ def __init__(
        self._groups = None
        self._dims = None

    # TODO: is this correct? Should we be returning the dims of the result? This
    # will use the original dim where we're grouping by a coord.
max-sixty (Collaborator, Author):

Is the existing code for this property correct? Currently x.groupby(foo).dims != x.groupby(foo).sum(...).dims when we're grouping by a non-indexed coord.
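
A minimal sketch of the discrepancy (toy data assumed; the .dims behavior is as described in the reply below):

import numpy as np
import xarray as xr

da = xr.DataArray(
    np.arange(6.0),
    dims="x",
    coords={"labels": ("x", ["a", "a", "b", "b", "c", "c"])},  # non-indexed coord
)

da.groupby("labels").dims           # dims of the first group: ("x",)
da.groupby("labels").sum(...).dims  # dims of the result: ("labels",)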

dcherian (Contributor):

It returns the dims for the first group, so you can decide what dim values can be passed to .mean. But I agree it is confusing; maybe we should deprecate and remove it. This use case is also served by implementing GroupBy.__getitem__.

# Conflicts:
#	asv_bench/benchmarks/unstacking.py
#	xarray/core/options.py
dcherian (Contributor) left a comment:

@max-sixty let me know how I can help move this forward.


# The remainder is mostly copied from `_combine`

# FIXME: this part seems broken at the moment — the `_infer_concat_args`
dcherian (Contributor):

Since npg returns a full array, the concat bit isn't needed any more, so combined = applied. I think you could just delete all the concat code.

return grouped


def npg_aggregate(
dcherian (Contributor):

Could be moved down to Variable.


def mean(self, dim=None):
    grouped = self._npg_groupby(func="mean")
    if dim:
dcherian (Contributor):

Not right for mean. Would it work if we broadcast indices along all reduction dims before passing to npg?
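
A minimal sketch of that broadcasting idea (toy data assumed, not the PR's code):

import numpy as np
import numpy_groupies as npg

arr = np.arange(12.0).reshape(3, 4)  # dims ("y", "x")
group_idx = np.array([0, 0, 1, 1])   # groups defined along "x"

# Broadcast the 1-D indices to the array's shape and ravel both, so the
# mean is taken over all reduction dims at once (a mean of per-axis
# means would be wrong for unequal group sizes):
idx = np.broadcast_to(group_idx, arr.shape)
npg.aggregate(idx.ravel(), arr.ravel(), func="mean")  # one mean per group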


applied = applied_example = type(self._obj)(
    data=array,
    dims=tuple(self.dims_()),
dcherian (Contributor), Mar 7, 2021:

Need to assign coordinate variables from self._obj here, or maybe the apply_ufunc version will solve that.

from numpy_groupies.aggregate_numba import aggregate

axis = da.get_axis_num(dim)
return aggregate(group_idx=group_idx, a=da, func=func, axis=axis)
dcherian (Contributor):

Suggested change:

- return aggregate(group_idx=group_idx, a=da, func=func, axis=axis)
+ return aggregate(group_idx=group_idx, a=da.data, func=func, axis=axis)

Could make this the following (from @shoyer's notebook) or do that later...

import numpy as np
import numpy_groupies
import pandas as pd
import xarray


def _binned_agg(
    array: np.ndarray,
    indices: np.ndarray,
    num_bins: int,
    *,
    func,
    fill_value,
    dtype,
) -> np.ndarray:
    """NumPy helper function for aggregating over bins."""
    # Drop positions whose bin index is NaN (values that fell outside the bins).
    mask = np.logical_not(np.isnan(indices))
    int_indices = indices[mask].astype(int)
    # Aggregate over the (flattened) core dims in a single call.
    result = numpy_groupies.aggregate_numpy.aggregate(
        int_indices,
        array[..., mask],
        func=func,
        size=num_bins,
        fill_value=fill_value,
        dtype=dtype,
        axis=-1,
    )
    return result


def groupby_bins_agg(
    array: xarray.DataArray,
    group: xarray.DataArray,
    bins,
    func="sum",
    fill_value=0,
    dtype=None,
    **cut_kwargs,
) -> xarray.DataArray:
    """Faster equivalent of Xarray's groupby_bins(...).sum()."""
    # TODO: implement this upstream in xarray:
    # https://github.com/pydata/xarray/issues/4473
    binned = pd.cut(np.ravel(group), bins, **cut_kwargs)
    new_dim_name = group.name + "_bins"
    indices = group.copy(data=binned.codes.reshape(group.shape))

    result = xarray.apply_ufunc(
        _binned_agg,
        array,
        indices,
        input_core_dims=[indices.dims, indices.dims],
        output_core_dims=[[new_dim_name]],
        output_dtypes=[array.dtype],
        dask_gufunc_kwargs=dict(
            output_sizes={new_dim_name: binned.categories.size},
        ),
        kwargs={
            "num_bins": binned.categories.size,
            "func": func,
            "fill_value": fill_value,
            "dtype": dtype,
        },
        dask="parallelized",
    )
    result.coords[new_dim_name] = binned.categories
    return result
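
A hypothetical usage of the helper above (made-up data; assumes the imports and definitions from the snippet):

lon = xarray.DataArray(np.linspace(5.0, 355.0, 36), dims="x", name="lon")
da = xarray.DataArray(np.random.rand(4, 36), dims=("y", "x"))

result = groupby_bins_agg(da, lon, bins=np.arange(0, 361, 60), func="sum")
# intended to match: da.groupby_bins(lon, bins=np.arange(0, 361, 60)).sum()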

max-sixty (Collaborator, Author):

Cheers @dcherian. I've been a bit absent from xarray features for the past couple of months, as you know.

If you want to take this on, please feel free. It's still at the top of my list, so I would get to it, but I really don't want to slow the progress.

max-sixty (Collaborator, Author):

@dcherian I know we discussed this a couple of weeks ago, and I said I was hoping to take another pass at this using https://github.com/dcherian/dask_groupby

For transparency, I haven't managed to spend any time on this yet. I certainly don't want to block you if you're interested in taking this forward.

FWIW my selfish motivation is more around speeding up in-memory groupbys than dask groupbys; the elegance of numpy_groupies & dask_groupby is that they can potentially be unified.

dcherian (Contributor):

No worries @max-sixty! It looks like @andersy005 will be trying to get this completed soon.

tlogan2000:

Hello all, my fellow xclim (https://github.com/Ouranosinc/xclim) devs and I would be interested to know whether this PR is still moving forward. xr.resample is pretty fundamental to much of our package, so we are definitely interested in the possibility of improved performance.

cheers

dcherian (Contributor):

Hi @tlogan2000, this is moving forward, but slowly.

The good news is that I got resampling working this week (at least for some minimal test cases). It works by rechunking so that every chunk boundary lines up with a group boundary, and then applies numpy_groupies blockwise.

https://github.com/dcherian/dask_groupby/blob/8a30c4b20f23acacfc3b53dbeab2a7b268ecd3fc/dask_groupby/xarray.py#L268-L272
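
A minimal sketch of that rechunk-then-blockwise idea with toy data (the concept only, not the actual dask_groupby code):

import dask.array
import numpy as np

data = dask.array.ones(12, chunks=5)  # original chunks: (5, 5, 2)

# Three resample periods of four elements each: rechunk so every chunk
# boundary lines up with a group boundary.
data = data.rechunk(4)                # chunks: (4, 4, 4)

# Each block now holds exactly one group, so a blockwise reduction
# (numpy_groupies in the real implementation) yields one value per group.
means = data.map_blocks(
    lambda block: np.atleast_1d(block.mean()),
    chunks=(1,),
    dtype=data.dtype,
).compute()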

You can test it out with something like this:

dask_groupby.xarray.resample_reduce(ds.resample(time="M"), func="mean")

For general groupby use:

dask_groupby.xarray.xarray_reduce(...)

This still fails a few xarray tests, but a lot of them pass! Let me know how it goes, and please file bug reports over at the dask_groupby repo if you find any.

tlogan2000:

FYI @aulemahal @Zeitsperre @huard ... re: the xclim discussion yesterday. If we have spare moments in the following weeks we could try a few tests on our end to benchmark and provide bug reports.

@max-sixty max-sixty closed this Oct 24, 2021
@max-sixty max-sixty deleted the npg branch February 5, 2022 22:24