diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD
index d40e923eb1163..cd8045cafcbab 100644
--- a/python/ray/data/BUILD
+++ b/python/ray/data/BUILD
@@ -442,6 +442,14 @@ py_test(
     deps = ["//:ray_lib", ":conftest"],
 )
 
+py_test(
+    name = "test_runtime_metrics_scheduling",
+    size = "small",
+    srcs = ["tests/test_runtime_metrics_scheduling.py"],
+    tags = ["team:data", "exclusive"],
+    deps = ["//:ray_lib", ":conftest"],
+)
+
 py_test(
     name = "test_size_estimation",
     size = "medium",
diff --git a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py
index f7657d6b185e9..088d8d593ea3e 100644
--- a/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py
+++ b/python/ray/data/_internal/execution/backpressure_policy/streaming_output_backpressure_policy.py
@@ -35,7 +35,7 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy):
 
     # The max number of blocks that can be buffered at the streaming generator
     # of each `DataOpTask`.
-    MAX_BLOCKS_IN_GENERATOR_BUFFER = 10
+    MAX_BLOCKS_IN_GENERATOR_BUFFER = 4
     MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = (
         "backpressure_policies.streaming_output.max_blocks_in_generator_buffer"
     )
@@ -52,17 +52,11 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy):
 
     def __init__(self, topology: "Topology"):
         data_context = ray.data.DataContext.get_current()
-        self._max_num_blocks_in_streaming_gen_buffer = data_context.get_config(
+        data_context._max_num_blocks_in_streaming_gen_buffer = data_context.get_config(
             self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY,
             self.MAX_BLOCKS_IN_GENERATOR_BUFFER,
         )
-        assert self._max_num_blocks_in_streaming_gen_buffer > 0
-        # The `_generator_backpressure_num_objects` parameter should be
-        # `2 * self._max_num_blocks_in_streaming_gen_buffer` because we yield
-        # 2 objects for each block: the block and the block metadata.
-        data_context._task_pool_data_task_remote_args[
-            "_generator_backpressure_num_objects"
-        ] = (2 * self._max_num_blocks_in_streaming_gen_buffer)
+        assert data_context._max_num_blocks_in_streaming_gen_buffer > 0
 
         self._max_num_blocks_in_op_output_queue = data_context.get_config(
             self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY,
diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index dcf036dd489d3..015c2f3bb0384 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -161,6 +161,33 @@ def average_num_outputs_per_task(self) -> Optional[float]:
         else:
             return self.num_outputs_of_finished_tasks / self.num_tasks_finished
 
+    @property
+    def average_bytes_per_output(self) -> Optional[float]:
+        """Average size in bytes of output blocks."""
+        if self.num_outputs_generated == 0:
+            return None
+        else:
+            return self.bytes_outputs_generated / self.num_outputs_generated
+
+    @property
+    def obj_store_mem_pending(self) -> Optional[float]:
+        """Estimated size in bytes of output blocks in Ray generator buffers.
+
+        If an estimate isn't available, this property returns ``None``.
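+
+        The estimate assumes each running task buffers up to
+        ``_max_num_blocks_in_streaming_gen_buffer`` blocks of roughly
+        ``average_bytes_per_output`` bytes each, falling back to
+        ``target_max_block_size`` before any outputs have been observed.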
+        """
+        context = ray.data.DataContext.get_current()
+        if context._max_num_blocks_in_streaming_gen_buffer is None:
+            return None
+
+        estimated_bytes_per_output = (
+            self.average_bytes_per_output or context.target_max_block_size
+        )
+        return (
+            self.num_tasks_running
+            * estimated_bytes_per_output
+            * context._max_num_blocks_in_streaming_gen_buffer
+        )
+
     @property
     def average_bytes_inputs_per_task(self) -> Optional[float]:
         """Average size in bytes of ref bundles passed to tasks, or ``None`` if no
diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
index c9a01a14a3169..a174a99c9737c 100644
--- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
+++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
@@ -311,10 +311,13 @@ def base_resource_usage(self) -> ExecutionResources:
     def current_resource_usage(self) -> ExecutionResources:
         # Both pending and running actors count towards our current resource usage.
         num_active_workers = self._actor_pool.num_total_actors()
+        object_store_memory = self.metrics.obj_store_mem_cur
+        if self.metrics.obj_store_mem_pending is not None:
+            object_store_memory += self.metrics.obj_store_mem_pending
         return ExecutionResources(
             cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
-            object_store_memory=self.metrics.obj_store_mem_cur,
+            object_store_memory=object_store_memory,
         )
 
     def incremental_resource_usage(self) -> ExecutionResources:
diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
index 549898078a7f4..a3404897d4407 100644
--- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
+++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py
@@ -59,7 +59,14 @@ def _add_bundled_input(self, bundle: RefBundle):
         data_context = DataContext.get_current()
         ray_remote_args = self._get_runtime_ray_remote_args(input_bundle=bundle)
         ray_remote_args["name"] = self.name
-        ray_remote_args.update(data_context._task_pool_data_task_remote_args)
+
+        if data_context._max_num_blocks_in_streaming_gen_buffer is not None:
+            # The `_generator_backpressure_num_objects` parameter should be
+            # `2 * _max_num_blocks_in_streaming_gen_buffer` because we yield
+            # 2 objects for each block: the block and the block metadata.
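+            # Ray suspends the task's streaming generator once this many
+            # yielded objects are waiting to be consumed.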
+            ray_remote_args["_generator_backpressure_num_objects"] = (
+                2 * data_context._max_num_blocks_in_streaming_gen_buffer
+            )
 
         gen = map_task.options(**ray_remote_args).remote(
             self._map_transformer_ref,
@@ -92,10 +99,13 @@ def base_resource_usage(self) -> ExecutionResources:
 
     def current_resource_usage(self) -> ExecutionResources:
         num_active_workers = self.num_active_tasks()
+        object_store_memory = self.metrics.obj_store_mem_cur
+        if self.metrics.obj_store_mem_pending is not None:
+            object_store_memory += self.metrics.obj_store_mem_pending
         return ExecutionResources(
             cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
-            object_store_memory=self.metrics.obj_store_mem_cur,
+            object_store_memory=object_store_memory,
         )
 
     def incremental_resource_usage(self) -> ExecutionResources:
diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py
index c73c6451c1cef..96e0cb66f89f4 100644
--- a/python/ray/data/_internal/execution/streaming_executor_state.py
+++ b/python/ray/data/_internal/execution/streaming_executor_state.py
@@ -715,9 +715,15 @@ def _execution_allowed(
     )
     global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory)
     downstream_usage = global_usage.downstream_memory_usage[op]
+    downstream_memory = downstream_usage.object_store_memory
+    if (
+        DataContext.get_current().use_runtime_metrics_scheduling
+        and inc.object_store_memory
+    ):
+        downstream_memory += inc.object_store_memory
     downstream_limit = global_limits.scale(downstream_usage.topology_fraction)
     downstream_memory_ok = ExecutionResources(
-        object_store_memory=downstream_usage.object_store_memory
+        object_store_memory=downstream_memory
     ).satisfies_limit(downstream_limit)
 
     # If completing a task decreases the overall object store memory usage, allow it
diff --git a/python/ray/data/context.py b/python/ray/data/context.py
index ce2171d986c66..b7bfe0b116f4a 100644
--- a/python/ray/data/context.py
+++ b/python/ray/data/context.py
@@ -265,6 +265,9 @@ def __init__(
         # the DataContext from the plugin implementations, as well as to avoid
         # circular dependencies.
         self._kv_configs: Dict[str, Any] = {}
+        # The max number of blocks that can be buffered at the streaming generator of
+        # each `DataOpTask`.
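+        # Stays `None` unless `StreamingOutputBackpressurePolicy` is enabled,
+        # which sets this field in its constructor.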
+        self._max_num_blocks_in_streaming_gen_buffer = None
 
     @staticmethod
     def get_current() -> "DataContext":
diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py
index 6a5ae6fc6b3aa..2673d069e802d 100644
--- a/python/ray/data/tests/test_backpressure_policies.py
+++ b/python/ray/data/tests/test_backpressure_policies.py
@@ -273,8 +273,9 @@ def test_policy_basic(self):
             policy._max_num_blocks_in_op_output_queue
             == self._max_blocks_in_op_output_queue
         )
+        data_context = ray.data.DataContext.get_current()
         assert (
-            policy._max_num_blocks_in_streaming_gen_buffer
+            data_context._max_num_blocks_in_streaming_gen_buffer
             == self._max_blocks_in_generator_buffer
         )
 
diff --git a/python/ray/data/tests/test_runtime_metrics_scheduling.py b/python/ray/data/tests/test_runtime_metrics_scheduling.py
new file mode 100644
index 0000000000000..12df40a5ffd9b
--- /dev/null
+++ b/python/ray/data/tests/test_runtime_metrics_scheduling.py
@@ -0,0 +1,43 @@
+import time
+
+import numpy as np
+import pytest
+
+import ray
+from ray._private.internal_api import memory_summary
+from ray.data._internal.execution.backpressure_policy import (
+    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
+    StreamingOutputBackpressurePolicy,
+)
+
+
+def test_scheduler_accounts_for_in_flight_tasks(shutdown_only, restore_data_context):
+    # The executor launches multiple tasks in each scheduling step. If it doesn't
+    # account for the potential output of in-flight tasks, it may launch too many
+    # tasks and cause spilling.
+    ctx = ray.init(object_store_memory=100 * 1024**2)
+
+    ray.data.DataContext.get_current().use_runtime_metrics_scheduling = True
+    ray.data.DataContext.get_current().set_config(
+        ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [StreamingOutputBackpressurePolicy]
+    )
+
+    def f(batch):
+        time.sleep(0.1)
+        return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)}
+
+    # If the executor doesn't account for the potential output of in-flight tasks, it
+    # will launch all 8 tasks at once, producing 8 * 24MiB = 192MiB > 100MiB of data.
+    ds = ray.data.range(8, parallelism=8).map_batches(f, batch_size=None)
+
+    for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"):
+        pass
+
+    meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
+    assert "Spilled" not in meminfo, meminfo
+
+
+if __name__ == "__main__":
+    import sys
+
+    sys.exit(pytest.main(["-v", __file__]))
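
A quick sanity check of the `obj_store_mem_pending` arithmetic, using the numbers
from the test above (a sketch, not part of the patch; the variable names are
illustrative):

    # 8 running tasks, each assumed to buffer up to 4 blocks (the new
    # MAX_BLOCKS_IN_GENERATOR_BUFFER default) of ~24 MiB, once
    # average_bytes_per_output has been observed from the test's batches.
    num_tasks_running = 8
    max_blocks_in_gen_buffer = 4
    bytes_per_output = 24 * 1024**2
    obj_store_mem_pending = (
        num_tasks_running * bytes_per_output * max_blocks_in_gen_buffer
    )
    assert obj_store_mem_pending == 768 * 1024**2  # 768 MiB

Counting this pending memory in `current_resource_usage` and charging it against
the downstream limit in `_execution_allowed` is what stops the scheduler from
launching all 8 tasks at once into the 100 MiB object store.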