[Data] Cap op concurrency with exponential ramp-up #40275

Merged
merged 19 commits into ray-project:master from concurrency-cap-backpressure on Oct 18, 2023

Conversation

Contributor

@raulchen commented Oct 11, 2023

Why are these changes needed?

Today, when a new dataset is launched, the StreamingExecutor allocates all resources to the first operator.
Consider a simple case `read -> map` where the read and map are not fused: the scheduler will submit as many read tasks as possible up front. If the read tasks output many blocks, the blocks will pile up because we don't have resources to submit map tasks to consume the data.

This PR mitigates this issue by capping the concurrency of each op at an initial value and ramping up the cap exponentially as execution progresses (see the `ConcurrencyCapBackpressurePolicy` docstring for details).

This PR also introduces a `BackpressurePolicy` interface, making backpressure policies configurable and pluggable. Later we should migrate existing backpressure mechanisms to this new interface.
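
To make the ramp-up behavior concrete, here is a minimal, self-contained sketch of the idea. The class name, method names, and the exact trigger condition are illustrative assumptions rather than the actual implementation; see the `ConcurrencyCapBackpressurePolicy` docstring for details and the constants quoted later in this thread for the default values.

class ConcurrencyCapSketch:
    """Illustrative sketch: an exponentially ramping concurrency cap for one operator."""

    def __init__(
        self,
        init_cap: int = 4,
        cap_multiply_threshold: float = 0.5,
        cap_multiplier: float = 2.0,
    ):
        self._cap = init_cap
        self._threshold = cap_multiply_threshold
        self._multiplier = cap_multiplier

    def can_submit(self, num_running_tasks: int) -> bool:
        # Backpressure the operator while it already has `cap` tasks in flight.
        return num_running_tasks < self._cap

    def on_task_finished(self, num_finished_tasks: int) -> None:
        # Assumption: once the number of finished tasks reaches threshold * cap,
        # multiply the cap, ramping concurrency up exponentially (4 -> 8 -> 16 -> ...).
        while num_finished_tasks >= self._cap * self._threshold:
            self._cap = int(self._cap * self._multiplier)

With these defaults, the first operator starts with at most 4 concurrent tasks instead of grabbing the whole cluster, and its cap doubles each time enough of its tasks finish.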

Known limitations:

  • If the config is not properly set, performance may regress for some workloads. Thus, this feature is disabled by default in 2.8 and will be enabled in 2.9.
  • This feature only caps the initial concurrency. Once the cap has ramped up, data can still pile up. This will be resolved by a different backpressure policy that profiles the runtime metrics (e.g., object store memory usage, RAM usage) of each op.
  • It doesn't apply backpressure to tasks that output many blocks; that will be addressed by streaming generator backpressure instead.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Contributor

@c21 left a comment

LG overall.

python/ray/data/_internal/execution/backpressure_policy.py (outdated review threads, resolved)

# Environment variable to configure this policy.
# The format is: "<init_cap>,<cap_multiply_threshold>,<cap_multiplier>"
CONFIG_ENV_VAR = "RAY_DATA_CONCURRENCY_CAP_CONFIG"
Contributor

This is not a blocking comment, but it would be better to have separate environment variables.

Contributor Author

This is to facilitate internal tests in 2.9. When we officially release this feature, we will probably simplify the configs. So I'll keep it for now.
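
For reference, the documented format above implies a value like "4,0.5,2.0". A rough, illustrative parsing sketch (not the PR's actual code; the fallback mirrors the defaults quoted in the next snippet):

import os

# Parse "<init_cap>,<cap_multiply_threshold>,<cap_multiplier>".
raw = os.environ.get("RAY_DATA_CONCURRENCY_CAP_CONFIG", "4,0.5,2.0")
init_cap_str, threshold_str, multiplier_str = raw.split(",")
init_cap = int(init_cap_str)
cap_multiply_threshold = float(threshold_str)
cap_multiplier = float(multiplier_str)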

Comment on lines 55 to 61
# The initial concurrency cap for each operator.
INIT_CAP = 4
# When the number of finished tasks reaches this threshold, the concurrency cap
# will be multiplied by the multiplier.
CAP_MULTIPLY_THRESHOLD = 0.5
# The multiplier to multiply the concurrency cap by.
CAP_MULTIPLIER = 2.0
Contributor

Another way is to define these constants at the file level, so we don't need to parse environment variables. That may also be easier for advanced users to test out.

Contributor Author

The issue with constants is that if the executor doesn't run on the driver (e.g., on the SplitCoordinator actor), it's hard to change the configs. I've seen this issue for other configs that depend on constants. Do you know a good solution to bypass this issue?

Contributor

I think you'll need to put them into the DataContext.

Contributor Author

@stephanie-wang @c21 If we put them into DataContext, I'd like to save them in a dict and add a key-value interface in DataContext.
The reason is that this is a plugin, and DataContext shouldn't need to know about the plugin configs.
What do you think?

The API will be something like:

data_context.set("concurrency_cap_backpressure_policy.init_cap", 4)

Contributor

That sounds good to me for now while it's still experimental. Actually, could you prepend the name with "experimental" or something like that? Makes deprecation a bit smoother.



# TODO(hchen): Enable ConcurrencyCapBackpressurePolicy by default.
DEFAULT_BACKPRESSURE_PLOCIES = []
Contributor

So do we foresee that, in the near future, we will have multiple policies enabled at the same time?

Contributor

I think it is too early to abstract backpressure policies, considering we only have one right now. Can we just put the new concurrency caps inside the current backpressure code under a feature flag?

Contributor Author

I do plan to migrate existing backpressure code to this interface.
Another reason why I wanted to introduce this interface is that it allows us to experiment with new backpressure policies without touching the core code base. E.g., one idea is to take real runtime metrics into consideration for backpressure.

Contributor

Sounds good. I don't feel strongly about it; I just think it may be too early to abstract (we don't know yet if this is the right interface).

Contributor Author

Yes, the interface is likely to change as we implement other policies. It's an internal interface, so that should be fine.

python/ray/data/_internal/execution/backpressure_policy.py (outdated, resolved)
Contributor

@stephanie-wang left a comment

I'm fine merging if we turn it off by default, but I don't think this policy is going to be usable. The performance regression is scary, and the configuration parameters here are going to be pretty confusing for the average user, I think. Also, off the top of my head, there are likely many cases that won't work or will regress (e.g., concurrency=4 cap may or may not work depending on num_cpus/task and num_cpus total).

Offline, let's try to come up with a policy that minimizes regressions and has as few configuration parameters as possible, ideally none. I believe we can come up with something reasonable by considering the total number of operators when assigning the initial caps. For example, if we have N operators to run, we give each 1/N of the cores initially, then adjust based on which operators are ready.

Also, I think this PR needs some doc changes to instruct on how to turn on the feature and configure it.



python/ray/data/_internal/execution/backpressure_policy.py (outdated, resolved)
@stephanie-wang added the @author-action-required label (The PR author is responsible for the next step; remove the tag to send back to the reviewer.) on Oct 16, 2023
@raulchen
Contributor Author

@stephanie-wang You are right. The current default config isn't optimal. I also thought of making the default config smarter by taking into consideration the number of ops and their resource requirements. The plan is to implement the basic framework and turn this off in 2.8. In 2.9, we'll need to do more experiments to figure out the best configuration and officially release this feature.
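
As a hypothetical illustration of that direction (not code from this PR), the initial cap could be derived from the cluster size and the number of operators, along the lines suggested above:

def initial_cap(total_cpus: float, num_ops: int, cpus_per_task: float = 1.0) -> int:
    # Split the available CPUs evenly across operators; each op's initial
    # concurrency cap is however many of its tasks fit into that share.
    return max(1, int(total_cpus / num_ops / cpus_per_task))

# e.g. a 16-CPU cluster running `read -> map` (2 unfused ops, 1 CPU per task)
# would start each operator with a cap of 8 concurrent tasks.
assert initial_cap(16, 2) == 8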

@raulchen
Contributor Author

@c21 @stephanie-wang thanks for your comments. They have either been addressed (resolved threads) or replied to (unresolved threads). Please take another look.

Contributor

@stephanie-wang left a comment

Let's merge after making it configurable through DataContext instead of the env variables.

@raulchen
Contributor Author

Moved the config to DataContext and added the set_plugin_config/get_plugin_config API. The reasoning was commented above.

Contributor

@c21 left a comment

LG w/ two minor comments.

@@ -219,6 +219,7 @@ def __init__(
        self.enable_get_object_locations_for_metrics = (
            enable_get_object_locations_for_metrics
        )
        self._plugin_configs: Dict[str, Any] = {}
Contributor

Can we rename it to _backpressure_plugin_configs? In the future, we may introduce other plugin components.

Comment on lines 287 to 294
    def get_plugin_config(self, key: str, default: Any = None) -> Any:
        return self._plugin_configs.get(key, default)

    def set_plugin_config(self, key: str, value: Any) -> None:
        self._plugin_configs[key] = value

    def remove_plugin_config(self, key: str) -> None:
        self._plugin_configs.pop(key, None)
Contributor

Let's declare these methods with a _ prefix, so they remain private and are easy to change later.

Contributor Author

Discussed offline; we'll make this API reusable for other components.
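
Putting the pieces of this thread together, usage would look roughly like the following. The key name is illustrative (following the earlier suggestion to prefix experimental settings); the methods are the ones quoted above.

from ray.data.context import DataContext

ctx = DataContext.get_current()

# Set a plugin config on the driver before executing the dataset...
ctx.set_plugin_config("experimental_concurrency_cap_backpressure_policy.init_cap", 8)

# ...and the policy reads it back later (possibly off the driver, e.g. on the
# SplitCoordinator actor), falling back to a default when the key is unset.
init_cap = ctx.get_plugin_config(
    "experimental_concurrency_cap_backpressure_policy.init_cap", 4
)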

@raulchen merged commit 6ba659f into ray-project:master on Oct 18, 2023
26 of 37 checks passed
@raulchen deleted the concurrency-cap-backpressure branch on October 18, 2023 at 03:02
jonathan-anyscale pushed two commits to jonathan-anyscale/ray that referenced this pull request on Oct 26, 2023