Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def __init__(self, max_stats=1000):
self.last_time = {}
self.start_time = {}
self.max_stats = max_stats
self.fifo_queue = []

# Assign dataset uuids with a global counter.
self.next_dataset_id = 0
Expand All @@ -177,6 +176,10 @@ def __init__(self, max_stats=1000):
self._metadata_exporter = get_dataset_metadata_exporter()
self.dataset_metadatas: Dict[str, DatasetMetadata] = {}

# A FIFO queue of dataset_tags for finished datasets. This is used to
# efficiently evict the oldest finished datasets when max_stats is reached.
self.finished_datasets_queue = collections.deque()

# Ray Data dashboard metrics
# Everything is a gauge because we need to reset all of
# a dataset's metrics to 0 after each finishes execution.
Expand Down Expand Up @@ -563,6 +566,14 @@ def update_dataset(self, dataset_tag: str, state: Dict[str, Any]):

self.update_dataset_metadata_operator_states(dataset_tag, operator_states)

# Evict the oldest finished datasets to ensure the `max_stats` limit is enforced.
if state["state"] in {DatasetState.FINISHED.name, DatasetState.FAILED.name}:
self.finished_datasets_queue.append(dataset_tag)
while len(self.datasets) > self.max_stats and self.finished_datasets_queue:
tag_to_evict = self.finished_datasets_queue.popleft()
self.datasets.pop(tag_to_evict, None)
self.dataset_metadatas.pop(tag_to_evict, None)

def get_datasets(self, job_id: Optional[str] = None):
if not job_id:
return self.datasets
Expand Down
75 changes: 75 additions & 0 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
from ray.data._internal.execution.backpressure_policy.backpressure_policy import (
BackpressurePolicy,
)
from ray.data._internal.execution.dataset_state import DatasetState
from ray.data._internal.execution.interfaces.op_runtime_metrics import TaskDurationStats
from ray.data._internal.execution.interfaces.physical_operator import PhysicalOperator
from ray.data._internal.stats import (
DatasetStats,
NodeMetrics,
StatsManager,
_get_or_create_stats_actor,
_StatsActor,
)
from ray.data._internal.util import MemoryProfiler
from ray.data.context import DataContext
Expand Down Expand Up @@ -1911,6 +1913,79 @@ def test_stats_actor_datasets(ray_start_cluster):
assert value["state"] == "FINISHED"


def test_stats_actor_datasets_eviction(ray_start_cluster):
    """Check that finished datasets are evicted once `max_stats` is exceeded.

    A dedicated `_StatsActor` is created with ``max_stats=2`` and three named
    datasets (ds1, ds2, ds3) are materialized one after another. After the
    third one the actor is expected to track only ds2 and ds3, i.e. the
    oldest finished dataset (ds1) has been dropped.
    """
    # Set a low max_stats limit to easily trigger eviction.
    max_stats = 2
    # Create a dedicated _StatsActor for this test to avoid interfering
    # with the global actor.
    stats_actor = _StatsActor.remote(max_stats=max_stats)

    def fetch_datasets():
        """Return the actor's current ``dataset_tag -> state`` mapping."""
        return ray.get(stats_actor.get_datasets.remote())

    def tag_names(datasets):
        """Return the set of dataset names, taken as the tag text before '_'."""
        # NOTE(review): assumes tags look like "<name>_<suffix>" -- confirm
        # against the tag format produced when a dataset is registered.
        return {tag.split("_")[0] for tag in datasets}

    def check_ds_finished(ds_name):
        """Return True once a dataset named `ds_name` is FINISHED in the actor."""
        datasets = fetch_datasets()
        ds_tag = next((tag for tag in datasets if tag.startswith(ds_name)), None)
        if ds_tag is None:
            return False
        return datasets[ds_tag]["state"] == DatasetState.FINISHED.name

    def check_eviction():
        """Return True once the actor holds exactly ds2 and ds3.

        This is a pure predicate (no `assert`), so polling does not depend
        on how the polling helper treats exceptions raised by a condition.
        """
        datasets = fetch_datasets()
        # The eviction happens asynchronously, so the actor may briefly hold
        # more than `max_stats` datasets. Keep polling until it is back
        # within the limit.
        if len(datasets) > max_stats:
            return False
        return tag_names(datasets) == {"ds2", "ds3"}

    # Patch the function that retrieves the stats actor to return our
    # test-specific actor instance.
    with patch(
        "ray.data._internal.stats._get_or_create_stats_actor",
        return_value=stats_actor,
    ):
        # --- DS1 ---
        # Create and materialize the first dataset.
        ds1 = ray.data.range(1, override_num_blocks=1)
        ds1.set_name("ds1")
        ds1.materialize()
        # Wait until the actor has been updated with the FINISHED state.
        wait_for_condition(lambda: check_ds_finished("ds1"))

        # --- DS2 ---
        # Create and materialize the second dataset.
        # This brings the total number of datasets to the `max_stats` limit.
        ds2 = ray.data.range(1, override_num_blocks=1)
        ds2.set_name("ds2")
        ds2.materialize()
        wait_for_condition(lambda: check_ds_finished("ds2"))

        # --- Verify state before eviction ---
        # At this point, both ds1 and ds2 should be in the actor.
        assert tag_names(fetch_datasets()) == {"ds1", "ds2"}

        # --- DS3 ---
        # Create and materialize the third dataset. This should trigger the
        # eviction of the oldest finished dataset (ds1).
        ds3 = ray.data.range(1, override_num_blocks=1)
        ds3.set_name("ds3")
        ds3.materialize()

        # Wait until the eviction has occurred and the actor state is correct.
        wait_for_condition(check_eviction)


@patch.object(StatsManager, "STATS_ACTOR_UPDATE_INTERVAL_SECONDS", new=0.5)
@patch.object(StatsManager, "_stats_actor_handle")
@patch.object(StatsManager, "UPDATE_THREAD_INACTIVITY_LIMIT", new=1)
Expand Down