
Time backpressure #43110

Merged
merged 15 commits into master on Feb 28, 2024
Conversation

@omatthew98 (Contributor) commented Feb 12, 2024

Why are these changes needed?

Currently we do not measure how much time is spent in backpressure. This change records per-operator backpressure time, which can be used to understand how operators are being executed and how a given set of backpressure policies affects the execution.

These stats are not yet propagated to DatasetStats; for now they are only stored in OpRuntimeMetrics. Ideally we would pass them along at a per-operator level, but we could also pass along only the total time spent in backpressure. A later PR will include this and other StreamingExecutor stats in the DatasetStatsSummary.
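At a high level, the accounting stamps the wall-clock time when an operator enters backpressure and accumulates the elapsed time when it leaves. A minimal sketch of that idea (class and method names here are illustrative, not the actual OpRuntimeMetrics API):

import time
from dataclasses import dataclass, field

@dataclass
class BackpressureTimer:
    # Total seconds the operator has spent paused due to backpressure.
    backpressure_time: float = 0.0
    # Start of the current pause, or -1 if not currently in backpressure.
    _backpressure_start_time: float = field(default=-1, repr=False)

    def on_toggle(self, in_backpressure: bool) -> None:
        now = time.perf_counter()
        if in_backpressure and self._backpressure_start_time < 0:
            # Entering backpressure: remember when the pause started.
            self._backpressure_start_time = now
        elif not in_backpressure and self._backpressure_start_time >= 0:
            # Leaving backpressure: accumulate the pause duration.
            self.backpressure_time += now - self._backpressure_start_time
            self._backpressure_start_time = -1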

Related issue number

Closes #42799

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Matthew Owen <mowen@anyscale.com>
@@ -105,6 +106,17 @@ class OpRuntimeMetrics:
default=0, metadata={"map_only": True, "export_metric": True}
)

# Time operator spent in backpressure
# TODO: Do we need both of these metadata here
Contributor:

shouldn't need map_only, since we expect this to apply to all operators. (The map_only field indicates that the field only applies to map-style operators.)
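A sketch of the suggested shape, reusing the field name from the test further below and keeping export_metric as in the original diff (whether that flag should be set yet is discussed in the next comment):

# Time operator spent in backpressure; applies to all operators, so no "map_only".
backpressure_time: float = field(default=0, metadata={"export_metric": True})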

Contributor:

the export_metric flag gates whether the field is included in OpRuntimeMetrics.as_dict(), which is what gets output as a string here: https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/execution/streaming_executor.py#L315

so we should enable this later, once we are ready to include it in the stats output string.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
@@ -118,17 +115,16 @@ def extra_metrics(self) -> Dict[str, Any]:
"""Return a dict of extra metrics."""
return self._extra_metrics

def as_dict(self, metrics_only: bool = False):
def as_dict(self, metrics_only: bool = True):
Contributor Author:

I have modified the as_dict function so that metrics_only defaults to True, and a metric is now exported unless it explicitly has export_metric: False. The same metrics are exported as before, but this lets us properly hide the backpressure metrics for now.
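In other words, roughly this filtering rule (a sketch of the described behavior, not the actual implementation):

from dataclasses import fields
from typing import Any, Dict

def as_dict(self, metrics_only: bool = True) -> Dict[str, Any]:
    result = {}
    for f in fields(self):
        # Export by default; skip only metrics explicitly marked
        # export_metric=False (e.g. the backpressure timing, for now).
        if metrics_only and f.metadata.get("export_metric", True) is False:
            continue
        result[f.name] = getattr(self, f.name)
    return result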

Comment on lines 147 to 153
from ray.data._internal.execution.interfaces.op_runtime_metrics import (
OpRuntimeMetrics,
)

OpRuntimeMetrics.__dataclass_fields__["backpressure_time"].metadata = {
"export_metric": True
}
Contributor:

is it feasible to mock OpRuntimeMetrics instead of modifying the attribute in place?
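For example, patching the field's metadata only for the duration of the test might look like this (a sketch; whether this interacts cleanly with the rest of the suite is an assumption):

from types import MappingProxyType
from unittest import mock

from ray.data._internal.execution.interfaces.op_runtime_metrics import (
    OpRuntimeMetrics,
)

# Patch the metadata for the test only; mock restores the original on exit,
# so the class-level change does not leak into other tests.
backpressure_field = OpRuntimeMetrics.__dataclass_fields__["backpressure_time"]
with mock.patch.object(
    backpressure_field, "metadata", MappingProxyType({"export_metric": True})
):
    ...  # run the pipeline and assert on the exported metrics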

python/ray/data/tests/test_backpressure_policies.py (outdated)
ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1, concurrency=1)
ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1, concurrency=1)
ds.take_all()
assert 0 < ds._plan.stats().extra_metrics["backpressure_time"] < 1
Contributor:

maybe a short comment explaining how the upper limit 1 was calculated/estimated?

Contributor Author:

Actually, I'm going to remove this upper limit; we're really just testing that this time is greater than 0. The value 1 was chosen arbitrarily.
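So the check presumably ends up as just a lower bound (the exact metric key may change with the rename discussed below):

# Only assert that some backpressure time was recorded; no arbitrary upper bound.
assert ds._plan.stats().extra_metrics["backpressure_time"] > 0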

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
block_generation_time: float = field(
default=0, metadata={"map_only": True, "export_metric": True}
)
block_generation_time: float = field(default=0, metadata={"map_only": True})
Contributor:

should we keep "export_metric": True for this?

Contributor Author:

Oh yeah, lemme change back!

Signed-off-by: Matthew Owen <mowen@anyscale.com>
@@ -523,14 +523,19 @@ def select_operator_to_run(
ops = []
for op, state in topology.items():
under_resource_limits = _execution_allowed(op, resource_manager)
# TODO: is this the only component of backpressure or should these
# other conditions be considered for timing?
in_backpressure = any(not p.can_add_input(op) for p in backpressure_policies)
Contributor:

when under_resource_limits or any(...), the operator is in backpressure for "task submission".
For this one, we can probably rename the metric to something like in_task_submission_backpressure.

another place regarding backpressure is here. This is to backpressure the output speed of the running tasks.
for the latter, it doesn't seem to make sense to record the "backpressure time". we can think of a better metric later.

Contributor Author:

Updated the metric names to make it clearer that this deals with task submission backpressure, and included under_resource_limits in the in_backpressure boolean.

Did you mean not under_resource_limits or any(...)? In other words is this the correct logic:

under_resource_limits = _execution_allowed(op, resource_manager)
in_backpressure = (
    any(not p.can_add_input(op) for p in backpressure_policies)
    or not under_resource_limits
)

Contributor Author:

And I agree it makes sense to add something for the second backpressure case, but I will do that in a later PR once we have a better understanding of what that metric should be.

@@ -523,14 +523,19 @@ def select_operator_to_run(
ops = []
for op, state in topology.items():
under_resource_limits = _execution_allowed(op, resource_manager)
in_backpressure = (
any(not p.can_add_input(op) for p in backpressure_policies)
or not under_resource_limits
Contributor:

nit, swap the order of these 2 conditions. so when not under_resource_limits, we don't need to go over the policies.
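A minimal sketch of the reordering (same logic, just letting or short-circuit before consulting the policies):

in_backpressure = not under_resource_limits or any(
    not p.can_add_input(op) for p in backpressure_policies
)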

# Start time of current pause due to task submission backpressure
_task_submission_backpressure_start_time: float = field(
default=-1, metadata={"export": False}
)
Contributor:

nit, if this is an internal-only field, we can move it to __init__.
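For example, something along these lines (a sketch; the op parameter and the existing initialization are assumed, not copied from the actual class):

def __init__(self, op):
    ...  # existing initialization
    # Internal-only bookkeeping; not a dataclass field, so it stays out of
    # as_dict() and other field-based introspection.
    self._task_submission_backpressure_start_time = -1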

@@ -138,6 +138,36 @@ def test_e2e_normal(self):
start2, end2 = ray.get(actor.get_start_and_end_time_for_op.remote(2))
assert start1 < start2 < end1 < end2, (start1, start2, end1, end2)

def test_e2e_time_backpressure(self):
Contributor:

maybe move this outside of TestConcurrencyCapBackpressurePolicy? The purpose of this test is to test measuring backpressure time, not the ConcurrencyCapBackpressurePolicy.

Contributor Author:

Going to leave as is to prevent having to duplicate some of the class methods outside the class. Once the metric is being exported, I will remove this duplicated test and include the assert from this test in each of the backpressure policy tests.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
@raulchen merged commit 076d3ba into ray-project:master on Feb 28, 2024
9 checks passed
Successfully merging this pull request may close these issues:

[Data] Measure time from scheduler paused due to backpressure in DatasetStats