
[train] Implement DatasetManager#63309

Merged: justinvyu merged 7 commits into ray-project:master from TimothySeah:tseah/oss-dataset-manager on May 20, 2026

Conversation


@TimothySeah TimothySeah commented May 12, 2026

Summary

Here is a timeline that explains the history of the DatasetManager:

  1. Before [Data] Remove _base_dataset from StreamSplitDataIterator #61607, each StreamSplitDataIterator held a reference to its Dataset, which could be large, as explained in that PR's description.
  2. [train] Make ray.train.get_dataset_shard lazily configure the dataset sharding #55230 changed how we send StreamSplitDataIterators to workers: instead of presharding the dataset before creating the worker group, we configured the dataset on the fly when ray.train.get_dataset_shard was called. Unfortunately, this led to a performance regression, so we had to revert it. See the revert PR ([train] Revert "Make ray.train.get_dataset_shard lazily configure the dataset sharding (#55230)" #55760) for a deeper explanation; essentially, sending StreamSplitDataIterators was slow because they contained Dataset references, which could get big. It's not clear why sending big StreamSplitDataIterators was slower in the get_dataset_shard case than in the presharding case.
  3. [Data] Remove _base_dataset from StreamSplitDataIterator #61607 removed the Dataset references from StreamSplitDataIterators.
  4. This PR brings DatasetManager back now that StreamSplitDataIterators are small.
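To make the size argument in the timeline concrete, here is a minimal sketch using only the standard library. It does not use Ray; the dictionaries are hypothetical stand-ins for an iterator that carries a reference to dataset metadata and one that does not, and the file paths are made up for illustration.

```python
import pickle

# Hypothetical stand-in for large dataset metadata (e.g. a long list of input paths).
fake_file_paths = [f"s3://example-bucket/train/img_{i:07d}.jpeg" for i in range(10_000)]

# Stand-in for an iterator that still references its dataset.
heavy_iterator = {"rank": 0, "base_dataset": fake_file_paths}

# Stand-in for an iterator with the dataset reference removed.
light_iterator = {"rank": 0}


def pickled_size(obj) -> int:
    """Return the number of bytes needed to serialize obj with pickle."""
    return len(pickle.dumps(obj))


heavy_size = pickled_size(heavy_iterator)
light_size = pickled_size(light_iterator)
print(f"with dataset reference: {heavy_size} bytes")
print(f"without dataset reference: {light_size} bytes")
```

Anything an object references is serialized along with it, so dropping the reference shrinks every copy that has to be shipped to a worker.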

Testing

The revert PR's description (#55760) included a nice repro script which I modified slightly:

import time

import ray
import ray.data
import ray.train
from ray.data import DataContext
from ray.data.datasource.partitioning import Partitioning
from ray.train.v2._internal.data_integration.dataset_manager import DatasetManager
from ray.train.v2._internal.data_integration.interfaces import DatasetShardMetadata


train_dir = "s3://anyscale-imagenet/ILSVRC/Data/CLS-LOC/train"
train_partitioning = Partitioning(
    "dir", base_dir=train_dir, field_names=["class"]
)
train_ds = ray.data.read_images(
    train_dir,
    mode="RGB",
    include_paths=False,
    partitioning=train_partitioning,
)

num_workers = 16

datasets = {"train": train_ds, "val": train_ds}
dataset_manager = ray.remote(DatasetManager).remote(
    datasets=datasets,
    data_config=ray.train.DataConfig(),
    data_context=DataContext.get_current(),
    world_size=num_workers,
    worker_node_ids=None,
)


def get_size_bytes(obj):
    import ray.cloudpickle as ray_pickle
    return len(ray_pickle.dumps(obj))


@ray.remote(num_cpus=1)
def consumer(dm, rank):
    dataset_info = DatasetShardMetadata("train", rank)

    start = time.perf_counter()
    ds_shard = ray.get(dm.get_dataset_shard.remote(dataset_info))
    end = time.perf_counter()

    size_mb = get_size_bytes(ds_shard) / (1024 * 1024)
    print(f"[{rank=}] TIME TO GET THE DATASET SHARD (SIZE={size_mb}MB):", end - start)


start = time.perf_counter()
ray.get([consumer.remote(dataset_manager, rank) for rank in range(num_workers)])
end = time.perf_counter()
print("elapsed:", end - start)

Now we are sending ~1.7 KB DataIterators in ~5 s (the 5 s is spent waiting for streaming_split, not on the transfer), whereas the revert PR (#55760 (comment)) showed that we were sending ~780 MB DataIterators in up to 174 s. In other words, this is no longer an issue, so it's safe to merge DatasetManager for real this time.

(ray4) tseah@tseah-LV3607J62K ray % RAY_DEDUP_LOGS=0 python driver.py
2026-05-12 16:34:59,444	INFO worker.py:2018 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
(DatasetManager pid=69108) Using blocking ray.get inside async actor. This blocks the event loop. Please use `await` on object ref with asyncio.gather if you want to yield execution to the event loop instead.
elapsed: 6.097738708020188
(consumer pid=69103) [rank=3] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.295730707992334
(consumer pid=69106) [rank=0] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.3177806670137215
(consumer pid=69110) [rank=4] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.318447458994342
(consumer pid=69110) [rank=14] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 0.015125167003134266
(consumer pid=69104) [rank=1] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.295733000006294
(consumer pid=69111) [rank=2] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.337776000000304
(consumer pid=69102) [rank=5] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.340320540999528
(consumer pid=69101) [rank=7] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.306097583001247
(consumer pid=69105) [rank=6] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.326983959006611
(consumer pid=69107) [rank=8] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.3252247920027
(consumer pid=69109) [rank=9] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 5.325624417018844
(consumer pid=69109) [rank=15] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 0.014673249999759719
(consumer pid=69773) [rank=10] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 4.506475125002908
(consumer pid=69774) [rank=12] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 4.497576666995883
(consumer pid=69771) [rank=11] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 4.499152167001739
(consumer pid=69772) [rank=13] TIME TO GET THE DATASET SHARD (SIZE=0.0016326904296875MB): 1.9341769159946125
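As a quick sanity check on the numbers in the log above, the following sketch converts the printed SIZE value to bytes and compares it with the approximate size reported in the revert PR. The 780 MB figure is the rough value quoted in this description, not a new measurement.

```python
MIB = 1024 * 1024  # the repro script divides by 1024 * 1024

shard_size_mb = 0.0016326904296875  # SIZE= value printed for every rank in the log
old_size_mb = 780                   # approximate size quoted from the revert PR

shard_size_bytes = shard_size_mb * MIB
size_ratio = old_size_mb / shard_size_mb

print(f"shard size: {shard_size_bytes:.0f} bytes")
print(f"old iterator was roughly {size_ratio:,.0f}x larger")
```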

Testing 2

If the dataset(s) are not sharded, the DataIteratorImpl still contains the Dataset object. To check that case, I ran the training_ingest_benchmark-task=image_classification.skip_training.jpeg release tests on this PR (https://buildkite.com/ray-project/release/builds/93074) and on a master-branch PR (#63388, https://buildkite.com/ray-project/release/builds/93075), with dataset sharding temporarily disabled in both. Note that these jobs fail only at the "get stats" step, after training has finished, so we can focus on the training time. The results show that this PR actually achieves better end-to-end runtime. I'm not sure why; one possibility is that pulling from the DatasetManager actor is faster than pushing from the train controller actor, since the latter is responsible for many other activities as well.

| Test variant | With DatasetManager | Without DatasetManager |
| --- | --- | --- |
| (base) | 2m13s | 5m27s |
| .preserve_order | 4m48s | 36m37s |
| .local_fs | 32s | 33s |
| local_fs.preserve_order | 36s | 35s |
| local_fs_multi_gpus | 48s | 53s |
| local_fs_multi_gpus.preserve_order | 1m21s | 2m28s |

Signed-off-by: Timothy Seah <tseah@anyscale.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a DatasetManager actor to centralize and synchronize the management of Ray Dataset shards across training workers. It refactors RayDatasetShardProvider and DatasetsCallback to delegate dataset configuration and executor shutdown to this new manager. Feedback from the review highlights several improvement opportunities: ensuring safe access to coordinator actors to prevent AttributeError, removing redundant dataset resolution logic, converting blocking ray.get calls to asynchronous operations within the manager to avoid event loop stalls, and replacing assertions with conditional checks for safer lifecycle management during shutdown.
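The point about blocking calls stalling the event loop can be illustrated with plain asyncio. This is a sketch, not the DatasetManager code: the fetch functions and the heartbeat are hypothetical, and time.sleep stands in for a blocking ray.get. In a Ray async actor, the non-blocking form suggested by the warning in the log is to await the object ref.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    # Another coroutine sharing the event loop; records each time it gets to run.
    for _ in range(5):
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def blocking_fetch() -> str:
    time.sleep(0.2)  # blocks the whole event loop, like a blocking get
    return "shard"


async def async_fetch() -> str:
    await asyncio.sleep(0.2)  # yields to the event loop while waiting
    return "shard"


async def max_heartbeat_gap(fetch) -> float:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    await fetch()
    await hb
    # Largest pause between consecutive heartbeat ticks.
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


blocking_gap = asyncio.run(max_heartbeat_gap(blocking_fetch))
async_gap = asyncio.run(max_heartbeat_gap(async_fetch))
print(f"largest heartbeat gap: blocking={blocking_gap:.3f}s, async={async_gap:.3f}s")
```

With the blocking fetch, the heartbeat cannot run until the sleep finishes, so its largest gap is about the full fetch duration; with the awaited fetch, it keeps ticking on schedule.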

Comment thread python/ray/train/v2/_internal/data_integration/dataset_manager.py Outdated
Comment thread python/ray/train/v2/_internal/data_integration/dataset_manager.py Outdated
Comment thread python/ray/train/v2/_internal/data_integration/dataset_manager.py
Comment thread python/ray/train/v2/_internal/callbacks/datasets.py Outdated
TimothySeah and others added 3 commits May 12, 2026 17:52
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>

@JasonLi1909 JasonLi1909 left a comment

Nice! One edge case to test for: if the dataset(s) are not sharded, the DataIteratorImpl will contain the dataset object.

@TimothySeah

TODO: try training_ingest_benchmark-task=image_classification.skip_training.jpeg without splitting datasets with and without this PR. Reason: DataIterator (non split case) still has a dataset so we need to verify that this doesn't slow that case down.


@justinvyu justinvyu left a comment

Thanks!

Comment thread python/ray/train/v2/_internal/data_integration/dataset_manager.py Outdated
TimothySeah and others added 2 commits May 15, 2026 17:03
Co-authored-by: Justin Yu <justin.v.yu@gmail.com>
Signed-off-by: Timothy Seah <tseah@anyscale.com>
…chmark

Temporarily pass datasets_to_split=[] in RayDataLoaderFactory so the
training_ingest_benchmark-task=image_classification.skip_training.jpeg
integration test runs without splitting the dataset across workers.
Intended to be reverted after measuring the behavioral difference.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@TimothySeah TimothySeah marked this pull request as ready for review May 19, 2026 22:52
@TimothySeah TimothySeah requested a review from a team as a code owner May 19, 2026 22:52
@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit a1ae586.

Comment thread python/ray/train/v2/_internal/data_integration/dataset_manager.py
@TimothySeah TimothySeah changed the title from "[train] Open source DatasetManager" to "[train] Implement DatasetManager" May 20, 2026
@ray-gardener ray-gardener Bot added the train (Ray Train Related Issue) and data (Ray Data-related issues) labels May 20, 2026
@justinvyu justinvyu enabled auto-merge (squash) May 20, 2026 22:00
@github-actions github-actions Bot added the go (add ONLY when ready to merge, run all tests) label May 20, 2026
@justinvyu justinvyu merged commit 844c38d into ray-project:master May 20, 2026
10 checks passed
Labels

data: Ray Data-related issues
go: add ONLY when ready to merge, run all tests
train: Ray Train Related Issue

3 participants