
[Data] Enable sort over multiple keys in datasets #37124

Merged
merged 21 commits into ray-project:master on Aug 10, 2023

Conversation

jaidisido
Contributor

@jaidisido jaidisido commented Jul 5, 2023

Why are these changes needed?

This is a first draft at enabling sorting of Ray datasets over multiple keys (i.e. passing a list of keys to .sort()).

A new unit test test_sort_with_multiple_keys showcases an example.
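For reference, a minimal usage sketch of what multi-key sorting looks like with this change (the column names and data here are illustrative, not taken from the test):

```python
import ray

ds = ray.data.from_items(
    [
        {"group": "B", "value": 2},
        {"group": "A", "value": 3},
        {"group": "A", "value": 1},
    ]
)

# Sort by "group" first, then break ties by "value".
sorted_ds = ds.sort(["group", "value"])
print(sorted_ds.take_all())
# Rows come back ordered ("A", 1), ("A", 3), ("B", 2).
```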

Related issue number

#25732

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Contributor Author

@jaidisido jaidisido left a comment


Clarifications

table = sort(self._table, key, descending)
if len(boundaries) == 0:
return [table]

partitions = []
# For each boundary value, count the number of items that are less
# than it. Since the block is sorted, these counts partition the items
# such that boundaries[i] <= x < boundaries[i + 1] for each x in
# partition[i]. If `descending` is true, `boundaries` would also be
# in descending order and we only need to count the number of items
# *greater than* the boundary value instead.
Contributor Author

@jaidisido jaidisido Jul 5, 2023


np.searchsorted does not support multi-dimensional arrays, so this implementation over multiple keys is not vectorised and therefore less performant. I would appreciate pointers on how to improve it.

Note that the pyarrow Table.to_pylist method was only introduced in pyarrow >= 7, so a custom _to_pylist function with the same logic is used to support pyarrow 6.
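For illustration, a minimal sketch of what such a fallback could look like (the helper name mirrors the one mentioned above; the exact body in the PR may differ):

```python
from typing import Any, Dict, List

import pyarrow as pa


def _to_pylist(table: pa.Table) -> List[Dict[str, Any]]:
    """Equivalent of Table.to_pylist (available in pyarrow >= 7) for older pyarrow versions."""
    pydict = table.to_pydict()  # {column name: list of Python values}
    columns = list(pydict.keys())
    return [dict(zip(columns, row)) for row in zip(*pydict.values())]
```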

Contributor


It's odd that I see we are already using to_pylist and our CI also tests against pyarrow 6; not sure why that works. I'll confirm. Also, if this is indeed needed, we can move the definition to python/ray/data/_internal/arrow_ops/transform_pyarrow.py.

Contributor


CI for pyarrow 6 was failing. Thanks, moved to arrow_ops.

Signed-off-by: Abdel Jaidi <jaidisido@gmail.com>
][1:]
for k, v in sample_dict.items()
}
return [
Contributor Author


sample_boundaries now returns a list of tuples sorted across the keys. For example:

[("A", 1, True), ("A", 2, False), ("B", 1, False)...]

Contributor


Could you add this to the function comment? And could you also add some comments explaining the above code? It's not very easy to understand.

Contributor


Thanks, that dict/list comprehension was a bit heavy. I've refactored it and removed the sorting inside the loop. Added clarifying comments as well.

Signed-off-by: Anton Kukushkin <kukushkin.anton@gmail.com>
@kukushking kukushking force-pushed the feat/ray-data-sort-multi-keys branch from 49729c5 to 7715bfb on July 7, 2023 15:32
@zhe-thoughts
Collaborator

Question from live discussion: is it OK to start with PyArrow blocks and later extend to more generic cases? @raulchen

@raulchen raulchen self-assigned this Jul 7, 2023
@raulchen
Contributor

raulchen commented Jul 7, 2023

@zhe-thoughts I think that is fine as long as the code structure is generic and extensible in the future. E.g., we should define some abstraction in the base class and leave some subclasses as NotImplemented. Will look at this PR later today.

@kukushking
Contributor

Hi @zhe-thoughts @raulchen, FYI this PR now also includes the pandas blocks implementation, added on Friday.

Signed-off-by: Anton Kukushkin <kukushkin.anton@gmail.com>
@raulchen
Contributor

Sorry for the delay. Just got a chance to fully read through the whole PR today.

One major concern is that the type definition of the sort key has been messy and inconsistent across the code base. Of course, this is an existing issue, but extending to multiple sort columns would make the situation worse. I'm wondering if you'd be able to do a sanitization refactor around this (if you don't have time, we can work on this as well).

Here is the current situation:

  • For Dataset.sort, SortStage, _validate_key_fn, and the Sort logical op, we use key: Optional[str], descending: bool.
  • But for generate_sort_fn, SortTaskSpec, _internal/sort.py, and BlockAccessor.sample, we use key: SortKeyT, which is Union[None, List[Tuple[str, str]], Callable[[T], Any]].
    • For SortKeyT, if it's List[Tuple[str, str]], it'd be something like [("column1", "ascending"), ("column2", "descending")]. Because of this, we have a lot of code doing something like col = key[0][0] or isinstance(key, list), which is very hard to read. I believe the reason it's defined this way is to match the definition of pyarrow.compute.sort_indices. Also, the Callable case doesn't seem to be implemented at all (@c21 correct me if I'm wrong).

To sanitize this, I'm thinking of the following approach:

  • At the API level (only Dataset.sort), we still use key: Optional[str], descending: bool, and extend them to lists after this PR.
  • For everything else, we should define a SortKey class to encapsulate all the related logic. The class will provide methods like get_columns, to_arrow_sort_args, to_pandas_sort_args, etc. (a rough sketch follows below).
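A rough sketch of what such a SortKey class could look like (an assumption for discussion, not the merged implementation):

```python
from typing import List, Tuple, Union


class SortKey:
    """Encapsulates sort columns and per-column sort directions."""

    def __init__(
        self,
        key: Union[str, List[str]],
        descending: Union[bool, List[bool]] = False,
    ):
        self._columns = [key] if isinstance(key, str) else list(key)
        if isinstance(descending, bool):
            descending = [descending] * len(self._columns)
        if len(descending) != len(self._columns):
            raise ValueError("Number of descending flags must match the number of key columns.")
        self._descending = list(descending)

    def get_columns(self) -> List[str]:
        return self._columns

    def to_arrow_sort_args(self) -> List[Tuple[str, str]]:
        # pyarrow.compute.sort_indices expects [("col", "ascending" | "descending"), ...].
        return [
            (col, "descending" if desc else "ascending")
            for col, desc in zip(self._columns, self._descending)
        ]

    def to_pandas_sort_args(self) -> Tuple[List[str], List[bool]]:
        # pandas.DataFrame.sort_values takes `by=` and `ascending=`.
        return self._columns, [not d for d in self._descending]
```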

python/ray/data/tests/test_sort.py (resolved review thread)
@@ -2014,7 +2014,9 @@ def std(
ret = self._aggregate_on(Std, on, ignore_nulls, ddof=ddof)
return self._aggregate_result(ret)

def sort(self, key: Optional[str] = None, descending: bool = False) -> "Dataset":
def sort(
self, key: Optional[Union[str, List[str]]] = None, descending: bool = False
Contributor


  • Same as pandas.DataFrame.sort_values, should descending also accept a list of booleans? (A small pandas illustration follows below.)
  • Can you also update the example and comment below?
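For reference, this is the pandas behaviour the first point refers to: sort_values accepts one direction flag per sort column (the DataFrame here is illustrative):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 1, 2]})

# One flag per sort column: "a" ascending, "b" descending.
print(df.sort_values(by=["a", "b"], ascending=[True, False]))
```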

Contributor


Thanks, that makes sense. I was wondering what a high-level API that allows multi-directional sort should look like for the user - whether it's a list of tuples, as in some parts of the existing codebase, or something else. sort_values is a good example.

Contributor Author


Multi-directional sort might be ambitious, in my opinion. It would require a completely different approach, as bisect does not support it.

Contributor


I think that can be done with a custom key function for bisect. Did you try that?
Also, by using a custom key function, I think we can avoid reversing the order of the items.

Contributor Author


AFAIK the key argument to bisect was only introduced in Python 3.10+, so it isn't available in 3.8 or 3.9:
https://docs.python.org/3.10/library/bisect.html
https://docs.python.org/3.8/library/bisect.html

Contributor


Got it. But then I think it's worth implementing our own bisect to avoid the copy.

Contributor


A custom multi-order key function works well when the values are simple types like ints, e.g. lambda x: (x[0], -x[1]), but it gets more complicated with other types.
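For reference, a minimal sketch of a key-aware bisect_left that works on Python 3.8/3.9, as an illustration of the idea discussed above rather than the PR's code:

```python
from typing import Any, Callable, Sequence


def bisect_left_by_key(
    seq: Sequence[Any], boundary: Any, key: Callable[[Any], Any]
) -> int:
    """Return the first index i with key(seq[i]) >= boundary, assuming seq is sorted by key."""
    lo, hi = 0, len(seq)
    while lo < hi:
        mid = (lo + hi) // 2
        if key(seq[mid]) < boundary:
            lo = mid + 1
        else:
            hi = mid
    return lo
```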

# For each boundary value, count the number of items that are less
# than it. Since the block is sorted, these counts partition the items
# such that boundaries[i] <= x < boundaries[i + 1] for each x in
# partition[i]. If `descending` is true, `boundaries` would also be
# in descending order and we only need to count the number of items
# *greater than* the boundary value instead.
table_items = [tuple(d.values()) for d in _to_pylist(table.select(columns))]
Contributor


This looks heavy. I guess it's possible to bisect on the original arrow table, without converting it to a pylist.

Contributor Author


I would obviously prefer to bisect on the entire arrow table, but I'm not sure how you're suggesting to do it here?

Contributor


I was thinking of doing the bisect with the original arrow table, with a custom key argument.
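A hedged sketch of that idea: binary-search over row indices and let a key function read the sort columns lazily from the Arrow table, so no pylist copy is materialized (the `columns` and `boundary` arguments are illustrative assumptions):

```python
from typing import Any, Sequence, Tuple

import pyarrow as pa


def searchsorted_table(
    table: pa.Table, columns: Sequence[str], boundary: Tuple[Any, ...]
) -> int:
    """Return the first row index whose key tuple is >= boundary, assuming the table is sorted."""

    def row_key(i: int) -> Tuple[Any, ...]:
        # Indexing a ChunkedArray yields a scalar; .as_py() converts one value at a time.
        return tuple(table.column(c)[i].as_py() for c in columns)

    lo, hi = 0, table.num_rows
    while lo < hi:
        mid = (lo + hi) // 2
        if row_key(mid) < boundary:
            lo = mid + 1
        else:
            hi = mid
    return lo
```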

Signed-off-by: Anton Kukushkin <kukushkin.anton@gmail.com>
@kukushking
Contributor

Hi @raulchen yeah that makes sense. I'll give the approach a shot.

On another note, looking at the last CI run there are two failures seemingly unrelated to the PR: Ray Train (python/ray/train/tests/test_lightning_deepspeed.py::test_deepspeed_stages) and the link check. Do you know what to do with those? In particular the train test - is it just flaky, or does it potentially show a performance drop?

@raulchen
Contributor

hey @kukushking, the code looks good to me. But I don't fully understand your proposal. Are you proposing to keep SortKeyT for some APIs?

@kukushking
Contributor

@raulchen Yes, that's right. It seems that BlockAccessor methods are also user-facing, so it would make sense to keep the same definition as Dataset.sort; I wanted to hear your opinion here. Happy to merge my branch into this PR as-is, though.

@raulchen
Contributor

@kukushking I see. But BlockAccessor isn't supposed to be user-facing. If any docs/comments cause confusion, please let us know; we should update them. By the way, if possible, it'd be preferable to submit the refactor as a separate PR, so the change is easier to review and track. But if that would bring significant overhead, using one PR is acceptable as well.

@kukushking
Contributor

@raulchen sounds good, I will submit a separate PR. Thanks!

@PhysicsACE

@raulchen I was going to submit this PR before but due to hardware issues, I was unable to access my local repo as I did not have access to my computer. I have a working prototype of multikey sorting/grouping/aggregating on my forked branch https://github.com/PhysicsACE/ray/tree/multikey and if it looks good, I would be happy to merge the changes into this PR for the requested feature.

@raulchen
Contributor

> @raulchen I was going to submit this PR before but due to hardware issues, I was unable to access my local repo as I did not have access to my computer. I have a working prototype of multikey sorting/grouping/aggregating on my forked branch https://github.com/PhysicsACE/ray/tree/multikey and if it looks good, I would be happy to merge the changes into this PR for the requested feature.

Sorry, I am confused. How is that branch different from this PR? Do you mean that branch not only implements sort, but also groupby and agg? I think it'd be better to submit different PRs if possible.

@kukushking kukushking mentioned this pull request Jul 24, 2023
@PhysicsACE

> @raulchen I was going to submit this PR before but due to hardware issues, I was unable to access my local repo as I did not have access to my computer. I have a working prototype of multikey sorting/grouping/aggregating on my forked branch https://github.com/PhysicsACE/ray/tree/multikey and if it looks good, I would be happy to merge the changes into this PR for the requested feature.
>
> Sorry, I am confused. How is that branch different from this PR? Do you mean that branch not only implements sort, but also groupby and agg? I think it'd be better to submit different PRs if possible.

It implements both sort and groupby. It is different than this branch in terms of implementation as it has a custom searchsorted function to support directions for each column that can be integrated with this PR. Once sort is supported, I can create a separate PR for groupby as it requires the same sort_and_partition function.

@raulchen
Contributor

> @raulchen I was going to submit this PR before but due to hardware issues, I was unable to access my local repo as I did not have access to my computer. I have a working prototype of multikey sorting/grouping/aggregating on my forked branch https://github.com/PhysicsACE/ray/tree/multikey and if it looks good, I would be happy to merge the changes into this PR for the requested feature.
>
> Sorry, I am confused. How is that branch different from this PR? Do you mean that branch not only implements sort, but also groupby and agg? I think it'd be better to submit different PRs if possible.
>
> It implements both sort and groupby. It is different than this branch in terms of implementation as it has a custom searchsorted function to support directions for each column that can be integrated with this PR. Once sort is supported, I can create a separate PR for groupby as it requires the same sort_and_partition function.

I see. Thanks for the clarification. Maybe let's wait until this PR is merged, and then you can submit a separate one?

LeonLuttenberger and others added 4 commits July 28, 2023 15:18
Signed-off-by: Anton Kukushkin <kukushkin.anton@gmail.com>
Signed-off-by: Anton Kukushkin <kukushkin.anton@gmail.com>
descending: Whether to sort in descending order.
key: The column or a list of columns to sort by.
descending: Whether to sort in descending order. Must be a boolean or a list
of booleans matching the number of the colunns.
Contributor


typo columns

Contributor

@kukushking kukushking Aug 8, 2023


Thank you! Fixed

if len(descending) != len(key):
raise ValueError(
f"Descending must be a boolean or a list of booleans, "
f"but got {descending}."
Contributor


This error message is a bit misleading; just say the lengths don't match?

Contributor


Agreed, thanks!


Signed-off-by: Anton Kukushkin <kukushkin.anton@gmail.com>
@raulchen raulchen merged commit bbc2df7 into ray-project:master Aug 10, 2023
56 of 60 checks passed
@LeonLuttenberger LeonLuttenberger deleted the feat/ray-data-sort-multi-keys branch August 10, 2023 18:18
shrekris-anyscale pushed a commit to shrekris-anyscale/ray that referenced this pull request Aug 10, 2023
NripeshN pushed a commit to NripeshN/ray that referenced this pull request Aug 15, 2023
harborn pushed a commit to harborn/ray that referenced this pull request Aug 17, 2023
harborn pushed a commit to harborn/ray that referenced this pull request Aug 17, 2023
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023