[serve] collect request metrics at handles (#42578)
Collect request metrics for autoscaling at handles instead of at replicas. This allows queued requests to be taken into account in autoscaling decisions, rather than only ongoing requests.
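The change is gated behind the `RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE` feature flag added in `constants.py` below, which is off by default. A minimal sketch of toggling it, assuming a local single-node setup where the environment variable is visible to all Serve processes (on a multi-node cluster it would have to be set on every node); the `Echo` deployment is a placeholder:

```python
import os

# The flag is read at import time in ray.serve._private.constants, so set it
# before importing Ray Serve. "1" enables handle-side metric collection.
os.environ["RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE"] = "1"

from ray import serve


# Placeholder deployment; autoscaling requires an autoscaling_config.
@serve.deployment(autoscaling_config={"min_replicas": 1, "max_replicas": 5})
class Echo:
    def __call__(self, request) -> str:
        return "hello"


serve.run(Echo.bind())
```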

(3x) DeploymentHandle streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10)
| Collect on handles | Original |
| --- | --- |
| 12140.58 +- 431.94 tokens/s | 12365.42 +- 353.48 tokens/s |
| 12119.03 +- 255.58 tokens/s | 12395.43 +- 642.92 tokens/s |
| 12168.44 +- 653.06 tokens/s | 12365.76 +- 680.51 tokens/s |

(5x) HTTP streaming throughput (num_replicas=1, tokens_per_request=1000, batch_size=10, use_intermediate_deployment=False)
| Collect on handles | Original |
| --- | --- |
| 141118.93 +- 103940.74 tokens/s | 143454.65 +- 103615.47 tokens/s |
| 225063.66 +- 5347.35 tokens/s | 228244.37 +- 6209.89 tokens/s |
| 225684.1 +- 3262.97 tokens/s | 221354.73 +- 2928.82 tokens/s |
| 220755.65 +- 6837.1 tokens/s | 188224.32 +- 78546.09 tokens/s |
| 221404.26 +- 3427.73 tokens/s | 223172.25 +- 4064.79 tokens/s |

(4x) DeploymentHandle throughput (num_replicas=1, batch_size=100)
| Collect on handles | Original |
| --- | --- |
| 1766.19 +- 15.25 requests/s | 1819.54 +- 5.26 requests/s |
| 1760.92 +- 51.87 requests/s | 1762.08 +- 21.04 requests/s |
| 1796.22 +- 10.52 requests/s | 1750.08 +- 31.73 requests/s |
| 1788.1 +- 29.98 requests/s | 1779.63 +- 24.86 requests/s |


Signed-off-by: Cindy Zhang <cindyzyx9@gmail.com>
zcin committed Feb 1, 2024
1 parent cd67a82 commit 23d9b93
Showing 9 changed files with 273 additions and 51 deletions.
6 changes: 6 additions & 0 deletions python/ray/serve/_private/constants.py
```diff
@@ -268,3 +268,9 @@
 
 # The default autoscaling policy to use if none is specified.
 DEFAULT_AUTOSCALING_POLICY = "ray.serve.autoscaling_policy:default_autoscaling_policy"
+
+# Feature flag to enable collecting all queued and ongoing request
+# metrics at handles instead of replicas. OFF by default.
+RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE = (
+    os.environ.get("RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE", "0") == "1"
+)
```
2 changes: 1 addition & 1 deletion python/ray/serve/_private/controller.py
```diff
@@ -260,7 +260,7 @@ def record_autoscaling_metrics(self, data: Dict[str, float], send_timestamp: float):
         )
         self.deployment_state_manager.record_autoscaling_metrics(data, send_timestamp)
 
-    def record_handle_metrics(self, data: Dict[str, float], send_timestamp: float):
+    def record_handle_metrics(self, data, send_timestamp: float):
         logger.debug(f"Received handle metrics: {data} at timestamp {send_timestamp}")
         self.deployment_state_manager.record_handle_metrics(data, send_timestamp)
```
86 changes: 58 additions & 28 deletions python/ray/serve/_private/deployment_state.py
```diff
@@ -35,6 +35,7 @@
 from ray.serve._private.config import DeploymentConfig
 from ray.serve._private.constants import (
     MAX_DEPLOYMENT_CONSTRUCTOR_RETRY_COUNT,
+    RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
     RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
     REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
     SERVE_LOGGER_NAME,
```
```diff
@@ -187,6 +188,13 @@ class DeploymentStateUpdateResult:
 _SCALING_LOG_ENABLED = os.environ.get("SERVE_ENABLE_SCALING_LOG", "0") != "0"
 
 
+@dataclass
+class HandleRequestMetric:
+    queued_requests: float
+    running_requests: Dict[str, float]
+    timestamp: float
+
+
 def print_verbose_scaling_log():
     assert _SCALING_LOG_ENABLED
 
```
```diff
@@ -1246,7 +1254,9 @@ def __init__(
-        self.replica_average_ongoing_requests: Dict[str, float] = dict()
 
         # Map from handle ID to (# requests recorded at handle, recording timestamp)
-        self.handle_requests: Dict[str, Tuple(float, float)] = dict()
+        self.handle_requests: Dict[str, HandleRequestMetric] = dict()
+        self.requests_queued_at_handles: Dict[str, float] = dict()
+        # Number of ongoing requests reported by replicas
+        self.replica_average_ongoing_requests: Dict[str, float] = dict()
 
         self.health_check_gauge = metrics.Gauge(
```
```diff
@@ -1586,23 +1596,37 @@ def deploy(self, deployment_info: DeploymentInfo) -> bool:
             self._backoff_time_s = 1
         return True
 
-    def get_total_num_requests(self) -> int:
-        """Return total number of ongoing requests in Serve.
+    def get_total_num_requests(self) -> float:
+        """Get average total number of requests aggregated over the past
+        `look_back_period_s` number of seconds.
 
         If there are 0 running replicas, then returns the total number
-        of requests queued at handles.
+        of requests queued at handles
+
+        If the flag RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE is
+        set to 1, the returned average includes both queued and ongoing
+        requests. Otherwise, the returned average includes only ongoing
+        requests.
         """
 
         total_requests = 0
         running_replicas = self._replicas.get([ReplicaState.RUNNING])
-        for replica in running_replicas:
-            replica_tag = replica.replica_tag
-            if replica_tag in self.replica_average_ongoing_requests:
-                total_requests += self.replica_average_ongoing_requests[replica_tag][1]
 
-        if len(running_replicas) == 0:
-            for handle_metrics in self.handle_requests.values():
-                total_requests += handle_metrics[1]
+        if (
+            RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE
+            or len(running_replicas) == 0
+        ):
+            for handle_metric in self.handle_requests.values():
+                total_requests += handle_metric.queued_requests
+                for replica in running_replicas:
+                    id = replica.replica_tag
+                    if id in handle_metric.running_requests:
+                        total_requests += handle_metric.running_requests[id]
+        else:
+            for replica in running_replicas:
+                id = replica.replica_tag
+                if id in self.replica_average_ongoing_requests:
+                    total_requests += self.replica_average_ongoing_requests[id][1]
 
         return total_requests
```

```diff
@@ -1613,11 +1637,12 @@ def autoscale(self) -> int:
             return
 
         total_num_requests = self.get_total_num_requests()
+        num_running_replicas = len(self.get_running_replica_infos())
         autoscaling_policy_manager = self.autoscaling_policy_manager
         decision_num_replicas = autoscaling_policy_manager.get_decision_num_replicas(
             curr_target_num_replicas=self._target_state.target_num_replicas,
             total_num_requests=total_num_requests,
-            num_running_replicas=len(self.get_running_replica_infos()),
+            num_running_replicas=num_running_replicas,
             target_capacity=self._target_state.info.target_capacity,
             target_capacity_direction=self._target_state.info.target_capacity_direction,
         )
```
```diff
@@ -1631,7 +1656,8 @@ def autoscale(self) -> int:
         logger.info(
             f"Autoscaling replicas for deployment '{self.deployment_name}' in "
             f"application '{self.app_name}' to {decision_num_replicas}. "
-            f"Current number of requests: {total_num_requests}."
+            f"Current number of requests: {total_num_requests}. Current number of "
+            f"running replicas: {num_running_replicas}."
         )
 
         new_info = copy(self._target_state.info)
```
```diff
@@ -2279,15 +2305,23 @@ def record_autoscaling_metrics(
         )
 
     def record_request_metrics_for_handle(
-        self, handle_id: str, num_requests: float, send_timestamp: float
+        self,
+        handle_id: str,
+        queued_requests: float,
+        running_requests: Dict[str, float],
+        send_timestamp: float,
     ) -> None:
         """Update request metric for a specific handle."""
 
         if (
             handle_id not in self.handle_requests
-            or send_timestamp > self.handle_requests[handle_id][0]
+            or send_timestamp > self.handle_requests[handle_id].timestamp
         ):
-            self.handle_requests[handle_id] = (send_timestamp, num_requests)
+            self.handle_requests[handle_id] = HandleRequestMetric(
+                queued_requests=queued_requests,
+                running_requests=running_requests,
+                timestamp=send_timestamp,
+            )
 
     def record_multiplexed_model_ids(
         self, replica_name: str, multiplexed_model_ids: List[str]
```
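The guard in `record_request_metrics_for_handle` above is a last-write-wins policy keyed on the sender's timestamp, so reports arriving out of order never overwrite newer data. A standalone sketch of that pattern, reusing the `HandleRequestMetric` dataclass from this diff (the surrounding manager class is simplified away):

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class HandleRequestMetric:
    queued_requests: float
    running_requests: Dict[str, float]  # replica tag -> avg ongoing requests
    timestamp: float


handle_requests: Dict[str, HandleRequestMetric] = {}


def record(handle_id: str, metric: HandleRequestMetric) -> None:
    # Keep only the newest report per handle: accept if the handle is
    # unknown or the incoming timestamp is strictly newer.
    existing = handle_requests.get(handle_id)
    if existing is None or metric.timestamp > existing.timestamp:
        handle_requests[handle_id] = metric


record("h1", HandleRequestMetric(1.0, {}, timestamp=10.0))
record("h1", HandleRequestMetric(5.0, {}, timestamp=9.0))  # stale; ignored
assert handle_requests["h1"].queued_requests == 1.0
```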
```diff
@@ -2367,18 +2401,14 @@ def record_autoscaling_metrics(self, data, send_timestamp: float):
             replica_name.deployment_id
         ].record_autoscaling_metrics(replica_tag, window_avg, send_timestamp)
 
-    def record_handle_metrics(self, data: Dict[str, float], send_timestamp: float):
-        id, num_requests = data
-        if num_requests is not None:
-            deployment_id, handle_id = id
-            # There can be handles to deleted deployments still sending
-            # metrics to the controller
-            if deployment_id in self._deployment_states:
-                self._deployment_states[
-                    deployment_id
-                ].record_request_metrics_for_handle(
-                    handle_id, num_requests, send_timestamp
-                )
+    def record_handle_metrics(self, data, send_timestamp: float):
+        deployment_id, handle_id, queued_requests, running_requests = data
+        # NOTE(zcin): There can be handles to deleted deployments still
+        # sending metrics to the controller
+        if deployment_id in self._deployment_states:
+            self._deployment_states[deployment_id].record_request_metrics_for_handle(
+                handle_id, queued_requests, running_requests, send_timestamp
+            )
 
     def get_autoscaling_metrics(self):
         """Return autoscaling metrics (used for dumping from controller)"""
```
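Putting the pieces together: on the handle-side path, `get_total_num_requests` above sums, for every handle, the requests queued at that handle plus the requests it has running on each currently RUNNING replica. A self-contained sketch of that aggregation (flag enabled), with the dataclass re-declared for self-containment and simplified names for what `DeploymentState` tracks internally:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class HandleRequestMetric:
    queued_requests: float
    running_requests: Dict[str, float]  # replica tag -> avg ongoing requests
    timestamp: float


def get_total_num_requests(
    handle_requests: Dict[str, HandleRequestMetric],
    running_replica_tags: List[str],
) -> float:
    # Count queued requests plus the running requests each handle has
    # assigned to replicas that are still RUNNING.
    total = 0.0
    for metric in handle_requests.values():
        total += metric.queued_requests
        for tag in running_replica_tags:
            total += metric.running_requests.get(tag, 0.0)
    return total


metrics = {
    "handle-a": HandleRequestMetric(2.0, {"replica-1": 3.5, "replica-2": 1.0}, 100.0),
    "handle-b": HandleRequestMetric(0.0, {"replica-1": 0.5}, 101.0),
}
# 2.0 queued + (3.5 + 1.0) running from handle-a + 0.5 running from handle-b
assert get_total_num_requests(metrics, ["replica-1", "replica-2"]) == 7.0
```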
6 changes: 5 additions & 1 deletion python/ray/serve/_private/replica.py
```diff
@@ -33,6 +33,7 @@
     DEFAULT_LATENCY_BUCKET_MS,
     GRPC_CONTEXT_ARG_NAME,
     HEALTH_CHECK_METHOD,
+    RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
     RAY_SERVE_GAUGE_METRIC_SET_PERIOD_S,
     RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_PERIOD_S,
     RECONFIGURE_METHOD,
```
```diff
@@ -180,7 +181,10 @@ def set_autoscaling_config(self, autoscaling_config: Optional[AutoscalingConfig]):
 
         self._autoscaling_config = autoscaling_config
 
-        if self._autoscaling_config:
+        if (
+            not RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE
+            and self._autoscaling_config
+        ):
             # Push autoscaling metrics to the controller periodically.
             self._metrics_pusher.register_or_update_task(
                 self.PUSH_METRICS_TO_CONTROLLER_TASK_NAME,
```
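With the flag on, the replica no longer registers its periodic metrics-push task, leaving reporting to the handles. The `MetricsPusher` internals are not shown in this diff; a minimal asyncio stand-in for the conditional-registration pattern, where `push_fn` and the period are placeholder names:

```python
import asyncio
import os
from typing import Awaitable, Callable, Optional

COLLECT_ON_HANDLE = (
    os.environ.get("RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE", "0") == "1"
)


async def _push_loop(push_fn: Callable[[], Awaitable[None]], period_s: float) -> None:
    # Stand-in for MetricsPusher.register_or_update_task: invoke push_fn
    # every period_s seconds until the task is cancelled.
    while True:
        await push_fn()
        await asyncio.sleep(period_s)


def maybe_register_replica_push(
    push_fn: Callable[[], Awaitable[None]], period_s: float = 0.5
) -> Optional[asyncio.Task]:
    # Mirror of the guard added above: the replica only pushes autoscaling
    # metrics to the controller when handle-side collection is disabled.
    # Must be called from within a running event loop, as replica code is.
    if COLLECT_ON_HANDLE:
        return None
    return asyncio.create_task(_push_loop(push_fn, period_s))
```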
16 changes: 13 additions & 3 deletions python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py
```diff
@@ -6,7 +6,17 @@
 import time
 from collections import defaultdict, deque
 from dataclasses import dataclass
-from typing import AsyncGenerator, DefaultDict, Deque, Dict, List, Optional, Set, Union
+from typing import (
+    AsyncGenerator,
+    DefaultDict,
+    Deque,
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
 
 import ray
 from ray.exceptions import RayActorError
```
```diff
@@ -657,11 +667,11 @@ async def choose_replica_for_query(self, query: Query) -> ReplicaWrapper:
 
     async def assign_replica(
         self, query: Query
-    ) -> Union[ray.ObjectRef, "ray._raylet.ObjectRefGenerator"]:
+    ) -> Tuple[Union[ray.ObjectRef, "ray._raylet.ObjectRefGenerator"], str]:
         """Choose a replica for the request and send it.
 
         This will block indefinitely if no replicas are available to handle the
         request, so it's up to the caller to time out or cancel the request.
         """
         replica = await self.choose_replica_for_query(query)
-        return replica.send_query(query)
+        return replica.send_query(query), replica.replica_id
```