From c63f8b71f150b0dc0add60b2817ce2241abd41ac Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Mon, 22 Jan 2024 19:09:03 -0800 Subject: [PATCH] fix Signed-off-by: Hao Chen --- python/ray/data/_internal/execution/streaming_executor.py | 2 +- .../ray/data/_internal/execution/streaming_executor_state.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 1ad17eb09cb1a0..2b0795c3d8d07b 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -324,7 +324,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: def _consumer_idling(self) -> bool: """Returns whether the user thread is blocked on topology execution.""" - return len(self._output_node.outqueue) == 0 + return self._output_node.outqueue.num_blocks == 0 def _get_or_refresh_resource_limits(self) -> ExecutionResources: """Return concrete limits for use at the current time. diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 01671625406383..6c5765bc41f1d6 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -216,7 +216,7 @@ def close_progress_bars(self): def num_queued(self) -> int: """Return the number of queued bundles across all inqueues.""" - return sum(len(q) for q in self.inqueues) + return sum(q.num_blocks for q in self.inqueues) def num_processing(self): """Return the number of bundles currently in processing for this operator.""" @@ -568,7 +568,7 @@ def select_operator_to_run( ops, key=lambda op: ( not op.throttling_disabled(), - len(topology[op].outqueue) + topology[op].num_processing(), + topology[op].outqueue.num_blocks + topology[op].num_processing(), ), )