Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time backpressure #43110

Merged
merged 15 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from dataclasses import dataclass, field, fields
from typing import TYPE_CHECKING, Any, Dict, Optional

Expand Down Expand Up @@ -112,6 +113,16 @@ class OpRuntimeMetrics:
default=0, metadata={"map_only": True, "export_metric": True}
)

# Time operator spent in task submission backpressure
task_submission_backpressure_time: float = field(
default=0, metadata={"export": False}
)

# Start time of current pause due to task submission backpressure
_task_submission_backpressure_start_time: float = field(
default=-1, metadata={"export": False}
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, if this is an internal-only field, we can move it to __init__.


def __init__(self, op: "PhysicalOperator"):
from ray.data._internal.execution.operators.map_operator import MapOperator

Expand Down Expand Up @@ -266,6 +277,17 @@ def on_output_dequeued(self, output: RefBundle):
"""Callback when an output is dequeued by the operator."""
self.obj_store_mem_internal_outqueue -= output.size_bytes()

def on_toggle_task_submission_backpressure(self, in_backpressure):
    """Start or stop the task submission backpressure timer.

    A start time of ``-1`` means no pause is currently being timed.

    Args:
        in_backpressure: True when the operator is in task submission
            backpressure, False when it is not.
    """
    timer_running = self._task_submission_backpressure_start_time != -1
    if in_backpressure:
        if not timer_running:
            # Backpressure starting: remember when this pause began.
            self._task_submission_backpressure_start_time = time.perf_counter()
        # Otherwise a pause is already being timed; a repeated True must
        # not be mistaken for the end of the pause.
    elif timer_running:
        # Backpressure stopping: add the elapsed pause to the running total
        # and reset the start time to the "not timing" sentinel.
        self.task_submission_backpressure_time += (
            time.perf_counter() - self._task_submission_backpressure_start_time
        )
        self._task_submission_backpressure_start_time = -1

def on_output_taken(self, output: RefBundle):
"""Callback when an output is taken from the operator."""
self.num_outputs_taken += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def __init__(
self._inputs_complete = not input_dependencies
self._target_max_block_size = target_max_block_size
self._started = False
self._in_task_submission_backpressure = False
self._metrics = OpRuntimeMetrics(self)
self._estimated_output_blocks = None
self._execution_completed = False
Expand Down Expand Up @@ -404,3 +405,15 @@ def notify_resource_usage(
under_resource_limits: Whether this operator is under resource limits.
"""
pass

def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
    """Update this operator's task submission backpressure status.

    Called periodically from the executor so that backpressure stats can be
    collected. The metrics object is only notified when the status changes.

    Args:
        in_backpressure: Value this operator's in_backpressure should be set to.
    """
    if self._in_task_submission_backpressure == in_backpressure:
        # Status unchanged since the last call; nothing to record.
        return
    self._metrics.on_toggle_task_submission_backpressure(in_backpressure)
    self._in_task_submission_backpressure = in_backpressure
Original file line number Diff line number Diff line change
Expand Up @@ -523,14 +523,19 @@ def select_operator_to_run(
ops = []
for op, state in topology.items():
under_resource_limits = _execution_allowed(op, resource_manager)
in_backpressure = (
any(not p.can_add_input(op) for p in backpressure_policies)
or not under_resource_limits
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: swap the order of these 2 conditions, so that when not under_resource_limits, we don't need to go over the policies.

)
if (
under_resource_limits
not in_backpressure
and not op.completed()
and state.num_queued() > 0
and op.should_add_input()
and all(p.can_add_input(op) for p in backpressure_policies)
):
ops.append(op)
# Signal whether op in backpressure for stats collections
op.notify_in_task_submission_backpressure(in_backpressure)
# Update the op in all cases to enable internal autoscaling, etc.
op.notify_resource_usage(state.num_queued(), under_resource_limits)

Expand Down
32 changes: 31 additions & 1 deletion python/ray/data/tests/test_backpressure_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_e2e_normal(self):
map_func1 = self._get_map_func(actor, 1)
map_func2 = self._get_map_func(actor, 2)

# Creat a dataset with 2 map ops. Each map op has N tasks, where N is
# Create a dataset with 2 map ops. Each map op has N tasks, where N is
# the number of cluster CPUs.
N = self.__class__._cluster_cpus
ds = ray.data.range(N, parallelism=N)
Expand All @@ -138,6 +138,36 @@ def test_e2e_normal(self):
start2, end2 = ray.get(actor.get_start_and_end_time_for_op.remote(2))
assert start1 < start2 < end1 < end2, (start1, start2, end1, end2)

def test_e2e_time_backpressure(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this outside of TestConcurrencyCapBackpressurePolicy, since the purpose of this test is to test measuring backpressure time, not the ConcurrencyCapBackpressurePolicy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to leave as is to prevent having to duplicate some of the class methods outside the class. Once the metric is being exported, I will remove this duplicated test and include the assert from this test in each of the backpressure policy tests.

"""A simple E2E test with ConcurrencyCapBackpressurePolicy enabled."""
# TODO: merge this test with the above once we are exporting the
# backpressure_time in op_runtime_metrics.py. This test currently
# mutates the OpRuntimeMetrics dataclass to observe the backpressure time.
from ray.data._internal.execution.interfaces.op_runtime_metrics import (
OpRuntimeMetrics,
)

with patch.object(
OpRuntimeMetrics.__dataclass_fields__["task_submission_backpressure_time"],
"metadata",
{"export": True},
):
actor = self._create_record_time_actor()
map_func1 = self._get_map_func(actor, 1)
map_func2 = self._get_map_func(actor, 2)

# Create a dataset with 2 map ops. Each map op has N tasks, where N is
# the number of cluster CPUs.
N = self.__class__._cluster_cpus
ds = ray.data.range(N, parallelism=N)
# Use different `num_cpus` to make sure they don't fuse.
ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1, concurrency=1)
ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1, concurrency=1)
ds.take_all()
assert (
0 < ds._plan.stats().extra_metrics["task_submission_backpressure_time"]
)


class TestStreamOutputBackpressurePolicy(unittest.TestCase):
"""Tests for StreamOutputBackpressurePolicy."""
Expand Down