Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Report first block size per Operator #39656

Closed
wants to merge 13 commits into from
35 changes: 35 additions & 0 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Deque, Dict, List, Optional, Tuple, Union

import ray
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
Expand All @@ -26,6 +27,7 @@
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.util import memory_string
from ray.data._internal.progress_bar import ProgressBar
from ray.data.context import DataContext

# Holds the full execution state of the streaming topology. It's a dict mapping each
# operator to tracked streaming exec state.
Expand All @@ -41,6 +43,8 @@
# Min number of seconds between two autoscaling requests.
MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS = 20

logger = DatasetLogger(__name__)


@dataclass
class AutoscalingState:
Expand Down Expand Up @@ -126,6 +130,7 @@ def __init__(self, op: PhysicalOperator, inqueues: List[Deque[MaybeRefBundle]]):
# Tracks whether `input_done` is called for each input op.
self.input_done_called = [False] * len(op.input_dependencies)
self.dependents_completed_called = False
self.first_block_size_bytes = None

def initialize_progress_bars(self, index: int, verbose_progress: bool) -> int:
"""Create progress bars at the given index (line offset in console).
Expand Down Expand Up @@ -171,6 +176,8 @@ def add_output(self, ref: RefBundle) -> None:
self.num_completed_tasks += 1
if self.progress_bar:
self.progress_bar.update(1, self.op._estimated_output_blocks)
if self.first_block_size_bytes is None:
self._check_first_block_size(ref)

def refresh_progress_bar(self) -> None:
"""Update the console with the latest operator progress."""
Expand Down Expand Up @@ -259,6 +266,34 @@ def _queue_memory_usage(self, queue: Deque[RefBundle]) -> int:
break # Concurrent pop from the outqueue by the consumer thread.
return object_store_memory

def _check_first_block_size(self, ref: RefBundle):
    """Checks and logs the size of the first bundle produced by this operator
    to the Ray Data logfile (skips stdout). If the block size exceeds 2 times
    the target max block size, also logs a warning to stdout.

    Args:
        ref: An output bundle produced by this operator; only the first
            bundle with a known block size is recorded.
    """
    # A first block more than this many times larger than the target max
    # block size is considered anomalous and triggers a user-facing warning.
    BLOCK_SIZE_TO_MAX_TARGET_RATIO = 2

    if not ref.blocks:
        # Empty bundle: nothing to measure. Leave `first_block_size_bytes`
        # unset so a later bundle gets checked instead.
        return
    first_block_metadata = ref.blocks[0][1]
    if first_block_metadata.size_bytes is None:
        # Block size is unknown (metadata size not computed) — comparing it
        # below would raise a TypeError. Defer to a later bundle.
        return
    self.first_block_size_bytes = first_block_metadata.size_bytes
    target_max_block_size = DataContext.get_current().target_max_block_size

    if self.first_block_size_bytes > (
        BLOCK_SIZE_TO_MAX_TARGET_RATIO * target_max_block_size
    ):
        # Warning goes to stdout as well as the logfile: either a single row
        # exceeds the target block size, or block splitting failed to cap it.
        # NOTE(review): "maximium" typo is asserted verbatim by
        # test__check_first_block_size; fix both together in one change.
        logger.get_logger().warning(
            f"{self.op.name} in-memory block size of "
            f"{(self.first_block_size_bytes / 2**20):.2f} MB is significantly "
            f"larger than the maximium target block size of "
            f"{(target_max_block_size / 2**20):.2f} MB. "
            f"This could be because one single row is larger than the "
            f"target block size, or there could be an unexpected Ray Data bug."
        )
    else:
        # Normal case: record at INFO level, logfile only (skip stdout).
        logger.get_logger(log_to_stdout=False).info(
            f"{self.op.name} in-memory block size: "
            f"{(self.first_block_size_bytes / 2**20):.2f} MB"
        )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still unclear how useful this would be. 1) In which cases would the block size be larger than `target_max_block_size`? That shouldn't happen unless there is a bug. 2) Printing this at INFO level may be too verbose and may confuse users.
@ericl @scottjlee could one of you elaborate on what exact issue this is trying to solve?
@ericl @scottjlee could one of you elaborate on what exact issue this is trying to solve?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericl what was the original scenario where this issue appeared? The block splitting should be ensuring we don't get block sizes larger than ``target_max_block_size``; are there operators where this is not being handled?



def build_streaming_topology(
dag: PhysicalOperator, options: ExecutionOptions
Expand Down
3 changes: 1 addition & 2 deletions python/ray/data/tests/block_batching/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@
import ray
from ray.data._internal.block_batching.interfaces import Batch
from ray.data._internal.block_batching.util import (
Queue,
_calculate_ref_hits,
blocks_to_batches,
collate,
finalize_batches,
format_batches,
resolve_block_refs,
)
from ray.data._internal.util import make_async_gen
from ray.data._internal.util import Queue, make_async_gen
Copy link
Contributor Author

@scottjlee scottjlee Sep 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixes breaking test import (unrelated to changes in this PR)



def block_generator(num_rows: int, num_blocks: int):
Expand Down
55 changes: 54 additions & 1 deletion python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import collections
import time
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import numpy as np
import pytest

import ray
from ray._private.test_utils import wait_for_condition
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
Expand Down Expand Up @@ -33,6 +35,7 @@
update_operator_states,
)
from ray.data._internal.execution.util import make_ref_bundles
from ray.data.context import DataContext
from ray.data.tests.conftest import * # noqa
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

Expand Down Expand Up @@ -684,6 +687,56 @@ def test_execution_allowed_nothrottle():
)


def test__check_first_block_size():
    """Check that the size of the first block produced by an operator is
    recorded and logged, and that a block much larger than the target max
    block size produces a user-facing warning."""
    logger = DatasetLogger(
        "ray.data._internal.execution.streaming_executor_state"
    ).get_logger()

    # Case 1: normal-sized blocks are reported at INFO level.
    with patch.object(logger, "info") as mock_logger:
        # Generate 1MB and 2MB blocks.
        inputs = make_ref_bundles(
            [
                np.zeros(1024 * 1024, dtype=np.uint8),
                np.zeros(1024 * 1024 * 2, dtype=np.uint8),
            ]
        )
        o1 = InputDataBuffer(inputs)
        o2 = MapOperator.create(make_map_transformer(lambda block: block), o1)

        topo, _ = build_streaming_topology(o2, ExecutionOptions())
        process_completed_tasks(topo)
        update_operator_states(topo)

        logger_args, logger_kwargs = mock_logger.call_args
        assert logger_args[0] == ("Input in-memory block size: 1.00 MB")

    # Case 2: a block more than 2x larger than the target max block size
    # triggers a warning. Shrink the target so a 4 MB block exceeds it.
    ctx = DataContext.get_current()
    original_target_max_block_size = ctx.target_max_block_size
    ctx.target_max_block_size = 1024 * 1024
    try:
        with patch.object(logger, "warning") as mock_logger:
            # Generate a 4 MB block, more than 2 times larger than
            # the configured max block size of 1 MB.
            inputs = make_ref_bundles(
                [
                    np.zeros(1024 * 1024 * 4, dtype=np.uint8),
                ]
            )
            o1 = InputDataBuffer(inputs)
            o2 = MapOperator.create(make_map_transformer(lambda block: block), o1)
            topo, _ = build_streaming_topology(o2, ExecutionOptions())
            process_completed_tasks(topo)
            update_operator_states(topo)

            logger_args, logger_kwargs = mock_logger.call_args
            # NOTE(review): "maximium" mirrors the (typo'd) production
            # message; fix both together in one change.
            assert logger_args[0].startswith(
                (
                    "Input in-memory block size of 4.00 MB is significantly larger "
                    "than the maximium target block size of 1.00 MB."
                )
            )
    finally:
        # Restore the global DataContext so later tests see the default
        # target max block size (avoids cross-test pollution).
        ctx.target_max_block_size = original_target_max_block_size


if __name__ == "__main__":
import sys

Expand Down
Loading