From 4c5863240f51d43c683e07eb6ee4328658781d06 Mon Sep 17 00:00:00 2001
From: Andrew Xue
Date: Tue, 31 Oct 2023 11:06:32 -0700
Subject: [PATCH] reuse _debug_dump_topology

Signed-off-by: Andrew Xue
---
 .../_internal/execution/streaming_executor.py | 41 ++++++-------------
 python/ray/data/tests/test_stats.py           | 29 ++++++-------
 2 files changed, 24 insertions(+), 46 deletions(-)

diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py
index 7ae3030a3571e..d0c89a94aa7ca 100644
--- a/python/ray/data/_internal/execution/streaming_executor.py
+++ b/python/ray/data/_internal/execution/streaming_executor.py
@@ -41,14 +41,10 @@
 )
 from ray.data.context import DataContext
 
-try:
-    import tqdm
-except ImportError:
-    tqdm = None
-
 logger = DatasetLogger(__name__)
 
 # Set this environment variable for detailed scheduler debugging logs.
+# If not set, execution state will still be logged after each scheduling loop.
 DEBUG_TRACE_SCHEDULING = "RAY_DATA_TRACE_SCHEDULING" in os.environ
 
 # Force a progress bar update after this many events processed . This avoids the
@@ -296,27 +292,11 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
         update_operator_states(topology)
 
         # Update the progress bar to reflect scheduling decisions.
-        log_str = "Execution Progress Status:\n"
-        cur_time = time.perf_counter()
-        for op, op_state in topology.items():
+        for op_state in topology.values():
             op_state.refresh_progress_bar()
-            if tqdm is not None:
-                log_str += (
-                    tqdm.tqdm.format_meter(
-                        op_state.num_completed_tasks,
-                        op.num_outputs_total(),
-                        cur_time - self._start_time,
-                        prefix=op.name,
-                        bar_format="{desc:<35.34}{percentage:3.0f}%|{bar}{r_bar}",
-                    )
-                    + "\n"
-                )
-            else:
-                log_str += (
-                    f"{op.name}: "
-                    f"{op_state.num_completed_tasks}/{op.num_outputs_total()}\n"
-                )
-        logger.get_logger(log_to_stdout=False).info(log_str)
+
+        if not DEBUG_TRACE_SCHEDULING:
+            _debug_dump_topology(topology, log_to_stdout=False)
 
         update_stats_actor_metrics(
             [op.metrics for op in self._topology], {"dataset": self._dataset_tag}
@@ -435,13 +415,16 @@ def walk(op):
         raise ValueError(error_message.strip())
 
 
-def _debug_dump_topology(topology: Topology) -> None:
+def _debug_dump_topology(topology: Topology, log_to_stdout: bool = True) -> None:
     """Print out current execution state for the topology for debugging.
 
     Args:
         topology: The topology to debug.
     """
-    logger.get_logger().info("vvv scheduling trace vvv")
+    logger.get_logger(log_to_stdout).info("vvv scheduling trace vvv")
     for i, (op, state) in enumerate(topology.items()):
-        logger.get_logger().info(f"{i}: {state.summary_str()}")
-    logger.get_logger().info("^^^ scheduling trace ^^^")
+        logger.get_logger(log_to_stdout).info(
+            f"{i}: {state.summary_str()}, "
+            f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}"
+        )
+    logger.get_logger(log_to_stdout).info("^^^ scheduling trace ^^^")
diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py
index 0a8bb43f25f55..6e7ec2bf18228 100644
--- a/python/ray/data/tests/test_stats.py
+++ b/python/ray/data/tests/test_stats.py
@@ -1292,23 +1292,18 @@ def test_op_metrics_logging():
     assert sum([log == map_str for log in logs]) == 1
 
 
-def test_progress_bar_logging():
-    def _test_progress_bar_logging():
-        logger = DatasetLogger(
-            "ray.data._internal.execution.streaming_executor"
-        ).get_logger()
-        with patch.object(logger, "info") as mock_logger:
-            ray.data.range(100).map_batches(lambda x: x).materialize()
-            logs = [canonicalize(call.args[0]) for call in mock_logger.call_args_list]
-
-            for log in logs:
-                if log.startswith("Execution Progress Status:"):
-                    assert "Input" in log
-                    assert "ReadRange->MapBatches()" in log
-
-    _test_progress_bar_logging()
-    with patch("ray.data._internal.execution.streaming_executor.tqdm", new=None):
-        _test_progress_bar_logging()
+def test_op_state_logging():
+    logger = DatasetLogger(
+        "ray.data._internal.execution.streaming_executor"
+    ).get_logger()
+    with patch.object(logger, "info") as mock_logger:
+        ray.data.range(100).map_batches(lambda x: x).materialize()
+        logs = [canonicalize(call.args[0]) for call in mock_logger.call_args_list]
+
+        for i, log in enumerate(logs):
+            if log == "vvv scheduling trace vvv":
+                assert "Input" in logs[i + 1]
+                assert "ReadRange->MapBatches()" in logs[i + 2]
 
 
 if __name__ == "__main__":