
[data] Add DataIterator.materialize #43210

Merged

merged 6 commits into ray-project:master from justinvyu:data_iter_to_dataset on Feb 16, 2024

Conversation

@justinvyu (Contributor) commented on Feb 15, 2024

Why are these changes needed?

This PR introduces a DataIterator.materialize API that fully executes/consumes a data iterator and returns the result as a MaterializedDataset that the user can continue processing.
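
As a minimal sketch of the new API (the dataset contents and variable names here are illustrative):

import ray.data

ds = ray.data.range(8)
it = ds.iterator()               # DataIterator
materialized = it.materialize()  # fully executes the iterator -> MaterializedDataset
print(materialized.count())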

The reason to add this API is to support model training in Ray Train that requires the full dataset up front. For example, xgboost needs to consider the full dataset to fit decision trees and expects that full dataset to be materialized in memory.

The get_dataset_shard API, which bridges Ray Data and Ray Train, calls streaming_split on the dataset, where the number of splits is the number of training workers. This works well for SGD-style training schemes (typical for Torch and TensorFlow users), since the usual training procedure estimates the gradient on a small batch of data at a time. Fitting decision trees, however, requires searching for the best split over the entire dataset, so batch-by-batch data loading is not suitable.
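
For context, a rough sketch of the streaming_split behavior described above (the worker count here is hypothetical):

import ray.data

ds = ray.data.range(100)
num_workers = 4  # hypothetical number of training workers
# One DataIterator per training worker; each worker consumes its shard batch by batch.
shards = ds.streaming_split(num_workers, equal=True)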

With this change, the following workflow is now possible:

import ray.data
import ray.train.xgboost
import xgboost

# NOTE: the import path for CommunicatorContext below is an assumption; it is the
# collective-communication context that sets up distributed xgboost training.
from xgboost.collective import CommunicatorContext

def train_fn_per_worker(config):
    # 1. Get the dataset shard for the worker and convert to an `xgboost.DMatrix`.
    train_ds_iter, eval_ds_iter = (
        ray.train.get_dataset_shard("train"),
        ray.train.get_dataset_shard("validation"),
    )

    train_ds, eval_ds = train_ds_iter.materialize(), eval_ds_iter.materialize()  # <-- new API usage

    train_df, eval_df = train_ds.to_pandas(), eval_ds.to_pandas()
    train_X, train_y = train_df.drop("y", axis=1), train_df["y"]
    eval_X, eval_y = eval_df.drop("y", axis=1), eval_df["y"]
    dtrain = xgboost.DMatrix(train_X, label=train_y)
    deval = xgboost.DMatrix(eval_X, label=eval_y)

    # 2. Do distributed data-parallel training.
    with CommunicatorContext():
        bst = xgboost.train(..., dtrain=dtrain)

# Launch the distributed training job with Ray Train.
train_ds = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
eval_ds = ray.data.from_items([{"x": x, "y": x + 1} for x in range(16)])
trainer = ray.train.xgboost.XGBoostTrainer(
    train_fn_per_worker, datasets={"train": train_ds, "validation": eval_ds}
)
trainer.fit()

XGBoost training with a data iterator

Note that xgboost actually does have support for training with a data iterator, but it is experimental and possibly less performant: https://xgboost.readthedocs.io/en/latest/tutorials/external_memory.html#data-iterator
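
For reference, a minimal sketch of that experimental interface, assuming in-memory (X, y) batches are available (class and variable names here are illustrative):

import xgboost

class InMemoryBatchIter(xgboost.DataIter):
    """Yields pre-loaded (X, y) batches to xgboost one at a time."""

    def __init__(self, batches):
        self._batches = batches
        self._i = 0
        super().__init__()

    def next(self, input_data):
        if self._i == len(self._batches):
            return 0  # signal that iteration is done
        X, y = self._batches[self._i]
        input_data(data=X, label=y)
        self._i += 1
        return 1

    def reset(self):
        self._i = 0

# dtrain = xgboost.DMatrix(InMemoryBatchIter(batches))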

Related PR

This PR is a prerequisite for #42767.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Review comment on this diff hunk:

    feature_column_dtypes[key]
    if isinstance(feature_column_dtypes, dict)
    else feature_column_dtypes,
    (

Reviewer (Contributor): is this just a lint change?

@justinvyu (Contributor, Author): My auto-linting caught this for some reason.

@c21 (Contributor) left a comment:

LGTM, thanks @justinvyu!

Can we also add an end-to-end test for XGBoostTrainer? It can wait for a follow-up PR if you plan to do it later.


Review comment on this diff hunk:

    block_iter, stats, owned_by_consumer = self._to_block_iterator()

    # Consuming the block iterator triggers execution and materializes
    # all blocks of the iterator.
    block_refs_and_metadata = list(block_iter)

Reviewer (Contributor): Can we add a comment noting that this triggers execution and materializes all blocks of the iterator? I want to make it obvious for people reading this in the future.
@c21 (Contributor) commented Feb 16, 2024

Oh, forgot one more thing: please add the new API to the documentation - https://github.com/ray-project/ray/blob/master/doc/source/data/api/data_iterator.rst . Thanks.

@justinvyu (Contributor, Author):
@c21 Yes, I plan on adding the e2e test in the follow-up XGBoostTrainer PR that uses this API.
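
For context, a rough sketch of what such an end-to-end test could look like, reusing the train_fn_per_worker from the example above (the test name and assertion are hypothetical):

import ray.data
import ray.train.xgboost

def test_xgboost_trainer_materialize():
    # Hypothetical end-to-end test: train on small in-memory datasets and
    # verify that the run completes without error.
    train_ds = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
    eval_ds = ray.data.from_items([{"x": x, "y": x + 1} for x in range(16)])
    trainer = ray.train.xgboost.XGBoostTrainer(
        train_fn_per_worker,  # from the example in the PR description
        datasets={"train": train_ds, "validation": eval_ds},
    )
    result = trainer.fit()
    assert result.error is None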

@c21 merged commit e221c6e into ray-project:master on Feb 16, 2024
9 checks passed
@justinvyu deleted the data_iter_to_dataset branch on February 16, 2024 at 21:01