[Data] Use runtime object memory for scheduling #41383
Conversation
Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
python/ray/data/_internal/execution/streaming_executor_state.py (outdated; conversation resolved)
Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
python/ray/data/context.py (outdated)

```diff
@@ -102,6 +102,11 @@
     int(os.environ.get("RAY_DATA_USE_STREAMING_EXECUTOR", "1"))
 )

+# Whether to use the runtime object store memory metrics for scheduling.
+DEFAULT_USE_RUNTIME_METRICS_SCHEDULING = bool(
+    int(os.environ.get("DEFAULT_USE_RUNTIME_METRICS_SCHEDULING", "1"))
+)
```
Will set this to `0` before merging. Currently enabled to ensure tests pass.
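The flag above follows the common `bool(int(os.environ.get(...)))` pattern. As a minimal sketch of how such a toggle behaves (the helper function name here is mine, not part of Ray's code):

```python
import os

def flag_from_env(name: str, default: str) -> bool:
    # "0" -> False, any nonzero integer string -> True.
    # Note: bool(int(...)) raises ValueError for values like "true" or "yes".
    return bool(int(os.environ.get(name, default)))

# Unset variable: the default "1" applies.
print(flag_from_env("DEFAULT_USE_RUNTIME_METRICS_SCHEDULING", "1"))  # True

# Explicitly disabled via the environment.
os.environ["DEFAULT_USE_RUNTIME_METRICS_SCHEDULING"] = "0"
print(flag_from_env("DEFAULT_USE_RUNTIME_METRICS_SCHEDULING", "1"))  # False
```

This is why the comment above says the default will be flipped to `0` before merging: the feature then stays off unless a user opts in through the environment variable.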
Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
```python
if (
    DataContext.get_current().use_runtime_metrics_scheduling
    and global_ok_sans_memory
    and op.metrics.average_bytes_change_per_task is not None
    and op.metrics.average_bytes_change_per_task <= 0
):
    return True

return global_ok_sans_memory and downstream_memory_ok
```
Suggested change:

```diff
-if (
-    DataContext.get_current().use_runtime_metrics_scheduling
-    and global_ok_sans_memory
-    and op.metrics.average_bytes_change_per_task is not None
-    and op.metrics.average_bytes_change_per_task <= 0
-):
-    return True
-return global_ok_sans_memory and downstream_memory_ok
+if DataContext.get_current().use_runtime_metrics_scheduling:
+    return (
+        global_ok_sans_memory
+        and op.metrics.average_bytes_change_per_task is not None
+        and op.metrics.average_bytes_change_per_task <= 0
+    )
+else:
+    return global_ok_sans_memory and downstream_memory_ok
```
I tested this out with `test_large_e2e_backpressure`, and doing an `if`/`else` might not work. We're over the memory limit and `average_bytes_change_per_task` is initially `None`, so `consume` tasks aren't launched; as a result, we keep pulling data from the `produce` tasks until all of the `produce` tasks are complete.

Left this code as the original change for now. Hopefully that should minimize the risk, while still providing some benefit. In the long term, I think we need to devise a strategy for when metrics are `None`.
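The behavioral difference being discussed can be reproduced in isolation. A minimal sketch, with the scheduler state reduced to plain arguments (the function names and the scenario values are mine, not Ray's; this only mirrors the two control-flow shapes shown in the diff above):

```python
from typing import Optional

def can_run_fallthrough(use_runtime_metrics: bool, global_ok_sans_memory: bool,
                        downstream_memory_ok: bool,
                        avg_change: Optional[float]) -> bool:
    # Original change: if the runtime-metrics fast path doesn't fire
    # (e.g. the metric is still None), fall through to the plain memory check.
    if (use_runtime_metrics and global_ok_sans_memory
            and avg_change is not None and avg_change <= 0):
        return True
    return global_ok_sans_memory and downstream_memory_ok

def can_run_if_else(use_runtime_metrics: bool, global_ok_sans_memory: bool,
                    downstream_memory_ok: bool,
                    avg_change: Optional[float]) -> bool:
    # Suggested if/else: with runtime metrics enabled, a None metric
    # means the operator cannot be scheduled via this path at all.
    if use_runtime_metrics:
        return (global_ok_sans_memory
                and avg_change is not None and avg_change <= 0)
    return global_ok_sans_memory and downstream_memory_ok

# Before any task of an operator finishes, its metric is None. If the plain
# memory check would otherwise allow it, the two variants disagree:
print(can_run_fallthrough(True, True, True, None))  # True  -> task launches
print(can_run_if_else(True, True, True, None))      # False -> task never launches
```

Under the `if`/`else` variant an operator with no observed metric is starved, which is consistent with the report that `consume` tasks were never launched in the backpressure test.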
Co-authored-by: Hao Chen <chenh1024@gmail.com> Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
Why are these changes needed?
When selecting an operator to run, the scheduler doesn't consider how much object store memory an operator consumes. If an operator produces large blocks, the scheduler might select the operator too frequently, and your cluster can run out of memory.
To prevent this from happening, this PR updates the scheduler so that it considers the incremental object store memory usage when selecting an operator.
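To illustrate the idea (this is a toy sketch, not Ray's actual scheduler; the class, function names, and numbers are invented), a scheduler can estimate each operator's incremental object store usage from its observed per-task byte change and skip operators whose next task would push usage past the budget:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class OpStats:
    name: str
    runnable: bool  # passes all non-memory scheduling checks
    avg_bytes_change_per_task: Optional[float]  # None until a task has finished

def select_op(ops: List[OpStats], current_usage: int,
              budget: int) -> Optional[OpStats]:
    """Pick the first operator whose estimated incremental memory fits the budget.

    Operators whose tasks shrink data on net (avg change <= 0) are always safe
    to run; others are admitted only if projected usage stays within budget.
    """
    for op in ops:
        if not op.runnable:
            continue
        est = op.avg_bytes_change_per_task
        if est is not None and est <= 0:
            return op  # frees memory on net; never worsens pressure
        if est is None:
            est = 0  # no data yet; optimistic default (a real policy needs more care)
        if current_usage + est <= budget:
            return op
    return None

ops = [
    OpStats("produce", True, 100 * 1024**2),   # grows memory ~100 MiB per task
    OpStats("consume", True, -80 * 1024**2),   # shrinks memory ~80 MiB per task
]
# Near the 1 GiB budget, the producer is skipped and the consumer is chosen.
print(select_op(ops, current_usage=950 * 1024**2, budget=1024**3).name)  # consume
```

A block-size-agnostic scheduler would pick `produce` here simply because it is listed first; weighting the choice by runtime memory deltas is what keeps large-block producers from monopolizing the object store.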
Related issue number
Fixes #41190
Checks
- I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've introduced a new API method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.