[data] introduce abstract interface for data autoscaling (ray-project#45002)

* Introduce an abstract interface for data autoscaling, making autoscaling behavior easier to customize and extend (see the sketch after this list). Main components:
  * `Autoscaler`: the abstract interface responsible for all autoscaling decisions, including cluster and actor pool autoscaling.
  * `AutoscalingActorPool`: abstract interface representing an actor pool that can autoscale.
  * `DefaultAutoscaler`: the default implementation.
* No major code logic changes in this PR, except:
  * Fixed a small bug in calculating actor pool utilization (it should be `num_active_actors/current_size`, not `num_running_actors/current_size`).
  * `ActorPoolMapOperator.incremental_resource_usage` no longer considers autoscaling, since autoscaling is being abstracted out of the operator. The information wasn't useful previously either.
  * Removed actor pool autoscaling logic for the bulk executor.
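
To illustrate the seam this creates (not part of this PR's changes), a custom policy could subclass `Autoscaler` and be returned from `create_autoscaler` in place of `DefaultAutoscaler`. A minimal sketch, with a hypothetical class name and scaling rule:

from ray.data._internal.execution.autoscaler import Autoscaler


class ConservativeAutoscaler(Autoscaler):
    """Hypothetical policy: only grow actor pools to their min size and never
    request extra cluster resources."""

    def try_trigger_scaling(self):
        # Invoked by the StreamingExecutor on each scheduling decision.
        for op, _state in self._topology.items():
            for pool in op.get_autoscaling_actor_pools():
                if pool.current_size() < pool.min_size():
                    pool.scale_up(pool.min_size() - pool.current_size())

    def on_executor_shutdown(self):
        # Nothing to clean up; DefaultAutoscaler uses this hook to cancel its
        # pending cluster resource requests.
        pass

Swapping such a policy in today would mean editing the `create_autoscaler` factory below, which hard-codes `DefaultAutoscaler`.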

---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Ryan O'Leary <ryanaoleary@google.com>
raulchen authored and ryanaoleary committed Jun 6, 2024
1 parent 92e32f9 commit 01b971a
Showing 18 changed files with 882 additions and 963 deletions.
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -579,6 +579,14 @@ py_test(
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_autoscaler",
    size = "small",
    srcs = ["tests/test_autoscaler.py"],
    tags = ["team:data", "exclusive"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test(
    name = "test_lance",
    size = "small",
15 changes: 15 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/__init__.py
@@ -0,0 +1,15 @@
from .autoscaler import Autoscaler
from .autoscaling_actor_pool import AutoscalingActorPool
from .default_autoscaler import DefaultAutoscaler


def create_autoscaler(topology, resource_manager, execution_id):
    return DefaultAutoscaler(topology, resource_manager, execution_id)


__all__ = [
    "Autoscaler",
    "DefaultAutoscaler",
    "create_autoscaler",
    "AutoscalingActorPool",
]
38 changes: 38 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/autoscaler.py
@@ -0,0 +1,38 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor_state import Topology


@DeveloperAPI
class Autoscaler(ABC):
"""Abstract interface for Ray Data autoscaler."""

def __init__(
self,
topology: "Topology",
resource_manager: "ResourceManager",
execution_id: str,
):
self._topology = topology
self._resource_manager = resource_manager
self._execution_id = execution_id

@abstractmethod
def try_trigger_scaling(self):
"""Try trigger autoscaling.
This method will be called each time when StreamingExecutor makes
a scheduling decision. A subclass should override this method to
handle the autoscaling of both the cluster and `AutoscalingActorPool`s.
"""
...

@abstractmethod
def on_executor_shutdown(self):
"""Callback when the StreamingExecutor is shutting down."""
...
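
The two abstract methods above are the whole contract with the executor. The snippet below sketches the call pattern the docstrings imply; `drive_scheduling_loop` and `make_scheduling_decision` are hypothetical stand-ins, not the actual StreamingExecutor code.

from ray.data._internal.execution.autoscaler import Autoscaler


def drive_scheduling_loop(autoscaler: Autoscaler, make_scheduling_decision) -> None:
    # `make_scheduling_decision` stands in for the executor's scheduling step and
    # returns False once the DAG has finished executing.
    try:
        while make_scheduling_decision():
            # Let the autoscaling policy react after every scheduling decision.
            autoscaler.try_trigger_scaling()
    finally:
        # Always notify the policy on shutdown, e.g. so it can cancel any
        # outstanding cluster resource requests.
        autoscaler.on_executor_shutdown()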
88 changes: 88 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/autoscaling_actor_pool.py
@@ -0,0 +1,88 @@
from abc import ABC, abstractmethod

from ray.util.annotations import DeveloperAPI


@DeveloperAPI
class AutoscalingActorPool(ABC):
"""Abstract interface of an autoscaling actor pool.
A `PhysicalOperator` can manage one or more `AutoscalingActorPool`s.
`Autoscaler` is responsible for deciding autoscaling of these actor
pools.
"""

@abstractmethod
def min_size(self) -> int:
"""Min size of the actor pool."""
...

@abstractmethod
def max_size(self) -> int:
"""Max size of the actor pool."""
...

@abstractmethod
def current_size(self) -> int:
"""Current size of the actor pool."""
...

@abstractmethod
def num_running_actors(self) -> int:
"""Number of running actors."""
...

@abstractmethod
def num_active_actors(self) -> int:
"""Number of actors with at least one active task."""
...

@abstractmethod
def num_pending_actors(self) -> int:
"""Number of actors pending creation."""
...

@abstractmethod
def max_tasks_in_flight_per_actor(self) -> int:
"""Max number of in-flight tasks per actor."""
...

@abstractmethod
def current_in_flight_tasks(self) -> int:
"""Number of current in-flight tasks."""
...

def num_total_task_slots(self) -> int:
"""Total number of task slots."""
return self.max_tasks_in_flight_per_actor() * self.current_size()

def num_free_task_slots(self) -> int:
"""Number of free slots to run tasks."""
return (
self.max_tasks_in_flight_per_actor() * self.current_size()
- self.current_in_flight_tasks()
)

@abstractmethod
def scale_up(self, num_actors: int) -> int:
"""Request the actor pool to scale up by the given number of actors.
The number of actually added actors may be less than the requested
number.
Returns:
The number of actors actually added.
"""
...

@abstractmethod
def scale_down(self, num_actors: int) -> int:
"""Request actor pool to scale down by the given number of actors.
The number of actually removed actors may be less than the requested
number.
Returns:
The number of actors actually removed.
"""
...
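
To make the derived slot arithmetic and the utilization metric concrete, here is a toy, hard-coded implementation of the interface; `ToyActorPool` and its numbers are purely illustrative and assume only the abstract methods defined above.

from ray.data._internal.execution.autoscaler import AutoscalingActorPool


class ToyActorPool(AutoscalingActorPool):
    """Hypothetical fixed-state pool used only to illustrate the derived math."""

    def min_size(self) -> int:
        return 1

    def max_size(self) -> int:
        return 8

    def current_size(self) -> int:
        return 4

    def num_running_actors(self) -> int:
        return 4

    def num_active_actors(self) -> int:
        return 3  # actors with at least one active task

    def num_pending_actors(self) -> int:
        return 0

    def max_tasks_in_flight_per_actor(self) -> int:
        return 2

    def current_in_flight_tasks(self) -> int:
        return 5

    def scale_up(self, num_actors: int) -> int:
        return 0  # a real pool would create actors here

    def scale_down(self, num_actors: int) -> int:
        return 0  # a real pool would remove idle actors here


pool = ToyActorPool()
assert pool.num_total_task_slots() == 8  # 2 slots/actor * 4 actors
assert pool.num_free_task_slots() == 3   # 8 total slots - 5 in-flight tasks
# Utilization as computed by DefaultAutoscaler (the corrected formula from the
# description): num_active_actors / current_size = 3 / 4 = 0.75.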
184 changes: 184 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/default_autoscaler.py
@@ -0,0 +1,184 @@
import math
import time
from typing import TYPE_CHECKING, Dict

from .autoscaler import Autoscaler
from .autoscaling_actor_pool import AutoscalingActorPool
from ray.data._internal.execution.autoscaling_requester import (
    get_or_create_autoscaling_requester_actor,
)
from ray.data._internal.execution.interfaces.execution_options import ExecutionResources

if TYPE_CHECKING:
    from ray.data._internal.execution.interfaces import PhysicalOperator
    from ray.data._internal.execution.resource_manager import ResourceManager
    from ray.data._internal.execution.streaming_executor_state import OpState, Topology


class DefaultAutoscaler(Autoscaler):

    # Default threshold of actor pool utilization to trigger scaling up.
    DEFAULT_ACTOR_POOL_SCALING_UP_THRESHOLD: float = 0.8
    # Default threshold of actor pool utilization to trigger scaling down.
    DEFAULT_ACTOR_POOL_SCALING_DOWN_THRESHOLD: float = 0.5

    # Min number of seconds between two autoscaling requests.
    MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS = 20

    def __init__(
        self,
        topology: "Topology",
        resource_manager: "ResourceManager",
        execution_id: str,
        actor_pool_scaling_up_threshold: float = DEFAULT_ACTOR_POOL_SCALING_UP_THRESHOLD,  # noqa: E501
        actor_pool_scaling_down_threshold: float = DEFAULT_ACTOR_POOL_SCALING_DOWN_THRESHOLD,  # noqa: E501
    ):
        self._actor_pool_scaling_up_threshold = actor_pool_scaling_up_threshold
        self._actor_pool_scaling_down_threshold = actor_pool_scaling_down_threshold
        # Last time when a request was sent to Ray's autoscaler.
        self._last_request_time = 0
        super().__init__(topology, resource_manager, execution_id)

    def try_trigger_scaling(self):
        self._try_scale_up_cluster()
        self._try_scale_up_or_down_actor_pool()

    def _calculate_actor_pool_util(self, actor_pool: AutoscalingActorPool):
        """Calculate the utilization of the given actor pool."""
        if actor_pool.current_size() == 0:
            return 0
        else:
            return actor_pool.num_active_actors() / actor_pool.current_size()

    def _actor_pool_should_scale_up(
        self,
        actor_pool: AutoscalingActorPool,
        op: "PhysicalOperator",
        op_state: "OpState",
    ):
        # Do not scale up if the op is completed or no more inputs are coming.
        if op.completed() or (op._inputs_complete and op.internal_queue_size() == 0):
            return False
        if actor_pool.current_size() < actor_pool.min_size():
            # Scale up if the actor pool is below min size.
            return True
        elif actor_pool.current_size() >= actor_pool.max_size():
            # Do not scale up if the actor pool is already at max size.
            return False
        # Do not scale up if the op still has enough resources to run.
        if op_state._scheduling_status.under_resource_limits:
            return False
        # Do not scale up if the op has enough free slots for the existing inputs.
        if op_state.num_queued() <= actor_pool.num_free_task_slots():
            return False
        # Determine whether to scale up based on the actor pool utilization.
        util = self._calculate_actor_pool_util(actor_pool)
        return util > self._actor_pool_scaling_up_threshold

    def _actor_pool_should_scale_down(
        self,
        actor_pool: AutoscalingActorPool,
        op: "PhysicalOperator",
    ):
        # Scale down if the op is completed or no more inputs are coming.
        if op.completed() or (op._inputs_complete and op.internal_queue_size() == 0):
            return True
        if actor_pool.current_size() > actor_pool.max_size():
            # Scale down if the actor pool is above max size.
            return True
        elif actor_pool.current_size() <= actor_pool.min_size():
            # Do not scale down if the actor pool is already at min size.
            return False
        # Determine whether to scale down based on the actor pool utilization.
        util = self._calculate_actor_pool_util(actor_pool)
        return util < self._actor_pool_scaling_down_threshold

    def _try_scale_up_or_down_actor_pool(self):
        for op, state in self._topology.items():
            actor_pools = op.get_autoscaling_actor_pools()
            for actor_pool in actor_pools:
                while True:
                    # Try to scale the actor pool up or down.
                    should_scale_up = self._actor_pool_should_scale_up(
                        actor_pool,
                        op,
                        state,
                    )
                    should_scale_down = self._actor_pool_should_scale_down(
                        actor_pool, op
                    )
                    if should_scale_up and not should_scale_down:
                        if actor_pool.scale_up(1) == 0:
                            break
                    elif should_scale_down and not should_scale_up:
                        if actor_pool.scale_down(1) == 0:
                            break
                    else:
                        break

    def _try_scale_up_cluster(self):
        """Try to scale up the cluster to accommodate the provided in-progress
        workload.

        This makes a resource request to Ray's autoscaler consisting of the current,
        aggregate usage of all operators in the DAG + the incremental usage of all
        operators that are ready for dispatch (i.e. that have inputs queued). If the
        autoscaler were to grant this resource request, it would allow us to dispatch
        one task for every ready operator.

        Note that this resource request does not take the global resource limits or
        the liveness policy into account; it only tries to make the existing resource
        usage + one more task per ready operator feasible in the cluster.
        """
        # Limit the frequency of autoscaling requests.
        now = time.time()
        if now - self._last_request_time < self.MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS:
            return

        # Scale up the cluster if no ops are allowed to run, but there is still data
        # in the input queues.
        no_runnable_op = all(
            op_state._scheduling_status.runnable is False
            for _, op_state in self._topology.items()
        )
        any_has_input = any(
            op_state.num_queued() > 0 for _, op_state in self._topology.items()
        )
        if not (no_runnable_op and any_has_input):
            return

        self._last_request_time = now

        # Get resource usage for all ops + additional resources needed to launch one
        # more task for each ready op.
        resource_request = []

        def to_bundle(resource: ExecutionResources) -> Dict:
            req = {}
            if resource.cpu:
                req["CPU"] = math.ceil(resource.cpu)
            if resource.gpu:
                req["GPU"] = math.ceil(resource.gpu)
            return req

        for op, state in self._topology.items():
            per_task_resource = op.incremental_resource_usage()
            task_bundle = to_bundle(per_task_resource)
            resource_request.extend([task_bundle] * op.num_active_tasks())
            # Only include incremental resource usage for ops that are ready for
            # dispatch.
            if state.num_queued() > 0:
                # TODO(Clark): Scale up more aggressively by adding incremental
                # resource usage for more than one bundle in the queue for this op?
                resource_request.append(task_bundle)

        self._send_resource_request(resource_request)

    def _send_resource_request(self, resource_request):
        # Make an autoscaler resource request.
        actor = get_or_create_autoscaling_requester_actor()
        actor.request_resources.remote(resource_request, self._execution_id)

    def on_executor_shutdown(self):
        # Request zero resources from the autoscaler for this execution.
        actor = get_or_create_autoscaling_requester_actor()
        actor.request_resources.remote({}, self._execution_id)
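
A quick worked example of the two numeric rules above, reimplemented as standalone helpers for illustration (instantiating `DefaultAutoscaler` itself would require a live topology and resource manager); the function names are local to this snippet.

import math


def actor_pool_util(num_active_actors: int, current_size: int) -> float:
    # Mirrors DefaultAutoscaler._calculate_actor_pool_util.
    return 0 if current_size == 0 else num_active_actors / current_size


# With the default thresholds (scale up above 0.8, scale down below 0.5):
assert actor_pool_util(num_active_actors=9, current_size=10) > 0.8   # scale up
assert actor_pool_util(num_active_actors=4, current_size=10) < 0.5   # scale down
assert 0.5 <= actor_pool_util(num_active_actors=7, current_size=10) <= 0.8  # hold


def to_bundle(cpu: float, gpu: float) -> dict:
    # Mirrors the nested to_bundle helper: fractional per-task requests are
    # rounded up to whole CPUs/GPUs for the cluster resource request.
    req = {}
    if cpu:
        req["CPU"] = math.ceil(cpu)
    if gpu:
        req["GPU"] = math.ceil(gpu)
    return req


# An operator whose incremental usage is 0.5 CPU still requests a whole CPU.
assert to_bundle(cpu=0.5, gpu=0) == {"CPU": 1}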
python/ray/data/_internal/execution/interfaces/physical_operator.py
@@ -4,6 +4,9 @@
import ray
from .ref_bundle import RefBundle
from ray._raylet import ObjectRefGenerator
+from ray.data._internal.execution.autoscaler.autoscaling_actor_pool import (
+    AutoscalingActorPool,
+)
from ray.data._internal.execution.interfaces.execution_options import (
    ExecutionOptions,
    ExecutionResources,
@@ -400,30 +403,14 @@ def base_resource_usage(self) -> ExecutionResources:
"""
return ExecutionResources()

def incremental_resource_usage(
self, consider_autoscaling=True
) -> ExecutionResources:
def incremental_resource_usage(self) -> ExecutionResources:
"""Returns the incremental resources required for processing another input.
For example, an operator that launches a task per input could return
ExecutionResources(cpu=1) as its incremental usage.
Args:
consider_autoscaling: Whether to consider the possibility of autoscaling.
"""
return ExecutionResources()

def notify_resource_usage(
self, input_queue_size: int, under_resource_limits: bool
) -> None:
"""Called periodically by the executor.
Args:
input_queue_size: The number of inputs queued outside this operator.
under_resource_limits: Whether this operator is under resource limits.
"""
pass

def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
"""Called periodically from the executor to update internal in backpressure
status for stats collection purposes.
@@ -435,3 +422,7 @@ def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
        if self._in_task_submission_backpressure != in_backpressure:
            self._metrics.on_toggle_task_submission_backpressure(in_backpressure)
            self._in_task_submission_backpressure = in_backpressure
+
+    def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
+        """Return a list of `AutoscalingActorPool`s managed by this operator."""
+        return []
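
The base class now returns an empty list, and operators that own actor pools (per the description, `ActorPoolMapOperator`) are expected to override it. A hypothetical override might look like this; `MyActorPoolOperator` and `self._actor_pool` are illustrative names, not the operator's real fields.

from typing import List

from ray.data._internal.execution.autoscaler import AutoscalingActorPool


class MyActorPoolOperator:  # hypothetical operator, not the real ActorPoolMapOperator
    def __init__(self, actor_pool: AutoscalingActorPool):
        self._actor_pool = actor_pool

    def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
        # Expose the pool so that the Autoscaler, rather than the operator
        # itself, decides when it grows or shrinks.
        return [self._actor_pool]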