[data] Gate restore_original_order in iter_batches consumer pipeline behind preserve_order #63792

Merged
justinvyu merged 8 commits into ray-project:master from justinvyu:justinvyu/gate-restore-original-order
Jun 4, 2026

Conversation

@justinvyu justinvyu commented Jun 2, 2026

Description

Part of the iter_batches consumer pipeline cleanup (#63660, #63682). Gates restore_original_order behind DataContext.execution_options.preserve_order (default off). When one format/collate worker lags, the reorder buffer fills with the other workers' completed batches, and those ready batches cannot be yielded until the lagging batch arrives; this PR skips the reorder step when ordering isn't required. This recovers steady-state next-batch latency in the throughput variant from the 113 ms that PR1+2 regressed to back down to 23 ms (lower than master's 32 ms), with no other regressions.
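As a rough illustration of the gate described above, here is a minimal, self-contained sketch. It is not the code from this PR: `consume`, the `Batch` tuple, and this toy `restore_original_order` are hypothetical stand-ins that only model the behavior of reordering by batch_idx when `preserve_order` is set.

```python
from typing import Dict, Iterable, Iterator, Tuple

Batch = Tuple[int, str]  # (batch_idx, payload); hypothetical stand-in for a real batch


def restore_original_order(batches: Iterable[Batch]) -> Iterator[Batch]:
    """Toy reorder step: yield batches in batch_idx order, buffering early arrivals."""
    buffer: Dict[int, Batch] = {}
    next_idx = 0
    for batch in batches:
        buffer[batch[0]] = batch
        # Flush every batch that is now contiguous with what was already yielded.
        while next_idx in buffer:
            yield buffer.pop(next_idx)
            next_idx += 1


def consume(batches: Iterable[Batch], preserve_order: bool) -> Iterator[Batch]:
    """Hypothetical gate: only run the reorder step when ordering was requested."""
    if preserve_order:
        return restore_original_order(batches)
    return iter(batches)


# Completion order when the worker handling batch 0 is the straggler.
arrivals = [(1, "b1"), (2, "b2"), (0, "b0"), (3, "b3")]
ordered = [idx for idx, _ in consume(arrivals, preserve_order=True)]
unordered = [idx for idx, _ in consume(arrivals, preserve_order=False)]
```

With the gate off, batches 1 and 2 reach the consumer as soon as they finish instead of waiting behind batch 0.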

Why does restore_original_order reduce throughput?

Here's the chain that leads to lower average throughput:

  1. Straggler in one format/collate worker → other workers' batches dam up in the reorder buffer.
  2. Consumer's next(loader) blocks waiting for the buffer to flush → long next_batch latency, consumer idle.
  3. When the straggler finally finishes, the buffer releases the whole wave in batch_idx order → consumer pulls the straggler's batch first.
  4. That batch's H2D was just kicked off (finalize ran right before it entered the buffer) — so forward() queues kernels that depend on a tensor whose copy is still in flight → CUDA stream stalls on the H2D.
  5. The training step time is inflated for these straggler batches, dragging up the average.
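The first three steps of the chain can be sketched as a small timing model. This is an assumption-laden toy, not a measurement: `done_at`, `yield_times`, and `peak_buffered` are hypothetical names, and the completion times are made up purely to show head-of-line blocking.

```python
from typing import Dict, List, Tuple


def yield_times(done_at: List[float], preserve_order: bool) -> List[Tuple[int, float]]:
    """Return (batch_idx, time the batch becomes available to the consumer).

    done_at[i] is when a worker finishes batch i. With ordering enforced, a batch
    cannot be released before every lower-indexed batch has finished.
    """
    if preserve_order:
        released, latest = [], 0.0
        for idx, t in enumerate(done_at):
            latest = max(latest, t)  # wait for the slowest predecessor
            released.append((idx, latest))
        return released
    # Without ordering, batches are released as soon as they finish.
    return sorted(enumerate(done_at), key=lambda pair: pair[1])


def peak_buffered(done_at: List[float]) -> int:
    """Largest number of finished batches parked in the reorder buffer."""
    buffer: Dict[int, float] = {}
    next_idx, peak = 0, 0
    for idx, t in sorted(enumerate(done_at), key=lambda pair: pair[1]):
        buffer[idx] = t
        while next_idx in buffer:  # flush the contiguous prefix
            buffer.pop(next_idx)
            next_idx += 1
        peak = max(peak, len(buffer))
    return peak


# Batch 0 is the straggler: it finishes at t=5 while batches 1..3 finish earlier.
done_at = [5.0, 1.0, 2.0, 3.0, 6.0]
with_order = yield_times(done_at, preserve_order=True)
without_order = yield_times(done_at, preserve_order=False)
parked = peak_buffered(done_at)
```

In this toy run the ordered consumer sees nothing until t=5 and then receives four batches at once, while three finished batches sit in the buffer in the meantime.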

Profiling the training worker

Before: loss.item() takes up ~25% of the profiling duration, because loss.item() is the CUDA/CPU sync point. The slow straggler batches show up as outliers in loss.item() timing.

Screenshot 2026-06-02 at 12 33 10 AM

After: loss.item() takes <2% of the profiling duration.

Screenshot 2026-06-02 at 12 32 41 AM

Release test results

| Metric | Master | PR1+2 | PR1+2+3 | Δ vs master |
|---|---|---|---|---|
| **peak_object_store_memory variant (sleep=2s, exposes consumer-side buffer fill)** | | | | |
| peak obj store (GiB) | 136.19 ± 0.32 | 79.89 ± 0.27 | 79.84 ± 0.22 | −41% |
| throughput steady (rows/s) | 460.5 ± 4.0 | 467.8 ± 5.9 | 473.3 ± 6.4 | +3% |
| next-batch steady (ms) | 1.67 ± 0.22 | 1.25 ± 0.17 | 1.44 ± 0.10 | −14% |
| step avg (ms) | 2213.9 ± 22.3 | 2179.8 ± 25.4 | 2158.4 ± 29.2 | −2.5% |
| **throughput variant (no sleep, pipeline is the rate-limiter)** | | | | |
| peak obj store (GiB) | 115.27 ± 12.44 | 74.06 ± 1.32 | 64.03 ± 2.09 | −44% |
| throughput steady (rows/s) | 1017.3 ± 75.4 | 1222.4 ± 24.5 | 1232.5 ± 19.5 | +21% |
| next-batch steady (ms) | 32.54 ± 28.99 | 113.17 ± 26.13 | 23.39 ± 11.06 | −28% |
| step avg (ms) | 923.6 ± 61.1 | 711.3 ± 11.5 | 768.2 ± 14.8 | −17% |

PR3 closes the temporary next-batch latency regression that PR1+2 alone introduced (32 → 113 ms in the throughput variant), bringing it down to 23 ms — lower than master's 32 ms — while preserving PR1+2's −44% peak object store memory and +21% throughput wins.

Follow-up work

Extra GPU memory cost:

  • The reorder buffer holds post-finalize GPU tensors while it waits for the next batch_idx to come in, and it can accumulate up to N items (one per format/collate worker that ran ahead) on top of the outer make_async_gen output queue of size prefetch_batches. (#63660, "[data] Fix iter_batches spilling (1/n): Remove outer make_async_gen to reduce untracked buffered batches + reduce prefetch onto GPU", is removing that output queue.)
  • This means the reorder buffer ties up GPU memory on top of the prefetch_batches × batch_bytes outer queue, the in-flight finalize, and the current step's forward activations, gradients, etc. Skipping the reorder buffer when preserve_order=False reclaims this GPU memory entirely, because batches flow through immediately rather than waiting on the slowest worker.
  • We should update the iter_batches pipeline to run finalize after restore_original_order instead, which removes this unexpected buffer from GPU memory.

This PR mitigates the issue for the default case because the restore_original_order buffer isn't used.
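To make the proposed follow-up concrete, the sketch below counts how many already-finalized (on-device) batches a reorder buffer would hold at its fullest, depending on whether finalize runs before or after reordering. It is a toy model under stated assumptions: `peak_device_batches_in_buffer` is a hypothetical helper, and a boolean stands in for "this batch's host-to-device copy has already run".

```python
from typing import Dict, List


def peak_device_batches_in_buffer(arrivals: List[int], finalize_first: bool) -> int:
    """Peak number of on-device batches parked in the reorder buffer.

    arrivals is the order in which batch indices reach the reorder step.
    finalize_first=True models the current stage order (finalize, then reorder);
    finalize_first=False models the proposed order (reorder, then finalize).
    """
    buffer: Dict[int, bool] = {}
    next_idx, peak = 0, 0
    for idx in arrivals:
        # True marks a batch that was finalized before it entered the buffer.
        buffer[idx] = finalize_first
        while next_idx in buffer:
            # With the proposed order, finalize would run here, after release.
            buffer.pop(next_idx)
            next_idx += 1
        peak = max(peak, sum(buffer.values()))
    return peak


# Batch 0 straggles, so batches 1..3 wait in the buffer until it arrives.
arrivals = [1, 2, 3, 0, 4]
current_order_peak = peak_device_batches_in_buffer(arrivals, finalize_first=True)
proposed_order_peak = peak_device_batches_in_buffer(arrivals, finalize_first=False)
```

In this model the buffer still fills when ordering is required, but with finalize moved after the reorder step the parked batches are host-side rather than GPU-resident.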

…ns.preserve_order

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu requested a review from a team as a code owner June 2, 2026 07:35

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request makes the restoration of the original batch order optional, executing it only when execution_options.preserve_order is enabled. This prevents head-of-line blocking and improves throughput for consumers that do not require order preservation. A corresponding unit test was added to verify this behavior. The reviewer noted that dynamically calling DataContext.get_current() inside the pipeline could lead to issues if the context changes or is evaluated asynchronously, and suggested capturing the preserve_order flag during initialization instead.
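The reviewer's concern can be illustrated with a small sketch that contrasts reading a flag on every use with snapshotting it at construction. Everything here is hypothetical: `FakeContext` and `get_current` are stand-ins written for this example and are not Ray's DataContext.

```python
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for a mutable, process-wide context object."""

    preserve_order: bool = False


_CURRENT = FakeContext()


def get_current() -> FakeContext:
    """Return the shared context, mimicking a get_current()-style accessor."""
    return _CURRENT


class DynamicLookup:
    """Reads the flag each time it is needed, so later mutations leak in."""

    def should_reorder(self) -> bool:
        return get_current().preserve_order


class CapturedAtInit:
    """Snapshots the flag once, so behavior is fixed when the pipeline is built."""

    def __init__(self) -> None:
        self._preserve_order = get_current().preserve_order

    def should_reorder(self) -> bool:
        return self._preserve_order


dynamic, captured = DynamicLookup(), CapturedAtInit()
get_current().preserve_order = True  # context changes after construction
```

After the mutation, `dynamic.should_reorder()` follows the new value while `captured.should_reorder()` keeps the value seen at construction, which is the more predictable behavior for a pipeline that may be evaluated lazily.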

Comment thread python/ray/data/_internal/block_batching/iter_batches.py Outdated
Comment thread python/ray/data/_internal/block_batching/iter_batches.py
@ray-gardener ray-gardener Bot added the performance and data (Ray Data-related issues) labels Jun 2, 2026
@bveeramani bveeramani self-assigned this Jun 3, 2026
justinvyu and others added 2 commits June 3, 2026 15:54
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 7cfe17f.

Comment thread python/ray/data/tests/block_batching/test_iter_batches.py
justinvyu and others added 4 commits June 3, 2026 16:14
…rve_order

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…erator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
@justinvyu justinvyu added the go (add ONLY when ready to merge, run all tests) label Jun 4, 2026
@justinvyu justinvyu merged commit dcf1c41 into ray-project:master Jun 4, 2026
8 checks passed
@justinvyu justinvyu deleted the justinvyu/gate-restore-original-order branch June 4, 2026 17:16