[Datasets] Add Pandas-native groupby and sorting. #26313

clarkzinzow
Contributor

This PR adds a Pandas-native implementation of groupby and sorting for Pandas blocks. Before this PR, we converted Pandas blocks to Arrow, did the groupbys, aggregations, and sorting in Arrow, and then converted the results back to Pandas. This round-trip conversion happened on both the map side and the reduce side, which was very inefficient for Pandas blocks (many extra table copies). With Pandas-native groupby and sorting, we should see lower memory utilization and faster performance when using the AIR preprocessors.
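
To illustrate the idea, here is a minimal sketch of sorting and grouping a DataFrame without leaving Pandas. It uses the stock `sort_values` and `groupby` calls rather than the block accessor code in this PR, and the helper names `native_sort` and `native_groupby_sum` are hypothetical.

```python
import pandas as pd


def native_sort(df: pd.DataFrame, key: str, descending: bool = False) -> pd.DataFrame:
    # Sort in Pandas directly; no conversion to an Arrow table and back.
    return df.sort_values(by=key, ascending=not descending).reset_index(drop=True)


def native_groupby_sum(df: pd.DataFrame, key: str, on: str) -> pd.DataFrame:
    # Group and aggregate in Pandas directly; groups come back sorted by key.
    return df.groupby(key, sort=True)[on].sum().reset_index()


df = pd.DataFrame({"key": ["b", "a", "b", "a"], "value": [4, 1, 2, 3]})
sorted_df = native_sort(df, "value")
summed = native_groupby_sum(df, "key", "value")
```

The point is only that both operations stay on the DataFrame, so no intermediate Arrow tables are materialized.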

Related issue number

Closes #21296

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@clarkzinzow clarkzinzow marked this pull request as ready for review July 5, 2022 23:41
@clarkzinzow clarkzinzow changed the title from "[Datasets[ Add Pandas-native groupby and sorting." to "[Datasets] Add Pandas-native groupby and sorting." Jul 5, 2022
@clarkzinzow clarkzinzow force-pushed the datasets/feat/pandas-groupby-sort-impl branch from 0e30293 to f7d7d93 Compare July 6, 2022 15:26
@jjyao jjyao self-assigned this Jul 6, 2022
bounds = table[col].searchsorted(boundaries)
last_idx = 0
for idx in bounds:
    # Slices need to be copied to avoid including the base table
Collaborator

Is this comment relevant for pandas table as well?

Contributor Author

It's not, let me remove that!
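
For context, the partitioning step in the snippet above can be sketched roughly as follows. This is an illustration under assumptions, not the code in this PR: the function name `sort_and_partition` and its signature are hypothetical, and only ascending order is handled.

```python
import pandas as pd


def sort_and_partition(table: pd.DataFrame, col: str, boundaries: list) -> list:
    # Sort once, then locate each boundary in the sorted column.
    table = table.sort_values(by=col).reset_index(drop=True)
    bounds = table[col].searchsorted(boundaries)
    partitions = []
    last_idx = 0
    for idx in bounds:
        # Rows in [last_idx, idx) are strictly below the current boundary.
        partitions.append(table.iloc[last_idx:idx])
        last_idx = idx
    # Everything at or above the last boundary goes into the final partition.
    partitions.append(table.iloc[last_idx:])
    return partitions


parts = sort_and_partition(pd.DataFrame({"x": [5, 1, 3, 2, 4]}), "x", [2, 4])
```

With `n` boundaries this yields `n + 1` partitions, some of which may be empty.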

"""
if key is not None and not isinstance(key, str):
    raise ValueError(
        "key must be a string or None when aggregating on Arrow blocks, but "
Collaborator

no longer Arrow :)

@clarkzinzow clarkzinzow force-pushed the datasets/feat/pandas-groupby-sort-impl branch from f7d7d93 to b116873 Compare July 19, 2022 20:18
Contributor

@c21 c21 left a comment

LGTM. I have some minor comments/questions. I'm curious how much performance improvement we saw during testing.

)

if self._table.shape[0] == 0:
    # If the pyarrow table is empty we may not have schema
Contributor

pyarrow table -> pandas DataFrame?


 def combine(self, key: KeyFn, aggs: Tuple[AggregateFn]) -> "pandas.DataFrame":
-    # TODO (kfstorm): A workaround to pass tests. Not efficient.
-    return BlockAccessor.for_block(self.to_arrow()).combine(key, aggs).to_pandas()
+    """Combine rows with the same key into an accumulator.
Contributor

This looks very similar to ArrowBlockAccessor.combine, except that the builder is different. Shall we refactor them into a single method?

This is a non-blocking comment.
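
For readers unfamiliar with the pattern, here is a rough sketch of what "combine rows with the same key into an accumulator" means on a block that is already sorted by key. It is not the code in this PR: `combine_sorted`, `init`, and `accumulate` are hypothetical names standing in for the AggregateFn machinery, and only a single value column is handled.

```python
import pandas as pd


def combine_sorted(df, key, value, init, accumulate):
    """Fold each run of equal keys into one accumulator row.

    Assumes df is already sorted by `key`.
    """
    keys = df[key].to_numpy()
    values = df[value].to_numpy()
    rows = []
    start = 0
    n = len(df)
    while start < n:
        # Find the end of the run of rows sharing keys[start].
        end = start
        while end < n and keys[end] == keys[start]:
            end += 1
        acc = init(keys[start])
        for v in values[start:end]:
            acc = accumulate(acc, v)
        rows.append((keys[start], acc))
        start = end
    return pd.DataFrame(rows, columns=[key, "acc"])


block = pd.DataFrame({"k": ["a", "a", "b"], "v": [1, 2, 3]})
combined = combine_sorted(block, "k", "v", lambda k: 0, lambda acc, v: acc + v)
```

Because the loop only depends on "iterate rows, append to a builder", the same logic could in principle be shared between block types, which is what the refactoring suggestion above is getting at.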

stats = BlockExecStats.builder()
blocks = [b for b in blocks if b.shape[0] > 0]
if len(blocks) == 0:
    ret = PandasBlockAccessor._empty_table()
Contributor

why not just self._empty_table() as above?


 @staticmethod
 def merge_sorted_blocks(
-    blocks: List["pandas.DataFrame"], key: "SortKeyT", _descending: bool
+    blocks: List[Block[T]], key: "SortKeyT", _descending: bool
 ) -> Tuple["pandas.DataFrame", BlockMetadata]:
Contributor

What's blocking us from changing the return type to Tuple[Block[T], BlockMetadata], the same as ArrowBlockAccessor?
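
As a reference point for this thread, a Pandas-native merge of sorted blocks could look roughly like the sketch below. It is an assumption-laden illustration, not the method in this PR: it returns only the DataFrame (no BlockMetadata or stats), takes a single column name as the key, and returns a plain empty DataFrame when there is no input.

```python
from typing import List

import pandas as pd


def merge_sorted_blocks(blocks: List[pd.DataFrame], key: str, descending: bool) -> pd.DataFrame:
    # Drop empty blocks first; they may not carry a schema.
    blocks = [b for b in blocks if b.shape[0] > 0]
    if len(blocks) == 0:
        return pd.DataFrame()
    # Concatenate and re-sort; simple, though it does not exploit the
    # fact that each input block is already sorted.
    ret = pd.concat(blocks, ignore_index=True)
    return ret.sort_values(by=key, ascending=not descending).reset_index(drop=True)


merged = merge_sorted_blocks(
    [pd.DataFrame({"x": [1, 4]}), pd.DataFrame({"x": []}), pd.DataFrame({"x": [2, 3]})],
    "x",
    False,
)
```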

)
next_row = None
builder = PandasBlockBuilder()
while True:
Contributor

nit: this merge-aggregate loop is also mostly the same as Arrow's function. Non-blocking comment as well.
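
For illustration, a merge-aggregate loop of this kind can be sketched as below. This is not the code under review: `merge_aggregated` and its `merge` argument are hypothetical, each block is assumed to hold one partial accumulator per key and to be sorted by key, and `heapq.merge` stands in for whatever merge iterator the real implementation uses.

```python
import heapq

import pandas as pd


def merge_aggregated(blocks, key, acc_col, merge):
    # One sorted stream of (key, accumulator) pairs per block.
    streams = [zip(b[key].tolist(), b[acc_col].tolist()) for b in blocks]
    rows = []
    # heapq.merge lazily yields pairs from all streams in key order.
    for k, acc in heapq.merge(*streams, key=lambda kv: kv[0]):
        if rows and rows[-1][0] == k:
            # Same key as the previous row: fold the accumulators together.
            rows[-1] = (k, merge(rows[-1][1], acc))
        else:
            rows.append((k, acc))
    return pd.DataFrame(rows, columns=[key, acc_col])


left = pd.DataFrame({"k": ["a", "c"], "sum": [1, 2]})
right = pd.DataFrame({"k": ["a", "b"], "sum": [3, 4]})
result = merge_aggregated([left, right], "k", "sum", lambda x, y: x + y)
```

Nothing in the loop is specific to Pandas apart from how rows are read and how the output is built, which is why it overlaps so much with the Arrow version.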

@c21
Contributor

c21 commented Jul 21, 2022

Discussed offline, feel free to merge it as it is now, and I can add a followup PR to address the comments.

@clarkzinzow
Contributor Author

Synced with @c21 offline, agreed with all feedback but @c21 is going to take it on as a follow-up PR so we can get this in.

@clarkzinzow clarkzinzow merged commit da97efb into ray-project:master Jul 21, 2022
Rohan138 pushed a commit to Rohan138/ray that referenced this pull request Jul 28, 2022
Signed-off-by: Rohan138 <rapotdar@purdue.edu>
Stefan-1313 pushed a commit to Stefan-1313/ray_mod that referenced this pull request Aug 18, 2022
Signed-off-by: Stefan van der Kleij <s.vanderkleij@viroteq.com>
Successfully merging this pull request may close these issues.

[Datasets] [Pandas Block] Implement PandasBlockAccessor in pandas-native ways