
[Data] Implement streamed read from Hugging Face Datasets #38432

Merged: 50 commits merged into ray-project:master on Aug 24, 2023

Conversation

@scottjlee (Contributor) commented Aug 14, 2023

Why are these changes needed?

The current implementation of ray.data.from_huggingface materializes all data in memory. This PR adds a streaming (but not distributed) implementation to support efficient reads of large datasets: a single read task streams data from the Hugging Face Dataset into Ray Data.
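
For illustration, a minimal usage sketch of the streaming path (the dataset name is arbitrary, and this assumes the post-PR behavior where from_huggingface accepts an IterableDataset):

import datasets
import ray

# Streaming load: nothing is materialized up front.
hf_ds = datasets.load_dataset(
    "wikitext", "wikitext-2-raw-v1", split="train", streaming=True
)

# A single read task streams rows from the IterableDataset into Ray Data.
ray_ds = ray.data.from_huggingface(hf_ds)
print(ray_ds.take(3))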

Related issue number

Closes #37591, Closes #37990

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@scottjlee scottjlee changed the title [Data] Implement streamed/distributed read from Hugging Face Datasets [Data] Implement streamed read from Hugging Face Datasets Aug 14, 2023
@scottjlee scottjlee marked this pull request as ready for review August 15, 2023 20:02
self,
parallelism: int,
) -> List[ReadTask]:
# Note that `parallelism` arg is currently not used for HuggingFaceDatasource.
Contributor:

I thought we wanted to use split_dataset_by_node so that we can have distributed reads for the streaming case?

@scottjlee (Contributor Author), Aug 15, 2023:

Yeah, that was the initial intent of the PR. We did further investigation into split_dataset_by_node, and it turned out that it doesn't actually shard the dataset. It reads the same dataset on each node and selects a subset to emulate sharding, so it doesn't provide any efficiency gains. That's why this PR uses only a single read task: we cannot distribute the dataset read.

We plan on opening an issue with HF datasets team, to see if they already have another existing way to accomplish this or request as a new feature.
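
To illustrate the behavior described above, a sketch of split_dataset_by_node (per the HF datasets docs, when n_shards is not divisible by world_size, each node streams the full dataset and keeps only every world_size-th example):

import datasets
from datasets.distributed import split_dataset_by_node

hf_ds = datasets.load_dataset("c4", "en", split="train", streaming=True)

# Each "node" gets an IterableDataset back, but unless
# n_shards % world_size == 0, this filters examples after reading them,
# so every node still iterates over the same underlying data.
node_ds = split_dataset_by_node(hf_ds, rank=0, world_size=4)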

Contributor:

Looking at the code, it seems there is logic to shard the base IterableDataset if it contains multiple files:

https://sourcegraph.com/github.com/huggingface/datasets/-/blob/src/datasets/iterable_dataset.py?L1231-1243
huggingface/datasets#5984

Only if the base dataset contains a single file does it not get sharded.

Contributor:

Example:

>>> hf_ds = datasets.load_dataset("openclimatefix/gfs-surface-pressure-2.0deg", split='train', streaming=True)
Downloading metadata: 100% 14.5k/14.5k [00:00<00:00, 4.98MB/s]
Using custom data configuration openclimatefix--gfs-surface-pressure-2.0deg-e3bd919c6fc2ba90
>>> print(hf_ds.n_shards)
39

@amogkam (Contributor), Aug 15, 2023:

I believe if n_shards >= parallelism, then we should be able to do a proper distributed read.
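
A sketch of how such a check could pick the read parallelism (choose_world_size is hypothetical, not the merged implementation):

from datasets.distributed import split_dataset_by_node

def choose_world_size(hf_ds, requested_parallelism: int) -> int:
    # True sharding only happens when each rank is assigned whole shard
    # files, i.e. when n_shards % world_size == 0.
    world_size = min(requested_parallelism, hf_ds.n_shards)
    while hf_ds.n_shards % world_size != 0:
        world_size -= 1
    return world_size

# One streaming split per rank, each backed by distinct shard files:
# splits = [split_dataset_by_node(hf_ds, rank=i, world_size=w)
#           for i in range(w)]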

@@ -2167,14 +2185,14 @@ def from_huggingface(dataset: "datasets.Dataset") -> MaterializedDataset:
hf_ds_arrow = dataset.with_format("arrow")
ray_ds = from_arrow(hf_ds_arrow[:])
Contributor:

Let's also add distributed reads for this case: create read tasks, and have each read task read a portion of hf_ds_arrow.
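
A rough sketch of this suggestion (the helper names and chunking scheme are hypothetical):

def make_row_ranges(num_rows: int, parallelism: int):
    # Split [0, num_rows) into contiguous ranges; each range would back
    # one ReadTask over the arrow-formatted dataset.
    chunk = -(-num_rows // parallelism)  # ceiling division
    return [(lo, min(lo + chunk, num_rows)) for lo in range(0, num_rows, chunk)]

def read_slice(hf_ds_arrow, start: int, end: int):
    # With dataset.with_format("arrow"), slicing returns a pyarrow.Table
    # covering only the requested row range.
    return hf_ds_arrow[start:end]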

@scottjlee scottjlee requested a review from a team as a code owner August 23, 2023 00:57
@pcmoritz (Contributor):

Wow, that sounds painful (both the dependencies and the speed are pretty abysmal). Do the Hugging Face datasets have a well-defined data format under the hood (like Parquet)? If yes, could we use that format directly to read them (maybe combined with some metadata we get from the huggingface library)?

Just putting out this idea, maybe it is a bad idea :D
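
One way this idea could look, assuming the dataset really is stored as Parquet files on the Hub (the repo path below is made up):

import pyarrow.fs
import ray
from huggingface_hub import HfFileSystem

hf_fs = HfFileSystem()
# Hypothetical repo; this only works if the files are actually Parquet.
files = hf_fs.glob("datasets/some-org/some-dataset/**/*.parquet")

# Wrap the fsspec filesystem so Ray Data / Arrow can read through it.
pa_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(hf_fs))
ds = ray.data.read_parquet(files, filesystem=pa_fs)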

@scottjlee (Contributor Author):

> Wow, that sounds painful (both the dependencies and the speed are pretty abysmal). Do the Hugging Face datasets have a well-defined data format under the hood (like Parquet)? If yes, could we use that format directly to read them (maybe combined with some metadata we get from the huggingface library)?
>
> Just putting out this idea, maybe it is a bad idea :D

Yeah, under the hood they use a memory-mapped Arrow table, so ideally we would just be able to do distributed reads from that. When we looked into the datasets API for our implementation, though, the publicly available sharding methods didn't seem to split the dataset across nodes as intended. Amog dug around and potentially found some private APIs which may be helpful, but we decided to leave that for a future PR since we'll need to ask the datasets developers some questions about them.

This PR at the very least allows streaming reads for datasets that won't fit in memory, like the RedPajama dataset, so I figured it would be worthwhile to get in.
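
For reference, a quick way to see the Arrow backing described above (dataset name arbitrary):

import datasets

hf_ds = datasets.load_dataset("wikitext", "wikitext-2-raw-v1", split="train")
print(type(hf_ds.data))    # a datasets Table wrapper, e.g. MemoryMappedTable
print(hf_ds.data.table)    # the underlying pyarrow.Table
print(hf_ds.cache_files)   # the .arrow file(s) memory-mapped from disk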

@amogkam amogkam self-assigned this Aug 23, 2023
# Due to HF Dataset's dynamic module system, we need to dynamically import the
# datasets_modules module on every actor when training.
# We accomplish this by simply running the following bit of code directly
# in module you are currently viewing. This ensures that when we
Contributor:

Suggested change
# in module you are currently viewing. This ensures that when we
# in the module you are currently viewing. This ensures that when we

# datasets_modules module on every actor when training.
# We accomplish this by simply running the following bit of code directly
# in module you are currently viewing. This ensures that when we
# unpickle the Dataset, it will be ran before pickle tries to
Contributor:

Suggested change
# unpickle the Dataset, it will be ran before pickle tries to
# unpickle the Dataset, it runs before pickle tries to

# in module you are currently viewing. This ensures that when we
# unpickle the Dataset, it will be ran before pickle tries to
# import datasets_modules and prevents an exception from being thrown.
# Same logic is present inside ray's TransformersTrainer and HF Transformers Ray
Contributor:

Suggested change
# Same logic is present inside ray's TransformersTrainer and HF Transformers Ray
# Same logic is present inside Ray's TransformersTrainer and HF Transformers Ray
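
For context, the pattern these comments describe looks roughly like the following, adapted from the similar logic in HF Transformers' Ray integration (a sketch; details may differ from the actual Ray code):

import importlib.util
import os
import sys

import datasets.load

if "datasets_modules" not in sys.modules:
    # init_dynamic_modules() creates (or returns) the directory holding
    # HF's dynamically generated `datasets_modules` package.
    dynamic_modules_path = os.path.join(
        datasets.load.init_dynamic_modules(), "__init__.py"
    )
    spec = importlib.util.spec_from_file_location(
        "datasets_modules", dynamic_modules_path
    )
    datasets_modules = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = datasets_modules
    spec.loader.exec_module(datasets_modules)

# Because this runs at module import time, it executes before pickle tries
# to import datasets_modules when a Dataset is unpickled on an actor.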

return read_datasource(
HuggingFaceDatasource(),
dataset=dataset,
)
if isinstance(dataset, datasets.Dataset):
# To get the resulting Arrow table from a Hugging Face Dataset after
# applying transformations (e.g. train_test_split(), shard(), select()),
Contributor:

Suggested change
# applying transformations (e.g. train_test_split(), shard(), select()),
# applying transformations (e.g., train_test_split(), shard(), select()),

"Dataset. To convert just a single Hugging Face Dataset to a "
"Ray Dataset, specify a split. For example, "
"`ray.data.from_huggingface(my_dataset_dictionary"
"You provided a Hugging Face DatasetDict or IterableDatasetDict "
Contributor:

Suggested change
"You provided a Hugging Face DatasetDict or IterableDatasetDict "
"You provided a Hugging Face DatasetDict or IterableDatasetDict, "

@angelinalg (Contributor):

Just some copy edit nits.

@scottjlee scottjlee requested a review from amogkam August 24, 2023 01:57
@amogkam (Contributor) commented Aug 24, 2023:


CI all looks good here, cc @zhe-thoughts for approval.

@zhe-thoughts (Collaborator) left a comment:

OK to merge

@zhe-thoughts zhe-thoughts merged commit 8ace253 into ray-project:master Aug 24, 2023
49 of 53 checks passed
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
…t#38432)

Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
…t#38432)

Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Victor <vctr.y.m@example.com>