Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Add serve.get_deployment() API #14953

Merged
merged 14 commits into from
Mar 31, 2021
8 changes: 8 additions & 0 deletions python/ray/serve/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ py_test(
deps = [":serve_lib"],
)

py_test(
name = "test_get_deployment",
size = "medium",
srcs = serve_tests_srcs,
tags = ["exclusive"],
deps = [":serve_lib"],
)

py_test(
name = "test_advanced",
size = "medium",
Expand Down
4 changes: 2 additions & 2 deletions python/ray/serve/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
accept_batch, connect, start, get_replica_context, get_handle,
shadow_traffic, set_traffic, delete_backend, list_backends, create_backend,
get_backend_config, update_backend_config, list_endpoints, delete_endpoint,
create_endpoint, shutdown, ingress, deployment)
create_endpoint, shutdown, ingress, deployment, get_deployment)
from ray.serve.batching import batch
from ray.serve.config import BackendConfig, HTTPOptions
from ray.serve.utils import ServeRequest
Expand All @@ -18,5 +18,5 @@
"shadow_traffic", "set_traffic", "delete_backend", "list_backends",
"create_backend", "get_backend_config", "update_backend_config",
"list_endpoints", "delete_endpoint", "create_endpoint", "shutdown",
"ingress", "deployment"
"ingress", "deployment", "get_deployment"
]
37 changes: 30 additions & 7 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from warnings import warn

