
[data] Write each block to a separate file #37986

Merged

Conversation

stephanie-wang
Contributor

Why are these changes needed?

During file-based write tasks, write each block to a separate file so that we avoid needing to keep all blocks in memory at the same time.
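The approach can be sketched outside Ray: stream blocks to per-block files as they are produced, so at most one block is materialized at a time. This is a minimal illustration only; `write_blocks_separately`, `open_file`, and the in-memory `files` dict are stand-ins, not the actual Ray internals.

```python
import io

files = {}  # in-memory stand-in for a filesystem, for demonstration

def open_file(block_index):
    # Returns a fresh writable "file" for one block; its contents are
    # captured into `files` when the file is closed.
    class Sink(io.BytesIO):
        def close(self):
            files[block_index] = self.getvalue()
            super().close()
    return Sink()

def write_blocks_separately(blocks, open_file):
    # One file per block: each block can be dropped from memory as soon
    # as its file is closed, instead of buffering all blocks until the
    # end of the write task.
    num_written = 0
    for block_index, block in enumerate(blocks):
        with open_file(block_index) as f:
            f.write(block)
        num_written += 1
    return num_written

# A generator keeps only the current block alive at any point.
n = write_blocks_separately((c * 4 for c in (b"a", b"b", b"c")), open_file)
```

Because `blocks` is consumed lazily, peak memory is bounded by the largest single block rather than the sum of all blocks in the task.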

Related issue number

Closes #37948.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Member

@bveeramani bveeramani left a comment

Overall LGTM. Just had a few questions

@@ -159,13 +159,12 @@ def _get_write_path_for_block(
*,
Member

Nit: (I know this isn't directly related to this PR) test_block_write_path_provider is a confusing name for a fixture because it sounds like a unit test. Something like mock_block_write_path_provider and MockBlockWritePathProvider might be clearer.

python/ray/data/_internal/stats.py
Comment on lines +141 to +144
if block_index is not None:
suffix = f"{dataset_uuid}_{task_index:06}_{block_index:06}.{file_format}"
else:
suffix = f"{dataset_uuid}_{task_index:06}.{file_format}"
Member

Maybe dumb question, but why do we encode information like task_index at all? Like, why don't we do something like suffix = f"{random_uuid}.{file_format}"?

Contributor Author

We have to do this to make sure each task writes to different files. That's also why I added the additional block_index in this PR. I'll add a comment.

Member

Wouldn't a random file name also guarantee that we write to different files?

Contributor

You don't want duplicate data on retries. Also, it's harder to debug with random IDs.
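To make the retry argument concrete, here is a standalone sketch that mirrors the quoted suffix logic above; the helper name `block_write_path` is illustrative, not Ray's API.

```python
def block_write_path(dataset_uuid, task_index, file_format, block_index=None):
    # Deterministic, per-task (and per-block) names: concurrent write
    # tasks never collide, and a retried task regenerates the exact same
    # path, overwriting its earlier partial output instead of leaving a
    # duplicate file behind (as a random UUID per attempt would).
    if block_index is not None:
        return f"{dataset_uuid}_{task_index:06}_{block_index:06}.{file_format}"
    return f"{dataset_uuid}_{task_index:06}.{file_format}"

first_attempt = block_write_path("ds", 3, "parquet", block_index=7)
retry = block_write_path("ds", 3, "parquet", block_index=7)
```

Since `first_attempt == retry`, a re-executed task is idempotent at the filesystem level, and the structured name also makes it easy to trace a file back to the task and block that produced it.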

@@ -369,6 +379,91 @@ def foo(batch):
assert ds.count() == num_blocks_per_task


def _test_write_large_data(
Member

Rather than writing several distinct unit tests, would it make sense to parametrize this function?

Contributor Author

I think it's pretty much the same thing; is it okay if I keep it like this?

Member

IMO explicitly parametrized tests with @pytest.mark.parametrize are easier to read, but I won't block on it.
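For reference, a minimal sketch of the @pytest.mark.parametrize style being suggested; the parameter names and the placeholder assertion are illustrative, not the actual test body from this PR.

```python
import pytest

# Each tuple becomes its own test case, reported separately by pytest,
# instead of several near-identical hand-written test functions.
@pytest.mark.parametrize(
    "num_rows_per_block, num_blocks_per_task",
    [(1, 1), (1_000, 10), (100_000, 100)],
)
def test_write_large_data(num_rows_per_block, num_blocks_per_task):
    # Placeholder check standing in for the real write-and-verify logic.
    total_rows = num_rows_per_block * num_blocks_per_task
    assert total_rows >= num_blocks_per_task
```

A failing case then shows up in the report with its parameters, e.g. `test_write_large_data[1000-10]`, which keeps each scenario individually visible.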

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Aug 3, 2023
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
@stephanie-wang stephanie-wang merged commit d09e7c5 into ray-project:master Aug 4, 2023
57 of 63 checks passed
@stephanie-wang stephanie-wang deleted the write-multiple-files branch August 4, 2023 15:16
bveeramani added a commit that referenced this pull request Aug 10, 2023
In FileBasedDatasource.write, we wrap then immediately unwrap the filesystem object. Since there's no point in wrapping the filesystem object, this PR removes the line. For context, this logic is likely from legacy code that was refactored by #37986.

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
Labels
@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer.

Successfully merging this pull request may close these issues.

[data] OOM and churning workers when scaling ray.data range(N).map_batches().write_parquet()
4 participants