[serve] Optionally namespace serve clusters (#8447)
edoakes committed May 17, 2020
1 parent 67c0145 commit fb23bd6
Showing 10 changed files with 163 additions and 65 deletions.
24 changes: 24 additions & 0 deletions doc/source/rayserve/overview.rst
@@ -246,6 +246,30 @@ The shard key can either be specified via the X-SERVE-SHARD-KEY HTTP header or `
handle = serve.get_handle("api_endpoint")
handle.options(shard_key=session_id).remote(args)
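A minimal sketch of the header-based form mentioned above (an editor's illustration, not part of this commit; the port and route are assumptions based on the surrounding examples):

.. code-block:: python

    import requests

    session_id = "abc123"  # e.g. a per-user session identifier

    # Requests carrying the same X-SERVE-SHARD-KEY value are routed
    # consistently, mirroring handle.options(shard_key=...).remote(args).
    requests.get(
        "http://127.0.0.1:8000/api_endpoint",
        headers={"X-SERVE-SHARD-KEY": session_id})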
Running Multiple Serve Clusters on one Ray Cluster
++++++++++++++++++++++++++++++++++++++++++++++++++

You can run multiple serve clusters on the same Ray cluster by providing a ``cluster_name`` to ``serve.init()``.

.. code-block:: python
# Create a first cluster whose HTTP server listens on 8000.
serve.init(cluster_name="cluster1", http_port=8000)
serve.create_endpoint("counter1", "/increment")
# Create a second cluster whose HTTP server listens on 8001.
serve.init(cluster_name="cluster2", http_port=8001)
serve.create_endpoint("counter1", "/increment")
# Create a backend that will be served on the second cluster.
serve.create_backend("counter1", function)
serve.set_traffic("counter1", {"counter1": 1.0})
# Switch back to the first cluster and create the same backend on it.
serve.init(cluster_name="cluster1")
serve.create_backend("counter1", function)
serve.set_traffic("counter1", {"counter1": 1.0})
Other Resources
---------------

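The new docs section above runs two serve clusters side by side; as a quick sanity check, a client could hit each cluster's HTTP server independently. A sketch assuming the ports and /increment route from the example, with both servers reachable on localhost:

import requests

# Each serve cluster runs its own HTTP proxy, so the same route is served
# independently on each port.
print(requests.get("http://127.0.0.1:8000/increment").text)  # cluster1
print(requests.get("http://127.0.0.1:8001/increment").text)  # cluster2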
34 changes: 18 additions & 16 deletions python/ray/serve/api.py
@@ -7,7 +7,8 @@
SERVE_MASTER_NAME)
from ray.serve.master import ServeMaster
from ray.serve.handle import RayServeHandle
from ray.serve.utils import block_until_http_ready, retry_actor_failures
from ray.serve.utils import (block_until_http_ready, format_actor_name,
retry_actor_failures)
from ray.serve.exceptions import RayServeException
from ray.serve.config import BackendConfig, ReplicaConfig
from ray.serve.router import Query
@@ -23,16 +24,15 @@ def _get_master_actor():
"""
global master_actor
if master_actor is None:
- master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
raise RayServeException("Please run serve.init to initialize or "
"connect to existing ray serve cluster.")
return master_actor


def _ensure_connected(f):
@wraps(f)
def check(*args, **kwargs):
- if _get_master_actor() is None:
- raise RayServeException("Please run serve.init to initialize or "
- "connect to existing ray serve cluster.")
_get_master_actor()
return f(*args, **kwargs)

return check
@@ -60,7 +60,8 @@ def __call__(self, *, python_arg=None):
return f


- def init(blocking=False,
def init(cluster_name=None,
blocking=False,
start_server=True,
http_host=DEFAULT_HTTP_HOST,
http_port=DEFAULT_HTTP_PORT,
@@ -78,6 +79,9 @@ def init(blocking=False,
requirement.
Args:
cluster_name (str): A unique name for this serve cluster. This allows
multiple serve clusters to run on the same ray cluster. Must be
specified in all subsequent serve.init() calls.
blocking (bool): If true, the function will wait for the HTTP server to
be healthy and other components to be ready before returning.
start_server (bool): If true, `serve.init` starts the HTTP server.
@@ -92,21 +96,18 @@ def init(blocking=False,
services. RayServe has two options built in: InMemoryExporter and
PrometheusExporter
"""
- global master_actor
- if master_actor is not None:
- return
if cluster_name is not None and not isinstance(cluster_name, str):
raise TypeError("cluster_name must be a string.")

# Initialize ray if needed.
if not ray.is_initialized():
ray.init(**ray_init_kwargs)

# Register serialization context once
ray.register_custom_serializer(Query, Query.ray_serialize,
Query.ray_deserialize)

# Try to get serve master actor if it exists
global master_actor
master_actor_name = format_actor_name(SERVE_MASTER_NAME, cluster_name)
try:
- master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
master_actor = ray.util.get_actor(master_actor_name)
return
except ValueError:
pass
@@ -124,9 +125,10 @@ def init(blocking=False,
http_node_id = ray.state.current_node_id()
master_actor = ServeMaster.options(
detached=True,
- name=SERVE_MASTER_NAME,
name=master_actor_name,
max_restarts=-1,
- ).remote(start_server, http_node_id, http_host, http_port, metric_exporter)
).remote(cluster_name, start_server, http_node_id, http_host, http_port,
metric_exporter)

if start_server and blocking:
block_until_http_ready("http://{}:{}/-/routes".format(
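The namespacing itself is done by format_actor_name, which is imported above but whose body is not shown on this page. A plausible sketch, assuming it mirrors the "{}:{}".format(cluster_name, ...) pattern the master uses for checkpoint keys in master.py below:

def format_actor_name(actor_name, cluster_name=None):
    # Without a cluster name, keep the legacy un-namespaced actor name so
    # existing single-cluster deployments are unaffected.
    if cluster_name is None:
        return actor_name
    # Otherwise prefix the name, e.g. "cluster1:SERVE_MASTER_ACTOR".
    return "{}:{}".format(cluster_name, actor_name)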
24 changes: 17 additions & 7 deletions python/ray/serve/backend_worker.py
@@ -7,7 +7,8 @@
from ray.serve import context as serve_context
from ray.serve.context import FakeFlaskRequest
from collections import defaultdict
from ray.serve.utils import parse_request_item, _get_logger
from ray.serve.utils import (parse_request_item, _get_logger,
retry_actor_failures)
from ray.serve.exceptions import RayServeException
from ray.serve.metric import MetricClient
from ray.async_compat import sync_to_async
@@ -26,15 +27,24 @@ def create_backend_worker(func_or_class):
assert False, "func_or_class must be function or class."

class RayServeWrappedWorker(object):
- def __init__(self, backend_tag, replica_tag, init_args):
- serve.init()
def __init__(self,
backend_tag,
replica_tag,
init_args,
cluster_name=None):
serve.init(cluster_name=cluster_name)
if is_function:
_callable = func_or_class
else:
_callable = func_or_class(*init_args)

master = serve.api._get_master_actor()
[metric_exporter] = retry_actor_failures(
master.get_metric_exporter)
metric_client = MetricClient(
metric_exporter, default_labels={"backend": backend_tag})
self.backend = RayServeWorker(backend_tag, replica_tag, _callable,
- is_function)
is_function, metric_client)

async def handle_request(self, request):
return await self.backend.handle_request(request)
@@ -67,14 +77,14 @@ def ensure_async(func):
class RayServeWorker:
"""Handles requests with the provided callable."""

- def __init__(self, name, replica_tag, _callable, is_function):
def __init__(self, name, replica_tag, _callable, is_function,
metric_client):
self.name = name
self.replica_tag = replica_tag
self.callable = _callable
self.is_function = is_function

- self.metric_client = MetricClient.connect_from_serve(
- default_labels={"backend": self.name})
self.metric_client = metric_client
self.request_counter = self.metric_client.new_counter(
"backend_request_counter",
description=("Number of queries that have been "
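The worker now takes cluster_name and re-joins that cluster via serve.init before resolving the master actor, and its MetricClient is injected rather than looked up through a global name. A hypothetical instantiation (tags invented for illustration, assuming a running serve cluster named "cluster1"):

# Mirrors how _start_backend_worker in master.py (below) constructs replicas
# with the new cluster_name kwarg; the backend and replica tags are made up.
worker_cls = create_backend_worker(lambda flask_request: "hello")
worker = worker_cls("counter1", "counter1#qxyz", init_args=(),
                    cluster_name="cluster1")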
9 changes: 5 additions & 4 deletions python/ray/serve/http_proxy.py
@@ -4,12 +4,12 @@
import uvicorn

import ray
from ray import serve
from ray.serve.context import TaskContext
from ray.serve.metric import MetricClient
from ray.serve.request_params import RequestMetadata
from ray.serve.http_util import Response
from ray.serve.utils import logger, retry_actor_failures_async
- from ray.serve.constants import SERVE_MASTER_NAME

from urllib.parse import parse_qs

@@ -29,7 +29,7 @@ class HTTPProxy:

async def fetch_config_from_master(self):
assert ray.is_initialized()
- master = ray.util.get_actor(SERVE_MASTER_NAME)
master = serve.api._get_master_actor()

self.route_table, [
self.router_handle
@@ -39,7 +39,7 @@ async def fetch_config_from_master(self):
[self.metric_exporter] = await retry_actor_failures_async(
master.get_metric_exporter)

- self.metric_client = MetricClient.connect_from_serve()
self.metric_client = MetricClient(self.metric_exporter)
self.request_counter = self.metric_client.new_counter(
"num_http_requests",
description="The number of requests processed",
@@ -194,7 +194,8 @@ async def __call__(self, scope, receive, send):

@ray.remote
class HTTPProxyActor:
- async def __init__(self, host, port):
async def __init__(self, host, port, cluster_name=None):
serve.init(cluster_name=cluster_name)
self.app = HTTPProxy()
await self.app.fetch_config_from_master()
self.host = host
58 changes: 38 additions & 20 deletions python/ray/serve/master.py
@@ -13,7 +13,8 @@
from ray.serve.kv_store import RayInternalKVStore
from ray.serve.metric.exporter import MetricExporterActor
from ray.serve.router import Router
from ray.serve.utils import async_retryable, get_random_letters, logger
from ray.serve.utils import (async_retryable, format_actor_name,
get_random_letters, logger)

import numpy as np

@@ -49,10 +50,13 @@ class ServeMaster:
requires all implementations here to be idempotent.
"""

- async def __init__(self, start_http_proxy, http_node_id, http_proxy_host,
- http_proxy_port, metric_exporter_class):
async def __init__(self, cluster_name, start_http_proxy, http_node_id,
http_proxy_host, http_proxy_port,
metric_exporter_class):
# Unique name of the serve cluster managed by this actor. Used to
# namespace child actors and checkpoints.
self.cluster_name = cluster_name
# Used to read/write checkpoints.
- # TODO(edoakes): namespace the master actor and its checkpoints.
self.kv_store = RayInternalKVStore()
# path -> (endpoint, methods).
self.routes = {}
@@ -105,7 +109,10 @@ async def __init__(self, start_http_proxy, http_node_id, http_proxy_host,
# a checkpoint to the event loop. Other state-changing calls acquire
# this lock and will be blocked until recovering from the checkpoint
# finishes.
- checkpoint = self.kv_store.get(CHECKPOINT_KEY)
checkpoint_key = CHECKPOINT_KEY
if self.cluster_name is not None:
checkpoint_key = "{}:{}".format(self.cluster_name, checkpoint_key)
checkpoint = self.kv_store.get(checkpoint_key)
if checkpoint is None:
logger.debug("No checkpoint found")
else:
@@ -118,16 +125,17 @@ def _get_or_start_router(self):
If the router does not already exist, it will be started.
"""
router_name = format_actor_name(SERVE_ROUTER_NAME, self.cluster_name)
try:
- self.router = ray.util.get_actor(SERVE_ROUTER_NAME)
self.router = ray.util.get_actor(router_name)
except ValueError:
- logger.info(
- "Starting router with name '{}'".format(SERVE_ROUTER_NAME))
logger.info("Starting router with name '{}'".format(router_name))
self.router = async_retryable(ray.remote(Router)).options(
detached=True,
- name=SERVE_ROUTER_NAME,
name=router_name,
max_concurrency=ASYNC_CONCURRENCY,
- max_restarts=-1).remote()
max_restarts=-1,
).remote(cluster_name=self.cluster_name)

def get_router(self):
"""Returns a handle to the router managed by this actor."""
@@ -138,21 +146,23 @@ def _get_or_start_http_proxy(self, node_id, host, port):
If the HTTP proxy does not already exist, it will be started.
"""
proxy_name = format_actor_name(SERVE_PROXY_NAME, self.cluster_name)
try:
- self.http_proxy = ray.util.get_actor(SERVE_PROXY_NAME)
self.http_proxy = ray.util.get_actor(proxy_name)
except ValueError:
logger.info(
"Starting HTTP proxy with name '{}' on node '{}'".format(
- SERVE_PROXY_NAME, node_id))
proxy_name, node_id))
self.http_proxy = async_retryable(HTTPProxyActor).options(
detached=True,
- name=SERVE_PROXY_NAME,
name=proxy_name,
max_concurrency=ASYNC_CONCURRENCY,
max_restarts=-1,
resources={
node_id: 0.01
},
- ).remote(host, port)
).remote(
host, port, cluster_name=self.cluster_name)

def get_http_proxy(self):
"""Returns a handle to the HTTP proxy managed by this actor."""
@@ -167,14 +177,16 @@ def _get_or_start_metric_exporter(self, metric_exporter_class):
If the metric exporter does not already exist, it will be started.
"""
metric_sink_name = format_actor_name(SERVE_METRIC_SINK_NAME,
self.cluster_name)
try:
- self.metric_exporter = ray.util.get_actor(SERVE_METRIC_SINK_NAME)
self.metric_exporter = ray.util.get_actor(metric_sink_name)
except ValueError:
logger.info("Starting metric exporter with name '{}'".format(
- SERVE_METRIC_SINK_NAME))
metric_sink_name))
self.metric_exporter = MetricExporterActor.options(
detached=True,
- name=SERVE_METRIC_SINK_NAME).remote(metric_exporter_class)
name=metric_sink_name).remote(metric_exporter_class)

def get_metric_exporter(self):
"""Returns a handle to the metric exporter managed by this actor."""
@@ -232,8 +244,10 @@ async def _recover_from_checkpoint(self, checkpoint_bytes):
# were created.
for backend_tag, replica_tags in self.replicas.items():
for replica_tag in replica_tags:
replica_name = format_actor_name(replica_tag,
self.cluster_name)
self.workers[backend_tag][replica_tag] = ray.util.get_actor(
- replica_tag)
replica_name)

# Push configuration state to the router.
# TODO(edoakes): should we make this a pull-only model for simplicity?
@@ -295,12 +309,16 @@ async def _start_backend_worker(self, backend_tag, replica_tag):
(backend_worker, backend_config,
replica_config) = self.backends[backend_tag]

replica_name = format_actor_name(replica_tag, self.cluster_name)
worker_handle = async_retryable(ray.remote(backend_worker)).options(
detached=True,
- name=replica_tag,
name=replica_name,
max_restarts=-1,
**replica_config.ray_actor_options).remote(
- backend_tag, replica_tag, replica_config.actor_init_args)
backend_tag,
replica_tag,
replica_config.actor_init_args,
cluster_name=self.cluster_name)
# TODO(edoakes): we should probably have a timeout here.
await worker_handle.ready.remote()
return worker_handle
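Taken together, every child actor the master manages (router, HTTP proxy, metric exporter, and each replica) now gets a namespaced detached name, and _recover_from_checkpoint re-finds replicas by that name after a master restart. A sketch with a hypothetical replica tag, assuming the format_actor_name behavior sketched earlier and an already-initialized cluster:

import ray

replica_tag = "counter1#qxyz"  # hypothetical tag
replica_name = format_actor_name(replica_tag, "cluster1")
assert replica_name == "cluster1:counter1#qxyz"

# Recovery looks workers up by their namespaced detached names.
worker = ray.util.get_actor(replica_name)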
15 changes: 1 addition & 14 deletions python/ray/serve/metric/client.py
@@ -6,8 +6,7 @@
convert_event_type_to_class,
MetricMetadata,
)
- from ray.serve.utils import (retry_actor_failures, retry_actor_failures_async,
- _get_logger)
from ray.serve.utils import retry_actor_failures_async, _get_logger
from ray.serve.constants import METRIC_PUSH_INTERVAL_S

logger = _get_logger()
@@ -38,18 +37,6 @@ def __init__(
self.push_to_exporter_forever(push_interval))
logger.debug("Initialized client")

- @staticmethod
- def connect_from_serve(default_labels: Optional[Dict[str, str]] = None):
- """Create the metric client automatically when running inside serve."""
- from ray.serve.api import _get_master_actor
-
- master_actor = _get_master_actor()
- [metric_exporter] = retry_actor_failures(
- master_actor.get_metric_exporter)
- return MetricClient(
- metric_exporter_actor=metric_exporter,
- default_labels=default_labels)

def new_counter(self,
name: str,
*,
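With connect_from_serve removed, callers resolve the metric exporter themselves and inject it into MetricClient, exactly as the backend worker and HTTP proxy do above. A condensed sketch of that pattern, using only helpers that appear elsewhere in this diff:

from ray import serve
from ray.serve.metric import MetricClient
from ray.serve.utils import retry_actor_failures

# Resolve the exporter through the (possibly namespaced) master actor...
master = serve.api._get_master_actor()
[metric_exporter] = retry_actor_failures(master.get_metric_exporter)

# ...then construct the client directly, passing default labels inline.
metric_client = MetricClient(
    metric_exporter, default_labels={"backend": "counter1"})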