from ray.actor import ActorHandle
from ray.serve.common import EndpointTag, GoalId
from ray.serve.common import BackendInfo, EndpointTag, GoalId
from ray.serve.config import (BackendConfig, BackendMetadata, HTTPOptions,
ReplicaConfig)
from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
Expand Down Expand Up @@ -513,6 +513,10 @@ def deploy(self,
def delete_deployment(self, name: str) -> None:
self._wait_for_goal(self._controller.delete_deployment.remote(name))

@_ensure_connected
def get_deployment_info(self, name: str) -> Tuple[BackendInfo, str]:
return ray.get(self._controller.get_deployment_info.remote(name))

@_ensure_connected
def list_backends(self) -> Dict[str, BackendConfig]:
"""Returns a dictionary of all registered backends.
Expand Down Expand Up @@ -1172,18 +1176,13 @@ def deploy(cls, *init_args):
if len(init_args) == 0 and cls._init_args is not None:
init_args = cls._init_args

if cls._version is not None:
version = cls._version
else:
version = get_random_letters()

return _get_global_client().deploy(
cls._name,
cls._backend_def,
*init_args,
ray_actor_options=cls._ray_actor_options,
config=cls._config,
version=version,
version=cls._version,
_internal=True)

@classmethod
Expand Down Expand Up @@ -1306,3 +1305,27 @@ def decorator(backend_def):
ray_actor_options=ray_actor_options)

return decorator


def get_deployment(name: str) -> ServeDeployment:
"""Retrieve RayServeHandle for service endpoint to invoke it from Python.

Args:
name(str): name of the deployment. This must have already been
deployed.

Returns:
ServeDeployment
"""
try:
backend_info, route = _get_global_client().get_deployment_info(name)
except KeyError:
raise KeyError(f"Deployment {name} was not found. "
"Did you call Deployment.deploy()?")
return make_deployment_cls(
backend_info.replica_config.backend_def,
name,
backend_info.backend_config,
version=backend_info.version,
init_args=backend_info.replica_config.init_args,
ray_actor_options=backend_info.replica_config.ray_actor_options)
55 changes: 25 additions & 30 deletions python/ray/serve/backend_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import ray.cloudpickle as pickle
from ray.actor import ActorHandle
from ray.serve.async_goal_manager import AsyncGoalManager
from ray.serve.backend_worker import create_backend_replica
from ray.serve.common import (BackendInfo, BackendTag, Duration, GoalId,
ReplicaTag)
from ray.serve.config import BackendConfig, ReplicaConfig
from ray.serve.config import BackendConfig
from ray.serve.constants import RESERVED_VERSION_TAG
from ray.serve.kv_store import RayInternalKVStore
from ray.serve.long_poll import LongPollHost, LongPollNamespace
from ray.serve.utils import (format_actor_name, get_random_letters, logger)
Expand Down Expand Up @@ -539,57 +539,52 @@ def get_backend(self, backend_tag: BackendTag) -> Optional[BackendInfo]:
return self._backend_metadata.get(backend_tag)

def _set_backend_goal(self, backend_tag: BackendTag,
backend_info: BackendInfo,
version: Optional[str]) -> None:
backend_info: Optional[BackendInfo]) -> None:
existing_goal_id = self._backend_goals.get(backend_tag)
new_goal_id = self._goal_manager.create_goal()

if backend_info is not None:
self._backend_metadata[backend_tag] = backend_info
self._target_replicas[
backend_tag] = backend_info.backend_config.num_replicas

if backend_info.version is not None:
version = backend_info.version
else:
version = get_random_letters()
self._target_versions[backend_tag] = version
else:
self._target_replicas[backend_tag] = 0

self._backend_goals[backend_tag] = new_goal_id
self._target_versions[backend_tag] = version

return new_goal_id, existing_goal_id

def deploy_backend(self,
backend_tag: BackendTag,
backend_config: BackendConfig,
replica_config: ReplicaConfig,
version: Optional[str] = None) -> Optional[GoalId]:
def deploy_backend(self, backend_tag: BackendTag,
backend_info: BackendInfo) -> Optional[GoalId]:
# Ensures this method is idempotent.
backend_info = self._backend_metadata.get(backend_tag)
if backend_info is not None:
# Old codepath.
if version is None:
if (backend_info.backend_config == backend_config
and backend_info.replica_config == replica_config):
existing_info = self._backend_metadata.get(backend_tag)
if existing_info is not None:
# Old codepath. We use RESERVED_VERSION_TAG to distinguish that
# we shouldn't use versions at all to determine redeployment
# because `None` is used to indicate always redeploying.
if backend_info.version == RESERVED_VERSION_TAG:
edoakes marked this conversation as resolved.
Show resolved Hide resolved
if (existing_info.backend_config == backend_info.backend_config
and existing_info.replica_config ==
backend_info.replica_config):
return self._backend_goals.get(backend_tag, None)
# New codepath: treat version as ground truth for implementation.
else:
if (backend_info.backend_config == backend_config
and self._target_versions[backend_tag] == version):
if (existing_info.backend_config == backend_info.backend_config
and backend_info.version is not None
and existing_info.version == backend_info.version):
return self._backend_goals.get(backend_tag, None)

if backend_tag not in self._replicas:
self._replicas[backend_tag] = ReplicaStateContainer()

backend_replica_class = create_backend_replica(
replica_config.backend_def)

# Save creator that starts replicas, the arguments to be passed in,
# and the configuration for the backends.
backend_info = BackendInfo(
worker_class=backend_replica_class,
backend_config=backend_config,
replica_config=replica_config)

new_goal_id, existing_goal_id = self._set_backend_goal(
backend_tag, backend_info, version)
backend_tag, backend_info)

# NOTE(edoakes): we must write a checkpoint before starting new
# or pushing the updated config to avoid inconsistent state if we
Expand All @@ -609,7 +604,7 @@ def delete_backend(self, backend_tag: BackendTag,
return None

new_goal_id, existing_goal_id = self._set_backend_goal(
backend_tag, None, None)
backend_tag, None)
if force_kill:
self._backend_metadata[
backend_tag].backend_config.\
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pydantic import BaseModel
from typing import Dict, Any
from typing import Any, Dict, Optional
from uuid import UUID

import numpy as np
Expand All @@ -18,6 +18,7 @@ class BackendInfo(BaseModel):
# TODO(architkulkarni): Add type hint for worker_class after upgrading
# cloudpickle and adding types to RayServeWrappedReplica
worker_class: Any
version: Optional[str]
backend_config: BackendConfig
replica_config: ReplicaConfig

Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __init__(self, backend_def, *init_args, ray_actor_options=None):
self.is_blocking = _callable_is_blocking(backend_def)
self.is_asgi_app = hasattr(backend_def, "_serve_asgi_app")
self.path_prefix = getattr(backend_def, "_serve_path_prefix", None)
self.init_args = list(init_args)
self.init_args = init_args
if ray_actor_options is None:
self.ray_actor_options = {}
else:
Expand Down
70 changes: 47 additions & 23 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import asyncio
from collections import defaultdict
import inspect
from typing import Dict, Any, List, Optional
from typing import Dict, Any, List, Optional, Tuple

import ray
from ray.actor import ActorHandle
from ray.serve.async_goal_manager import AsyncGoalManager
from ray.serve.backend_state import BackendState
from ray.serve.backend_worker import create_backend_replica
from ray.serve.common import (
BackendInfo,
BackendTag,
EndpointTag,
GoalId,
Expand Down Expand Up @@ -196,8 +198,13 @@ async def create_backend(
replica_config: ReplicaConfig) -> Optional[GoalId]:
"""Register a new backend under the specified tag."""
async with self.write_lock:
return self.backend_state.deploy_backend(
backend_tag, backend_config, replica_config)
backend_info = BackendInfo(
worker_class=create_backend_replica(
replica_config.backend_def),
version=RESERVED_VERSION_TAG,
backend_config=backend_config,
replica_config=replica_config)
return self.backend_state.deploy_backend(backend_tag, backend_info)

async def delete_backend(self,
backend_tag: BackendTag,
Expand All @@ -217,16 +224,17 @@ async def update_backend_config(self, backend_tag: BackendTag,
config_options: BackendConfig) -> GoalId:
"""Set the config for the specified backend."""
async with self.write_lock:
existing_backend_info = self.backend_state.get_backend(backend_tag)
if existing_backend_info is None:
existing_info = self.backend_state.get_backend(backend_tag)
if existing_info is None:
raise ValueError(f"Backend {backend_tag} is not registered.")

existing_replica_config = existing_backend_info.replica_config
new_backend_config = existing_backend_info.backend_config.copy(
update=config_options.dict(exclude_unset=True))

return self.backend_state.deploy_backend(
backend_tag, new_backend_config, existing_replica_config)
backend_info = BackendInfo(
worker_class=existing_info.worker_class,
version=existing_info.version,
backend_config=existing_info.backend_config.copy(
update=config_options.dict(exclude_unset=True)),
replica_config=existing_info.replica_config)
return self.backend_state.deploy_backend(backend_tag, backend_info)

def get_backend_config(self, backend_tag: BackendTag) -> BackendConfig:
"""Get the current config for the specified backend."""
Expand Down Expand Up @@ -282,18 +290,14 @@ async def deploy(self, name: str, backend_config: BackendConfig,
python_methods.append(method_name)

async with self.write_lock:
if version is None:
version = RESERVED_VERSION_TAG
else:
if version == RESERVED_VERSION_TAG:
# TODO(edoakes): this is unlikely to ever be hit, but it's
# still ugly and should be removed once the old codepath
# can be deleted.
raise ValueError(
f"Version {RESERVED_VERSION_TAG} is reserved and "
"cannot be used by applications.")
goal_id = self.backend_state.deploy_backend(
name, backend_config, replica_config, version)
backend_info = BackendInfo(
worker_class=create_backend_replica(
replica_config.backend_def),
version=version,
backend_config=backend_config,
replica_config=replica_config)

goal_id = self.backend_state.deploy_backend(name, backend_info)
self.endpoint_state.create_endpoint(
name,
http_route,
Expand All @@ -307,3 +311,23 @@ async def deploy(self, name: str, backend_config: BackendConfig,
def delete_deployment(self, name: str) -> Optional[GoalId]:
self.endpoint_state.delete_endpoint(name)
return self.backend_state.delete_backend(name, force_kill=False)

def get_deployment_info(self, name: str) -> Tuple[BackendInfo, str]:
"""Get the current information about a deployment.

Args:
name(str): the name of the deployment.

Returns:
(BackendInfo, route)

Raises:
KeyError if the deployment doesn't exist.
"""
backend_info: BackendInfo = self.backend_state.get_backend(name)
if backend_info is None:
raise KeyError(f"Deployment {name} does not exist.")

route = self.endpoint_state.get_endpoint_route(name)

return backend_info, route
6 changes: 6 additions & 0 deletions python/ray/serve/endpoint_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ def shadow_traffic(self, endpoint: EndpointTag, backend: BackendTag,
self._checkpoint()
self._notify_traffic_policies_changed(endpoint)

def get_endpoint_route(self, endpoint: EndpointTag) -> Optional[str]:
architkulkarni marked this conversation as resolved.
Show resolved Hide resolved
for route, (route_endpoint, methods) in self._routes.items():
if route_endpoint == endpoint:
return route
return None

def get_endpoints(self) -> Dict[EndpointTag, Dict[str, Any]]:
endpoints = {}
for route, (endpoint, methods) in self._routes.items():
Expand Down
Loading