Skip to content

Commit

Permalink
[serve] Remove global state, instead access the master actor directly (
Browse files Browse the repository at this point in the history
…#7914)

* Move _scale() to master actor

* move create_backend

* Move set_backend_config

* Move get_backend_config

* Remove backend_table from global_state

* Remove global_state, just access master directly

* Remove accidental addition
  • Loading branch information
edoakes committed Apr 7, 2020
1 parent 8131414 commit 1be87c7
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 74 deletions.
63 changes: 29 additions & 34 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import ray
from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
SERVE_MASTER_NAME)
from ray.serve.global_state import GlobalState, ServeMaster
from ray.serve.master import ServeMaster
from ray.serve.handle import RayServeHandle
from ray.serve.kv_store_service import SQLiteKVStore
from ray.serve.task_runner import RayServeMixin, TaskRunnerActor
from ray.serve.utils import block_until_http_ready
Expand All @@ -17,20 +18,20 @@
from ray.serve.queues import Query
from ray.serve.request_params import RequestMetadata

global_state = None
master_actor = None


def _get_global_state():
"""Used for internal purpose. Because just import serve.global_state
will always reference the original None object
def _get_master_actor():
"""Used for internal purpose because using just import serve.global_state
will always reference the original None object.
"""
return global_state
return master_actor


def _ensure_connected(f):
@wraps(f)
def check(*args, **kwargs):
if _get_global_state() is None:
if _get_master_actor() is None:
raise RayServeException("Please run serve.init to initialize or "
"connect to existing ray serve cluster.")
return f(*args, **kwargs)
Expand Down Expand Up @@ -103,9 +104,8 @@ def init(
the backend for a service. (Default: RoutePolicy.Random)
policy_kwargs: Arguments required to instantiate a queueing policy
"""
global global_state
# Noop if global_state is no longer None
if global_state is not None:
global master_actor
if master_actor is not None:
return

