From 2b92f571a0cdadc676d476fbd2fdfec71e293e50 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Tue, 20 Feb 2024 10:05:39 -0800 Subject: [PATCH] Revert "[Data] Time streaming exec scheduling (#43112)" (#43283) This reverts commit 9641c72d80dc3fd24db62ba5ca9dcfa1bd4e7e3e. --- .../ray/data/_internal/execution/streaming_executor.py | 9 ++------- python/ray/data/_internal/stats.py | 3 --- python/ray/data/tests/test_streaming_executor.py | 9 --------- 3 files changed, 2 insertions(+), 19 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 2387915a9454c..3a4949f353df1 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -208,12 +208,8 @@ def run(self): """ try: # Run scheduling loop until complete. - while True: - # use process_time to avoid timing ray.wait in _scheduling_loop_step - with self._initial_stats.streaming_exec_schedule_s.timer(): - continue_sched = self._scheduling_loop_step(self._topology) - if not continue_sched or self._shutdown: - break + while self._scheduling_loop_step(self._topology) and not self._shutdown: + pass except Exception as e: # Propagate it to the result iterator. self._output_node.mark_finished(e) @@ -240,7 +236,6 @@ def _generate_stats(self) -> DatasetStats: builder = stats.child_builder(op.name, override_start_time=self._start_time) stats = builder.build_multioperator(op.get_stats()) stats.extra_metrics = op.metrics.as_dict() - stats.streaming_exec_schedule_s = self._initial_stats.streaming_exec_schedule_s return stats def _scheduling_loop_step(self, topology: Topology) -> bool: diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index c3025ec4736e7..754e738eae0bc 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -600,9 +600,6 @@ def __init__( self.needs_stats_actor = needs_stats_actor self.stats_uuid = stats_uuid - # Streaming executor stats - self.streaming_exec_schedule_s: Timer = Timer() - # Iteration stats, filled out if the user iterates over the dataset. self.iter_wait_s: Timer = Timer() self.iter_get_s: Timer = Timer() diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 66bdf96b03870..8111568044deb 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -798,15 +798,6 @@ def map(_): ), out_str -def test_time_scheduling(): - ds = ray.data.range(1000).map_batches(lambda x: x) - for _ in ds.iter_batches(): - continue - - ds_stats = ds._plan.stats() - assert 0 < ds_stats.streaming_exec_schedule_s.get() < 1 - - if __name__ == "__main__": import sys