Skip to content

Commit

Permalink
Revert "[Data] Time streaming exec scheduling (ray-project#43112)" (r…
Browse files Browse the repository at this point in the history
…ay-project#43283)

This reverts commit 9641c72.
  • Loading branch information
can-anyscale committed Feb 20, 2024
1 parent 04c7b49 commit 2b92f57
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 19 deletions.
9 changes: 2 additions & 7 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,8 @@ def run(self):
"""
try:
# Run scheduling loop until complete.
while True:
# use process_time to avoid timing ray.wait in _scheduling_loop_step
with self._initial_stats.streaming_exec_schedule_s.timer():
continue_sched = self._scheduling_loop_step(self._topology)
if not continue_sched or self._shutdown:
break
while self._scheduling_loop_step(self._topology) and not self._shutdown:
pass
except Exception as e:
# Propagate it to the result iterator.
self._output_node.mark_finished(e)
Expand All @@ -240,7 +236,6 @@ def _generate_stats(self) -> DatasetStats:
builder = stats.child_builder(op.name, override_start_time=self._start_time)
stats = builder.build_multioperator(op.get_stats())
stats.extra_metrics = op.metrics.as_dict()
stats.streaming_exec_schedule_s = self._initial_stats.streaming_exec_schedule_s
return stats

def _scheduling_loop_step(self, topology: Topology) -> bool:
Expand Down
3 changes: 0 additions & 3 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,6 @@ def __init__(
self.needs_stats_actor = needs_stats_actor
self.stats_uuid = stats_uuid

# Streaming executor stats
self.streaming_exec_schedule_s: Timer = Timer()

# Iteration stats, filled out if the user iterates over the dataset.
self.iter_wait_s: Timer = Timer()
self.iter_get_s: Timer = Timer()
Expand Down
9 changes: 0 additions & 9 deletions python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,15 +798,6 @@ def map(_):
), out_str


def test_time_scheduling():
ds = ray.data.range(1000).map_batches(lambda x: x)
for _ in ds.iter_batches():
continue

ds_stats = ds._plan.stats()
assert 0 < ds_stats.streaming_exec_schedule_s.get() < 1


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit 2b92f57

Please sign in to comment.