[data] Update streaming_repartition and map_batches_fusion #59476
alexeykudinkin merged 22 commits into ray-project:master
Conversation
The GIL makes checking `self._serialize_cache is not None` atomic, so we don't need a lock. Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Signed-off-by: xgui <xgui@anyscale.com>
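A minimal sketch of the lock-free caching pattern that commit relies on. The class and the `_compute_serialized` helper are illustrative names, not the actual Ray code; only the `_serialize_cache` attribute comes from the message above.

```python
class _Example:
    """Illustrative only: lazy, lock-free caching that is safe under CPython's GIL."""

    def __init__(self):
        self._serialize_cache = None

    def _compute_serialized(self) -> bytes:
        # Stand-in for an expensive serialization step.
        return b"expensive-serialization-result"

    def serialized(self) -> bytes:
        # Reading and assigning a single attribute are each atomic under the
        # GIL, so no lock is needed. In the worst case two threads both see
        # None and both compute the value, but they write identical results,
        # so the only cost is duplicated work, never a corrupted cache.
        if self._serialize_cache is None:
            self._serialize_cache = self._compute_serialized()
        return self._serialize_cache
```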
Code Review
This pull request updates the operator fusion logic for StreamingRepartition and MapBatches. The main change is to allow fusion of MapBatches -> StreamingRepartition when the batch_size of MapBatches is a multiple of target_num_rows_per_block of StreamingRepartition. The reverse fusion (StreamingRepartition -> MapBatches) is disabled. The changes look good and are well-tested. I've added one comment to improve code readability in a test file.
Bug: Fused operator uses wrong target size for output blocks
The StreamingRepartitionRefBundler is created with batch_size (from the upstream MapBatches operator) instead of target_num_rows_per_block (from the downstream StreamingRepartition operator). Previously this was correct because the fusion condition required batch_size == target_num_rows_per_block. After changing the condition to batch_size % target_num_rows_per_block == 0, fusion now occurs when batch_size is larger (e.g., batch_size=100, target_num_rows=20). The fused operator will produce batch_size-sized output blocks instead of target_num_rows_per_block-sized blocks, violating the expected repartition behavior.
python/ray/data/_internal/logical/rules/operator_fusion.py, lines 308 to 309 in 44380fa
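To make the size mismatch concrete, here is a small, self-contained illustration (not Ray code; the helper name is made up) of the block sizes a streaming repartition should emit versus what a bundler sized by `batch_size` would emit:

```python
def output_block_sizes(total_rows: int, rows_per_block: int) -> list[int]:
    """Row counts of the blocks produced when bundling `total_rows` rows
    into blocks of `rows_per_block` rows each."""
    full, remainder = divmod(total_rows, rows_per_block)
    return [rows_per_block] * full + ([remainder] if remainder else [])

# With batch_size=100 and target_num_rows_per_block=20 over 200 input rows:
# sizing the fused bundler by batch_size yields 2 blocks of 100 rows,
# while the repartition contract expects 10 blocks of 20 rows.
assert output_block_sizes(200, 100) == [100, 100]
assert output_block_sizes(200, 20) == [20] * 10
```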
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Co-authored-by: Alexey Kudinkin <alexey.kudinkin@gmail.com> Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: Alexey Kudinkin <alexey.kudinkin@gmail.com>
Analysis of the two operator patterns:
## Streaming_repartition → map_batches

| | Number of `map_batches` tasks |
|---------------|----------------------------------------------------------------------------------|
| **Fused**     | `num_input_blocks` (which is ≤ number of output blocks of StreamingRepartition) |
| **Not fused** | number of output blocks of StreamingRepartition |

When fused, the number of tasks equals the number of input blocks, which is ≤ the number of output blocks of StreamingRepartition. If StreamingRepartition is supposed to break down blocks to increase parallelism, that won't happen when fused. So we don't fuse.
## Map_batches → streaming_repartition

`batch_size % target_num_rows == 0`

| | Number of `map_batches` tasks |
|---------------|-------------------------------|
| **Fused**     | == total_rows / batch_size |
| **Not fused** | == total_rows / batch_size |

So, the fusion doesn't affect the parallelism.
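A short illustrative sketch of the rule above (helper names are made up for this example, not taken from the Ray codebase):

```python
def can_fuse_map_batches_into_streaming_repartition(
    batch_size: int, target_num_rows_per_block: int
) -> bool:
    # Fusion condition from this PR: the upstream batch size must be an
    # exact multiple of the downstream repartition target.
    return batch_size % target_num_rows_per_block == 0


def num_map_batches_tasks(total_rows: int, batch_size: int) -> int:
    # One task per batch, so the count is the same whether or not the
    # StreamingRepartition is fused in.
    return -(-total_rows // batch_size)  # ceiling division


assert can_fuse_map_batches_into_streaming_repartition(100, 20)
assert not can_fuse_map_batches_into_streaming_repartition(100, 30)
assert num_map_batches_tasks(1_000, 100) == 10  # fused or not
```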
Thus, we currently disable the `Streaming_repartition → map_batches` fusion and enable the fusion when `batch_size % target_num_rows == 0` for `Map_batches → streaming_repartition`.
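For context, a hedged end-to-end example of a pipeline matching the newly fusible pattern, assuming `Dataset.repartition(target_num_rows_per_block=...)` is the user-facing entry point that produces the StreamingRepartition operator, as in recent Ray Data releases:

```python
import ray

ds = ray.data.range(1_000)

# batch_size=100 is a multiple of target_num_rows_per_block=20, so with the
# rule in this PR the MapBatches -> StreamingRepartition pair is eligible for
# fusion, while parallelism (1000 / 100 = 10 map_batches tasks) is unchanged.
ds = ds.map_batches(lambda batch: batch, batch_size=100)
ds = ds.repartition(target_num_rows_per_block=20)

ds.materialize()
```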