[Data][WIP] Implement Join on hash shuffle v2 #64538
Generalize ShuffleReduceOp / _shuffle_reduce_task to N co-partitioned inputs (pairing per (partition, input), all-inputs-done flush, tables_by_input). This is now based on master after ray-project#64481, so the PR carries only multi-input support and not the streaming-reduce removal.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Code Review
This pull request updates the shuffle execution framework to support multi-input shuffle reduce operations, enabling a new V2 shuffle-based join implementation. Key changes include modifying ShuffleReduceOp to accept multiple upstream ShuffleMapOp inputs, updating the reduce task to process lists of tables grouped by input, and planning joins using the new shuffle path. The review feedback highlights a critical issue where incremental_resource_usage underestimates peak memory for multi-input shuffles by only considering the first dependency, which could lead to out-of-memory errors. Additionally, defensive checks are recommended in hash_shuffle_v2.py to prevent potential IndexErrors when accessing tables_by_input.
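To make the "tables grouped by input" shape concrete, here is a minimal sketch of how shards might be paired per (partition, input) and released only once every input has finished. The class `MultiInputReduceBuffer` and all of its methods are hypothetical names for illustration, and strings stand in for `pa.Table` shards; this is not the PR's actual implementation.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


class MultiInputReduceBuffer:
    """Toy buffer (hypothetical): groups shards by (partition, input)."""

    def __init__(self, num_inputs: int) -> None:
        if num_inputs < 1:
            raise ValueError("num_inputs must be >= 1")
        self._num_inputs = num_inputs
        # Key is (partition_id, input_index); value is the shards seen so far.
        self._shards: Dict[Tuple[int, int], List[str]] = defaultdict(list)
        self._done_inputs: Set[int] = set()

    def _check_input(self, input_index: int) -> None:
        if not 0 <= input_index < self._num_inputs:
            raise IndexError(f"input_index {input_index} out of range")

    def add_shard(self, partition_id: int, input_index: int, shard: str) -> None:
        self._check_input(input_index)
        self._shards[(partition_id, input_index)].append(shard)

    def mark_input_done(self, input_index: int) -> None:
        self._check_input(input_index)
        self._done_inputs.add(input_index)

    def all_inputs_done(self) -> bool:
        return len(self._done_inputs) == self._num_inputs

    def flush(self, partition_id: int) -> List[List[str]]:
        """Return tables_by_input for one partition, one list per input."""
        if not self.all_inputs_done():
            raise RuntimeError("cannot flush before every input is done")
        # An input that produced nothing for this partition yields an empty list,
        # so the result always has exactly num_inputs entries.
        return [
            self._shards.pop((partition_id, i), [])
            for i in range(self._num_inputs)
        ]
```

In this sketch the outer list returned by `flush` always has one entry per input, which is the property a join reducer would rely on when it indexes the left and right sides.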
    return ExecutionResources.from_resource_dict(
        self._reduce_task_remote_args(memory)
    )
In multi-input shuffles (such as joins), the incremental_resource_usage method underestimates the peak memory requirement because it only calculates the average partition size from the first upstream dependency (self.input_dependencies[0]). This ignores the memory requirements of the other inputs (e.g., the right side of the join), which can lead to severe underestimation of the peak memory required by the reduce task and potentially cause node out-of-memory (OOM) errors due to over-scheduling.

To fix this, we should aggregate the partition bytes from all upstream dependencies. For example:

```python
    def incremental_resource_usage(self) -> ExecutionResources:
        """Per-task resource ask for the framework's budget allocator."""
        from collections import defaultdict
        combined_partition_bytes = defaultdict(int)
        for upstream in self.input_dependencies:
            assert isinstance(upstream, ShuffleMapOp)
            for part_id, bytes_val in upstream.get_partition_bytes().items():
                combined_partition_bytes[part_id] += bytes_val

        memory = 0
        sizes = [b for b in combined_partition_bytes.values() if b > 0]
        if sizes:
            avg_bytes = sum(sizes) / len(sizes)
            memory = int(avg_bytes * SHUFFLE_PEAK_MEMORY_MULTIPLIER)
        return ExecutionResources.from_resource_dict(
            self._reduce_task_remote_args(memory)
        )
```
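A small worked example may help show the size of the gap. The sketch below uses made-up partition sizes and an assumed multiplier of 2.0 (`ASSUMED_PEAK_MEMORY_MULTIPLIER` is a stand-in; the real value of `SHUFFLE_PEAK_MEMORY_MULTIPLIER` is not stated in this thread). The helpers `avg_nonempty` and `combine` are hypothetical and exist only for this illustration.

```python
from collections import defaultdict
from typing import Dict, List

# Assumed value for illustration only; not taken from the Ray codebase.
ASSUMED_PEAK_MEMORY_MULTIPLIER = 2.0


def avg_nonempty(sizes: Dict[int, int]) -> float:
    """Average byte size over partitions that actually hold data."""
    nonempty = [b for b in sizes.values() if b > 0]
    return sum(nonempty) / len(nonempty) if nonempty else 0.0


def combine(per_input: List[Dict[int, int]]) -> Dict[int, int]:
    """Sum partition bytes across all inputs, keyed by partition id."""
    combined: Dict[int, int] = defaultdict(int)
    for sizes in per_input:
        for part_id, num_bytes in sizes.items():
            combined[part_id] += num_bytes
    return dict(combined)


# Made-up sizes: a small left side joined against a much larger right side.
left = {0: 100, 1: 300}
right = {0: 900, 1: 700}

# Estimate using only the first input, as the review describes.
first_only = int(avg_nonempty(left) * ASSUMED_PEAK_MEMORY_MULTIPLIER)

# Estimate using the per-partition sum over every input.
all_inputs = int(
    avg_nonempty(combine([left, right])) * ASSUMED_PEAK_MEMORY_MULTIPLIER
)
```

With these numbers the first-input estimate is 400 bytes per task while the combined estimate is 2000 bytes, so a scheduler trusting the smaller figure could admit roughly five times as many concurrent reduce tasks as memory allows.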
    Used under the partition = block contract: called once per partition with
    the full shard list so the output is exactly one block.
    """
    tables = tables_by_input[0]
    def _reduce(
        partition_id: int, tables_by_input: List[List[pa.Table]]
    ) -> Iterable[pa.Table]:
        tables = tables_by_input[0]
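The defensive check recommended in the review could look roughly like the sketch below. The helper name `single_input_tables` is hypothetical, and rejecting more than one input (rather than silently ignoring the extras) is an assumption about the intended contract of single-input reducers, not something the PR confirms.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def single_input_tables(
    tables_by_input: Sequence[List[T]], partition_id: int
) -> List[T]:
    """Return the only input's shard list, failing loudly otherwise."""
    # An empty outer list would make tables_by_input[0] raise a bare
    # IndexError; raise a descriptive error instead.
    if len(tables_by_input) != 1:
        raise ValueError(
            f"partition {partition_id}: expected exactly 1 input, "
            f"got {len(tables_by_input)}"
        )
    return tables_by_input[0]
```

A single-input reducer could then call `tables = single_input_tables(tables_by_input, partition_id)` in place of the direct index, so a mis-wired multi-input plan surfaces as a clear error message.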
Description
Blocked by #64532 and #64438; should be rebased after they are merged. This PR is self-contained right now.