Skip to content

Commit

Permalink
[Serve] Push route table updates to HTTP proxy (#7774)
Browse files Browse the repository at this point in the history
  • Loading branch information
edoakes committed Mar 30, 2020
1 parent f889f93 commit 3a53ea6
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 84 deletions.
31 changes: 17 additions & 14 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import ray
from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
SERVE_NURSERY_NAME)
SERVE_MASTER_NAME)
from ray.serve.global_state import GlobalState, start_initial_state
from ray.serve.kv_store_service import SQLiteKVStore
from ray.serve.task_runner import RayServeMixin, TaskRunnerActor
Expand Down Expand Up @@ -114,9 +114,9 @@ def init(
if not ray.is_initialized():
ray.init(**ray_init_kwargs)

# Try to get serve nursery if there exists
# Try to get serve master actor if it exists
try:
ray.util.get_actor(SERVE_NURSERY_NAME)
ray.util.get_actor(SERVE_MASTER_NAME)
global_state = GlobalState()
return
except ValueError:
Expand All @@ -138,15 +138,16 @@ def init(
def kv_store_connector(namespace):
return SQLiteKVStore(namespace, db_path=kv_store_path)

nursery = start_initial_state(kv_store_connector)
master = start_initial_state(kv_store_connector)

global_state = GlobalState(nursery)
if start_server:
global_state.init_or_get_http_server(host=http_host, port=http_port)
global_state.init_or_get_router(
global_state = GlobalState(master)
router = global_state.init_or_get_router(
queueing_policy=queueing_policy, policy_kwargs=policy_kwargs)
global_state.init_or_get_metric_monitor(
gc_window_seconds=gc_window_seconds)
if start_server:
global_state.init_or_get_http_proxy(
host=http_host, port=http_port).set_router_handle.remote(router)

if start_server and blocking:
block_until_http_ready("http://{}:{}/-/routes".format(
Expand All @@ -168,6 +169,9 @@ def create_endpoint(endpoint_name, route=None, methods=["GET"]):
methods = [m.upper() for m in methods]
global_state.route_table.register_service(
route, endpoint_name, methods=methods)
ray.get(global_state.init_or_get_http_proxy().set_route_table.remote(
global_state.route_table.list_service(
include_methods=True, include_headless=False)))


@_ensure_connected
Expand Down Expand Up @@ -321,9 +325,9 @@ def _start_replica(backend_tag):
# get actor creation kwargs
actor_kwargs = backend_config.get_actor_creation_args(init_args)

# Create the runner in the nursery
# Create the runner in the master actor
[runner_handle] = ray.get(
global_state.actor_nursery_handle.start_actor_with_creator.remote(
global_state.master_actor_handle.start_actor_with_creator.remote(
creator, actor_kwargs, replica_tag))

# Setup the worker
Expand All @@ -347,15 +351,14 @@ def _remove_replica(backend_tag):

replica_tag = global_state.backend_table.remove_replica(backend_tag)
[replica_handle] = ray.get(
global_state.actor_nursery_handle.get_handle.remote(replica_tag))
global_state.master_actor_handle.get_handle.remote(replica_tag))

# Remove the replica from metric monitor.
ray.get(global_state.init_or_get_metric_monitor().remove_target.remote(
replica_handle))

# Remove the replica from actor nursery.
ray.get(
global_state.actor_nursery_handle.remove_handle.remote(replica_tag))
# Remove the replica from master actor.
ray.get(global_state.master_actor_handle.remove_handle.remote(replica_tag))

# Remove the replica from router.
# This will also destroy the actor handle.
Expand Down
7 changes: 2 additions & 5 deletions python/ray/serve/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
#: The interval which http server refreshes its routing table
HTTP_ROUTER_CHECKER_INTERVAL_S = 2

#: Actor name used to register actor nursery
SERVE_NURSERY_NAME = "SERVE_ACTOR_NURSERY"
#: Actor name used to register master actor
SERVE_MASTER_NAME = "SERVE_MASTER_ACTOR"

#: KVStore connector key in bootstrap config
BOOTSTRAP_KV_STORE_CONN_KEY = "kv_store_connector"
Expand Down
48 changes: 24 additions & 24 deletions python/ray/serve/global_state.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import ray
from ray.serve.constants import (BOOTSTRAP_KV_STORE_CONN_KEY,
DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
SERVE_NURSERY_NAME, ASYNC_CONCURRENCY)
SERVE_MASTER_NAME, ASYNC_CONCURRENCY)
from ray.serve.kv_store_service import (BackendTable, RoutingTable,
TrafficPolicyTable)
from ray.serve.metric import (MetricMonitor, start_metric_monitor_loop)

from ray.serve.policy import RoutePolicy
from ray.serve.server import HTTPActor
from ray.serve.http_proxy import HTTPProxyActor


def start_initial_state(kv_store_connector):
nursery_handle = ActorNursery.remote()
ray.util.register_actor(SERVE_NURSERY_NAME, nursery_handle)
master_handle = ServeMaster.remote()
ray.util.register_actor(SERVE_MASTER_NAME, master_handle)

ray.get(
nursery_handle.store_bootstrap_state.remote(
BOOTSTRAP_KV_STORE_CONN_KEY, kv_store_connector))
return nursery_handle
master_handle.store_bootstrap_state.remote(BOOTSTRAP_KV_STORE_CONN_KEY,
kv_store_connector))
return master_handle


@ray.remote
class ActorNursery:
class ServeMaster:
"""Initialize and store all actor handles.
Note:
Expand Down Expand Up @@ -88,15 +88,15 @@ class GlobalState:
2. An actor supervisor service
"""

def __init__(self, actor_nursery_handle=None):
def __init__(self, master_actor_handle=None):
# Get master actor handle
if actor_nursery_handle is None:
actor_nursery_handle = ray.util.get_actor(SERVE_NURSERY_NAME)
self.actor_nursery_handle = actor_nursery_handle
if master_actor_handle is None:
master_actor_handle = ray.util.get_actor(SERVE_MASTER_NAME)
self.master_actor_handle = master_actor_handle

# Connect to all the table
bootstrap_config = ray.get(
self.actor_nursery_handle.get_bootstrap_state_dict.remote())
self.master_actor_handle.get_bootstrap_state_dict.remote())
kv_store_connector = bootstrap_config[BOOTSTRAP_KV_STORE_CONN_KEY]
self.route_table = RoutingTable(kv_store_connector)
self.backend_table = BackendTable(kv_store_connector)
Expand All @@ -106,25 +106,25 @@ def __init__(self, actor_nursery_handle=None):

def refresh_actor_handle_cache(self):
self.actor_handle_cache = ray.get(
self.actor_nursery_handle.get_all_handles.remote())
self.master_actor_handle.get_all_handles.remote())

def init_or_get_http_server(self,
host=DEFAULT_HTTP_HOST,
port=DEFAULT_HTTP_PORT):
if "http_server" not in self.actor_handle_cache:
def init_or_get_http_proxy(self,
host=DEFAULT_HTTP_HOST,
port=DEFAULT_HTTP_PORT):
if "http_proxy" not in self.actor_handle_cache:
[handle] = ray.get(
self.actor_nursery_handle.start_actor.remote(
HTTPActor, tag="http_server"))
self.master_actor_handle.start_actor.remote(
HTTPProxyActor, tag="http_proxy"))

