feat: Add partition awareness to GroupBy to skip shuffle operations#60869
Annmool wants to merge 7 commits into ray-project:master
Conversation
Signed-off-by: Annmool <aydv.267@gmail.com>
Code Review
This pull request introduces a valuable optimization for GroupBy operations by adding partition awareness to skip unnecessary shuffles. The implementation, which includes new utility functions for partition detection and modifications to GroupedData, is well-structured. The addition of unit tests is also a positive step.
I have identified a critical issue in how block metadata is collected, which would likely lead to a runtime error. Additionally, I've suggested an improvement to the error handling to enhance observability when the partition awareness check fails. Overall, this is a strong contribution that should significantly improve performance for workloads with pre-partitioned data.
python/ray/data/grouped_data.py
Outdated
for ref, metadata in self._dataset.iter_internal_ref_bundles():
    if hasattr(metadata, 'blocks') and metadata.blocks:
        for block_ref, block_metadata in metadata.blocks:
            blocks_metadata.append(block_metadata)
The iteration over self._dataset.iter_internal_ref_bundles() appears to be incorrect. This method returns an iterator of RefBundle objects, not tuples. Each RefBundle can contain metadata for multiple blocks.
You should iterate over the RefBundle objects and then access their metadata property to get the list of BlockMetadata for all blocks within that bundle.
for bundle in self._dataset.iter_internal_ref_bundles():
    blocks_metadata.extend(bundle.metadata)

except Exception as e:
    # If anything goes wrong, fall back to regular shuffle
    return False, f"Partition awareness check failed: {str(e)}"
The broad exception handler correctly falls back to the shuffle-based approach, which is a safe default. However, it can hide underlying issues in the partition awareness check, since it only returns the error message as a string.
To improve observability and make debugging easier, I recommend logging the exception as a warning. This will ensure that failures in this optimization path are more visible in the logs.
except Exception as e:
    # If anything goes wrong, fall back to regular shuffle
    logger = logging.getLogger(__name__)
    logger.warning(
        "Partition awareness check failed, falling back to shuffle: %s",
        e,
        exc_info=True,
    )
    return False, f"Partition awareness check failed: {str(e)}"

…ion awareness check
- Fix iteration over iter_internal_ref_bundles() to properly access RefBundle objects
- Each RefBundle contains metadata property with blocks information
- Add logging for exception handling to improve observability
- Include stack trace in warning logs for debugging
Signed-off-by: Annmool <aydv.267@gmail.com>
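The metadata-collection fix described in this commit can be illustrated with a self-contained sketch. `RefBundle` and `BlockMetadata` below are minimal stand-ins for the Ray-internal types (hypothetical, for illustration only); the point is that each bundle carries a list of per-block metadata, which should be flattened with `extend` rather than unpacked as tuples:

```python
from dataclasses import dataclass, field
from typing import List

# Stand-in for Ray's internal BlockMetadata (illustrative only).
@dataclass
class BlockMetadata:
    num_rows: int

# Stand-in for Ray's internal RefBundle: one bundle may cover many blocks.
@dataclass
class RefBundle:
    metadata: List[BlockMetadata] = field(default_factory=list)

def collect_blocks_metadata(bundles):
    """Flatten per-bundle block metadata, as the review suggests."""
    blocks_metadata = []
    for bundle in bundles:
        # Each RefBundle exposes a list of BlockMetadata, one per block.
        blocks_metadata.extend(bundle.metadata)
    return blocks_metadata
```

With two bundles covering three blocks in total, `collect_blocks_metadata` returns a flat list of three `BlockMetadata` entries.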
… partition info, update tests, add logging Signed-off-by: Annmool <aydv.267@gmail.com>
…rsing, logging, and tests Signed-off-by: Annmool <aydv.267@gmail.com>
…uristic; update tests; tighten partition parsing Signed-off-by: Annmool <aydv.267@gmail.com>
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
This pull request has been automatically closed because there has been no further activity in the last 14 days. Please feel free to reopen or open a new pull request if you'd still like this to be addressed. Again, you can always ask for help on our discussion forum or Ray's public Slack channel. Thanks again for your contribution!
Description
This PR adds partition-awareness optimization to Ray Data’s GroupBy operations.
Right now, groupby() always triggers a HashShuffleOperation, even when the dataset is already partitioned by the same column (for example, Hive-style partitioned Parquet data). In those cases, the shuffle is unnecessary and causes avoidable network transfer and memory overhead.

With this change, we first check whether the dataset is already partitioned by the groupby key using block metadata and input file paths. If it is, we skip the shuffle entirely and perform the groupby directly on the existing blocks. If not, the behavior remains unchanged and we fall back to the regular shuffle-based execution.
This optimization is fully backward compatible and does not introduce any API changes.
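As a rough illustration of the path-based check, a Hive-style partition parser could look like the following. This is a hypothetical sketch of the idea behind the PR's `extract_partition_values_from_paths()`, not its actual implementation:

```python
import re

def extract_partition_values_from_paths(paths):
    """Parse Hive-style ``key=value`` path segments into per-file dicts.

    Illustrative sketch: the real helper in partition_aware.py may handle
    more cases (encoding, type coercion, malformed segments).
    """
    results = []
    for path in paths:
        values = {}
        for segment in path.split("/"):
            # A Hive-style partition segment looks like "year=2024".
            m = re.fullmatch(r"([^=/]+)=([^=/]+)", segment)
            if m:
                values[m.group(1)] = m.group(2)
        results.append(values)
    return results
```

For a path like `s3://bucket/data/year=2024/month=01/part-0.parquet`, this yields `{"year": "2024", "month": "01"}`; if every input file of a block agrees on the groupby key's partition value, the shuffle for that key can be skipped.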
Changes
1. partition_aware.py (NEW)
   - extract_partition_values_from_paths(): parses Hive-style file paths and extracts partition column values.
   - is_partition_aware_groupby_possible(): validates whether the dataset blocks are already partitioned in a way that allows us to safely skip the shuffle phase.
2. grouped_data.py (MODIFIED)
   - Adds _check_partition_awareness() to the GroupedData class.
   - Updates map_groups() to evaluate partition awareness before triggering a shuffle.
3. test_partition_aware_groupby.py (NEW)

Impact