From 703bb20f570d34740f154e3845416886708acbdb Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Thu, 17 Aug 2023 11:24:07 -0700 Subject: [PATCH] [serve] Optimize cold start time (#38351) Small optimization that makes handles immediately send reports to the controller if the current state is "idle": - handle has no queued requests - there are 0 replicas for the deployment This makes it so that the first request sent to a scaled-to-zero deployment doesn't have to wait for the every-10-second metric push. Ran some tests below for comparison. The load test is ramp 0 -> 10 users -> 50 users -> 100 users. You can see this greatly helps the first ramp up from 0 -> 10 users, and doesn't really affect anything else. --- python/ray/serve/_private/router.py | 28 +++++++++++++++- .../serve/tests/test_autoscaling_policy.py | 33 +++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index d2380c1d5137d8..ac9fb16b2c84c7 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -254,6 +254,10 @@ async def assign_replica( def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]): pass + @property + def curr_replicas(self) -> Dict[str, ReplicaWrapper]: + pass + @dataclass class PendingRequest: @@ -752,6 +756,12 @@ def __init__(self, event_loop: asyncio.AbstractEventLoop): str, List[RunningReplicaInfo] ] = defaultdict(list) + @property + def curr_replicas(self) -> Dict[str, ReplicaWrapper]: + return { + r.replica_tag: ActorReplicaWrapper(r) for r in self.in_flight_queries.keys() + } + def _reset_replica_iterator(self): """Reset the iterator used to load balance replicas. @@ -1032,14 +1042,20 @@ def __init__( deployment_info = DeploymentInfo.from_proto(deployment_route.deployment_info) self.metrics_pusher = None if deployment_info.deployment_config.autoscaling_config: + self.autoscaling_enabled = True + self.push_metrics_to_controller = ( + controller_handle.record_handle_metrics.remote + ) self.metrics_pusher = MetricsPusher() self.metrics_pusher.register_task( self._collect_handle_queue_metrics, HANDLE_METRIC_PUSH_INTERVAL_S, - controller_handle.record_handle_metrics.remote, + self.push_metrics_to_controller, ) self.metrics_pusher.start() + else: + self.autoscaling_enabled = False def _collect_handle_queue_metrics(self) -> Dict[str, int]: return {str(self.deployment_id): self.num_queued_queries} @@ -1063,6 +1079,16 @@ async def assign_request( }, ) + # Optimization: if there are currently zero replicas for a deployment, + # push handle metric to controller to allow for fast cold start time. + # Only do it for the first query to arrive on the router. + if ( + self.autoscaling_enabled + and len(self._replica_scheduler.curr_replicas) == 0 + and self.num_queued_queries == 1 + ): + self.push_metrics_to_controller({self.deployment_name: 1}, time.time()) + try: query = Query( args=list(request_args), diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 835ef095dd8bd7..9913e8881ee03a 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -330,6 +330,39 @@ def __call__(self): assert len(get_running_replicas(controller, "A")) == 2 +def test_cold_start_time(serve_instance): + """Send 100 requests and check that we autoscale up, and then back down.""" + + @serve.deployment( + autoscaling_config={ + "min_replicas": 0, + "max_replicas": 1, + "look_back_period_s": 0.2, + }, + ) + class A: + def __call__(self): + return "hello" + + handle = serve.run(A.bind()) + + def check_running(): + assert serve.status().applications["default"].status == "RUNNING" + return True + + wait_for_condition(check_running) + + start = time.time() + result = ray.get(handle.remote()) + cold_start_time = time.time() - start + assert cold_start_time < 2 + print( + "Time taken for deployment at 0 replicas to serve first request:", + cold_start_time, + ) + assert result == "hello" + + def test_smoothing_factor_scale_up_from_0_replicas(): """Test that the smoothing factor is respected when scaling up from 0 replicas."""