Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
raulchen committed Jan 23, 2024
1 parent 225db82 commit c63f8b7
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 3 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Expand Up @@ -324,7 +324,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:

def _consumer_idling(self) -> bool:
"""Returns whether the user thread is blocked on topology execution."""
return len(self._output_node.outqueue) == 0
return self._output_node.outqueue.num_blocks == 0

def _get_or_refresh_resource_limits(self) -> ExecutionResources:
"""Return concrete limits for use at the current time.
Expand Down
Expand Up @@ -216,7 +216,7 @@ def close_progress_bars(self):

def num_queued(self) -> int:
"""Return the number of queued bundles across all inqueues."""
return sum(len(q) for q in self.inqueues)
return sum(q.num_blocks for q in self.inqueues)

def num_processing(self):
"""Return the number of bundles currently in processing for this operator."""
Expand Down Expand Up @@ -568,7 +568,7 @@ def select_operator_to_run(
ops,
key=lambda op: (
not op.throttling_disabled(),
len(topology[op].outqueue) + topology[op].num_processing(),
topology[op].outqueue.num_blocks + topology[op].num_processing(),
),
)

Expand Down

0 comments on commit c63f8b7

Please sign in to comment.