Skip to content

Commit

Permalink
[Data] Time streaming exec scheduling (#43112)
Browse files Browse the repository at this point in the history
Currently we are not timing how much time is spent during scheduling in the streaming executor. This times the total `process_time` for the scheduling steps / calls to `_scheduling_loop_step`. This stat is included in DatasetStats and a later PR will include this and other StreamingExecutor stats into the DatasetStatsSummary.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
  • Loading branch information
omatthew98 committed Feb 16, 2024
1 parent e221c6e commit 9641c72
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
9 changes: 7 additions & 2 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,12 @@ def run(self):
"""
try:
# Run scheduling loop until complete.
while self._scheduling_loop_step(self._topology) and not self._shutdown:
pass
while True:
# use process_time to avoid timing ray.wait in _scheduling_loop_step
with self._initial_stats.streaming_exec_schedule_s.timer():
continue_sched = self._scheduling_loop_step(self._topology)
if not continue_sched or self._shutdown:
break
except Exception as e:
# Propagate it to the result iterator.
self._output_node.mark_finished(e)
Expand All @@ -236,6 +240,7 @@ def _generate_stats(self) -> DatasetStats:
builder = stats.child_builder(op.name, override_start_time=self._start_time)
stats = builder.build_multioperator(op.get_stats())
stats.extra_metrics = op.metrics.as_dict()
stats.streaming_exec_schedule_s = self._initial_stats.streaming_exec_schedule_s
return stats

def _scheduling_loop_step(self, topology: Topology) -> bool:
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ def __init__(
self.needs_stats_actor = needs_stats_actor
self.stats_uuid = stats_uuid

# Streaming executor stats
self.streaming_exec_schedule_s: Timer = Timer()

# Iteration stats, filled out if the user iterates over the dataset.
self.iter_wait_s: Timer = Timer()
self.iter_get_s: Timer = Timer()
Expand Down
9 changes: 9 additions & 0 deletions python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,15 @@ def map(_):
), out_str


def test_time_scheduling():
ds = ray.data.range(1000).map_batches(lambda x: x)
for _ in ds.iter_batches():
continue

ds_stats = ds._plan.stats()
assert 0 < ds_stats.streaming_exec_schedule_s.get() < 1


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit 9641c72

Please sign in to comment.