diff --git a/python/ray/serve/__init__.py b/python/ray/serve/__init__.py index 3b4e5f65b0d6..61f8f3f9ea8f 100644 --- a/python/ray/serve/__init__.py +++ b/python/ray/serve/__init__.py @@ -4,6 +4,7 @@ from ray.serve.api import ( Application, Deployment, + _run, delete, deployment, get_app_handle, @@ -35,6 +36,7 @@ ray._private.worker.blocking_get_inside_async_warned = True __all__ = [ + "_run", "batch", "start", "HTTPOptions", diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 5c8f83e51378..86c21faaf843 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -1,6 +1,7 @@ import collections import inspect import logging +import time from functools import wraps from typing import Any, Callable, Dict, List, Optional, Union @@ -436,7 +437,7 @@ def list_deployments() -> Dict[str, Deployment]: @PublicAPI(stability="stable") -def run( +def _run( target: Application, _blocking: bool = True, name: str = SERVE_DEFAULT_APP_NAME, @@ -445,28 +446,9 @@ def run( ) -> DeploymentHandle: """Run an application and return a handle to its ingress deployment. - The application is returned by `Deployment.bind()`. Example: - - .. code-block:: python - - handle = serve.run(MyDeployment.bind()) - ray.get(handle.remote()) - - Args: - target: - A Serve application returned by `Deployment.bind()`. - name: Application name. If not provided, this will be the only - application running on the cluster (it will delete all others). - route_prefix: Route prefix for HTTP requests. If not provided, it will use - route_prefix of the ingress deployment. If specified neither as an argument - nor in the ingress deployment, the route prefix will default to '/'. - logging_config: Application logging config. If provided, the config will - be applied to all deployments which doesn't have logging config. - - Returns: - DeploymentHandle: A handle that can be used to call the application. + This is only used internally with the _blocking not totally blocking the following + code indefinitely until Ctrl-C'd. """ - if len(name) == 0: raise RayServeException("Application name must a non-empty string.") @@ -530,6 +512,56 @@ def run( return handle +@PublicAPI(stability="stable") +def run( + target: Application, + blocking: bool = False, + name: str = SERVE_DEFAULT_APP_NAME, + route_prefix: str = DEFAULT.VALUE, + logging_config: Optional[Union[Dict, LoggingConfig]] = None, +) -> DeploymentHandle: + """Run an application and return a handle to its ingress deployment. + + The application is returned by `Deployment.bind()`. Example: + + .. code-block:: python + + handle = serve.run(MyDeployment.bind()) + ray.get(handle.remote()) + + Args: + target: + A Serve application returned by `Deployment.bind()`. + blocking: Whether this call should be blocking. If True, it + will loop and log status until Ctrl-C'd. + name: Application name. If not provided, this will be the only + application running on the cluster (it will delete all others). + route_prefix: Route prefix for HTTP requests. If not provided, it will use + route_prefix of the ingress deployment. If specified neither as an argument + nor in the ingress deployment, the route prefix will default to '/'. + logging_config: Application logging config. If provided, the config will + be applied to all deployments which doesn't have logging config. + + Returns: + DeploymentHandle: A handle that can be used to call the application. + """ + handle = _run( + target=target, + name=name, + route_prefix=route_prefix, + logging_config=logging_config, + ) + + if blocking: + try: + while True: + # Block, letting Ray print logs to the terminal. + time.sleep(10) + except KeyboardInterrupt: + logger.info("Got KeyboardInterrupt, release blocking...") + return handle + + @PublicAPI(stability="stable") def delete(name: str, _blocking: bool = True): """Delete an application by its name. diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index 1419bcfa0d9f..85ea4526f475 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -590,8 +590,14 @@ def run( if is_config: client.deploy_apps(config, _blocking=False) cli_logger.success("Submitted deploy config successfully.") + if blocking: + while True: + # Block, letting Ray print logs to the terminal. + time.sleep(10) else: - serve.run(app, name=name, route_prefix=route_prefix) + # This should not block if reload is true so the watchfiles can be triggered + should_block = blocking and not reload + serve.run(app, blocking=should_block, name=name, route_prefix=route_prefix) cli_logger.success("Deployed app successfully.") if reload: @@ -618,14 +624,11 @@ def run( app = _private_api.call_app_builder_with_args_if_necessary( import_attr(import_path, reload_module=True), args_dict ) - serve.run(app, name=name, route_prefix=route_prefix) + serve.run( + target=app, blocking=True, name=name, route_prefix=route_prefix + ) cli_logger.success("Redeployed app successfully.") - if blocking: - while True: - # Block, letting Ray print logs to the terminal. - time.sleep(10) - except KeyboardInterrupt: cli_logger.info("Got KeyboardInterrupt, shutting down...") serve.shutdown() diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 8a4edd4d9c47..45f62b5eba48 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -848,7 +848,7 @@ class A: def __init__(self): 1 / 0 - serve.run(A.bind(), _blocking=False) + serve._run(A.bind(), _blocking=False) def check_for_failed_deployment(): default_app = serve.status().applications[SERVE_DEFAULT_APP_NAME] @@ -880,7 +880,7 @@ def __init__(self): create_engine("mysql://some_wrong_url:3306").connect() ray_actor_options = {"runtime_env": {"pip": ["PyMySQL", "sqlalchemy==1.3.19"]}} - serve.run( + serve._run( MyDeployment.options(ray_actor_options=ray_actor_options).bind(), _blocking=False, ) diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 2241dd05c2ed..19b9189a0a77 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -1081,7 +1081,7 @@ async def __init__(self): app = AutoscalingDeployment.bind() # Start the AutoscalingDeployment. - serve.run(app, name=app_name, _blocking=False) + serve._run(app, name=app_name, _blocking=False) # Active replicas are replicas that are waiting or running. expected_num_active_replicas: int = min_replicas @@ -1165,7 +1165,7 @@ def check_expected_statuses( max_replicas=max_replicas, ) ).bind() - serve.run(app, name=app_name, _blocking=False) + serve._run(app, name=app_name, _blocking=False) expected_num_active_replicas = min_replicas wait_for_condition(check_num_active_replicas, expected=expected_num_active_replicas) @@ -1221,7 +1221,7 @@ def check_expected_statuses( max_replicas=max_replicas, ) ).bind() - serve.run(app, name=app_name, _blocking=False) + serve._run(app, name=app_name, _blocking=False) expected_num_active_replicas = min_replicas wait_for_condition(check_num_active_replicas, expected=expected_num_active_replicas) diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index bda10c2de4f4..4a123d716b4a 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -156,7 +156,7 @@ class E: async def __init__(self): await signal.wait.remote() - serve.run(E.bind(), _blocking=False) + serve._run(E.bind(), _blocking=False) def get_replicas(replica_state): controller = client._controller diff --git a/python/ray/serve/tests/test_controller_recovery.py b/python/ray/serve/tests/test_controller_recovery.py index 1222314b25d5..ba06e3cc4880 100644 --- a/python/ray/serve/tests/test_controller_recovery.py +++ b/python/ray/serve/tests/test_controller_recovery.py @@ -191,7 +191,7 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1): # Redeploy new version. Since there is one replica blocking, only one new # replica should be started up. V2 = V1.options(func_or_class=V2, version="2") - serve.run(V2.bind(), _blocking=False, name="app") + serve._run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): client._wait_for_application_running("app", timeout_s=0.1) responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True) @@ -231,7 +231,7 @@ async def __init__(self): def __call__(self, request): return f"1|{os.getpid()}" - serve.run(V1.bind(), _blocking=False, name="app") + serve._run(V1.bind(), _blocking=False, name="app") ray.get(pending_init_indicator.remote()) def get_actor_info(name: str): diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index 9ced1f4de065..32bb9cc51bc6 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -149,7 +149,7 @@ async def __call__(self, request): # Redeploy new version. This should not go through until the old version # replica completely stops. V2 = V1.options(func_or_class=V2, version="2") - serve.run(V2.bind(), _blocking=False, name="app") + serve._run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): client._wait_for_application_running("app", timeout_s=0.1) @@ -264,7 +264,7 @@ def make_nonblocking_calls(expected, expect_blocking=False): # Redeploy new version. Since there is one replica blocking, only one new # replica should be started up. V2 = V1.options(func_or_class=V2, version="2") - serve.run(V2.bind(), _blocking=False, name="app") + serve._run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): client._wait_for_application_running("app", timeout_s=0.1) responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True) @@ -351,7 +351,9 @@ def make_nonblocking_calls(expected, expect_blocking=False): # Reconfigure should block one replica until the signal is sent. Check that # some requests are now blocking. - serve.run(V1.options(user_config={"test": "2"}).bind(), name="app", _blocking=False) + serve._run( + V1.options(user_config={"test": "2"}).bind(), name="app", _blocking=False + ) responses2, blocking2 = make_nonblocking_calls({"1": 1}, expect_blocking=True) assert list(responses2["1"])[0] in pids1 diff --git a/python/ray/serve/tests/test_deploy_2.py b/python/ray/serve/tests/test_deploy_2.py index b3e4bad296d9..64372b2db1fb 100644 --- a/python/ray/serve/tests/test_deploy_2.py +++ b/python/ray/serve/tests/test_deploy_2.py @@ -256,7 +256,7 @@ class Model: def __call__(self): return "hello world" - serve.run(Model.bind(), _blocking=False) + serve._run(Model.bind(), _blocking=False) def check_fail(): app_status = serve.status().applications["default"] diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 5906af5cb1e1..28902b8621f8 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -233,7 +233,7 @@ def make_blocked_request(): for _ in range(2): starting_actor = SignalActor.remote() finish_starting_actor = SignalActor.remote() - serve.run( + serve._run( SlowStarter.bind(starting_actor, finish_starting_actor), _blocking=False ) diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index c121ec179460..2dc678ffc18e 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -670,7 +670,7 @@ def test_updating_status_message(lower_slow_startup_threshold_and_reset): def f(*args): pass - serve.run(f.bind(), _blocking=False) + serve._run(f.bind(), _blocking=False) def updating_message(): deployment_status = ( @@ -699,7 +699,7 @@ def __init__(self): def __call__(self, request): pass - serve.run(f.bind(), _blocking=False) + serve._run(f.bind(), _blocking=False) wait_for_condition( lambda: serve.status() diff --git a/release/long_running_tests/workloads/serve_failure.py b/release/long_running_tests/workloads/serve_failure.py index 48ed63a2f7a7..56480a1732a3 100644 --- a/release/long_running_tests/workloads/serve_failure.py +++ b/release/long_running_tests/workloads/serve_failure.py @@ -140,7 +140,7 @@ def handler(self, *args): if blocking: ray.get(self.random_killer.spare.remote(new_name)) - serve.run( + serve._run( handler.bind(), name=new_name, route_prefix=f"/{new_name}", @@ -149,7 +149,7 @@ def handler(self, *args): self.applications.append(new_name) ray.get(self.random_killer.stop_spare.remote(new_name)) else: - serve.run( + serve._run( handler.bind(), name=new_name, route_prefix=f"/{new_name}",