From 0c0ed96c33c566d12a6e4b337e125d12b004646f Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Thu, 25 Jan 2024 12:01:22 -0800 Subject: [PATCH] [Data] Estimate object store memory from in-flight tasks (#42504) Ray Data's streaming executor launches as many as 50 tasks in a single scheduling step. If the executor doesn't account for the potential output of in-flight tasks, it launches too many tasks (since tasks don't immediately output data) and causes spilling. This PR fixes the issue by considering data buffered at the Ray Core level to computations of topology resource usage. --------- Signed-off-by: Balaji Veeramani Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com> --- python/ray/data/BUILD | 8 ++++ .../streaming_output_backpressure_policy.py | 12 ++---- .../interfaces/op_runtime_metrics.py | 27 ++++++++++++ .../operators/actor_pool_map_operator.py | 5 ++- .../operators/task_pool_map_operator.py | 14 +++++- .../execution/streaming_executor_state.py | 8 +++- python/ray/data/context.py | 3 ++ .../data/tests/test_backpressure_policies.py | 3 +- .../tests/test_runtime_metrics_scheduling.py | 43 +++++++++++++++++++ 9 files changed, 109 insertions(+), 14 deletions(-) create mode 100644 python/ray/data/tests/test_runtime_metrics_scheduling.py diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index d40e923eb1163..cd8045cafcbab 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -442,6 +442,14 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) +py_test( + name = "test_runtime_metrics_scheduling", + size = "small", + srcs = ["tests/test_runtime_metrics_scheduling.py"], + tags = ["team:data", "exclusive"], + deps = ["//:ray_lib", ":conftest"], +) + py_test( name = "test_size_estimation", size = "medium", diff --git a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py index f7657d6b185e9..088d8d593ea3e 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py @@ -35,7 +35,7 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy): # The max number of blocks that can be buffered at the streaming generator # of each `DataOpTask`. - MAX_BLOCKS_IN_GENERATOR_BUFFER = 10 + MAX_BLOCKS_IN_GENERATOR_BUFFER = 4 MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = ( "backpressure_policies.streaming_output.max_blocks_in_generator_buffer" ) @@ -52,17 +52,11 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy): def __init__(self, topology: "Topology"): data_context = ray.data.DataContext.get_current() - self._max_num_blocks_in_streaming_gen_buffer = data_context.get_config( + data_context._max_num_blocks_in_streaming_gen_buffer = data_context.get_config( self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY, self.MAX_BLOCKS_IN_GENERATOR_BUFFER, ) - assert self._max_num_blocks_in_streaming_gen_buffer > 0 - # The `_generator_backpressure_num_objects` parameter should be - # `2 * self._max_num_blocks_in_streaming_gen_buffer` because we yield - # 2 objects for each block: the block and the block metadata. - data_context._task_pool_data_task_remote_args[ - "_generator_backpressure_num_objects" - ] = (2 * self._max_num_blocks_in_streaming_gen_buffer) + assert data_context._max_num_blocks_in_streaming_gen_buffer > 0 self._max_num_blocks_in_op_output_queue = data_context.get_config( self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY, diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index dcf036dd489d3..015c2f3bb0384 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -161,6 +161,33 @@ def average_num_outputs_per_task(self) -> Optional[float]: else: return self.num_outputs_of_finished_tasks / self.num_tasks_finished + @property + def average_bytes_per_output(self) -> Optional[float]: + """Average size in bytes of output blocks.""" + if self.num_outputs_generated == 0: + return None + else: + return self.bytes_outputs_generated / self.num_outputs_generated + + @property + def obj_store_mem_pending(self) -> Optional[float]: + """Estimated size in bytes of output blocks in Ray generator buffers. + + If an estimate isn't available, this property returns ``None``. + """ + context = ray.data.DataContext.get_current() + if context._max_num_blocks_in_streaming_gen_buffer is None: + return None + + estimated_bytes_per_output = ( + self.average_bytes_per_output or context.target_max_block_size + ) + return ( + self.num_tasks_running + * estimated_bytes_per_output + * context._max_num_blocks_in_streaming_gen_buffer + ) + @property def average_bytes_inputs_per_task(self) -> Optional[float]: """Average size in bytes of ref bundles passed to tasks, or ``None`` if no diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index c9a01a14a3169..a174a99c9737c 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -311,10 +311,13 @@ def base_resource_usage(self) -> ExecutionResources: def current_resource_usage(self) -> ExecutionResources: # Both pending and running actors count towards our current resource usage. num_active_workers = self._actor_pool.num_total_actors() + object_store_memory = self.metrics.obj_store_mem_cur + if self.metrics.obj_store_mem_pending is not None: + object_store_memory += self.metrics.obj_store_mem_pending return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers, gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, - object_store_memory=self.metrics.obj_store_mem_cur, + object_store_memory=object_store_memory, ) def incremental_resource_usage(self) -> ExecutionResources: diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index 549898078a7f4..a3404897d4407 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -59,7 +59,14 @@ def _add_bundled_input(self, bundle: RefBundle): data_context = DataContext.get_current() ray_remote_args = self._get_runtime_ray_remote_args(input_bundle=bundle) ray_remote_args["name"] = self.name - ray_remote_args.update(data_context._task_pool_data_task_remote_args) + + if data_context._max_num_blocks_in_streaming_gen_buffer is not None: + # The `_generator_backpressure_num_objects` parameter should be + # `2 * _max_num_blocks_in_streaming_gen_buffer` because we yield + # 2 objects for each block: the block and the block metadata. + ray_remote_args["_generator_backpressure_num_objects"] = ( + 2 * data_context._max_num_blocks_in_streaming_gen_buffer + ) gen = map_task.options(**ray_remote_args).remote( self._map_transformer_ref, @@ -92,10 +99,13 @@ def base_resource_usage(self) -> ExecutionResources: def current_resource_usage(self) -> ExecutionResources: num_active_workers = self.num_active_tasks() + object_store_memory = self.metrics.obj_store_mem_cur + if self.metrics.obj_store_mem_pending is not None: + object_store_memory += self.metrics.obj_store_mem_pending return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers, gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers, - object_store_memory=self.metrics.obj_store_mem_cur, + object_store_memory=object_store_memory, ) def incremental_resource_usage(self) -> ExecutionResources: diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index c73c6451c1cef..96e0cb66f89f4 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -715,9 +715,15 @@ def _execution_allowed( ) global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory) downstream_usage = global_usage.downstream_memory_usage[op] + downstream_memory = downstream_usage.object_store_memory + if ( + DataContext.get_current().use_runtime_metrics_scheduling + and inc.object_store_memory + ): + downstream_memory += inc.object_store_memory downstream_limit = global_limits.scale(downstream_usage.topology_fraction) downstream_memory_ok = ExecutionResources( - object_store_memory=downstream_usage.object_store_memory + object_store_memory=downstream_memory ).satisfies_limit(downstream_limit) # If completing a task decreases the overall object store memory usage, allow it diff --git a/python/ray/data/context.py b/python/ray/data/context.py index ce2171d986c66..b7bfe0b116f4a 100644 --- a/python/ray/data/context.py +++ b/python/ray/data/context.py @@ -265,6 +265,9 @@ def __init__( # the DataContext from the plugin implementations, as well as to avoid # circular dependencies. self._kv_configs: Dict[str, Any] = {} + # The max number of blocks that can be buffered at the streaming generator of + # each `DataOpTask`. + self._max_num_blocks_in_streaming_gen_buffer = None @staticmethod def get_current() -> "DataContext": diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index 6a5ae6fc6b3aa..2673d069e802d 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -273,8 +273,9 @@ def test_policy_basic(self): policy._max_num_blocks_in_op_output_queue == self._max_blocks_in_op_output_queue ) + data_context = ray.data.DataContext.get_current() assert ( - policy._max_num_blocks_in_streaming_gen_buffer + data_context._max_num_blocks_in_streaming_gen_buffer == self._max_blocks_in_generator_buffer ) diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py new file mode 100644 index 0000000000000..12df40a5ffd9b --- /dev/null +++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py @@ -0,0 +1,43 @@ +import time + +import numpy as np +import pytest + +import ray +from ray._private.internal_api import memory_summary +from ray.data._internal.execution.backpressure_policy import ( + ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, + StreamingOutputBackpressurePolicy, +) + + +def test_scheduler_accounts_for_in_flight_tasks(shutdown_only, restore_data_context): + # The executor launches multiple tasks in each scheduling step. If it doesn't + # account for the potential output of in flight tasks, it may launch too many tasks + # and cause spilling. + ctx = ray.init(object_store_memory=100 * 1024**2) + + ray.data.DataContext.get_current().use_runtime_metrics_scheduling = True + ray.data.DataContext.get_current().set_config( + ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [StreamingOutputBackpressurePolicy] + ) + + def f(batch): + time.sleep(0.1) + return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)} + + # If the executor doesn't account for the potential output of in flight tasks, it + # will launch all 8 tasks at once, producing 8 * 24MiB = 192MiB > 100MiB of data. + ds = ray.data.range(8, parallelism=8).map_batches(f, batch_size=None) + + for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"): + pass + + meminfo = memory_summary(ctx.address_info["address"], stats_only=True) + assert "Spilled" not in meminfo, meminfo + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__]))