diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 1ad17eb09cb1a0..2b0795c3d8d07b 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -324,7 +324,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: def _consumer_idling(self) -> bool: """Returns whether the user thread is blocked on topology execution.""" - return len(self._output_node.outqueue) == 0 + return self._output_node.outqueue.num_blocks == 0 def _get_or_refresh_resource_limits(self) -> ExecutionResources: """Return concrete limits for use at the current time. diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 01671625406383..6c5765bc41f1d6 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -216,7 +216,7 @@ def close_progress_bars(self): def num_queued(self) -> int: """Return the number of queued bundles across all inqueues.""" - return sum(len(q) for q in self.inqueues) + return sum(q.num_blocks for q in self.inqueues) def num_processing(self): """Return the number of bundles currently in processing for this operator.""" @@ -568,7 +568,7 @@ def select_operator_to_run( ops, key=lambda op: ( not op.throttling_disabled(), - len(topology[op].outqueue) + topology[op].num_processing(), + topology[op].outqueue.num_blocks + topology[op].num_processing(), ), )