Skip to content

Commit

Permalink
[serve] Add replica placement group support (#37830)
Browse files Browse the repository at this point in the history
Adds support for `placement_group_bundles` and `placement_group_strategy` to the deployment config. This enables creating a placement group _per replica_ of a deployment, a feature requested by users who orchestrate multiple actors within a replica (e.g., to perform model-parallel inference).

The replica actor will be created in the bundle with index `0` (following the precedent set in Ray Train and Ray Tune).
  • Loading branch information
edoakes committed Aug 10, 2023
1 parent e8e6b22 commit 6fef803
Show file tree
Hide file tree
Showing 25 changed files with 1,140 additions and 63 deletions.
10 changes: 8 additions & 2 deletions python/ray/serve/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ py_test(
deps = [":serve_lib"],
)


py_test(
name = "test_config",
size = "small",
Expand All @@ -242,7 +241,6 @@ py_test(
deps = [":serve_lib"],
)


py_test(
name = "test_failure",
size = "medium",
Expand Down Expand Up @@ -560,6 +558,14 @@ py_test(
deps = [":serve_lib"],
)

# Test target for the replica placement group tests
# (test_replica_placement_group), built from the serve_tests_srcs source list.
py_test(
name = "test_replica_placement_group",
size = "medium",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

# Make sure the example showing in doc is tested
py_test(
name = "quickstart_class",
Expand Down
10 changes: 10 additions & 0 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,11 +978,21 @@ def override_deployment_info(
# if the code sets options to None).
override_actor_options = replica_config.ray_actor_options or {}

override_placement_group_bundles = options.pop(
"placement_group_bundles", replica_config.placement_group_bundles
)
override_placement_group_strategy = options.pop(
"placement_group_strategy", replica_config.placement_group_strategy
)

merged_env = override_runtime_envs_except_env_vars(
app_runtime_env, override_actor_options.get("runtime_env", {})
)
override_actor_options.update({"runtime_env": merged_env})
replica_config.update_ray_actor_options(override_actor_options)
replica_config.update_placement_group_options(
override_placement_group_bundles, override_placement_group_strategy
)
override_options["replica_config"] = replica_config

# Override deployment config options
Expand Down
2 changes: 2 additions & 0 deletions python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ def deploy_application(
deployment["init_kwargs"],
ingress=deployment["ingress"],
ray_actor_options=deployment["ray_actor_options"],
placement_group_bundles=deployment["placement_group_bundles"],
placement_group_strategy=deployment["placement_group_strategy"],
config=deployment["config"],
version=deployment["version"],
route_prefix=deployment["route_prefix"],
Expand Down
6 changes: 5 additions & 1 deletion python/ray/serve/_private/deploy_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Tuple, Union, Callable, Type, Optional, Any
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import hashlib
import json
import logging
Expand All @@ -22,6 +22,8 @@ def get_deploy_args(
init_kwargs: Dict[Any, Any],
ingress: bool = False,
ray_actor_options: Optional[Dict] = None,
placement_group_bundles: Optional[List[Dict[str, float]]] = None,
placement_group_strategy: Optional[str] = None,
config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
version: Optional[str] = None,
route_prefix: Optional[str] = None,
Expand Down Expand Up @@ -54,6 +56,8 @@ def get_deploy_args(
init_args=init_args,
init_kwargs=init_kwargs,
ray_actor_options=ray_actor_options,
placement_group_bundles=placement_group_bundles,
placement_group_strategy=placement_group_strategy,
)

if isinstance(config, dict):
Expand Down
41 changes: 36 additions & 5 deletions python/ray/serve/_private/deployment_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import sys
from typing import Callable, Dict, Tuple, List, Union, Set
from typing import Callable, Dict, Tuple, List, Optional, Union, Set
from dataclasses import dataclass
from collections import defaultdict

import ray
from ray.util.scheduling_strategies import (
NodeAffinitySchedulingStrategy,
PlacementGroupSchedulingStrategy,
)

from ray.serve._private.utils import (
get_head_node_id,
)
from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


class SpreadDeploymentSchedulingPolicy:
Expand Down Expand Up @@ -38,6 +42,10 @@ class ReplicaSchedulingRequest:
actor_options: Dict
actor_init_args: Tuple
on_scheduled: Callable
# Placement group bundles and strategy *for this replica*.
# These are optional: by default replicas do not have a placement group.
placement_group_bundles: Optional[List[Dict[str, float]]] = None
placement_group_strategy: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -191,13 +199,36 @@ def _schedule_spread_deployment(self, deployment_name: str) -> None:
pending_replica_name
]

placement_group = None
if replica_scheduling_request.placement_group_bundles is not None:
strategy = (
replica_scheduling_request.placement_group_strategy
if replica_scheduling_request.placement_group_strategy
else "PACK"
)
placement_group = ray.util.placement_group(
replica_scheduling_request.placement_group_bundles,
strategy=strategy,
lifetime="detached",
name=replica_scheduling_request.actor_options["name"],
)
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_capture_child_tasks=True,
)
else:
scheduling_strategy = "SPREAD"

actor_handle = replica_scheduling_request.actor_def.options(
scheduling_strategy="SPREAD",
scheduling_strategy=scheduling_strategy,
**replica_scheduling_request.actor_options,
).remote(*replica_scheduling_request.actor_init_args)

del self._pending_replicas[deployment_name][pending_replica_name]
self._launching_replicas[deployment_name][pending_replica_name] = None
replica_scheduling_request.on_scheduled(actor_handle)
replica_scheduling_request.on_scheduled(
actor_handle, placement_group=placement_group
)

def _schedule_driver_deployment(self, deployment_name: str) -> None:
if self._recovering_replicas[deployment_name]:
Expand Down Expand Up @@ -236,7 +267,7 @@ def _schedule_driver_deployment(self, deployment_name: str) -> None:
self._launching_replicas[deployment_name][
pending_replica_name
] = target_node_id
replica_scheduling_request.on_scheduled(actor_handle)
replica_scheduling_request.on_scheduled(actor_handle, placement_group=None)

def _get_replicas_to_stop(
self, deployment_name: str, max_num_to_stop: int
Expand Down

0 comments on commit 6fef803

Please sign in to comment.