Time backpressure #43110

Merged: 15 commits, Feb 28, 2024. Changes shown below are from 8 commits.
python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -1,3 +1,4 @@
import time
from dataclasses import dataclass, field, fields
from typing import TYPE_CHECKING, Any, Dict, Optional

@@ -112,6 +113,12 @@ class OpRuntimeMetrics:
        default=0, metadata={"map_only": True, "export_metric": True}
    )

    # Total time the operator has spent in backpressure.
    backpressure_time: float = field(default=0, metadata={"export": False})

    # Start time of the current backpressure pause; -1 when not in backpressure.
    _backpressure_start_time: float = field(default=-1, metadata={"export": False})

    def __init__(self, op: "PhysicalOperator"):
        from ray.data._internal.execution.operators.map_operator import MapOperator

@@ -242,6 +249,17 @@ def on_output_dequeued(self, output: RefBundle):
"""Callback when an output is dequeued by the operator."""
self.obj_store_mem_internal_outqueue -= output.size_bytes()

    def on_toggle_backpressure(self, in_backpressure):
        if in_backpressure and self._backpressure_start_time == -1:
            # backpressure starting, start timer
            self._backpressure_start_time = time.perf_counter()
        elif self._backpressure_start_time != -1:
            # backpressure stopping, stop timer
            self.backpressure_time += (
                time.perf_counter() - self._backpressure_start_time
            )
            self._backpressure_start_time = -1
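As a quick aside on the pattern above: each True→False toggle adds one pause interval to the running total, so repeated pauses accumulate. A minimal standalone sketch of the same idea (the `PauseTimer` name is hypothetical, not from this PR):

```python
import time


class PauseTimer:
    """Accumulates total paused time across toggle(True)/toggle(False) cycles,
    mirroring the on_toggle_backpressure callback above."""

    def __init__(self) -> None:
        self.total = 0.0
        self._start = -1.0  # -1 means "not currently paused"

    def toggle(self, paused: bool) -> None:
        if paused and self._start == -1:
            self._start = time.perf_counter()
        elif not paused and self._start != -1:
            self.total += time.perf_counter() - self._start
            self._start = -1


timer = PauseTimer()
for _ in range(3):  # three short pauses
    timer.toggle(True)
    time.sleep(0.01)
    timer.toggle(False)
assert timer.total >= 0.03  # the total is the sum of all three pauses
```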

    def on_output_taken(self, output: RefBundle):
        """Callback when an output is taken from the operator."""
        self.num_outputs_taken += 1
python/ray/data/_internal/execution/interfaces/physical_operator.py
@@ -179,6 +179,7 @@ def __init__(
        self._inputs_complete = not input_dependencies
        self._target_max_block_size = target_max_block_size
        self._started = False
        self._in_backpressure = False
        self._metrics = OpRuntimeMetrics(self)
        self._estimated_output_blocks = None
        self._execution_completed = False
@@ -404,3 +405,15 @@ def notify_resource_usage(
            under_resource_limits: Whether this operator is under resource limits.
        """
        pass

    def notify_in_backpressure(self, in_backpressure: bool) -> None:
        """Called periodically from the executor to update this operator's
        in-backpressure status, for stats collection purposes.

        Args:
            in_backpressure: Value this operator's in_backpressure should be set to.
        """
        # Only update the metrics on a change to in_backpressure.
        if self._in_backpressure != in_backpressure:
            self._metrics.on_toggle_backpressure(in_backpressure)
            self._in_backpressure = in_backpressure
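For intuition (hypothetical stand-in classes, not from this PR): because the update is guarded by a change check, the executor can call notify_in_backpressure on every scheduling iteration, and the metrics callback only fires on transitions:

```python
class FakeMetrics:
    """Records only the transitions forwarded by the operator."""

    def __init__(self) -> None:
        self.toggles = []

    def on_toggle_backpressure(self, in_backpressure: bool) -> None:
        self.toggles.append(in_backpressure)


class FakeOp:
    """Minimal stand-in for the notify_in_backpressure change check above."""

    def __init__(self) -> None:
        self._in_backpressure = False
        self._metrics = FakeMetrics()

    def notify_in_backpressure(self, in_backpressure: bool) -> None:
        if self._in_backpressure != in_backpressure:
            self._metrics.on_toggle_backpressure(in_backpressure)
            self._in_backpressure = in_backpressure


op = FakeOp()
for value in [True, True, True, False, False]:  # repeated notifications
    op.notify_in_backpressure(value)
assert op._metrics.toggles == [True, False]  # only the two transitions reach metrics
```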
python/ray/data/_internal/execution/streaming_executor_state.py
@@ -523,14 +523,19 @@ def select_operator_to_run(
    ops = []
    for op, state in topology.items():
        under_resource_limits = _execution_allowed(op, resource_manager)
        # TODO: Is this the only component of backpressure, or should the other
        # conditions below also be considered for timing?
        in_backpressure = any(not p.can_add_input(op) for p in backpressure_policies)
Contributor:
When under_resource_limits or any(...), the operator is in backpressure for "task submission". For this one, we can probably rename the metric to something like in_task_submission_backpressure.

Another place regarding backpressure is here; that one backpressures the output speed of the running tasks. For the latter, it doesn't seem to make sense to record the "backpressure time"; we can think of a better metric later.

Contributor (author):
Updated the metric names to make it clearer that this deals with task-submission backpressure, and included under_resource_limits in the in_backpressure boolean.

Did you mean not under_resource_limits or any(...)? In other words, is this the correct logic:

under_resource_limits = _execution_allowed(op, resource_manager)
in_backpressure = (
    any(not p.can_add_input(op) for p in backpressure_policies)
    or not under_resource_limits
)

Contributor (author):
And agreed that it makes sense to add something for the second backpressure case, but I will do that in a later PR when we have a better understanding of what that metric should be.

        if (
            under_resource_limits
            and not op.completed()
            and state.num_queued() > 0
            and op.should_add_input()
            and all(p.can_add_input(op) for p in backpressure_policies)
            and not in_backpressure
        ):
            ops.append(op)
        # Signal whether the op is in backpressure, for stats collection.
        op.notify_in_backpressure(in_backpressure)
        # Update the op in all cases to enable internal autoscaling, etc.
        op.notify_resource_usage(state.num_queued(), under_resource_limits)

30 changes: 29 additions & 1 deletion python/ray/data/tests/test_backpressure_policies.py
@@ -120,7 +120,7 @@ def test_e2e_normal(self):
        map_func1 = self._get_map_func(actor, 1)
        map_func2 = self._get_map_func(actor, 2)

        # Creat a dataset with 2 map ops. Each map op has N tasks, where N is
        # Create a dataset with 2 map ops. Each map op has N tasks, where N is
        # the number of cluster CPUs.
        N = self.__class__._cluster_cpus
        ds = ray.data.range(N, parallelism=N)
@@ -138,6 +138,34 @@ def test_e2e_normal(self):
        start2, end2 = ray.get(actor.get_start_and_end_time_for_op.remote(2))
        assert start1 < start2 < end1 < end2, (start1, start2, end1, end2)

    def test_e2e_time_backpressure(self):
Contributor:
Maybe move this outside of TestConcurrencyCapBackpressurePolicy, since the purpose of this test is to test measuring backpressure time, not the ConcurrencyCapBackpressurePolicy?

Contributor (author):
Going to leave as is to prevent having to duplicate some of the class methods outside the class. Once the metric is being exported, I will remove this duplicated test and include the assert from this test in each of the backpressure policy tests.

"""A simple E2E test with ConcurrencyCapBackpressurePolicy enabled."""
# TODO: merge this test with the above once we are exporting the
# backpressure_time in op_runtime_metrics.py. This test currently
# mutates the OpRuntimeMetrics dataclass to observe the backpressure time.
from ray.data._internal.execution.interfaces.op_runtime_metrics import (
OpRuntimeMetrics,
)

with patch.object(
OpRuntimeMetrics.__dataclass_fields__["backpressure_time"],
"metadata",
{"export": True},
):
actor = self._create_record_time_actor()
map_func1 = self._get_map_func(actor, 1)
map_func2 = self._get_map_func(actor, 2)

# Create a dataset with 2 map ops. Each map op has N tasks, where N is
# the number of cluster CPUs.
N = self.__class__._cluster_cpus
ds = ray.data.range(N, parallelism=N)
# Use different `num_cpus` to make sure they don't fuse.
ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1, concurrency=1)
ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1, concurrency=1)
ds.take_all()
assert 0 < ds._plan.stats().extra_metrics["backpressure_time"]
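A note on the patch.object trick used here: dataclass field metadata lives on the class-level Field object, so temporarily swapping that one attribute changes what an exporter that reads field metadata will include. A self-contained sketch of the mechanism, with illustrative names only (the real export logic in Ray may differ):

```python
from dataclasses import dataclass, field, fields
from unittest.mock import patch


@dataclass
class Metrics:
    # Illustrative metric; opted out of export via field metadata.
    latency: float = field(default=0.0, metadata={"export": False})


def exported(obj) -> dict:
    # Export only the fields whose metadata opts in.
    return {f.name: getattr(obj, f.name) for f in fields(obj) if f.metadata.get("export")}


m = Metrics(latency=1.5)
assert exported(m) == {}

# Temporarily flip the class-level field metadata, as the test above does.
with patch.object(Metrics.__dataclass_fields__["latency"], "metadata", {"export": True}):
    assert exported(m) == {"latency": 1.5}
```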


class TestStreamOutputBackpressurePolicy(unittest.TestCase):
    """Tests for StreamOutputBackpressurePolicy."""