Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data] estimate blocks for limit and union operator #40072

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Callable, Dict, List, Optional, Union
from typing import Callable, Dict, List, Union

import ray
from .ref_bundle import RefBundle
Expand Down Expand Up @@ -200,16 +200,17 @@ def progress_str(self) -> str:
"""
return ""

def num_outputs_total(self) -> Optional[int]:
"""Returns the total number of output bundles of this operator, if known.
def num_outputs_total(self) -> int:
"""Returns the total number of output bundles of this operator.

The value returned may be an estimate based off the consumption so far.
This is useful for reporting progress.
"""
if self._estimated_output_blocks is not None:
return self._estimated_output_blocks
if len(self.input_dependencies) == 1:
return self.input_dependencies[0].num_outputs_total()
return None
raise AttributeError

def start(self, options: ExecutionOptions) -> None:
"""Called by the executor when execution starts for an operator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __init__(
self._stats: StatsDict = {}
super().__init__(name, [input_op])

def num_outputs_total(self) -> Optional[int]:
def num_outputs_total(self) -> int:
return (
self._num_outputs
if self._num_outputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def has_next(self) -> bool:
def get_next(self) -> RefBundle:
return self._input_data.pop(0)

def num_outputs_total(self) -> Optional[int]:
def num_outputs_total(self) -> int:
return self._num_output_blocks or self._num_output_bundles

def get_stats(self) -> StatsDict:
Expand Down
26 changes: 22 additions & 4 deletions python/ray/data/_internal/execution/operators/limit_operator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import copy
from collections import deque
from typing import Deque, List, Optional, Tuple
from typing import Deque, List, Tuple

import ray
from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
Expand Down Expand Up @@ -81,6 +81,22 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]:
if self._limit_reached():
self.all_inputs_done()

# We cannot estimate if we have only consumed empty blocks
if self._consumed_rows > 0:
# Estimate number of output bundles
# Handle the case where _limit exceeds the number of input rows
num_inputs = self.input_dependencies[0].num_outputs_total()
estimated_total_output_rows = min(
self._limit, self._consumed_rows / self._cur_output_bundles * num_inputs
)
# _consumed_rows / _limit is roughly equal to
# _cur_output_bundles / total output blocks
self._estimated_output_blocks = round(
estimated_total_output_rows
/ self._consumed_rows
* self._cur_output_bundles
)

def has_next(self) -> bool:
return len(self._buffer) > 0

Expand All @@ -90,11 +106,13 @@ def get_next(self) -> RefBundle:
def get_stats(self) -> StatsDict:
return {self._name: self._output_metadata}

def num_outputs_total(self) -> Optional[int]:
def num_outputs_total(self) -> int:
# Before inputs are completed (either because the limit is reached or
# because the input operators are done), we don't know how many output
# bundles we will have.
# bundles we will have. We estimate based off the consumption so far.
if self._inputs_complete:
return self._cur_output_bundles
elif self._estimated_output_blocks is not None:
return self._estimated_output_blocks
else:
return None
return self.input_dependencies[0].num_outputs_total()
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List

from ray.data._internal.execution.interfaces import (
ExecutionOptions,
Expand Down Expand Up @@ -46,15 +46,10 @@ def start(self, options: ExecutionOptions):
self._preserve_order = options.preserve_order
super().start(options)

def num_outputs_total(self) -> Optional[int]:
def num_outputs_total(self) -> int:
num_outputs = 0
for input_op in self.input_dependencies:
op_num_outputs = input_op.num_outputs_total()
# If at least one of the input ops has an unknown number of outputs,
# the number of outputs of the union operator is unknown.
if op_num_outputs is None:
return None
num_outputs += op_num_outputs
num_outputs += input_op.num_outputs_total()
return num_outputs

def add_input(self, refs: RefBundle, input_index: int) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import itertools
from typing import List, Optional, Tuple
from typing import List, Tuple

import ray
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
Expand Down Expand Up @@ -41,7 +41,7 @@ def __init__(
self._stats: StatsDict = {}
super().__init__("Zip", [left_input_op, right_input_op])

def num_outputs_total(self) -> Optional[int]:
def num_outputs_total(self) -> int:
left_num_outputs = self.input_dependencies[0].num_outputs_total()
right_num_outputs = self.input_dependencies[1].num_outputs_total()
if left_num_outputs is not None and right_num_outputs is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def execute(

if not isinstance(dag, InputDataBuffer):
# Note: DAG must be initialized in order to query num_outputs_total.
self._global_info = ProgressBar("Running", dag.num_outputs_total() or 1)
self._global_info = ProgressBar("Running", dag.num_outputs_total())

self._output_node: OpState = self._topology[dag]
self.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int:
enabled = verbose_progress or is_all_to_all
self.progress_bar = ProgressBar(
"- " + self.op.name,
self.op.num_outputs_total() or 1,
self.op.num_outputs_total(),
Zandew marked this conversation as resolved.
Show resolved Hide resolved
index,
enabled=enabled,
)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/progress_bar.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def set_description(self, name: str) -> None:
def update(self, i: int = 0, total: Optional[int] = None) -> None:
if self._bar and (i != 0 or self._bar.total != total):
self._progress += i
if total is not None and total > self._bar.total:
if total is not None:
self._bar.total = total
if self._bar.total is not None and self._progress > self._bar.total:
# If the progress goes over 100%, update the total.
Expand Down
57 changes: 53 additions & 4 deletions python/ray/data/tests/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,10 +612,6 @@ def test_limit_operator(ray_start_regular_shared):
# If the limit is 0, the operator should be completed immediately.
assert limit_op.completed()
assert limit_op._limit_reached()
else:
# The number of output bundles is unknown until
# inputs are completed.
assert limit_op.num_outputs_total() is None, limit
cur_rows = 0
loop_count = 0
while input_op.has_next() and not limit_op._limit_reached():
Expand Down Expand Up @@ -814,6 +810,7 @@ def test_block_ref_bundler_uniform(


def test_estimated_output_blocks():
# Test map operator estimation
Zandew marked this conversation as resolved.
Show resolved Hide resolved
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))

def yield_five(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
Expand Down Expand Up @@ -841,6 +838,58 @@ def yield_five(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
# 100 inputs -> 100 / 10 = 10 tasks -> 10 * 5 = 50 output blocks
assert op._estimated_output_blocks == 50

# Test limit operator estimation
Zandew marked this conversation as resolved.
Show resolved Hide resolved
input_op = InputDataBuffer(make_ref_bundles([[i, i] for i in range(100)]))
op = LimitOperator(100, input_op)
Zandew marked this conversation as resolved.
Show resolved Hide resolved

while input_op.has_next():
op.add_input(input_op.get_next(), 0)
run_op_tasks_sync(op)
assert op._estimated_output_blocks == 50

op.all_inputs_done()

# 2 rows per bundle, 100 / 2 = 50 blocks output
assert op._estimated_output_blocks == 50

# Test limit operator estimation where: limit > # of rows
input_op = InputDataBuffer(make_ref_bundles([[i, i] for i in range(100)]))
op = LimitOperator(300, input_op)

while input_op.has_next():
op.add_input(input_op.get_next(), 0)
run_op_tasks_sync(op)
assert op._estimated_output_blocks == 100

op.all_inputs_done()

# all blocks are output
assert op._estimated_output_blocks == 100

# Test all to all operator
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))

def all_transform(bundles: List[RefBundle], ctx):
return bundles, {}

estimated_output_blocks = 500
op1 = AllToAllOperator(all_transform, input_op, estimated_output_blocks)
op2 = AllToAllOperator(all_transform, op1)

while input_op.has_next():
op1.add_input(input_op.get_next(), 0)
op1.all_inputs_done()
run_op_tasks_sync(op1)

while op1.has_next():
op2.add_input(op1.get_next(), 0)
op2.all_inputs_done()
run_op_tasks_sync(op2)

# estimated output blocks for op2 should fall back to op1
assert op2._estimated_output_blocks is None
assert op2.num_outputs_total() == estimated_output_blocks


if __name__ == "__main__":
import sys
Expand Down
Loading