Add metrics to RequestOutput #2876

Merged · 5 commits · Feb 21, 2024

Changes from all commits:
tests/async_engine/test_request_tracker.py (1 addition, 1 deletion)

```diff
@@ -64,7 +64,7 @@ def test_request_tracker():
     stream_5 = tracker.add_request("5")
     assert tracker.new_requests_event.flag
     tracker.process_request_output(
-        RequestOutput("2", "output", [], [], [], finished=True))
+        RequestOutput("2", "output", [], [], [], bool(finished)))
     new, finished = tracker.get_new_and_finished_requests()
     assert not tracker.new_requests_event.flag
     assert len(finished) == 1
```
vllm/core/policy.py (1 addition, 1 deletion)

```diff
@@ -33,7 +33,7 @@ def get_priority(
         now: float,
         seq_group: SequenceGroup,
     ) -> float:
-        return now - seq_group.arrival_time
+        return now - seq_group.metrics.arrival_time
 
 
 class PolicyFactory:
```
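Since `get_priority` returns `now - seq_group.metrics.arrival_time`, FCFS ordering falls out of sorting by descending priority: the request that has waited longest wins. A standalone sketch of that ordering (illustrative only, with hypothetical request IDs; not vLLM code):

```python
import time

# Hypothetical arrivals: req-a came in 5s ago, req-b only 1s ago.
now = time.time()
arrival_times = {"req-a": now - 5.0, "req-b": now - 1.0}

# Mirror of the FCFS get_priority(): now - metrics.arrival_time.
priorities = {rid: now - t for rid, t in arrival_times.items()}

# The scheduler sorts waiting groups by descending priority,
# so the oldest request is served first.
ordered = sorted(priorities, key=priorities.get, reverse=True)
assert ordered == ["req-a", "req-b"]
```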
vllm/core/scheduler.py (3 additions)

```diff
@@ -365,10 +365,13 @@ def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
         # This function call changes the internal states of the scheduler
         # such as self.running, self.swapped, and self.waiting.
         scheduler_outputs = self._schedule()
+        now = time.time()
 
         # Create input data structures.
         seq_group_metadata_list: List[SequenceGroupMetadata] = []
         for seq_group in scheduler_outputs.scheduled_seq_groups:
+            seq_group.maybe_set_first_scheduled_time(now)
+
             seq_data: Dict[int, SequenceData] = {}
             block_tables: Dict[int, List[int]] = {}
             for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
```
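`maybe_set_first_scheduled_time` must be idempotent because `schedule()` runs on every engine iteration while a request may be scheduled across many of them; only the first call records the timestamp and derives `time_in_queue`. A self-contained sketch of that set-once pattern (the class name and timestamps are made up):

```python
from typing import Optional


class QueueTimer:
    """Set-once recorder mirroring maybe_set_first_scheduled_time()."""

    def __init__(self, arrival_time: float) -> None:
        self.arrival_time = arrival_time
        self.first_scheduled_time: Optional[float] = None
        self.time_in_queue: Optional[float] = None

    def maybe_set_first_scheduled_time(self, now: float) -> None:
        # Only the first scheduling iteration may stamp the request.
        if self.first_scheduled_time is None:
            self.first_scheduled_time = now
            self.time_in_queue = now - self.arrival_time


timer = QueueTimer(arrival_time=100.0)
timer.maybe_set_first_scheduled_time(102.0)  # first iteration: stamps
timer.maybe_set_first_scheduled_time(105.0)  # later iterations: no-ops
assert timer.first_scheduled_time == 102.0
assert timer.time_in_queue == 2.0
```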
vllm/engine/llm_engine.py (5 additions, 2 deletions)

```diff
@@ -725,6 +725,7 @@ def _process_sequence_group_outputs(self, seq_group: SequenceGroup,
     def _process_model_outputs(
             self, output: SamplerOutput,
             scheduler_outputs: SchedulerOutputs) -> List[RequestOutput]:
+        now = time.time()
         # Update the scheduled sequence groups with the model outputs.
         scheduled_seq_groups = scheduler_outputs.scheduled_seq_groups
         for seq_group, outputs in zip(scheduled_seq_groups, output):
@@ -736,6 +737,7 @@ def _process_model_outputs(
         # Create the outputs.
         request_outputs: List[RequestOutput] = []
         for seq_group in scheduled_seq_groups:
+            seq_group.maybe_set_first_token_time(now)
             request_output = RequestOutput.from_seq_group(seq_group)
             request_outputs.append(request_output)
         for seq_group in scheduler_outputs.ignored_seq_groups:
@@ -871,11 +873,12 @@ def _get_stats(self,
         # Latency Timings.
         time_last_iters = []
         for seq_group in scheduler_outputs.scheduled_seq_groups:
-            # Time since last token. (n.b. updates seq_group.last_token_time)
+            # Time since last token. (n.b. updates seq_group.metrics.last_token_time)
             time_last_iters.append(seq_group.get_last_latency(now))
            # Time since arrival for all finished requests.
             if seq_group.is_finished():
-                time_e2e_requests.append(now - seq_group.arrival_time)
+                time_e2e_requests.append(now -
+                                         seq_group.metrics.arrival_time)
 
         time_to_first_tokens = time_last_iters if prompt_run else []
         time_per_output_tokens = [] if prompt_run else time_last_iters
```
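Note that `get_last_latency()` both reads and resets `metrics.last_token_time`, so successive calls return per-iteration intervals: on a prompt (prefill) run the sample lands in `time_to_first_tokens`, on a decode run in `time_per_output_tokens`. A minimal standalone sketch of that read-then-reset behaviour (the `_Metrics` stand-in is hypothetical):

```python
class _Metrics:
    """Stand-in for RequestMetrics; only last_token_time matters here."""

    def __init__(self, arrival_time: float) -> None:
        self.last_token_time = arrival_time


def get_last_latency(metrics: _Metrics, now: float) -> float:
    # Mirrors SequenceGroup.get_last_latency(): measure, then reset.
    latency = now - metrics.last_token_time
    metrics.last_token_time = now
    return latency


m = _Metrics(arrival_time=10.0)
assert get_last_latency(m, 10.5) == 0.5  # prefill iter -> TTFT sample
assert get_last_latency(m, 11.0) == 0.5  # decode iter -> per-token sample
```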
vllm/outputs.py (9 additions, 1 deletion)

```diff
@@ -1,7 +1,8 @@
 from typing import List, Optional
+import time
 
 from vllm.sequence import (PromptLogprobs, SampleLogprobs, SequenceGroup,
-                           SequenceStatus)
+                           SequenceStatus, RequestMetrics)
 from vllm.lora.request import LoRARequest
@@ -60,6 +61,7 @@ class RequestOutput:
         prompt_logprobs: The log probabilities to return per prompt token.
         outputs: The output sequences of the request.
         finished: Whether the whole request is finished.
+        metrics: Metrics associated with the request.
         lora_request: The LoRA request that was used to generate the output.
     """
@@ -71,6 +73,7 @@ def __init__(
         prompt_logprobs: Optional[PromptLogprobs],
         outputs: List[CompletionOutput],
         finished: bool,
+        metrics: Optional[RequestMetrics] = None,
         lora_request: Optional[LoRARequest] = None,
     ) -> None:
         self.request_id = request_id
@@ -79,6 +82,7 @@ def __init__(
         self.prompt_logprobs = prompt_logprobs
         self.outputs = outputs
         self.finished = finished
+        self.metrics = metrics
         self.lora_request = lora_request
 
     @classmethod
@@ -115,12 +119,15 @@ def from_seq_group(cls, seq_group: SequenceGroup) -> "RequestOutput":
         prompt_token_ids = seq_group.prompt_token_ids
         prompt_logprobs = seq_group.prompt_logprobs
         finished = seq_group.is_finished()
+        finished_time = time.time() if finished else None
+        seq_group.set_finished_time(finished_time)
         return cls(seq_group.request_id,
                    prompt,
                    prompt_token_ids,
                    prompt_logprobs,
                    outputs,
                    finished,
+                   seq_group.metrics,
                    lora_request=seq_group.lora_request)
@@ -130,4 +137,5 @@ def __repr__(self) -> str:
                 f"prompt_logprobs={self.prompt_logprobs}, "
                 f"outputs={self.outputs}, "
                 f"finished={self.finished}, "
+                f"metrics={self.metrics}, "
                 f"lora_request={self.lora_request})")
```
vllm/sequence.py (42 additions, 4 deletions)

```diff
@@ -1,6 +1,7 @@
 """Sequence and its related classes."""
 import copy
 import enum
+from dataclasses import dataclass
 from typing import Dict, List, Optional, Union
 
 from vllm.block import LogicalTokenBlock
@@ -49,6 +50,25 @@ def get_finished_reason(status: "SequenceStatus") -> Union[str, None]:
         return finish_reason
 
 
+@dataclass
+class RequestMetrics:
+    """Metrics associated with a request.
+
+    Args:
+        arrival_time: The time when the request arrived.
+        last_token_time: The time when the most recent token was generated.
+        first_scheduled_time: The time when the request was first scheduled.
+        first_token_time: The time when the first token was generated.
+        time_in_queue: The time the request spent in the queue.
+        finished_time: The time when the request was finished.
+    """
+    arrival_time: float
+    last_token_time: float
+    first_scheduled_time: Optional[float]
+    first_token_time: Optional[float]
+    time_in_queue: Optional[float]
+    finished_time: Optional[float] = None
+
+
 class SequenceData:
     """Data associated with a sequence.
 
@@ -252,8 +272,11 @@ def __init__(
         self.request_id = request_id
         self.seqs_dict = {seq.seq_id: seq for seq in seqs}
         self.sampling_params = sampling_params
-        self.arrival_time = arrival_time
-        self.last_token_time = arrival_time
+        self.metrics = RequestMetrics(arrival_time=arrival_time,
+                                      last_token_time=arrival_time,
+                                      first_scheduled_time=None,
+                                      first_token_time=None,
+                                      time_in_queue=None)
         self.lora_request = lora_request
         self.prefix: Optional[Prefix] = prefix
         self.prompt_logprobs: Optional[PromptLogprobs] = None
@@ -276,10 +299,25 @@ def lora_int_id(self) -> int:
 
     def get_last_latency(self, now: float) -> float:
         """Gets last token latency for Request level timings."""
-        latency = now - self.last_token_time
-        self.last_token_time = now
+        latency = now - self.metrics.last_token_time
+        self.metrics.last_token_time = now
         return latency
 
+    def maybe_set_first_token_time(self, time: float) -> None:
+        """Sets the first token time for Request level timings."""
+        if self.metrics.first_token_time is None:
+            self.metrics.first_token_time = time
+
+    def maybe_set_first_scheduled_time(self, time: float) -> None:
+        """Sets the first scheduled time and time in queue for Request
+        level timings."""
+        if self.metrics.first_scheduled_time is None:
+            self.metrics.first_scheduled_time = time
+            self.metrics.time_in_queue = time - self.metrics.arrival_time
+
+    def set_finished_time(self, time: Optional[float]) -> None:
+        """Sets the finished time for Request level timings."""
+        self.metrics.finished_time = time
+
     def get_max_num_running_seqs(self) -> int:
         """The maximum number of sequences running in parallel in the remaining
         lifetime of the request."""
```
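Taken together, the fields trace a request's lifecycle: arrival, first scheduling (which fixes `time_in_queue`), first token, finish. A sketch of how the engine fills a `RequestMetrics` in over time (the timestamps are made up; the import assumes a vLLM build containing this PR):

```python
from vllm.sequence import RequestMetrics

# t=0.0: request arrives; last_token_time starts at arrival.
metrics = RequestMetrics(arrival_time=0.0,
                         last_token_time=0.0,
                         first_scheduled_time=None,
                         first_token_time=None,
                         time_in_queue=None)

# t=0.25: scheduler picks the request up for the first time
# (maybe_set_first_scheduled_time).
metrics.first_scheduled_time = 0.25
metrics.time_in_queue = metrics.first_scheduled_time - metrics.arrival_time

# t=0.75: first token arrives (maybe_set_first_token_time).
metrics.first_token_time = 0.75

# t=3.0: request finishes (set_finished_time, called from
# RequestOutput.from_seq_group).
metrics.finished_time = 3.0

assert metrics.time_in_queue == 0.25
assert metrics.finished_time - metrics.arrival_time == 3.0  # e2e latency
```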