[serve] Add replica placement group support #37830

Merged 35 commits on Aug 10, 2023.
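For context, a minimal sketch of the user-facing API this PR adds. The option names come from the diffs below; the exact decorator surface and the bundle values are illustrative assumptions, not code from the PR:

```python
from ray import serve

# Each replica of this deployment gets its own placement group built from
# these bundles (assumption: the first bundle holds the replica actor itself).
# The strategy controls how the bundles are packed onto nodes.
@serve.deployment(
    placement_group_bundles=[{"CPU": 1.0}, {"CPU": 1.0}],
    placement_group_strategy="STRICT_PACK",
)
class Model:
    def __call__(self, request) -> str:
        return "ok"

app = Model.bind()
```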
Commits (35):

6223248  WIP (edoakes, Jul 26, 2023)
e8a7f9b  Merge branch 'master' of https://github.com/ray-project/ray into repl… (edoakes, Jul 27, 2023)
f57f999  Merge branch 'master' of https://github.com/ray-project/ray into repl… (edoakes, Aug 8, 2023)
2cddb7d  fix (edoakes, Aug 8, 2023)
9259426  fix (edoakes, Aug 8, 2023)
5227695  basic test (edoakes, Aug 8, 2023)
1388cae  fix (edoakes, Aug 8, 2023)
c3f3efe  fix stuff (edoakes, Aug 8, 2023)
480216a  fix (edoakes, Aug 8, 2023)
d28861b  Merge branch 'master' of https://github.com/ray-project/ray into repl… (edoakes, Aug 8, 2023)
513d523  fix (edoakes, Aug 8, 2023)
8caab41  fix (edoakes, Aug 8, 2023)
5cb9a3f  fix linter (edoakes, Aug 8, 2023)
702b6f5  fix (edoakes, Aug 8, 2023)
8de4726  fix (edoakes, Aug 8, 2023)
6c89e7b  fix (edoakes, Aug 9, 2023)
15ce708  fix (edoakes, Aug 9, 2023)
9a4553a  pg util tests (edoakes, Aug 9, 2023)
e310c87  small fix (edoakes, Aug 9, 2023)
a3dc176  fix (edoakes, Aug 9, 2023)
ea898b4  fix (edoakes, Aug 9, 2023)
a771ba8  fix (edoakes, Aug 9, 2023)
1a9eedd  Merge branch 'master' of https://github.com/ray-project/ray into repl… (edoakes, Aug 9, 2023)
e9949fb  fix (edoakes, Aug 9, 2023)
ef6597e  fix (edoakes, Aug 9, 2023)
e541206  add CLI support + test (edoakes, Aug 9, 2023)
1a53810  fix (edoakes, Aug 9, 2023)
a092ea8  nit (edoakes, Aug 10, 2023)
1722ce7  Merge branch 'master' of https://github.com/ray-project/ray into repl… (edoakes, Aug 10, 2023)
129eebc  test case (edoakes, Aug 10, 2023)
82b7924  fix msg (edoakes, Aug 10, 2023)
3b75f5b  fix (edoakes, Aug 10, 2023)
0abe3e1  make it medium (edoakes, Aug 10, 2023)
d490f59  Merge branch 'master' of https://github.com/ray-project/ray into repl… (edoakes, Aug 10, 2023)
d59f00b  skip windows (edoakes, Aug 10, 2023)
10 changes: 8 additions & 2 deletions python/ray/serve/BUILD
@@ -233,7 +233,6 @@ py_test(
deps = [":serve_lib"],
)


py_test(
name = "test_config",
size = "small",
@@ -242,7 +241,6 @@ py_test(
deps = [":serve_lib"],
)


py_test(
name = "test_failure",
size = "medium",
@@ -560,6 +558,14 @@ py_test(
deps = [":serve_lib"],
)

py_test(
name = "test_replica_placement_group",
size = "medium",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

# Make sure the example showing in doc is tested
py_test(
name = "quickstart_class",
10 changes: 10 additions & 0 deletions python/ray/serve/_private/application_state.py
@@ -978,11 +978,21 @@ def override_deployment_info(
# if the code sets options to None).
override_actor_options = replica_config.ray_actor_options or {}

override_placement_group_bundles = options.pop(
"placement_group_bundles", replica_config.placement_group_bundles
)
override_placement_group_strategy = options.pop(
"placement_group_strategy", replica_config.placement_group_strategy
)

merged_env = override_runtime_envs_except_env_vars(
app_runtime_env, override_actor_options.get("runtime_env", {})
)
override_actor_options.update({"runtime_env": merged_env})
replica_config.update_ray_actor_options(override_actor_options)
replica_config.update_placement_group_options(
override_placement_group_bundles, override_placement_group_strategy
)
override_options["replica_config"] = replica_config

# Override deployment config options
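The two `options.pop(...)` calls above implement a simple precedence rule: a value from the application config wins, and the value set in code (on `replica_config`) is the fallback. A toy illustration of that `dict.pop` pattern, with hypothetical values:

```python
code_default = "PACK"  # what the user set in @serve.deployment

# Config sets the field: the config value wins.
options = {"placement_group_strategy": "SPREAD"}
assert options.pop("placement_group_strategy", code_default) == "SPREAD"

# Config omits the field: fall back to the code-level value.
options = {}
assert options.pop("placement_group_strategy", code_default) == "PACK"
```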
2 changes: 2 additions & 0 deletions python/ray/serve/_private/client.py
@@ -311,6 +311,8 @@ def deploy_application(
deployment["init_kwargs"],
ingress=deployment["ingress"],
ray_actor_options=deployment["ray_actor_options"],
placement_group_bundles=deployment["placement_group_bundles"],
placement_group_strategy=deployment["placement_group_strategy"],
config=deployment["config"],
version=deployment["version"],
route_prefix=deployment["route_prefix"],
6 changes: 5 additions & 1 deletion python/ray/serve/_private/deploy_utils.py
@@ -1,4 +1,4 @@
-from typing import Dict, Tuple, Union, Callable, Type, Optional, Any
+from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import hashlib
import json
import logging
@@ -22,6 +22,8 @@ def get_deploy_args(
init_kwargs: Dict[Any, Any],
ingress: bool = False,
ray_actor_options: Optional[Dict] = None,
placement_group_bundles: Optional[List[Dict[str, float]]] = None,
placement_group_strategy: Optional[str] = None,
config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
version: Optional[str] = None,
route_prefix: Optional[str] = None,
@@ -54,6 +56,8 @@ def get_deploy_args(
init_args=init_args,
init_kwargs=init_kwargs,
ray_actor_options=ray_actor_options,
placement_group_bundles=placement_group_bundles,
placement_group_strategy=placement_group_strategy,
)

if isinstance(config, dict):
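Per the `Optional[List[Dict[str, float]]]` annotation above, bundles are passed as resource-to-quantity mappings. An illustrative value (not from the PR):

```python
from typing import Dict, List

# One dict per bundle; keys are Ray resource names, values are quantities.
bundles: List[Dict[str, float]] = [
    {"CPU": 1.0},              # e.g. reserved for the replica actor itself
    {"CPU": 2.0, "GPU": 1.0},  # e.g. for tasks/actors the replica launches
]
strategy = "STRICT_PACK"  # a Ray placement group strategy such as PACK or SPREAD
```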
41 changes: 36 additions & 5 deletions python/ray/serve/_private/deployment_scheduler.py
@@ -1,14 +1,18 @@
import sys
-from typing import Callable, Dict, Tuple, List, Union, Set
+from typing import Callable, Dict, Tuple, List, Optional, Union, Set
from dataclasses import dataclass
from collections import defaultdict

+import ray
+from ray.util.scheduling_strategies import (
+    NodeAffinitySchedulingStrategy,
+    PlacementGroupSchedulingStrategy,
+)

from ray.serve._private.utils import (
get_head_node_id,
)
from ray.serve._private.cluster_node_info_cache import ClusterNodeInfoCache
-from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


class SpreadDeploymentSchedulingPolicy:
@@ -38,6 +42,10 @@ class ReplicaSchedulingRequest:
actor_options: Dict
actor_init_args: Tuple
on_scheduled: Callable
# Placement group bundles and strategy *for this replica*.
# These are optional: by default replicas do not have a placement group.
placement_group_bundles: Optional[List[Dict[str, float]]] = None
placement_group_strategy: Optional[str] = None
Comment on lines +45 to +48

Contributor: I think we can create a PlacementGroupDeploymentSchedulingPolicy that contains placement_group_bundles and placement_group_strategy, and register this policy during on_deployment_created.

Contributor (author): I thought more about this, and I don't think it actually changes anything related to the DeploymentSchedulingPolicy. The placement group is only relevant to each replica itself. We probably still want to SPREAD the different placement groups among each other, like the existing policy does (and maintain things like max_replicas_per_node).

Contributor: Yeah, that makes sense. Currently there is no way to spread PGs. If you think it's a valid case, do you mind filing an enhancement issue so I can track it on my side?
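To make the thread's conclusion concrete: placement groups stay per-replica, and the PG strategy only governs how one replica's bundles are placed, not how replicas are placed relative to each other. A simplified sketch of that relationship, assuming a running Ray cluster (the diff below shows the PR's actual implementation):

```python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@ray.remote
class Replica:
    def ping(self) -> str:
        return "pong"

def schedule_replica(name, bundles, strategy="PACK"):
    # One placement group *per replica*: bundles are packed per `strategy`,
    # while replica-vs-replica placement is still the caller's policy.
    pg = ray.util.placement_group(bundles, strategy=strategy, name=name)
    return Replica.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_capture_child_tasks=True,
        )
    ).remote()
```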



@dataclass
@@ -191,13 +199,36 @@ def _schedule_spread_deployment(self, deployment_name: str) -> None:
pending_replica_name
]

placement_group = None
if replica_scheduling_request.placement_group_bundles is not None:
strategy = (
replica_scheduling_request.placement_group_strategy
if replica_scheduling_request.placement_group_strategy
else "PACK"
)
placement_group = ray.util.placement_group(
replica_scheduling_request.placement_group_bundles,
strategy=strategy,
lifetime="detached",
name=replica_scheduling_request.actor_options["name"],
)
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_capture_child_tasks=True,
)
else:
scheduling_strategy = "SPREAD"

actor_handle = replica_scheduling_request.actor_def.options(
scheduling_strategy="SPREAD",
scheduling_strategy=scheduling_strategy,
**replica_scheduling_request.actor_options,
).remote(*replica_scheduling_request.actor_init_args)

del self._pending_replicas[deployment_name][pending_replica_name]
self._launching_replicas[deployment_name][pending_replica_name] = None
-replica_scheduling_request.on_scheduled(actor_handle)
+replica_scheduling_request.on_scheduled(
+    actor_handle, placement_group=placement_group
+)

def _schedule_driver_deployment(self, deployment_name: str) -> None:
if self._recovering_replicas[deployment_name]:
@@ -236,7 +267,7 @@ def _schedule_driver_deployment(self, deployment_name: str) -> None:
self._launching_replicas[deployment_name][
pending_replica_name
] = target_node_id
-replica_scheduling_request.on_scheduled(actor_handle)
+replica_scheduling_request.on_scheduled(actor_handle, placement_group=None)

def _get_replicas_to_stop(
self, deployment_name: str, max_num_to_stop: int
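One consequence of the scheduler diff above: `on_scheduled` is now invoked with the placement group (or `None`) in addition to the actor handle, so any registered callback has to accept the new keyword argument. A hedged sketch of a conforming callback (hypothetical, not code from the PR):

```python
from typing import Optional

from ray.util.placement_group import PlacementGroup

def on_scheduled(actor_handle, placement_group: Optional[PlacementGroup] = None) -> None:
    # Recording the PG alongside the handle lets the owner tear it down when
    # the replica stops; the PGs above are created with lifetime="detached",
    # so they are not cleaned up automatically.
    if placement_group is not None:
        print(f"scheduled replica into placement group {placement_group.id}")
```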