
[data] implement streaming output backpressure #40387

Merged: 42 commits merged from raulchen:streaming-gen-backpressure into ray-project:master on Oct 23, 2023

Conversation

@raulchen (Contributor) commented Oct 17, 2023

Why are these changes needed?

This PR integrates Ray Core's streaming generator backpressure to Data. This enables throttling the streaming outputs of tasks. That is, when the downstream ops are too slow to consume data, the upstream ops should stop yielding streaming outputs.

Also fixes a Ray Core bug, see task_manager.cc.
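For context, a minimal sketch of the Core mechanism being integrated. The `_streaming_generator_backpressure_size_bytes` option name is taken from this PR's discussion and is an assumption about the Core API surface; the exact knob and the `num_returns="streaming"` form may differ across Ray releases.

```python
import ray


# Sketch only: a streaming-generator task whose `yield` blocks once the
# caller has too many unconsumed outputs buffered. The backpressure option
# below follows the parameter name discussed in this PR and may not match
# the released signature exactly.
@ray.remote(
    num_returns="streaming",
    _streaming_generator_backpressure_size_bytes=1 * 1024 * 1024 * 1024,
)
def produce_blocks(num_blocks: int):
    for _ in range(num_blocks):
        # Blocks here when ~1 GiB of outputs are still unconsumed.
        yield bytes(1024 * 1024)


streaming_gen = produce_blocks.remote(100)
for ref in streaming_gen:
    block = ray.get(ref)  # consuming refs lets the producer resume yielding
```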

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@raulchen (Contributor Author):

This PR is ready for a preliminary review. It depends on #40285. I've tested it locally by patching that PR. Unit tests will be added later when that PR is merged.

@stephanie-wang (Contributor) left a comment

I think maybe I am missing something about how the streaming generator backpressure works, or maybe the code just needs more documentation (I left some comments on places that could be explained more).

Does the streaming generator's backpressure limit apply to ObjectRefs that haven't been yielded to the caller yet, or is it based on how many output ObjectRefs are still in scope? What exactly is the guarantee of the max output that may get queued for a single op (is it op_output_buffer_size_bytes, streaming_gen_backpressure_size, or the sum)? It'd be helpful if these things could be explained in the code.

read_bytes = 0
# If max_bytes_to_read is None, we will read all available blocks.
# Otherwise, we will read until we reach max_bytes_to_read.
while max_bytes_to_read is None or max_bytes_to_read > 0:
Contributor:

Does this guarantee that we will read at least one block if the block's size is greater than max_bytes_to_read?

Contributor Author:

Good question. It doesn't guarantee that now. I think I can make it read at least one block when the op buffer is empty.

Contributor Author:

Oh, I misunderstood. Actually, it already guarantees that. I'll add a note here.
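For illustration, a hedged sketch of why at least one block is read when `max_bytes_to_read` is positive: the budget is checked before each read, so a block larger than the remaining budget is still consumed and only then drives the budget negative. Names mirror the snippet above, but this helper is illustrative only, not the PR's code.

```python
from typing import List, Optional


def read_blocks(outqueue: List[bytes], max_bytes_to_read: Optional[int]) -> List[bytes]:
    # Illustrative sketch: read blocks until the byte budget is exhausted.
    # If max_bytes_to_read is None, read all available blocks.
    read: List[bytes] = []
    while max_bytes_to_read is None or max_bytes_to_read > 0:
        if not outqueue:
            break
        block = outqueue.pop(0)
        read.append(block)
        if max_bytes_to_read is not None:
            # May go negative after one oversized block, which ends the loop,
            # but that block has already been read.
            max_bytes_to_read -= len(block)
    return read


# A 10-byte block with a 1-byte budget: one block is still read.
assert len(read_blocks([b"x" * 10], max_bytes_to_read=1)) == 1
```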

active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
# Current output buffer sizes for each operator.
# Used for streaming output backpressure.
output_bufer_sizes: Dict[OpState, int] = {}
Contributor:

Suggested change
output_bufer_sizes: Dict[OpState, int] = {}
output_buffer_sizes_bytes: Dict[OpState, int] = {}

# `_streaming_generator_backpressure_size_bytes` parameter).
streaming_gen_backpressure_size: int = 1 * 1024 * 1024 * 1024
# Max size of the output buffer (`OpState.outqueue`) for each operator.
op_output_buffer_size_bytes: int = 1 * 1024 * 1024 * 1024
Contributor:

Suggested change
op_output_buffer_size_bytes: int = 1 * 1024 * 1024 * 1024
max_op_output_buffer_size_bytes: int = 1 * 1024 * 1024 * 1024


# The streaming genenrator-level backpressure size (i.e., the
# `_streaming_generator_backpressure_size_bytes` parameter).
streaming_gen_backpressure_size: int = 1 * 1024 * 1024 * 1024
Contributor:

Suggested change
streaming_gen_backpressure_size: int = 1 * 1024 * 1024 * 1024
max_streaming_gen_output_size_bytes: int = 1 * 1024 * 1024 * 1024

# All active tasks, keyed by their waitable.
active_tasks: Dict[Waitable, Tuple[OpState, OpTask]] = {}
# Current output buffer sizes for each operator.
# Used for streaming output backpressure.
Contributor:

It would be helpful to expand this comment.

@@ -144,6 +145,24 @@
DEFAULT_ENABLE_GET_OBJECT_LOCATIONS_FOR_METRICS = False


@dataclass
class StreamingOutputBackpressureConfig:
"""Configuration for task-level streaming output backpressure."""
Contributor:

It would be helpful to expand this comment.

# the app-level buffer size.

# The streaming genenrator-level backpressure size (i.e., the
# `_streaming_generator_backpressure_size_bytes` parameter).
Contributor:

It would be helpful to expand this comment.

Also, it would help if you could explain the relationship between this and the other output cap.

@stephanie-wang (Contributor):

There could be a deadlock if an upstream op is backpressured and all resources are allocated for this op. In this case, we have no resources to schedule downstream op tasks to consume the data. This issue isn't solved in this PR. https://github.com/ray-project/ray/pull/40275 can mitigate this issue, but cannot eliminate it completely. There are some potential solutions we can try in 2.9:

So we will disable this by default, right?

Long-term, I don't see a way around this except by implementing some sort of task preemption.

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Oct 17, 2023
@raulchen (Contributor Author):

I'll add the comment on how streaming gen backpressure works in code.
Sang's PR has a link to the doc.
Regarding the solution to the deadlock issue, I guess the simplest approach we can implement in 2.9 is to detect if the downstream ops have no resources to run. If so, we temporarily disable backpressure for the current op.
I guess this should reduce memory usage for most cases. For the edge cases, the behavior should remain the same as before.

@stephanie-wang (Contributor):

I'll add the comment on how streaming gen backpressure works in code. Sang's PR has a link to the doc. Regarding the solution to the deadlock issue, I guess the simplest approach we can implement in 2.9 is to detect if the downstream ops have no resources to run. If so, we temporarily disable backpressure for the current op. I guess this should reduce memory usage for most cases. For the edge cases, the behavior should remain the same as before.

I see, I didn't realize we had a way to disable backpressure. That seems fine then.

Can we add that to this PR? Even if the feature is disabled, it's not really useful to merge into 2.8 unless we have something close to the final version that we can test. If we add the deadlock fix, we can test for unknown unknowns, instead of just running into the known deadlock issue.

@raulchen (Contributor Author) commented Oct 17, 2023

@stephanie-wang that fix doesn't seem super straightforward. I slightly lean towards merging this PR first and adding that fix later (hopefully before the 2.8 release). Without that fix, this should still be useful, because: 1) if one op's tasks won't use all resources, it's fine; 2) the concurrency cap ramp-up will make the resource allocation more balanced and mitigate the issue; 3) for internal tests, we can always tune configs to avoid the issue.
What do you think?
Other comments are addressed.


Update: per offline discussion, we can check the sum of the downstream ops' num_active_tasks. This would be simpler than checking resources.
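A hedged sketch of that check (all names below are assumptions for illustration, not the PR's exact code): treat an op as deadlock-prone when every downstream op has zero active tasks, and in that case let at least one block through despite backpressure.

```python
from typing import List


class _OpStub:
    """Hypothetical stand-in for a downstream operator."""

    def __init__(self, num_active_tasks: int):
        self._num_active_tasks = num_active_tasks

    def num_active_tasks(self) -> int:
        return self._num_active_tasks


def downstream_idle(downstream_ops: List[_OpStub]) -> bool:
    # If the sum of downstream num_active_tasks is 0, the downstream side may
    # be starved of resources, so keeping backpressure could deadlock.
    return sum(op.num_active_tasks() for op in downstream_ops) == 0


def max_blocks_to_read(budget: int, downstream_ops: List[_OpStub]) -> int:
    # Conservative escape hatch: when the heuristic fires, let at least one
    # block through instead of fully disabling backpressure.
    return max(budget, 1) if downstream_idle(downstream_ops) else budget


print(max_blocks_to_read(0, [_OpStub(0), _OpStub(0)]))  # 1: avoid deadlock
print(max_blocks_to_read(0, [_OpStub(3)]))              # 0: stay backpressured
```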

@raulchen (Contributor Author):

@stephanie-wang @c21 This PR is ready for another review.
Main changes: 1) migrated to the new BackpressurePolicy framework; 2) resolved the deadlock issue.
One remaining comment is to make the config based on the number of blocks; I'll change that tomorrow.
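A hedged sketch of how such a policy might plug into the framework. Only the class and method names visible in this PR (`StreamOutputBackpressurePolicy`, `calculate_max_blocks_to_read_per_op`) come from the diff; the constant, signature, stub types, and body are assumptions for illustration.

```python
from typing import Dict


class _OpStateStub:
    """Hypothetical stand-in for an operator's output-queue state."""

    def __init__(self, num_blocks_in_outqueue: int):
        self.num_blocks_in_outqueue = num_blocks_in_outqueue


class StreamOutputBackpressurePolicy:
    # Assumed per-op output-queue budget, in blocks (illustrative value only).
    MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 2

    def calculate_max_blocks_to_read_per_op(
        self, topology: Dict[str, _OpStateStub]
    ) -> Dict[str, int]:
        # For each op, allow reading only as many blocks from its streaming
        # generators as still fit under the output-queue budget.
        limits = {}
        for op, state in topology.items():
            budget = self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE - state.num_blocks_in_outqueue
            limits[op] = max(budget, 0)
        return limits


policy = StreamOutputBackpressurePolicy()
print(policy.calculate_max_blocks_to_read_per_op(
    {"ReadRange": _OpStateStub(0), "MapBatches": _OpStateStub(2)}
))  # {'ReadRange': 2, 'MapBatches': 0}
```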

@raulchen raulchen removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Oct 19, 2023
@raulchen raulchen requested review from a team and pcmoritz as code owners October 19, 2023 20:38
@raulchen (Contributor Author):

Changed configs to be based on number of blocks.

- At the Ray Core level, we use
`MAX_BLOCKS_IN_GENERATOR_BUFFER` to limit the number of blocks buffered in
the streaming generator of each OpDataTask. When it's reached, the task will
be blocked at `yield`.
Contributor:

Suggested change
be blocked at `yield`.
be blocked at `yield` until the caller reads another `ObjectRef`.

Comment on lines +195 to +197
# If all downstream operators are idle, it could be because no resources
# are available. In this case, we'll make sure to read at least one
# block to avoid deadlock.
Contributor:

I am still unsure about this condition.

"it could be" -> meaning that there are cases where the downstream operators are idle but it is not a deadlock? Can we guarantee that it is a deadlock?

Contributor:

Actually, isn't exponential ramp-up one of the cases where it's not actually a deadlock?

I guess it is okay to merge for now, but I think we should revisit this condition after 2.8.

Contributor Author:

Yes, this is not 100% accurate. So the current strategy is conservative: it only unblocks one block at a time. I'll add a TODO here.

Contributor Author:

On second thought, it seems that the only false-positive case is when the executor has just started and the first op hasn't produced blocks for the second op to consume.
For the exponential ramp-up case, if an op is backpressured due to the concurrency cap, num_active_tasks will be greater than 0.

def can_run(self, op: "PhysicalOperator") -> bool:
"""Called when StreamingExecutor selects an operator to run in
`streaming_executor_state.select_operator_to_run()`.
def calcuate_max_blocks_to_read_per_op(
Contributor:

Suggested change
def calcuate_max_blocks_to_read_per_op(
def calculate_max_blocks_to_read_per_op(

assert start1 < start2 < end1 < end2, (start1, start2, end1, end2)


class TestStreamOutputBackpressurePolicy(unittest.TestCase):
Contributor:

Nice tests!

@raulchen raulchen requested a review from Zandew as a code owner October 20, 2023 21:25
- total_unconsumed >= backpressure_threshold
- // We can only backpressure the last generated item.
- && item_index >= total_generated - 1) {
+ (item_index - stream_it->second.LastConsumedIndex()) >= backpressure_threshold) {
@raulchen (Contributor Author) commented Oct 23, 2023:

cc @rkooo567 @jjyao This fixes a bug introduced by #40285.
The issue happens when we set the threshold to 2 and index 1 comes before index 0. The unit tests of this PR cover this issue.
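A numbers-only walk-through of that scenario under the new condition shown in the diff (threshold 2, index 1 reported before index 0); this is illustrative Python, not the C++ code itself.

```python
backpressure_threshold = 2
last_consumed_index = -1  # the caller hasn't consumed anything yet
item_index = 1            # index 1 is reported before index 0

# New condition: backpressure whenever the reported index is at least
# `backpressure_threshold` ahead of the last consumed index.
should_backpressure = (item_index - last_consumed_index) >= backpressure_threshold
assert should_backpressure  # (1 - (-1)) >= 2 -> True
```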

@raulchen raulchen merged commit d59f597 into ray-project:master Oct 23, 2023
42 of 51 checks passed
@raulchen raulchen deleted the streaming-gen-backpressure branch October 23, 2023 23:27
raulchen added a commit to raulchen/ray that referenced this pull request Oct 24, 2023

vitsai pushed a commit that referenced this pull request Oct 24, 2023