[serve] collect request metrics at handles
Collect request metrics for autoscaling at handles instead of at replicas. This allows queued requests to be taken into account, rather than just ongoing requests.


Signed-off-by: Cindy Zhang <cindyzyx9@gmail.com>
zcin committed Jan 30, 2024
1 parent 5ec3691 commit 3d3e730
Showing 7 changed files with 210 additions and 29 deletions.
6 changes: 6 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -268,3 +268,9 @@

# The default autoscaling policy to use if none is specified.
DEFAULT_AUTOSCALING_POLICY = "ray.serve.autoscaling_policy:default_autoscaling_policy"

# Feature flag to enable collecting all queued and ongoing request
# metrics at handles instead of replicas. OFF by default.
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE = (
os.environ.get("RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE", "0") == "1"
)
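Since the flag is read from the environment when constants.py is imported, it has to be set before Ray Serve starts. A minimal sketch of trying it out on a single node, where Ray is started from the same process and workers inherit the driver's environment (the deployment below is hypothetical, not part of this commit):

import os

# Must be set before importing Ray Serve; the constant above is
# evaluated at module import time.
os.environ["RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE"] = "1"

from ray import serve

@serve.deployment(autoscaling_config={"min_replicas": 1, "max_replicas": 5})
class Echo:
    def __call__(self, request) -> str:
        return "ok"

serve.run(Echo.bind())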
26 changes: 16 additions & 10 deletions python/ray/serve/_private/deployment_state.py
@@ -35,6 +35,7 @@
from ray.serve._private.config import DeploymentConfig
from ray.serve._private.constants import (
MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
SERVE_LOGGER_NAME,
@@ -1595,14 +1596,17 @@ def get_total_num_requests(self) -> int:

total_requests = 0
running_replicas = self._replicas.get([ReplicaState.RUNNING])
for replica in running_replicas:
replica_tag = replica.replica_tag
if replica_tag in self.replica_average_ongoing_requests:
total_requests += self.replica_average_ongoing_requests[replica_tag][1]

if len(running_replicas) == 0:
for handle_metrics in self.handle_requests.values():
total_requests += handle_metrics[1]
if (
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE
or len(running_replicas) == 0
):
for handle_metric in self.handle_requests.values():
total_requests += handle_metric[1]
else:
for replica in running_replicas:
id = replica.replica_tag
if id in self.replica_average_ongoing_requests:
total_requests += self.replica_average_ongoing_requests[id][1]

return total_requests
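Taken together, the new branch prefers handle-reported metrics whenever the feature flag is on, and still falls back to them when there are no running replicas (e.g. during cold start, when requests can only be queued at handles). A standalone sketch of the selection logic, with simplified stand-ins for this class's state; each metrics entry is a (timestamp, value) pair, matching the [1] indexing above:

def total_num_requests(flag_on, handle_requests, replica_requests, running_tags):
    if flag_on or not running_tags:
        # Sum queued + ongoing requests reported by handles.
        return sum(metric[1] for metric in handle_requests.values())
    # Otherwise sum the average ongoing requests reported by RUNNING replicas.
    return sum(
        replica_requests[tag][1] for tag in running_tags if tag in replica_requests
    )

# Cold start: no running replicas, so the handle-side queue is used.
assert total_num_requests(False, {"h1": (0.0, 4)}, {}, []) == 4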

@@ -1613,11 +1617,12 @@ def autoscale(self) -> int:
return

total_num_requests = self.get_total_num_requests()
num_running_replicas = len(self.get_running_replica_infos())
autoscaling_policy_manager = self.autoscaling_policy_manager
decision_num_replicas = autoscaling_policy_manager.get_decision_num_replicas(
curr_target_num_replicas=self._target_state.target_num_replicas,
total_num_requests=total_num_requests,
num_running_replicas=len(self.get_running_replica_infos()),
num_running_replicas=num_running_replicas,
target_capacity=self._target_state.info.target_capacity,
target_capacity_direction=self._target_state.info.target_capacity_direction,
)
@@ -1631,7 +1636,8 @@ def autoscale(self) -> int:
logger.info(
f"Autoscaling replicas for deployment '{self.deployment_name}' in "
f"application '{self.app_name}' to {decision_num_replicas}. "
f"Current number of requests: {total_num_requests}."
f"Current number of requests: {total_num_requests}. Current number of "
f"running replicas: {num_running_replicas}."
)

new_info = copy(self._target_state.info)
6 changes: 5 additions & 1 deletion python/ray/serve/_private/replica.py
@@ -32,6 +32,7 @@
DEFAULT_LATENCY_BUCKET_MS,
GRPC_CONTEXT_ARG_NAME,
HEALTH_CHECK_METHOD,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
RAY_SERVE_GAUGE_METRIC_SET_PERIOD_S,
RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_PERIOD_S,
RECONFIGURE_METHOD,
@@ -180,7 +181,10 @@ def set_autoscaling_config(self, autoscaling_config: Optional[AutoscalingConfig]

self._autoscaling_config = autoscaling_config

if self._autoscaling_config:
if (
not RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE
and self._autoscaling_config
):
# Push autoscaling metrics to the controller periodically.
self._metrics_pusher.register_or_update_task(
self.PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
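With the flag on, the replica never registers its push task, so each request is counted on exactly one side (the handle). A compact restatement of the gating above, as a hypothetical helper rather than actual Serve code:

def replica_should_push_metrics(flag_on: bool, autoscaling_config) -> bool:
    # Push from the replica only when handle-side collection is off and
    # the deployment actually has an autoscaling config.
    return not flag_on and autoscaling_config is not None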
121 changes: 104 additions & 17 deletions python/ray/serve/_private/router.py
@@ -4,10 +4,12 @@
import math
import pickle
import random
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass
from functools import partial
from typing import (
Any,
AsyncGenerator,
@@ -29,21 +31,24 @@
from ray.serve._private.common import DeploymentID, RequestProtocol, RunningReplicaInfo
from ray.serve._private.constants import (
HANDLE_METRIC_PUSH_INTERVAL_S,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
RAY_SERVE_MAX_QUEUE_LENGTH_RESPONSE_DEADLINE_S,
RAY_SERVE_MULTIPLEXED_MODEL_ID_MATCHING_TIMEOUT_S,
RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S,
SERVE_LOGGER_NAME,
)
from ray.serve._private.long_poll import LongPollClient, LongPollNamespace
from ray.serve._private.metrics_utils import MetricsPusher
from ray.serve._private.metrics_utils import InMemoryMetricsStore, MetricsPusher
from ray.serve._private.utils import JavaActorHandleProxy
from ray.serve.config import AutoscalingConfig
from ray.serve.generated.serve_pb2 import RequestMetadata as RequestMetadataProto
from ray.serve.grpc_util import RayServegRPCContext
from ray.util import metrics

logger = logging.getLogger(SERVE_LOGGER_NAME)
PUSH_METRICS_TO_CONTROLLER_TASK_NAME = "push_metrics_to_controller"
RECORD_METRICS_TASK_NAME = "record_metrics"


@dataclass
@@ -911,14 +916,14 @@ async def choose_replica_for_query(self, query: Query) -> ReplicaWrapper:

async def assign_replica(
self, query: Query
) -> Union[ray.ObjectRef, "ray._raylet.ObjectRefGenerator"]:
) -> Tuple[Union[ray.ObjectRef, "ray._raylet.ObjectRefGenerator"], str]:
"""Choose a replica for the request and send it.
This will block indefinitely if no replicas are available to handle the
request, so it's up to the caller to time out or cancel the request.
"""
replica = await self.choose_replica_for_query(query)
return replica.send_query(query)
return replica.send_query(query), replica.replica_id


class Router:
@@ -992,7 +997,7 @@ def __init__(
(
LongPollNamespace.RUNNING_REPLICAS,
deployment_id,
): self._replica_scheduler.update_running_replicas,
): self.update_running_replicas,
(
LongPollNamespace.AUTOSCALING_CONFIG,
deployment_id,
@@ -1001,11 +1006,38 @@
call_in_event_loop=event_loop,
)

self.metrics_pusher = MetricsPusher()
# For autoscaling deployments.
self.autoscaling_config = None
# Track queries sent to replicas for the autoscaling algorithm.
self.num_queries_sent_to_replicas = defaultdict(int)
self._queries_lock = threading.Lock()
# Regularly aggregate and push autoscaling metrics to controller
self.metrics_pusher = MetricsPusher()
self.metrics_store = InMemoryMetricsStore()
self.push_metrics_to_controller = controller_handle.record_handle_metrics.remote

def update_autoscaling_config(self, autoscaling_config):
def update_running_replicas(self, running_replicas: List[RunningReplicaInfo]):
self._replica_scheduler.update_running_replicas(running_replicas)

# Prune replica ids in self.num_queries_sent_to_replicas.
# The purpose of this is two-fold:
# 1. Only requests sent to RUNNING replicas should be considered
# for the autoscaling algorithm.
# 2. Prevent self.num_queries_sent_to_replicas from growing in
# memory as the deployment upscales and downscales over time.
running_replica_set = {replica.replica_tag for replica in running_replicas}
with self._queries_lock:
self.num_queries_sent_to_replicas = defaultdict(
int,
{
id: self.num_queries_sent_to_replicas[id]
for id, num_queries in self.num_queries_sent_to_replicas.items()
if num_queries or id in running_replica_set
},
)
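The comprehension keeps an entry if it still has in-flight requests (so the completion callbacks can still decrement it) or if the replica is currently running; everything else is dropped. A small worked example with hypothetical replica tags:

# "a" is running; "b" was stopped but still has an in-flight request;
# "c" was stopped and has fully drained.
counts = {"a": 2, "b": 1, "c": 0}
running = {"a"}
pruned = {tag: n for tag, n in counts.items() if n or tag in running}
assert pruned == {"a": 2, "b": 1}  # only "c" is pruned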

def update_autoscaling_config(self, autoscaling_config: AutoscalingConfig):
self.autoscaling_config = autoscaling_config

# Start the metrics pusher if autoscaling is enabled.
@@ -1019,23 +1051,67 @@ def update_autoscaling_config(self, autoscaling_config):
and self.num_queued_queries
):
self.push_metrics_to_controller(
self._collect_handle_queue_metrics(), time.time()
self._get_aggregated_requests(), time.time()
)

self.metrics_pusher.register_or_update_task(
PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
self._collect_handle_queue_metrics,
HANDLE_METRIC_PUSH_INTERVAL_S,
self.push_metrics_to_controller,
)
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
# Record number of queued + ongoing requests at regular
# intervals into the in-memory metrics store
self.metrics_pusher.register_or_update_task(
RECORD_METRICS_TASK_NAME,
self._get_num_requests_for_autoscaling,
min(0.5, self.autoscaling_config.metrics_interval_s),
self._add_autoscaling_metrics_point,
)
# Push metrics to the controller periodically.
self.metrics_pusher.register_or_update_task(
PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
self._get_aggregated_requests,
self.autoscaling_config.metrics_interval_s,
self.push_metrics_to_controller,
)
else:
self.metrics_pusher.register_or_update_task(
PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
self._get_aggregated_requests,
HANDLE_METRIC_PUSH_INTERVAL_S,
self.push_metrics_to_controller,
)

self.metrics_pusher.start()
else:
if self.metrics_pusher:
self.metrics_pusher.shutdown()
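With the flag on, two tasks run at different cadences: the handle samples its queued + ongoing count every min(0.5, metrics_interval_s) seconds into the in-memory store, and pushes an aggregate to the controller once per metrics_interval_s. The sketch below imitates that split with plain threads; it illustrates the scheme and is not the actual MetricsPusher implementation:

import threading
import time
from collections import deque

samples = deque(maxlen=10_000)  # (timestamp, value) points

def get_num_requests() -> int:
    return 0  # stand-in for the handle's queued + ongoing count

def record_point() -> None:
    samples.append((time.time(), get_num_requests()))

def push_window_average(look_back_period_s: float = 30.0) -> None:
    cutoff = time.time() - look_back_period_s
    recent = [v for t, v in samples if t >= cutoff]
    if recent:
        print(sum(recent) / len(recent))  # stand-in for the controller push

def run_periodically(interval_s: float, fn) -> None:
    def loop():
        while True:
            time.sleep(interval_s)
            fn()
    threading.Thread(target=loop, daemon=True).start()

metrics_interval_s = 10.0
run_periodically(min(0.5, metrics_interval_s), record_point)  # fast local sampling
run_periodically(metrics_interval_s, push_window_average)  # slower controller push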

def _collect_handle_queue_metrics(self) -> Dict[str, int]:
return (self.deployment_id, self.handle_id), self.num_queued_queries
def _get_num_requests_for_autoscaling(self) -> int:
total_requests = self.num_queued_queries
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
for replica_id, num_queries in self.num_queries_sent_to_replicas.items():
if replica_id in self._replica_scheduler._replica_id_set:
total_requests += num_queries

return total_requests

def _add_autoscaling_metrics_point(self, data: int, send_timestamp: float):
self.metrics_store.add_metrics_point({self.deployment_id: data}, send_timestamp)

def _get_aggregated_requests(self):
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
look_back_period = self.autoscaling_config.look_back_period_s
window_avg = self.metrics_store.window_average(
self.deployment_id, time.time() - look_back_period
)
# If data hasn't been recorded in the in-memory metrics
# store yet, return the current number of queued and
# ongoing requests.
data = window_avg or self._get_num_requests_for_autoscaling()
return (self.deployment_id, self.handle_id), data
else:
return (self.deployment_id, self.handle_id), self.num_queued_queries
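This assumes InMemoryMetricsStore.window_average averages the points recorded within the look-back window and returns None when there are none, so the handle reports its live count until the first sample lands. Note that the window_avg or ... form also falls back when the average is exactly 0. A sketch of the fallback under these assumptions:

def aggregated_requests(window_avg, live_count):
    # Fall back to the instantaneous count when the store has no data
    # (window_avg is None), and, because of `or`, when the average is 0.
    return window_avg or live_count

assert aggregated_requests(None, 7) == 7
assert aggregated_requests(2.5, 7) == 2.5
assert aggregated_requests(0.0, 7) == 7  # 0 also triggers the fallback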

def process_finished_request(self, replica_tag, *args):
with self._queries_lock:
self.num_queries_sent_to_replicas[replica_tag] -= 1

async def assign_request(
self,
@@ -1062,7 +1138,7 @@ async def assign_request(
and self.num_queued_queries == 1
):
self.push_metrics_to_controller(
self._collect_handle_queue_metrics(), time.time()
self._get_aggregated_requests(), time.time()
)

try:
@@ -1072,7 +1148,18 @@
metadata=request_meta,
)
await query.replace_known_types_in_args()
return await self._replica_scheduler.assign_replica(query)
ref, replica_tag = await self._replica_scheduler.assign_replica(query)

# Keep track of requests that have been sent out to replicas
if RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE:
self.num_queries_sent_to_replicas[replica_tag] += 1
callback = partial(self.process_finished_request, replica_tag)
if isinstance(ref, ray.ObjectRef):
ref._on_completed(callback)
else:
ref.completed()._on_completed(callback)

return ref
finally:
# If the query is disconnected before assignment, this coroutine
# gets cancelled by the caller and an asyncio.CancelledError is
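The increment happens when a request is handed to a replica, and the matching decrement is attached as a completion callback on the returned ref (ObjectRef._on_completed is a private Ray API; for streaming responses the callback is attached to the generator's completed() ref, as shown above). A simplified sketch of the pairing, with a plain callable standing in for the Ray ref:

from functools import partial

counts = {}

def process_finished_request(replica_tag, *args):
    counts[replica_tag] -= 1  # mirrors the handler above

def track_assignment(replica_tag):
    counts[replica_tag] = counts.get(replica_tag, 0) + 1
    # In the real code this partial is passed to ref._on_completed(...).
    return partial(process_finished_request, replica_tag)

on_completed = track_assignment("replica-1")
assert counts["replica-1"] == 1
on_completed()  # simulate the request finishing
assert counts["replica-1"] == 0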
4 changes: 4 additions & 0 deletions python/ray/serve/_private/version.py
@@ -176,6 +176,10 @@ def _get_serialized_options(
reconfigure_dict[option_name] = getattr(
self.deployment_config, option_name
)
# If the autoscaling config was changed, only broadcast to
# replicas if metrics_interval_s or look_back_period_s was
# changed, because the rest of the fields are only used by
# the deployment state manager.
if isinstance(reconfigure_dict[option_name], AutoscalingConfig):
reconfigure_dict[option_name] = reconfigure_dict[option_name].dict(
include={"metrics_interval_s", "look_back_period_s"}
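The include= filter is standard Pydantic (v1-style) dict() behavior: only the named fields survive serialization, so changes to any other autoscaling field leave the payload broadcast to replicas untouched. A minimal sketch with a simplified stand-in for Serve's AutoscalingConfig:

from pydantic import BaseModel

class AutoscalingConfig(BaseModel):  # simplified stand-in for Serve's config
    min_replicas: int = 1
    max_replicas: int = 10
    metrics_interval_s: float = 10.0
    look_back_period_s: float = 30.0

cfg = AutoscalingConfig(max_replicas=50, metrics_interval_s=2.0)
print(cfg.dict(include={"metrics_interval_s", "look_back_period_s"}))
# {'metrics_interval_s': 2.0, 'look_back_period_s': 30.0}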
11 changes: 11 additions & 0 deletions python/ray/serve/tests/BUILD
@@ -84,6 +84,17 @@ py_test_module_list(
deps = ["//python/ray/serve:serve_lib", ":conftest", ":common"],
)

# Test autoscaling with RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE set to 1
py_test(
name = "test_autoscaling_metrics_on_handle",
size = "medium",
main = "test_autoscaling_policy.py",
srcs = ["test_autoscaling_policy.py"],
tags = ["exclusive", "team:serve", "autoscaling"],
deps = ["//python/ray/serve:serve_lib", ":conftest", ":common"],
env = {"RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE": "1"},
)

py_test_module_list(
files = [
"test_gcs_failure.py",
