[Serve] add blocking to serve.run() #43227

Merged: 9 commits, Feb 20, 2024
2 changes: 2 additions & 0 deletions python/ray/serve/__init__.py
@@ -4,6 +4,7 @@
from ray.serve.api import (
Application,
Deployment,
_run,
delete,
deployment,
get_app_handle,
@@ -35,6 +36,7 @@
ray._private.worker.blocking_get_inside_async_warned = True

__all__ = [
"_run",
"batch",
"start",
"HTTPOptions",
76 changes: 54 additions & 22 deletions python/ray/serve/api.py
@@ -1,6 +1,7 @@
import collections
import inspect
import logging
import time
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Union

@@ -436,7 +437,7 @@ def list_deployments() -> Dict[str, Deployment]:


@PublicAPI(stability="stable")
def run(
def _run(
target: Application,
_blocking: bool = True,
name: str = SERVE_DEFAULT_APP_NAME,
@@ -445,28 +446,9 @@ def run(
) -> DeploymentHandle:
"""Run an application and return a handle to its ingress deployment.

The application is returned by `Deployment.bind()`. Example:

.. code-block:: python

handle = serve.run(MyDeployment.bind())
ray.get(handle.remote())

Args:
target:
A Serve application returned by `Deployment.bind()`.
name: Application name. If not provided, this will be the only
application running on the cluster (it will delete all others).
route_prefix: Route prefix for HTTP requests. If not provided, it will use
route_prefix of the ingress deployment. If specified neither as an argument
nor in the ingress deployment, the route prefix will default to '/'.
logging_config: Application logging config. If provided, the config will
be applied to all deployments which doesn't have logging config.

Returns:
DeploymentHandle: A handle that can be used to call the application.
This is only used internally; its `_blocking` flag waits for the application to
finish deploying rather than blocking the following code indefinitely until Ctrl-C'd.
"""

if len(name) == 0:
raise RayServeException("Application name must be a non-empty string.")

@@ -530,6 +512,56 @@ def run(
return handle


@PublicAPI(stability="stable")
def run(
target: Application,
blocking: bool = False,
name: str = SERVE_DEFAULT_APP_NAME,
route_prefix: str = DEFAULT.VALUE,
logging_config: Optional[Union[Dict, LoggingConfig]] = None,
) -> DeploymentHandle:
"""Run an application and return a handle to its ingress deployment.

The application is returned by `Deployment.bind()`. Example:

.. code-block:: python

handle = serve.run(MyDeployment.bind())
ray.get(handle.remote())

Args:
target:
A Serve application returned by `Deployment.bind()`.
blocking: Whether this call should be blocking. If True, it
will loop and log status until Ctrl-C'd.
name: Application name. If not provided, this will be the only
application running on the cluster (it will delete all others).
route_prefix: Route prefix for HTTP requests. If not provided, it will use
route_prefix of the ingress deployment. If specified neither as an argument
nor in the ingress deployment, the route prefix will default to '/'.
logging_config: Application logging config. If provided, the config will
be applied to all deployments that don't have a logging config.

Returns:
DeploymentHandle: A handle that can be used to call the application.
"""
handle = _run(
target=target,
name=name,
route_prefix=route_prefix,
logging_config=logging_config,
)

if blocking:
try:
while True:
# Block, letting Ray print logs to the terminal.
time.sleep(10)
except KeyboardInterrupt:
logger.info("Got KeyboardInterrupt, release blocking...")
return handle


@PublicAPI(stability="stable")
def delete(name: str, _blocking: bool = True):
"""Delete an application by its name.
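For reference, a minimal usage sketch of the new public blocking flag (illustrative only, not part of the diff; MyDeployment is the same placeholder used in the docstring example):

    from ray import serve

    @serve.deployment
    class MyDeployment:
        def __call__(self) -> str:
            return "hello"

    app = MyDeployment.bind()

    # Default (blocking=False): waits for the app to be running, then returns a handle.
    handle = serve.run(app)

    # blocking=True: deploys the app, then loops and logs status until Ctrl-C,
    # after which the handle is returned.
    serve.run(app, blocking=True)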
17 changes: 10 additions & 7 deletions python/ray/serve/scripts.py
@@ -590,8 +590,14 @@ def run(
if is_config:
client.deploy_apps(config, _blocking=False)
Contributor Author:

@edoakes do you think this is good enough as is, or do we also want to add blocking to client.deploy_apps as well?

Contributor:

lgtm, that's an internal API anyways

cli_logger.success("Submitted deploy config successfully.")
if blocking:
while True:
# Block, letting Ray print logs to the terminal.
time.sleep(10)
else:
serve.run(app, name=name, route_prefix=route_prefix)
# This should not block if reload is True, so that the watchfiles loop can be triggered
should_block = blocking and not reload
serve.run(app, blocking=should_block, name=name, route_prefix=route_prefix)
cli_logger.success("Deployed app successfully.")

if reload:
@@ -618,14 +624,11 @@ def run(
app = _private_api.call_app_builder_with_args_if_necessary(
import_attr(import_path, reload_module=True), args_dict
)
serve.run(app, name=name, route_prefix=route_prefix)
serve.run(
target=app, blocking=True, name=name, route_prefix=route_prefix
)
cli_logger.success("Redeployed app successfully.")

if blocking:
while True:
# Block, letting Ray print logs to the terminal.
time.sleep(10)

except KeyboardInterrupt:
cli_logger.info("Got KeyboardInterrupt, shutting down...")
serve.shutdown()
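A condensed sketch of the new CLI gating logic (illustrative; the helper name _resolve_blocking is hypothetical, while blocking and reload are the CLI parameters shown in the hunk above):

    def _resolve_blocking(blocking: bool, reload: bool) -> bool:
        # With reload enabled, the CLI keeps control of the main thread so the
        # watchfiles loop can detect changes and redeploy; blocking is then
        # handled by the serve.run(..., blocking=True) call inside that loop.
        return blocking and not reload

    should_block = _resolve_blocking(blocking=True, reload=False)  # True
    should_block = _resolve_blocking(blocking=True, reload=True)   # False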
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_api.py
@@ -848,7 +848,7 @@ class A:
def __init__(self):
1 / 0

serve.run(A.bind(), _blocking=False)
serve._run(A.bind(), _blocking=False)

def check_for_failed_deployment():
default_app = serve.status().applications[SERVE_DEFAULT_APP_NAME]
@@ -880,7 +880,7 @@ def __init__(self):
create_engine("mysql://some_wrong_url:3306").connect()

ray_actor_options = {"runtime_env": {"pip": ["PyMySQL", "sqlalchemy==1.3.19"]}}
serve.run(
serve._run(
MyDeployment.options(ray_actor_options=ray_actor_options).bind(),
_blocking=False,
)
6 changes: 3 additions & 3 deletions python/ray/serve/tests/test_autoscaling_policy.py
@@ -1081,7 +1081,7 @@ async def __init__(self):
app = AutoscalingDeployment.bind()

# Start the AutoscalingDeployment.
serve.run(app, name=app_name, _blocking=False)
serve._run(app, name=app_name, _blocking=False)

# Active replicas are replicas that are waiting or running.
expected_num_active_replicas: int = min_replicas
@@ -1165,7 +1165,7 @@ def check_expected_statuses(
max_replicas=max_replicas,
)
).bind()
serve.run(app, name=app_name, _blocking=False)
serve._run(app, name=app_name, _blocking=False)
expected_num_active_replicas = min_replicas

wait_for_condition(check_num_active_replicas, expected=expected_num_active_replicas)
@@ -1221,7 +1221,7 @@ def check_expected_statuses(
max_replicas=max_replicas,
)
).bind()
serve.run(app, name=app_name, _blocking=False)
serve._run(app, name=app_name, _blocking=False)
expected_num_active_replicas = min_replicas

wait_for_condition(check_num_active_replicas, expected=expected_num_active_replicas)
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_cluster.py
@@ -156,7 +156,7 @@ class E:
async def __init__(self):
await signal.wait.remote()

serve.run(E.bind(), _blocking=False)
serve._run(E.bind(), _blocking=False)

def get_replicas(replica_state):
controller = client._controller
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_controller_recovery.py
@@ -191,7 +191,7 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1):
# Redeploy new version. Since there is one replica blocking, only one new
# replica should be started up.
V2 = V1.options(func_or_class=V2, version="2")
serve.run(V2.bind(), _blocking=False, name="app")
serve._run(V2.bind(), _blocking=False, name="app")
with pytest.raises(TimeoutError):
client._wait_for_application_running("app", timeout_s=0.1)
responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True)
@@ -231,7 +231,7 @@ async def __init__(self):
def __call__(self, request):
return f"1|{os.getpid()}"

serve.run(V1.bind(), _blocking=False, name="app")
serve._run(V1.bind(), _blocking=False, name="app")
ray.get(pending_init_indicator.remote())

def get_actor_info(name: str):
8 changes: 5 additions & 3 deletions python/ray/serve/tests/test_deploy.py
@@ -149,7 +149,7 @@ async def __call__(self, request):
# Redeploy new version. This should not go through until the old version
# replica completely stops.
V2 = V1.options(func_or_class=V2, version="2")
serve.run(V2.bind(), _blocking=False, name="app")
serve._run(V2.bind(), _blocking=False, name="app")
with pytest.raises(TimeoutError):
client._wait_for_application_running("app", timeout_s=0.1)

@@ -264,7 +264,7 @@ def make_nonblocking_calls(expected, expect_blocking=False):
# Redeploy new version. Since there is one replica blocking, only one new
# replica should be started up.
V2 = V1.options(func_or_class=V2, version="2")
serve.run(V2.bind(), _blocking=False, name="app")
serve._run(V2.bind(), _blocking=False, name="app")
with pytest.raises(TimeoutError):
client._wait_for_application_running("app", timeout_s=0.1)
responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True)
@@ -351,7 +351,9 @@ def make_nonblocking_calls(expected, expect_blocking=False):

# Reconfigure should block one replica until the signal is sent. Check that
# some requests are now blocking.
serve.run(V1.options(user_config={"test": "2"}).bind(), name="app", _blocking=False)
serve._run(
V1.options(user_config={"test": "2"}).bind(), name="app", _blocking=False
)
responses2, blocking2 = make_nonblocking_calls({"1": 1}, expect_blocking=True)
assert list(responses2["1"])[0] in pids1

2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_deploy_2.py
@@ -256,7 +256,7 @@ class Model:
def __call__(self):
return "hello world"

serve.run(Model.bind(), _blocking=False)
serve._run(Model.bind(), _blocking=False)

def check_fail():
app_status = serve.status().applications["default"]
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_failure.py
@@ -233,7 +233,7 @@ def make_blocked_request():
for _ in range(2):
starting_actor = SignalActor.remote()
finish_starting_actor = SignalActor.remote()
serve.run(
serve._run(
SlowStarter.bind(starting_actor, finish_starting_actor), _blocking=False
)

4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_standalone.py
@@ -670,7 +670,7 @@ def test_updating_status_message(lower_slow_startup_threshold_and_reset):
def f(*args):
pass

serve.run(f.bind(), _blocking=False)
serve._run(f.bind(), _blocking=False)

def updating_message():
deployment_status = (
@@ -699,7 +699,7 @@ def __init__(self):
def __call__(self, request):
pass

serve.run(f.bind(), _blocking=False)
serve._run(f.bind(), _blocking=False)

wait_for_condition(
lambda: serve.status()
4 changes: 2 additions & 2 deletions release/long_running_tests/workloads/serve_failure.py
@@ -140,7 +140,7 @@ def handler(self, *args):

if blocking:
ray.get(self.random_killer.spare.remote(new_name))
serve.run(
serve._run(
handler.bind(),
name=new_name,
route_prefix=f"/{new_name}",
@@ -149,7 +149,7 @@ def handler(self, *args):
self.applications.append(new_name)
ray.get(self.random_killer.stop_spare.remote(new_name))
else:
serve.run(
serve._run(
handler.bind(),
name=new_name,
route_prefix=f"/{new_name}",