[data] Enable per-op resource reservation #43171

Merged: Feb 27, 2024
Commits (83)
a454087 integrate streaming output backpressure (raulchen, Feb 14, 2024)
a90c82d integrate scheduling (raulchen, Feb 14, 2024)
45aace2 default streaming gen buffer (raulchen, Feb 14, 2024)
ac4ee7a fix remote args (raulchen, Feb 14, 2024)
c8e8235 enable (raulchen, Feb 14, 2024)
7eef1c3 fix (raulchen, Feb 14, 2024)
0c57088 streaming backpressure based on size (raulchen, Feb 14, 2024)
d2b16cd fix (raulchen, Feb 14, 2024)
276c7c6 comment out (raulchen, Feb 14, 2024)
21e80d0 reduce streaming gen buffer to 2 blocks (raulchen, Feb 17, 2024)
b00f347 fix obj_store_mem_max_pending_output_per_task (raulchen, Feb 17, 2024)
35d0e24 increase default obj memory to 50% (raulchen, Feb 17, 2024)
3a05243 print usage in progress bar (raulchen, Feb 17, 2024)
6e69a89 separate budgets (raulchen, Feb 17, 2024)
df2e4be simplify (raulchen, Feb 17, 2024)
a380f72 refine code (raulchen, Feb 21, 2024)
8fab9b7 Merge branch 'master' into enable-memory-reservation (raulchen, Feb 21, 2024)
cbf29fd only assign running tasks (raulchen, Feb 21, 2024)
d555014 fix (raulchen, Feb 21, 2024)
85eec48 handle fractional remaining (raulchen, Feb 21, 2024)
99d778f Reserve enough memory for GPU actor tasks (bveeramani, Feb 21, 2024)
56256fb minor fix (raulchen, Feb 21, 2024)
922440d clean up debug logging (raulchen, Feb 21, 2024)
0a536b7 refine implementation & add comment (raulchen, Feb 21, 2024)
fe6e822 rename (raulchen, Feb 21, 2024)
0c0c4b0 remove StreamingOutputBackpressurePolicy (raulchen, Feb 21, 2024)
a492da7 Merge branch 'master' into enable-memory-reservation (raulchen, Feb 21, 2024)
70a75b3 fix merge (raulchen, Feb 21, 2024)
c756713 clean up max_bytes_to_read (raulchen, Feb 21, 2024)
04fdc30 refine resource usage string (raulchen, Feb 21, 2024)
b39a741 refine comments (raulchen, Feb 21, 2024)
18744ab fix (raulchen, Feb 22, 2024)
d9af986 clear reservation when op finishes (raulchen, Feb 22, 2024)
4b6139c do not consider autoscaling (raulchen, Feb 22, 2024)
29ef8ea fix (raulchen, Feb 22, 2024)
081f501 comment (raulchen, Feb 22, 2024)
217b546 comment (raulchen, Feb 22, 2024)
a122f29 remove use_runtime_metrics_scheduling (raulchen, Feb 22, 2024)
186aa62 minor renames (raulchen, Feb 22, 2024)
e431d31 Merge branch 'master' into enable-memory-reservation (raulchen, Feb 22, 2024)
678bf23 remove python/ray/data/tests/test_runtime_metrics_scheduling.py (raulchen, Feb 22, 2024)
1e60ea9 lint (raulchen, Feb 22, 2024)
4c04e8d refine (raulchen, Feb 22, 2024)
5cc517f debug usage_str (raulchen, Feb 22, 2024)
c3e39c7 remove streaming output backpressure test (raulchen, Feb 22, 2024)
0018462 fix deadlock when global limit not enough (raulchen, Feb 23, 2024)
15a9892 idle detection (raulchen, Feb 23, 2024)
5636ff8 refine e2e tests (raulchen, Feb 23, 2024)
790b400 lint (raulchen, Feb 23, 2024)
df3b003 refine (raulchen, Feb 23, 2024)
c9d0b68 refine (raulchen, Feb 23, 2024)
135c6d0 fix process_completed_tasks (raulchen, Feb 23, 2024)
bb5273c fix tests (raulchen, Feb 23, 2024)
d1b0857 comments (raulchen, Feb 23, 2024)
837575c fix python/ray/data/tests/test_executor_resource_management.py (raulchen, Feb 23, 2024)
bbbc9be update TestResourceManager (raulchen, Feb 23, 2024)
cbd76f6 rename (raulchen, Feb 23, 2024)
97024cb fix (raulchen, Feb 23, 2024)
1a6af68 rename (raulchen, Feb 23, 2024)
be2e93e fix (raulchen, Feb 24, 2024)
f5199ac test reserve min (raulchen, Feb 24, 2024)
39a13b0 util func for mock map op (raulchen, Feb 24, 2024)
1c2b777 lint (raulchen, Feb 24, 2024)
3c93338 refactor idle detection (raulchen, Feb 24, 2024)
7745bff add table explanation (raulchen, Feb 24, 2024)
b7fbd18 lint (raulchen, Feb 24, 2024)
47e121e comment (raulchen, Feb 24, 2024)
a372102 comment (raulchen, Feb 26, 2024)
a3aba51 fix test_size_estimation.py (raulchen, Feb 26, 2024)
2f8c77d fix (raulchen, Feb 26, 2024)
8c13be9 account non-map downstream ops to upstream map op (raulchen, Feb 26, 2024)
125506d refine (raulchen, Feb 26, 2024)
73efa2b fix (raulchen, Feb 26, 2024)
1ffab5d comment (raulchen, Feb 26, 2024)
92a515d resnet test (raulchen, Feb 26, 2024)
bae892e update comments (raulchen, Feb 26, 2024)
5358ff5 reserve min memory (raulchen, Feb 26, 2024)
699e711 lint (raulchen, Feb 27, 2024)
6c72db1 move e2e tests (raulchen, Feb 27, 2024)
bef8277 loosen condition (raulchen, Feb 27, 2024)
a45edd9 lint (raulchen, Feb 27, 2024)
275ab66 lint (raulchen, Feb 27, 2024)
e2c2bdd fix (raulchen, Feb 27, 2024)
24 changes: 8 additions & 16 deletions python/ray/data/BUILD

@@ -450,14 +450,6 @@ py_test(
     deps = ["//:ray_lib", ":conftest"],
 )
 
-py_test(
-    name = "test_runtime_metrics_scheduling",
-    size = "small",
-    srcs = ["tests/test_runtime_metrics_scheduling.py"],
-    tags = ["team:data", "exclusive"],
-    deps = ["//:ray_lib", ":conftest"],
-)
-
 py_test(
     name = "test_size_estimation",
     size = "medium",
@@ -522,14 +514,6 @@ py_test(
     deps = ["//:ray_lib", ":conftest"],
 )
 
-py_test(
-    name = "test_streaming_backpressure_edge_case",
-    size = "medium",
-    srcs = ["tests/test_streaming_backpressure_edge_case.py"],
-    tags = ["team:data", "exclusive"],
-    deps = ["//:ray_lib", ":conftest"],
-)
-
 py_test(
     name = "test_transform_pyarrow",
     size = "small",
@@ -561,3 +545,11 @@ py_test(
     tags = ["team:data", "exclusive"],
     deps = ["//:ray_lib", ":conftest"],
 )
+
+py_test(
+    name = "test_backpressure_e2e",
+    size = "medium",
+    srcs = ["tests/test_backpressure_e2e.py"],
+    tags = ["team:data", "exclusive"],
+    deps = ["//:ray_lib", ":conftest"],
+)
@@ -3,15 +3,15 @@
 import ray
 from .backpressure_policy import BackpressurePolicy
 from .concurrency_cap_backpressure_policy import ConcurrencyCapBackpressurePolicy
-from .streaming_output_backpressure_policy import StreamingOutputBackpressurePolicy
 
 if TYPE_CHECKING:
     from ray.data._internal.execution.streaming_executor_state import Topology
 
 # Default enabled backpressure policies and its config key.
 # Use `DataContext.set_config` to config it.
-# TODO(hchen): Enable StreamingOutputBackpressurePolicy by default.
-ENABLED_BACKPRESSURE_POLICIES = [ConcurrencyCapBackpressurePolicy]
+ENABLED_BACKPRESSURE_POLICIES = [
+    ConcurrencyCapBackpressurePolicy,
+]
 ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY = "backpressure_policies.enabled"
 
 
@@ -27,7 +27,6 @@ def get_backpressure_policies(topology: "Topology"):
 __all__ = [
     "BackpressurePolicy",
     "ConcurrencyCapBackpressurePolicy",
-    "StreamingOutputBackpressurePolicy",
    "ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY",
    "get_backpressure_policies",
 ]
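
Note: the comment in the hunk above points at `DataContext.set_config` and the "backpressure_policies.enabled" key as the way to change which policies are enabled. A minimal sketch of overriding the default, assuming the module path implied by this package's own imports (verify against your Ray version):

from ray.data.context import DataContext
from ray.data._internal.execution.backpressure_policy import (
    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
    ConcurrencyCapBackpressurePolicy,
)

# Replace the default policy list for the current process.
DataContext.get_current().set_config(
    ENABLED_BACKPRESSURE_POLICIES_CONFIG_KEY,
    [ConcurrencyCapBackpressurePolicy],
)
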
@@ -1,11 +1,11 @@
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
     from ray.data._internal.execution.interfaces.physical_operator import (
         PhysicalOperator,
     )
-    from ray.data._internal.execution.streaming_executor_state import OpState, Topology
+    from ray.data._internal.execution.streaming_executor_state import Topology
 
 
 class BackpressurePolicy(ABC):
@@ -15,24 +15,6 @@ class BackpressurePolicy(ABC):
     def __init__(self, topology: "Topology"):
         ...
 
-    def calculate_max_blocks_to_read_per_op(
-        self, topology: "Topology"
-    ) -> Dict["OpState", int]:
-        """Determine how many blocks of data we can read from each operator.
-        The `DataOpTask`s of the operators will stop reading blocks when the limit is
-        reached. Then the execution of these tasks will be paused when the streaming
-        generator backpressure threshold is reached.
-        Used in `streaming_executor_state.py::process_completed_tasks()`.
-
-        Returns: A dict mapping from each operator's OpState to the desired number of
-        blocks to read. For operators that are not in the dict, all available blocks
-        will be read.
-
-        Note: Only one backpressure policy that implements this method can be enabled
-        at a time.
-        """
-        return {}
-
     def can_add_input(self, op: "PhysicalOperator") -> bool:
         """Determine if we can add a new input to the operator. If returns False, the
         operator will be backpressured and will not be able to run new tasks.

This file was deleted.
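
With `calculate_max_blocks_to_read_per_op` removed from the base class above, `can_add_input` is the only hook left for a policy to implement. As a hypothetical illustration (this subclass is invented for this note, not part of the PR; the import path is assumed from the package layout shown in this diff), a policy that never backpressures would look like:

from ray.data._internal.execution.backpressure_policy.backpressure_policy import (
    BackpressurePolicy,
)

class NoOpBackpressurePolicy(BackpressurePolicy):
    """Hypothetical policy that never backpressures any operator."""

    def __init__(self, topology):
        self._topology = topology

    def can_add_input(self, op) -> bool:
        # True means the operator may continue to launch new tasks.
        return True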

@@ -33,6 +33,18 @@ def inf(cls) -> "ExecutionResources":
         """Returns an ExecutionResources object with infinite resources."""
         return ExecutionResources(float("inf"), float("inf"), float("inf"))
 
+    def is_zero(self) -> bool:
+        """Returns True if all resources are zero."""
+        return self.cpu == 0.0 and self.gpu == 0.0 and self.object_store_memory == 0
+
+    def is_non_negative(self) -> bool:
+        """Returns True if all resources are non-negative."""
+        return (
+            (self.cpu is None or self.cpu >= 0)
+            and (self.gpu is None or self.gpu >= 0)
+            and (self.object_store_memory is None or self.object_store_memory >= 0)
+        )
+
     def object_store_memory_str(self) -> str:
         """Returns a human-readable string for the object store memory field."""
         if self.object_store_memory is None:
@@ -92,13 +104,24 @@ def max(self, other: "ExecutionResources") -> "ExecutionResources":
 
     def min(self, other: "ExecutionResources") -> "ExecutionResources":
         """Returns the minimum for each resource type."""
+        cpu1 = self.cpu if self.cpu is not None else float("inf")
+        cpu2 = other.cpu if other.cpu is not None else float("inf")
+        gpu1 = self.gpu if self.gpu is not None else float("inf")
+        gpu2 = other.gpu if other.gpu is not None else float("inf")
+        object_store_memory1 = (
+            self.object_store_memory
+            if self.object_store_memory is not None
+            else float("inf")
+        )
+        object_store_memory2 = (
+            other.object_store_memory
+            if other.object_store_memory is not None
+            else float("inf")
+        )
         return ExecutionResources(
-            cpu=min(self.cpu or float("inf"), other.cpu or float("inf")),
-            gpu=min(self.gpu or float("inf"), other.gpu or float("inf")),
-            object_store_memory=min(
-                self.object_store_memory or float("inf"),
-                other.object_store_memory or float("inf"),
-            ),
+            cpu=min(cpu1, cpu2),
+            gpu=min(gpu1, gpu2),
+            object_store_memory=min(object_store_memory1, object_store_memory2),
         )
 
     def satisfies_limit(self, limit: "ExecutionResources") -> bool:
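
The `min` rewrite above is a behavior fix, not just a cleanup: `x or float("inf")` coalesces every falsy value, so a resource limit of 0 was silently treated like None and replaced with infinity. A self-contained sketch of the difference:

INF = float("inf")

def old_min(a, b):
    # Falsy-coalescing: 0 and 0.0 are swallowed along with None.
    return min(a or INF, b or INF)

def new_min(a, b):
    # Explicit None checks keep 0 as a real value.
    a = a if a is not None else INF
    b = b if b is not None else INF
    return min(a, b)

print(old_min(0.0, 2.0))  # 2.0 -- the zero limit is lost
print(new_min(0.0, 2.0))  # 0.0 -- the zero limit is respected
print(old_min(None, 2.0))  # 2.0 -- None handling is unchanged
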
@@ -65,16 +65,16 @@ def __init__(
     def get_waitable(self) -> ObjectRefGenerator:
         return self._streaming_gen
 
-    def on_data_ready(self, max_blocks_to_read: Optional[int]) -> int:
+    def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
         """Callback when data is ready to be read from the streaming generator.
 
         Args:
-            max_blocks_to_read: Max number of blocks to read. If None, all available
+            max_bytes_to_read: Max bytes of blocks to read. If None, all available
                 will be read.
         Returns: The number of blocks read.
         """
-        num_blocks_read = 0
-        while max_blocks_to_read is None or num_blocks_read < max_blocks_to_read:
+        bytes_read = 0
+        while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
             try:
                 block_ref = self._streaming_gen._next_sync(0)
                 if block_ref.is_nil():
@@ -103,8 +103,8 @@ def on_data_ready(self, max_blocks_to_read: Optional[int]) -> int:
                 self._output_ready_callback(
                     RefBundle([(block_ref, meta)], owns_blocks=True)
                 )
-                num_blocks_read += 1
-        return num_blocks_read
+                bytes_read += meta.size_bytes
+        return bytes_read
 
 
 class MetadataOpTask(OpTask):
@@ -386,11 +386,16 @@ def base_resource_usage(self) -> ExecutionResources:
         """
         return ExecutionResources()
 
-    def incremental_resource_usage(self) -> ExecutionResources:
+    def incremental_resource_usage(
+        self, consider_autoscaling=True
+    ) -> ExecutionResources:
         """Returns the incremental resources required for processing another input.
 
         For example, an operator that launches a task per input could return
         ExecutionResources(cpu=1) as its incremental usage.
+
+        Args:
+            consider_autoscaling: Whether to consider the possibility of autoscaling.
         """
         return ExecutionResources()
 
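
Switching `on_data_ready` from a block budget to a byte budget makes the backpressure signal track actual object-store usage rather than block counts, since block sizes can vary widely. A simplified, self-contained sketch of the same loop shape (the generator here is a stand-in for Ray's streaming generator, not the real implementation):

from typing import Iterator, Optional, Tuple

def read_ready(
    gen: Iterator[Tuple[bytes, int]],  # yields (block payload, size in bytes)
    max_bytes_to_read: Optional[int],
    out: list,
) -> int:
    """Read blocks until the byte budget is exhausted; return bytes read."""
    bytes_read = 0
    while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
        try:
            block, size_bytes = next(gen)
        except StopIteration:
            break
        out.append(block)  # hand the block to downstream processing
        bytes_read += size_bytes
    return bytes_read
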
@@ -326,10 +326,12 @@ def current_processor_usage(self) -> ExecutionResources:
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
         )
 
-    def incremental_resource_usage(self) -> ExecutionResources:
+    def incremental_resource_usage(
+        self, consider_autoscaling=True
+    ) -> ExecutionResources:
         # We would only have nonzero incremental CPU/GPU resources if a new task would
         # require scale-up to run.
-        if self._autoscaling_policy.should_scale_up(
+        if consider_autoscaling and self._autoscaling_policy.should_scale_up(
             num_total_workers=self._actor_pool.num_total_actors(),
             num_running_workers=self._actor_pool.num_running_actors(),
         ):
@@ -345,7 +347,8 @@ def incremental_resource_usage(self) -> ExecutionResources:
         return ExecutionResources(
             cpu=num_cpus,
             gpu=num_gpus,
-            object_store_memory=self._metrics.average_bytes_outputs_per_task,
+            object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task
+            or 0,
         )
 
     def _extra_metrics(self) -> Dict[str, Any]:
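
The new `consider_autoscaling` flag lets a caller ask for an operator's marginal cost without assuming the actor pool will scale up (see the "do not consider autoscaling" commit above). A usage sketch, with the operator's construction elided:

# `op` is an ActorPoolMapOperator (construction elided).
# Default: per the comment in the diff, CPU/GPU may be nonzero only if
# admitting another input would force the actor pool to scale up.
usage_with_scaleup = op.incremental_resource_usage()
# Ignore potential scale-up; report only the cost against the current pool.
usage_current_pool = op.incremental_resource_usage(consider_autoscaling=False)
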
@@ -396,7 +396,9 @@ def base_resource_usage(self) -> ExecutionResources:
         raise NotImplementedError
 
     @abstractmethod
-    def incremental_resource_usage(self) -> ExecutionResources:
+    def incremental_resource_usage(
+        self, consider_autoscaling=True
+    ) -> ExecutionResources:
         raise NotImplementedError