Skip to content

Commit

Permalink
reuse _debug_dump_topology
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Xue <andewzxue@gmail.com>
  • Loading branch information
Zandew committed Oct 31, 2023
1 parent 295fd96 commit 4c58632
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 46 deletions.
41 changes: 12 additions & 29 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,10 @@
)
from ray.data.context import DataContext

try:
import tqdm
except ImportError:
tqdm = None

logger = DatasetLogger(__name__)

# Set this environment variable for detailed scheduler debugging logs.
# If not set, execution state will still be logged after each scheduling loop.
DEBUG_TRACE_SCHEDULING = "RAY_DATA_TRACE_SCHEDULING" in os.environ

# Force a progress bar update after this many events processed. This avoids the
Expand Down Expand Up @@ -296,27 +292,11 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
update_operator_states(topology)

# Update the progress bar to reflect scheduling decisions.
log_str = "Execution Progress Status:\n"
cur_time = time.perf_counter()
for op, op_state in topology.items():
for op_state in topology.values():
op_state.refresh_progress_bar()
if tqdm is not None:
log_str += (
tqdm.tqdm.format_meter(
op_state.num_completed_tasks,
op.num_outputs_total(),
cur_time - self._start_time,
prefix=op.name,
bar_format="{desc:<35.34}{percentage:3.0f}%|{bar}{r_bar}",
)
+ "\n"
)
else:
log_str += (
f"{op.name}: "
f"{op_state.num_completed_tasks}/{op.num_outputs_total()}\n"
)
logger.get_logger(log_to_stdout=False).info(log_str)

if not DEBUG_TRACE_SCHEDULING:
_debug_dump_topology(topology, log_to_stdout=False)

update_stats_actor_metrics(
[op.metrics for op in self._topology], {"dataset": self._dataset_tag}
Expand Down Expand Up @@ -435,13 +415,16 @@ def walk(op):
raise ValueError(error_message.strip())


def _debug_dump_topology(topology: Topology) -> None:
def _debug_dump_topology(topology: Topology, log_to_stdout: bool = True) -> None:
"""Print out current execution state for the topology for debugging.
Args:
topology: The topology to debug.
"""
logger.get_logger().info("vvv scheduling trace vvv")
logger.get_logger(log_to_stdout).info("vvv scheduling trace vvv")
for i, (op, state) in enumerate(topology.items()):
logger.get_logger().info(f"{i}: {state.summary_str()}")
logger.get_logger().info("^^^ scheduling trace ^^^")
logger.get_logger(log_to_stdout).info(
f"{i}: {state.summary_str()}, "
f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}"
)
logger.get_logger(log_to_stdout).info("^^^ scheduling trace ^^^")
29 changes: 12 additions & 17 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,23 +1292,18 @@ def test_op_metrics_logging():
assert sum([log == map_str for log in logs]) == 1


def test_progress_bar_logging():
def _test_progress_bar_logging():
logger = DatasetLogger(
"ray.data._internal.execution.streaming_executor"
).get_logger()
with patch.object(logger, "info") as mock_logger:
ray.data.range(100).map_batches(lambda x: x).materialize()
logs = [canonicalize(call.args[0]) for call in mock_logger.call_args_list]

for log in logs:
if log.startswith("Execution Progress Status:"):
assert "Input" in log
assert "ReadRange->MapBatches(<lambda>)" in log

_test_progress_bar_logging()
with patch("ray.data._internal.execution.streaming_executor.tqdm", new=None):
_test_progress_bar_logging()
def test_op_state_logging():
logger = DatasetLogger(
"ray.data._internal.execution.streaming_executor"
).get_logger()
with patch.object(logger, "info") as mock_logger:
ray.data.range(100).map_batches(lambda x: x).materialize()
logs = [canonicalize(call.args[0]) for call in mock_logger.call_args_list]

for i, log in enumerate(logs):
if log == "vvv scheduling trace vvv":
assert "Input" in logs[i+1]
assert "ReadRange->MapBatches(<lambda>)" in logs[i+2]


if __name__ == "__main__":
Expand Down

0 comments on commit 4c58632

Please sign in to comment.