
Fix backpressure handling of queued actor pool tasks #34254

Merged
merged 12 commits into ray-project:master on Apr 17, 2023

Conversation

@ericl ericl (Contributor) commented Apr 11, 2023

Why are these changes needed?

There is a bug in the backpressure implementation for actor pools: once a task has been queued on an actor pool, it is no longer subject to backpressure. This is problematic when a task's output is much larger than its input. In that situation, the actor pool keeps executing queued tasks (converting small objects into larger ones) even when doing so grossly exceeds memory limits.

Put another way: this PR fixes the issue where the streaming executor queues tasks on an actor pool operator but later wants to "take them back" due to unexpectedly high memory usage. The fix avoids the issue by never queueing tasks that won't be executed immediately, so they never need to be taken back.

Example:

  1. Suppose there is an actor pool of 10 actors, each of which can run 1 active task at a time.
  2. Each task's input is 1GB. The memory limit is 100GB, so we add 100 of these inputs to the actor pool operator.
  3. When the first 10 tasks run, each 1GB input expands into roughly 10GB of output (100GB in total). Overall memory usage is now about 200GB, 2x our limit!
  4. However, since we already added all 100 inputs to the actor pool, there is no way for the streaming scheduler to pause execution of the 90 remaining queued inputs.
  5. The 90 queued inputs execute anyway, and we end up using about 1TB, or 10x our intended memory limit (see the quick arithmetic check below).
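
A quick arithmetic check of the numbers above (a sketch; the ~10GB-per-task output is the assumption that makes the 200GB and 1TB figures consistent):

```python
# Arithmetic behind the example above. The 1GB -> ~10GB expansion per task is
# an assumption chosen to match the 200GB and 1TB figures in the example.
input_gb, output_gb = 1, 10
pool_size, num_inputs, limit_gb = 10, 100, 100

# After the first wave of pool_size tasks finishes: remaining queued inputs
# plus the outputs already produced.
after_first_wave = (num_inputs - pool_size) * input_gb + pool_size * output_gb
print(after_first_wave, "GB")  # ~190 GB, roughly 2x the 100GB limit

# If all queued inputs then run with no backpressure applied:
after_all = num_inputs * output_gb
print(after_all, "GB")  # 1000 GB (~1TB), 10x the limit
```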

We need to check the memory limit right before executing a task on the actor pool; one way of doing this is to eliminate the internal queue in the actor pool operator and instead always queue work outside the operator, as sketched below.
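
A minimal sketch of that dispatch-time check, under stated assumptions: maybe_dispatch_one, op.add_input, and the byte-count parameters here are illustrative placeholders, not the actual Ray Data scheduler API.

```python
def maybe_dispatch_one(op, external_queue, current_usage_bytes,
                       per_task_memory_bytes, memory_limit_bytes):
    """Hand at most one input to the actor pool operator, and only if doing
    so keeps projected memory usage under the limit. Excess work stays in
    the external queue, where backpressure can still hold it back."""
    if not external_queue:
        return False
    if current_usage_bytes + per_task_memory_bytes > memory_limit_bytes:
        # Under backpressure: don't queue the task inside the pool, so it
        # never needs to be "taken back" later.
        return False
    op.add_input(external_queue.pop(0))
    return True
```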

TODO:

- [x] Performance testing
- [x] Unit tests
- [x] Perf test final version

Related issue number

Closes #34041

Signed-off-by: Eric Liang <ekhliang@gmail.com>
@ericl ericl changed the title [WIP] Fix backpressure handling of queued actor pool tasks Fix backpressure handling of queued actor pool tasks Apr 12, 2023
@ericl ericl assigned c21 and amogkam Apr 12, 2023
@@ -445,6 +453,17 @@ def incremental_resource_usage(self) -> ExecutionResources:
"""
return ExecutionResources()

def notify_resource_usage(
ericl (Contributor Author):
This is kind of gross, but not sure of a better way to still trigger autoscaling.

@ericl ericl added the tests-ok label (The tagger certifies test failures are unrelated and assumes personal liability.) Apr 12, 2023
@amogkam amogkam (Contributor) left a comment:

overall lgtm, but will leave to @c21 for a more thorough review


Args:
input_queue_size: The number of inputs queued outside this operator.
under_resource_limits: Whether this operator is under resource limits.
Contributor:

just for my understanding, how come the operator needs to be told if it's under resource limits instead of knowing this information itself?

ericl (Contributor Author):

This is because the limit is across all the operators, so only the executor knows the full picture.


Args:
input_queue_size: The number of inputs queued outside this operator.
under_resource_limits: Whether this operator is under resource limits.
Contributor:

Suggested change:
- under_resource_limits: Whether this operator is under resource limits.
+ under_resource_limits: Whether all operators are under resource limits.

Contributor:

this still indicates whether this particular operator is under resource limits, right? Is Eric saying above that the executor is the one who knows about all operators' limits, but would also be the one to dictate whether this particular operator is under resource limits?

ericl (Contributor Author):

Yeah, I think it's still about this particular operator. It's possible for some operators to be under limits but others to be above. For example, we count all downstream operator memory usage towards the "effective usage" of an operator.
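
For context, the hook under discussion takes roughly this shape. The signature and docstring are reconstructed from the code quoted above; the class name and the method body (including the _scale_up_if_needed helper) are illustrative only, not the actual implementation:

```python
class ActorPoolMapOperator:  # class name illustrative; stands in for the real operator
    def notify_resource_usage(
        self, input_queue_size: int, under_resource_limits: bool
    ) -> None:
        """Called by the executor to report queue size and limit status.

        Args:
            input_queue_size: The number of inputs queued outside this operator.
            under_resource_limits: Whether this operator is under resource limits.
        """
        # Illustrative body only: an actor pool operator could use this signal
        # to keep triggering autoscaling now that it has no internal queue.
        if input_queue_size > 0 and under_resource_limits:
            self._scale_up_if_needed()  # hypothetical helper

    def _scale_up_if_needed(self) -> None:
        # Placeholder for whatever autoscaling logic the operator uses.
        pass
```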

@@ -471,7 +499,7 @@ class _ActorPool:
actors when the operator is done submitting work to the pool.
"""

-    def __init__(self, max_tasks_in_flight: int = float("inf")):
+    def __init__(self, max_tasks_in_flight: int = 4):
Contributor:

is this the same value as DEFAULT_MAX_TASKS_IN_FLIGHT? if so, any way we can use the same constant?

ericl (Contributor Author):

Done.
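
Roughly the shape of the requested change, as a sketch: only the constant name DEFAULT_MAX_TASKS_IN_FLIGHT and the default value 4 come from the thread above, and the surrounding module layout is assumed.

```python
# Share one constant instead of a bare numeric default, as the review asks.
# Where this constant actually lives in the codebase is not shown here.
DEFAULT_MAX_TASKS_IN_FLIGHT = 4


class _ActorPool:
    def __init__(self, max_tasks_in_flight: int = DEFAULT_MAX_TASKS_IN_FLIGHT):
        self._max_tasks_in_flight = max_tasks_in_flight
```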

@ericl ericl merged commit 3150211 into ray-project:master Apr 17, 2023
justinvyu pushed a commit to justinvyu/ray that referenced this pull request Apr 18, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023
Labels
tests-ok The tagger certifies test failures are unrelated and assumes personal liability.
Development

Successfully merging this pull request may close these issues.

[data] [streaming] The transformation changes the size of the block, and spilled to disk
5 participants