Skip to content

Commit

Permalink
Revert "Revert "[Data] Time streaming exec scheduling (ray-project#43112)" (ray-project#43283)"
Browse files Browse the repository at this point in the history

This reverts commit 2b92f57.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
  • Loading branch information
omatthew98 committed Feb 26, 2024
1 parent c1535c0 commit 11b5862
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
9 changes: 7 additions & 2 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,12 @@ def run(self):
"""
try:
# Run scheduling loop until complete.
while self._scheduling_loop_step(self._topology) and not self._shutdown:
pass
while True:
# use process_time to avoid timing ray.wait in _scheduling_loop_step
with self._initial_stats.streaming_exec_schedule_s.timer():
continue_sched = self._scheduling_loop_step(self._topology)
if not continue_sched or self._shutdown:
break
except Exception as e:
# Propagate it to the result iterator.
self._output_node.mark_finished(e)
Expand All @@ -236,6 +240,7 @@ def _generate_stats(self) -> DatasetStats:
builder = stats.child_builder(op.name, override_start_time=self._start_time)
stats = builder.build_multioperator(op.get_stats())
stats.extra_metrics = op.metrics.as_dict()
stats.streaming_exec_schedule_s = self._initial_stats.streaming_exec_schedule_s
return stats

def _scheduling_loop_step(self, topology: Topology) -> bool:
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ def __init__(
self.needs_stats_actor = needs_stats_actor
self.stats_uuid = stats_uuid

# Streaming executor stats
self.streaming_exec_schedule_s: Timer = Timer()

# Iteration stats, filled out if the user iterates over the dataset.
self.iter_wait_s: Timer = Timer()
self.iter_get_s: Timer = Timer()
Expand Down
9 changes: 9 additions & 0 deletions python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,15 @@ def map(_):
), out_str


def test_time_scheduling():
    """Check that streaming-executor scheduling time is recorded in the stats.

    Fully consumes a small ``range(1000)`` -> ``map_batches`` pipeline, then
    asserts that the ``streaming_exec_schedule_s`` timer exposed on the plan's
    stats reports a strictly positive value below 1 (presumably seconds,
    going by the ``_s`` suffix).
    """
    dataset = ray.data.range(1000).map_batches(lambda batch: batch)

    # Drain the iterator so the pipeline actually executes; the batches
    # themselves are not inspected.
    for _batch in dataset.iter_batches():
        pass

    schedule_time = dataset._plan.stats().streaming_exec_schedule_s.get()
    assert 0 < schedule_time < 1


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit 11b5862

Please sign in to comment.