handle.run.remote(host=host, port=port)
self.refresh_actor_handle_cache()
return self.actor_handle_cache["http_server"]
return self.actor_handle_cache["http_proxy"]

def _get_queueing_policy(self, default_policy):
return_policy = default_policy
# check if there is already a queue_actor running
# with policy as p.name for the case where
# serve nursery exists: ray.util.get_actor(SERVE_NURSERY_NAME)
# serve master exists: ray.util.get_actor(SERVE_MASTER_NAME)
for p in RoutePolicy:
queue_actor_tag = "queue_actor::" + p.name
if queue_actor_tag in self.actor_handle_cache:
Expand All @@ -141,7 +141,7 @@ def init_or_get_router(self,
queue_actor_tag = "queue_actor::" + self.queueing_policy.name
if queue_actor_tag not in self.actor_handle_cache:
[handle] = ray.get(
self.actor_nursery_handle.start_actor.remote(
self.master_actor_handle.start_actor.remote(
self.queueing_policy.value,
init_kwargs=policy_kwargs,
tag=queue_actor_tag,
Expand All @@ -154,7 +154,7 @@ def init_or_get_router(self,
def init_or_get_metric_monitor(self, gc_window_seconds=3600):
if "metric_monitor" not in self.actor_handle_cache:
[handle] = ray.get(
self.actor_nursery_handle.start_actor.remote(
self.master_actor_handle.start_actor.remote(
MetricMonitor,
init_args=(gc_window_seconds, ),
tag="metric_monitor"))
Expand Down
74 changes: 38 additions & 36 deletions python/ray/serve/server.py → python/ray/serve/http_proxy.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import asyncio
import socket

import uvicorn

import ray
from ray.experimental.async_api import _async_init
from ray.serve.constants import HTTP_ROUTER_CHECKER_INTERVAL_S
from ray.serve.context import TaskContext
from ray.serve.request_params import RequestMetadata
from ray.serve.http_util import Response
Expand All @@ -23,39 +21,24 @@ class HTTPProxy:

def __init__(self):
assert ray.is_initialized()
# Must be set via set_route_table.
self.route_table = dict()
# Must be set via set_router_handle.
self.router_handle = None

# Delay import due to GlobalState depends on HTTP actor
from ray.serve.global_state import GlobalState
def set_route_table(self, route_table):
self.route_table = route_table

self.serve_global_state = GlobalState()
self.route_table_cache = dict()

self.route_checker_task = None
self.route_checker_should_shutdown = False

async def route_checker(self, interval):
while True:
if self.route_checker_should_shutdown:
return

self.route_table_cache = (
self.serve_global_state.route_table.list_service(
include_methods=True, include_headless=False))

await asyncio.sleep(interval)
def set_router_handle(self, router_handle):
self.router_handle = router_handle

async def handle_lifespan_message(self, scope, receive, send):
assert scope["type"] == "lifespan"

message = await receive()
if message["type"] == "lifespan.startup":
await _async_init()
self.route_checker_task = asyncio.get_event_loop().create_task(
self.route_checker(interval=HTTP_ROUTER_CHECKER_INTERVAL_S))
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
self.route_checker_task.cancel()
self.route_checker_should_shutdown = True
await send({"type": "lifespan.shutdown.complete"})

async def receive_http_body(self, scope, receive, send):
Expand Down Expand Up @@ -113,22 +96,24 @@ async def __call__(self, scope, receive, send):

error_sender = self._make_error_sender(scope, receive, send)

assert self.route_table is not None, (
"Route table must be set via set_route_table.")
assert scope["type"] == "http"
current_path = scope["path"]
if current_path == "/-/routes":
await Response(self.route_table_cache).send(scope, receive, send)
await Response(self.route_table).send(scope, receive, send)
return

# TODO(simon): Use werkzeug route mapper to support variable path
if current_path not in self.route_table_cache:
if current_path not in self.route_table:
error_message = (
"Path {} not found. "
"Please ping http://.../-/routes for routing table"
).format(current_path)
await error_sender(error_message, 404)
return

endpoint_name, methods_allowed = self.route_table_cache[current_path]
endpoint_name, methods_allowed = self.route_table[current_path]

if scope["method"] not in methods_allowed:
error_message = ("Methods {} not allowed. "
Expand All @@ -154,21 +139,38 @@ async def __call__(self, scope, receive, send):
absolute_slo_ms=absolute_slo_ms,
call_method=headers.get("X-SERVE-CALL-METHOD".lower(), "__call__"))

assert self.route_table is not None, (
"Router handle must be set via set_router_handle.")
try:
result = await (self.serve_global_state.init_or_get_router()
.enqueue_request.remote(request_metadata, scope,
http_body_bytes))
result = await self.router_handle.enqueue_request.remote(
request_metadata, scope, http_body_bytes)
await Response(result).send(scope, receive, send)
except Exception as e:
error_message = "Internal Error. Traceback: {}.".format(e)
await error_sender(error_message, 500)


@ray.remote
class HTTPActor:
class HTTPProxyActor:
def __init__(self):
self.app = HTTPProxy()

def run(self, host="0.0.0.0", port=8000):
uvicorn.run(
self.app, host=host, port=port, lifespan="on", access_log=False)
async def run(self, host="0.0.0.0", port=8000):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
sock.set_inheritable(True)

config = uvicorn.Config(self.app, lifespan="on", access_log=False)
server = uvicorn.Server(config=config)
# TODO(edoakes): we need to override install_signal_handlers here
# because the existing implementation fails if it isn't running in
# the main thread and uvicorn doesn't expose a way to configure it.
server.install_signal_handlers = lambda: None
await server.serve(sockets=[sock])

async def set_route_table(self, route_table):
self.app.set_route_table(route_table)

async def set_router_handle(self, router_handle):
self.app.set_router_handle(router_handle)
8 changes: 4 additions & 4 deletions python/ray/serve/kv_store_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,13 @@ def get_request_count(self):
This method is used for two purposes:
1. Make sure HTTP server has started and healthy. Incremented request
count means HTTP server is actively fetching routing table.
1. Make sure HTTP proxy has started and is healthy. Incremented request
count means HTTP proxy is actively fetching routing table.
2. Make sure HTTP server does not have stale routing table. This number
2. Make sure HTTP proxy does not have stale routing table. This number
should be incremented every HTTP_ROUTER_CHECKER_INTERVAL_S seconds.
Supervisor should check this number as indirect indicator of http
server's health.
proxy's health.
"""
return self.request_count

Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def block_until_http_ready(http_endpoint, num_retries=5, backoff_time_s=1):
retries -= 1
if retries == 0:
raise Exception(
"HTTP server not ready after {} retries.".format(num_retries))
"HTTP proxy not ready after {} retries.".format(num_retries))


def get_random_letters(length=6):
Expand Down

0 comments on commit 3a53ea6

Please sign in to comment.