
[Data] Distributed reads for from_huggingface #42599

Merged
merged 14 commits into from Jan 29, 2024

Conversation


@omatthew98 omatthew98 commented Jan 23, 2024

Why are these changes needed?

Currently, reads performed in from_huggingface must happen on a single node. This change enables parallel reads when the Hugging Face dataset matches a public dataset on the Hugging Face Hub. These reads are performed against the parquet files returned by Hugging Face's dataset-server "list parquet files" API.
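For context, the parquet-file listing goes through Hugging Face's public datasets-server. A minimal sketch of building that request URL (the endpoint path here is my reading of the documented API, not code from this PR; the dataset name is the one used in the benchmarks below):

```python
def parquet_listing_url(dataset: str) -> str:
    """Build the datasets-server URL that lists a dataset's hosted parquet files."""
    return f"https://datasets-server.huggingface.co/parquet?dataset={dataset}"

# The response is JSON containing URLs of the parquet files converted by Hugging Face.
print(parquet_listing_url("mc4"))
```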

Testing Data

To measure the performance improvement, I ran tests before and after the code change. Each trial creates a Hugging Face dataset, creates a Ray dataset from it, then counts the number of rows with iter_batches to consume the dataset. The dataset used is the ca subset of the mc4 dataset (14.5M rows). The code executed was roughly as follows (omitting timing of the three stages and, for the post-change tests, the parallelism argument):

import datasets
import ray


def time_from_huggingface(num_trials):
    for _ in range(num_trials):
        # load_dataset
        hfds = datasets.load_dataset(
            "mc4", "ca", split="train",
            download_mode="force_redownload", streaming=True,
        )

        # from_huggingface
        ds = ray.data.from_huggingface(hfds)

        # iter_batches
        num_rows = 0
        for batch in ds.iter_batches():
            num_rows += len(batch["text"])

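The per-stage timing omitted above can be captured with a small helper like the following (a sketch; the sum call is just a stand-in for a real stage such as the iter_batches loop):

```python
import time


def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


# Usage with a stand-in stage; the real trial would wrap load_dataset,
# from_huggingface, and the iter_batches consumption loop.
result, elapsed = timed(sum, range(1000))
```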
Pre-Change Testing

Before the change, only automatic parallelism selection was available, so 3 trials were conducted with that setting.

parallelism | mean load_dataset (s) | mean from_huggingface (s) | mean iter_batches (s)
auto        | 2.032291132           | 4.749824647               | 2445.458511

Post-Change Testing

After the change, parallelism can be set directly through the from_huggingface method, so several parallelism values were tested (3 trials each) in addition to auto. Increasing parallelism speeds up iter_batches, although with only 90 parquet files hosted, the speedup saturates quickly.

parallelism | mean load_dataset (s) | mean from_huggingface (s) | mean iter_batches (s)
1           | 1.331597375           | 12.37903487               | 516.9390124
4           | 1.324397328           | 11.36870353               | 248.9746698
16          | 1.271981214           | 10.85690496               | 112.6530489
64          | 1.340174665           | 11.72287398               | 112.8370639
auto        | 1.319128252           | 19.50827085               | 88.92360566
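The speedups implied by these numbers can be computed directly (values copied from the tables above):

```python
# Mean iter_batches times (seconds) from the pre- and post-change tables.
pre_auto = 2445.458511  # before the change, auto parallelism
post = {
    1: 516.9390124,
    4: 248.9746698,
    16: 112.6530489,
    64: 112.8370639,
    "auto": 88.92360566,
}

# End-to-end speedup of the distributed read at auto parallelism: ~27.5x.
speedup_auto = pre_auto / post["auto"]

# Scaling saturates near the 90 hosted parquet files: going from
# parallelism 16 to 64 yields essentially no further improvement.
ratio_16_vs_64 = post[16] / post[64]
```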

Read Parquet Testing

In addition to from_huggingface, read_parquet was tested directly against the list of parquet files hosted by Hugging Face. The read_parquet results closely match the from_huggingface results, which shows that the from_huggingface method adds little additional overhead.

parallelism | mean read_parquet (s) | mean iter_batches (s)
1           | 22.28880657           | 601.3974171
4           | 10.57513243           | 266.8270524
16          | 11.03263622           | 116.4108693
64          | 11.32171363           | 113.5311583
auto        | 10.94352284           | 89.88996218
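The closeness of the two paths at auto parallelism can be quantified from the tables above:

```python
# Mean iter_batches times (seconds) at auto parallelism from the two tables.
from_huggingface_iter = 88.92360566
read_parquet_iter = 89.88996218

# The consumption times agree within about 1%, supporting the claim that
# from_huggingface adds little overhead on top of the underlying read_parquet.
relative_diff = abs(read_parquet_iter - from_huggingface_iter) / from_huggingface_iter
```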

Related issue number

This PR addresses #37591, although it does not add parallelism for all calls to from_huggingface: reads from private datasets, local datasets, or datasets that have been transformed in any way from the publicly hosted files will not be parallelized by this change.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Performance tests
    • Release tests
    • This PR is not tested :(

@omatthew98 omatthew98 force-pushed the dist-hugging-face-read branch 2 times, most recently from 01cadf8 to 53c53c6 Compare January 23, 2024 22:19
@@ -2372,7 +2372,7 @@ def from_spark(

@PublicAPI
def from_huggingface(
dataset: Union["datasets.Dataset", "datasets.IterableDataset"],
dataset: Union["datasets.Dataset", "datasets.IterableDataset"], parallelism=-1
can we add parallelism in the docstring? let's make sure we state that the user should not need to set parallelism in most cases since it is auto-configured--similar to the docstring for parallelism under read_parquet() and other read methods.

in addition, we should also clarify that this is only used when the distributed parquet read is possible (i.e. when it is a public dataset with no transformations), otherwise it's a single node read.
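The dispatch being described could be summarized as follows (a hypothetical helper for illustration, not the actual read_api.py code):

```python
from typing import List, Optional


def choose_read_path(file_urls: Optional[List[str]]) -> str:
    """Sketch of the dispatch the reviewer describes: a distributed parquet
    read is only possible when the datasets-server lists hosted parquet files
    (public dataset, no transformations); otherwise fall back to a
    single-node read of the Hugging Face dataset."""
    if file_urls:
        return "distributed_parquet_read"
    return "single_node_read"
```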

Comment on lines 2433 to 2438
# If file urls are returned, the parquet files are available via API
import fsspec.implementations.http

http = fsspec.implementations.http.HTTPFileSystem()
return read_parquet(file_urls, parallelism=parallelism, filesystem=http)

can we create a new GH issue for adding support for reading from http via FileBasedDatasource, and leave a TODO comment linking to the issue here?

@@ -8,7 +8,9 @@


def test_from_huggingface(ray_start_regular_shared):
data = datasets.load_dataset("tweet_eval", "emotion")
data = datasets.load_dataset(
"tweet_eval", "emotion", download_mode="force_redownload"

does the test work without download_mode="force_redownload" now? if so can we drop it?

"validation": ray.data.from_huggingface(data["validation"]),
"test": ray.data.from_huggingface(data["test"]),
}
for num_par in [1, 4]:

can we parametrize these with pytest parameters? https://docs.pytest.org/en/7.1.x/example/parametrize.html
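A parametrized version of the loop the reviewer suggests could look like this (hypothetical test name; the body is a stand-in for the real assertions on the Ray dataset):

```python
import pytest

PARALLELISM_VALUES = [1, 4]


@pytest.mark.parametrize("num_par", PARALLELISM_VALUES)
def test_from_huggingface_parallelism(num_par):
    # Stand-in body: the real test would build the dataset with
    # ray.data.from_huggingface(..., parallelism=num_par) and check its rows.
    assert num_par in PARALLELISM_VALUES
```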

assert ray_dataset._plan._logical_plan.dag.name == "FromArrow"
assert ray.get(ray_dataset.to_arrow_refs())[0].equals(data["train"].data.table)
_check_usage_record(["FromArrow"])
# assert "FromArrow" in ds.stats()

Suggested change
# assert "FromArrow" in ds.stats()
assert "ReadParquet" in ds.stats()

assert ray.get(ray_dataset.to_arrow_refs())[0].equals(data["train"].data.table)
_check_usage_record(["FromArrow"])
# assert "FromArrow" in ds.stats()
assert ds._plan._logical_plan.dag.name == "ReadParquet"

can we add a comment briefly explaining why we check for ReadParquet in this test for Hugging Face?

or even better would be to have two test paths, one which uses the new distributed parquet path, and one which uses the backup direct read from HF dataset. I know that might be a bit tricky though, since you'll likely need to find a private dataset (that we can access) or a public dataset that doesn't have parquet files converted for some reason. So no worries if you cannot find a way to easily test both paths

@omatthew98 (author) replied:

Ended up adding a second test case which tests the same on a modified dataset (just did a train test split and had similar tests as the unmodified), so we should be testing both paths now!

Signed-off-by: Matthew Owen <mowen@anyscale.com>
@omatthew98 omatthew98 marked this pull request as ready for review January 25, 2024 18:30
@c21 c21 self-assigned this Jan 25, 2024
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Comment on lines 1342 to 1346
expected_table = data[ds_key].data.table.sort_by("text")
output_full_table = pyarrow.concat_tables(
[ray.get(tbl) for tbl in ds.to_arrow_refs()]
).sort_by("text")
expected_table.equals(output_full_table)

nit: let's consolidate the logic for comparing tables with column sort + concat + equals, since it is used several times across the code. we can put it in test_huggingface.py or ray/data/tests/util.py


there is also an error with pyarrow 6 not supporting the sort_by() method:

AttributeError: 'pyarrow.lib.Table' object has no attribute 'sort_by'

so let's find an alternate implementation that also works with pyarrow 6

omatthew98 and others added 4 commits January 25, 2024 15:11
Co-authored-by: Scott Lee <scottjlee@users.noreply.github.com>
Signed-off-by: Matthew Owen <omatthew98@berkeley.edu>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
@c21 c21 left a comment

LG

@c21 c21 changed the title Distributed reads for from_huggingface [Data] Distributed reads for from_huggingface Jan 29, 2024
@c21 c21 merged commit da21b6e into ray-project:master Jan 29, 2024
9 checks passed