From c71b576a77a8c0d139f35e46c81fd9f3ee97afb6 Mon Sep 17 00:00:00 2001 From: qiwenju Date: Tue, 26 Aug 2025 10:16:00 +0800 Subject: [PATCH 1/2] fix(data): Prevent unbounded growth of _StatsActor.datasets Signed-off-by: qiwenju --- python/ray/data/_internal/stats.py | 18 ++++++- python/ray/data/tests/test_stats.py | 75 +++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index d00b45c89b8a..0852e0ddf790 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -163,7 +163,6 @@ def __init__(self, max_stats=1000): self.last_time = {} self.start_time = {} self.max_stats = max_stats - self.fifo_queue = [] # Assign dataset uuids with a global counter. self.next_dataset_id = 0 @@ -177,6 +176,10 @@ def __init__(self, max_stats=1000): self._metadata_exporter = get_dataset_metadata_exporter() self.dataset_metadatas: Dict[str, DatasetMetadata] = {} + # A FIFO queue of dataset_tags for finished datasets. This is used to + # efficiently evict the oldest finished datasets when max_stats is reached. + self.finished_datasets_queue = collections.deque() + # Ray Data dashboard metrics # Everything is a gauge because we need to reset all of # a dataset's metrics to 0 after each finishes execution. @@ -563,6 +566,19 @@ def update_dataset(self, dataset_tag: str, state: Dict[str, Any]): self.update_dataset_metadata_operator_states(dataset_tag, operator_states) + # Evict the oldest finished datasets to ensure the `max_stats` limit is enforced. 
+ if ( + state["state"] == DatasetState.FINISHED.name + or state["state"] == DatasetState.FAILED.name + ): + self.finished_datasets_queue.append(dataset_tag) + while len(self.datasets) > self.max_stats and self.finished_datasets_queue: + tag_to_evict = self.finished_datasets_queue.popleft() + if tag_to_evict in self.datasets: + del self.datasets[tag_to_evict] + if tag_to_evict in self.dataset_metadatas: + del self.dataset_metadatas[tag_to_evict] + def get_datasets(self, job_id: Optional[str] = None): if not job_id: return self.datasets diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 0a3a32e9d63e..b593ea1a2e98 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -22,6 +22,7 @@ from ray.data._internal.execution.backpressure_policy.backpressure_policy import ( BackpressurePolicy, ) +from ray.data._internal.execution.dataset_state import DatasetState from ray.data._internal.execution.interfaces.op_runtime_metrics import TaskDurationStats from ray.data._internal.execution.interfaces.physical_operator import PhysicalOperator from ray.data._internal.stats import ( @@ -29,6 +30,7 @@ NodeMetrics, StatsManager, _get_or_create_stats_actor, + _StatsActor, ) from ray.data._internal.util import MemoryProfiler from ray.data.context import DataContext @@ -1911,6 +1913,79 @@ def test_stats_actor_datasets(ray_start_cluster): assert value["state"] == "FINISHED" +def test_stats_actor_datasets_eviction(ray_start_cluster): + """ + Tests that finished datasets are evicted from the _StatsActor when + the number of datasets exceeds the configured `max_stats` limit. + """ + # Set a low max_stats limit to easily trigger eviction. + max_stats = 2 + # Create a dedicated _StatsActor for this test to avoid interfering + # with the global actor. + stats_actor = _StatsActor.remote(max_stats=max_stats) + + # Patch the function that retrieves the stats actor to return our + # test-specific actor instance. 
+ with patch( + "ray.data._internal.stats._get_or_create_stats_actor", + return_value=stats_actor, + ): + + def check_ds_finished(ds_name): + """Helper to check if a dataset is marked as FINISHED in the actor.""" + datasets = ray.get(stats_actor.get_datasets.remote()) + ds_tag = next((tag for tag in datasets if tag.startswith(ds_name)), None) + if not ds_tag: + return False + return datasets[ds_tag]["state"] == DatasetState.FINISHED.name + + # --- DS1 --- + # Create and materialize the first dataset. + ds1 = ray.data.range(1, override_num_blocks=1) + ds1.set_name("ds1") + ds1.materialize() + # Wait until the actor has been updated with the FINISHED state. + wait_for_condition(lambda: check_ds_finished("ds1")) + + # --- DS2 --- + # Create and materialize the second dataset. + # This brings the total number of datasets to the `max_stats` limit. + ds2 = ray.data.range(1, override_num_blocks=1) + ds2.set_name("ds2") + ds2.materialize() + wait_for_condition(lambda: check_ds_finished("ds2")) + + # --- Verify state before eviction --- + # At this point, both ds1 and ds2 should be in the actor. + datasets = ray.get(stats_actor.get_datasets.remote()) + names_in_actor = {k.split("_")[0] for k in datasets.keys()} + assert names_in_actor == {"ds1", "ds2"} + + # --- DS3 --- + # Create and materialize the third dataset. This should trigger the + # eviction of the oldest finished dataset (ds1). + ds3 = ray.data.range(1, override_num_blocks=1) + ds3.set_name("ds3") + ds3.materialize() + + def check_eviction(): + """ + Helper to check that the actor state reflects the eviction. + The actor should now contain ds2 and ds3, but not ds1. + """ + datasets = ray.get(stats_actor.get_datasets.remote()) + # The eviction happens asynchronously, so we might briefly see 3 datasets. + # We wait until the count is back to 2. 
+ if len(datasets) == max_stats + 1: + return False + names = {k.split("_")[0] for k in datasets.keys()} + assert names == {"ds2", "ds3"} + return True + + # Wait until the eviction has occurred and the actor state is correct. + wait_for_condition(check_eviction) + + @patch.object(StatsManager, "STATS_ACTOR_UPDATE_INTERVAL_SECONDS", new=0.5) @patch.object(StatsManager, "_stats_actor_handle") @patch.object(StatsManager, "UPDATE_THREAD_INACTIVITY_LIMIT", new=1) From f89a854c7e95a9f2d158d4c9ae364f8d97815195 Mon Sep 17 00:00:00 2001 From: qiwenju Date: Tue, 26 Aug 2025 16:13:19 +0800 Subject: [PATCH 2/2] chore(data): Apply automated code suggestions Signed-off-by: qiwenju --- python/ray/data/_internal/stats.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 0852e0ddf790..5a75c9de71ca 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -567,17 +567,12 @@ def update_dataset(self, dataset_tag: str, state: Dict[str, Any]): self.update_dataset_metadata_operator_states(dataset_tag, operator_states) # Evict the oldest finished datasets to ensure the `max_stats` limit is enforced. - if ( - state["state"] == DatasetState.FINISHED.name - or state["state"] == DatasetState.FAILED.name - ): + if state["state"] in {DatasetState.FINISHED.name, DatasetState.FAILED.name}: self.finished_datasets_queue.append(dataset_tag) while len(self.datasets) > self.max_stats and self.finished_datasets_queue: tag_to_evict = self.finished_datasets_queue.popleft() - if tag_to_evict in self.datasets: - del self.datasets[tag_to_evict] - if tag_to_evict in self.dataset_metadatas: - del self.dataset_metadatas[tag_to_evict] + self.datasets.pop(tag_to_evict, None) + self.dataset_metadatas.pop(tag_to_evict, None) def get_datasets(self, job_id: Optional[str] = None): if not job_id: