Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ray-project/ray into http…
Browse files Browse the repository at this point in the history
…-proxy-as-deployment
  • Loading branch information
edoakes committed May 14, 2021
2 parents 98e410b + 7b1c5db commit e75ce44
Show file tree
Hide file tree
Showing 32 changed files with 522 additions and 118 deletions.
9 changes: 9 additions & 0 deletions bazel/ray_deps_setup.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ def auto_http_archive(*, name=None, url=None, urls=True,

def ray_deps_setup():

# Explicitly bring in protobuf dependency to work around
# https://github.com/ray-project/ray/issues/14117
http_archive(
name = "com_google_protobuf",
strip_prefix = "protobuf-3.16.0",
urls = ["https://github.com/protocolbuffers/protobuf/archive/v3.16.0.tar.gz"],
sha256 = "7892a35d979304a404400a101c46ce90e85ec9e2a766a86041bb361f626247f5",
)

auto_http_archive(
name = "com_github_antirez_redis",
build_file = "//bazel:BUILD.redis",
Expand Down
4 changes: 2 additions & 2 deletions dashboard/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
"use-debounce": "^3.4.3"
},
"devDependencies": {
"eslint-plugin-import": "^2.20.1",
"eslint-plugin-prefer-arrow": "^1.1.7",
"eslint-plugin-import": "2.20.1",
"eslint-plugin-prefer-arrow": "1.1.7",
"prettier": "2.3.0"
},
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion java/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def gen_java_deps():
artifacts = [
"com.google.code.gson:gson:2.8.5",
"com.google.guava:guava:27.0.1-jre",
"com.google.protobuf:protobuf-java:3.8.0",
"com.google.protobuf:protobuf-java:3.16.0",
"com.puppycrawl.tools:checkstyle:8.15",
"com.sun.xml.bind:jaxb-core:2.3.0",
"com.sun.xml.bind:jaxb-impl:2.3.0",
Expand Down
2 changes: 1 addition & 1 deletion java/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
Expand Down
2 changes: 1 addition & 1 deletion java/test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
Expand Down
4 changes: 4 additions & 0 deletions python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ def _configure_system():
from ray.cross_language import java_function, java_actor_class # noqa: E402
from ray.runtime_context import get_runtime_context # noqa: E402
from ray import util # noqa: E402
# We import ClientBuilder so that modules can inherit from `ray.ClientBuilder`.
from ray.client_builder import client, ClientBuilder # noqa: E402

# Replaced with the current commit when building the wheels.
__commit__ = "{{RAY_COMMIT_SHA}}"
Expand All @@ -106,6 +108,8 @@ def _configure_system():
"actor",
"available_resources",
"cancel",
"client",
"ClientBuilder",
"cluster_resources",
"get",
"get_actor",
Expand Down
6 changes: 5 additions & 1 deletion python/ray/_private/runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ def _dir_travel(
excludes.append(e)
skip = any(e(path) for e in excludes)
if not skip:
handler(path)
try:
handler(path)
except Exception as e:
logger.error(f"Issue with path: {path}")
raise e
if path.is_dir():
for sub_path in path.iterdir():
_dir_travel(sub_path, excludes, handler)
Expand Down
99 changes: 99 additions & 0 deletions python/ray/client_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import os
import importlib
import logging
from dataclasses import dataclass
from urllib.parse import urlparse
from typing import Any, Dict, Optional, Tuple

from ray.ray_constants import RAY_ADDRESS_ENVIRONMENT_VARIABLE
from ray.job_config import JobConfig
import ray.util.client_connect

logger = logging.getLogger(__name__)


@dataclass
class ClientInfo:
"""
Basic information of the remote server for a given Ray Client connection.
"""
dashboard_url: Optional[str]
python_version: str
ray_version: str
ray_commit: str
protocol_version: str


class ClientBuilder:
"""
Builder for a Ray Client connection.
"""

def __init__(self, address: Optional[str]) -> None:
self.address = address
self._job_config = JobConfig()

def env(self, env: Dict[str, Any]) -> "ClientBuilder":
"""
Set an environment for the session.
"""
self._job_config = JobConfig(runtime_env=env)
return self

def connect(self) -> ClientInfo:
"""
Begin a connection to the address passed in via ray.client(...).
"""
client_info_dict = ray.util.client_connect.connect(
self.address, job_config=self._job_config)
dashboard_url = ray.get(
ray.remote(ray.worker.get_dashboard_url).remote())
return ClientInfo(
dashboard_url=dashboard_url,
python_version=client_info_dict["python_version"],
ray_version=client_info_dict["ray_version"],
ray_commit=client_info_dict["ray_commit"],
protocol_version=client_info_dict["protocol_version"])


class _LocalClientBuilder(ClientBuilder):
pass


def _split_address(address: str) -> Tuple[str, str]:
"""
Splits address into a module string (scheme) and an inner_address.
"""
if "://" not in address:
address = "ray://" + address
url_object = urlparse(address)
module_string = url_object.scheme
inner_address = address.replace(module_string + "://", "", 1)
return (module_string, inner_address)


def _get_builder_from_address(address: Optional[str]) -> ClientBuilder:
if address is None or address == "local":
return _LocalClientBuilder(address)
module_string, inner_address = _split_address(address)
module = importlib.import_module(module_string)
return module.ClientBuilder(inner_address)


def client(address: Optional[str] = None) -> ClientBuilder:
"""
Creates a ClientBuilder based on the provided address. The address can be
of the following forms:
* None -> Connects to or creates a local cluster and connects to it.
* local -> Creates a new cluster locally and connects to it.
* IP:Port -> Connects to a Ray Client Server at the given address.
* module://inner_address -> load module.ClientBuilder & pass inner_address
"""
override_address = os.environ.get(RAY_ADDRESS_ENVIRONMENT_VARIABLE)
if override_address:
logger.debug(
f"Using address ({override_address}) instead of "
f"({address}) because {RAY_ADDRESS_ENVIRONMENT_VARIABLE} is set")
address = override_address

return _get_builder_from_address(address)
2 changes: 2 additions & 0 deletions python/ray/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def env_bool(key, default):
# we attempt to start the service running at this port.
DEFAULT_PORT = 6379

RAY_ADDRESS_ENVIRONMENT_VARIABLE = "RAY_ADDRESS"

DEFAULT_DASHBOARD_IP = "127.0.0.1"
DEFAULT_DASHBOARD_PORT = 8265
REDIS_KEY_DASHBOARD = "dashboard"
Expand Down
51 changes: 36 additions & 15 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ def shutdown(self) -> None:
self._shutdown = True

def _wait_for_goal(self,
result_object_id: ray.ObjectRef,
goal_id: Optional[GoalId],
timeout: Optional[float] = None) -> bool:
goal_id: Optional[GoalId] = ray.get(result_object_id)
if goal_id is None:
return True

Expand Down Expand Up @@ -216,7 +215,7 @@ def create_endpoint(self,
"an element of type {}".format(type(method)))
upper_methods.append(method.upper())

self._wait_for_goal(
ray.get(
self._controller.create_endpoint.remote(
endpoint_name, {backend: 1.0}, route, upper_methods))

Expand Down Expand Up @@ -266,8 +265,9 @@ def update_backend_config(
if isinstance(config_options, dict):
config_options = BackendConfig.parse_obj(config_options)
self._wait_for_goal(
self._controller.update_backend_config.remote(
backend_tag, config_options))
ray.get(
self._controller.update_backend_config.remote(
backend_tag, config_options)))

@_ensure_connected
def get_backend_config(self, backend_tag: str) -> BackendConfig:
Expand Down Expand Up @@ -349,8 +349,9 @@ def create_backend(
raise TypeError("config must be a BackendConfig or a dictionary.")

self._wait_for_goal(
self._controller.create_backend.remote(backend_tag, backend_config,
replica_config))
ray.get(
self._controller.create_backend.remote(
backend_tag, backend_config, replica_config)))

@_ensure_connected
def deploy(self,
Expand Down Expand Up @@ -392,17 +393,35 @@ def deploy(self,
else:
raise TypeError("config must be a BackendConfig or a dictionary.")

goal_ref = self._controller.deploy.remote(
name, backend_config, replica_config, version, route_prefix)
python_methods = []
if inspect.isclass(backend_def):
for method_name, _ in inspect.getmembers(backend_def,
inspect.isfunction):
python_methods.append(method_name)

goal_id, updating = ray.get(
self._controller.deploy.remote(name, backend_config,
replica_config, python_methods,
version, route_prefix))

if updating:
msg = f"Updating deployment '{name}'"
if version is not None:
msg += f" to version '{version}'"
logger.info(f"{msg}.")
else:
logger.info(f"Deployment '{name}' is already at version "
f"'{version}', not updating.")

if _blocking:
self._wait_for_goal(goal_ref)
self._wait_for_goal(goal_id)
else:
return goal_ref
return goal_id

@_ensure_connected
def delete_deployment(self, name: str) -> None:
self._wait_for_goal(self._controller.delete_deployment.remote(name))
self._wait_for_goal(
ray.get(self._controller.delete_deployment.remote(name)))

@_ensure_connected
def get_deployment_info(self, name: str) -> Tuple[BackendInfo, str]:
Expand Down Expand Up @@ -432,7 +451,8 @@ def delete_backend(self, backend_tag: str, force: bool = False) -> None:
for graceful shutdown. Default to false.
"""
self._wait_for_goal(
self._controller.delete_backend.remote(backend_tag, force))
ray.get(
self._controller.delete_backend.remote(backend_tag, force)))

@_ensure_connected
def set_traffic(self, endpoint_name: str,
Expand Down Expand Up @@ -1361,7 +1381,7 @@ def get_deployment(name: str) -> Deployment:
raise KeyError(f"Deployment {name} was not found. "
"Did you call Deployment.deploy()?")
return Deployment(
backend_info.replica_config.backend_def,
cloudpickle.loads(backend_info.replica_config.serialized_backend_def),
name,
backend_info.backend_config,
version=backend_info.version,
Expand All @@ -1383,7 +1403,8 @@ def list_deployments() -> Dict[str, Deployment]:
for name, (backend_info, route_prefix) in infos.items():
if name != HTTP_PROXY_DEPLOYMENT_NAME:
deployments[name] = Deployment(
backend_info.replica_config.backend_def,
cloudpickle.loads(
backend_info.replica_config.serialized_backend_def),
name,
backend_info.backend_config,
version=backend_info.version,
Expand Down
34 changes: 25 additions & 9 deletions python/ray/serve/backend_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,13 @@ def __init__(self, controller_name: str, detached: bool,
self._notify_backend_configs_changed()
self._notify_replica_handles_changed()

def shutdown(self) -> None:
for replica_dict in self.get_running_replica_handles().values():
for replica in replica_dict.values():
ray.kill(replica, no_restart=True)

self._kv_store.delete(CHECKPOINT_KEY)

def _checkpoint(self) -> None:
self._kv_store.put(
CHECKPOINT_KEY,
Expand Down Expand Up @@ -602,8 +609,18 @@ def _set_backend_goal(self, backend_tag: BackendTag,

return new_goal_id, existing_goal_id

def deploy_backend(self, backend_tag: BackendTag,
backend_info: BackendInfo) -> Optional[GoalId]:
def deploy_backend(self, backend_tag: BackendTag, backend_info: BackendInfo
) -> Tuple[Optional[GoalId], bool]:
"""Deploy the backend.
If the backend already exists with the same version, this is a no-op
and returns the GoalId corresponding to the existing update if there
is one.
Returns:
GoalId, bool: The GoalId for the client to wait for and whether or
not the backend is being updated.
"""
# Ensures this method is idempotent.
existing_info = self._backend_metadata.get(backend_tag)
if existing_info is not None:
Expand All @@ -614,13 +631,12 @@ def deploy_backend(self, backend_tag: BackendTag,
if (existing_info.backend_config == backend_info.backend_config
and existing_info.replica_config ==
backend_info.replica_config):
return self._backend_goals.get(backend_tag, None)
return self._backend_goals.get(backend_tag, None), False
# New codepath: treat version as ground truth for implementation.
else:
if (existing_info.backend_config == backend_info.backend_config
and backend_info.version is not None
and existing_info.version == backend_info.version):
return self._backend_goals.get(backend_tag, None)
elif (existing_info.backend_config == backend_info.backend_config
and backend_info.version is not None
and existing_info.version == backend_info.version):
return self._backend_goals.get(backend_tag, None), False

if backend_tag not in self._replicas:
self._replicas[backend_tag] = ReplicaStateContainer()
Expand All @@ -636,7 +652,7 @@ def deploy_backend(self, backend_tag: BackendTag,

if existing_goal_id is not None:
self._goal_manager.complete_goal(existing_goal_id)
return new_goal_id
return new_goal_id, True

def delete_backend(self, backend_tag: BackendTag,
force_kill: bool = False) -> Optional[GoalId]:
Expand Down
Loading

0 comments on commit e75ce44

Please sign in to comment.