
[data] Disable block slicing for shuffle ops #40538

Merged
merged 5 commits into from
Oct 23, 2023

Conversation

@stephanie-wang (Contributor) commented Oct 20, 2023

Why are these changes needed?

#40248 changed output block creation so that when a task produces its output blocks, it tries to slice them before yielding, in order to respect the target block size. Unfortunately, all-to-all ops currently don't support dynamic block splitting, so if we fuse an upstream map iterator with an all-to-all op, the all-to-all task still has to fuse all of the sliced blocks back together again. This appears to increase memory usage significantly.
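A toy model of the problem described above, with plain Python lists standing in for Ray Data blocks (`slice_block` and `all_to_all_task` are illustrative stand-ins, not Ray internals):

```python
# The upstream map slices its output to the target block size, but the
# all-to-all task cannot split blocks dynamically, so it must concatenate
# the slices back into one block, paying for both copies at once.
def slice_block(block, target_size):
    """Yield slices of `block` no larger than `target_size`."""
    for i in range(0, len(block), target_size):
        yield block[i:i + target_size]


def all_to_all_task(slices):
    """No dynamic block splitting: re-fuse all slices into one block."""
    fused = []
    for s in slices:
        fused.extend(s)
    return fused


block = list(range(10))
slices = list(slice_block(block, 3))      # extra slicing by the map task...
fused = all_to_all_task(slices)           # ...only to be fused back together
```

The slicing work and the second copy held during re-fusing are pure overhead, which is why the fix below disables slicing entirely when a map is fused with an all-to-all op.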

This PR avoids the issue by overriding the upstream map iterator's target block size to infinity when it is fused with an all-to-all op. It also adds a logger warning describing how to work around the problem.

Related issue number

Closes #40518.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
@stephanie-wang (Contributor Author)

We don't have a good way to unit test this right now, but I think with better perf introspection, we can check the peak memory usage.

@stephanie-wang (Contributor Author)

Running the failing test here.

@@ -33,22 +42,37 @@ def map(
    idx: int,
    block: Block,
    output_num_blocks: int,
    target_max_block_size: int,
Contributor

Suggested change:
-    target_max_block_size: int,
+    target_shuffle_max_block_size: int,

"This can lead to out-of-memory errors and can happen "
"when map tasks are fused to the shuffle operation. "
"To prevent fusion, call Dataset.materialize() on the "
"dataset before shuffling."
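The workaround in the warning above hinges on fusion only happening between adjacent lazy operators, so a materialization boundary between the map and the shuffle keeps the map's target block size intact. A toy sketch (the `plan_fusion` helper and operator names are illustrative, not Ray Data's real fusion logic):

```python
# Toy model: fuse adjacent (map, shuffle) pairs unless a materialize
# boundary separates them. In the fused case, the map's output blocks feed
# directly into the shuffle task, which is where the memory overhead arises.
def plan_fusion(ops):
    """Return the operator list after applying the toy fusion rule."""
    fused = []
    i = 0
    while i < len(ops):
        if i + 1 < len(ops) and ops[i] == "map" and ops[i + 1] == "shuffle":
            fused.append("map+shuffle")  # fused pair
            i += 2
        else:
            fused.append(ops[i])
            i += 1
    return fused
```

With this model, `["map", "shuffle"]` fuses into one operator, while `["map", "materialize", "shuffle"]` stays unfused, mirroring the effect of calling `Dataset.materialize()` before shuffling.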
Contributor

Setting different resources can also prevent fusion, without having to materialize all data.
Another workaround is to increase the parallelism to make each map op finer-grained.
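The resource-mismatch idea above can be sketched as a toy fusion-eligibility check (the `can_fuse` helper is hypothetical; Ray Data's actual fusion rules are more involved, but matching resource requests is typically one precondition):

```python
# Toy sketch: two adjacent operators are fusion candidates only if their
# resource requests match, so giving the map op different resources (e.g.
# fractional CPUs) prevents it from being fused into the shuffle.
def can_fuse(map_resources, shuffle_resources):
    """Return True if the two ops' resource requests allow fusion."""
    return map_resources == shuffle_resources


# Matching requests: fusion allowed.
same = can_fuse({"num_cpus": 1}, {"num_cpus": 1})
# Mismatched requests: fusion prevented, without materializing all data.
diff = can_fuse({"num_cpus": 0.5}, {"num_cpus": 1})
```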

Contributor Author

I think materializing is fine here because you have to materialize all data for an all-to-all op anyway.

Contributor

makes sense.

block = BlockAccessor.for_block(block)
if block.size_bytes() > target_max_block_size:
    logger.get_logger().warn(
Contributor

I'm worried that this warning might confuse users. Can we log it at the debug level instead?

Contributor

I think printing a warning makes sense, but we may want to make it less verbose by, for example, only warning when size_bytes > 2 * target_max_block_size.
Also, suggesting an increase in parallelism seems to make more sense than disabling fusion.

Contributor Author

I'll change to 2 * target_max_block_size.

But materialize() is the best solution here (see the other comment). It's hard to recommend an exact fix via increasing parallelism.
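A minimal sketch of the de-noised check being agreed on here (the `maybe_warn_large_block` wrapper is illustrative; only the 2x threshold and the warning text come from the discussion):

```python
import logging

logger = logging.getLogger("ray.data")


def maybe_warn_large_block(size_bytes, target_max_block_size):
    """Warn only when a block is well over the target (2x), to avoid
    spamming users for marginal overshoots. Returns True if a warning
    was emitted."""
    if size_bytes > 2 * target_max_block_size:
        logger.warning(
            "Block size %d exceeds 2x the target max block size %d. "
            "This can lead to out-of-memory errors and can happen "
            "when map tasks are fused to the shuffle operation. "
            "To prevent fusion, call Dataset.materialize() on the "
            "dataset before shuffling.",
            size_bytes,
            target_max_block_size,
        )
        return True
    return False
```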

Comment on lines +39 to +46
# NOTE(swang): We override the target block size with infinity, to
# prevent the upstream map from slicing its output into smaller
# blocks. Since the shuffle task will just fuse these back
# together, the extra slicing and re-fusing can add high memory
# overhead. This can be removed once dynamic block splitting is
# supported for all-to-all ops.
# See https://github.com/ray-project/ray/issues/40518.
map_transformer.set_target_max_block_size(float("inf"))
Contributor

nit: better to put this into a separate method, to avoid repeating this comment at each call site.
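One way the suggested helper could look (a sketch; the `MapTransformer` internals shown here and the method name `disable_block_slicing` are assumptions, not the actual Ray Data code):

```python
class MapTransformer:
    def __init__(self):
        # Illustrative default target block size (128 MiB).
        self._target_max_block_size = 128 * 1024 * 1024

    def set_target_max_block_size(self, size):
        self._target_max_block_size = size

    def disable_block_slicing(self):
        """Disable output slicing when fused with an all-to-all op.

        All-to-all ops don't support dynamic block splitting, so sliced
        outputs would just be re-fused by the shuffle task, adding memory
        overhead. See https://github.com/ray-project/ray/issues/40518.
        """
        self.set_target_max_block_size(float("inf"))
```

Call sites fusing a map with an all-to-all op would then call `map_transformer.disable_block_slicing()`, keeping the explanatory comment in one place.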

@stephanie-wang stephanie-wang added release-blocker P0 Issue that blocks the release v2.8.0-pick labels Oct 20, 2023
@stephanie-wang (Contributor Author)

Re-ran the failing release test here.

Although the run time still looks a bit higher than before the offending PR, the number of workers killed due to out-of-memory is back to normal, so it appears that the test is fixed.

@stephanie-wang stephanie-wang merged commit 1c405fc into ray-project:master Oct 23, 2023
30 of 35 checks passed
stephanie-wang added a commit to stephanie-wang/ray that referenced this pull request Oct 23, 2023
vitsai pushed a commit that referenced this pull request Oct 24, 2023
Successfully merging this pull request may close these issues.

Release test dataset_shuffle_random_shuffle_1tb.aws failed