[Data] Estimate object store memory from in-flight tasks (#42504)
Ray Data's streaming executor launches as many as 50 tasks in a single scheduling step. If the executor doesn't account for the potential output of in-flight tasks, it launches too many tasks (since tasks don't immediately output data) and causes spilling.

This PR fixes the issue by including an estimate of the data buffered at the Ray Core level in the topology resource usage computations.
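
As a rough sketch of the estimate this change introduces (simplified from the `obj_store_mem_pending` property in the diff below; the standalone function and the example numbers are illustrative, not part of the commit):

from typing import Optional


def estimate_pending_output_bytes(
    num_tasks_running: int,
    average_bytes_per_output: Optional[float],
    target_max_block_size: int,
    max_blocks_in_streaming_gen_buffer: int,
) -> float:
    # Until tasks produce real outputs, fall back to the target max block size.
    per_output = average_bytes_per_output or target_max_block_size
    # Each running task may buffer up to `max_blocks_in_streaming_gen_buffer`
    # blocks at its streaming generator, so count that much potential output.
    return num_tasks_running * per_output * max_blocks_in_streaming_gen_buffer


# Example: 8 running tasks, ~24 MiB blocks, up to 4 buffered blocks per task
# counts as 8 * 24 MiB * 4 = 768 MiB of potential object store usage.
print(estimate_pending_output_bytes(8, 24 * 1024**2, 128 * 1024**2, 4))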

---------

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
bveeramani and aslonnie committed Jan 25, 2024
1 parent fae8d2f commit 0c0ed96
Showing 9 changed files with 109 additions and 14 deletions.
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -442,6 +442,14 @@ py_test(
     deps = ["//:ray_lib", ":conftest"],
 )
 
+py_test(
+    name = "test_runtime_metrics_scheduling",
+    size = "small",
+    srcs = ["tests/test_runtime_metrics_scheduling.py"],
+    tags = ["team:data", "exclusive"],
+    deps = ["//:ray_lib", ":conftest"],
+)
+
 py_test(
     name = "test_size_estimation",
     size = "medium",
@@ -35,7 +35,7 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy):
 
     # The max number of blocks that can be buffered at the streaming generator
     # of each `DataOpTask`.
-    MAX_BLOCKS_IN_GENERATOR_BUFFER = 10
+    MAX_BLOCKS_IN_GENERATOR_BUFFER = 4
     MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = (
         "backpressure_policies.streaming_output.max_blocks_in_generator_buffer"
     )
@@ -52,17 +52,11 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy):
 
     def __init__(self, topology: "Topology"):
         data_context = ray.data.DataContext.get_current()
-        self._max_num_blocks_in_streaming_gen_buffer = data_context.get_config(
+        data_context._max_num_blocks_in_streaming_gen_buffer = data_context.get_config(
            self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY,
            self.MAX_BLOCKS_IN_GENERATOR_BUFFER,
        )
-        assert self._max_num_blocks_in_streaming_gen_buffer > 0
-        # The `_generator_backpressure_num_objects` parameter should be
-        # `2 * self._max_num_blocks_in_streaming_gen_buffer` because we yield
-        # 2 objects for each block: the block and the block metadata.
-        data_context._task_pool_data_task_remote_args[
-            "_generator_backpressure_num_objects"
-        ] = (2 * self._max_num_blocks_in_streaming_gen_buffer)
+        assert data_context._max_num_blocks_in_streaming_gen_buffer > 0
 
         self._max_num_blocks_in_op_output_queue = data_context.get_config(
            self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY,
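
The generator buffer size stays configurable through the DataContext key shown above; a minimal sketch of overriding it (the value 2 is arbitrary, chosen only for illustration):

import ray

data_context = ray.data.DataContext.get_current()
data_context.set_config(
    "backpressure_policies.streaming_output.max_blocks_in_generator_buffer", 2
)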
@@ -161,6 +161,33 @@ def average_num_outputs_per_task(self) -> Optional[float]:
         else:
             return self.num_outputs_of_finished_tasks / self.num_tasks_finished
 
+    @property
+    def average_bytes_per_output(self) -> Optional[float]:
+        """Average size in bytes of output blocks."""
+        if self.num_outputs_generated == 0:
+            return None
+        else:
+            return self.bytes_outputs_generated / self.num_outputs_generated
+
+    @property
+    def obj_store_mem_pending(self) -> Optional[float]:
+        """Estimated size in bytes of output blocks in Ray generator buffers.
+
+        If an estimate isn't available, this property returns ``None``.
+        """
+        context = ray.data.DataContext.get_current()
+        if context._max_num_blocks_in_streaming_gen_buffer is None:
+            return None
+
+        estimated_bytes_per_output = (
+            self.average_bytes_per_output or context.target_max_block_size
+        )
+        return (
+            self.num_tasks_running
+            * estimated_bytes_per_output
+            * context._max_num_blocks_in_streaming_gen_buffer
+        )
+
     @property
     def average_bytes_inputs_per_task(self) -> Optional[float]:
         """Average size in bytes of ref bundles passed to tasks, or ``None`` if no
@@ -311,10 +311,13 @@ def base_resource_usage(self) -> ExecutionResources:
     def current_resource_usage(self) -> ExecutionResources:
         # Both pending and running actors count towards our current resource usage.
         num_active_workers = self._actor_pool.num_total_actors()
+        object_store_memory = self.metrics.obj_store_mem_cur
+        if self.metrics.obj_store_mem_pending is not None:
+            object_store_memory += self.metrics.obj_store_mem_pending
         return ExecutionResources(
             cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
-            object_store_memory=self.metrics.obj_store_mem_cur,
+            object_store_memory=object_store_memory,
         )
 
     def incremental_resource_usage(self) -> ExecutionResources:
@@ -59,7 +59,14 @@ def _add_bundled_input(self, bundle: RefBundle):
         data_context = DataContext.get_current()
         ray_remote_args = self._get_runtime_ray_remote_args(input_bundle=bundle)
         ray_remote_args["name"] = self.name
-        ray_remote_args.update(data_context._task_pool_data_task_remote_args)
+
+        if data_context._max_num_blocks_in_streaming_gen_buffer is not None:
+            # The `_generator_backpressure_num_objects` parameter should be
+            # `2 * _max_num_blocks_in_streaming_gen_buffer` because we yield
+            # 2 objects for each block: the block and the block metadata.
+            ray_remote_args["_generator_backpressure_num_objects"] = (
+                2 * data_context._max_num_blocks_in_streaming_gen_buffer
+            )
 
         gen = map_task.options(**ray_remote_args).remote(
             self._map_transformer_ref,
@@ -92,10 +99,13 @@ def base_resource_usage(self) -> ExecutionResources:
 
     def current_resource_usage(self) -> ExecutionResources:
         num_active_workers = self.num_active_tasks()
+        object_store_memory = self.metrics.obj_store_mem_cur
+        if self.metrics.obj_store_mem_pending is not None:
+            object_store_memory += self.metrics.obj_store_mem_pending
         return ExecutionResources(
             cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
-            object_store_memory=self.metrics.obj_store_mem_cur,
+            object_store_memory=object_store_memory,
         )
 
     def incremental_resource_usage(self) -> ExecutionResources:
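
A note on the 2x factor in the hunk above: every output block is yielded to Ray Core as two objects (the block and its metadata), so capping a streaming generator at N buffered blocks translates to a backpressure threshold of 2 * N unconsumed objects. A tiny sanity check with the new default of 4:

# Values mirror the defaults introduced in this commit.
max_blocks_in_streaming_gen_buffer = 4
generator_backpressure_num_objects = 2 * max_blocks_in_streaming_gen_buffer
assert generator_backpressure_num_objects == 8  # block + metadata per block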
@@ -715,9 +715,15 @@ def _execution_allowed(
     )
     global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory)
     downstream_usage = global_usage.downstream_memory_usage[op]
+    downstream_memory = downstream_usage.object_store_memory
+    if (
+        DataContext.get_current().use_runtime_metrics_scheduling
+        and inc.object_store_memory
+    ):
+        downstream_memory += inc.object_store_memory
     downstream_limit = global_limits.scale(downstream_usage.topology_fraction)
     downstream_memory_ok = ExecutionResources(
-        object_store_memory=downstream_usage.object_store_memory
+        object_store_memory=downstream_memory
     ).satisfies_limit(downstream_limit)
 
     # If completing a task decreases the overall object store memory usage, allow it
3 changes: 3 additions & 0 deletions python/ray/data/context.py
@@ -265,6 +265,9 @@ def __init__(
         # the DataContext from the plugin implementations, as well as to avoid
         # circular dependencies.
         self._kv_configs: Dict[str, Any] = {}
+        # The max number of blocks that can be buffered at the streaming generator of
+        # each `DataOpTask`.
+        self._max_num_blocks_in_streaming_gen_buffer = None
 
     @staticmethod
     def get_current() -> "DataContext":
3 changes: 2 additions & 1 deletion python/ray/data/tests/test_backpressure_policies.py
@@ -273,8 +273,9 @@ def test_policy_basic(self):
             policy._max_num_blocks_in_op_output_queue
             == self._max_blocks_in_op_output_queue
         )
+        data_context = ray.data.DataContext.get_current()
         assert (
-            policy._max_num_blocks_in_streaming_gen_buffer
+            data_context._max_num_blocks_in_streaming_gen_buffer
             == self._max_blocks_in_generator_buffer
         )
 
43 changes: 43 additions & 0 deletions python/ray/data/tests/test_runtime_metrics_scheduling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import time

import numpy as np
import pytest

import ray
from ray._private.internal_api import memory_summary
from ray.data._internal.execution.backpressure_policy import (
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
StreamingOutputBackpressurePolicy,
)


def test_scheduler_accounts_for_in_flight_tasks(shutdown_only, restore_data_context):
# The executor launches multiple tasks in each scheduling step. If it doesn't
# account for the potential output of in flight tasks, it may launch too many tasks
# and cause spilling.
ctx = ray.init(object_store_memory=100 * 1024**2)

ray.data.DataContext.get_current().use_runtime_metrics_scheduling = True
ray.data.DataContext.get_current().set_config(
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [StreamingOutputBackpressurePolicy]
)

def f(batch):
time.sleep(0.1)
return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)}

# If the executor doesn't account for the potential output of in flight tasks, it
# will launch all 8 tasks at once, producing 8 * 24MiB = 192MiB > 100MiB of data.
ds = ray.data.range(8, parallelism=8).map_batches(f, batch_size=None)

for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"):
pass

meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
assert "Spilled" not in meminfo, meminfo


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
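
To make the test's thresholds concrete, a quick back-of-the-envelope check (numbers taken directly from the test above):

object_store_limit = 100 * 1024**2  # ray.init(object_store_memory=...)
bytes_per_block = 24 * 1024**2  # each map batch returns a 24 MiB array
num_tasks = 8

# Launching all 8 tasks at once would produce more data than the store holds,
# which is exactly the spilling scenario the in-flight estimate prevents.
assert num_tasks * bytes_per_block > object_store_limit  # 192 MiB > 100 MiB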
