diff --git a/python/ray/serve/BUILD b/python/ray/serve/BUILD index 1ee484fa9bcb0..e87ed72d76cc3 100644 --- a/python/ray/serve/BUILD +++ b/python/ray/serve/BUILD @@ -233,7 +233,6 @@ py_test( deps = [":serve_lib"], ) - py_test( name = "test_config", size = "small", @@ -242,7 +241,6 @@ py_test( deps = [":serve_lib"], ) - py_test( name = "test_failure", size = "medium", @@ -560,6 +558,14 @@ py_test( deps = [":serve_lib"], ) +py_test( + name = "test_replica_placement_group", + size = "medium", + srcs = serve_tests_srcs, + tags = ["exclusive", "team:serve"], + deps = [":serve_lib"], +) + # Make sure the example showing in doc is tested py_test( name = "quickstart_class", diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index 05128a1f9519c..d4bbc36fd83e7 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -978,11 +978,21 @@ def override_deployment_info( # if the code sets options to None). override_actor_options = replica_config.ray_actor_options or {} + override_placement_group_bundles = options.pop( + "placement_group_bundles", replica_config.placement_group_bundles + ) + override_placement_group_strategy = options.pop( + "placement_group_strategy", replica_config.placement_group_strategy + ) + merged_env = override_runtime_envs_except_env_vars( app_runtime_env, override_actor_options.get("runtime_env", {}) ) override_actor_options.update({"runtime_env": merged_env}) replica_config.update_ray_actor_options(override_actor_options) + replica_config.update_placement_group_options( + override_placement_group_bundles, override_placement_group_strategy + ) override_options["replica_config"] = replica_config # Override deployment config options diff --git a/python/ray/serve/_private/client.py b/python/ray/serve/_private/client.py index 46f35827c0e9e..6b198d7e55b3b 100644 --- a/python/ray/serve/_private/client.py +++ b/python/ray/serve/_private/client.py @@ -311,6 +311,8 @@ def deploy_application( deployment["init_kwargs"], ingress=deployment["ingress"], ray_actor_options=deployment["ray_actor_options"], + placement_group_bundles=deployment["placement_group_bundles"], + placement_group_strategy=deployment["placement_group_strategy"], config=deployment["config"], version=deployment["version"], route_prefix=deployment["route_prefix"], diff --git a/python/ray/serve/_private/deploy_utils.py b/python/ray/serve/_private/deploy_utils.py index 8ee5cd36c56c4..518e3d544b5b7 100644 --- a/python/ray/serve/_private/deploy_utils.py +++ b/python/ray/serve/_private/deploy_utils.py @@ -1,4 +1,4 @@ -from typing import Dict, Tuple, Union, Callable, Type, Optional, Any +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union import hashlib import json import logging @@ -22,6 +22,8 @@ def get_deploy_args( init_kwargs: Dict[Any, Any], ingress: bool = False, ray_actor_options: Optional[Dict] = None, + placement_group_bundles: Optional[List[Dict[str, float]]] = None, + placement_group_strategy: Optional[str] = None, config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None, version: Optional[str] = None, route_prefix: Optional[str] = None, @@ -54,6 +56,8 @@ def get_deploy_args( init_args=init_args, init_kwargs=init_kwargs, ray_actor_options=ray_actor_options, + placement_group_bundles=placement_group_bundles, + placement_group_strategy=placement_group_strategy, ) if isinstance(config, dict): diff --git a/python/ray/serve/_private/deployment_scheduler.py 
b/python/ray/serve/_private/deployment_scheduler.py index b8ee0c5b8c740..0de320d05dd92 100644 --- a/python/ray/serve/_private/deployment_scheduler.py +++ b/python/ray/serve/_private/deployment_scheduler.py @@ -1,14 +1,18 @@ import sys -from typing import Callable, Dict, Tuple, List, Union, Set +from typing import Callable, Dict, Tuple, List, Optional, Union, Set from dataclasses import dataclass from collections import defaultdict import ray +from ray.util.scheduling_strategies import ( + NodeAffinitySchedulingStrategy, + PlacementGroupSchedulingStrategy, +) + from ray.serve._private.utils import ( get_head_node_id, ) from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy class SpreadDeploymentSchedulingPolicy: @@ -38,6 +42,10 @@ class ReplicaSchedulingRequest: actor_options: Dict actor_init_args: Tuple on_scheduled: Callable + # Placement group bundles and strategy *for this replica*. + # These are optional: by default replicas do not have a placement group. + placement_group_bundles: Optional[List[Dict[str, float]]] = None + placement_group_strategy: Optional[str] = None @dataclass @@ -191,13 +199,36 @@ def _schedule_spread_deployment(self, deployment_name: str) -> None: pending_replica_name ] + placement_group = None + if replica_scheduling_request.placement_group_bundles is not None: + strategy = ( + replica_scheduling_request.placement_group_strategy + if replica_scheduling_request.placement_group_strategy + else "PACK" + ) + placement_group = ray.util.placement_group( + replica_scheduling_request.placement_group_bundles, + strategy=strategy, + lifetime="detached", + name=replica_scheduling_request.actor_options["name"], + ) + scheduling_strategy = PlacementGroupSchedulingStrategy( + placement_group=placement_group, + placement_group_capture_child_tasks=True, + ) + else: + scheduling_strategy = "SPREAD" + actor_handle = replica_scheduling_request.actor_def.options( - scheduling_strategy="SPREAD", + scheduling_strategy=scheduling_strategy, **replica_scheduling_request.actor_options, ).remote(*replica_scheduling_request.actor_init_args) + del self._pending_replicas[deployment_name][pending_replica_name] self._launching_replicas[deployment_name][pending_replica_name] = None - replica_scheduling_request.on_scheduled(actor_handle) + replica_scheduling_request.on_scheduled( + actor_handle, placement_group=placement_group + ) def _schedule_driver_deployment(self, deployment_name: str) -> None: if self._recovering_replicas[deployment_name]: @@ -236,7 +267,7 @@ def _schedule_driver_deployment(self, deployment_name: str) -> None: self._launching_replicas[deployment_name][ pending_replica_name ] = target_node_id - replica_scheduling_request.on_scheduled(actor_handle) + replica_scheduling_request.on_scheduled(actor_handle, placement_group=None) def _get_replicas_to_stop( self, deployment_name: str, max_num_to_stop: int diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index e462743727dbf..52c239ec86a26 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -13,12 +13,13 @@ import ray from ray import ObjectRef, cloudpickle +from ray.actor import ActorHandle +from ray.exceptions import RayActorError, RayError, RayTaskError +from ray.util.placement_group import PlacementGroup from ray._private.usage.usage_lib import ( TagKey, record_extra_usage_tag, ) -from ray.actor import ActorHandle -from 
ray.exceptions import RayActorError, RayError, RayTaskError from ray.serve._private.autoscaling_metrics import InMemoryMetricsStore from ray.serve._private.common import ( @@ -113,6 +114,8 @@ def from_deployment_info( info.version, deployment_config=info.deployment_config, ray_actor_options=info.replica_config.ray_actor_options, + placement_group_bundles=info.replica_config.placement_group_bundles, + placement_group_strategy=info.replica_config.placement_group_strategy, ) return cls(info, num_replicas, version, deleting) @@ -198,14 +201,15 @@ def __init__( self._health_check_ref: Optional[ObjectRef] = None self._last_health_check_time: float = 0.0 self._consecutive_health_check_failures = 0 - # NOTE: storing these is necessary to keep the actor and PG alive in - # the non-detached case. + + # Populated in `on_scheduled` or `recover`. self._actor_handle: ActorHandle = None + self._placement_group: PlacementGroup = None + # Populated after replica is allocated. self._pid: int = None self._actor_id: str = None self._worker_id: str = None - # Populated after replica is allocated. self._node_id: str = None self._node_ip: str = None self._log_file_path: str = None @@ -245,6 +249,13 @@ def actor_handle(self) -> Optional[ActorHandle]: return self._actor_handle + @property + def placement_group_bundles(self) -> Optional[List[Dict[str, float]]]: + if not self._placement_group: + return None + + return self._placement_group.bundle_specs + @property def version(self) -> DeploymentVersion: """Replica version. This can be incorrect during state recovery. @@ -411,11 +422,23 @@ def start(self, deployment_info: DeploymentInfo) -> ReplicaSchedulingRequest: actor_resources=self._actor_resources, actor_options=actor_options, actor_init_args=init_args, + placement_group_bundles=( + deployment_info.replica_config.placement_group_bundles + ), + placement_group_strategy=( + deployment_info.replica_config.placement_group_strategy + ), on_scheduled=self.on_scheduled, ) - def on_scheduled(self, actor_handle: ActorHandle): + def on_scheduled( + self, + actor_handle: ActorHandle, + placement_group: Optional[PlacementGroup] = None, + ): self._actor_handle = actor_handle + self._placement_group = placement_group + # Perform auto method name translation for java handles. # See https://github.com/ray-project/ray/issues/21474 deployment_config = copy(self._version.deployment_config) @@ -488,6 +511,13 @@ def recover(self): f"{self.deployment_name}." ) self._actor_handle = self.actor_handle + try: + self._placement_group = ray.util.get_placement_group( + self._actor_name, + ) + except ValueError: + # ValueError is raised if the placement group does not exist. + self._placement_group = None # Re-fetch initialization proof self._allocated_obj_ref = self._actor_handle.is_allocated.remote() @@ -596,7 +626,7 @@ def graceful_stop(self) -> Duration: handle = ray.get_actor(self._actor_name, namespace=SERVE_NAMESPACE) self._graceful_shutdown_ref = handle.prepare_for_shutdown.remote() except ValueError: - # ValueError thrown from ray.get_actor means actor has already been deleted + # ValueError thrown from ray.get_actor means actor has already been deleted. 
pass return self.graceful_shutdown_timeout_s @@ -607,7 +637,6 @@ def check_stopped(self) -> bool: handle = ray.get_actor(self._actor_name, namespace=SERVE_NAMESPACE) stopped = check_obj_ref_ready_nowait(self._graceful_shutdown_ref) if stopped: - ray.kill(handle, no_restart=True) try: ray.get(self._graceful_shutdown_ref) except Exception: @@ -615,9 +644,16 @@ def check_stopped(self) -> bool: "Exception when trying to gracefully shutdown replica:\n" + traceback.format_exc() ) + + ray.kill(handle, no_restart=True) except ValueError: - # ValueError thrown from ray.get_actor means actor has already been deleted + # ValueError thrown from ray.get_actor means actor has already been deleted. stopped = True + finally: + # Remove the placement group both if the actor has already been deleted or + # it was just killed above. + if stopped and self._placement_group is not None: + ray.util.remove_placement_group(self._placement_group) return stopped @@ -932,21 +968,24 @@ def resource_requirements(self) -> Tuple[str, str]: required dict and only resources in the required dict will be included in the available dict (filtered for relevance). """ - # NOTE(edoakes): if self._actor.actor_resources is None: return "UNKNOWN", "UNKNOWN" - required = { - k: v - for k, v in self._actor.actor_resources.items() - if v is not None and v > 0 - } + if self._actor.placement_group_bundles is not None: + required = self._actor.placement_group_bundles + else: + required = { + k: v + for k, v in self._actor.actor_resources.items() + if v is not None and v > 0 + } + available = { k: v for k, v in self._actor.available_resources.items() if k in required } # Use json.dumps() instead of str() here to avoid double-quoting keys - # when dumping these dictionaries. See + # when dumping these objects. See # https://github.com/ray-project/ray/issues/26210 for the issue. return json.dumps(required), json.dumps(available) @@ -1442,7 +1481,10 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> boo # normal scale-up process. elif replica.version.requires_actor_restart(self._target_state.version): code_version_changes += 1 - self._stop_replica(replica) + # If the replica is still `STARTING`, we don't need to go through the + # graceful stop period. + graceful_stop = replica.actor_details.state == ReplicaState.RUNNING + self._stop_replica(replica, graceful_stop=graceful_stop) replicas_changed = True # Otherwise, only lightweight options in deployment config is a mismatch, so # we update it dynamically without restarting the replica. @@ -1877,14 +1919,12 @@ def _check_and_update_replicas(self): if len(pending_allocation) > 0: required, available = pending_allocation[0].resource_requirements() message = ( - f'Deployment "{self._name}" has ' - f"{len(pending_allocation)} replicas that have taken " - f"more than {SLOW_STARTUP_WARNING_S}s to be scheduled. " - f"This may be caused by waiting for the cluster to " - f"auto-scale, or waiting for a runtime environment " - f"to install. " - f"Resources required for each replica: {required}, " - f"resources available: {available}." + f"Deployment '{self._name}' has {len(pending_allocation)} replicas " + f"that have taken more than {SLOW_STARTUP_WARNING_S}s to be " + "scheduled. This may be due to waiting for the cluster to " + "auto-scale or for a runtime environment to be installed. " + f"Resources required for each replica: {required}, total resources " + f"available: {available}. Use `ray status` for more details." 
) logger.warning(message) if _SCALING_LOG_ENABLED: @@ -2152,6 +2192,7 @@ def __init__( kv_store: KVStoreBase, long_poll_host: LongPollHost, all_current_actor_names: List[str], + all_current_placement_group_names: List[str], cluster_node_info_cache: ClusterNodeInfoCache, ): @@ -2167,7 +2208,9 @@ def __init__( self._deployment_states: Dict[str, DeploymentState] = dict() self._deleted_deployment_metadata: Dict[str, DeploymentInfo] = OrderedDict() - self._recover_from_checkpoint(all_current_actor_names) + self._recover_from_checkpoint( + all_current_actor_names, all_current_placement_group_names + ) # TODO(simon): move autoscaling related stuff into a manager. self.autoscaling_metrics_store = InMemoryMetricsStore() @@ -2247,7 +2290,52 @@ def _map_actor_names_to_deployment( return deployment_to_current_replicas - def _recover_from_checkpoint(self, all_current_actor_names: List[str]) -> None: + def _detect_and_remove_leaked_placement_groups( + self, + all_current_actor_names: List[str], + all_current_placement_group_names: List[str], + ): + """Detect and remove any placement groups not associated with a replica. + + This can happen under certain rare circumstances: + - The controller creates a placement group then crashes before creating + the associated replica actor. + - While the controller is down, a replica actor crashes but its placement + group still exists. + + In both of these (or any other unknown cases), we simply need to remove the + leaked placement groups. + """ + leaked_pg_names = [] + for pg_name in all_current_placement_group_names: + if ( + ReplicaName.is_replica_name(pg_name) + and pg_name not in all_current_actor_names + ): + leaked_pg_names.append(pg_name) + + if len(leaked_pg_names) > 0: + logger.warning( + f"Detected leaked placement groups: {leaked_pg_names}. " + "The placement groups will be removed. This can happen in rare " + "circumstances when the controller crashes and should not cause any " + "issues. If this happens repeatedly, please file an issue on GitHub." + ) + + for leaked_pg_name in leaked_pg_names: + try: + pg = ray.util.get_placement_group(leaked_pg_name) + ray.util.remove_placement_group(pg) + except Exception: + logger.exception( + f"Failed to remove leaked placement group {leaked_pg_name}." + ) + + def _recover_from_checkpoint( + self, + all_current_actor_names: List[str], + all_current_placement_group_names: List[str], + ): """ Recover from checkpoint upon controller failure with all actor names found in current cluster. @@ -2257,6 +2345,11 @@ def _recover_from_checkpoint(self, all_current_actor_names: List[str]) -> None: For current state it will prioritize reconstructing from current actor names found that matches deployment tag if applicable. """ + self._detect_and_remove_leaked_placement_groups( + all_current_actor_names, + all_current_placement_group_names, + ) + deployment_to_current_replicas = self._map_actor_names_to_deployment( all_current_actor_names ) diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index 2b5595c81b516..a776ce5f4ccd3 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -714,3 +714,23 @@ def calculate_remaining_timeout( time_since_start_s = curr_time_s - start_time_s return max(0, timeout_s - time_since_start_s) + + +def get_all_live_placement_group_names() -> List[str]: + """Fetch and parse the Ray placement group table for live placement group names. 
+ + Placement groups are filtered based on their `scheduling_state`; any placement + group not in the "REMOVED" state is considered live. + """ + placement_group_table = ray.util.placement_group_table() + + live_pg_names = [] + for entry in placement_group_table.values(): + pg_name = entry.get("name", "") + if ( + pg_name + and entry.get("stats", {}).get("scheduling_state", "UNKNOWN") != "REMOVED" + ): + live_pg_names.append(pg_name) + + return live_pg_names diff --git a/python/ray/serve/_private/version.py b/python/ray/serve/_private/version.py index 4fb21b0c4f5a2..7fb3fd3794ee4 100644 --- a/python/ray/serve/_private/version.py +++ b/python/ray/serve/_private/version.py @@ -19,6 +19,8 @@ def __init__( code_version: Optional[str], deployment_config: DeploymentConfig, ray_actor_options: Optional[Dict], + placement_group_bundles: Optional[List[Dict[str, float]]] = None, + placement_group_strategy: Optional[str] = None, ): if code_version is not None and not isinstance(code_version, str): raise TypeError(f"code_version must be str, got {type(code_version)}.") @@ -30,9 +32,11 @@ def __init__( self.code_version = code_version # Options for this field may be mutated over time, so any logic that uses this - # should access this field directly - self.deployment_config: DeploymentConfig = deployment_config - self.ray_actor_options: Dict = ray_actor_options + # should access this field directly. + self.deployment_config = deployment_config + self.ray_actor_options = ray_actor_options + self.placement_group_bundles = placement_group_bundles + self.placement_group_strategy = placement_group_strategy self.compute_hashes() @classmethod @@ -57,6 +61,8 @@ def requires_actor_restart(self, new_version): return ( self.code_version != new_version.code_version or self.ray_actor_options_hash != new_version.ray_actor_options_hash + or self.placement_group_options_hash + != new_version.placement_group_options_hash ) def requires_actor_reconfigure(self, new_version): @@ -76,9 +82,18 @@ def requires_long_poll_broadcast(self, new_version): ) def compute_hashes(self): - # If this changes, the controller will directly restart all existing replicas. + # If these change, the controller will rolling upgrade existing replicas. 
serialized_ray_actor_options = _serialize(self.ray_actor_options or {}) self.ray_actor_options_hash = crc32(serialized_ray_actor_options) + combined_placement_group_options = {} + if self.placement_group_bundles is not None: + combined_placement_group_options["bundles"] = self.placement_group_bundles + if self.placement_group_strategy is not None: + combined_placement_group_options["strategy"] = self.placement_group_strategy + serialized_placement_group_options = _serialize( + combined_placement_group_options + ) + self.placement_group_options_hash = crc32(serialized_placement_group_options) # If this changes, DeploymentReplica.reconfigure() will call reconfigure on the # actual replica actor @@ -93,6 +108,7 @@ self._hash = crc32( self.code_version.encode("utf-8") + serialized_ray_actor_options + + serialized_placement_group_options + self._get_serialized_options( [ DeploymentOptionUpdateType.NeedsReconfigure, @@ -107,6 +123,12 @@ def to_proto(self) -> bytes: code_version=self.code_version, deployment_config=self.deployment_config.to_proto(), ray_actor_options=json.dumps(self.ray_actor_options), + placement_group_bundles=json.dumps(self.placement_group_bundles) + if self.placement_group_bundles is not None + else "", + placement_group_strategy=self.placement_group_strategy + if self.placement_group_strategy is not None + else "", ) @classmethod @@ -115,6 +137,14 @@ def from_proto(cls, proto: DeploymentVersionProto): proto.code_version, DeploymentConfig.from_proto(proto.deployment_config), json.loads(proto.ray_actor_options), + placement_group_bundles=( + json.loads(proto.placement_group_bundles) + if proto.placement_group_bundles + else None + ), + placement_group_strategy=( + proto.placement_group_strategy if proto.placement_group_strategy else None + ), ) def _get_serialized_options( diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 52825f150e55e..e5994ec5d2aed 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -1,7 +1,7 @@ import collections import inspect import logging -from typing import Any, Callable, Dict, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from functools import wraps from fastapi import APIRouter, FastAPI @@ -249,6 +249,8 @@ def deployment( init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE, route_prefix: Default[Union[str, None]] = DEFAULT.VALUE, ray_actor_options: Default[Dict] = DEFAULT.VALUE, + placement_group_bundles: Optional[List[Dict[str, float]]] = DEFAULT.VALUE, + placement_group_strategy: Optional[str] = DEFAULT.VALUE, user_config: Default[Optional[Any]] = DEFAULT.VALUE, max_concurrent_queries: Default[int] = DEFAULT.VALUE, autoscaling_config: Default[Union[Dict, AutoscalingConfig, None]] = DEFAULT.VALUE, @@ -288,6 +290,15 @@ class MyDeployment: resource requirements. Valid options are: `accelerator_type`, `memory`, `num_cpus`, `num_gpus`, `object_store_memory`, `resources`, and `runtime_env`. + placement_group_bundles: Defines a set of placement group bundles to be + scheduled *for each replica* of this deployment. The replica actor will + be scheduled in the first bundle provided, so the resources specified in + `ray_actor_options` must be a subset of the first bundle's resources. All + actors and tasks created by the replica actor will be scheduled in the + placement group by default (`placement_group_capture_child_tasks` is set + to True).
+ placement_group_strategy: Strategy to use for the replica placement group + specified via `placement_group_bundles`. Defaults to `PACK`. user_config: Config to pass to the reconfigure method of the deployment. This can be updated dynamically without restarting the replicas of the deployment. The user_config must be fully JSON-serializable. @@ -367,6 +378,16 @@ def decorator(_func_or_class): ray_actor_options=( ray_actor_options if ray_actor_options is not DEFAULT.VALUE else None ), + placement_group_bundles=( + placement_group_bundles + if placement_group_bundles is not DEFAULT.VALUE + else None + ), + placement_group_strategy=( + placement_group_strategy + if placement_group_strategy is not DEFAULT.VALUE + else None + ), is_driver_deployment=is_driver_deployment, _internal=True, ) @@ -488,6 +509,8 @@ def run( "init_args": deployment.init_args, "init_kwargs": deployment.init_kwargs, "ray_actor_options": deployment._ray_actor_options, + "placement_group_bundles": deployment._placement_group_bundles, + "placement_group_strategy": deployment._placement_group_strategy, "config": deployment._config, "version": deployment._version or get_random_letters(), "route_prefix": deployment.route_prefix, diff --git a/python/ray/serve/built_application.py b/python/ray/serve/built_application.py index fc9524607f328..7408530f516ab 100644 --- a/python/ray/serve/built_application.py +++ b/python/ray/serve/built_application.py @@ -85,6 +85,8 @@ def _get_deploy_args_from_built_app(app: BuiltApplication): init_kwargs=deployment.init_kwargs, ingress=is_ingress, ray_actor_options=deployment._ray_actor_options, + placement_group_bundles=deployment._placement_group_bundles, + placement_group_strategy=deployment._placement_group_strategy, config=deployment._config, version=deployment._version, route_prefix=deployment.route_prefix, diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py index 01f236048cc86..df7dfd4fa9cb5 100644 --- a/python/ray/serve/config.py +++ b/python/ray/serve/config.py @@ -16,6 +16,8 @@ ) from ray import cloudpickle +from ray.util.placement_group import VALID_PLACEMENT_GROUP_STRATEGIES + from ray.serve._private.constants import ( DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_S, DEFAULT_GRACEFUL_SHUTDOWN_WAIT_LOOP_S, @@ -345,6 +347,8 @@ def __init__( serialized_init_args: bytes, serialized_init_kwargs: bytes, ray_actor_options: Dict, + placement_group_bundles: Optional[List[Dict[str, float]]] = None, + placement_group_strategy: Optional[str] = None, needs_pickle: bool = True, ): """Construct a ReplicaConfig with serialized properties. @@ -368,6 +372,10 @@ def __init__( self.ray_actor_options = ray_actor_options self._validate_ray_actor_options() + self.placement_group_bundles = placement_group_bundles + self.placement_group_strategy = placement_group_strategy + self._validate_placement_group_options() + # Create resource_dict. This contains info about the replica's resource # needs. It does NOT set the replica's resource usage. That's done by # the ray_actor_options. 
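Illustrative sketch (not part of the patch): the two new `ReplicaConfig` options added in the constructor above can be exercised directly through `ReplicaConfig.create`, which this change extends below. This assumes `ReplicaConfig` is importable from `ray.serve.config`; the `Model` class and resource values are placeholders.

from ray.serve.config import ReplicaConfig


class Model:
    def __call__(self, request) -> str:
        return "ok"


# The replica actor lands in the first bundle, so `num_cpus` in
# `ray_actor_options` must fit within that bundle's "CPU" value.
replica_config = ReplicaConfig.create(
    Model,
    init_args=tuple(),
    init_kwargs=dict(),
    ray_actor_options={"num_cpus": 1},
    placement_group_bundles=[{"CPU": 1}, {"GPU": 1}],
    placement_group_strategy="STRICT_PACK",
)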
@@ -379,6 +387,15 @@ def update_ray_actor_options(self, ray_actor_options): self._validate_ray_actor_options() self.resource_dict = resources_from_ray_options(self.ray_actor_options) + def update_placement_group_options( + self, + placement_group_bundles: Optional[List[Dict[str, float]]], + placement_group_strategy: Optional[str], + ): + self.placement_group_bundles = placement_group_bundles + self.placement_group_strategy = placement_group_strategy + self._validate_placement_group_options() + @classmethod def create( cls, @@ -386,6 +403,8 @@ def create( init_args: Optional[Union[Tuple[Any], bytes]] = None, init_kwargs: Optional[Dict[Any, Any]] = None, ray_actor_options: Optional[Dict] = None, + placement_group_bundles: Optional[List[Dict[str, float]]] = None, + placement_group_strategy: Optional[str] = None, deployment_def_name: Optional[str] = None, ): """Create a ReplicaConfig from deserialized parameters.""" @@ -424,6 +443,8 @@ def create( pickle_dumps(init_args, "Could not serialize the deployment init args"), pickle_dumps(init_kwargs, "Could not serialize the deployment init kwargs"), ray_actor_options, + placement_group_bundles, + placement_group_strategy, ) config._deployment_def = deployment_def @@ -432,8 +453,7 @@ def create( return config - def _validate_ray_actor_options(self) -> None: - + def _validate_ray_actor_options(self): if not isinstance(self.ray_actor_options, dict): raise TypeError( f'Got invalid type "{type(self.ray_actor_options)}" for ' @@ -465,6 +485,88 @@ def _validate_ray_actor_options(self) -> None: if self.ray_actor_options.get("num_cpus") is None: self.ray_actor_options["num_cpus"] = 1 + def _validate_placement_group_options(self) -> None: + if ( + self.placement_group_strategy is not None + and self.placement_group_strategy not in VALID_PLACEMENT_GROUP_STRATEGIES + ): + raise ValueError( + f"Invalid placement group strategy '{self.placement_group_strategy}'. " + f"Supported strategies are: {VALID_PLACEMENT_GROUP_STRATEGIES}." + ) + + if ( + self.placement_group_strategy is not None + and self.placement_group_bundles is None + ): + raise ValueError( + "If `placement_group_strategy` is provided, `placement_group_bundles` " + "must also be provided." + ) + + if self.placement_group_bundles is not None: + if ( + not isinstance(self.placement_group_bundles, list) + or len(self.placement_group_bundles) == 0 + ): + raise ValueError( + "`placement_group_bundles` must be a non-empty list of resource " + 'dictionaries. For example: `[{"CPU": 1.0}, {"GPU": 1.0}]`.' + ) + + for i, bundle in enumerate(self.placement_group_bundles): + if ( + not isinstance(bundle, dict) + or not all(isinstance(k, str) for k in bundle.keys()) + or not all(isinstance(v, (int, float)) for v in bundle.values()) + ): + raise ValueError( + "`placement_group_bundles` must be a non-empty list of " + "resource dictionaries. For example: " + '`[{"CPU": 1.0}, {"GPU": 1.0}]`.' + ) + + # Validate that the replica actor fits in the first bundle. + if i == 0: + bundle_cpu = bundle.get("CPU", 0) + replica_actor_num_cpus = self.ray_actor_options.get("num_cpus", 0) + if bundle_cpu < replica_actor_num_cpus: + raise ValueError( + "When using `placement_group_bundles`, the replica actor " + "will be placed in the first bundle, so the resource " + "requirements for the actor must be a subset of the first " + "bundle. `num_cpus` for the actor is " + f"{replica_actor_num_cpus} but the bundle only has " + f"{bundle_cpu} `CPU` specified." 
+ ) + + bundle_gpu = bundle.get("GPU", 0) + replica_actor_num_gpus = self.ray_actor_options.get("num_gpus", 0) + if bundle_gpu < replica_actor_num_gpus: + raise ValueError( + "When using `placement_group_bundles`, the replica actor " + "will be placed in the first bundle, so the resource " + "requirements for the actor must be a subset of the first " + "bundle. `num_gpus` for the actor is " + f"{replica_actor_num_gpus} but the bundle only has " + f"{bundle_gpu} `GPU` specified." + ) + + replica_actor_resources = self.ray_actor_options.get( + "resources", {} + ) + for actor_resource, actor_value in replica_actor_resources.items(): + bundle_value = bundle.get(actor_resource, 0) + if bundle_value < actor_value: + raise ValueError( + "When using `placement_group_bundles`, the replica " + "actor will be placed in the first bundle, so the " + "resource requirements for the actor must be a subset " + f"of the first bundle. `{actor_resource}` requirement " + f"for the actor is {actor_value} but the bundle only " + f"has {bundle_value} `{actor_resource}` specified." + ) + @property def deployment_def(self) -> Union[Callable, str]: """The code, or a reference to the code, that this replica runs. @@ -523,6 +625,12 @@ def from_proto(cls, proto: ReplicaConfigProto, needs_pickle: bool = True): proto.init_args if proto.init_args != b"" else None, proto.init_kwargs if proto.init_kwargs != b"" else None, json.loads(proto.ray_actor_options), + json.loads(proto.placement_group_bundles) + if proto.placement_group_bundles + else None, + proto.placement_group_strategy + if proto.placement_group_strategy != "" + else None, needs_pickle, ) @@ -538,6 +646,10 @@ def to_proto(self): init_args=self.serialized_init_args, init_kwargs=self.serialized_init_kwargs, ray_actor_options=json.dumps(self.ray_actor_options), + placement_group_bundles=json.dumps(self.placement_group_bundles) + if self.placement_group_bundles is not None + else "", + placement_group_strategy=self.placement_group_strategy, ) def to_proto_bytes(self): diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index 9372a4279ae05..e6d97cb2fedcd 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -57,6 +57,7 @@ from ray.serve._private.storage.kv_store import RayInternalKVStore from ray.serve._private.utils import ( call_function_from_import_path, + get_all_live_placement_group_names, get_head_node_id, record_serve_tag, ) @@ -171,6 +172,7 @@ async def __init__( self.kv_store, self.long_poll_host, all_serve_actor_names, + get_all_live_placement_group_names(), self.cluster_node_info_cache, ) diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index 1f93a843bcbda..9e4b6b4772e7e 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -5,6 +5,7 @@ Any, Callable, Dict, + List, Optional, Tuple, Union, @@ -131,6 +132,8 @@ def __init__( init_kwargs: Optional[Tuple[Any]] = None, route_prefix: Union[str, None, DEFAULT] = DEFAULT.VALUE, ray_actor_options: Optional[Dict] = None, + placement_group_bundles: Optional[List[Dict[str, float]]] = None, + placement_group_strategy: Optional[str] = None, is_driver_deployment: Optional[bool] = False, _internal=False, ) -> None: @@ -162,6 +165,21 @@ def __init__( raise ValueError("route_prefix may not contain wildcards.") if not (ray_actor_options is None or isinstance(ray_actor_options, dict)): raise TypeError("ray_actor_options must be a dict.") + if placement_group_bundles is not None: + if not 
isinstance(placement_group_bundles, list): + raise TypeError("placement_group_bundles must be a list.") + + for bundle in placement_group_bundles: + if not isinstance(bundle, dict): + raise TypeError( + "placement_group_bundles entries must be " + f"dicts, got {type(bundle)}." + ) + if not ( + placement_group_strategy is None + or isinstance(placement_group_strategy, str) + ): + raise TypeError("placement_group_strategy must be a string.") if is_driver_deployment is True: if config.num_replicas != 1: @@ -191,6 +209,8 @@ def __init__( self._init_kwargs = init_kwargs self._route_prefix = route_prefix self._ray_actor_options = ray_actor_options + self._placement_group_bundles = placement_group_bundles + self._placement_group_strategy = placement_group_strategy self._is_driver_deployment = is_driver_deployment self._docs_path = docs_path @@ -401,6 +421,8 @@ def options( init_kwargs: Default[Dict[Any, Any]] = DEFAULT.VALUE, route_prefix: Default[Union[str, None]] = DEFAULT.VALUE, ray_actor_options: Default[Optional[Dict]] = DEFAULT.VALUE, + placement_group_bundles: Optional[List[Dict[str, float]]] = DEFAULT.VALUE, + placement_group_strategy: Optional[str] = DEFAULT.VALUE, user_config: Default[Optional[Any]] = DEFAULT.VALUE, max_concurrent_queries: Default[int] = DEFAULT.VALUE, autoscaling_config: Default[ @@ -484,6 +506,12 @@ def options( if ray_actor_options is DEFAULT.VALUE: ray_actor_options = self._ray_actor_options + if placement_group_bundles is DEFAULT.VALUE: + placement_group_bundles = self._placement_group_bundles + + if placement_group_strategy is DEFAULT.VALUE: + placement_group_strategy = self._placement_group_strategy + if autoscaling_config is not DEFAULT.VALUE: new_config.autoscaling_config = autoscaling_config @@ -511,6 +539,8 @@ def options( init_kwargs=init_kwargs, route_prefix=route_prefix, ray_actor_options=ray_actor_options, + placement_group_bundles=placement_group_bundles, + placement_group_strategy=placement_group_strategy, _internal=True, is_driver_deployment=is_driver_deployment, ) @@ -630,6 +660,8 @@ def deployment_to_schema( "health_check_period_s": d._config.health_check_period_s, "health_check_timeout_s": d._config.health_check_timeout_s, "ray_actor_options": ray_actor_options_schema, + "placement_group_strategy": d._placement_group_strategy, + "placement_group_bundles": d._placement_group_bundles, "is_driver_deployment": d._is_driver_deployment, } @@ -665,6 +697,16 @@ def schema_to_deployment(s: DeploymentSchema) -> Deployment: else: ray_actor_options = s.ray_actor_options.dict(exclude_unset=True) + if s.placement_group_bundles is DEFAULT.VALUE: + placement_group_bundles = None + else: + placement_group_bundles = s.placement_group_bundles + + if s.placement_group_strategy is DEFAULT.VALUE: + placement_group_strategy = None + else: + placement_group_strategy = s.placement_group_strategy + if s.is_driver_deployment is DEFAULT.VALUE: is_driver_deployment = False else: @@ -690,6 +732,8 @@ def schema_to_deployment(s: DeploymentSchema) -> Deployment: init_kwargs={}, route_prefix=s.route_prefix, ray_actor_options=ray_actor_options, + placement_group_bundles=placement_group_bundles, + placement_group_strategy=placement_group_strategy, _internal=True, is_driver_deployment=is_driver_deployment, ) diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index f19c5d0f9bc0e..6f99756eb177f 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -235,6 +235,27 @@ class DeploymentSchema( default=DEFAULT.VALUE, description="Options set for each 
replica actor." ) + placement_group_bundles: List[Dict[str, float]] = Field( + default=DEFAULT.VALUE, + description=( + "Define a set of placement group bundles to be " + "scheduled *for each replica* of this deployment. The replica actor will " + "be scheduled in the first bundle provided, so the resources specified in " + "`ray_actor_options` must be a subset of the first bundle's resources. All " + "actors and tasks created by the replica actor will be scheduled in the " + "placement group by default (`placement_group_capture_child_tasks` is set " + "to True)." + ), + ) + + placement_group_strategy: str = Field( + default=DEFAULT.VALUE, + description=( + "Strategy to use for the replica placement group " + "specified via `placement_group_bundles`. Defaults to `PACK`." + ), + ) + is_driver_deployment: bool = Field( default=DEFAULT.VALUE, description="Indicate Whether the deployment is driver deployment " diff --git a/python/ray/serve/tests/test_cli.py b/python/ray/serve/tests/test_cli.py index e8d766e73630d..09c6362f218c5 100644 --- a/python/ray/serve/tests/test_cli.py +++ b/python/ray/serve/tests/test_cli.py @@ -714,5 +714,33 @@ def check_for_failed_deployment(): wait_for_condition(check_for_failed_deployment, timeout=15) +@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") +def test_replica_placement_group_options(ray_start_stop): + """Test that placement group options can be set via config file.""" + + config_file_name = os.path.join( + os.path.dirname(__file__), "test_config_files", "replica_placement_groups.yaml" + ) + + subprocess.check_output(["serve", "deploy", config_file_name]) + + def check_application_status(): + cli_output = subprocess.check_output( + ["serve", "status", "-a", "http://localhost:52365/"] + ) + status = yaml.safe_load(cli_output)["applications"] + # TODO(zcin): fix error handling in the application state manager for + # invalid override options and check for `DEPLOY_FAILED` here. + return ( + status["valid"]["status"] == "RUNNING" + # and status["invalid_bundles"] == "DEPLOY_FAILED" + and status["invalid_bundles"]["status"] == "DEPLOYING" + # and status["invalid_strategy"] == "DEPLOY_FAILED" + and status["invalid_strategy"]["status"] == "DEPLOYING" + ) + + wait_for_condition(check_application_status, timeout=15) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/test_config.py b/python/ray/serve/tests/test_config.py index 3fce12cb0fdb3..77a6e1a5e0e2e 100644 --- a/python/ray/serve/tests/test_config.py +++ b/python/ray/serve/tests/test_config.py @@ -116,7 +116,7 @@ def test_from_default_ignore_default(self): class TestReplicaConfig: - def test_replica_config_validation(self): + def test_basic_validation(self): class Class: pass @@ -128,7 +128,10 @@ def function(_): with pytest.raises(TypeError): ReplicaConfig.create(Class()) - # Check ray_actor_options validation. + def test_ray_actor_options_validation(self): + class Class: + pass + ReplicaConfig.create( Class, tuple(), @@ -186,6 +189,131 @@ def function(_): with pytest.raises(ValueError): ReplicaConfig.create(Class, ray_actor_options={option: None}) + def test_placement_group_options_validation(self): + class Class: + pass + + # Specify placement_group_bundles without num_cpus or placement_group_strategy. + ReplicaConfig.create( + Class, + tuple(), + dict(), + placement_group_bundles=[{"CPU": 1.0}], + ) + + # Specify placement_group_bundles with integer value. 
+ ReplicaConfig.create( + Class, + tuple(), + dict(), + placement_group_bundles=[{"CPU": 1}], + ) + + # Specify placement_group_bundles and placement_group_strategy. + ReplicaConfig.create( + Class, + tuple(), + dict(), + placement_group_bundles=[{"CPU": 1.0}], + placement_group_strategy="STRICT_PACK", + ) + + # Specify placement_group_bundles and placement_group_strategy and num_cpus. + ReplicaConfig.create( + Class, + tuple(), + dict(), + ray_actor_options={"num_cpus": 1}, + placement_group_bundles=[{"CPU": 1.0}], + placement_group_strategy="STRICT_PACK", + ) + + # Invalid: placement_group_strategy without placement_group_bundles. + with pytest.raises( + ValueError, match="`placement_group_bundles` must also be provided" + ): + ReplicaConfig.create( + Class, + tuple(), + dict(), + placement_group_strategy="PACK", + ) + + # Invalid: unsupported placement_group_strategy. + with pytest.raises( + ValueError, match="Invalid placement group strategy 'FAKE_NEWS'" + ): + ReplicaConfig.create( + Class, + tuple(), + dict(), + placement_group_bundles=[{"CPU": 1.0}], + placement_group_strategy="FAKE_NEWS", + ) + + # Invalid: malformed placement_group_bundles. + with pytest.raises( + ValueError, + match=( + "`placement_group_bundles` must be a non-empty list " + "of resource dictionaries." + ), + ): + ReplicaConfig.create( + Class, + tuple(), + dict(), + placement_group_bundles=[{"CPU": "1.0"}], + ) + + # Invalid: replica actor does not fit in the first bundle (CPU). + with pytest.raises( + ValueError, + match=( + "the resource requirements for the actor must be a " + "subset of the first bundle." + ), + ): + ReplicaConfig.create( + Class, + tuple(), + dict(), + ray_actor_options={"num_cpus": 1}, + placement_group_bundles=[{"CPU": 0.1}], + ) + + # Invalid: replica actor does not fit in the first bundle (GPU). + with pytest.raises( + ValueError, + match=( + "the resource requirements for the actor must be a " + "subset of the first bundle." + ), + ): + ReplicaConfig.create( + Class, + tuple(), + dict(), + ray_actor_options={"num_gpus": 1}, + placement_group_bundles=[{"CPU": 1.0}], + ) + + # Invalid: replica actor does not fit in the first bundle (custom resource). + with pytest.raises( + ValueError, + match=( + "the resource requirements for the actor must be a " + "subset of the first bundle." + ), + ): + ReplicaConfig.create( + Class, + tuple(), + dict(), + ray_actor_options={"resources": {"custom": 1}}, + placement_group_bundles=[{"CPU": 1}], + ) + def test_replica_config_lazy_deserialization(self): def f(): return "Check this out!"
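For reference alongside these validation tests, a minimal sketch (not part of the patch) of the user-facing form of the same options via the new `@serve.deployment` parameters added in `api.py` above. The `Worker` class and resource values are placeholders.

from ray import serve


@serve.deployment(
    num_replicas=2,
    ray_actor_options={"num_cpus": 1},
    # Each replica gets its own placement group; the replica actor is placed
    # in the first bundle, so that bundle must cover `num_cpus` above.
    placement_group_bundles=[{"CPU": 1}, {"CPU": 1}],
    placement_group_strategy="STRICT_PACK",
)
class Worker:
    def __call__(self, request) -> str:
        return "ok"


app = Worker.bind()
# serve.run(app)  # Child tasks/actors are captured into each replica's placement group.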
diff --git a/python/ray/serve/tests/test_config_files/replica_placement_groups.py b/python/ray/serve/tests/test_config_files/replica_placement_groups.py new file mode 100644 index 0000000000000..29974d56f6fae --- /dev/null +++ b/python/ray/serve/tests/test_config_files/replica_placement_groups.py @@ -0,0 +1,10 @@ +from ray import serve + + +@serve.deployment +class D: + def __call__(self, *args): + return "hi" + + +app = D.bind() diff --git a/python/ray/serve/tests/test_config_files/replica_placement_groups.yaml b/python/ray/serve/tests/test_config_files/replica_placement_groups.yaml new file mode 100644 index 0000000000000..4fc504501e764 --- /dev/null +++ b/python/ray/serve/tests/test_config_files/replica_placement_groups.yaml @@ -0,0 +1,27 @@ +applications: + - name: valid + route_prefix: /valid + import_path: ray.serve.tests.test_config_files.replica_placement_groups.app + deployments: + - name: D + placement_group_bundles: + - {"CPU": 1} + placement_group_strategy: STRICT_PACK + + - name: invalid_bundles + route_prefix: /invalid_bundles + import_path: ray.serve.tests.test_config_files.replica_placement_groups.app + deployments: + - name: D + # Insufficient resources for the replica actor. + placement_group_bundles: + - {"CPU": 0.1} + + - name: invalid_strategy + route_prefix: /invalid_strategy + import_path: ray.serve.tests.test_config_files.replica_placement_groups.app + deployments: + - name: D + placement_group_bundles: + - {"CPU": 1} + placement_group_strategy: FAKE_NEWS diff --git a/python/ray/serve/tests/test_deployment_scheduler.py b/python/ray/serve/tests/test_deployment_scheduler.py index c40369b76d904..2bd811888c90d 100644 --- a/python/ray/serve/tests/test_deployment_scheduler.py +++ b/python/ray/serve/tests/test_deployment_scheduler.py @@ -23,8 +23,21 @@ class Replica: def get_node_id(self): return ray.get_runtime_context().get_node_id() + def get_placement_group(self): + return ray.util.get_current_placement_group() -def test_spread_deployment_scheduling_policy_upscale(ray_start_cluster): + +@pytest.mark.parametrize( + "placement_group_config", + [ + {}, + {"bundles": [{"CPU": 3}]}, + {"bundles": [{"CPU": 1}, {"CPU": 1}, {"CPU": 1}], "strategy": "STRICT_PACK"}, + ], +) +def test_spread_deployment_scheduling_policy_upscale( + ray_start_cluster, placement_group_config +): """Test to make sure replicas are spreaded.""" cluster = ray_start_cluster cluster.add_node(num_cpus=3) @@ -40,6 +53,12 @@ def test_spread_deployment_scheduling_policy_upscale(ray_start_cluster): scheduler = DeploymentScheduler(cluster_node_info_cache) scheduler.on_deployment_created("deployment1", SpreadDeploymentSchedulingPolicy()) replica_actor_handles = [] + replica_placement_groups = [] + + def on_scheduled(actor_handle, placement_group): + replica_actor_handles.append(actor_handle) + replica_placement_groups.append(placement_group) + deployment_to_replicas_to_stop = scheduler.schedule( upscales={ "deployment1": [ @@ -48,10 +67,12 @@ def test_spread_deployment_scheduling_policy_upscale(ray_start_cluster): replica_name="replica1", actor_def=Replica, actor_resources={"CPU": 1}, - actor_options={}, + actor_options={"name": "deployment1_replica1"}, actor_init_args=(), - on_scheduled=lambda actor_handle: replica_actor_handles.append( - actor_handle + on_scheduled=on_scheduled, + placement_group_bundles=placement_group_config.get("bundles", None), + placement_group_strategy=placement_group_config.get( + "strategy", None ), ), ReplicaSchedulingRequest( @@ -59,10 +80,12 @@ def 
test_spread_deployment_scheduling_policy_upscale(ray_start_cluster): replica_name="replica2", actor_def=Replica, actor_resources={"CPU": 1}, - actor_options={}, + actor_options={"name": "deployment1_replica2"}, actor_init_args=(), - on_scheduled=lambda actor_handle: replica_actor_handles.append( - actor_handle + on_scheduled=on_scheduled, + placement_group_bundles=placement_group_config.get("bundles", None), + placement_group_strategy=placement_group_config.get( + "strategy", None ), ), ] @@ -71,6 +94,7 @@ def test_spread_deployment_scheduling_policy_upscale(ray_start_cluster): ) assert not deployment_to_replicas_to_stop assert len(replica_actor_handles) == 2 + assert len(replica_placement_groups) == 2 assert not scheduler._pending_replicas["deployment1"] assert len(scheduler._launching_replicas["deployment1"]) == 2 assert ( @@ -82,6 +106,16 @@ def test_spread_deployment_scheduling_policy_upscale(ray_start_cluster): ) == 2 ) + if "bundles" in placement_group_config: + assert ( + len( + { + ray.get(replica_actor_handles[0].get_placement_group.remote()), + ray.get(replica_actor_handles[1].get_placement_group.remote()), + } + ) + == 2 + ) scheduler.on_replica_stopping("deployment1", "replica1") scheduler.on_replica_stopping("deployment1", "replica2") scheduler.on_deployment_deleted("deployment1") @@ -130,7 +164,7 @@ def test_spread_deployment_scheduling_policy_downscale(ray_start_cluster): actor_resources={"CPU": 1}, actor_options={}, actor_init_args=(), - on_scheduled=lambda actor_handle: actor_handle, + on_scheduled=lambda actor_handle, placement_group: actor_handle, ), ] }, @@ -258,6 +292,10 @@ def test_driver_deployment_scheduling_policy_upscale(ray_start_cluster): scheduler.on_deployment_created("deployment1", DriverDeploymentSchedulingPolicy()) replica_actor_handles = [] + + def on_scheduled(actor_handle, placement_group): + replica_actor_handles.append(actor_handle) + deployment_to_replicas_to_stop = scheduler.schedule( upscales={ "deployment1": [ @@ -268,9 +306,7 @@ def test_driver_deployment_scheduling_policy_upscale(ray_start_cluster): actor_resources={"CPU": 1}, actor_options={}, actor_init_args=(), - on_scheduled=lambda actor_handle: replica_actor_handles.append( - actor_handle - ), + on_scheduled=on_scheduled, ), ReplicaSchedulingRequest( deployment_name="deployment1", @@ -279,9 +315,7 @@ def test_driver_deployment_scheduling_policy_upscale(ray_start_cluster): actor_resources={"CPU": 1}, actor_options={}, actor_init_args=(), - on_scheduled=lambda actor_handle: replica_actor_handles.append( - actor_handle - ), + on_scheduled=on_scheduled, ), ReplicaSchedulingRequest( deployment_name="deployment1", @@ -290,9 +324,7 @@ def test_driver_deployment_scheduling_policy_upscale(ray_start_cluster): actor_resources={"CPU": 1}, actor_options={}, actor_init_args=(), - on_scheduled=lambda actor_handle: replica_actor_handles.append( - actor_handle - ), + on_scheduled=on_scheduled, ), ] }, diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index d55d4da7b2cc7..a3916420fe83c 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -2297,17 +2297,21 @@ def mock_deployment_state_manager_full( cluster_node_info_cache = MockClusterNodeInfoCache() def create_deployment_state_manager( - actor_names=None, is_driver_deployment=False + actor_names=None, placement_group_names=None, is_driver_deployment=False ): if actor_names is None: actor_names = [] + if placement_group_names is None: + 
placement_group_names = [] + return DeploymentStateManager( "name", True, kv_store, mock_long_poll, actor_names, + placement_group_names, cluster_node_info_cache, ) @@ -2535,12 +2539,14 @@ def mock_deployment_state_manager(request) -> Tuple[DeploymentStateManager, Mock kv_store = RayInternalKVStore("test") cluster_node_info_cache = MockClusterNodeInfoCache() all_current_actor_names = [] + all_current_placement_group_names = [] deployment_state_manager = DeploymentStateManager( "name", True, kv_store, mock_long_poll, all_current_actor_names, + all_current_placement_group_names, cluster_node_info_cache, ) @@ -2614,6 +2620,7 @@ def test_resource_requirements_none(): class FakeActor: actor_resources = {"num_cpus": 2, "fake": None} + placement_group_bundles = None available_resources = {} # Make a DeploymentReplica just to accesss its resource_requirement function diff --git a/python/ray/serve/tests/test_deployment_version.py b/python/ray/serve/tests/test_deployment_version.py index cb63917ffd13d..9f998107e081f 100644 --- a/python/ray/serve/tests/test_deployment_version.py +++ b/python/ray/serve/tests/test_deployment_version.py @@ -205,6 +205,70 @@ def test_ray_actor_options(): assert hash(v1) != hash(v3) +def test_placement_group_options(): + v1 = DeploymentVersion("1", DeploymentConfig(), {"num_cpus": 0.1}) + v2 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + ) + v3 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + ) + v4 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"GPU": 0.1}], + ) + + assert v1 != v2 + assert hash(v1) != hash(v2) + assert v1.requires_actor_restart(v2) + + assert v2 == v3 + assert hash(v2) == hash(v3) + assert not v2.requires_actor_restart(v3) + + assert v3 != v4 + assert hash(v3) != hash(v4) + assert v3.requires_actor_restart(v4) + + v5 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + placement_group_strategy="PACK", + ) + v6 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + placement_group_strategy="PACK", + ) + v7 = DeploymentVersion( + "1", + DeploymentConfig(), + {"num_cpus": 0.1}, + placement_group_bundles=[{"CPU": 0.1}], + placement_group_strategy="SPREAD", + ) + + assert v5 == v6 + assert hash(v5) == hash(v6) + assert not v5.requires_actor_restart(v6) + + assert v6 != v7 + assert hash(v6) != hash(v7) + assert v6.requires_actor_restart(v7) + + def test_hash_consistent_across_processes(serve_instance): @ray.remote def get_version(): diff --git a/python/ray/serve/tests/test_replica_placement_group.py b/python/ray/serve/tests/test_replica_placement_group.py new file mode 100644 index 0000000000000..046eb533474f2 --- /dev/null +++ b/python/ray/serve/tests/test_replica_placement_group.py @@ -0,0 +1,315 @@ +import sys + +import pytest + +import ray +from ray.util.placement_group import ( + get_current_placement_group, + PlacementGroup, +) +from ray.util.scheduling_strategies import ( + PlacementGroupSchedulingStrategy, +) +from ray._private.test_utils import wait_for_condition + +from ray import serve +from ray.serve.context import get_global_client +from ray.serve._private.utils import get_all_live_placement_group_names + + +def _get_pg_strategy(pg: PlacementGroup) -> str: + return ray.util.placement_group_table(pg)["strategy"] + + 
+@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.") +def test_basic(serve_instance): + """Test the basic workflow: multiple replicas with their own PGs.""" + + @serve.deployment( + num_replicas=2, + placement_group_bundles=[{"CPU": 1}, {"CPU": 0.1}], + ) + class D: + def get_pg(self) -> PlacementGroup: + return get_current_placement_group() + + h = serve.run(D.bind(), name="pg_test") + + # Verify that each replica has its own placement group with the correct config. + assert len(get_all_live_placement_group_names()) == 2 + unique_pgs = set(ray.get([h.get_pg.remote() for _ in range(20)])) + assert len(unique_pgs) == 2 + for pg in unique_pgs: + assert _get_pg_strategy(pg) == "PACK" + assert pg.bundle_specs == [{"CPU": 1}, {"CPU": 0.1}] + + # Verify that all placement groups are deleted when the deployment is deleted. + serve.delete("pg_test") + assert len(get_all_live_placement_group_names()) == 0 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.") +def test_upgrade_and_change_pg(serve_instance): + """Test re-deploying a deployment with different PG bundles and strategy.""" + + @serve.deployment( + num_replicas=1, + placement_group_bundles=[{"CPU": 1}, {"CPU": 0.1}], + placement_group_strategy="STRICT_PACK", + ) + class D: + def get_pg(self) -> PlacementGroup: + return get_current_placement_group() + + h = serve.run(D.bind(), name="pg_test") + + # Check that the original replica is created with the expected PG config. + assert len(get_all_live_placement_group_names()) == 1 + original_pg = ray.get(h.get_pg.remote()) + assert original_pg.bundle_specs == [{"CPU": 1}, {"CPU": 0.1}] + assert _get_pg_strategy(original_pg) == "STRICT_PACK" + + # Re-deploy with a new PG config. + D = D.options( + placement_group_bundles=[{"CPU": 2}, {"CPU": 0.2}], + placement_group_strategy="SPREAD", + ) + + h = serve.run(D.bind(), name="pg_test") + assert len(get_all_live_placement_group_names()) == 1 + new_pg = ray.get(h.get_pg.remote()) + assert new_pg.bundle_specs == [{"CPU": 2}, {"CPU": 0.2}] + assert _get_pg_strategy(new_pg) == "SPREAD" + + # Verify that all placement groups are deleted when the deployment is deleted. + serve.delete("pg_test") + assert len(get_all_live_placement_group_names()) == 0 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.") +def test_pg_removed_on_replica_graceful_shutdown(serve_instance): + """Verify that PGs are removed when a replica shuts down gracefully.""" + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}], + ) + class D: + def get_pg(self) -> PlacementGroup: + return get_current_placement_group() + + h = serve.run(D.options(num_replicas=2).bind(), name="pg_test") + + # Two replicas to start, each should have their own placement group. + assert len(get_all_live_placement_group_names()) == 2 + original_unique_pgs = set(ray.get([h.get_pg.remote() for _ in range(20)])) + assert len(original_unique_pgs) == 2 + + # Re-deploy the application with a single replica. + # The existing PGs should be removed and a new one should be created for the + # new replica. + h = serve.run(D.options(num_replicas=1).bind(), name="pg_test") + assert len(get_all_live_placement_group_names()) == 1 + new_unique_pgs = set(ray.get([h.get_pg.remote() for _ in range(20)])) + assert len(new_unique_pgs) == 1 + assert not new_unique_pgs.issubset(original_unique_pgs) + + # Verify that all placement groups are deleted when the deployment is deleted. 
+ serve.delete("pg_test") + assert len(get_all_live_placement_group_names()) == 0 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.") +def test_pg_removed_on_replica_crash(serve_instance): + """Verify that PGs are removed when a replica crashes unexpectedly.""" + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}], + health_check_period_s=0.1, + ) + class D: + def die(self): + sys.exit() + + def get_pg(self) -> PlacementGroup: + return get_current_placement_group() + + h = serve.run(D.bind(), name="pg_test") + + # Get the placement group for the original replica. + assert len(get_all_live_placement_group_names()) == 1 + pg = ray.get(h.get_pg.remote()) + + # Kill the replica forcefully. + with pytest.raises(ray.exceptions.RayActorError): + ray.get(h.die.remote()) + + def new_replica_scheduled(): + try: + ray.get(h.get_pg.remote()) + except ray.exceptions.RayActorError: + return False + + return True + + # The original placement group should be deleted and a new replica should + # be scheduled with its own new placement group. + wait_for_condition(new_replica_scheduled) + new_pg = ray.get(h.get_pg.remote()) + assert pg != new_pg + assert len(get_all_live_placement_group_names()) == 1 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.") +def test_pg_removed_after_controller_crash(serve_instance): + """Verify that PGs are removed normally after recovering from a controller crash. + + If the placement group was not properly recovered in the replica recovery process, + it would be leaked here. + """ + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}], + ) + class D: + pass + + serve.run(D.bind(), name="pg_test") + assert len(get_all_live_placement_group_names()) == 1 + + ray.kill(get_global_client()._controller, no_restart=False) + + serve.delete("pg_test") + assert len(get_all_live_placement_group_names()) == 0 + + +@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.") +def test_leaked_pg_removed_on_controller_recovery(serve_instance): + """Verify that leaked PGs are removed on controller recovery. + + A placement group can be "leaked" if the replica is killed while the controller is + down or the controller crashes between creating a placement group and its replica. + + In these cases, the controller should detect the leak on recovery and delete the + leaked placement group(s). + """ + + @serve.deployment( + placement_group_bundles=[{"CPU": 1}], + health_check_period_s=0.1, + ) + class D: + def die(self): + sys.exit() + + def get_pg(self) -> PlacementGroup: + return get_current_placement_group() + + h = serve.run(D.bind(), name="pg_test") + + prev_pg = ray.get(h.get_pg.remote()) + assert len(get_all_live_placement_group_names()) == 1 + + # Kill the controller and the replica immediately after. + # This will cause the controller to *not* detect the replica on recovery, but it + # should still detect the leaked placement group and clean it up. + ray.kill(get_global_client()._controller, no_restart=False) + with pytest.raises(ray.exceptions.RayActorError): + ray.get(h.die.remote()) + + def leaked_pg_cleaned_up(): + try: + new_pg = ray.get(h.get_pg.remote()) + except ray.exceptions.RayActorError: + return False + + return len(get_all_live_placement_group_names()) == 1 and new_pg != prev_pg + + # Verify that a new replica is placed with a new placement group and the old + # placement group has been removed. 
+    wait_for_condition(leaked_pg_cleaned_up)
+
+    serve.delete("pg_test")
+    assert len(get_all_live_placement_group_names()) == 0
+
+
+@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.")
+def test_replica_actor_infeasible(serve_instance):
+    """Test that we get a validation error if the replica doesn't fit in the bundle."""
+
+    @serve.deployment(
+        placement_group_bundles=[{"CPU": 0.1}],
+    )
+    class Infeasible:
+        pass
+
+    with pytest.raises(ValueError):
+        serve.run(Infeasible.bind())
+
+
+@pytest.mark.skipif(sys.platform == "win32", reason="Timing out on Windows.")
+def test_coschedule_actors_and_tasks(serve_instance):
+    """Test that actors/tasks are placed in the replica's placement group by default."""
+
+    @ray.remote(num_cpus=1)
+    class TestActor:
+        def get_pg(self) -> PlacementGroup:
+            return get_current_placement_group()
+
+    @ray.remote
+    def get_pg():
+        return get_current_placement_group()
+
+    @serve.deployment(
+        # Bundles have space for one additional 1-CPU actor.
+        placement_group_bundles=[{"CPU": 1}, {"CPU": 1}],
+    )
+    class Parent:
+        def run_test(self):
+            # First actor should be scheduled in the placement group without issue.
+            a1 = TestActor.remote()
+            assert ray.get(a1.get_pg.remote()) == get_current_placement_group()
+
+            # Second actor can't be placed because there are no more resources in the
+            # placement group (the first actor is occupying the second bundle).
+            a2 = TestActor.remote()
+            ready, _ = ray.wait([a2.get_pg.remote()], timeout=0.1)
+            assert len(ready) == 0
+            ray.kill(a2)
+
+            # Second actor can be successfully scheduled outside the placement group.
+            a3 = TestActor.options(
+                scheduling_strategy=PlacementGroupSchedulingStrategy(
+                    placement_group=None
+                )
+            ).remote()
+            assert ray.get(a3.get_pg.remote()) is None
+
+            # A zero-CPU task can be scheduled in the placement group.
+            assert (
+                ray.get(get_pg.options(num_cpus=0).remote())
+                == get_current_placement_group()
+            )
+
+            # A two-CPU task cannot fit in the placement group.
+            with pytest.raises(ValueError):
+                get_pg.options(num_cpus=2).remote()
+
+            # A two-CPU task can be scheduled outside the placement group.
+            assert (
+                ray.get(
+                    get_pg.options(
+                        num_cpus=2,
+                        scheduling_strategy=PlacementGroupSchedulingStrategy(
+                            placement_group=None
+                        ),
+                    ).remote()
+                )
+                is None
+            )
+
+    h = serve.run(Parent.bind())
+    ray.get(h.run_test.remote())
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main(["-v", "-s", __file__]))
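Taken together, the tests above exercise the new per-deployment placement group options end to end. The following is a rough usage sketch rather than code from this patch (the Example class, the "pg_example" application name, and the bundle sizes are placeholders); it mirrors only behavior asserted by the tests above:

import ray
from ray import serve
from ray.util.placement_group import PlacementGroup, get_current_placement_group


@serve.deployment(
    num_replicas=2,
    # Each replica reserves its own placement group; the replica actor itself is
    # scheduled into the first bundle.
    placement_group_bundles=[{"CPU": 1}, {"CPU": 1}],
    placement_group_strategy="PACK",
)
class Example:
    def get_pg(self) -> PlacementGroup:
        # Tasks and actors launched from the replica run in this group by
        # default (see test_coschedule_actors_and_tasks above).
        return get_current_placement_group()


handle = serve.run(Example.bind(), name="pg_example")
assert ray.get(handle.get_pg.remote()).bundle_specs == [{"CPU": 1}, {"CPU": 1}]
serve.delete("pg_example")  # Also removes the replicas' placement groups.

If placement_group_strategy is omitted, test_basic above asserts that replicas fall back to "PACK".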
diff --git a/python/ray/serve/tests/test_util.py b/python/ray/serve/tests/test_util.py
index fdd6f4e348eb1..d2af287639b09 100644
--- a/python/ray/serve/tests/test_util.py
+++ b/python/ray/serve/tests/test_util.py
@@ -22,6 +22,7 @@
     serve_encoders,
     snake_to_camel_case,
     dict_keys_snake_to_camel_case,
+    get_all_live_placement_group_names,
     get_head_node_id,
 )
 from ray._private.resource_spec import HEAD_NODE_RESOURCE_NAME
@@ -618,6 +619,52 @@ def test_calculate_remaining_timeout():
     )
 
 
+def test_get_all_live_placement_group_names(ray_instance):
+    """Test the logic to parse the Ray placement group table.
+
+    The test covers four cases, each with a regular and a detached variant:
+    - Named placement groups that have been removed ("pg1", "pg2").
+    - Named placement groups that are alive ("pg3", "pg4").
+    - Named placement groups that are still being scheduled ("pg5", "pg6").
+    - Unnamed placement groups ("pg7", "pg8").
+
+    Only the named placement groups that are alive or still being scheduled
+    ("pg3", "pg4", "pg5", "pg6") should be returned as live placement group names.
+    """
+
+    # Named placement group that's been removed (should not be returned).
+    pg1 = ray.util.placement_group([{"CPU": 0.1}], name="pg1")
+    ray.util.remove_placement_group(pg1)
+
+    # Named, detached placement group that's been removed (should not be returned).
+    pg2 = ray.util.placement_group([{"CPU": 0.1}], name="pg2", lifetime="detached")
+    ray.util.remove_placement_group(pg2)
+
+    # Named placement group that's still alive (should be returned).
+    pg3 = ray.util.placement_group([{"CPU": 0.1}], name="pg3")
+    assert pg3.wait()
+
+    # Named, detached placement group that's still alive (should be returned).
+    pg4 = ray.util.placement_group([{"CPU": 0.1}], name="pg4", lifetime="detached")
+    assert pg4.wait()
+
+    # Named placement group that's being scheduled (should be returned).
+    pg5 = ray.util.placement_group([{"CPU": 1000}], name="pg5")
+    assert not pg5.wait(timeout_seconds=0.001)
+
+    # Named, detached placement group that's being scheduled (should be returned).
+    pg6 = ray.util.placement_group([{"CPU": 1000}], name="pg6", lifetime="detached")
+    assert not pg6.wait(timeout_seconds=0.001)
+
+    # Unnamed placement group (should not be returned).
+    pg7 = ray.util.placement_group([{"CPU": 0.1}])
+    assert pg7.wait()
+
+    # Unnamed, detached placement group (should not be returned).
+    pg8 = ray.util.placement_group([{"CPU": 0.1}], lifetime="detached")
+    assert pg8.wait()
+
+    assert set(get_all_live_placement_group_names()) == {"pg3", "pg4", "pg5", "pg6"}
+
+
 if __name__ == "__main__":
     import sys
 
diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py
index dad9981c6f002..32c0abdb28512 100644
--- a/python/ray/util/placement_group.py
+++ b/python/ray/util/placement_group.py
@@ -12,6 +12,13 @@
 bundle_reservation_check = None
 BUNDLE_RESOURCE_LABEL = "bundle"
 
+VALID_PLACEMENT_GROUP_STRATEGIES = {
+    "PACK",
+    "SPREAD",
+    "STRICT_PACK",
+    "STRICT_SPREAD",
+}
+
 # We need to import this method to use for ready API.
 # But ray.remote is only available in runtime, and
@@ -219,6 +226,12 @@ def placement_group(
             stacklevel=1,
         )
 
+    if strategy not in VALID_PLACEMENT_GROUP_STRATEGIES:
+        raise ValueError(
+            f"Invalid placement group strategy {strategy}. "
+            f"Supported strategies are: {VALID_PLACEMENT_GROUP_STRATEGIES}."
+        )
+
     if lifetime is None:
         detached = False
     elif lifetime == "detached":
@@ -271,7 +284,9 @@ def get_placement_group(placement_group_name: str) -> PlacementGroup:
         placement_group_name, worker.namespace
     )
     if placement_group_info is None:
-        raise ValueError(f"Failed to look up actor with name: {placement_group_name}")
+        raise ValueError(
+            f"Failed to look up placement group with name: {placement_group_name}"
+        )
     else:
         return PlacementGroup(
             PlacementGroupID(hex_to_binary(placement_group_info["placement_group_id"]))
diff --git a/src/ray/protobuf/serve.proto b/src/ray/protobuf/serve.proto
index 8485bb5dbaaaa..7f88eea79bc69 100644
--- a/src/ray/protobuf/serve.proto
+++ b/src/ray/protobuf/serve.proto
@@ -152,6 +152,8 @@ message DeploymentVersion {
   string code_version = 1;
   DeploymentConfig deployment_config = 2;
   string ray_actor_options = 3;
+  string placement_group_bundles = 4;
+  string placement_group_strategy = 5;
 }
 
 message ReplicaConfig {
@@ -160,6 +162,8 @@
   string deployment_name = 1;
   string deployment_def_name = 2;
   bytes init_args = 3;
   bytes init_kwargs = 4;
   string ray_actor_options = 5;
+  string placement_group_bundles = 6;
+  string placement_group_strategy = 7;
 }
 
 message DeploymentInfo {