diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py
index ddfd94322737..d8ce42acabe1 100644
--- a/vllm/v1/core/block_pool.py
+++ b/vllm/v1/core/block_pool.py
@@ -17,6 +17,7 @@
     ExternalBlockHash,
     FreeKVCacheBlockQueue,
     KVCacheBlock,
+    SingleTypeKVCacheBlocks,
     get_block_hash,
     make_block_hash_with_group_id,
     maybe_convert_block_hash,
@@ -328,7 +329,7 @@ def _maybe_evict_cached_block(self, block: KVCacheBlock) -> bool:
         )
         return True
 
-    def touch(self, blocks: tuple[list[KVCacheBlock], ...]) -> None:
+    def touch(self, blocks: tuple[SingleTypeKVCacheBlocks, ...]) -> None:
         """Touch a block increases its reference count by 1, and may remove
         the block from the free queue. This is used when a block is hit by
         another request with the same prefix.
diff --git a/vllm/v1/core/kv_cache_coordinator.py b/vllm/v1/core/kv_cache_coordinator.py
index 37e1b7ca3932..574b3c547f6b 100644
--- a/vllm/v1/core/kv_cache_coordinator.py
+++ b/vllm/v1/core/kv_cache_coordinator.py
@@ -4,7 +4,7 @@
 from typing import Optional
 
 from vllm.v1.core.block_pool import BlockPool
-from vllm.v1.core.kv_cache_utils import BlockHash, KVCacheBlock
+from vllm.v1.core.kv_cache_utils import BlockHash, KVCacheBlock, SingleTypeKVCacheBlocks
 from vllm.v1.core.single_type_kv_cache_manager import (
     CrossAttentionManager,
     FullAttentionManager,
@@ -48,11 +48,13 @@ def __init__(
             for i, kv_cache_group in enumerate(self.kv_cache_config.kv_cache_groups)
         )
 
+        self.empty_blocks: tuple[KVCacheBlock, ...] = ()
+
     def get_num_blocks_to_allocate(
         self,
         request_id: str,
         num_tokens: int,
-        new_computed_blocks: tuple[list[KVCacheBlock], ...],
+        new_computed_blocks: tuple[SingleTypeKVCacheBlocks, ...],
         num_encoder_tokens: int,
     ) -> int:
         """
@@ -76,7 +78,7 @@ def get_num_blocks_to_allocate(
                 # For cross-attention, we issue a single static allocation
                 # of blocks based on the number of encoder input tokens.
                 num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
-                    request_id, num_encoder_tokens, []
+                    request_id, num_encoder_tokens, self.empty_blocks
                 )
             else:
                 num_blocks_to_allocate += manager.get_num_blocks_to_allocate(
@@ -85,7 +87,7 @@ def get_num_blocks_to_allocate(
         return num_blocks_to_allocate
 
     def save_new_computed_blocks(
-        self, request_id: str, new_computed_blocks: tuple[list[KVCacheBlock], ...]
+        self, request_id: str, new_computed_blocks: tuple[SingleTypeKVCacheBlocks, ...]
     ) -> None:
         """
         Add the new computed blocks to the request.
diff --git a/vllm/v1/core/kv_cache_manager.py b/vllm/v1/core/kv_cache_manager.py
index 3e1a83a8a220..d42a19031f58 100644
--- a/vllm/v1/core/kv_cache_manager.py
+++ b/vllm/v1/core/kv_cache_manager.py
@@ -7,7 +7,7 @@
 from vllm.distributed.kv_events import KVCacheEvent
 from vllm.logger import init_logger
 from vllm.v1.core.kv_cache_coordinator import get_kv_cache_coordinator
-from vllm.v1.core.kv_cache_utils import KVCacheBlock
+from vllm.v1.core.kv_cache_utils import KVCacheBlock, SingleTypeKVCacheBlocks
 from vllm.v1.kv_cache_interface import KVCacheConfig
 from vllm.v1.metrics.stats import PrefixCacheStats
 from vllm.v1.request import Request, RequestStatus
@@ -23,7 +23,7 @@ class KVCacheBlocks:
     structure from the Scheduler.
     """
 
-    blocks: tuple[list[KVCacheBlock], ...]
+    blocks: tuple[SingleTypeKVCacheBlocks, ...]
""" `blocks[i][j]` refers to the i-th kv_cache_group and the j-th block of tokens.We don't use block of @@ -36,7 +36,9 @@ class KVCacheBlocks: def __add__(self, other: "KVCacheBlocks") -> "KVCacheBlocks": """Adds two KVCacheBlocks instances.""" return KVCacheBlocks( - tuple(blk1 + blk2 for blk1, blk2 in zip(self.blocks, other.blocks)) + tuple( + list(blk1) + list(blk2) for blk1, blk2 in zip(self.blocks, other.blocks) + ) ) @overload @@ -74,8 +76,10 @@ def get_unhashed_block_ids(self) -> list[int]: return [block.block_id for block in self.blocks[0] if block.block_hash is None] def new_empty(self) -> "KVCacheBlocks": - """Creates a new KVCacheBlocks instance with no blocks.""" - return KVCacheBlocks(tuple([] for _ in range(len(self.blocks)))) + """ + Creates a new KVCacheBlocks instance with no blocks. + """ + return KVCacheBlocks(tuple(() for _ in range(len(self.blocks)))) class KVCacheManager: @@ -131,6 +135,13 @@ def __init__( self.block_pool = self.coordinator.block_pool self.kv_cache_config = kv_cache_config + # Pre-constructed KVCacheBlocks with no blocks, callers should use this + # via create_kv_cache_blocks instead of creating new ones to avoid GC + # overhead. + self.empty_kv_cache_blocks = KVCacheBlocks( + tuple(() for _ in range(self.num_kv_cache_groups)) + ) + @property def usage(self) -> float: """Get the KV cache usage. @@ -170,7 +181,7 @@ def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]: request.sampling_params is not None and request.sampling_params.prompt_logprobs is not None ): - return self.create_empty_block_list(), 0 + return self.empty_kv_cache_blocks, 0 # NOTE: When all tokens hit the cache, we must recompute the last token # to obtain logits. Thus, set max_cache_hit_length to prompt_length - 1. @@ -198,7 +209,7 @@ def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]: self.prefix_cache_stats.queries += request.num_tokens self.prefix_cache_stats.hits += num_new_computed_tokens - return KVCacheBlocks(computed_blocks), num_new_computed_tokens + return (self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens) def allocate_slots( self, @@ -251,9 +262,7 @@ def allocate_slots( if new_computed_blocks is not None: new_computed_block_list = new_computed_blocks.blocks else: - new_computed_block_list = tuple( - [] for _ in range(len(self.kv_cache_config.kv_cache_groups)) - ) + new_computed_block_list = self.empty_kv_cache_blocks.blocks # Free the blocks that are skipped during the attention computation # (e.g., tokens outside the sliding window). @@ -305,7 +314,7 @@ def allocate_slots( # P/D: delay caching blocks if we have to recv from # remote. Update state for locally cached blocks. if not self.enable_caching or delay_cache_blocks: - return KVCacheBlocks(new_blocks) + return self.create_kv_cache_blocks(new_blocks) # NOTE(woosuk): We want to commit (cache) up to num_computed_tokens + # num_new_tokens, but must exclude "non-committable" tokens (e.g., @@ -316,7 +325,7 @@ def allocate_slots( ) self.coordinator.cache_blocks(request, num_tokens_to_cache) - return KVCacheBlocks(new_blocks) + return self.create_kv_cache_blocks(new_blocks) def free(self, request: Request) -> None: """Free the blocks allocated for the request. 
@@ -398,7 +407,7 @@ def take_events(self) -> list[KVCacheEvent]:
 
     def get_blocks(self, request_id: str) -> KVCacheBlocks:
         """Get the blocks of a request."""
-        return KVCacheBlocks(self.coordinator.get_blocks(request_id))
+        return self.create_kv_cache_blocks(self.coordinator.get_blocks(request_id))
 
     def get_block_ids(self, request_id: str) -> tuple[list[int], ...]:
         """Get the block ids of a request."""
@@ -409,6 +418,8 @@ def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
         if self.enable_caching:
             self.coordinator.cache_blocks(request, num_computed_tokens)
 
-    def create_empty_block_list(self) -> KVCacheBlocks:
-        """Creates a new KVCacheBlocks instance with no blocks."""
-        return KVCacheBlocks(tuple([] for _ in range(self.num_kv_cache_groups)))
+    def create_kv_cache_blocks(
+        self, blocks: tuple[list[KVCacheBlock], ...]
+    ) -> KVCacheBlocks:
+        # Only create a new KVCacheBlocks instance for non-empty blocks.
+        return KVCacheBlocks(blocks) if any(blocks) else self.empty_kv_cache_blocks
diff --git a/vllm/v1/core/kv_cache_utils.py b/vllm/v1/core/kv_cache_utils.py
index 4683ad62981f..1c10657f2881 100644
--- a/vllm/v1/core/kv_cache_utils.py
+++ b/vllm/v1/core/kv_cache_utils.py
@@ -9,6 +9,8 @@
 from dataclasses import dataclass
 from typing import Any, Callable, NewType, Optional, Union
 
+from typing_extensions import TypeAlias
+
 from vllm import envs
 from vllm.config import VllmConfig
 from vllm.logger import init_logger
@@ -222,6 +224,14 @@ def __repr__(self) -> str:
         )
 
 
+# Represents the blocks of a single KV cache group for one request.
+# It can be either:
+# - a list[KVCacheBlock] when the request has one or more blocks
+# - an empty tuple when the request has no blocks (a precomputed empty
+#   KVCacheBlocks is kept in KVCacheManager to avoid GC overhead)
+SingleTypeKVCacheBlocks: TypeAlias = Sequence[KVCacheBlock]
+
+
 class FreeKVCacheBlockQueue:
     """This class organizes a list of KVCacheBlock objects to a doubly linked
     list of free blocks. We implement this class instead of using Python
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index d9a0ff1aa5c9..67b55e9b7005 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -426,9 +426,7 @@ def schedule(self) -> SchedulerOutput:
                 # KVTransfer: WAITING reqs have num_computed_tokens > 0
                 # after async KV recvs are completed.
                 else:
-                    new_computed_blocks = (
-                        self.kv_cache_manager.create_empty_block_list()
-                    )
+                    new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
                     num_new_local_computed_tokens = 0
                     num_computed_tokens = request.num_computed_tokens
 
diff --git a/vllm/v1/core/single_type_kv_cache_manager.py b/vllm/v1/core/single_type_kv_cache_manager.py
index d624ff1b3dcc..92633ba721b8 100644
--- a/vllm/v1/core/single_type_kv_cache_manager.py
+++ b/vllm/v1/core/single_type_kv_cache_manager.py
@@ -6,7 +6,7 @@
 
 from vllm.utils import cdiv
 from vllm.v1.core.block_pool import BlockPool
-from vllm.v1.core.kv_cache_utils import BlockHash, KVCacheBlock
+from vllm.v1.core.kv_cache_utils import BlockHash, KVCacheBlock, SingleTypeKVCacheBlocks
 from vllm.v1.kv_cache_interface import (
     ChunkedLocalAttentionSpec,
     CrossAttentionSpec,
@@ -61,7 +61,10 @@ def __init__(
         self._null_block = block_pool.null_block
 
     def get_num_blocks_to_allocate(
-        self, request_id: str, num_tokens: int, new_computed_blocks: list[KVCacheBlock]
+        self,
+        request_id: str,
+        num_tokens: int,
+        new_computed_blocks: SingleTypeKVCacheBlocks,
     ) -> int:
         """
         Get the number of blocks needed to be allocated for the request.
@@ -93,7 +96,7 @@ def get_num_blocks_to_allocate(
         return num_new_blocks + num_evictable_computed_blocks
 
     def save_new_computed_blocks(
-        self, request_id: str, new_computed_blocks: list[KVCacheBlock]
+        self, request_id: str, new_computed_blocks: SingleTypeKVCacheBlocks
     ) -> None:
         """
         Add the new computed blocks to the request.
@@ -605,10 +608,26 @@ def get_num_common_prefix_blocks(
         return 0
 
     def get_num_blocks_to_allocate(
-        self, request_id: str, num_tokens: int, new_computed_blocks: list[KVCacheBlock]
+        self,
+        request_id: str,
+        num_tokens: int,
+        new_computed_blocks: SingleTypeKVCacheBlocks,
     ) -> int:
+        """
+        Get the number of blocks needed to be allocated for the request.
+
+        Args:
+            request_id: The request ID.
+            num_tokens: The total number of tokens that need a slot (including
+                tokens that are already allocated).
+            new_computed_blocks: The new computed blocks that just hit the
+                prefix cache.
+
+        Returns:
+            The number of blocks to allocate.
+        """
         # Allocate extra `num_speculative_blocks` blocks for
         # speculative decoding (MTP/EAGLE) with linear attention.
         assert isinstance(self.kv_cache_spec, MambaSpec)
         if self.kv_cache_spec.num_speculative_blocks > 0:
             num_tokens += (
@@ -637,7 +656,7 @@ class CrossAttentionManager(SingleTypeKVCacheManager):
     """Manager for cross-attention KV cache in encoder-decoder models."""
 
     def save_new_computed_blocks(
-        self, request_id: str, new_computed_blocks: list[KVCacheBlock]
+        self, request_id: str, new_computed_blocks: SingleTypeKVCacheBlocks
     ) -> None:
         # We do not cache blocks for cross-attention to be shared between
        # requests, so `new_computed_blocks` should always be empty.
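
Note on the pattern: `empty_kv_cache_blocks` and `create_kv_cache_blocks` implement a
shared-empty-instance optimization — hand out one immutable empty `KVCacheBlocks`
instead of constructing a fresh one on every call, so the hot scheduling path stops
churning short-lived objects for the GC. Below is a minimal standalone sketch of the
idea; the `Block`/`Blocks` names and `NUM_GROUPS` constant are illustrative stand-ins,
not vLLM APIs.

# Minimal sketch of the shared-empty-instance pattern (illustrative names,
# not vLLM APIs).
from collections.abc import Sequence
from dataclasses import dataclass


@dataclass
class Block:
    block_id: int


@dataclass
class Blocks:
    # Sequence admits both the per-request mutable list[Block] and the shared
    # immutable empty tuple, mirroring what SingleTypeKVCacheBlocks allows.
    groups: tuple[Sequence[Block], ...]


NUM_GROUPS = 2

# Pre-construct one empty instance; sharing it is safe because empty tuples
# cannot be mutated by callers.
EMPTY_BLOCKS = Blocks(tuple(() for _ in range(NUM_GROUPS)))


def create_blocks(groups: tuple[list[Block], ...]) -> Blocks:
    # Allocate a new wrapper only when there is real content; otherwise
    # return the shared empty instance and skip the allocation entirely.
    return Blocks(groups) if any(groups) else EMPTY_BLOCKS


assert create_blocks(([], [])) is EMPTY_BLOCKS            # reused, nothing allocated
assert create_blocks(([Block(0)], [])) is not EMPTY_BLOCKS  # real data, new object

In the diff above, `KVCacheManager.empty_kv_cache_blocks` plays the role of
`EMPTY_BLOCKS`, and `create_kv_cache_blocks` is the factory that falls back to it
whenever every group's block list is empty.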