[Data] [Core] [5/n] Remove _external_consumer_bytes from ResourceManager #64456
rayhhome wants to merge 14 commits into
Code Review
This pull request introduces a centralized BlockRefCounter to track object-store memory usage per operator via Ray Core out-of-scope callbacks, replacing the previous estimation-based object store usage accounting. Operators now register produced blocks with the BlockRefCounter when they are yielded, and the ResourceManager queries this counter to get live memory usage per operator. The review feedback suggests lazy-loading the default add_object_out_of_scope_callback from global_worker inside on_block_produced instead of resolving it in __init__ to prevent AttributeError in environments where Ray is not yet initialized.
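The accounting idea summarized above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the PR's implementation: `SketchBlockCounter`, `FakeObjectStore`, `get_usage`, and the `size_bytes`/`producer_id` parameters are hypothetical names, and the fake store stands in for the Ray Core out-of-scope callback so the example runs without a cluster.

```python
import threading
from collections import defaultdict
from typing import Callable, Dict, Hashable, List


class FakeObjectStore:
    """Hypothetical stand-in for the runtime that fires out-of-scope callbacks."""

    def __init__(self) -> None:
        self._callbacks: Dict[Hashable, List[Callable[[bytes], None]]] = {}

    def add_callback(self, ref: Hashable, cb: Callable[[bytes], None]) -> bool:
        self._callbacks.setdefault(ref, []).append(cb)
        return True  # assumed to mean "callback registered"

    def free(self, ref: Hashable) -> None:
        # Simulate the object going out of scope.
        for cb in self._callbacks.pop(ref, []):
            cb(b"")


class SketchBlockCounter:
    """Hypothetical per-producer byte counter driven by free callbacks."""

    def __init__(
        self, add_callback_fn: Callable[[Hashable, Callable[[bytes], None]], bool]
    ) -> None:
        self._add_callback_fn = add_callback_fn
        self._lock = threading.Lock()
        self._bytes_by_producer: Dict[str, int] = defaultdict(int)

    def on_block_produced(
        self, block_ref: Hashable, size_bytes: int, producer_id: str
    ) -> None:
        def _on_object_freed(_object_id: bytes) -> None:
            with self._lock:
                self._bytes_by_producer[producer_id] -= size_bytes

        # Count first, then roll back if the callback could not be registered.
        with self._lock:
            self._bytes_by_producer[producer_id] += size_bytes
        if not self._add_callback_fn(block_ref, _on_object_freed):
            with self._lock:
                self._bytes_by_producer[producer_id] -= size_bytes

    def get_usage(self, producer_id: str) -> int:
        with self._lock:
            return self._bytes_by_producer[producer_id]


store = FakeObjectStore()
counter = SketchBlockCounter(store.add_callback)
counter.on_block_produced("block-1", 100, "map_op")
counter.on_block_produced("block-2", 50, "map_op")
usage_before = counter.get_usage("map_op")
store.free("block-1")
usage_after = counter.get_usage("map_op")
```

Injecting the registration function is what lets the counter be exercised in a unit test with a fake, which is the same seam the review comment below relies on.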
    def __init__(
        self,
        add_object_out_of_scope_callback: Optional[
            Callable[["ray.ObjectRef", Callable[[bytes], None]], bool]
        ] = None,
    ):
        if add_object_out_of_scope_callback is None:
            add_object_out_of_scope_callback = (
                global_worker.core_worker.add_object_out_of_scope_callback  # pyrefly: ignore[missing-attribute]
            )
        self._add_callback_fn = add_object_out_of_scope_callback
To prevent AttributeError when BlockRefCounter is instantiated in environments where Ray is not yet initialized (such as unit tests that don't start a Ray cluster), we should lazy-load the add_object_out_of_scope_callback from global_worker inside on_block_produced instead of resolving it immediately in __init__.
    def __init__(
        self,
        add_object_out_of_scope_callback: Optional[
            Callable[["ray.ObjectRef", Callable[[bytes], None]], bool]
        ] = None,
    ):
        self._add_callback_fn = add_object_out_of_scope_callback

    try:
        registered = self._add_callback_fn(block_ref, _on_object_freed)
Lazy-load the default out-of-scope callback from global_worker if it was not explicitly provided during initialization.
    - try:
    -     registered = self._add_callback_fn(block_ref, _on_object_freed)
    + if self._add_callback_fn is None:
    +     self._add_callback_fn = (
    +         global_worker.core_worker.add_object_out_of_scope_callback  # pyrefly: ignore[missing-attribute]
    +     )
    + try:
    +     registered = self._add_callback_fn(block_ref, _on_object_freed)
Pull request overview
This PR removes per-executor _external_consumer_bytes accounting from ResourceManager and switches object-store memory attribution fully to the new BlockRefCounter (Ray Core out-of-scope callback based). It also simplifies “external consumer” tracking to a boolean flag used for OutputBackpressureGuard behavior distinctions (iterator-consumed vs write pipelines).
Changes:
- Replace ResourceManager object-store usage estimation with BlockRefCounter-based per-operator accounting and remove external-consumer-bytes APIs.
- Thread a shared BlockRefCounter through StreamingExecutor → build_streaming_topology → PhysicalOperator.start and register produced blocks from tasks/operators.
- Update iterator/stream-split paths and tests; add new BlockRefCounter lifecycle/thread-safety tests.
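To make the "thread a shared counter through the topology" change concrete, here is a rough sketch of the plumbing. All names (`SharedCounter`, `Operator`, `build_topology`, `emit_block`) are hypothetical simplifications chosen for illustration; the real signatures in the PR take additional arguments.

```python
from typing import Dict, List, Optional


class SharedCounter:
    """Hypothetical counter shared by every operator in one execution."""

    def __init__(self) -> None:
        self.bytes_by_producer: Dict[str, int] = {}

    def on_block_produced(self, producer_id: str, size_bytes: int) -> None:
        self.bytes_by_producer[producer_id] = (
            self.bytes_by_producer.get(producer_id, 0) + size_bytes
        )


class Operator:
    def __init__(self, name: str, inputs: Optional[List["Operator"]] = None) -> None:
        self.name = name
        self.inputs = inputs or []
        self._counter: Optional[SharedCounter] = None

    def start(self, counter: SharedCounter) -> None:
        # The executor hands every operator the same counter instance.
        self._counter = counter

    def emit_block(self, size_bytes: int) -> None:
        assert self._counter is not None, "start() must be called first"
        self._counter.on_block_produced(self.name, size_bytes)


def build_topology(tail: Operator, counter: SharedCounter) -> List[Operator]:
    """Walk upstream from the tail operator and start each op exactly once."""
    ordered: List[Operator] = []

    def visit(op: Operator) -> None:
        if op in ordered:
            return
        for upstream in op.inputs:
            visit(upstream)
        op.start(counter)
        ordered.append(op)

    visit(tail)
    return ordered


counter = SharedCounter()
read_op = Operator("read")
map_op = Operator("map", inputs=[read_op])
topology = build_topology(map_op, counter)
read_op.emit_block(10)
map_op.emit_block(20)
map_op.emit_block(5)
```

Because one instance is shared, whoever owns the counter can read per-operator totals from a single place instead of asking each operator for an estimate.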
Reviewed changes
Copilot reviewed 37 out of 37 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| python/ray/data/_internal/execution/block_ref_counter.py | Introduces callback-based per-operator object-store byte accounting. |
| python/ray/data/_internal/execution/resource_manager.py | Removes _external_consumer_bytes; reads per-op bytes from BlockRefCounter. |
| python/ray/data/_internal/execution/streaming_executor.py | Creates/clears shared BlockRefCounter; replaces set_external_consumer_bytes with set_external_consumer. |
| python/ray/data/_internal/execution/streaming_executor_state.py | Requires shared BlockRefCounter for topology construction and passes it into op.start(). |
| python/ray/data/_internal/execution/interfaces/physical_operator.py | Adds shared counter plumbing to start() and DataOpTask, registering produced blocks on yield. |
| python/ray/data/_internal/execution/backpressure_policy/downstream_capacity_backpressure_policy.py | Terminal edge capacity now returns 0 (no longer consults external-consumer bytes). |
| python/ray/data/iterator.py | Registers external consumer via boolean only; removes prefetch-bytes callback wiring. |
| python/ray/data/_internal/iterator/stream_split_iterator.py | Registers external consumer via boolean only; removes prefetched-bytes reporting. |
| python/ray/data/_internal/execution/operators/map_operator.py | Passes shared counter + producer id into DataOpTask; updates start() signature. |
| python/ray/data/_internal/execution/operators/actor_pool_map_operator.py | Updates start() signature to accept shared counter. |
| python/ray/data/_internal/execution/operators/input_data_buffer.py | Updates start() signature to accept shared counter. |
| python/ray/data/_internal/execution/operators/union_operator.py | Updates start() signature to accept shared counter. |
| python/ray/data/_internal/execution/operators/output_splitter.py | Updates start() signature; registers produced/forwarded blocks for tracking. |
| python/ray/data/_internal/execution/operators/zip_operator.py | Registers zip-produced blocks for tracking. |
| python/ray/data/_internal/execution/operators/limit_operator.py | Registers sliced-output blocks for tracking. |
| python/ray/data/_internal/execution/operators/aggregate_num_rows.py | Registers produced aggregate block for tracking. |
| python/ray/data/_internal/execution/operators/base_physical_operator.py | Registers bulk-op outputs for tracking. |
| python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_reduce_operator.py | Passes shared counter + producer id into DataOpTask; registers empty-partition output. |
| python/ray/data/_internal/execution/operators/shuffle_operators/shuffle_map_operator.py | Removes now-unused estimate_object_store_usage() override. |
| python/ray/data/_internal/execution/operators/hash_shuffle.py | Updates start() signature; passes shared counter + producer id into DataOpTask. |
| python/ray/data/_internal/gpu_shuffle/hash_shuffle.py | Updates start() signature; passes shared counter + producer id into DataOpTask. |
| python/ray/data/tests/conftest.py | Adds noop_counter() helper for tests that need a counter without a Ray cluster. |
| python/ray/data/tests/unit/test_block_ref_counter.py | Adds deterministic unit tests for BlockRefCounter accounting/clear/thread-safety. |
| python/ray/data/tests/test_block_ref_counter.py | Adds integration-style lifecycle tests against Ray Core callback behavior. |
| python/ray/data/tests/unit/test_resource_manager.py | Updates union attribution tests and introduces counter stubbing usage. |
| python/ray/data/tests/test_resource_manager.py | Updates ResourceManager tests for counter-based accounting and removes external-consumer-bytes tests. |
| python/ray/data/tests/test_streaming_executor.py | Updates topology construction and DataOpTask construction for new signatures. |
| python/ray/data/tests/test_downstream_capacity_backpressure_policy.py | Removes external-bytes mocking from test helper. |
| python/ray/data/tests/test_reservation_based_resource_allocator.py | Updates topology construction / ResourceManager construction signatures. |
| python/ray/data/tests/test_output_splitter.py | Updates start() calls to include BlockRefCounter. |
| python/ray/data/tests/test_operators.py | Updates start() calls to include BlockRefCounter. |
| python/ray/data/tests/test_map_operator.py | Updates start() calls to include BlockRefCounter. |
| python/ray/data/tests/test_limit_operator.py | Starts ops with a shared noop counter for manual operator driving. |
| python/ray/data/tests/test_gpu_shuffle.py | Sets operator _block_ref_counter for test setup consistency. |
| python/ray/data/tests/test_executor_resource_management.py | Updates start() calls to include BlockRefCounter. |
| python/ray/data/tests/test_actor_pool_map_operator.py | Updates start() calls; updates topology construction for new signature. |
| python/ray/data/BUILD.bazel | Adds Bazel py_test target for test_block_ref_counter. |
    topo = build_streaming_topology(o2, ExecutionOptions(), noop_counter())

    resource_manager = ResourceManager(
        topo,
        ExecutionOptions(),
        MagicMock(),
        DataContext.get_current(),
        BlockRefCounter(),
    )

    topo = build_streaming_topology(o3, opt, noop_counter())
    resource_manager = ResourceManager(
        topo,
        ExecutionOptions(),
        MagicMock(return_value=ExecutionResources.zero()),
        DataContext.get_current(),
        BlockRefCounter(),
    )

    from ray.data.tests.conftest import *  # noqa
    from ray.data.tests.conftest import noop_counter
    from ray.data.tests.test_resource_manager import StubBlockRefCounter

    def set_usage(self, producer_id: str, bytes: int) -> None:
        self._bytes_by_producer[producer_id] = bytes
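For readers unfamiliar with the stubbing approach these test snippets use, the following is a minimal sketch of a stub counter that a test can set directly. It is an assumption, not the PR's `StubBlockRefCounter`: the `get_usage` accessor and the `total_usage` helper are hypothetical, and the parameter is named `num_bytes` here to avoid shadowing the built-in `bytes`.

```python
from typing import Dict, Iterable


class StubCounter:
    """Hypothetical test double: usage is set by the test, never by callbacks."""

    def __init__(self) -> None:
        self._bytes_by_producer: Dict[str, int] = {}

    def set_usage(self, producer_id: str, num_bytes: int) -> None:
        self._bytes_by_producer[producer_id] = num_bytes

    def get_usage(self, producer_id: str) -> int:
        # Unknown producers are assumed to hold no bytes.
        return self._bytes_by_producer.get(producer_id, 0)


def total_usage(counter: StubCounter, producer_ids: Iterable[str]) -> int:
    """Hypothetical consumer that sums per-producer usage, as a manager might."""
    return sum(counter.get_usage(pid) for pid in producer_ids)


stub = StubCounter()
stub.set_usage("op_a", 128)
stub.set_usage("op_b", 64)
combined = total_usage(stub, ["op_a", "op_b", "op_missing"])
```

A stub like this keeps resource-accounting tests deterministic because no object lifetime or garbage collection is involved.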
Closed to change the merge target. (Reopened because the previous commits had their merge target on master, so setting this PR's target to dependent branches breaks the file changes.)
Force-pushed from db236fb to 7db3bfb.
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
… docstring Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…ounter Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
…perator tests since _output_operator is removed Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Force-pushed from 7db3bfb to d566612.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit d566612.
    with self._lock:
        self._client_prefetched_bytes[output_split_idx] = 0
    self._report_prefetched_bytes_to_executor()
    # Track overhead time in the instance variable
Unused prefetch bytes helper
Low Severity
_get_total_prefetched_bytes is no longer called after _report_prefetched_bytes_to_executor was removed, so it is dead code that still suggests prefetch totals feed resource accounting.


Description
Removes _external_consumer_bytes from ResourceManager now that BlockRefCounter handles per-operator object-store memory tracking. The byte-tracking setter/getter is replaced with a simple boolean set_external_consumer() / has_external_consumer used only by OutputBackpressureGuard to distinguish iterator-consumed pipelines from write pipelines.

Changes
- resource_manager.py: Remove _external_consumer_bytes, set_external_consumer_bytes, get_external_consumer_bytes, _output_operator, and the debug string that surfaced them. Keep _has_external_consumer with a simplified set_external_consumer() setter for the backpressure guard.
- streaming_executor.py: Replace set_external_consumer_bytes with set_external_consumer.
- downstream_capacity_backpressure_policy.py: Return 0 at terminal edge instead of reading get_external_consumer_bytes(). The _get_queue_ratio already handles 0 downstream capacity as "no backpressure."
- iterator.py: Remove prefetch callback creation; register external consumer with set_external_consumer().
- stream_split_iterator.py: Remove _report_prefetched_bytes_to_executor and its 3 call sites; register external consumer with set_external_consumer().
- Remove test_external_consumer_bytes_* tests; update backpressure guard test to use set_external_consumer().

Tests
Existing OutputBackpressureGuard tests updated. Existing e2e tests (test_backpressure_e2e.py, test_streaming_integration.py) implicitly cover the behavioral change.

Related issues
Depends on #64192 (operator resource tracking).
Related to #63601 (prototype), #63074 (previous manual BlockRefCounter).
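The terminal-edge change relies on zero downstream capacity being read as "no backpressure." The sketch below illustrates one way such a guard could behave; it is an assumption, not the actual _get_queue_ratio logic. The function names, the direction of the ratio, and the `threshold` parameter are all hypothetical.

```python
def queue_ratio(queued_bytes: int, downstream_capacity_bytes: int) -> float:
    """Hypothetical ratio of queued bytes to downstream capacity.

    A capacity of zero (for example, at a terminal edge) yields 0.0, which
    this sketch interprets as "no backpressure" instead of dividing by zero.
    """
    if downstream_capacity_bytes <= 0:
        return 0.0
    return queued_bytes / downstream_capacity_bytes


def should_backpressure(
    queued_bytes: int, downstream_capacity_bytes: int, threshold: float = 1.0
) -> bool:
    """Apply backpressure only when the queue exceeds the assumed threshold."""
    return queue_ratio(queued_bytes, downstream_capacity_bytes) > threshold


terminal_edge = should_backpressure(queued_bytes=500, downstream_capacity_bytes=0)
congested_edge = should_backpressure(queued_bytes=500, downstream_capacity_bytes=100)
```

Under this reading, returning 0 at the terminal edge removes the dependency on external-consumer bytes without changing how interior edges are throttled.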