[Data] Estimate object store memory from in-flight tasks #42504

Merged · 13 commits · Jan 25, 2024
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -442,6 +442,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_runtime_metrics_scheduling",
size = "small",
srcs = ["tests/test_runtime_metrics_scheduling.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_size_estimation",
size = "medium",
@@ -35,13 +35,13 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy):

# The max number of blocks that can be buffered at the streaming generator
# of each `DataOpTask`.
MAX_BLOCKS_IN_GENERATOR_BUFFER = 10
DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER = 4
MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY = (
"backpressure_policies.streaming_output.max_blocks_in_generator_buffer"
)
# The max number of blocks that can be buffered at the operator output queue
# (`OpState.outqueue`).
MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20
DEFAULT_MAX_BLOCKS_IN_OP_OUTPUT_QUEUE = 20
MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY = (
"backpressure_policies.streaming_output.max_blocks_in_op_output_queue"
)
@@ -51,12 +51,11 @@ class StreamingOutputBackpressurePolicy(BackpressurePolicy):
MAX_OUTPUT_IDLE_SECONDS = 10

def __init__(self, topology: "Topology"):
data_context = ray.data.DataContext.get_current()
self._max_num_blocks_in_streaming_gen_buffer = data_context.get_config(
self.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY,
self.MAX_BLOCKS_IN_GENERATOR_BUFFER,
self._max_num_blocks_in_streaming_gen_buffer = (
self.get_max_num_blocks_in_streaming_gen_buffer()
)
assert self._max_num_blocks_in_streaming_gen_buffer > 0

data_context = ray.data.DataContext.get_current()
# The `_generator_backpressure_num_objects` parameter should be
# `2 * self._max_num_blocks_in_streaming_gen_buffer` because we yield
# 2 objects for each block: the block and the block metadata.
@@ -66,7 +65,7 @@ def __init__(self, topology: "Topology"):

self._max_num_blocks_in_op_output_queue = data_context.get_config(
self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE_CONFIG_KEY,
self.MAX_BLOCKS_IN_OP_OUTPUT_QUEUE,
self.DEFAULT_MAX_BLOCKS_IN_OP_OUTPUT_QUEUE,
)
assert self._max_num_blocks_in_op_output_queue > 0

@@ -131,6 +130,16 @@ def calculate_max_blocks_to_read_per_op(
self._print_warning(state.op, cur_time - last_time)
return max_blocks_to_read_per_op

@classmethod
def get_max_num_blocks_in_streaming_gen_buffer(cls):
data_context = ray.data.DataContext.get_current()
value = data_context.get_config(
cls.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY,
cls.DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER,
)
assert value > 0
return value

def _print_warning(self, op: "PhysicalOperator", idle_time: float):
if self._warning_printed:
return
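Aside: with this change, the generator-buffer size is no longer a hard-coded constant. The policy resolves it from DataContext through the new get_max_num_blocks_in_streaming_gen_buffer() classmethod, falling back to the new default of 4. A minimal sketch of overriding it, using only names visible in this diff (the import path matches the one used in the test added below); treat it as illustrative rather than a documented API:

import ray
from ray.data._internal.execution.backpressure_policy import (
    StreamingOutputBackpressurePolicy,
)

ctx = ray.data.DataContext.get_current()
# Allow up to 8 blocks (instead of the default 4) to sit in each task's
# streaming-generator buffer before output backpressure kicks in.
ctx.set_config(
    StreamingOutputBackpressurePolicy.MAX_BLOCKS_IN_GENERATOR_BUFFER_CONFIG_KEY,
    8,
)
# The policy later reads this value via
# StreamingOutputBackpressurePolicy.get_max_num_blocks_in_streaming_gen_buffer().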
@@ -2,6 +2,10 @@
from typing import TYPE_CHECKING, Any, Dict, Optional

import ray
from ray.data._internal.execution.backpressure_policy import (
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
StreamingOutputBackpressurePolicy,
)
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data._internal.memory_tracing import trace_allocation

@@ -161,6 +165,39 @@ def average_num_outputs_per_task(self) -> Optional[float]:
else:
return self.num_outputs_of_finished_tasks / self.num_tasks_finished

@property
def average_bytes_per_output(self) -> Optional[float]:
"""Average size in bytes of output blocks."""
if self.num_outputs_generated == 0:
return None
else:
return self.bytes_outputs_generated / self.num_outputs_generated

@property
def obj_store_mem_pending(self) -> Optional[float]:
"""Estimated size in bytes of output blocks in Ray generator buffers.

If an estimate isn't available, this property returns ``None``.
"""
backpressure_policies = ray.data.DataContext.get_current().get_config(
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY
)
if StreamingOutputBackpressurePolicy not in backpressure_policies:
return None

estimated_bytes_per_output = (
self.average_bytes_per_output
or ray.data.DataContext.get_current().target_max_block_size
)
max_num_outputs_in_streaming_gen_buffer = (
StreamingOutputBackpressurePolicy.get_max_num_blocks_in_streaming_gen_buffer() # noqa: E501
)
return (
self.num_tasks_running
* estimated_bytes_per_output
* max_num_outputs_in_streaming_gen_buffer
)

@property
def average_bytes_inputs_per_task(self) -> Optional[float]:
"""Average size in bytes of ref bundles passed to tasks, or ``None`` if no
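The new obj_store_mem_pending property above estimates how much output the currently running tasks could still materialize: it returns None unless StreamingOutputBackpressurePolicy is enabled, sizes each pending block at the observed average output size (or target_max_block_size before any output exists), and assumes each running task may fill its streaming-generator buffer. A small worked example with made-up numbers:

# Hypothetical values, only to illustrate the formula behind obj_store_mem_pending.
num_tasks_running = 4
average_bytes_per_output = 24 * 1024**2  # 24 MiB observed per output block
max_blocks_in_gen_buffer = 4             # DEFAULT_MAX_BLOCKS_IN_GENERATOR_BUFFER

pending = num_tasks_running * average_bytes_per_output * max_blocks_in_gen_buffer
print(pending / 1024**2)  # 384.0 -> up to ~384 MiB of output still in flight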
@@ -311,10 +311,13 @@ def base_resource_usage(self) -> ExecutionResources:
def current_resource_usage(self) -> ExecutionResources:
# Both pending and running actors count towards our current resource usage.
num_active_workers = self._actor_pool.num_total_actors()
object_store_memory = self.metrics.obj_store_mem_cur
if self.metrics.obj_store_mem_pending is not None:
object_store_memory += self.metrics.obj_store_mem_pending
Contributor comment (not blocking this PR, just a note): I plan to move this method to OpRuntimeMetrics. Currently it's weird that we have two different places reporting resource metrics.

return ExecutionResources(
cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
object_store_memory=self.metrics.obj_store_mem_cur,
object_store_memory=object_store_memory,
)

def incremental_resource_usage(self) -> ExecutionResources:
@@ -92,10 +92,13 @@ def base_resource_usage(self) -> ExecutionResources:

def current_resource_usage(self) -> ExecutionResources:
num_active_workers = self.num_active_tasks()
object_store_memory = self.metrics.obj_store_mem_cur
if self.metrics.obj_store_mem_pending is not None:
object_store_memory += self.metrics.obj_store_mem_pending
return ExecutionResources(
cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
object_store_memory=self.metrics.obj_store_mem_cur,
object_store_memory=object_store_memory,
)

def incremental_resource_usage(self) -> ExecutionResources:
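Both the actor-pool and task-pool map operators now fold that pending estimate into the object store memory they report, so the scheduler sees in-flight output rather than only materialized blocks. A simplified sketch with hypothetical numbers, reusing the 384 MiB estimate from the earlier example:

# Hypothetical: what an operator reports with the pending estimate included.
obj_store_mem_cur = 48 * 1024**2       # blocks this operator has already produced
obj_store_mem_pending = 384 * 1024**2  # estimate for its running tasks

object_store_memory = obj_store_mem_cur
if obj_store_mem_pending is not None:
    object_store_memory += obj_store_mem_pending
print(object_store_memory / 1024**2)  # 432.0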
@@ -715,9 +715,15 @@ def _execution_allowed(
)
global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory)
downstream_usage = global_usage.downstream_memory_usage[op]
downstream_memory = downstream_usage.object_store_memory
if (
DataContext.get_current().use_runtime_metrics_scheduling
and inc.object_store_memory
):
downstream_memory += inc.object_store_memory
downstream_limit = global_limits.scale(downstream_usage.topology_fraction)
downstream_memory_ok = ExecutionResources(
object_store_memory=downstream_usage.object_store_memory
object_store_memory=downstream_memory
).satisfies_limit(downstream_limit)

# If completing a task decreases the overall object store memory usage, allow it
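With use_runtime_metrics_scheduling enabled, the downstream-memory check above also charges the incremental object store memory of the task being considered, so a new task is held back when its estimated output would push downstream usage past the scaled limit. Illustrative numbers, not from the PR (satisfies_limit is simplified to a plain comparison):

# Hypothetical numbers for the adjusted downstream-memory check.
downstream_memory = 60 * 1024**2        # memory already attributed downstream of the op
inc_object_store_memory = 96 * 1024**2  # estimated output of the candidate task
downstream_limit = 100 * 1024**2        # global limit scaled by the op's topology fraction

downstream_memory_ok = downstream_memory + inc_object_store_memory <= downstream_limit
print(downstream_memory_ok)  # False -> the task is not launched yet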
43 changes: 43 additions & 0 deletions python/ray/data/tests/test_runtime_metrics_scheduling.py
@@ -0,0 +1,43 @@
import time

import numpy as np
import pytest

import ray
from ray._private.internal_api import memory_summary
from ray.data._internal.execution.backpressure_policy import (
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
StreamingOutputBackpressurePolicy,
)


def test_scheduler_accounts_for_in_flight_tasks(shutdown_only, restore_data_context):
# The executor launches multiple tasks in each scheduling step. If it doesn't
# account for the potential output of in flight tasks, it may launch too many tasks
# and cause spilling.
ctx = ray.init(object_store_memory=100 * 1024**2)

ray.data.DataContext.get_current().use_runtime_metrics_scheduling = True
ray.data.DataContext.get_current().set_config(
ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY, [StreamingOutputBackpressurePolicy]
)

def f(batch):
time.sleep(0.1)
return {"data": np.zeros(24 * 1024**2, dtype=np.uint8)}

# If the executor doesn't account for the potential output of in flight tasks, it
# will launch all 8 tasks at once, producing 8 * 24MiB = 192MiB > 100MiB of data.
ds = ray.data.range(8, parallelism=8).map_batches(f, batch_size=None)

for _ in ds.iter_batches(batch_size=None, batch_format="pyarrow"):
pass

meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
assert "Spilled" not in meminfo, meminfo


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
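For reference, the margin the test relies on, spelled out with the test's own numbers:

# Arithmetic behind the test's expectation (values copied from the test above).
object_store_limit = 100 * 1024**2  # ray.init(object_store_memory=...)
bytes_per_output = 24 * 1024**2     # each batch produces a 24 MiB block
num_tasks = 8

# Launching all 8 tasks at once could produce 192 MiB of output, well over the
# 100 MiB object store, so the scheduler must hold tasks back to avoid spilling.
print(num_tasks * bytes_per_output / 1024**2)             # 192.0
print(num_tasks * bytes_per_output > object_store_limit)  # True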