
[data] Improve stall detection for StreamingOutputsBackpressurePolicy #41637

Merged

Conversation

Contributor

@raulchen raulchen commented Dec 5, 2023

Why are these changes needed?

When non-Data code is running in the same cluster, the Data StreamExecutor considers all submitted tasks active, even though they may not actually have the resources to run.
#41603 is an attempt to fix the data+train workload by excluding training resources.

This PR is a more general fix for other workloads, with two main changes:

  1. Besides detecting active tasks, we also detect whether the downstream operator has made no progress for a specific interval.
  2. Introduce a new reserved_resources option to allow specifying non-Data resources.

This PR alone can also fix #41496.
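The progress detection in point 1 can be sketched as follows. This is a minimal illustration only, with hypothetical names (`StallDetector`, `no_output_interval`), not the actual `StreamingOutputsBackpressurePolicy` implementation:

```python
import time


class StallDetector:
    """Detects when a downstream operator stops making progress.

    If the operator's output count has not increased for longer than
    `no_output_interval` seconds, the operator is considered stalled, and
    backpressure can be temporarily lifted so upstream tasks get scheduled.
    """

    def __init__(self, no_output_interval: float = 30.0):
        self.no_output_interval = no_output_interval
        self._last_num_outputs = 0
        self._last_progress_time = time.monotonic()

    def update(self, num_outputs: int) -> bool:
        """Record the current output count; return True if stalled."""
        now = time.monotonic()
        if num_outputs > self._last_num_outputs:
            # Progress was made: remember the new count and reset the timer.
            self._last_num_outputs = num_outputs
            self._last_progress_time = now
            return False
        return (now - self._last_progress_time) > self.no_output_interval
```

The key design choice is that the timer resets only on actual output progress, so long-running but productive UDFs do not trigger the stall path as long as they emit outputs within the interval.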

Related issue number

Closes #41496

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Contributor

@stephanie-wang stephanie-wang left a comment


Let's add unit tests for the backpressure policy itself (no Dataset execution)?

@stephanie-wang stephanie-wang added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Dec 5, 2023
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Contributor

@stephanie-wang stephanie-wang left a comment


Can you also address the comment about adding a unit test for the policy?

Comment on lines 92 to 94
reserved_resources: Amount of reserved resources for non-Ray-Data
workloads. Ray Data will exclude these resources when scheduling tasks,
unless resource_limits is manually set.
Contributor


Suggested change
reserved_resources: Amount of reserved resources for non-Ray-Data
workloads. Ray Data will exclude these resources when scheduling tasks,
unless resource_limits is manually set.
exclude_resources: Amount of reserved resources for non-Ray-Data
workloads. Ray Data will exclude these resources when scheduling tasks,
unless resource_limits is manually set.

I just realized reserved_resources sounds a bit like resources reserved for Data, which is the opposite of what we want.

@@ -105,6 +108,10 @@ class ExecutionOptions:

resource_limits: ExecutionResources = field(default_factory=ExecutionResources)

reserved_resources: ExecutionResources = field(
Contributor


Can we add a check that this is not set if resource_limits is manually specified?
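Such a check could look like the following. This is a hypothetical sketch using simplified dict-valued fields; the real `ExecutionOptions` dataclass uses `ExecutionResources` and may structure the validation differently:

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ExecutionOptions:
    # Simplified, hypothetical fields: the real class uses ExecutionResources.
    resource_limits: Optional[Dict[str, float]] = None
    reserved_resources: Optional[Dict[str, float]] = None

    def __post_init__(self):
        # Reserved resources are subtracted from the cluster total to derive
        # the effective limits, so setting both options would be ambiguous.
        if self.resource_limits is not None and self.reserved_resources is not None:
            raise ValueError(
                "`reserved_resources` cannot be used together with a "
                "manually set `resource_limits`."
            )
```

Raising in `__post_init__` surfaces the conflict at construction time rather than deep inside the scheduler.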

Comment on lines 139 to 146
"Temporarily unblocking backpressure."
f" Because some tasks of operator {op} have been submitted,"
f" but no outputs are generated for {no_output_time} seconds."
" Ignore this warning if your UDF is expected to be slow."
" This may also be because some resources are preempted by"
" non-Ray-Data workloads."
" If this is the case, set `ExecutionOptions.reserved_resources`."
" This message will only be printed once."
Contributor


Can we make the error message expose fewer implementation details? I don't think users will necessarily know what "backpressure" means, for example.

Maybe something like:

"Operator {op} is running, but has not produced outputs for {no_output_time}s. Ignore this warning if your UDF is expected to be slow.

Otherwise, this can happen when there are fewer cluster resources available to Ray Data than expected. If you have non-Data tasks or actors running in the cluster, reserve resources for them with ray.data.ExecutionOptions.reserved_resources = {"num_cpus": <CPUs to exclude>}."

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Contributor Author

raulchen commented Dec 7, 2023

@stephanie-wang thanks. The unit test is added, and the other comments are addressed too.

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
python/ray/train/_internal/data_config.py (outdated; resolved)
assert res == {
up_state: 0,
down_state: self._max_blocks_in_op_output_queue,
}
Contributor


Nice unit tests!

exclude_resources: Amount of resources to exclude from Ray Data.
Set this if you have other workloads running on the same cluster.
For Ray Data + Ray Train, this should be automatically set.
Note for each resource type, resource_limits and exclude_resources can
Contributor


Just a comment: update the doc here to note that resources from DataConfig add to exclude_resources, instead of overwriting it.
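The additive behavior described here can be sketched with a small helper. This is a hypothetical illustration (`merge_excluded_resources` is not a real Ray function), not the actual DataConfig merge logic:

```python
from typing import Dict


def merge_excluded_resources(
    existing: Dict[str, float], from_data_config: Dict[str, float]
) -> Dict[str, float]:
    """Add resources reserved by Train's DataConfig to any user-set
    exclusions, rather than overwriting them."""
    merged = dict(existing)
    for key, amount in from_data_config.items():
        # Sum per resource type so both sources of exclusions are honored.
        merged[key] = merged.get(key, 0.0) + amount
    return merged
```

Summing per resource type means a user-specified exclusion and the Train-derived exclusion are both respected, instead of one silently replacing the other.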

Contributor


Might be good to add a test for this case also.

Contributor Author


Yeah, updated the unit test to reflect this change.

stephanie-wang and others added 10 commits December 7, 2023 11:13
…ing_output_backpressure_policy.py

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
…ns.py

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
…ns.py

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
…ns.py

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
…chen/ray into streaming-backpressure-detect-stall
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
@raulchen raulchen merged commit 9ea2e8b into ray-project:master Dec 8, 2023
15 of 16 checks passed
@raulchen raulchen deleted the streaming-backpressure-detect-stall branch December 8, 2023 04:07
raulchen added a commit to raulchen/ray that referenced this pull request Dec 8, 2023
…ray-project#41637)

When non-Data code is running in the same cluster, the Data StreamExecutor considers all submitted tasks active, even though they may not actually have the resources to run.
ray-project#41603 is an attempt to fix the data+train workload by excluding training resources.

This PR is a more general fix for other workloads, with two main changes:
1. Besides detecting active tasks, we also detect whether the downstream operator has made no progress for a specific interval.
2. Introduce a new `reserved_resources` option to allow specifying non-Data resources.

This PR alone can also fix ray-project#41496
---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
architkulkarni pushed a commit that referenced this pull request Dec 8, 2023
…#41637) (#41720)

When non-Data code is running in the same cluster, the Data StreamExecutor considers all submitted tasks active, even though they may not actually have the resources to run.
#41603 is an attempt to fix the data+train workload by excluding training resources.

This PR is a more general fix for other workloads, with two main changes:
1. Besides detecting active tasks, we also detect whether the downstream operator has made no progress for a specific interval.
2. Introduce a new `reserved_resources` option to allow specifying non-Data resources.

This PR alone can also fix #41496
---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
Labels
@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer.
Development

Successfully merging this pull request may close these issues.

Release test ray-data-resnet50-ingest-file-size-benchmark.aws failed
3 participants