diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD
index 7a7f0e5d03253..1c2c71918c75b 100644
--- a/python/ray/data/BUILD
+++ b/python/ray/data/BUILD
@@ -489,3 +489,11 @@ py_test(
     tags = ["team:data", "exclusive"],
     deps = ["//:ray_lib", ":conftest"],
 )
+
+py_test(
+    name = "test_backpressure_policies",
+    size = "small",
+    srcs = ["tests/test_backpressure_policies.py"],
+    tags = ["team:data", "exclusive"],
+    deps = ["//:ray_lib", ":conftest"],
+)
diff --git a/python/ray/data/_internal/execution/backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy.py
new file mode 100644
index 0000000000000..0c0aa64f3ba9b
--- /dev/null
+++ b/python/ray/data/_internal/execution/backpressure_policy.py
@@ -0,0 +1,115 @@
+import logging
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING
+
+import ray
+
+if TYPE_CHECKING:
+    from ray.data._internal.execution.interfaces.physical_operator import (
+        PhysicalOperator,
+    )
+    from ray.data._internal.execution.streaming_executor_state import Topology
+
+logger = logging.getLogger(__name__)
+
+
+# Default enabled backpressure policies and the config key.
+# Use `DataContext.set_config` to configure it.
+# TODO(hchen): Enable ConcurrencyCapBackpressurePolicy by default.
+ENABLED_BACKPRESSURE_POLICIES = []
+ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled"
+
+
+def get_backpressure_policies(topology: "Topology"):
+    data_context = ray.data.DataContext.get_current()
+    policies = data_context.get_config(
+        ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, ENABLED_BACKPRESSURE_POLICIES
+    )
+
+    return [policy(topology) for policy in policies]
+
+
+class BackpressurePolicy(ABC):
+    """Interface for backpressure policies."""
+
+    @abstractmethod
+    def __init__(self, topology: "Topology"):
+        ...
+
+    @abstractmethod
+    def can_run(self, op: "PhysicalOperator") -> bool:
+        """Called when StreamingExecutor selects an operator to run in
+        `streaming_executor_state.select_operator_to_run()`.
+
+        Returns: True if the operator can run, False otherwise.
+        """
+        ...
+
+
+class ConcurrencyCapBackpressurePolicy(BackpressurePolicy):
+    """A backpressure policy that caps the concurrency of each operator.
+
+    The concurrency cap limits the number of concurrently running tasks.
+    It will be set to an initial value, and will ramp up exponentially.
+
+    The concrete strategy is as follows:
+    - Each PhysicalOperator is assigned an initial concurrency cap.
+    - A PhysicalOperator can run new tasks if the number of running tasks is less
+      than the cap.
+    - When the number of finished tasks reaches a threshold, the concurrency cap will
+      increase.
+    """
+
+    # Following are the default values followed by the config keys of the
+    # available configs.
+    # Use `DataContext.set_config` to configure them.
+
+    # The initial concurrency cap for each operator.
+    INIT_CAP = 4
+    INIT_CAP_CONFIG_KEY = "backpressure_policies.concurrency_cap.init_cap"
+    # When the number of finished tasks reaches this threshold, the concurrency cap
+    # will be multiplied by the multiplier.
+    CAP_MULTIPLY_THRESHOLD = 0.5
+    CAP_MULTIPLY_THRESHOLD_CONFIG_KEY = (
+        "backpressure_policies.concurrency_cap.cap_multiply_threshold"
+    )
+    # The multiplier to multiply the concurrency cap by.
+    CAP_MULTIPLIER = 2.0
+    CAP_MULTIPLIER_CONFIG_KEY = "backpressure_policies.concurrency_cap.cap_multiplier"
+
+    def __init__(self, topology: "Topology"):
+        self._concurrency_caps: dict["PhysicalOperator", float] = {}
+
+        data_context = ray.data.DataContext.get_current()
+        self._init_cap = data_context.get_config(
+            self.INIT_CAP_CONFIG_KEY, self.INIT_CAP
+        )
+        self._cap_multiplier = data_context.get_config(
+            self.CAP_MULTIPLIER_CONFIG_KEY, self.CAP_MULTIPLIER
+        )
+        self._cap_multiply_threshold = data_context.get_config(
+            self.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY, self.CAP_MULTIPLY_THRESHOLD
+        )
+
+        assert self._init_cap > 0
+        assert 0 < self._cap_multiply_threshold <= 1
+        assert self._cap_multiplier > 1
+
+        logger.debug(
+            "ConcurrencyCapBackpressurePolicy initialized with config: "
+            f"{self._init_cap}, {self._cap_multiply_threshold}, {self._cap_multiplier}"
+        )
+
+        for op, _ in topology.items():
+            self._concurrency_caps[op] = self._init_cap
+
+    def can_run(self, op: "PhysicalOperator") -> bool:
+        metrics = op.metrics
+        while metrics.num_tasks_finished >= (
+            self._concurrency_caps[op] * self._cap_multiply_threshold
+        ):
+            self._concurrency_caps[op] *= self._cap_multiplier
+            logger.debug(
+                f"Concurrency cap for {op} increased to {self._concurrency_caps[op]}"
+            )
+        return metrics.num_tasks_running < self._concurrency_caps[op]
diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py
index 50b0292f14d2f..eb5b7c93b8609 100644
--- a/python/ray/data/_internal/execution/streaming_executor.py
+++ b/python/ray/data/_internal/execution/streaming_executor.py
@@ -2,13 +2,17 @@
 import threading
 import time
 import uuid
-from typing import Iterator, Optional
+from typing import Iterator, List, Optional
 
 import ray
 from ray.data._internal.dataset_logger import DatasetLogger
 from ray.data._internal.execution.autoscaling_requester import (
     get_or_create_autoscaling_requester_actor,
 )
+from ray.data._internal.execution.backpressure_policy import (
+    BackpressurePolicy,
+    get_backpressure_policies,
+)
 from ray.data._internal.execution.interfaces import (
     ExecutionOptions,
     ExecutionResources,
@@ -76,6 +80,7 @@ def __init__(self, options: ExecutionOptions, dataset_uuid: str = "unknown_uuid"
         # generator `yield`s.
         self._topology: Optional[Topology] = None
         self._output_node: Optional[OpState] = None
+        self._backpressure_policies: Optional[List[BackpressurePolicy]] = None
 
         self._dataset_uuid = dataset_uuid
 
@@ -107,6 +112,7 @@ def execute(
 
         # Setup the streaming DAG topology and start the runner thread.
         self._topology, _ = build_streaming_topology(dag, self._options)
+        self._backpressure_policies = get_backpressure_policies(self._topology)
 
         if not isinstance(dag, InputDataBuffer):
             # Note: DAG must be initialized in order to query num_outputs_total.
@@ -253,6 +259,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
             topology,
             cur_usage,
             limits,
+            self._backpressure_policies,
             ensure_at_least_one_running=self._consumer_idling(),
             execution_id=self._execution_id,
             autoscaling_state=self._autoscaling_state,
@@ -270,6 +277,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
                 topology,
                 cur_usage,
                 limits,
+                self._backpressure_policies,
                 ensure_at_least_one_running=self._consumer_idling(),
                 execution_id=self._execution_id,
                 autoscaling_state=self._autoscaling_state,
diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py
index 2e2c198892c8c..d2c1ca8c22317 100644
--- a/python/ray/data/_internal/execution/streaming_executor_state.py
+++ b/python/ray/data/_internal/execution/streaming_executor_state.py
@@ -13,6 +13,7 @@
 from ray.data._internal.execution.autoscaling_requester import (
     get_or_create_autoscaling_requester_actor,
 )
+from ray.data._internal.execution.backpressure_policy import BackpressurePolicy
 from ray.data._internal.execution.interfaces import (
     ExecutionOptions,
     ExecutionResources,
@@ -377,6 +378,7 @@ def select_operator_to_run(
     topology: Topology,
     cur_usage: TopologyResourceUsage,
     limits: ExecutionResources,
+    backpressure_policies: List[BackpressurePolicy],
     ensure_at_least_one_running: bool,
     execution_id: str,
     autoscaling_state: AutoscalingState,
@@ -406,6 +408,7 @@ def select_operator_to_run(
             and op.should_add_input()
             and under_resource_limits
             and not op.completed()
+            and all(p.can_run(op) for p in backpressure_policies)
         ):
             ops.append(op)
     # Update the op in all cases to enable internal autoscaling, etc.
diff --git a/python/ray/data/context.py b/python/ray/data/context.py
index a6447eb1c9e72..d2f14e31475c4 100644
--- a/python/ray/data/context.py
+++ b/python/ray/data/context.py
@@ -1,6 +1,6 @@
 import os
 import threading
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional
 
 import ray
 from ray._private.ray_constants import env_integer
@@ -219,6 +219,13 @@ def __init__(
         self.enable_get_object_locations_for_metrics = (
            enable_get_object_locations_for_metrics
        )
+        # The extra key-value style configs.
+        # These configs are managed by individual components or plugins via
+        # `set_config`, `get_config` and `remove_config`.
+        # The reason why we use a dict instead of individual fields is to decouple
+        # the DataContext from the plugin implementations, as well as to avoid
+        # circular dependencies.
+        self._kv_configs: Dict[str, Any] = {}
 
     @staticmethod
     def get_current() -> "DataContext":
@@ -283,6 +290,33 @@ def _set_current(context: "DataContext") -> None:
         global _default_context
         _default_context = context
 
+    def get_config(self, key: str, default: Any = None) -> Any:
+        """Get the value for a key-value style config.
+
+        Args:
+            key: The key of the config.
+            default: The default value to return if the key is not found.
+        Returns: The value for the key, or the default value if the key is not found.
+        """
+        return self._kv_configs.get(key, default)
+
+    def set_config(self, key: str, value: Any) -> None:
+        """Set the value for a key-value style config.
+
+        Args:
+            key: The key of the config.
+            value: The value of the config.
+        """
+        self._kv_configs[key] = value
+
+    def remove_config(self, key: str) -> None:
+        """Remove a key-value style config.
+
+        Args:
+            key: The key of the config.
+        """
+        self._kv_configs.pop(key, None)
+
 
 # Backwards compatibility alias.
 DatasetContext = DataContext
diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py
new file mode 100644
index 0000000000000..21db92be06e47
--- /dev/null
+++ b/python/ray/data/tests/test_backpressure_policies.py
@@ -0,0 +1,182 @@
+import functools
+import time
+import unittest
+from collections import defaultdict
+from contextlib import contextmanager
+from unittest.mock import MagicMock
+
+import ray
+from ray.data._internal.execution.backpressure_policy import (
+    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
+    ConcurrencyCapBackpressurePolicy,
+)
+
+
+@contextmanager
+def enable_backpressure_policies(policies):
+    data_context = ray.data.DataContext.get_current()
+    old_policies = data_context.get_config(
+        ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
+        [],
+    )
+    data_context.set_config(
+        ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
+        policies,
+    )
+    yield
+    data_context.set_config(
+        ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
+        old_policies,
+    )
+
+
+class TestConcurrencyCapBackpressurePolicy(unittest.TestCase):
+    """Tests for ConcurrencyCapBackpressurePolicy."""
+
+    @classmethod
+    def setUpClass(cls):
+        cls._cluster_cpus = 10
+        ray.init(num_cpus=cls._cluster_cpus)
+
+    @classmethod
+    def tearDownClass(cls):
+        ray.shutdown()
+
+    @contextmanager
+    def _patch_config(self, init_cap, cap_multiply_threshold, cap_multiplier):
+        data_context = ray.data.DataContext.get_current()
+        data_context.set_config(
+            ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY,
+            init_cap,
+        )
+        data_context.set_config(
+            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY,
+            cap_multiply_threshold,
+        )
+        data_context.set_config(
+            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY,
+            cap_multiplier,
+        )
+        yield
+        data_context.remove_config(ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY)
+        data_context.remove_config(
+            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY
+        )
+        data_context.remove_config(
+            ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY
+        )
+
+    def test_basic(self):
+        op = MagicMock()
+        op.metrics = MagicMock(
+            num_tasks_running=0,
+            num_tasks_finished=0,
+        )
+        topology = {op: MagicMock()}
+
+        init_cap = 4
+        cap_multiply_threshold = 0.5
+        cap_multiplier = 2.0
+
+        with self._patch_config(init_cap, cap_multiply_threshold, cap_multiplier):
+            policy = ConcurrencyCapBackpressurePolicy(topology)
+
+        self.assertEqual(policy._concurrency_caps[op], 4)
+        # Gradually increase num_tasks_running to the cap.
+        for i in range(1, init_cap + 1):
+            self.assertTrue(policy.can_run(op))
+            op.metrics.num_tasks_running = i
+        # Now num_tasks_running reaches the cap, so can_run should return False.
+        self.assertFalse(policy.can_run(op))
+
+        # If we increase num_tasks_finished to the threshold (4 * 0.5 = 2),
+        # it should trigger the cap to increase.
+        op.metrics.num_tasks_finished = init_cap * cap_multiply_threshold
+        self.assertEqual(policy.can_run(op), True)
+        self.assertEqual(policy._concurrency_caps[op], init_cap * cap_multiplier)
+
+        # Now the cap is 8 (4 * 2).
+        # If we increase num_tasks_finished directly to the next level's threshold
+        # (8 * 2 * 0.5 = 8), it should trigger the cap to increase twice.
+        op.metrics.num_tasks_finished = (
+            policy._concurrency_caps[op] * cap_multiplier * cap_multiply_threshold
+        )
+        op.metrics.num_tasks_running = 0
+        self.assertEqual(policy.can_run(op), True)
+        self.assertEqual(policy._concurrency_caps[op], init_cap * cap_multiplier**3)
+
+    def test_config(self):
+        topology = {}
+        # Test good config.
+        with self._patch_config(10, 0.3, 1.5):
+            policy = ConcurrencyCapBackpressurePolicy(topology)
+            self.assertEqual(policy._init_cap, 10)
+            self.assertEqual(policy._cap_multiply_threshold, 0.3)
+            self.assertEqual(policy._cap_multiplier, 1.5)
+
+        # Test bad configs.
+        with self._patch_config(-1, 0.3, 1.5):
+            with self.assertRaises(AssertionError):
+                policy = ConcurrencyCapBackpressurePolicy(topology)
+        with self._patch_config(10, 1.1, 1.5):
+            with self.assertRaises(AssertionError):
+                policy = ConcurrencyCapBackpressurePolicy(topology)
+        with self._patch_config(10, 0.3, 0.5):
+            with self.assertRaises(AssertionError):
+                policy = ConcurrencyCapBackpressurePolicy(topology)
+
+    def test_e2e(self):
+        """A simple E2E test with ConcurrencyCapBackpressurePolicy enabled."""
+
+        @ray.remote(num_cpus=0)
+        class RecordTimeActor:
+            def __init__(self):
+                self._start_time = defaultdict(lambda: float("inf"))
+                self._end_time = defaultdict(lambda: 0.0)
+
+            def record_start_time(self, index):
+                self._start_time[index] = min(time.time(), self._start_time[index])
+
+            def record_end_time(self, index):
+                self._end_time[index] = max(time.time(), self._end_time[index])
+
+            def get_start_and_end_time(self, index):
+                return self._start_time[index], self._end_time[index]
+
+        actor = RecordTimeActor.remote()
+
+        def map_func(data, index):
+            actor.record_start_time.remote(index)
+            yield data
+            actor.record_end_time.remote(index)
+
+        with enable_backpressure_policies([ConcurrencyCapBackpressurePolicy]):
+            # Create a dataset with 2 map ops. Each map op has N tasks, where N is
+            # the number of cluster CPUs.
+            N = self.__class__._cluster_cpus
+            ds = ray.data.range(N, parallelism=N)
+            # Use different `num_cpus` to make sure they don't fuse.
+            ds = ds.map_batches(
+                functools.partial(map_func, index=1), batch_size=None, num_cpus=1
+            )
+            ds = ds.map_batches(
+                functools.partial(map_func, index=2), batch_size=None, num_cpus=1.1
+            )
+            res = ds.take_all()
+            self.assertEqual(len(res), N)
+
+        # We recorded the start and end time of each op. Check that the two ops
+        # were executed in an interleaved manner.
+        # This means that the executor didn't allocate all resources to the first
+        # op in the beginning.
+        start1, end1 = ray.get(actor.get_start_and_end_time.remote(1))
+        start2, end2 = ray.get(actor.get_start_and_end_time.remote(2))
+        assert start1 < start2 < end1 < end2, (start1, start2, end1, end2)
+
+
+if __name__ == "__main__":
+    import sys
+
+    import pytest
+
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py
index 2cfa7ddbfa48d..d24e6fb748723 100644
--- a/python/ray/data/tests/test_streaming_executor.py
+++ b/python/ray/data/tests/test_streaming_executor.py
@@ -160,7 +160,7 @@ def test_select_operator_to_run():
     # Test empty.
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         is None
     )
@@ -169,21 +169,21 @@ def test_select_operator_to_run():
     topo[o1].outqueue.append("dummy1")
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o2
     )
     topo[o1].outqueue.append("dummy2")
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o2
     )
     topo[o2].outqueue.append("dummy3")
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o3
     )
@@ -193,7 +193,7 @@ def test_select_operator_to_run():
     o3.internal_queue_size = MagicMock(return_value=0)
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o2
     )
@@ -202,7 +202,7 @@ def test_select_operator_to_run():
     o3.internal_queue_size = MagicMock(return_value=2)
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o2
     )
     o2.internal_queue_size = MagicMock(return_value=0)
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o3
     )
     o2.internal_queue_size = MagicMock(return_value=2)
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o3
     )
@@ -227,7 +227,7 @@ def test_select_operator_to_run():
     o2.throttling_disabled = MagicMock(return_value=True)
     assert (
         select_operator_to_run(
-            topo, NO_USAGE, ExecutionResources(), True, "dummy", AutoscalingState()
+            topo, NO_USAGE, ExecutionResources(), [], True, "dummy", AutoscalingState()
         )
         == o2
     )
@@ -406,6 +406,7 @@ def run_execution(
                 EMPTY_DOWNSTREAM_USAGE,
             ),
             ExecutionResources(cpu=2, gpu=1, object_store_memory=1000),
+            [],
             True,
             execution_id,
             autoscaling_state,
@@ -515,6 +516,7 @@ def test_select_ops_ensure_at_least_one_live_operator():
             topo,
             TopologyResourceUsage(ExecutionResources(cpu=1), EMPTY_DOWNSTREAM_USAGE),
             ExecutionResources(cpu=1),
+            [],
             True,
             "dummy",
             AutoscalingState(),
@@ -527,6 +529,7 @@ def test_select_ops_ensure_at_least_one_live_operator():
             topo,
             TopologyResourceUsage(ExecutionResources(cpu=1), EMPTY_DOWNSTREAM_USAGE),
             ExecutionResources(cpu=1),
+            [],
             True,
             "dummy",
             AutoscalingState(),
@@ -538,6 +541,7 @@ def test_select_ops_ensure_at_least_one_live_operator():
             topo,
             TopologyResourceUsage(ExecutionResources(cpu=1), EMPTY_DOWNSTREAM_USAGE),
             ExecutionResources(cpu=1),
+            [],
             False,
             "dummy",
             AutoscalingState(),
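
Usage sketch (not part of the patch): the snippet below shows how the pieces added in this diff are meant to fit together from a user's point of view, mirroring what `test_e2e` does. It enables `ConcurrencyCapBackpressurePolicy` through the new key-value configs on `DataContext` and tunes the three concurrency-cap keys. The import path is the internal module introduced above, and the specific values (init cap 2, threshold 0.5, multiplier 2.0) are illustrative assumptions, not recommendations.

import ray
from ray.data._internal.execution.backpressure_policy import (
    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
    ConcurrencyCapBackpressurePolicy,
)

ctx = ray.data.DataContext.get_current()

# Opt in: the policy is off by default (ENABLED_BACKPRESSURE_POLICIES is empty).
ctx.set_config(
    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [ConcurrencyCapBackpressurePolicy]
)

# Illustrative tuning (assumed values): start each operator with a cap of 2
# running tasks; once finished tasks reach cap * 0.5, multiply the cap by 2.
ctx.set_config(ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY, 2)
ctx.set_config(
    ConcurrencyCapBackpressurePolicy.CAP_MULTIPLY_THRESHOLD_CONFIG_KEY, 0.5
)
ctx.set_config(ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY, 2.0)

# Any streaming execution from here on consults the policy in
# select_operator_to_run() before launching new tasks for an operator.
ds = ray.data.range(100).map_batches(lambda batch: batch)
print(ds.take_all()[:3])

Because the keys live in the generic `_kv_configs` dict on `DataContext`, the executor stays decoupled from the policy's knobs; removing a key with `remove_config` simply falls back to the class-level defaults (`INIT_CAP = 4`, `CAP_MULTIPLY_THRESHOLD = 0.5`, `CAP_MULTIPLIER = 2.0`).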