From 0b14e49237d917513f30a3742978e93b95aacaf0 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Thu, 18 Jan 2024 17:27:01 -0800 Subject: [PATCH 01/10] Initial commit Signed-off-by: Balaji Veeramani --- python/ray/data/BUILD | 8 ++++ .../interfaces/op_runtime_metrics.py | 32 ++++++++++---- .../operators/actor_pool_map_operator.py | 2 +- .../operators/task_pool_map_operator.py | 2 +- .../_internal/execution/streaming_executor.py | 3 -- .../execution/streaming_executor_state.py | 6 +-- .../tests/test_runtime_metrics_scheduling.py | 43 +++++++++++++++++++ 7 files changed, 79 insertions(+), 17 deletions(-) create mode 100644 python/ray/data/tests/test_runtime_metrics_scheduling.py diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index d40e923eb1163..cd8045cafcbab 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -442,6 +442,14 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) +py_test( + name = "test_runtime_metrics_scheduling", + size = "small", + srcs = ["tests/test_runtime_metrics_scheduling.py"], + tags = ["team:data", "exclusive"], + deps = ["//:ray_lib", ":conftest"], +) + py_test( name = "test_size_estimation", size = "medium", diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index dcf036dd489d3..a4a379ec804c0 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -91,10 +91,12 @@ class OpRuntimeMetrics: obj_store_mem_freed: int = field( default=0, metadata={"map_only": True, "export_metric": True} ) - # Current memory size in the object store. - obj_store_mem_cur: int = field( - default=0, metadata={"map_only": True, "export_metric": True} - ) + + # Current memory size in the object store from inputs. + obj_store_mem_inputs: int = field(default=0, metadata={"map_only": True}) + # Current memory size in the object store from outputs. + obj_store_mem_outputs: int = field(default=0, metadata={"map_only": True}) + # Peak memory size in the object store. obj_store_mem_peak: int = field(default=0, metadata={"map_only": True}) # Spilled memory size in the object store. @@ -201,13 +203,27 @@ def output_buffer_bytes(self) -> int: """Size in bytes of output blocks that are not taken by the downstream yet.""" return self.bytes_outputs_generated - self.bytes_outputs_taken + @property + def obj_store_mem_cur(self) -> int: + return self.obj_store_mem_inputs + self.obj_store_mem_outputs + + @property + def obj_store_mem_cur_upper_bound(self) -> int: + if self.average_bytes_outputs_per_task is not None: + return self.obj_store_mem_inputs + max( + self.obj_store_mem_outputs, + self.num_tasks_running * self.average_bytes_outputs_per_task, + ) + else: + return self.obj_store_mem_inputs + self.obj_store_mem_outputs + def on_input_received(self, input: RefBundle): """Callback when the operator receives a new input.""" self.num_inputs_received += 1 input_size = input.size_bytes() self.bytes_inputs_received += input_size # Update object store metrics. - self.obj_store_mem_cur += input_size + self.obj_store_mem_inputs += input_size if self.obj_store_mem_cur > self.obj_store_mem_peak: self.obj_store_mem_peak = self.obj_store_mem_cur @@ -216,7 +232,7 @@ def on_output_taken(self, output: RefBundle): output_bytes = output.size_bytes() self.num_outputs_taken += 1 self.bytes_outputs_taken += output_bytes - self.obj_store_mem_cur -= output_bytes + self.obj_store_mem_outputs -= output_bytes def on_task_submitted(self, task_index: int, inputs: RefBundle): """Callback when the operator submits a task.""" @@ -241,7 +257,7 @@ def on_output_generated(self, task_index: int, output: RefBundle): # Update object store metrics. self.obj_store_mem_alloc += output_bytes - self.obj_store_mem_cur += output_bytes + self.obj_store_mem_outputs += output_bytes if self.obj_store_mem_cur > self.obj_store_mem_peak: self.obj_store_mem_peak = self.obj_store_mem_cur @@ -279,7 +295,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]): self.obj_store_mem_spilled += meta.size_bytes self.obj_store_mem_freed += total_input_size - self.obj_store_mem_cur -= total_input_size + self.obj_store_mem_inputs -= total_input_size inputs.destroy_if_owned() del self._running_tasks[task_index] diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index afd8fe5606a6f..741946a1b4e8f 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -308,7 +308,7 @@ def current_resource_usage(self) -> ExecutionResources: return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers, gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, - object_store_memory=self.metrics.obj_store_mem_cur, + object_store_memory=self.metrics.obj_store_mem_cur_upper_bound, ) def incremental_resource_usage(self) -> ExecutionResources: diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index 549898078a7f4..d4b6b955f2966 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -95,7 +95,7 @@ def current_resource_usage(self) -> ExecutionResources: return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers, gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, - object_store_memory=self.metrics.obj_store_mem_cur, + object_store_memory=self.metrics.obj_store_mem_cur_upper_bound, ) def incremental_resource_usage(self) -> ExecutionResources: diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 1ad17eb09cb1a..34c1043a18a45 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -270,7 +270,6 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: self._report_current_usage(cur_usage, limits) op = select_operator_to_run( topology, - cur_usage, limits, self._backpressure_policies, ensure_at_least_one_running=self._consumer_idling(), @@ -285,10 +284,8 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: if DEBUG_TRACE_SCHEDULING: _debug_dump_topology(topology) topology[op].dispatch_next_task() - cur_usage = TopologyResourceUsage.of(topology) op = select_operator_to_run( topology, - cur_usage, limits, self._backpressure_policies, ensure_at_least_one_running=self._consumer_idling(), diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index c73c6451c1cef..427466f8c3f46 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -524,7 +524,6 @@ def update_operator_states(topology: Topology) -> None: def select_operator_to_run( topology: Topology, - cur_usage: TopologyResourceUsage, limits: ExecutionResources, backpressure_policies: List[BackpressurePolicy], ensure_at_least_one_running: bool, @@ -544,11 +543,10 @@ def select_operator_to_run( provides backpressure if the consumer is slow. However, once a bundle is returned to the user, it is no longer tracked. """ - assert isinstance(cur_usage, TopologyResourceUsage), cur_usage - # Filter to ops that are eligible for execution. ops = [] for op, state in topology.items(): + cur_usage = TopologyResourceUsage.of(topology) under_resource_limits = _execution_allowed(op, cur_usage, limits) if ( op.need_more_inputs() @@ -730,4 +728,4 @@ def _execution_allowed( ): return True - return global_ok_sans_memory and downstream_memory_ok + return False diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py new file mode 100644 index 0000000000000..b68af20d1e100 --- /dev/null +++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py @@ -0,0 +1,43 @@ +import time + +import numpy as np +import pytest + +import ray +from ray._private.internal_api import memory_summary +from ray.data._internal.execution.backpressure_policy import ( + ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, + ConcurrencyCapBackpressurePolicy, + StreamingOutputBackpressurePolicy, +) + + +def test_spam(shutdown_only, restore_data_context): + ctx = ray.init(object_store_memory=100 * 1024**2) + + ray.data.DataContext.get_current().use_runtime_metrics_scheduling = True + ray.data.DataContext.get_current().set_config( + ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [ConcurrencyCapBackpressurePolicy] + ) + ray.data.DataContext.get_current().set_config( + ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY, 1 + ) + + def f(batch): + time.sleep(0.1) + return {"data": np.zeros(20 * 1024**2, dtype=np.uint8)} + + ds = ray.data.range(10).repartition(10).materialize() + ds = ds.map_batches(f, batch_size=None) + + for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"): + pass + + meminfo = memory_summary(ctx.address_info["address"], stats_only=True) + assert "Spilled" not in meminfo, meminfo + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) From f63de6504e42062100008a660ed91e73e053eebd Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Thu, 18 Jan 2024 17:27:47 -0800 Subject: [PATCH 02/10] Appease lint Signed-off-by: Balaji Veeramani --- .../ray/data/_internal/execution/streaming_executor_state.py | 5 ----- python/ray/data/tests/test_runtime_metrics_scheduling.py | 1 - 2 files changed, 6 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 427466f8c3f46..e23a79d4c5551 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -712,11 +712,6 @@ def _execution_allowed( cpu=global_limits.cpu, gpu=global_limits.gpu ) global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory) - downstream_usage = global_usage.downstream_memory_usage[op] - downstream_limit = global_limits.scale(downstream_usage.topology_fraction) - downstream_memory_ok = ExecutionResources( - object_store_memory=downstream_usage.object_store_memory - ).satisfies_limit(downstream_limit) # If completing a task decreases the overall object store memory usage, allow it # even if we're over the global limit. diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py index b68af20d1e100..e4998737021cf 100644 --- a/python/ray/data/tests/test_runtime_metrics_scheduling.py +++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py @@ -8,7 +8,6 @@ from ray.data._internal.execution.backpressure_policy import ( ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, ConcurrencyCapBackpressurePolicy, - StreamingOutputBackpressurePolicy, ) From 7046eee06389fde865b7c07c3b7d8995d566ced3 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 22 Jan 2024 04:14:54 -0800 Subject: [PATCH 03/10] Update stuff Signed-off-by: Balaji Veeramani --- .../execution/streaming_executor_state.py | 16 ++++++++++++++-- .../tests/test_runtime_metrics_scheduling.py | 5 ++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index e23a79d4c5551..d3fbde163d8e7 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -543,10 +543,11 @@ def select_operator_to_run( provides backpressure if the consumer is slow. However, once a bundle is returned to the user, it is no longer tracked. """ + cur_usage = TopologyResourceUsage.of(topology) + # Filter to ops that are eligible for execution. ops = [] for op, state in topology.items(): - cur_usage = TopologyResourceUsage.of(topology) under_resource_limits = _execution_allowed(op, cur_usage, limits) if ( op.need_more_inputs() @@ -712,6 +713,17 @@ def _execution_allowed( cpu=global_limits.cpu, gpu=global_limits.gpu ) global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory) + downstream_usage = global_usage.downstream_memory_usage[op] + downstream_memory = downstream_usage.object_store_memory + if ( + DataContext.get_current().use_runtime_metrics_scheduling + and inc.object_store_memory + ): + downstream_memory += inc.object_store_memory + downstream_limit = global_limits.scale(downstream_usage.topology_fraction) + downstream_memory_ok = ExecutionResources( + object_store_memory=downstream_memory + ).satisfies_limit(downstream_limit) # If completing a task decreases the overall object store memory usage, allow it # even if we're over the global limit. @@ -723,4 +735,4 @@ def _execution_allowed( ): return True - return False + return global_ok_sans_memory and downstream_memory_ok diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py index e4998737021cf..ec20b4a6794af 100644 --- a/python/ray/data/tests/test_runtime_metrics_scheduling.py +++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py @@ -24,10 +24,9 @@ def test_spam(shutdown_only, restore_data_context): def f(batch): time.sleep(0.1) - return {"data": np.zeros(20 * 1024**2, dtype=np.uint8)} + return {"data": np.zeros(25 * 1024**2, dtype=np.uint8)} - ds = ray.data.range(10).repartition(10).materialize() - ds = ds.map_batches(f, batch_size=None) + ds = ray.data.range(3, parallelism=3).map_batches(f, batch_size=None) for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"): pass From 788d080c4c4834c2ee7b01d1510cb9ad1a375e83 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 22 Jan 2024 04:30:23 -0800 Subject: [PATCH 04/10] Update stuff Signed-off-by: Balaji Veeramani --- python/ray/data/_internal/execution/streaming_executor.py | 3 +++ .../data/_internal/execution/streaming_executor_state.py | 3 ++- python/ray/data/tests/test_runtime_metrics_scheduling.py | 7 +++++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 34c1043a18a45..1ad17eb09cb1a 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -270,6 +270,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: self._report_current_usage(cur_usage, limits) op = select_operator_to_run( topology, + cur_usage, limits, self._backpressure_policies, ensure_at_least_one_running=self._consumer_idling(), @@ -284,8 +285,10 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: if DEBUG_TRACE_SCHEDULING: _debug_dump_topology(topology) topology[op].dispatch_next_task() + cur_usage = TopologyResourceUsage.of(topology) op = select_operator_to_run( topology, + cur_usage, limits, self._backpressure_policies, ensure_at_least_one_running=self._consumer_idling(), diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index d3fbde163d8e7..96e0cb66f89f4 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -524,6 +524,7 @@ def update_operator_states(topology: Topology) -> None: def select_operator_to_run( topology: Topology, + cur_usage: TopologyResourceUsage, limits: ExecutionResources, backpressure_policies: List[BackpressurePolicy], ensure_at_least_one_running: bool, @@ -543,7 +544,7 @@ def select_operator_to_run( provides backpressure if the consumer is slow. However, once a bundle is returned to the user, it is no longer tracked. """ - cur_usage = TopologyResourceUsage.of(topology) + assert isinstance(cur_usage, TopologyResourceUsage), cur_usage # Filter to ops that are eligible for execution. ops = [] diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py index ec20b4a6794af..439840bfc67e9 100644 --- a/python/ray/data/tests/test_runtime_metrics_scheduling.py +++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py @@ -21,12 +21,15 @@ def test_spam(shutdown_only, restore_data_context): ray.data.DataContext.get_current().set_config( ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY, 1 ) + ray.data.DataContext.get_current().set_config( + ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY, 4 + ) def f(batch): time.sleep(0.1) - return {"data": np.zeros(25 * 1024**2, dtype=np.uint8)} + return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)} - ds = ray.data.range(3, parallelism=3).map_batches(f, batch_size=None) + ds = ray.data.range(5, parallelism=5).map_batches(f, batch_size=None) for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"): pass From 983b557d6c5ee99a7be1715eeb955d124c602de0 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 23 Jan 2024 15:58:55 -0800 Subject: [PATCH 05/10] Update stuff Signed-off-by: Balaji Veeramani --- .../streaming_output_backpressure_policy.py | 25 ++++--- .../interfaces/op_runtime_metrics.py | 69 ++++++++++++------- .../operators/actor_pool_map_operator.py | 5 +- .../operators/task_pool_map_operator.py | 5 +- .../tests/test_runtime_metrics_scheduling.py | 17 ++--- 5 files changed, 77 insertions(+), 44 deletions(-) diff --git a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py index a46035c543c82..8078a566d5f81 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py @@ -35,13 +35,13 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy): # The max number of blocks that can be buffered at the streaming generator # of each `DataOpTask`. - MAX_BLOCKS_IN_GENERATOR_BUFFER = 10 + DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER = 4 MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = ( "backpressure_policies.streaming_output.max_blocks_in_generator_buffer" ) # The max number of blocks that can be buffered at the operator output queue # (`OpState.outqueue`). - MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20 + DEFAULT_MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20 MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY = ( "backpressure_policies.streaming_output.max_blocks_in_op_output_queue" ) @@ -51,12 +51,11 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy): MAX_OUTPUT_IDLE_SECONDS = 10 def __init__(self, topology: "Topology"): - data_context = ray.data.DataContext.get_current() - self._max_num_blocks_in_streaming_gen_buffer = data_context.get_config( - self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, - self.MAX_BLOCKS_IN_GENERATOR_BUFFER, + self._max_num_blocks_in_streaming_gen_buffer = ( + self.get_max_num_blocks_in_streaming_gen_buffer() ) - assert self._max_num_blocks_in_streaming_gen_buffer > 0 + + data_context = ray.data.DataContext.get_current() # The `_generator_backpressure_num_objects` parameter should be # `2 * self._max_num_blocks_in_streaming_gen_buffer` because we yield # 2 objects for each block: the block and the block metadata. @@ -66,7 +65,7 @@ def __init__(self, topology: "Topology"): self._max_num_blocks_in_op_output_queue = data_context.get_config( self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY, - self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE, + self.DEFAULT_MAX_BLOCKS_IN_OP_OUTPUT_QUEUE, ) assert self._max_num_blocks_in_op_output_queue > 0 @@ -131,6 +130,16 @@ def calculate_max_blocks_to_read_per_op( self._print_warning(state.op, cur_time - last_time) return max_blocks_to_read_per_op + @classmethod + def get_max_num_blocks_in_streaming_gen_buffer(cls): + data_context = ray.data.DataContext.get_current() + value = data_context.get_config( + cls.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, + cls.DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER, + ) + assert value > 0 + return value + def _print_warning(self, op: "PhysicalOperator", idle_time: float): if self._warning_printed: return diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index a4a379ec804c0..e473e4b00736b 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -2,6 +2,10 @@ from typing import TYPE_CHECKING, Any, Dict, Optional import ray +from ray.data._internal.execution.backpressure_policy import ( + ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, + StreamingOutputBackpressurePolicy, +) from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.data._internal.memory_tracing import trace_allocation @@ -91,12 +95,10 @@ class OpRuntimeMetrics: obj_store_mem_freed: int = field( default=0, metadata={"map_only": True, "export_metric": True} ) - - # Current memory size in the object store from inputs. - obj_store_mem_inputs: int = field(default=0, metadata={"map_only": True}) - # Current memory size in the object store from outputs. - obj_store_mem_outputs: int = field(default=0, metadata={"map_only": True}) - + # Current memory size in the object store. + obj_store_mem_cur: int = field( + default=0, metadata={"map_only": True, "export_metric": True} + ) # Peak memory size in the object store. obj_store_mem_peak: int = field(default=0, metadata={"map_only": True}) # Spilled memory size in the object store. @@ -163,6 +165,39 @@ def average_num_outputs_per_task(self) -> Optional[float]: else: return self.num_outputs_of_finished_tasks / self.num_tasks_finished + @property + def average_bytes_per_output(self) -> Optional[float]: + """Average size in bytes of output blocks.""" + if self.num_outputs_generated == 0: + return None + else: + return self.bytes_outputs_generated / self.num_outputs_generated + + @property + def obj_store_mem_pending(self) -> Optional[float]: + """Estimated size in bytes of output blocks in Ray generator buffers. + + If an estimate isn't available, this property returns ``None``. + """ + backpressure_policies = ray.data.DataContext.get_current().get_config( + ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY + ) + if StreamingOutputBackpressurePolicy not in backpressure_policies: + return None + + estimated_bytes_per_output = ( + self.average_bytes_per_output + or ray.data.DataContext.get_current().target_max_block_size + ) + max_num_outputs_in_streaming_gen_buffer = ( + StreamingOutputBackpressurePolicy.get_max_num_blocks_in_streaming_gen_buffer() + ) + return ( + self.num_tasks_running + * estimated_bytes_per_output + * max_num_outputs_in_streaming_gen_buffer + ) + @property def average_bytes_inputs_per_task(self) -> Optional[float]: """Average size in bytes of ref bundles passed to tasks, or ``None`` if no @@ -203,27 +238,13 @@ def output_buffer_bytes(self) -> int: """Size in bytes of output blocks that are not taken by the downstream yet.""" return self.bytes_outputs_generated - self.bytes_outputs_taken - @property - def obj_store_mem_cur(self) -> int: - return self.obj_store_mem_inputs + self.obj_store_mem_outputs - - @property - def obj_store_mem_cur_upper_bound(self) -> int: - if self.average_bytes_outputs_per_task is not None: - return self.obj_store_mem_inputs + max( - self.obj_store_mem_outputs, - self.num_tasks_running * self.average_bytes_outputs_per_task, - ) - else: - return self.obj_store_mem_inputs + self.obj_store_mem_outputs - def on_input_received(self, input: RefBundle): """Callback when the operator receives a new input.""" self.num_inputs_received += 1 input_size = input.size_bytes() self.bytes_inputs_received += input_size # Update object store metrics. - self.obj_store_mem_inputs += input_size + self.obj_store_mem_cur += input_size if self.obj_store_mem_cur > self.obj_store_mem_peak: self.obj_store_mem_peak = self.obj_store_mem_cur @@ -232,7 +253,7 @@ def on_output_taken(self, output: RefBundle): output_bytes = output.size_bytes() self.num_outputs_taken += 1 self.bytes_outputs_taken += output_bytes - self.obj_store_mem_outputs -= output_bytes + self.obj_store_mem_cur -= output_bytes def on_task_submitted(self, task_index: int, inputs: RefBundle): """Callback when the operator submits a task.""" @@ -257,7 +278,7 @@ def on_output_generated(self, task_index: int, output: RefBundle): # Update object store metrics. self.obj_store_mem_alloc += output_bytes - self.obj_store_mem_outputs += output_bytes + self.obj_store_mem_cur += output_bytes if self.obj_store_mem_cur > self.obj_store_mem_peak: self.obj_store_mem_peak = self.obj_store_mem_cur @@ -295,7 +316,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]): self.obj_store_mem_spilled += meta.size_bytes self.obj_store_mem_freed += total_input_size - self.obj_store_mem_inputs -= total_input_size + self.obj_store_mem_cur -= total_input_size inputs.destroy_if_owned() del self._running_tasks[task_index] diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 741946a1b4e8f..b5ef9d2eabfe3 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -305,10 +305,13 @@ def base_resource_usage(self) -> ExecutionResources: def current_resource_usage(self) -> ExecutionResources: # Both pending and running actors count towards our current resource usage. num_active_workers = self._actor_pool.num_total_actors() + object_store_memory = self.metrics.obj_store_mem_cur + if self.metrics.obj_store_mem_pending is not None: + object_store_memory += self.metrics.obj_store_mem_pending return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers, gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, - object_store_memory=self.metrics.obj_store_mem_cur_upper_bound, + object_store_memory=object_store_memory, ) def incremental_resource_usage(self) -> ExecutionResources: diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index d4b6b955f2966..d1f290f599552 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -92,10 +92,13 @@ def base_resource_usage(self) -> ExecutionResources: def current_resource_usage(self) -> ExecutionResources: num_active_workers = self.num_active_tasks() + object_store_memory = self.metrics.obj_store_mem_cur + if self.metrics.obj_store_mem_pending is not None: + object_store_memory += self.metrics.obj_store_mem_pending return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers, gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, - object_store_memory=self.metrics.obj_store_mem_cur_upper_bound, + object_store_memory=object_store_memory, ) def incremental_resource_usage(self) -> ExecutionResources: diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py index 439840bfc67e9..830cfdf529ddc 100644 --- a/python/ray/data/tests/test_runtime_metrics_scheduling.py +++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py @@ -7,29 +7,26 @@ from ray._private.internal_api import memory_summary from ray.data._internal.execution.backpressure_policy import ( ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, - ConcurrencyCapBackpressurePolicy, + StreamingOutputBackpressurePolicy, ) -def test_spam(shutdown_only, restore_data_context): +def test_scheduler_accounts_for_in_flight_tasks(shutdown_only, restore_data_context): + # The executor launches multiple tasks in each scheduling step. If it doesn't + # account for the potential output of in flight tasks, it may launch too many tasks + # and cause spilling. ctx = ray.init(object_store_memory=100 * 1024**2) ray.data.DataContext.get_current().use_runtime_metrics_scheduling = True ray.data.DataContext.get_current().set_config( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [ConcurrencyCapBackpressurePolicy] - ) - ray.data.DataContext.get_current().set_config( - ConcurrencyCapBackpressurePolicy.INIT_CAP_CONFIG_KEY, 1 - ) - ray.data.DataContext.get_current().set_config( - ConcurrencyCapBackpressurePolicy.CAP_MULTIPLIER_CONFIG_KEY, 4 + ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [StreamingOutputBackpressurePolicy] ) def f(batch): time.sleep(0.1) return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)} - ds = ray.data.range(5, parallelism=5).map_batches(f, batch_size=None) + ds = ray.data.range(8, parallelism=8).map_batches(f, batch_size=None) for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"): pass From 139d39173342712f04c62267b39e5d6e6fee0374 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 23 Jan 2024 15:59:13 -0800 Subject: [PATCH 06/10] Appease lint Signed-off-by: Balaji Veeramani --- .../data/_internal/execution/interfaces/op_runtime_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index e473e4b00736b..e09ea90bb2056 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -190,7 +190,7 @@ def obj_store_mem_pending(self) -> Optional[float]: or ray.data.DataContext.get_current().target_max_block_size ) max_num_outputs_in_streaming_gen_buffer = ( - StreamingOutputBackpressurePolicy.get_max_num_blocks_in_streaming_gen_buffer() + StreamingOutputBackpressurePolicy.get_max_num_blocks_in_streaming_gen_buffer() # noqa: E501 ) return ( self.num_tasks_running From db6f02f9e36481dee5f7343635a22dbc349b5ca1 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Tue, 23 Jan 2024 16:04:53 -0800 Subject: [PATCH 07/10] Add note Signed-off-by: Balaji Veeramani --- python/ray/data/tests/test_runtime_metrics_scheduling.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py index 830cfdf529ddc..12df40a5ffd9b 100644 --- a/python/ray/data/tests/test_runtime_metrics_scheduling.py +++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py @@ -26,6 +26,8 @@ def f(batch): time.sleep(0.1) return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)} + # If the executor doesn't account for the potential output of in flight tasks, it + # will launch all 8 tasks at once, producing 8 * 24MiB = 192MiB > 100MiB of data. ds = ray.data.range(8, parallelism=8).map_batches(f, batch_size=None) for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"): From 64178908d91ade769722a0bd8a1c428528930208 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 24 Jan 2024 20:16:37 -0800 Subject: [PATCH 08/10] Address review comments Signed-off-by: Balaji Veeramani --- .../streaming_output_backpressure_policy.py | 25 ++++--------------- .../interfaces/op_runtime_metrics.py | 14 +++-------- .../operators/task_pool_map_operator.py | 9 ++++++- python/ray/data/context.py | 3 +++ 4 files changed, 20 insertions(+), 31 deletions(-) diff --git a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py index 59e0611f9b4cb..a627b2a4ee29c 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py @@ -51,17 +51,12 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy): MAX_OUTPUT_IDLE_SECONDS = 10 def __init__(self, topology: "Topology"): - self._max_num_blocks_in_streaming_gen_buffer = ( - self.get_max_num_blocks_in_streaming_gen_buffer() - ) - data_context = ray.data.DataContext.get_current() - # The `_generator_backpressure_num_objects` parameter should be - # `2 * self._max_num_blocks_in_streaming_gen_buffer` because we yield - # 2 objects for each block: the block and the block metadata. - data_context._task_pool_data_task_remote_args[ - "_generator_backpressure_num_objects" - ] = (2 * self._max_num_blocks_in_streaming_gen_buffer) + data_context._max_num_blocks_in_streaming_gen_buffer = data_context.get_config( + self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, + self.DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER, + ) + assert data_context._max_num_blocks_in_streaming_gen_buffer > 0 self._max_num_blocks_in_op_output_queue = data_context.get_config( self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY, @@ -130,16 +125,6 @@ def calculate_max_blocks_to_read_per_op( self._print_warning(state.op, cur_time - last_time) return max_blocks_to_read_per_op - @classmethod - def get_max_num_blocks_in_streaming_gen_buffer(cls): - data_context = ray.data.DataContext.get_current() - value = data_context.get_config( - cls.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, - cls.DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER, - ) - assert value > 0 - return value - def _print_warning(self, op: "PhysicalOperator", idle_time: float): if self._warning_printed: return diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index e09ea90bb2056..b18a945e7f744 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -179,23 +179,17 @@ def obj_store_mem_pending(self) -> Optional[float]: If an estimate isn't available, this property returns ``None``. """ - backpressure_policies = ray.data.DataContext.get_current().get_config( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY - ) - if StreamingOutputBackpressurePolicy not in backpressure_policies: + context = ray.data.DataContext.get_current() + if context._max_num_blocks_in_streaming_gen_buffer is None: return None estimated_bytes_per_output = ( - self.average_bytes_per_output - or ray.data.DataContext.get_current().target_max_block_size - ) - max_num_outputs_in_streaming_gen_buffer = ( - StreamingOutputBackpressurePolicy.get_max_num_blocks_in_streaming_gen_buffer() # noqa: E501 + self.average_bytes_per_output or context.target_max_block_size ) return ( self.num_tasks_running * estimated_bytes_per_output - * max_num_outputs_in_streaming_gen_buffer + * context._max_num_blocks_in_streaming_gen_buffer ) @property diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index d1f290f599552..a3404897d4407 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -59,7 +59,14 @@ def _add_bundled_input(self, bundle: RefBundle): data_context = DataContext.get_current() ray_remote_args = self._get_runtime_ray_remote_args(input_bundle=bundle) ray_remote_args["name"] = self.name - ray_remote_args.update(data_context._task_pool_data_task_remote_args) + + if data_context._max_num_blocks_in_streaming_gen_buffer is not None: + # The `_generator_backpressure_num_objects` parameter should be + # `2 * _max_num_blocks_in_streaming_gen_buffer` because we yield + # 2 objects for each block: the block and the block metadata. + ray_remote_args["_generator_backpressure_num_objects"] = ( + 2 * data_context._max_num_blocks_in_streaming_gen_buffer + ) gen = map_task.options(**ray_remote_args).remote( self._map_transformer_ref, diff --git a/python/ray/data/context.py b/python/ray/data/context.py index ce2171d986c66..b7bfe0b116f4a 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -265,6 +265,9 @@ def __init__( # the DataContext from the plugin implementations, as well as to avoid # circular dependencies. self._kv_configs: Dict[str, Any] = {} + # The max number of blocks that can be buffered at the streaming generator of + # each `DataOpTask`. + self._max_num_blocks_in_streaming_gen_buffer = None @staticmethod def get_current() -> "DataContext": From 57e63a77b9cf2c96416ee7f45b5570f898efde47 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 24 Jan 2024 20:18:19 -0800 Subject: [PATCH 09/10] Appease lint Signed-off-by: Balaji Veeramani --- .../streaming_output_backpressure_policy.py | 8 ++++---- .../_internal/execution/interfaces/op_runtime_metrics.py | 4 ---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py index a627b2a4ee29c..088d8d593ea3e 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py @@ -35,13 +35,13 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy): # The max number of blocks that can be buffered at the streaming generator # of each `DataOpTask`. - DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER = 4 + MAX_BLOCKS_IN_GENERATOR_BUFFER = 4 MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = ( "backpressure_policies.streaming_output.max_blocks_in_generator_buffer" ) # The max number of blocks that can be buffered at the operator output queue # (`OpState.outqueue`). - DEFAULT_MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20 + MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20 MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY = ( "backpressure_policies.streaming_output.max_blocks_in_op_output_queue" ) @@ -54,13 +54,13 @@ def __init__(self, topology: "Topology"): data_context = ray.data.DataContext.get_current() data_context._max_num_blocks_in_streaming_gen_buffer = data_context.get_config( self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, - self.DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER, + self.MAX_BLOCKS_IN_GENERATOR_BUFFER, ) assert data_context._max_num_blocks_in_streaming_gen_buffer > 0 self._max_num_blocks_in_op_output_queue = data_context.get_config( self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY, - self.DEFAULT_MAX_BLOCKS_IN_OP_OUTPUT_QUEUE, + self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE, ) assert self._max_num_blocks_in_op_output_queue > 0 diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index b18a945e7f744..015c2f3bb0384 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -2,10 +2,6 @@ from typing import TYPE_CHECKING, Any, Dict, Optional import ray -from ray.data._internal.execution.backpressure_policy import ( - ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, - StreamingOutputBackpressurePolicy, -) from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.data._internal.memory_tracing import trace_allocation From 6efb412897fc71031dc5e1dc037a44d94d501d48 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 24 Jan 2024 21:00:49 -0800 Subject: [PATCH 10/10] Fix test Signed-off-by: Balaji Veeramani --- python/ray/data/tests/test_backpressure_policies.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index 6a5ae6fc6b3aa..2673d069e802d 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -273,8 +273,9 @@ def test_policy_basic(self): policy._max_num_blocks_in_op_output_queue == self._max_blocks_in_op_output_queue ) + data_context = ray.data.DataContext.get_current() assert ( - policy._max_num_blocks_in_streaming_gen_buffer + data_context._max_num_blocks_in_streaming_gen_buffer == self._max_blocks_in_generator_buffer )