"most common" Aggregator with Dask #263
Comments
Ah, now this is interesting! The general solution is hard and approximate (I think). You'll need to implement something like a count-min sketch (I'm sure there are other options). The easier way: you'll have to decompose the problem and compute each unique item and its associated count in … Are there useful properties of the field you're collapsing or the field you're grouping by (example plots of both fields would be useful)? For example
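The approximate route mentioned above can be illustrated with a textbook count-min sketch. This is a minimal sketch in plain NumPy, not something flox provides; the class name `CountMinSketch`, the `width`/`depth` defaults and the hash family are assumptions chosen for the example. Each block builds its own sketch, sketches built with the same hashes merge by adding their tables (the "combine" step), and estimated counts are then queried for a list of candidate labels.

```python
import numpy as np


class CountMinSketch:
    """Textbook count-min sketch: approximate counts in fixed memory.

    Estimates never undercount; they may overcount because of hash collisions.
    """

    def __init__(self, width: int = 1024, depth: int = 4, seed: int = 0):
        rng = np.random.default_rng(seed)
        self.width = width
        self.depth = depth
        self.prime = 2_147_483_647  # 2**31 - 1, larger than any label we hash
        # One (a, b) pair per row for h(x) = ((a * x + b) mod p) mod width
        self.a = rng.integers(1, self.prime, size=depth)
        self.b = rng.integers(0, self.prime, size=depth)
        self.table = np.zeros((depth, width), dtype=np.int64)

    def _columns(self, items) -> np.ndarray:
        # Shape (depth, n): the column each item maps to in each row
        x = np.asarray(items, dtype=np.int64).ravel() % self.prime
        return ((self.a[:, None] * x[None, :] + self.b[:, None]) % self.prime) % self.width

    def update(self, items) -> None:
        cols = self._columns(items)
        for row in range(self.depth):
            # np.add.at accumulates correctly even when a column repeats
            np.add.at(self.table[row], cols[row], 1)

    def merge(self, other: "CountMinSketch") -> "CountMinSketch":
        # Only valid when both sketches use identical hash functions
        assert np.array_equal(self.a, other.a) and np.array_equal(self.b, other.b)
        self.table += other.table
        return self

    def estimate(self, items) -> np.ndarray:
        cols = self._columns(items)
        rows = np.arange(self.depth)[:, None]
        # Minimum over rows is the least-collided (tightest) estimate
        return self.table[rows, cols].min(axis=0)


# Hypothetical blocks of one group, processed independently and then merged
chunks = [np.array([1, 1, 2, 3]), np.array([1, 3, 3, 3]), np.array([2, 3])]
total = CountMinSketch(seed=0)
for chunk in chunks:
    sketch = CountMinSketch(seed=0)  # same seed => same hash functions
    sketch.update(chunk)
    total.merge(sketch)

candidates = np.array([1, 2, 3])
estimates = total.estimate(candidates)
approx_mode = candidates[estimates.argmax()]
```

Because estimates can only overcount, the result is approximate. With a small, known set of labels, exact per-label counting is usually the simpler choice.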
Hi Deepak, thank you for your reply.
I attempted to do this; however, I am unsure how exactly to make it work. Below is the code, but I am not sure how to wrap the individual functions so that they're compatible with `Aggregation`. I could not make much sense of the documentation/docstrings beyond the very simple examples that are available. I also received some errors about setting an element with a sequence (as each chunk returns an array of unique values instead of a single value).

Code:

```python
import numpy as np
from flox.aggregations import Aggregation

# `_custom_grouped_reduction` and `wrap_stack` are not defined in this snippet.


def unique_labels(a: np.ndarray) -> np.ndarray:
    # Unique values within one block
    labels = np.unique(a)
    return labels


def unique_counts(a: np.ndarray) -> np.ndarray:
    # Number of occurrences of each unique value within one block
    _, counts = np.unique(a, return_counts=True)
    return counts


def most_common_chunked(multi_values: np.ndarray, multi_counts: np.ndarray, **kwargs):
    # Merge the per-block (value, count) pairs and return the value with the largest total.
    # Ravel both inputs so `index` and the counts line up element by element.
    all_values, index = np.unique(multi_values.ravel(), return_inverse=True)
    all_counts = np.zeros(all_values.size, np.int64)
    np.add.at(all_counts, index, multi_counts.ravel())  # in-place accumulation
    return all_values[all_counts.argmax()]


most_common = Aggregation(
    name="most_common",
    numpy=_custom_grouped_reduction,
    chunk=(unique_labels, unique_counts),  # first compute blockwise
    combine=(wrap_stack, wrap_stack),  # stack these intermediate results
    finalize=most_common_chunked,  # get the most common value from the combined result
    fill_value=0,
)
```
Our specific use case is a high-resolution land cover dataset, which we want to be able to regrid easily. Of course, once the "most common" strategy works, knowing the 2nd (or n-th) most common value is also interesting. Due to the nature of our dataset, it will have a limited number of unique labels.
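Once per-label counts are available, the 2nd (or n-th) most common label is just another position in the counts sorted in descending order. A minimal sketch for a single 1-D group in plain NumPy; the function name `nth_most_common` and the choice to return `None` when there are fewer than `n` distinct labels are assumptions for illustration.

```python
import numpy as np


def nth_most_common(values: np.ndarray, n: int = 1):
    """Return the n-th most common value of a 1-D array (n=1 is the mode)."""
    labels, counts = np.unique(values, return_counts=True)
    if n > labels.size:
        return None  # hypothetical convention: fewer than n distinct labels
    # Stable sort on -counts: ties keep np.unique's ascending label order
    order = np.argsort(-counts, kind="stable")
    return labels[order[n - 1]]


values = np.array([3, 3, 3, 1, 1, 2])
print(nth_most_common(values, 1), nth_most_common(values, 2))
```

Ties resolve toward the smaller label because the sort is stable; any other tie-breaking rule would need to be made explicit.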
Unfortunately, this will need some major thinking. You'll have to handle the unique and count intermediates together, similar to how the … Is it possible to rechunk so that a blockwise solution works? Can you describe your problem precisely? A reproducible example would be best.
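Keeping the unique labels and their counts together as a single intermediate makes the reduction exact and associative, so blocks can be merged in any tree order. The following plain-NumPy sketch shows the idea for one group only; the function names are hypothetical, and it does not use flox's `Aggregation` machinery, which would additionally have to track these pairs per group.

```python
import numpy as np


def chunk_counts(block: np.ndarray):
    # "chunk" step: unique labels and their counts within one block, kept together
    return np.unique(block, return_counts=True)


def combine_counts(pairs):
    # "combine" step: merge several (labels, counts) intermediates into one
    labels = np.concatenate([p[0] for p in pairs])
    counts = np.concatenate([p[1] for p in pairs])
    merged, inverse = np.unique(labels, return_inverse=True)
    totals = np.zeros(merged.size, dtype=np.int64)
    np.add.at(totals, inverse, counts)  # sum counts of identical labels
    return merged, totals


def finalize_mode(pair):
    # "finalize" step: the label with the largest total count
    labels, counts = pair
    return labels[counts.argmax()]


# Hypothetical blocks belonging to the same group
blocks = [np.array([5, 5, 1]), np.array([1, 1, 3]), np.array([5, 3, 5])]
intermediates = [chunk_counts(b) for b in blocks]

flat = finalize_mode(combine_counts(intermediates))
# Combining in a tree gives the same answer because the merge is associative
tree = finalize_mode(combine_counts([combine_counts(intermediates[:2]), intermediates[2]]))
print(flat, tree)
```

The intermediate size grows with the number of distinct labels per group, which stays small when, as here, the label set is limited.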
OK, that's good for the exact nature of this solution.
Also …
#269 should fix your blockwise problems. I'm thinking I can just add support for …
Hi Deepak, thank you for your replies and the modifications to the code. I am currently too busy with other projects to focus on this, but I will get back to it and try it out as soon as I can make some time!
Yes that is fine.
I do have a demo notebook in our repository where I show a basic use case. It is not easily reproducible, as it currently requires a very large dataset, but it might give you a better idea of what we are trying to achieve.
Ah, I've been dense. We can simply compute the …

```python
import flox
import numpy as np
import pandas as pd

array = np.array([[0, 0, 0, 1, 1, 5, 4, 5, 5, 3], [0, 0, 0, 1, 1, 3, 4, 5, 5, 3]])
array[1, :] += 4
by = np.array([0, 0, 0, 1, 1, 2, 2, 2, 3, 3])
```

The default only works with numpy and problems that can be executed …

```python
flox.groupby_reduce(array, by, func="mode", axis=-1)
```

This is the more general implementation, which should always work regardless of chunking:

```python
result, uniques, groups = flox.groupby_reduce(
    array, array, np.broadcast_to(by, array.shape), func="count", fill_value=0, axis=-1
)
uniques[result.argmax(axis=-2)], groups
```

With dask, you'll need to provide the unique values of …
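To see why taking `argmax(axis=-2)` of the counts yields the most common value, here is the same count-then-argmax idea written out in plain NumPy for the example arrays above. This is an illustrative re-implementation, not flox's internals; the counts array is laid out as (row, unique value, group), the layout that the `argmax(axis=-2)` call implies, and ties resolve to the smallest label because `argmax` returns the first maximum.

```python
import numpy as np

array = np.array([[0, 0, 0, 1, 1, 5, 4, 5, 5, 3], [0, 0, 0, 1, 1, 3, 4, 5, 5, 3]])
array[1, :] += 4
by = np.array([0, 0, 0, 1, 1, 2, 2, 2, 3, 3])

uniques = np.unique(array)  # every label that occurs anywhere
groups = np.unique(by)

# counts[r, u, g] = how often uniques[u] occurs in row r within group groups[g]
counts = np.zeros((array.shape[0], uniques.size, groups.size), dtype=np.int64)
u_idx = np.searchsorted(uniques, array)
g_idx = np.searchsorted(groups, np.broadcast_to(by, array.shape))
rows = np.broadcast_to(np.arange(array.shape[0])[:, None], array.shape)
np.add.at(counts, (rows, u_idx, g_idx), 1)

# Most common label per (row, group): argmax over the "unique value" axis
mode = uniques[counts.argmax(axis=-2)]
print(mode)
```

The `counts` array has one entry per (unique value, group) pair, so this stays cheap when the number of distinct labels is small.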
Hi! Thanks for developing Flox, it has been quite useful in our workflows.
We have been working on a method to reduce categorical data with flox using a "most common" strategy.
This works very well for in-memory datasets; however, it does not work on Dask data, and I have not been able to make it work.
If the input is Dask data, the following error is returned:

If `method="blockwise"` is set, then the following error is returned:

I have tried to implement `chunk=` and `combine=`, but I don't think that is possible with this method.

Memory limit of `xarray_reduce`
Additionally, because of the size of our input dataset, the `group_idx` array does not fit in memory, since its size is `dim1 * dim2 * ... * size(np.int64)`.

Our current workaround is to apply the reduction to subset slices of the source and target data and then merge the results. This works OK but is quite ugly and slow.
Is there another way to avoid this limit?
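The limit follows directly from that formula: `group_idx` holds one `int64` code (8 bytes) per element. A small helper for the arithmetic; the function name and the grid sizes are hypothetical, chosen only to show the scale.

```python
import numpy as np


def group_idx_nbytes(shape, dtype=np.int64) -> int:
    """Bytes needed for one integer group code per element of `shape`."""
    n_elements = int(np.prod(shape, dtype=np.int64))
    return n_elements * np.dtype(dtype).itemsize


full_grid = group_idx_nbytes((100_000, 100_000))  # hypothetical full grid
one_slice = group_idx_nbytes((10_000, 10_000))  # hypothetical slice
print(f"{full_grid / 1e9:.0f} GB vs {one_slice / 1e6:.0f} MB")  # 80 GB vs 800 MB
```

This shows why reducing slice by slice keeps the codes in memory: the cost scales with the number of elements in each slice rather than in the whole grid.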
Most common implementation