# Initialize ray if needed.
Expand All @@ -118,8 +118,7 @@ def init(

# Try to get serve master actor if it exists
try:
ray.util.get_actor(SERVE_MASTER_NAME)
global_state = GlobalState()
master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
return
except ValueError:
pass
Expand All @@ -135,20 +134,20 @@ def init(
_, kv_store_path = mkstemp()

# Serve has not been initialized, perform init sequence
# TODO move the db to session_dir
# TODO move the db to session_dir.
# ray.worker._global_node.address_info["session_dir"]
def kv_store_connector(namespace):
return SQLiteKVStore(namespace, db_path=kv_store_path)

master = ServeMaster.options(
master_actor = ServeMaster.options(
detached=True, name=SERVE_MASTER_NAME).remote(kv_store_connector)

ray.get(master.start_router.remote(queueing_policy.value, policy_kwargs))
ray.get(
master_actor.start_router.remote(queueing_policy.value, policy_kwargs))

global_state = GlobalState(master)
ray.get(master.start_metric_monitor.remote(gc_window_seconds))
ray.get(master_actor.start_metric_monitor.remote(gc_window_seconds))
if start_server:
ray.get(master.start_http_proxy.remote(http_host, http_port))
ray.get(master_actor.start_http_proxy.remote(http_host, http_port))

if start_server and blocking:
block_until_http_ready("http://{}:{}/-/routes".format(
Expand All @@ -168,8 +167,8 @@ def create_endpoint(endpoint_name, route=None, methods=["GET"]):
registered before returning
"""
ray.get(
global_state.master_actor.create_endpoint.remote(
route, endpoint_name, [m.upper() for m in methods]))
master_actor.create_endpoint.remote(route, endpoint_name,
[m.upper() for m in methods]))


@_ensure_connected
Expand All @@ -181,8 +180,7 @@ def set_backend_config(backend_tag, backend_config):
backend_config(BackendConfig) : Desired backend configuration.
"""
ray.get(
global_state.master_actor.set_backend_config.remote(
backend_tag, backend_config))
master_actor.set_backend_config.remote(backend_tag, backend_config))


@_ensure_connected
Expand All @@ -192,8 +190,7 @@ def get_backend_config(backend_tag):
Args:
backend_tag(str): A registered backend.
"""
return ray.get(
global_state.master_actor.get_backend_config.remote(backend_tag))
return ray.get(master_actor.get_backend_config.remote(backend_tag))


def _backend_accept_batch(func_or_class):
Expand Down Expand Up @@ -258,8 +255,8 @@ def __init__(self, *args, **kwargs):
type(func_or_class)))

ray.get(
global_state.master_actor.create_backend.remote(
backend_tag, creator, backend_config, arg_list))
master_actor.create_backend.remote(backend_tag, creator,
backend_config, arg_list))


@_ensure_connected
Expand Down Expand Up @@ -295,8 +292,8 @@ def split(endpoint_name, traffic_policy_dictionary):
to their traffic weights. The weights must sum to 1.
"""
ray.get(
global_state.master_actor.split_traffic.remote(
endpoint_name, traffic_policy_dictionary))
master_actor.split_traffic.remote(endpoint_name,
traffic_policy_dictionary))


@_ensure_connected
Expand All @@ -319,13 +316,11 @@ def get_handle(endpoint_name,
RayServeHandle
"""
if not missing_ok:
assert endpoint_name in global_state.get_all_endpoints()

# Delay import due to it's dependency on global_state
from ray.serve.handle import RayServeHandle
assert endpoint_name in ray.get(
master_actor.get_all_endpoints.remote())

return RayServeHandle(
global_state.get_router(),
ray.get(master_actor.get_router.remote())[0],
endpoint_name,
relative_slo_ms,
absolute_slo_ms,
Expand All @@ -344,7 +339,7 @@ def stat(percentiles=[50, 90, 95],
The longest aggregation window must be shorter or equal to the
gc_window_seconds.
"""
monitor = global_state.get_metric_monitor()
[monitor] = ray.get(master_actor.get_metric_monitor.remote())
return ray.get(monitor.collect.remote(percentiles, agg_windows_seconds))


Expand Down
9 changes: 7 additions & 2 deletions python/ray/serve/handle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ray
from ray import serve
from ray.serve.context import TaskContext
from ray.serve.exceptions import RayServeException
Expand Down Expand Up @@ -105,9 +106,13 @@ def options(self,
def get_http_endpoint(self):
return DEFAULT_HTTP_ADDRESS

def get_traffic_policy(self):
master_actor = serve.api._get_master_actor()
return ray.get(
master_actor.get_traffic_policy.remote(self.endpoint_name))

def _ensure_backend_unique(self, backend_tag=None):
global_state = serve.api._get_global_state()
traffic_policy = global_state.get_traffic_policy(self.endpoint_name)
traffic_policy = self.get_traffic_policy()
if backend_tag is None:
assert len(traffic_policy) == 1, (
"Multiple backends detected. "
Expand Down
30 changes: 1 addition & 29 deletions python/ray/serve/global_state.py → python/ray/serve/master.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import ray
from ray.serve.backend_config import BackendConfig
from ray.serve.constants import (SERVE_MASTER_NAME, ASYNC_CONCURRENCY)
from ray.serve.constants import ASYNC_CONCURRENCY
from ray.serve.exceptions import batch_annotation_not_found
from ray.serve.http_proxy import HTTPProxyActor
from ray.serve.kv_store_service import (BackendTable, RoutingTable,
Expand Down Expand Up @@ -232,31 +232,3 @@ def get_backend_config(self, backend_tag):
), "Backend {} is not registered.".format(backend_tag)
backend_config_dict = self.backend_table.get_info(backend_tag)
return BackendConfig(**backend_config_dict)


class GlobalState:
"""Encapsulate all global state in the serving system.
The information is fetch lazily from
1. A collection of namespaced key value stores
2. A actor supervisor service
"""

def __init__(self, master_actor=None):
# Get actor nursery handle.
if master_actor is None:
master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
self.master_actor = master_actor

def get_router(self):
return ray.get(self.master_actor.get_router.remote())[0]

def get_metric_monitor(self):
return ray.get(self.master_actor.get_metric_monitor.remote())[0]

def get_traffic_policy(self, endpoint_name):
return ray.get(
self.master_actor.get_traffic_policy.remote(endpoint_name))

def get_all_endpoints(self):
return ray.get(self.master_actor.get_all_endpoints.remote())
18 changes: 9 additions & 9 deletions python/ray/serve/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


def test_e2e(serve_instance):
serve.init() # so we have access to global state
serve.init()
serve.create_endpoint("endpoint", "/api", methods=["GET", "POST"])

retry_count = 5
Expand Down Expand Up @@ -181,19 +181,19 @@ def __call__(self, flask_request, temp=None):
serve.create_endpoint("simple", "/simple")
b_config = BackendConfig(num_replicas=3, num_cpus=2)
serve.create_backend(Simple, "simple:v1", backend_config=b_config)
global_state = serve.api._get_global_state()
master_actor = serve.api._get_master_actor()
old_replica_tag_list = ray.get(
global_state.master_actor._list_replicas.remote("simple:v1"))
master_actor._list_replicas.remote("simple:v1"))

bnew_config = serve.get_backend_config("simple:v1")
# change the config
bnew_config.num_cpus = 1
# set the config
serve.set_backend_config("simple:v1", bnew_config)
new_replica_tag_list = ray.get(
global_state.master_actor._list_replicas.remote("simple:v1"))
master_actor._list_replicas.remote("simple:v1"))
new_all_tag_list = list(
ray.get(global_state.master_actor.get_all_handles.remote()).keys())
ray.get(master_actor.get_all_handles.remote()).keys())

# the new_replica_tag_list must be subset of all_tag_list
assert set(new_replica_tag_list) <= set(new_all_tag_list)
Expand All @@ -215,19 +215,19 @@ def __call__(self, flask_request, temp=None):
serve.create_endpoint("bsimple", "/bsimple")
b_config = BackendConfig(num_replicas=3, max_batch_size=2)
serve.create_backend(BatchSimple, "bsimple:v1", backend_config=b_config)
global_state = serve.api._get_global_state()
master_actor = serve.api._get_master_actor()
old_replica_tag_list = ray.get(
global_state.master_actor._list_replicas.remote("bsimple:v1"))
master_actor._list_replicas.remote("bsimple:v1"))

bnew_config = serve.get_backend_config("bsimple:v1")
# change the config
bnew_config.max_batch_size = 5
# set the config
serve.set_backend_config("bsimple:v1", bnew_config)
new_replica_tag_list = ray.get(
global_state.master_actor._list_replicas.remote("bsimple:v1"))
master_actor._list_replicas.remote("bsimple:v1"))
new_all_tag_list = list(
ray.get(global_state.master_actor.get_all_handles.remote()).keys())
ray.get(master_actor.get_all_handles.remote()).keys())

# the old and new replica tag list should be identical
# and should be subset of all_tag_list
Expand Down

0 comments on commit 1be87c7

Please sign in to comment.