[data] Fix iter_batches spilling (1/n): Remove outer make_async_gen to reduce untracked buffered batches + reduce prefetch onto GPU#63660
Conversation
…terator Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request replaces the asynchronous generator make_async_gen with a simpler background thread iterator iter_in_background using a bounded queue. A critical issue was identified in the new iter_in_background utility: if the consumer stops iterating early, the background producer thread can block indefinitely on queue.put(), causing a thread and resource leak. A code suggestion was provided to use a threading.Event and a finally block to safely terminate the producer thread and drain the queue.
…sumer exit Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…invyu/replace-outer-make-async-gen
…kers Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…invyu/replace-outer-make-async-gen
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 6639106. Configure here.
| worker_threads = [ | ||
| threading.Thread(target=_worker, name="iter_threaded", daemon=True) | ||
| for _ in range(num_workers) | ||
| ] |
There was a problem hiding this comment.
Multi-worker runs duplicate pipelines
Medium Severity
When iter_threaded uses num_workers greater than one, each worker runs a full fn(...) over the shared base_iterator instead of splitting work like make_async_gen. Two pipelines interleave ref-bundle reads and emit batches into one queue, corrupting batching and ordering.
Reviewed by Cursor Bugbot for commit 6639106. Configure here.
There was a problem hiding this comment.
I don't think this is an actual bug since we are only using num_workers = 1 for _iter_batches.
There was a problem hiding this comment.
I think the cursor comment isn't accurate, but I realized I have no unit tests. Will sanity check implementation and add unit tests.
rayhhome
left a comment
There was a problem hiding this comment.
A few comments but doesn't need to block; overall lgtm
| worker_threads = [ | ||
| threading.Thread(target=_worker, name="iter_threaded", daemon=True) | ||
| for _ in range(num_workers) | ||
| ] |
There was a problem hiding this comment.
I don't think this is an actual bug since we are only using num_workers = 1 for _iter_batches.
| _SENTINEL = object() | ||
|
|
||
|
|
||
| def iter_threaded( |
There was a problem hiding this comment.
I think this function should live in python/ray/data/_internal/util.py beside make_async_gen?
There was a problem hiding this comment.
I wanted to keep this utility in iter_batches so that it doesn't end up getting abused/co-opted for another purpose again and then forgetting the original intention. That's what happened with make_async_gen 😄
|
|
||
| try: | ||
| while True: | ||
| item = result_queue.get() |
There was a problem hiding this comment.
I think make_async_gen has a polling mechanism that has a timeout for get and also detects interrupt events. I think this can lead to hanging issues but in very unlikely situations.
There was a problem hiding this comment.
Interrupt should be handled by the finally: stopped.set(). Do you have a different deadlock/hang case in mind?
There was a problem hiding this comment.
I was thinking about the case where a node dies and get hangs, but the old code seems to also hang in that case, so I guess the current implementation is fine 😃
There was a problem hiding this comment.
I see -- this stuff is all just threads on a single node so if the node dies then there's no hanging, everything just dies.
There is one scenario where node death can cause a somewhat orthogonal issue: if block ref A gets sent over, it doesn't block the pipeline on this queue get, but it will block if ray.get(block_ref) fails in resolve_block_refs. But that's the same as the status quo and requires lineage reconstruction to unblock.
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
iter_batches spilling (1/n): Remove outer make_async_gen and its untracked input/output queuesiter_batches spilling (1/n): Remove outer make_async_gen to reduce untracked buffered batches + reduce prefetch onto GPU
…e `make_async_gen` with `iter_threaded` (#63682) Replaces the inner format/collate `make_async_gen` with `iter_threaded` from #63660, cutting untracked object store memory pinned batches from ~16 to ~8 (2× reduction). - `_format_in_threadpool` runs format + collate across a threadpool via `make_async_gen(num_workers=min(4, prefetch_batches), preserve_ordering=False)`. With the default `buffer_size=1`, this allocates one shared input queue of size `(buffer_size + 1) * num_workers` and `num_workers` per-worker output queues of size `buffer_size` — for `num_workers=4`, that is **8 (input) + 4 (in-flight in workers) + 4 (output) ≈ 16** batches buffered inside the threadpool, none of which are visible to the resource manager. - These buffered batches are pre-format `pa.Table.slice()` views that pin their **full** source blocks in the object store (`pa.Table.slice` is zero-copy and references the entire underlying buffer). They keep blocks pinned in shared memory even after the distributed reference counter considers them out of scope, which is the accounting decoupling that contributes to streaming-split underestimation and spilling. - Replace with `iter_threaded(..., num_workers=num_threadpool_workers, output_buffer_size=num_threadpool_workers)` from PR 1 (generalized in this stack to take a required `fn` and `num_workers`). Workers share `batch_iter` under a lock and funnel results through a single bounded queue sized to match the worker count — enough depth to keep workers from blocking on each other's `put()` when collate is non-trivial. In-flight is now bounded to **~2 × num_workers ≈ 8** (workers + shared output buffer) — roughly a 2× reduction in untracked pinned batches. --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…n` to reduce untracked buffered batches + reduce prefetch onto GPU (ray-project#63660) - `iter_batches` uses `make_async_gen(ref_bundle_iterator, num_workers=1)` to decouple the batching pipeline from the consumer thread. With a single worker, the multi-worker machinery (filling worker, per-worker input/output queues, round-robin draining) is unnecessary — it just adds complexity and hidden buffering. - With `buffer_size=prefetch_batches` (added in ray-project#58657), `make_async_gen` creates an input queue (capacity `prefetch_batches + 1`) and an output queue (capacity `prefetch_batches`), buffering up to `~2 * prefetch_batches` items that are invisible to the resource manager's memory accounting. - **BEHAVIOR CHANGE AFTER THIS PR**: The outer make_async_gen output buffer was a queue of **GPU batches** max size prefetch_batches. This means that you can have up to prefetch_batches + 1 (the working batch) on GPU memory. This happens implicitly and is not good default behavior, since users expect their entire GRAM to be usable for model params, grads, optimizer states, and the current batch and associated activations. Prefetching too many batches into GPU forces can be silently hurting user GPU utilization and throughput by forcing them to reduce their batch size. This PR bounds the number of prefetched GPU batches to 1 which has parity with the current defaults. A follow-up PR will introduce a configuration option to choose how many batches to be prefetched to the GPU at a time. --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…e `make_async_gen` with `iter_threaded` (ray-project#63682) Replaces the inner format/collate `make_async_gen` with `iter_threaded` from ray-project#63660, cutting untracked object store memory pinned batches from ~16 to ~8 (2× reduction). - `_format_in_threadpool` runs format + collate across a threadpool via `make_async_gen(num_workers=min(4, prefetch_batches), preserve_ordering=False)`. With the default `buffer_size=1`, this allocates one shared input queue of size `(buffer_size + 1) * num_workers` and `num_workers` per-worker output queues of size `buffer_size` — for `num_workers=4`, that is **8 (input) + 4 (in-flight in workers) + 4 (output) ≈ 16** batches buffered inside the threadpool, none of which are visible to the resource manager. - These buffered batches are pre-format `pa.Table.slice()` views that pin their **full** source blocks in the object store (`pa.Table.slice` is zero-copy and references the entire underlying buffer). They keep blocks pinned in shared memory even after the distributed reference counter considers them out of scope, which is the accounting decoupling that contributes to streaming-split underestimation and spilling. - Replace with `iter_threaded(..., num_workers=num_threadpool_workers, output_buffer_size=num_threadpool_workers)` from PR 1 (generalized in this stack to take a required `fn` and `num_workers`). Workers share `batch_iter` under a lock and funnel results through a single bounded queue sized to match the worker count — enough depth to keep workers from blocking on each other's `put()` when collate is non-trivial. In-flight is now bounded to **~2 × num_workers ≈ 8** (workers + shared output buffer) — roughly a 2× reduction in untracked pinned batches. --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…e behind `preserve_order` (#63792) Part of the iter_batches consumer pipeline cleanup (#63660, #63682). Gates restore_original_order behind `DataContext.execution_options.preserve_order` (default off). When one format/collate worker lags, the reorder buffer grows with the other workers' completed batches, and ready batches aren't allowed to be yielded; this PR skips the reorder step when ordering isn't required. Recovers next-batch latency from PR1+2's regressed 113 ms steady back to 23 ms (lower than master's 32 ms), with no other regressions. --------- Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>




Description
iter_batchesusesmake_async_gen(ref_bundle_iterator, num_workers=1)to decouple the batching pipeline from the consumer thread. With a single worker, the multi-worker machinery (filling worker, per-worker input/output queues, round-robin draining) is unnecessary — it just adds complexity and hidden buffering.buffer_size=prefetch_batches(added in [Data] Handle prefetches buffering in iter_batches #58657),make_async_gencreates an input queue (capacityprefetch_batches + 1) and an output queue (capacityprefetch_batches), buffering up to~2 * prefetch_batchesitems that are invisible to the resource manager's memory accounting.iter_threaded, a simplified utility that runs an iterator in a single background thread with a bounded output queue.Related PRs
This PR brings the behavior closer to the original implementation: #33620
#51661
preserve_ordercase with multiple threadpool workers in Ray Data read tasks. Theiter_batchesusage ofmake_async_genonly uses a single thread and the behavior change was unintentional.#58657
make_async_gen(buffer_size=prefetch_batches)was unrelated to the main issue of the batch formatting threadpool round-robin causing next batch spiky latencies.Release test results
backpressure_benchmark.single_nodebackpressure_benchmark.multi_nodeNote: Still spills with this PR, but significantly reduced. See PR2 for reducing spill to 0.