[Data] Cap op concurrency with exponential ramp-up #40275

Merged: 19 commits, Oct 18, 2023
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -489,3 +489,11 @@ py_test(
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_backpressure_policies",
size = "small",
srcs = ["tests/test_backpressure_policies.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)
102 changes: 102 additions & 0 deletions python/ray/data/_internal/execution/backpressure_policy.py
@@ -0,0 +1,102 @@
import logging
import os
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.physical_operator import (
PhysicalOperator,
)
from ray.data._internal.execution.streaming_executor_state import Topology

logger = logging.getLogger(__name__)


# TODO(hchen): Enable ConcurrencyCapBackpressurePolicy by default.
DEFAULT_BACKPRESSURE_POLICIES = []


def get_backpressure_policies(topology: "Topology"):
return [policy(topology) for policy in DEFAULT_BACKPRESSURE_POLICIES]


class BackpressurePolicy(ABC):
"""Interface for back pressure policies."""

@abstractmethod
def __init__(self, topology: "Topology"):
...

@abstractmethod
def can_run(self, op: "PhysicalOperator") -> bool:
"""Called when StreamingExecutor selects an operator to run in
`streaming_executor_state.select_operator_to_run()`.

Returns: True if the operator can run, False otherwise.
"""
...


class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
"""A backpressure policy that caps the concurrency of each operator.

The concurrency cap limits the number of concurrently running tasks.
It will be set to an initial value, and will ramp up exponentially.

The concrete strategy is as follows:
- Each PhysicalOperator is assigned an initial concurrency cap.
- A PhysicalOperator can run new tasks if the number of running tasks is less
than the cap.
- When the number of finished tasks reaches a threshold, the concurrency cap will
increase.
"""

# Environment variable to configure this policy.
# The format is: "<init_cap>,<cap_multiply_threshold>,<cap_multiplier>"
CONFIG_ENV_VAR = "RAY_DATA_CONCURRENCY_CAP_CONFIG"
Contributor: This is not a blocking comment, but it would be better to have separate environment variables.

Contributor (author): This is to facilitate internal tests in 2.9. When we officially release this feature, we will probably simplify the configs, so I'll keep it for now.
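Note: the variable takes a single comma-separated string. A minimal sketch of overriding the defaults (the values here are arbitrary, and the variable must be set in the process that constructs the policy, since it is read once in __init__):

import os

# Hypothetical override: init_cap=8, cap_multiply_threshold=0.5, cap_multiplier=2.0.
os.environ["RAY_DATA_CONCURRENCY_CAP_CONFIG"] = "8,0.5,2.0"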


# The initial concurrency cap for each operator.
INIT_CAP = 4
# When the number of finished tasks reaches this threshold, the concurrency cap
# will be multiplied by the multiplier.
CAP_MULTIPLY_THRESHOLD = 0.5
# The multiplier to multiply the concurrency cap by.
CAP_MULTIPLIER = 2.0
Contributor: Another way is to define these constants at the file level, so we don't need to parse environment variables. It may also be easier for advanced users to test out.

Contributor (author): The issue with constants is that if the executor doesn't run on the driver (e.g., on the SplitCoordinator actor), it's hard to change the configs. I've seen this issue for other configs that depend on constants. Do you know a good solution to bypass this issue?

Contributor: I think you'll need to put them into the DataContext.

Contributor (author): @stephanie-wang @c21 If we put them into DataContext, I'd like to save them in a dict and add a key-value interface in DataContext, because this is a plugin and DataContext shouldn't need to know about the plugin configs. What do you think?

The API will be something like:

data_context.set("concurrency_cap_backpressure_policy.init_cap", 4)

Contributor: That sounds good to me for now while it's still experimental. Actually, could you prepend the name with "experimental" or something like that? Makes deprecation a bit smoother.
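Note: for illustration only, the key-value interface discussed above might look roughly like the sketch below. The method names and the "experimental" prefix are hypothetical and not part of this PR.

# Hypothetical sketch: a generic key-value store on DataContext so that plugin
# configs (like this policy's) don't need dedicated fields.
class DataContext:
    def __init__(self):
        self._extra_configs = {}

    def set_config(self, key, value):
        self._extra_configs[key] = value

    def get_config(self, key, default=None):
        return self._extra_configs.get(key, default)

# Usage along the lines proposed in the discussion:
# ctx.set_config("experimental_concurrency_cap_backpressure_policy.init_cap", 4)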


def __init__(self, topology: "Topology"):
self._concurrency_caps: dict["PhysicalOperator", float] = {}

self._init_cap = self.INIT_CAP
self._cap_multiplier = self.CAP_MULTIPLIER
self._cap_multiply_threshold = self.CAP_MULTIPLY_THRESHOLD

env_config = os.environ.get(self.CONFIG_ENV_VAR, "")
if env_config:
try:
configs = env_config.split(",")
self._init_cap = int(configs[0])
self._cap_multiply_threshold = float(configs[1])
self._cap_multiplier = float(configs[2])
assert self._init_cap > 0
assert 0 < self._cap_multiply_threshold <= 1
assert self._cap_multiplier > 1
except Exception as e:
raise ValueError(f"Invalid concurrency cap config: {env_config}") from e

logger.debug(
"Concurrency cap config: "
f"{self._init_cap}, {self._cap_multiply_threshold}, {self._cap_multiplier}"
)

for op, _ in topology.items():
self._concurrency_caps[op] = self._init_cap

def can_run(self, op: "PhysicalOperator") -> bool:
metrics = op.metrics
while metrics.num_tasks_finished >= (
self._concurrency_caps[op] * self._cap_multiply_threshold
):
self._concurrency_caps[op] *= self._cap_multiplier
logger.debug(
f"Concurrency cap for {op} increased to {self._concurrency_caps[op]}"
)
return metrics.num_tasks_running < self._concurrency_caps[op]
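Note: to make the ramp-up concrete, the following standalone sketch mirrors the while loop in can_run() with the default constants above (INIT_CAP=4, CAP_MULTIPLY_THRESHOLD=0.5, CAP_MULTIPLIER=2.0); it is not part of the PR.

init_cap, threshold, multiplier = 4, 0.5, 2.0

def cap_after(num_tasks_finished: int) -> float:
    # Same logic as can_run(): keep multiplying the cap while the number of
    # finished tasks has reached the current cap's threshold.
    cap = float(init_cap)
    while num_tasks_finished >= cap * threshold:
        cap *= multiplier
    return cap

# 0 or 1 finished tasks -> cap stays 4; 2 finished -> 8; 4 -> 16; 8 -> 32.
print([cap_after(n) for n in (0, 1, 2, 4, 8)])  # [4.0, 4.0, 8.0, 16.0, 32.0]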
10 changes: 9 additions & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
@@ -2,13 +2,17 @@
import threading
import time
import uuid
from typing import Iterator, Optional
from typing import Iterator, List, Optional

import ray
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
from ray.data._internal.execution.backpressure_policy import (
BackpressurePolicy,
get_backpressure_policies,
)
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
@@ -72,6 +76,7 @@ def __init__(self, options: ExecutionOptions):
# generator `yield`s.
self._topology: Optional[Topology] = None
self._output_node: Optional[OpState] = None
self._backpressure_policies: Optional[List[BackpressurePolicy]] = None

Executor.__init__(self, options)
thread_name = f"StreamingExecutor-{self._execution_id}"
@@ -101,6 +106,7 @@ def execute(

# Setup the streaming DAG topology and start the runner thread.
self._topology, _ = build_streaming_topology(dag, self._options)
self._backpressure_policies = get_backpressure_policies(self._topology)

if not isinstance(dag, InputDataBuffer):
# Note: DAG must be initialized in order to query num_outputs_total.
@@ -244,6 +250,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
topology,
cur_usage,
limits,
self._backpressure_policies,
ensure_at_least_one_running=self._consumer_idling(),
execution_id=self._execution_id,
autoscaling_state=self._autoscaling_state,
@@ -261,6 +268,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
topology,
cur_usage,
limits,
self._backpressure_policies,
ensure_at_least_one_running=self._consumer_idling(),
execution_id=self._execution_id,
autoscaling_state=self._autoscaling_state,
python/ray/data/_internal/execution/streaming_executor_state.py
@@ -13,6 +13,7 @@
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
from ray.data._internal.execution.backpressure_policy import BackpressurePolicy
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
@@ -377,6 +378,7 @@ def select_operator_to_run(
topology: Topology,
cur_usage: TopologyResourceUsage,
limits: ExecutionResources,
backpressure_policies: List[BackpressurePolicy],
ensure_at_least_one_running: bool,
execution_id: str,
autoscaling_state: AutoscalingState,
@@ -406,6 +408,7 @@ def select_operator_to_run(
and op.should_add_input()
and under_resource_limits
and not op.completed()
and all(p.can_run(op) for p in backpressure_policies)
):
ops.append(op)
# Update the op in all cases to enable internal autoscaling, etc.
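Note: since select_operator_to_run() consults every policy via can_run(), an additional policy only needs to implement the BackpressurePolicy interface. A hypothetical example for illustration (not part of this PR, which wires policies in through the module-level DEFAULT_BACKPRESSURE_POLICIES list):

from ray.data._internal.execution.backpressure_policy import BackpressurePolicy


class FixedCapBackpressurePolicy(BackpressurePolicy):
    """Hypothetical policy enforcing a fixed per-operator task cap."""

    def __init__(self, topology):
        self._cap = 16  # Arbitrary fixed cap, for illustration only.

    def can_run(self, op) -> bool:
        # Allow launching a new task only while the operator has fewer than
        # `self._cap` running tasks.
        return op.metrics.num_tasks_running < self._cap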
92 changes: 92 additions & 0 deletions python/ray/data/tests/test_backpressure_policies.py
@@ -0,0 +1,92 @@
import os
import unittest
from contextlib import contextmanager
from unittest.mock import MagicMock, patch

from ray.data._internal.execution.backpressure_policy import (
ConcurrencyCapBackpressurePolicy,
)
from ray.data._internal.execution.streaming_executor_state import Topology


class TestConcurrencyCapBackpressurePolicy(unittest.TestCase):
@contextmanager
def _patch_env_var(self, value):
with patch.dict(
os.environ, {ConcurrencyCapBackpressurePolicy.CONFIG_ENV_VAR: value}
):
yield

def test_basic(self):
op = MagicMock()
op.metrics = MagicMock(
num_tasks_running=0,
num_tasks_finished=0,
)
topology = {op: MagicMock()}

init_cap = 4
cap_multiply_threshold = 0.5
cap_multiplier = 2.0

with self._patch_env_var(
f"{init_cap},{cap_multiply_threshold},{cap_multiplier}"
):
policy = ConcurrencyCapBackpressurePolicy(topology)

self.assertEqual(policy._concurrency_caps[op], 4)
# Gradually increase num_tasks_running to the cap.
for i in range(1, init_cap + 1):
self.assertTrue(policy.can_run(op))
op.metrics.num_tasks_running = i
# Now num_tasks_running reaches the cap, so can_run should return False.
self.assertFalse(policy.can_run(op))

# If we increase num_tasks_finished to the threshold (4 * 0.5 = 2),
# it should trigger the cap to increase.
op.metrics.num_tasks_finished = init_cap * cap_multiply_threshold
self.assertEqual(policy.can_run(op), True)
self.assertEqual(policy._concurrency_caps[op], init_cap * cap_multiplier)

# Now the cap is 8 (4 * 2).
# If we increase num_tasks_finished directly to the next-level's threshold
# (8 * 2 * 0.5 = 8), it should trigger the cap to increase twice.
op.metrics.num_tasks_finished = (
policy._concurrency_caps[op] * cap_multiplier * cap_multiply_threshold
)
op.metrics.num_tasks_running = 0
self.assertEqual(policy.can_run(op), True)
self.assertEqual(policy._concurrency_caps[op], init_cap * cap_multiplier ** 3)


def test_env_var_config(self):
topology = MagicMock(Topology)
# Test good config.
with self._patch_env_var("10,0.3,1.5"):
policy = ConcurrencyCapBackpressurePolicy(topology)
self.assertEqual(policy._init_cap, 10)
self.assertEqual(policy._cap_multiply_threshold, 0.3)
self.assertEqual(policy._cap_multiplier, 1.5)

# Test bad configs.
with self._patch_env_var("10,0.3"):
with self.assertRaises(ValueError):
policy = ConcurrencyCapBackpressurePolicy(topology)
with self._patch_env_var("-1,0.3,1.5"):
with self.assertRaises(ValueError):
policy = ConcurrencyCapBackpressurePolicy(topology)
with self._patch_env_var("10,1.1,1.5"):
with self.assertRaises(ValueError):
policy = ConcurrencyCapBackpressurePolicy(topology)
with self._patch_env_var("10,0.3,0.5"):
with self.assertRaises(ValueError):
policy = ConcurrencyCapBackpressurePolicy(topology)


if __name__ == "__main__":
import sys

import pytest

sys.exit(pytest.main(["-v", __file__]))