Skip to content

Commit

Permalink
optimize output queue memory
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
raulchen committed Aug 2, 2023
1 parent a497c4c commit 6a96030
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(self, op: PhysicalOperator, inqueues: List[Deque[MaybeRefBundle]]):
# (in addition to the streaming executor thread). Hence, it must be a
# thread-safe type such as `deque`.
self.outqueue: Deque[MaybeRefBundle] = deque()
+ self._outqueue_memory_usage: int = 0
self.op = op
self.progress_bar = None
self.num_completed_tasks = 0
Expand Down Expand Up @@ -169,6 +170,7 @@ def num_processing(self):
def add_output(self, ref: RefBundle) -> None:
"""Move a bundle produced by the operator to its outqueue."""
self.outqueue.append(ref)
+ self._outqueue_memory_usage += ref.size_bytes()
self.num_completed_tasks += 1
if self.progress_bar:
self.progress_bar.update(1)
Expand Down Expand Up @@ -210,7 +212,10 @@ def get_output_blocking(self, output_split_idx: Optional[int]) -> MaybeRefBundle
try:
# Non-split output case.
if output_split_idx is None:
- return self.outqueue.popleft()
+ bundle = self.outqueue.popleft()
+ if isinstance(bundle, RefBundle):
+ self._outqueue_memory_usage -= bundle.size_bytes()
+ return bundle

# Scan the queue and look for outputs tagged for the given index.
for i in range(len(self.outqueue)):
Expand All @@ -222,6 +227,7 @@ def get_output_blocking(self, output_split_idx: Optional[int]) -> MaybeRefBundle
return bundle
elif bundle.output_split_idx == output_split_idx:
self.outqueue.remove(bundle)
+ self._outqueue_memory_usage -= bundle.size_bytes()
return bundle

# Didn't find any outputs matching this index, repeat the loop until
Expand All @@ -241,7 +247,7 @@ def inqueue_memory_usage(self) -> int:

def outqueue_memory_usage(self) -> int:
"""Return the object store memory of this operator's outqueue."""
- return self._queue_memory_usage(self.outqueue)
+ return self._outqueue_memory_usage

def _queue_memory_usage(self, queue: Deque[RefBundle]) -> int:
"""Sum the object store memory usage in this queue.
Expand Down

0 comments on commit 6a96030

Please sign in to comment.