[Data] Cap op concurrency with exponential ramp-up #40275

Merged
merged 19 commits on Oct 18, 2023
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -489,3 +489,11 @@ py_test(
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_backpressure_policies",
size = "small",
srcs = ["tests/test_backpressure_policies.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)
115 changes: 115 additions & 0 deletions python/ray/data/_internal/execution/backpressure_policy.py
@@ -0,0 +1,115 @@
import logging
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import ray

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.physical_operator import (
PhysicalOperator,
)
from ray.data._internal.execution.streaming_executor_state import Topology

logger = logging.getLogger(__name__)


# Default enabled backpressure policies and the corresponding config key.
# Use `DataContext.set_plugin_config` to configure it.
# TODO(hchen): Enable ConcurrencyCapBackpressurePolicy by default.
ENABLED_BACKPRESSURE_POLICIES = []
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled"


def get_backpressure_policies(topology: "Topology"):
data_context = ray.data.DataContext.get_current()
policies = data_context.get_plugin_config(
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, ENABLED_BACKPRESSURE_POLICIES
)

return [policy(topology) for policy in policies]


class BackpressurePolicy(ABC):
"""Interface for back pressure policies."""

@abstractmethod
def __init__(self, topology: "Topology"):
...

@abstractmethod
def can_run(self, op: "PhysicalOperator") -> bool:
"""Called when StreamingExecutor selects an operator to run in
`streaming_executor_state.select_operator_to_run()`.

Returns: True if the operator can run, False otherwise.
"""
...


class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
"""A backpressure policy that caps the concurrency of each operator.

The concurrency cap limits the number of concurrently running tasks.
It will be set to an initial value and will ramp up exponentially.

The concrete strategy is as follows:
- Each PhysicalOperator is assigned an initial concurrency cap.
- A PhysicalOperator can run new tasks if the number of running tasks is less
than the cap.
- When the number of finished tasks reaches a threshold, the concurrency cap will
increase.
"""

# Following are the default values and config keys of the available configs.
# Use `DataContext.set_plugin_config` to configure them.

# The initial concurrency cap for each operator.
INIT_CAP = 4
INIT_CAP_CONFIG_KEY = "backpressure_policies.concurrency_cap.init_cap"
# When the number of finished tasks reaches this fraction of the current cap,
# the concurrency cap will be multiplied by the multiplier.
CAP_MULTIPLY_THRESHOLD = 0.5
CAP_MULTIPLY_THRESHOLD_CONFIG_KEY = (
"backpressure_policies.concurrency_cap.cap_multiply_threshold"
)
# The multiplier to multiply the concurrency cap by.
CAP_MULTIPLIER = 2.0
CAP_MULTIPLIER_CONFIG_KEY = "backpressure_policies.concurrency_cap.cap_multiplier"

def __init__(self, topology: "Topology"):
self._concurrency_caps: dict["PhysicalOperator", float] = {}

data_context = ray.data.DataContext.get_current()
self._init_cap = data_context.get_plugin_config(
self.INIT_CAP_CONFIG_KEY, self.INIT_CAP
)
self._cap_multiplier = data_context.get_plugin_config(
self.CAP_MULTIPLIER_CONFIG_KEY, self.CAP_MULTIPLIER
)
self._cap_multiply_threshold = data_context.get_plugin_config(
self.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY, self.CAP_MULTIPLY_THRESHOLD
)

assert self._init_cap > 0
assert 0 < self._cap_multiply_threshold <= 1
assert self._cap_multiplier > 1

logger.debug(
"ConcurrencyCapBackpressurePolicy initialized with config: "
f"{self._init_cap}, {self._cap_multiply_threshold}, {self._cap_multiplier}"
)

for op, _ in topology.items():
self._concurrency_caps[op] = self._init_cap

def can_run(self, op: "PhysicalOperator") -> bool:
metrics = op.metrics
while metrics.num_tasks_finished >= (
self._concurrency_caps[op] * self._cap_multiply_threshold
):
self._concurrency_caps[op] *= self._cap_multiplier
logger.debug(
f"Concurrency cap for {op} increased to {self._concurrency_caps[op]}"
)
return metrics.num_tasks_running < self._concurrency_caps[op]
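
For intuition, the ramp-up implemented by can_run can be traced outside of Ray. The snippet below is a standalone sketch (not code from this PR) that mirrors the while-loop above with the default values INIT_CAP=4, CAP_MULTIPLY_THRESHOLD=0.5, and CAP_MULTIPLIER=2.0:

# Standalone sketch of the ramp-up: the cap is multiplied whenever the number
# of finished tasks reaches the threshold fraction of the current cap.
cap, threshold, multiplier = 4.0, 0.5, 2.0
for finished in (0, 1, 2, 4, 8, 16):
    while finished >= cap * threshold:
        cap *= multiplier
    print(f"finished={finished:>2} -> cap={int(cap)}")
# finished= 0 -> cap=4
# finished= 1 -> cap=4
# finished= 2 -> cap=8
# finished= 4 -> cap=16
# finished= 8 -> cap=32
# finished=16 -> cap=64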
10 changes: 9 additions & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
@@ -2,13 +2,17 @@
import threading
import time
import uuid
from typing import Iterator, Optional
from typing import Iterator, List, Optional

import ray
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
from ray.data._internal.execution.backpressure_policy import (
BackpressurePolicy,
get_backpressure_policies,
)
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
@@ -76,6 +80,7 @@ def __init__(self, options: ExecutionOptions, dataset_uuid: str = "unknown_uuid"
# generator `yield`s.
self._topology: Optional[Topology] = None
self._output_node: Optional[OpState] = None
self._backpressure_policies: Optional[List[BackpressurePolicy]] = None

self._dataset_uuid = dataset_uuid

@@ -107,6 +112,7 @@ def execute(

# Setup the streaming DAG topology and start the runner thread.
self._topology, _ = build_streaming_topology(dag, self._options)
self._backpressure_policies = get_backpressure_policies(self._topology)

if not isinstance(dag, InputDataBuffer):
# Note: DAG must be initialized in order to query num_outputs_total.
@@ -253,6 +259,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
topology,
cur_usage,
limits,
self._backpressure_policies,
ensure_at_least_one_running=self._consumer_idling(),
execution_id=self._execution_id,
autoscaling_state=self._autoscaling_state,
@@ -270,6 +277,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
topology,
cur_usage,
limits,
self._backpressure_policies,
ensure_at_least_one_running=self._consumer_idling(),
execution_id=self._execution_id,
autoscaling_state=self._autoscaling_state,
python/ray/data/_internal/execution/streaming_executor_state.py
@@ -13,6 +13,7 @@
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
from ray.data._internal.execution.backpressure_policy import BackpressurePolicy
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
@@ -377,6 +378,7 @@ def select_operator_to_run(
topology: Topology,
cur_usage: TopologyResourceUsage,
limits: ExecutionResources,
backpressure_policies: List[BackpressurePolicy],
ensure_at_least_one_running: bool,
execution_id: str,
autoscaling_state: AutoscalingState,
@@ -406,6 +408,7 @@ def select_operator_to_run(
and op.should_add_input()
and under_resource_limits
and not op.completed()
and all(p.can_run(op) for p in backpressure_policies)
):
ops.append(op)
# Update the op in all cases to enable internal autoscaling, etc.
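
Since select_operator_to_run now gates scheduling on all(p.can_run(op) for p in backpressure_policies), additional policies can be plugged in by subclassing BackpressurePolicy. A minimal hypothetical sketch follows; the FixedCapBackpressurePolicy name and its constant are illustrative, not part of this PR:

from ray.data._internal.execution.backpressure_policy import BackpressurePolicy


class FixedCapBackpressurePolicy(BackpressurePolicy):
    """Illustrative policy: never allow more than a fixed number of in-flight tasks."""

    MAX_RUNNING_TASKS = 8  # hypothetical constant, not part of this PR

    def __init__(self, topology):
        self._topology = topology

    def can_run(self, op) -> bool:
        # Block scheduling once the operator already has MAX_RUNNING_TASKS tasks running.
        return op.metrics.num_tasks_running < self.MAX_RUNNING_TASKS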
12 changes: 11 additions & 1 deletion python/ray/data/context.py
@@ -1,6 +1,6 @@
import os
import threading
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional

import ray
from ray._private.ray_constants import env_integer
@@ -219,6 +219,7 @@ def __init__(
self.enable_get_object_locations_for_metrics = (
enable_get_object_locations_for_metrics
)
self._plugin_configs: Dict[str, Any] = {}
Contributor: can we rename it as _backpressure_plugin_configs? In the future, we may introduce other plugin components.


@staticmethod
def get_current() -> "DataContext":
@@ -283,6 +284,15 @@ def _set_current(context: "DataContext") -> None:
global _default_context
_default_context = context

def get_plugin_config(self, key: str, default: Any = None) -> Any:
return self._plugin_configs.get(key, default)

def set_plugin_config(self, key: str, value: Any) -> None:
self._plugin_configs[key] = value

def remove_plugin_config(self, key: str) -> None:
self._plugin_configs.pop(key, None)
Contributor: let's declare these methods with _ prefix, so they remain private and easy to change later.

Contributor (Author): Discussed offline, we'll make this API reusable for other components.



# Backwards compatibility alias.
DatasetContext = DataContext
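
With the plugin-config hooks above, enabling and tuning the new policy could look roughly like the sketch below. This is only an illustration based on the config keys introduced in this PR; the policy is not enabled by default yet.

import ray
from ray.data._internal.execution.backpressure_policy import (
    ConcurrencyCapBackpressurePolicy,
    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
)

ctx = ray.data.DataContext.get_current()

# Opt in to the concurrency-cap policy.
ctx.set_plugin_config(
    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
    [ConcurrencyCapBackpressurePolicy],
)

# Optionally override the defaults: start at 2 concurrent tasks and quadruple
# the cap at each ramp-up step.
ctx.set_plugin_config(ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY, 2)
ctx.set_plugin_config(ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY, 4.0)

# Removing a key falls back to the class default.
ctx.remove_plugin_config(ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY)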