From d52aa966820cc88f14a9547bcc4fba80f63fc07a Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 06:51:19 +0000 Subject: [PATCH 01/23] [Frontend] Pass API server count to each process Signed-off-by: DarkLight1337 --- examples/others/tensorize_vllm_model.py | 9 +----- .../test_api_server_process_manager.py | 6 ++-- vllm/config/parallel.py | 6 ++++ vllm/engine/arg_utils.py | 9 +++++- vllm/entrypoints/cli/serve.py | 18 ++++++----- vllm/multimodal/cache.py | 5 ++-- vllm/v1/engine/core_client.py | 1 + vllm/v1/utils.py | 30 +++++++++++-------- 8 files changed, 50 insertions(+), 34 deletions(-) diff --git a/examples/others/tensorize_vllm_model.py b/examples/others/tensorize_vllm_model.py index 559c7c493aca..2b7f0beab227 100644 --- a/examples/others/tensorize_vllm_model.py +++ b/examples/others/tensorize_vllm_model.py @@ -1,8 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -import argparse -import dataclasses import json import logging import os @@ -327,12 +325,7 @@ def main(): if args.command == "serialize": - eng_args_dict = {f.name: getattr(args, f.name) for f in - dataclasses.fields(EngineArgs)} - - engine_args = EngineArgs.from_cli_args( - argparse.Namespace(**eng_args_dict) - ) + engine_args = EngineArgs.from_cli_args(args) input_dir = tensorizer_dir.rstrip('/') suffix = args.suffix if args.suffix else uuid.uuid4().hex diff --git a/tests/entrypoints/test_api_server_process_manager.py b/tests/entrypoints/test_api_server_process_manager.py index e4af60a78265..882382a38543 100644 --- a/tests/entrypoints/test_api_server_process_manager.py +++ b/tests/entrypoints/test_api_server_process_manager.py @@ -36,10 +36,10 @@ def api_server_args(): "localhost:8000", "sock": sock, - "args": - "test_args", # Simple string to avoid pickling issues "num_servers": 3, + "args_per_server": + ["test_args"] * 3, # Simple string to avoid pickling issues "input_addresses": [ "tcp://127.0.0.1:5001", "tcp://127.0.0.1:5002", "tcp://127.0.0.1:5003" @@ -60,7 +60,7 @@ def test_api_server_process_manager_init(api_server_args, with_stats_update): global WORKER_RUNTIME_SECONDS WORKER_RUNTIME_SECONDS = 0.5 - # Copy the args to avoid mutating the + # Copy the args to avoid mutating them args = api_server_args.copy() if not with_stats_update: diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index 9ea883d4a03c..b7f7231ef6da 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -96,6 +96,12 @@ class ParallelConfig: between local data parallel ranks, but an external LB balances between vLLM nodes/replicas. 
Set explicitly in conjunction with --data-parallel-start-rank.""" + + api_process_count: int = 1 + """[Internal] The number of API processes initialized.""" + api_process_rank: int = 0 + """[Internal] The rank of this API process.""" + enable_expert_parallel: bool = False """Use expert parallelism instead of tensor parallelism for MoE layers.""" enable_eplb: bool = False diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 9e7c95ea5205..91c98f6e64b2 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -303,6 +303,8 @@ class EngineArgs: data_parallel_rpc_port: Optional[int] = None data_parallel_hybrid_lb: bool = False data_parallel_backend: str = ParallelConfig.data_parallel_backend + api_process_count: int = ParallelConfig.api_process_count + api_process_rank: int = ParallelConfig.api_process_rank enable_expert_parallel: bool = ParallelConfig.enable_expert_parallel eplb_config: EPLBConfig = get_field(ParallelConfig, "eplb_config") enable_eplb: bool = ParallelConfig.enable_eplb @@ -895,7 +897,10 @@ def from_cli_args(cls, args: argparse.Namespace): # Get the list of attributes of this dataclass. attrs = [attr.name for attr in dataclasses.fields(cls)] # Set the attributes from the parsed arguments. - engine_args = cls(**{attr: getattr(args, attr) for attr in attrs}) + engine_args = cls(**{ + attr: getattr(args, attr) + for attr in attrs if hasattr(args, attr) + }) return engine_args def create_model_config(self) -> ModelConfig: @@ -1280,6 +1285,8 @@ def create_engine_config( data_parallel_rpc_port=data_parallel_rpc_port, data_parallel_backend=self.data_parallel_backend, data_parallel_hybrid_lb=self.data_parallel_hybrid_lb, + api_process_count=self.api_process_count, + api_process_rank=self.api_process_rank, enable_expert_parallel=self.enable_expert_parallel, enable_eplb=self.enable_eplb, eplb_config=self.eplb_config, diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index 803a3e004656..2ebffe9cb690 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -3,6 +3,7 @@ import argparse import signal +from copy import deepcopy from typing import Optional import uvloop @@ -135,10 +136,11 @@ def signal_handler(signum, frame): def run_multi_api_server(args: argparse.Namespace): assert not args.headless - num_api_servers = args.api_server_count + num_api_servers: int = args.api_server_count assert num_api_servers > 0 - orig_mm_processor_cache_gb = args.mm_processor_cache_gb + # No need to set api_process_rank for EngineCore processes + args.api_process_count = args.api_server_count if num_api_servers > 1: setup_multiprocess_prometheus() @@ -151,7 +153,6 @@ def run_multi_api_server(args: argparse.Namespace): engine_args = vllm.AsyncEngineArgs.from_cli_args(args) usage_context = UsageContext.OPENAI_API_SERVER vllm_config = engine_args.create_engine_config(usage_context=usage_context) - model_config = vllm_config.model_config if num_api_servers > 1: if not envs.VLLM_USE_V1: @@ -161,10 +162,6 @@ def run_multi_api_server(args: argparse.Namespace): raise ValueError("VLLM_ALLOW_RUNTIME_LORA_UPDATING cannot be used " "with api_server_count > 1") - if model_config.is_multimodal_model and orig_mm_processor_cache_gb > 0: - logger.warning("Multi-modal processor cache is disabled because " - "it is not compatible with `api_server_count > 1`.") - executor_class = Executor.get_class(vllm_config) log_stats = not engine_args.disable_log_stats @@ -174,6 +171,11 @@ def run_multi_api_server(args: argparse.Namespace): hybrid_dp_lb = 
parallel_config.data_parallel_hybrid_lb assert external_dp_lb or hybrid_dp_lb or dp_rank == 0 + # Set api_process_rank for API server processes + args_per_server = [deepcopy(args) for _ in range(num_api_servers)] + for server_idx in range(num_api_servers): + args_per_server[server_idx].api_process_rank = server_idx + api_server_manager: Optional[APIServerProcessManager] = None with launch_core_engines(vllm_config, executor_class, log_stats, @@ -185,7 +187,7 @@ def run_multi_api_server(args: argparse.Namespace): target_server_fn=run_api_server_worker_proc, listen_address=listen_address, sock=sock, - args=args, + args_per_server=args_per_server, num_servers=num_api_servers, input_addresses=addresses.inputs, output_addresses=addresses.outputs, diff --git a/vllm/multimodal/cache.py b/vllm/multimodal/cache.py index 0e81cb6d4d19..95765bfbe0f2 100644 --- a/vllm/multimodal/cache.py +++ b/vllm/multimodal/cache.py @@ -380,8 +380,9 @@ def _enable_processor_cache( def _enable_ipc_cache(vllm_config: "VllmConfig") -> bool: parallel_config = vllm_config.parallel_config - supports_ipc_cache = (parallel_config.data_parallel_size == 1 - or parallel_config.data_parallel_external_lb) + supports_ipc_cache = (parallel_config.api_process_count == 1 + and (parallel_config.data_parallel_size == 1 + or parallel_config.data_parallel_external_lb)) return supports_ipc_cache diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 079dd9a7d38d..54231cebea20 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -772,6 +772,7 @@ def __init__(self, client_addresses=client_addresses, ) + self.client_count = client_count self.client_index = client_index self.outputs_queue = asyncio.Queue[Union[EngineCoreOutputs, Exception]]() diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index b5750c82db02..fb8b1ba4ac2a 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -123,8 +123,8 @@ def __init__( target_server_fn: Callable, listen_address: str, sock: Any, - args: argparse.Namespace, num_servers: int, + args_per_server: list[argparse.Namespace], input_addresses: list[str], output_addresses: list[str], stats_update_address: Optional[str] = None, @@ -135,35 +135,41 @@ def __init__( target_server_fn: Function to call for each API server process listen_address: Address to listen for client connections sock: Socket for client connections - args: Command line arguments num_servers: Number of API server processes to start + args_per_server: Command line arguments for each API server input_addresses: Input addresses for each API server output_addresses: Output addresses for each API server stats_update_address: Optional stats update address """ + if len(args_per_server) != num_servers: + raise ValueError(f"Incorrect {len(args_per_server)=}") + if len(input_addresses) != num_servers: + raise ValueError(f"Incorrect {len(input_addresses)=}") + if len(output_addresses) != num_servers: + raise ValueError(f"Incorrect {len(output_addresses)=}") + self.listen_address = listen_address self.sock = sock - self.args = args # Start API servers spawn_context = multiprocessing.get_context("spawn") self.processes: list[BaseProcess] = [] - for i, in_addr, out_addr in zip(range(num_servers), input_addresses, - output_addresses): + for i in range(num_servers): client_config = { - "input_address": in_addr, - "output_address": out_addr, + "input_address": input_addresses[i], + "output_address": output_addresses[i], "client_count": num_servers, - "client_index": i + "client_index": i, } if 
stats_update_address is not None: client_config["stats_update_address"] = stats_update_address - proc = spawn_context.Process(target=target_server_fn, - name=f"ApiServer_{i}", - args=(listen_address, sock, args, - client_config)) + proc = spawn_context.Process( + target=target_server_fn, + name=f"ApiServer_{i}", + args=(listen_address, sock, args_per_server[i], client_config), + ) self.processes.append(proc) proc.start() From 5ff210dc887b454c4cb0e90758d35a50f0259116 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 07:18:13 +0000 Subject: [PATCH 02/23] Tests Signed-off-by: DarkLight1337 --- tests/v1/test_external_lb_dp.py | 34 ++++++++++++++++++++++++---- tests/v1/test_hybrid_lb_dp.py | 36 +++++++++++++++++++++++++----- tests/v1/test_internal_lb_dp.py | 39 +++++++++++++++++++++++++++------ 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/tests/v1/test_external_lb_dp.py b/tests/v1/test_external_lb_dp.py index 4a5c47fead58..6e7845f407ae 100644 --- a/tests/v1/test_external_lb_dp.py +++ b/tests/v1/test_external_lb_dp.py @@ -9,6 +9,7 @@ import openai # use the official client for correctness check import pytest import pytest_asyncio +import requests from tests.utils import RemoteOpenAIServer from vllm.platforms import current_platform @@ -70,6 +71,8 @@ def start_server(r: int, sargs: list[str]): sargs, auto_port=False, env_dict={ + "VLLM_SERVER_DEV_MODE": + "1", current_platform.device_control_env_var: ",".join( str( @@ -127,11 +130,18 @@ def default_server_args(): @pytest.fixture(scope="module", params=[1, 4]) -def servers(request, default_server_args): +def server_manager(request, default_server_args): api_server_count = request.param - with ExternalLBServerManager(MODEL_NAME, DP_SIZE, api_server_count, - default_server_args) as server_list: - yield server_list + server_manager = ExternalLBServerManager(MODEL_NAME, DP_SIZE, + api_server_count, + default_server_args) + + with server_manager: + yield server_manager + + +def servers(server_manager): + return server_manager.servers @pytest_asyncio.fixture @@ -144,6 +154,22 @@ async def clients(servers: list[tuple[RemoteOpenAIServer, list[str]]]): ] +def test_external_lb_server_info(server_manager): + servers = server_manager.servers + api_server_count = server_manager.api_server_count + + for i, (server, _) in enumerate(servers): + response = requests.get(server.url_for("/server_info")) + response.raise_for_status() + + vllm_config = response.json() + parallel_config = vllm_config["parallel_config"] + + assert parallel_config[ + "api_process_count"] == api_server_count, f"Failed ({i=})" + assert parallel_config["api_process_rank"] == 0, f"Failed ({i=})" + + @pytest.mark.asyncio @pytest.mark.parametrize( "model_name", diff --git a/tests/v1/test_hybrid_lb_dp.py b/tests/v1/test_hybrid_lb_dp.py index 293b1257be6b..1fa8597d4713 100644 --- a/tests/v1/test_hybrid_lb_dp.py +++ b/tests/v1/test_hybrid_lb_dp.py @@ -9,6 +9,7 @@ import openai # use the official client for correctness check import pytest import pytest_asyncio +import requests from tests.utils import RemoteOpenAIServer from tests.v1.test_utils import check_request_balancing @@ -92,6 +93,8 @@ def start_server(node: int, sargs: list[str]): sargs, auto_port=False, env_dict={ + "VLLM_SERVER_DEV_MODE": + "1", current_platform.device_control_env_var: ",".join( str( @@ -150,12 +153,19 @@ def default_server_args(): @pytest.fixture(scope="module", params=[1, 4]) -def servers(request, default_server_args): +def server_manager(request, default_server_args): 
api_server_count = request.param - with HybridLBServerManager(MODEL_NAME, DP_SIZE, api_server_count, - default_server_args, DP_SIZE_LOCAL, - TP_SIZE) as server_list: - yield server_list + server_manager = HybridLBServerManager(MODEL_NAME, DP_SIZE, + api_server_count, + default_server_args, DP_SIZE_LOCAL, + TP_SIZE) + + with server_manager: + yield server_manager + + +def servers(server_manager): + return server_manager.servers @pytest_asyncio.fixture @@ -168,6 +178,22 @@ async def clients(servers: list[tuple[RemoteOpenAIServer, list[str]]]): ] +def test_hybrid_dp_server_info(server_manager): + servers = server_manager.servers + api_server_count = server_manager.api_server_count + + for i, (server, _) in enumerate(servers): + response = requests.get(server.url_for("/server_info")) + response.raise_for_status() + + vllm_config = response.json() + parallel_config = vllm_config["parallel_config"] + + assert parallel_config[ + "api_process_count"] == api_server_count, f"Failed ({i=})" + assert parallel_config["api_process_rank"] == i, f"Failed ({i=})" + + @pytest.mark.asyncio @pytest.mark.parametrize( "model_name", diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py index 2b031865cad7..373a8ab22eab 100644 --- a/tests/v1/test_internal_lb_dp.py +++ b/tests/v1/test_internal_lb_dp.py @@ -10,6 +10,7 @@ import openai # use the official client for correctness check import pytest import pytest_asyncio +import requests from tests.utils import RemoteOpenAIServer from tests.v1.test_utils import check_request_balancing @@ -230,6 +231,8 @@ def start_engines_server(): engines_server_args, auto_port=False, env_dict={ + "VLLM_SERVER_DEV_MODE": + "1", current_platform.device_control_env_var: ",".join( str( @@ -293,14 +296,20 @@ def default_server_args(): @pytest.fixture(scope="module", params=[1, 4]) -def servers(request, default_server_args): +def server_manager(request, default_server_args): api_server_count = request.param - with MultinodeInternalLBServerManager(MODEL_NAME, DP_SIZE, - api_server_count, - default_server_args, - DP_SIZE // NUM_NODES, - TP_SIZE) as server_list: - yield server_list + server_manager = MultinodeInternalLBServerManager(MODEL_NAME, DP_SIZE, + api_server_count, + default_server_args, + DP_SIZE // NUM_NODES, + TP_SIZE) + + with server_manager: + yield server_manager + + +def servers(server_manager): + return server_manager.servers @pytest.fixture(scope="module", params=[1, 4]) @@ -331,6 +340,22 @@ async def api_only_client(api_only_servers: list[tuple[RemoteOpenAIServer, yield client +def test_multinode_dp_server_info(server_manager): + servers = server_manager.servers + api_server_count = server_manager.api_server_count + + for i, (server, _) in enumerate(servers): + response = requests.get(server.url_for("/server_info")) + response.raise_for_status() + + vllm_config = response.json() + parallel_config = vllm_config["parallel_config"] + + assert parallel_config[ + "api_process_count"] == api_server_count, f"Failed ({i=})" + assert parallel_config["api_process_rank"] == i, f"Failed ({i=})" + + @pytest.mark.asyncio @pytest.mark.parametrize( "model_name", From ed761704663772878b3bfa5ed8ed306980992ccc Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 07:24:11 +0000 Subject: [PATCH 03/23] Update Signed-off-by: DarkLight1337 --- tests/v1/test_external_lb_dp.py | 5 ++++- tests/v1/test_hybrid_lb_dp.py | 5 ++++- tests/v1/test_internal_lb_dp.py | 5 ++++- vllm/multimodal/cache.py | 6 +++--- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git 
a/tests/v1/test_external_lb_dp.py b/tests/v1/test_external_lb_dp.py index 6e7845f407ae..7a774ef8a22b 100644 --- a/tests/v1/test_external_lb_dp.py +++ b/tests/v1/test_external_lb_dp.py @@ -159,7 +159,7 @@ def test_external_lb_server_info(server_manager): api_server_count = server_manager.api_server_count for i, (server, _) in enumerate(servers): - response = requests.get(server.url_for("/server_info")) + response = requests.get(server.url_for("server_info")) response.raise_for_status() vllm_config = response.json() @@ -169,6 +169,9 @@ def test_external_lb_server_info(server_manager): "api_process_count"] == api_server_count, f"Failed ({i=})" assert parallel_config["api_process_rank"] == 0, f"Failed ({i=})" + # Logging in case a non-assert exception occurs + print(f"Passed ({i=})") + @pytest.mark.asyncio @pytest.mark.parametrize( diff --git a/tests/v1/test_hybrid_lb_dp.py b/tests/v1/test_hybrid_lb_dp.py index 1fa8597d4713..fa9f645ad2af 100644 --- a/tests/v1/test_hybrid_lb_dp.py +++ b/tests/v1/test_hybrid_lb_dp.py @@ -183,7 +183,7 @@ def test_hybrid_dp_server_info(server_manager): api_server_count = server_manager.api_server_count for i, (server, _) in enumerate(servers): - response = requests.get(server.url_for("/server_info")) + response = requests.get(server.url_for("server_info")) response.raise_for_status() vllm_config = response.json() @@ -193,6 +193,9 @@ def test_hybrid_dp_server_info(server_manager): "api_process_count"] == api_server_count, f"Failed ({i=})" assert parallel_config["api_process_rank"] == i, f"Failed ({i=})" + # Logging in case a non-assert exception occurs + print(f"Passed ({i=})") + @pytest.mark.asyncio @pytest.mark.parametrize( diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py index 373a8ab22eab..939e8e060a8e 100644 --- a/tests/v1/test_internal_lb_dp.py +++ b/tests/v1/test_internal_lb_dp.py @@ -345,7 +345,7 @@ def test_multinode_dp_server_info(server_manager): api_server_count = server_manager.api_server_count for i, (server, _) in enumerate(servers): - response = requests.get(server.url_for("/server_info")) + response = requests.get(server.url_for("server_info")) response.raise_for_status() vllm_config = response.json() @@ -355,6 +355,9 @@ def test_multinode_dp_server_info(server_manager): "api_process_count"] == api_server_count, f"Failed ({i=})" assert parallel_config["api_process_rank"] == i, f"Failed ({i=})" + # Logging in case a non-assert exception occurs + print(f"Passed ({i=})") + @pytest.mark.asyncio @pytest.mark.parametrize( diff --git a/vllm/multimodal/cache.py b/vllm/multimodal/cache.py index 95765bfbe0f2..6a3a63199a67 100644 --- a/vllm/multimodal/cache.py +++ b/vllm/multimodal/cache.py @@ -380,9 +380,9 @@ def _enable_processor_cache( def _enable_ipc_cache(vllm_config: "VllmConfig") -> bool: parallel_config = vllm_config.parallel_config - supports_ipc_cache = (parallel_config.api_process_count == 1 - and (parallel_config.data_parallel_size == 1 - or parallel_config.data_parallel_external_lb)) + supports_ipc_cache = ((parallel_config.api_process_count == 1 + and parallel_config.data_parallel_size == 1) + or parallel_config.data_parallel_external_lb) return supports_ipc_cache From 90703bd9d62ff4b9419c42af365be350660edbab Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 08:11:22 +0000 Subject: [PATCH 04/23] Update and fix tests Signed-off-by: DarkLight1337 --- tests/v1/test_external_lb_dp.py | 27 ++++++++++++------- tests/v1/test_hybrid_lb_dp.py | 27 ++++++++++++------- tests/v1/test_internal_lb_dp.py | 39 
++++++++++++++++----------- vllm/entrypoints/openai/api_server.py | 21 ++++++++++++--- 4 files changed, 74 insertions(+), 40 deletions(-) diff --git a/tests/v1/test_external_lb_dp.py b/tests/v1/test_external_lb_dp.py index 7a774ef8a22b..7820600738ab 100644 --- a/tests/v1/test_external_lb_dp.py +++ b/tests/v1/test_external_lb_dp.py @@ -154,23 +154,30 @@ async def clients(servers: list[tuple[RemoteOpenAIServer, list[str]]]): ] +def _get_parallel_config(server: RemoteOpenAIServer): + response = requests.get(server.url_for("server_info?config_format=json")) + response.raise_for_status() + + vllm_config = response.json()["vllm_config"] + return vllm_config["parallel_config"] + + def test_external_lb_server_info(server_manager): servers = server_manager.servers api_server_count = server_manager.api_server_count for i, (server, _) in enumerate(servers): - response = requests.get(server.url_for("server_info")) - response.raise_for_status() - - vllm_config = response.json() - parallel_config = vllm_config["parallel_config"] + print(f"Testing {i=}") - assert parallel_config[ - "api_process_count"] == api_server_count, f"Failed ({i=})" - assert parallel_config["api_process_rank"] == 0, f"Failed ({i=})" + # Each request will hit one of the API servers + parallel_configs = [_get_parallel_config(server) for _ in range(50)] + api_process_counts = [c["api_process_count"] for c in parallel_configs] + api_process_ranks = [c["api_process_rank"] for c in parallel_configs] - # Logging in case a non-assert exception occurs - print(f"Passed ({i=})") + assert all(c == api_server_count + for c in api_process_counts), api_process_counts + assert all(0 <= r < api_server_count + for r in api_process_ranks), api_process_ranks @pytest.mark.asyncio diff --git a/tests/v1/test_hybrid_lb_dp.py b/tests/v1/test_hybrid_lb_dp.py index fa9f645ad2af..339abe18c979 100644 --- a/tests/v1/test_hybrid_lb_dp.py +++ b/tests/v1/test_hybrid_lb_dp.py @@ -178,23 +178,30 @@ async def clients(servers: list[tuple[RemoteOpenAIServer, list[str]]]): ] +def _get_parallel_config(server: RemoteOpenAIServer): + response = requests.get(server.url_for("server_info?config_format=json")) + response.raise_for_status() + + vllm_config = response.json()["vllm_config"] + return vllm_config["parallel_config"] + + def test_hybrid_dp_server_info(server_manager): servers = server_manager.servers api_server_count = server_manager.api_server_count for i, (server, _) in enumerate(servers): - response = requests.get(server.url_for("server_info")) - response.raise_for_status() - - vllm_config = response.json() - parallel_config = vllm_config["parallel_config"] + print(f"Testing {i=}") - assert parallel_config[ - "api_process_count"] == api_server_count, f"Failed ({i=})" - assert parallel_config["api_process_rank"] == i, f"Failed ({i=})" + # Each request will hit one of the API servers + parallel_configs = [_get_parallel_config(server) for _ in range(50)] + api_process_counts = [c["api_process_count"] for c in parallel_configs] + api_process_ranks = [c["api_process_rank"] for c in parallel_configs] - # Logging in case a non-assert exception occurs - print(f"Passed ({i=})") + assert all(c == api_server_count + for c in api_process_counts), api_process_counts + assert all(0 <= r < api_server_count + for r in api_process_ranks), api_process_ranks @pytest.mark.asyncio diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py index 939e8e060a8e..66f5aca14c57 100644 --- a/tests/v1/test_internal_lb_dp.py +++ b/tests/v1/test_internal_lb_dp.py @@ -102,6 
+102,8 @@ def start_server(sidx: int, r: int, sargs: list[str]): sargs, auto_port=False, env_dict={ + "VLLM_SERVER_DEV_MODE": + "1", current_platform.device_control_env_var: ",".join( str( @@ -215,7 +217,10 @@ def start_api_server(): self.model_name, api_server_args, auto_port=False, - env_dict={}) # No GPUs needed for API-only server + env_dict={ + "VLLM_SERVER_DEV_MODE": "1", + # No GPUs needed for API-only server + }) server.__enter__() print(f"API-only server started successfully with " f"{self.api_server_count} API servers") @@ -231,8 +236,6 @@ def start_engines_server(): engines_server_args, auto_port=False, env_dict={ - "VLLM_SERVER_DEV_MODE": - "1", current_platform.device_control_env_var: ",".join( str( @@ -340,23 +343,27 @@ async def api_only_client(api_only_servers: list[tuple[RemoteOpenAIServer, yield client -def test_multinode_dp_server_info(server_manager): - servers = server_manager.servers - api_server_count = server_manager.api_server_count +def _get_parallel_config(server: RemoteOpenAIServer): + response = requests.get(server.url_for("server_info?config_format=json")) + response.raise_for_status() - for i, (server, _) in enumerate(servers): - response = requests.get(server.url_for("server_info")) - response.raise_for_status() + vllm_config = response.json()["vllm_config"] + return vllm_config["parallel_config"] - vllm_config = response.json() - parallel_config = vllm_config["parallel_config"] - assert parallel_config[ - "api_process_count"] == api_server_count, f"Failed ({i=})" - assert parallel_config["api_process_rank"] == i, f"Failed ({i=})" +def test_multinode_dp_server_info(server_manager): + head_server = server_manager.servers[0][0] + api_server_count = server_manager.api_server_count + + # Each request will hit one of the API servers + parallel_configs = [_get_parallel_config(head_server) for _ in range(50)] + api_process_counts = [c["api_process_count"] for c in parallel_configs] + api_process_ranks = [c["api_process_rank"] for c in parallel_configs] - # Logging in case a non-assert exception occurs - print(f"Passed ({i=})") + assert all(c == api_server_count + for c in api_process_counts), api_process_counts + assert all(0 <= r < api_server_count + for r in api_process_ranks), api_process_ranks @pytest.mark.asyncio diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 9a2470649c8d..3a2039eea45f 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -19,13 +19,14 @@ from contextlib import asynccontextmanager from functools import partial from http import HTTPStatus -from typing import Annotated, Any, Callable, Optional +from typing import Annotated, Any, Callable, Literal, Optional import prometheus_client import pydantic import regex as re import uvloop -from fastapi import APIRouter, Depends, FastAPI, Form, HTTPException, Request +from fastapi import (APIRouter, Depends, FastAPI, Form, HTTPException, Query, + Request) from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, Response, StreamingResponse @@ -1038,9 +1039,21 @@ async def do_rerank_v2(request: RerankRequest, raw_request: Request): logger.warning("SECURITY WARNING: Development endpoints are enabled! 
" "This should NOT be used in production!") + PydanticVllmConfig = pydantic.TypeAdapter(VllmConfig) + @router.get("/server_info") - async def show_server_info(raw_request: Request): - server_info = {"vllm_config": str(raw_request.app.state.vllm_config)} + async def show_server_info( + raw_request: Request, + config_format: Annotated[Literal["text", "json"], + Query()] = "text", + ): + vllm_config: VllmConfig = raw_request.app.state.vllm_config + server_info = { + "vllm_config": + str(vllm_config) + if config_format == "text" else PydanticVllmConfig.dump_python( + vllm_config, mode="json", fallback=str) + } return JSONResponse(content=server_info) @router.post("/reset_prefix_cache") From 3f97be411d5eaa0da98b7febae44db5092e9cd01 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 08:19:49 +0000 Subject: [PATCH 05/23] Update docstring Signed-off-by: DarkLight1337 --- vllm/config/parallel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index b7f7231ef6da..a3aba2d01df8 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -98,9 +98,9 @@ class ParallelConfig: --data-parallel-start-rank.""" api_process_count: int = 1 - """[Internal] The number of API processes initialized.""" + """[Internal CLI arg] The number of API processes initialized.""" api_process_rank: int = 0 - """[Internal] The rank of this API process.""" + """[Internal CLI arg] The rank of this API process.""" enable_expert_parallel: bool = False """Use expert parallelism instead of tensor parallelism for MoE layers.""" From 91ea959ad379b556b0032cf63ccc6ca8831a9b75 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 08:30:43 +0000 Subject: [PATCH 06/23] Optimize Signed-off-by: DarkLight1337 --- tests/v1/test_external_lb_dp.py | 7 ++++++- tests/v1/test_hybrid_lb_dp.py | 7 ++++++- tests/v1/test_internal_lb_dp.py | 7 ++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/tests/v1/test_external_lb_dp.py b/tests/v1/test_external_lb_dp.py index 7820600738ab..328079b55519 100644 --- a/tests/v1/test_external_lb_dp.py +++ b/tests/v1/test_external_lb_dp.py @@ -170,7 +170,12 @@ def test_external_lb_server_info(server_manager): print(f"Testing {i=}") # Each request will hit one of the API servers - parallel_configs = [_get_parallel_config(server) for _ in range(50)] + # `n_reqs` is set so that there is a good chance each server + # receives at least one request + n_reqs = 2 * api_server_count * api_server_count + parallel_configs = [ + _get_parallel_config(server) for _ in range(n_reqs) + ] api_process_counts = [c["api_process_count"] for c in parallel_configs] api_process_ranks = [c["api_process_rank"] for c in parallel_configs] diff --git a/tests/v1/test_hybrid_lb_dp.py b/tests/v1/test_hybrid_lb_dp.py index 339abe18c979..f09e4dedddae 100644 --- a/tests/v1/test_hybrid_lb_dp.py +++ b/tests/v1/test_hybrid_lb_dp.py @@ -194,7 +194,12 @@ def test_hybrid_dp_server_info(server_manager): print(f"Testing {i=}") # Each request will hit one of the API servers - parallel_configs = [_get_parallel_config(server) for _ in range(50)] + # `n_reqs` is set so that there is a good chance each server + # receives at least one request + n_reqs = 2 * api_server_count * api_server_count + parallel_configs = [ + _get_parallel_config(server) for _ in range(n_reqs) + ] api_process_counts = [c["api_process_count"] for c in parallel_configs] api_process_ranks = [c["api_process_rank"] for c in parallel_configs] diff --git 
a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py index 66f5aca14c57..ead6b3f2c6ec 100644 --- a/tests/v1/test_internal_lb_dp.py +++ b/tests/v1/test_internal_lb_dp.py @@ -356,7 +356,12 @@ def test_multinode_dp_server_info(server_manager): api_server_count = server_manager.api_server_count # Each request will hit one of the API servers - parallel_configs = [_get_parallel_config(head_server) for _ in range(50)] + # `n_reqs` is set so that there is a good chance each server + # receives at least one request + n_reqs = 2 * api_server_count * api_server_count + parallel_configs = [ + _get_parallel_config(head_server) for _ in range(n_reqs) + ] api_process_counts = [c["api_process_count"] for c in parallel_configs] api_process_ranks = [c["api_process_rank"] for c in parallel_configs] From 6d0c0408833341c73102563ec6e6602823705761 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 08:32:50 +0000 Subject: [PATCH 07/23] Comment Signed-off-by: DarkLight1337 --- vllm/entrypoints/openai/api_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 3a2039eea45f..6fdf1261eea7 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -1053,6 +1053,7 @@ async def show_server_info( str(vllm_config) if config_format == "text" else PydanticVllmConfig.dump_python( vllm_config, mode="json", fallback=str) + # fallback=str is needed to handle e.g. torch.dtype } return JSONResponse(content=server_info) From 69c9ff09fdad12f0ff5471ec8edb03907acfed25 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 08:39:38 +0000 Subject: [PATCH 08/23] Improve error message Signed-off-by: DarkLight1337 --- vllm/v1/utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index fb8b1ba4ac2a..841ffa2c9da8 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -142,11 +142,14 @@ def __init__( stats_update_address: Optional stats update address """ if len(args_per_server) != num_servers: - raise ValueError(f"Incorrect {len(args_per_server)=}") + raise ValueError(f"Incorrect {len(args_per_server)=}, " + f"expected {num_servers}") if len(input_addresses) != num_servers: - raise ValueError(f"Incorrect {len(input_addresses)=}") + raise ValueError(f"Incorrect {len(input_addresses)=}, " + f"expected {num_servers}") if len(output_addresses) != num_servers: - raise ValueError(f"Incorrect {len(output_addresses)=}") + raise ValueError(f"Incorrect {len(output_addresses)=}, " + f"expected {num_servers}") self.listen_address = listen_address self.sock = sock From 8e3ea32495710e7b067393510974daedcb72f083 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 08:43:21 +0000 Subject: [PATCH 09/23] Update docstring Signed-off-by: DarkLight1337 --- vllm/config/parallel.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index a3aba2d01df8..05561933c890 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -98,9 +98,21 @@ class ParallelConfig: --data-parallel-start-rank.""" api_process_count: int = 1 - """[Internal CLI arg] The number of API processes initialized.""" + """ + The number of API processes initialized. + + Note: + This is an internal config that should only be set by API server + scale-out. 
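For a concrete view of these fields, the dev-mode endpoint exercised by the tests above can be queried directly. A minimal sketch, assuming a server launched with VLLM_SERVER_DEV_MODE=1 on a hypothetical local port (a later commit in this series renames the fields to _api_process_count/_api_process_rank):

    import requests

    # config_format=json (added in the api_server.py hunk above) returns the
    # full VllmConfig serialized to JSON instead of its str() form.
    resp = requests.get("http://localhost:8000/server_info?config_format=json")
    resp.raise_for_status()

    parallel_config = resp.json()["vllm_config"]["parallel_config"]
    print(parallel_config["api_process_count"], parallel_config["api_process_rank"])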
+ """ api_process_rank: int = 0 - """[Internal CLI arg] The rank of this API process.""" + """ + The rank of this API process. + + Note: + This is an internal config that should only be set by API server + scale-out. + """ enable_expert_parallel: bool = False """Use expert parallelism instead of tensor parallelism for MoE layers.""" From 1dd58943ceb3af508f67689d4ac63a74387c4dba Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 10:04:59 +0000 Subject: [PATCH 10/23] Fixture Signed-off-by: DarkLight1337 --- tests/v1/test_external_lb_dp.py | 1 + tests/v1/test_hybrid_lb_dp.py | 1 + tests/v1/test_internal_lb_dp.py | 1 + 3 files changed, 3 insertions(+) diff --git a/tests/v1/test_external_lb_dp.py b/tests/v1/test_external_lb_dp.py index 328079b55519..54e00c889b8e 100644 --- a/tests/v1/test_external_lb_dp.py +++ b/tests/v1/test_external_lb_dp.py @@ -140,6 +140,7 @@ def server_manager(request, default_server_args): yield server_manager +@pytest.fixture def servers(server_manager): return server_manager.servers diff --git a/tests/v1/test_hybrid_lb_dp.py b/tests/v1/test_hybrid_lb_dp.py index f09e4dedddae..3c35ea039ed4 100644 --- a/tests/v1/test_hybrid_lb_dp.py +++ b/tests/v1/test_hybrid_lb_dp.py @@ -164,6 +164,7 @@ def server_manager(request, default_server_args): yield server_manager +@pytest.fixture def servers(server_manager): return server_manager.servers diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py index ead6b3f2c6ec..e67c2af5a12b 100644 --- a/tests/v1/test_internal_lb_dp.py +++ b/tests/v1/test_internal_lb_dp.py @@ -311,6 +311,7 @@ def server_manager(request, default_server_args): yield server_manager +@pytest.fixture def servers(server_manager): return server_manager.servers From d06434faa86b6048e4eedae59b83839025c84643 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 15:51:18 +0000 Subject: [PATCH 11/23] Address comments in serve.py Signed-off-by: DarkLight1337 --- .../test_api_server_process_manager.py | 4 +-- vllm/entrypoints/cli/serve.py | 18 ++++------ vllm/v1/utils.py | 33 +++++++------------ 3 files changed, 20 insertions(+), 35 deletions(-) diff --git a/tests/entrypoints/test_api_server_process_manager.py b/tests/entrypoints/test_api_server_process_manager.py index 882382a38543..a39ba6402668 100644 --- a/tests/entrypoints/test_api_server_process_manager.py +++ b/tests/entrypoints/test_api_server_process_manager.py @@ -36,10 +36,10 @@ def api_server_args(): "localhost:8000", "sock": sock, + "args": + "test_args", # Simple string to avoid pickling issues "num_servers": 3, - "args_per_server": - ["test_args"] * 3, # Simple string to avoid pickling issues "input_addresses": [ "tcp://127.0.0.1:5001", "tcp://127.0.0.1:5002", "tcp://127.0.0.1:5003" diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index 2ebffe9cb690..216bcfb1da40 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -3,7 +3,6 @@ import argparse import signal -from copy import deepcopy from typing import Optional import uvloop @@ -140,14 +139,11 @@ def run_multi_api_server(args: argparse.Namespace): assert num_api_servers > 0 # No need to set api_process_rank for EngineCore processes - args.api_process_count = args.api_server_count + args.api_process_count = num_api_servers if num_api_servers > 1: setup_multiprocess_prometheus() - # Not compatible with API server scale-out - args.mm_processor_cache_gb = 0 - listen_address, sock = setup_server(args) engine_args = vllm.AsyncEngineArgs.from_cli_args(args) @@ 
-171,11 +167,6 @@ def run_multi_api_server(args: argparse.Namespace): hybrid_dp_lb = parallel_config.data_parallel_hybrid_lb assert external_dp_lb or hybrid_dp_lb or dp_rank == 0 - # Set api_process_rank for API server processes - args_per_server = [deepcopy(args) for _ in range(num_api_servers)] - for server_idx in range(num_api_servers): - args_per_server[server_idx].api_process_rank = server_idx - api_server_manager: Optional[APIServerProcessManager] = None with launch_core_engines(vllm_config, executor_class, log_stats, @@ -187,7 +178,7 @@ def run_multi_api_server(args: argparse.Namespace): target_server_fn=run_api_server_worker_proc, listen_address=listen_address, sock=sock, - args_per_server=args_per_server, + args=args, num_servers=num_api_servers, input_addresses=addresses.inputs, output_addresses=addresses.outputs, @@ -223,9 +214,12 @@ def run_api_server_worker_proc(listen_address, client_config=None, **uvicorn_kwargs) -> None: """Entrypoint for individual API server worker processes.""" + client_config = client_config or {} + + args.api_process_rank = server_index = client_config.get("client_index", 0) + args.api_process_count = client_config.get("client_count", 1) # Set process title and add process-specific prefix to stdout and stderr. - server_index = client_config.get("client_index", 0) if client_config else 0 set_process_title("APIServer", str(server_index)) decorate_logs() diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 841ffa2c9da8..03fe8a794f56 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -123,8 +123,8 @@ def __init__( target_server_fn: Callable, listen_address: str, sock: Any, + args: argparse.Namespace, num_servers: int, - args_per_server: list[argparse.Namespace], input_addresses: list[str], output_addresses: list[str], stats_update_address: Optional[str] = None, @@ -135,44 +135,35 @@ def __init__( target_server_fn: Function to call for each API server process listen_address: Address to listen for client connections sock: Socket for client connections - num_servers: Number of API server processes to start args_per_server: Command line arguments for each API server + num_servers: Number of API server processes to start input_addresses: Input addresses for each API server output_addresses: Output addresses for each API server stats_update_address: Optional stats update address """ - if len(args_per_server) != num_servers: - raise ValueError(f"Incorrect {len(args_per_server)=}, " - f"expected {num_servers}") - if len(input_addresses) != num_servers: - raise ValueError(f"Incorrect {len(input_addresses)=}, " - f"expected {num_servers}") - if len(output_addresses) != num_servers: - raise ValueError(f"Incorrect {len(output_addresses)=}, " - f"expected {num_servers}") - self.listen_address = listen_address self.sock = sock + self.args = args # Start API servers spawn_context = multiprocessing.get_context("spawn") self.processes: list[BaseProcess] = [] - for i in range(num_servers): + for i, in_addr, out_addr in zip(range(num_servers), input_addresses, + output_addresses): client_config = { - "input_address": input_addresses[i], - "output_address": output_addresses[i], + "input_address": in_addr, + "output_address": out_addr, "client_count": num_servers, - "client_index": i, + "client_index": i } if stats_update_address is not None: client_config["stats_update_address"] = stats_update_address - proc = spawn_context.Process( - target=target_server_fn, - name=f"ApiServer_{i}", - args=(listen_address, sock, args_per_server[i], client_config), - ) + proc = 
spawn_context.Process(target=target_server_fn, + name=f"ApiServer_{i}", + args=(listen_address, sock, args, + client_config)) self.processes.append(proc) proc.start() From dac11709edb1f7b03e7609c15b71b953d3ac5136 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 16:04:49 +0000 Subject: [PATCH 12/23] Rename attributes to internal and validate Signed-off-by: DarkLight1337 --- tests/v1/test_external_lb_dp.py | 6 +++-- tests/v1/test_hybrid_lb_dp.py | 6 +++-- tests/v1/test_internal_lb_dp.py | 4 +-- vllm/config/parallel.py | 48 +++++++++++++++++++++------------ vllm/engine/arg_utils.py | 8 +++--- vllm/entrypoints/cli/serve.py | 9 ++++--- vllm/multimodal/cache.py | 2 +- 7 files changed, 51 insertions(+), 32 deletions(-) diff --git a/tests/v1/test_external_lb_dp.py b/tests/v1/test_external_lb_dp.py index 54e00c889b8e..862a76f3c4e2 100644 --- a/tests/v1/test_external_lb_dp.py +++ b/tests/v1/test_external_lb_dp.py @@ -177,8 +177,10 @@ def test_external_lb_server_info(server_manager): parallel_configs = [ _get_parallel_config(server) for _ in range(n_reqs) ] - api_process_counts = [c["api_process_count"] for c in parallel_configs] - api_process_ranks = [c["api_process_rank"] for c in parallel_configs] + api_process_counts = [ + c["_api_process_count"] for c in parallel_configs + ] + api_process_ranks = [c["_api_process_rank"] for c in parallel_configs] assert all(c == api_server_count for c in api_process_counts), api_process_counts diff --git a/tests/v1/test_hybrid_lb_dp.py b/tests/v1/test_hybrid_lb_dp.py index 3c35ea039ed4..552436f818d7 100644 --- a/tests/v1/test_hybrid_lb_dp.py +++ b/tests/v1/test_hybrid_lb_dp.py @@ -201,8 +201,10 @@ def test_hybrid_dp_server_info(server_manager): parallel_configs = [ _get_parallel_config(server) for _ in range(n_reqs) ] - api_process_counts = [c["api_process_count"] for c in parallel_configs] - api_process_ranks = [c["api_process_rank"] for c in parallel_configs] + api_process_counts = [ + c["_api_process_count"] for c in parallel_configs + ] + api_process_ranks = [c["_api_process_rank"] for c in parallel_configs] assert all(c == api_server_count for c in api_process_counts), api_process_counts diff --git a/tests/v1/test_internal_lb_dp.py b/tests/v1/test_internal_lb_dp.py index e67c2af5a12b..e965645711ee 100644 --- a/tests/v1/test_internal_lb_dp.py +++ b/tests/v1/test_internal_lb_dp.py @@ -363,8 +363,8 @@ def test_multinode_dp_server_info(server_manager): parallel_configs = [ _get_parallel_config(head_server) for _ in range(n_reqs) ] - api_process_counts = [c["api_process_count"] for c in parallel_configs] - api_process_ranks = [c["api_process_rank"] for c in parallel_configs] + api_process_counts = [c["_api_process_count"] for c in parallel_configs] + api_process_ranks = [c["_api_process_rank"] for c in parallel_configs] assert all(c == api_server_count for c in api_process_counts), api_process_counts diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index 05561933c890..4405b7fa5d5a 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -97,23 +97,6 @@ class ParallelConfig: between vLLM nodes/replicas. Set explicitly in conjunction with --data-parallel-start-rank.""" - api_process_count: int = 1 - """ - The number of API processes initialized. - - Note: - This is an internal config that should only be set by API server - scale-out. - """ - api_process_rank: int = 0 - """ - The rank of this API process. - - Note: - This is an internal config that should only be set by API server - scale-out. 
- """ - enable_expert_parallel: bool = False """Use expert parallelism instead of tensor parallelism for MoE layers.""" enable_eplb: bool = False @@ -188,6 +171,26 @@ class is dynamically inherited by the worker class. This is used to inject Set to be private as it's not intended to be configured by users. """ + _api_process_count: int = -1 + """ + The number of API processes initialized, or `-1` if API server scale-out + is not used. + + Note: + This is an internal config that is only valid for and + should only be set by API server scale-out. + """ + _api_process_rank: int = -1 + """ + The rank of this API process, or `-1` if API server scale-out + is not used. It is also `-1` for engine core processes + under API server scale-out. + + Note: + This is an internal config that is only valid for and + should only be set by API server scale-out. + """ + @property def world_size_across_dp(self) -> int: """world_size_across_dp is TPxPPxDP, it is the size of the world @@ -424,6 +427,17 @@ def __post_init__(self) -> None: if self.distributed_executor_backend is None and self.world_size == 1: self.distributed_executor_backend = "uni" + if self._api_process_count == -1 and self._api_process_rank != -1: + raise ValueError("`_api_process_rank` is an internal config " + "and should not be set by users") + + if (self._api_process_count != -1 and + not -1 <= self._api_process_rank < self._api_process_count): + raise ValueError( + "Invalid value of `_api_process_rank`. " + f"Expected to be `-1` or `[0, {self._api_process_count})`, " + f"but found: {self._api_process_rank}") + @property def use_ray(self) -> bool: return self.distributed_executor_backend == "ray" or ( diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 91c98f6e64b2..1ec322ba4f86 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -303,11 +303,11 @@ class EngineArgs: data_parallel_rpc_port: Optional[int] = None data_parallel_hybrid_lb: bool = False data_parallel_backend: str = ParallelConfig.data_parallel_backend - api_process_count: int = ParallelConfig.api_process_count - api_process_rank: int = ParallelConfig.api_process_rank enable_expert_parallel: bool = ParallelConfig.enable_expert_parallel eplb_config: EPLBConfig = get_field(ParallelConfig, "eplb_config") enable_eplb: bool = ParallelConfig.enable_eplb + _api_process_count: int = ParallelConfig._api_process_count + _api_process_rank: int = ParallelConfig._api_process_rank num_redundant_experts: int = EPLBConfig.num_redundant_experts eplb_window_size: int = EPLBConfig.window_size eplb_step_interval: int = EPLBConfig.step_interval @@ -1285,8 +1285,6 @@ def create_engine_config( data_parallel_rpc_port=data_parallel_rpc_port, data_parallel_backend=self.data_parallel_backend, data_parallel_hybrid_lb=self.data_parallel_hybrid_lb, - api_process_count=self.api_process_count, - api_process_rank=self.api_process_rank, enable_expert_parallel=self.enable_expert_parallel, enable_eplb=self.enable_eplb, eplb_config=self.eplb_config, @@ -1298,6 +1296,8 @@ def create_engine_config( distributed_executor_backend=self.distributed_executor_backend, worker_cls=self.worker_cls, worker_extension_cls=self.worker_extension_cls, + _api_process_count=self._api_process_count, + _api_process_rank=self._api_process_rank, ) speculative_config = self.create_speculative_config( diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index 216bcfb1da40..c1ae9726bbec 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -138,8 +138,8 
@@ def run_multi_api_server(args: argparse.Namespace): num_api_servers: int = args.api_server_count assert num_api_servers > 0 - # No need to set api_process_rank for EngineCore processes - args.api_process_count = num_api_servers + args._api_process_count = num_api_servers + args._api_process_rank = -1 if num_api_servers > 1: setup_multiprocess_prometheus() @@ -216,8 +216,9 @@ def run_api_server_worker_proc(listen_address, """Entrypoint for individual API server worker processes.""" client_config = client_config or {} - args.api_process_rank = server_index = client_config.get("client_index", 0) - args.api_process_count = client_config.get("client_count", 1) + args._api_process_rank = server_index = client_config.get( + "client_index", 0) + args._api_process_count = client_config.get("client_count", 1) # Set process title and add process-specific prefix to stdout and stderr. set_process_title("APIServer", str(server_index)) diff --git a/vllm/multimodal/cache.py b/vllm/multimodal/cache.py index 6a3a63199a67..b4e368147fda 100644 --- a/vllm/multimodal/cache.py +++ b/vllm/multimodal/cache.py @@ -380,7 +380,7 @@ def _enable_processor_cache( def _enable_ipc_cache(vllm_config: "VllmConfig") -> bool: parallel_config = vllm_config.parallel_config - supports_ipc_cache = ((parallel_config.api_process_count == 1 + supports_ipc_cache = ((parallel_config._api_process_count == 1 and parallel_config.data_parallel_size == 1) or parallel_config.data_parallel_external_lb) From 3f62e0eb28c7495892156155361b303104560f7c Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 16:07:36 +0000 Subject: [PATCH 13/23] Fix Signed-off-by: DarkLight1337 --- vllm/config/parallel.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index 4405b7fa5d5a..24971147eaaa 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -171,15 +171,15 @@ class is dynamically inherited by the worker class. This is used to inject Set to be private as it's not intended to be configured by users. """ - _api_process_count: int = -1 + _api_process_count: int = 1 """ - The number of API processes initialized, or `-1` if API server scale-out - is not used. + The number of API processes initialized. Note: This is an internal config that is only valid for and should only be set by API server scale-out. """ + _api_process_rank: int = -1 """ The rank of this API process, or `-1` if API server scale-out @@ -427,11 +427,11 @@ def __post_init__(self) -> None: if self.distributed_executor_backend is None and self.world_size == 1: self.distributed_executor_backend = "uni" - if self._api_process_count == -1 and self._api_process_rank != -1: + if self._api_process_count == 1 and self._api_process_rank != -1: raise ValueError("`_api_process_rank` is an internal config " "and should not be set by users") - if (self._api_process_count != -1 and + if (self._api_process_count > 1 and not -1 <= self._api_process_rank < self._api_process_count): raise ValueError( "Invalid value of `_api_process_rank`. 
" From df9f9cbc5bf0834e5fa0eb46cc4b099ced216d38 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 16:15:11 +0000 Subject: [PATCH 14/23] Update Signed-off-by: DarkLight1337 --- vllm/entrypoints/cli/serve.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index c1ae9726bbec..345ca9bad009 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -216,12 +216,11 @@ def run_api_server_worker_proc(listen_address, """Entrypoint for individual API server worker processes.""" client_config = client_config or {} - args._api_process_rank = server_index = client_config.get( - "client_index", 0) args._api_process_count = client_config.get("client_count", 1) + args._api_process_rank = client_config.get("client_index", 0) # Set process title and add process-specific prefix to stdout and stderr. - set_process_title("APIServer", str(server_index)) + set_process_title("APIServer", str(args._api_process_rank)) decorate_logs() uvloop.run( From 0ec4e66776583722dc198765f75c424b1b44a25f Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 16:18:15 +0000 Subject: [PATCH 15/23] Push down Signed-off-by: DarkLight1337 --- vllm/entrypoints/cli/serve.py | 5 ++--- vllm/entrypoints/openai/api_server.py | 10 +++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index 345ca9bad009..ccab442b72ce 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -216,11 +216,10 @@ def run_api_server_worker_proc(listen_address, """Entrypoint for individual API server worker processes.""" client_config = client_config or {} - args._api_process_count = client_config.get("client_count", 1) - args._api_process_rank = client_config.get("client_index", 0) + server_index = client_config.get("client_index", 0) # Set process title and add process-specific prefix to stdout and stderr. 
- set_process_title("APIServer", str(args._api_process_rank)) + set_process_title("APIServer", str(server_index)) decorate_logs() uvloop.run( diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index 6fdf1261eea7..6bc5aab22db6 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -1950,12 +1950,14 @@ async def run_server_worker(listen_address, client_config=None, **uvicorn_kwargs) -> None: """Run a single API server worker.""" + client_config = client_config or {} + + args._api_process_count = client_config.get("client_count", 1) + args._api_process_rank = client_config.get("client_index", 0) if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3: ToolParserManager.import_tool_parser(args.tool_parser_plugin) - server_index = client_config.get("client_index", 0) if client_config else 0 - # Load logging config for uvicorn if specified log_config = load_log_config(args.log_config_file) if log_config is not None: @@ -1971,7 +1973,9 @@ async def run_server_worker(listen_address, vllm_config = await engine_client.get_vllm_config() await init_app_state(engine_client, vllm_config, app.state, args) - logger.info("Starting vLLM API server %d on %s", server_index, + print(vllm_config) + logger.info("Starting vLLM API server %d/%d on %s", + args._api_process_rank, args._api_server_count, listen_address) shutdown_task = await serve_http( app, From e500a9bce4584ae0188986a21c93198376342075 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 16:20:48 +0000 Subject: [PATCH 16/23] Update Signed-off-by: DarkLight1337 --- vllm/config/parallel.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index 24971147eaaa..814cef03f987 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -180,10 +180,9 @@ class is dynamically inherited by the worker class. This is used to inject should only be set by API server scale-out. """ - _api_process_rank: int = -1 + _api_process_rank: int = 0 """ - The rank of this API process, or `-1` if API server scale-out - is not used. It is also `-1` for engine core processes + The rank of this API process, or `-1` for engine core processes under API server scale-out. 
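A minimal sketch of the value combinations these fields accept, assuming ParallelConfig can be constructed standalone with its defaults; the __post_init__ check in the surrounding hunks rejects any other rank:

    from vllm.config import ParallelConfig

    ParallelConfig(_api_process_count=4, _api_process_rank=2)   # one of 4 API servers
    ParallelConfig(_api_process_count=4, _api_process_rank=-1)  # engine core process

    # Out-of-range ranks fail validation in __post_init__:
    # ParallelConfig(_api_process_count=4, _api_process_rank=4)  # raises ValueError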
Note: @@ -427,10 +426,6 @@ def __post_init__(self) -> None: if self.distributed_executor_backend is None and self.world_size == 1: self.distributed_executor_backend = "uni" - if self._api_process_count == 1 and self._api_process_rank != -1: - raise ValueError("`_api_process_rank` is an internal config " - "and should not be set by users") - if (self._api_process_count > 1 and not -1 <= self._api_process_rank < self._api_process_count): raise ValueError( From 36fb875c092e77f0b8aa701d14d050a86cd4a500 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 16:23:32 +0000 Subject: [PATCH 17/23] Fix Signed-off-by: DarkLight1337 --- vllm/v1/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 03fe8a794f56..b5750c82db02 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -135,7 +135,7 @@ def __init__( target_server_fn: Function to call for each API server process listen_address: Address to listen for client connections sock: Socket for client connections - args_per_server: Command line arguments for each API server + args: Command line arguments num_servers: Number of API server processes to start input_addresses: Input addresses for each API server output_addresses: Output addresses for each API server From e08e7b788092265b56c382caf1c8e8682be7dbb3 Mon Sep 17 00:00:00 2001 From: DarkLight1337 Date: Wed, 27 Aug 2025 16:25:03 +0000 Subject: [PATCH 18/23] Try deepcopy Signed-off-by: DarkLight1337 --- vllm/entrypoints/cli/serve.py | 1 - vllm/entrypoints/openai/api_server.py | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py index ccab442b72ce..0d2b0e5057c9 100644 --- a/vllm/entrypoints/cli/serve.py +++ b/vllm/entrypoints/cli/serve.py @@ -215,7 +215,6 @@ def run_api_server_worker_proc(listen_address, **uvicorn_kwargs) -> None: """Entrypoint for individual API server worker processes.""" client_config = client_config or {} - server_index = client_config.get("client_index", 0) # Set process title and add process-specific prefix to stdout and stderr. 
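The client_config handed to each worker process is built by APIServerProcessManager in vllm/v1/utils.py; the api_server.py hunk below copies its client_count and client_index into the engine arguments. A minimal sketch of its shape, with hypothetical addresses:

    # Per-server client_config as constructed in v1/utils.py:
    client_config = {
        "input_address": "tcp://127.0.0.1:5001",   # hypothetical address
        "output_address": "tcp://127.0.0.1:6001",  # hypothetical address
        "client_count": 4,  # copied into args._api_process_count below
        "client_index": 0,  # copied into args._api_process_rank below
    }
    # "stats_update_address" is added only when a stats address is configured.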
diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 6bc5aab22db6..03baae1befb1 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -17,6 +17,7 @@
 from argparse import Namespace
 from collections.abc import AsyncIterator, Awaitable
 from contextlib import asynccontextmanager
+from copy import deepcopy
 from functools import partial
 from http import HTTPStatus
 from typing import Annotated, Any, Callable, Literal, Optional
@@ -1952,6 +1953,7 @@ async def run_server_worker(listen_address,
     """Run a single API server worker."""
     client_config = client_config or {}
 
+    args = deepcopy(args)
     args._api_process_count = client_config.get("client_count", 1)
     args._api_process_rank = client_config.get("client_index", 0)

From 875c7e3ae6f49eba4945020b5c8ec90c562cff91 Mon Sep 17 00:00:00 2001
From: DarkLight1337
Date: Wed, 27 Aug 2025 16:26:03 +0000
Subject: [PATCH 19/23] No print

Signed-off-by: DarkLight1337
---
 vllm/entrypoints/openai/api_server.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 03baae1befb1..646dfe35abad 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -1975,7 +1975,6 @@ async def run_server_worker(listen_address,
     vllm_config = await engine_client.get_vllm_config()
     await init_app_state(engine_client, vllm_config, app.state, args)
 
-    print(vllm_config)
     logger.info("Starting vLLM API server %d/%d on %s",
                 args._api_process_rank, args._api_server_count,
                 listen_address)

From d9a5c81993ba9cd04868665332b88fa74459ea73 Mon Sep 17 00:00:00 2001
From: DarkLight1337
Date: Wed, 27 Aug 2025 16:27:51 +0000
Subject: [PATCH 20/23] Simplify

Signed-off-by: DarkLight1337
---
 vllm/config/parallel.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py
index 814cef03f987..f822a6901b22 100644
--- a/vllm/config/parallel.py
+++ b/vllm/config/parallel.py
@@ -96,7 +96,6 @@ class ParallelConfig:
     between local data parallel ranks, but an external LB balances
     between vLLM nodes/replicas. Set explicitly in conjunction with
     --data-parallel-start-rank."""
-
     enable_expert_parallel: bool = False
     """Use expert parallelism instead of tensor parallelism for MoE layers."""
     enable_eplb: bool = False
@@ -426,8 +425,7 @@ def __post_init__(self) -> None:
         if self.distributed_executor_backend is None and self.world_size == 1:
             self.distributed_executor_backend = "uni"
 
-        if (self._api_process_count > 1 and
-                not -1 <= self._api_process_rank < self._api_process_count):
+        if not -1 <= self._api_process_rank < self._api_process_count:
             raise ValueError(
                 "Invalid value of `_api_process_rank`. "
                 f"Expected to be `-1` or `[0, {self._api_process_count})`, "

From dabe421ede267bcf3c8b1794ba034ed6e2009c66 Mon Sep 17 00:00:00 2001
From: DarkLight1337
Date: Wed, 27 Aug 2025 16:39:53 +0000
Subject: [PATCH 21/23] Fix

Signed-off-by: DarkLight1337
---
 vllm/entrypoints/openai/api_server.py | 22 ++++++++++------------
 vllm/v1/engine/core_client.py         |  2 +-
 2 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index 646dfe35abad..db4e77be0311 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -17,7 +17,6 @@
 from argparse import Namespace
 from collections.abc import AsyncIterator, Awaitable
 from contextlib import asynccontextmanager
-from copy import deepcopy
 from functools import partial
 from http import HTTPStatus
 from typing import Annotated, Any, Callable, Literal, Optional
@@ -172,6 +171,9 @@ async def build_async_engine_client(
     # Context manager to handle engine_client lifecycle
     # Ensures everything is shutdown and cleaned up on error/exit
    engine_args = AsyncEngineArgs.from_cli_args(args)
+    if client_config:
+        engine_args._api_process_count = client_config.get("client_count", 1)
+        engine_args._api_process_rank = client_config.get("client_index", 0)
 
     if disable_frontend_multiprocessing is None:
         disable_frontend_multiprocessing = bool(
@@ -201,6 +203,7 @@ async def build_async_engine_client_from_engine_args(
 
     Returns the Client or None if the creation failed.
     """
+    client_config = dict(client_config) if client_config else {}
 
     # Create the EngineConfig (determines if we can use V1).
     vllm_config = engine_args.create_engine_config(usage_context=usage_context)
@@ -214,10 +217,10 @@ async def build_async_engine_client_from_engine_args(
         from vllm.v1.engine.async_llm import AsyncLLM
 
         async_llm: Optional[AsyncLLM] = None
-        client_count = client_config.pop(
-            "client_count") if client_config else 1
-        client_index = client_config.pop(
-            "client_index") if client_config else 0
+        print("client_config before", client_config)
+        client_count = client_config.pop("client_count", 1)
+        client_index = client_config.pop("client_index", 0)
+        print("client_config after", client_config)
         try:
             async_llm = AsyncLLM.from_vllm_config(
                 vllm_config=vllm_config,
@@ -1951,11 +1954,6 @@ async def run_server_worker(listen_address,
                             client_config=None,
                             **uvicorn_kwargs) -> None:
     """Run a single API server worker."""
-    client_config = client_config or {}
-
-    args = deepcopy(args)
-    args._api_process_count = client_config.get("client_count", 1)
-    args._api_process_rank = client_config.get("client_index", 0)
 
     if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3:
         ToolParserManager.import_tool_parser(args.tool_parser_plugin)
@@ -1975,8 +1973,8 @@ async def run_server_worker(listen_address,
     vllm_config = await engine_client.get_vllm_config()
     await init_app_state(engine_client, vllm_config, app.state, args)
 
-    logger.info("Starting vLLM API server %d/%d on %s",
-                args._api_process_rank, args._api_server_count,
+    logger.info("Starting vLLM API server %d on %s",
+                vllm_config.parallel_config._api_process_rank,
                 listen_address)
     shutdown_task = await serve_http(
         app,
diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py
index 54231cebea20..b95f4175f50c 100644
--- a/vllm/v1/engine/core_client.py
+++ b/vllm/v1/engine/core_client.py
@@ -435,7 +435,7 @@ def __init__(
         self.engines_running = False
         self.stats_update_address: Optional[str] = None
-        if client_addresses is not None:
+        if client_addresses:
             # Engines are managed externally to this client.
             input_address = client_addresses["input_address"]
             output_address = client_addresses["output_address"]

From fdc9b6e43e5e621f94c5ff944a29b9d4e4f7a65f Mon Sep 17 00:00:00 2001
From: DarkLight1337
Date: Wed, 27 Aug 2025 16:41:52 +0000
Subject: [PATCH 22/23] Update

Signed-off-by: DarkLight1337
---
 vllm/entrypoints/openai/api_server.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index db4e77be0311..c167ecd9833f 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -203,7 +203,6 @@ async def build_async_engine_client_from_engine_args(
 
     Returns the Client or None if the creation failed.
     """
-    client_config = dict(client_config) if client_config else {}
 
     # Create the EngineConfig (determines if we can use V1).
     vllm_config = engine_args.create_engine_config(usage_context=usage_context)
@@ -217,10 +216,12 @@ async def build_async_engine_client_from_engine_args(
         from vllm.v1.engine.async_llm import AsyncLLM
 
         async_llm: Optional[AsyncLLM] = None
-        print("client_config before", client_config)
+
+        # Don't mutate the input client_config
+        client_config = dict(client_config) if client_config else {}
         client_count = client_config.pop("client_count", 1)
         client_index = client_config.pop("client_index", 0)
-        print("client_config after", client_config)
+
         try:
             async_llm = AsyncLLM.from_vllm_config(
                 vllm_config=vllm_config,

From 94ec51d4401603c6e45ed2e1b445e30e9dcbb79f Mon Sep 17 00:00:00 2001
From: DarkLight1337
Date: Wed, 27 Aug 2025 16:45:24 +0000
Subject: [PATCH 23/23] Type checking

Signed-off-by: DarkLight1337
---
 vllm/entrypoints/cli/serve.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/vllm/entrypoints/cli/serve.py b/vllm/entrypoints/cli/serve.py
index 0d2b0e5057c9..de47bf00932e 100644
--- a/vllm/entrypoints/cli/serve.py
+++ b/vllm/entrypoints/cli/serve.py
@@ -138,15 +138,15 @@ def run_multi_api_server(args: argparse.Namespace):
     num_api_servers: int = args.api_server_count
     assert num_api_servers > 0
 
-    args._api_process_count = num_api_servers
-    args._api_process_rank = -1
-
     if num_api_servers > 1:
         setup_multiprocess_prometheus()
 
     listen_address, sock = setup_server(args)
 
     engine_args = vllm.AsyncEngineArgs.from_cli_args(args)
+    engine_args._api_process_count = num_api_servers
+    engine_args._api_process_rank = -1
+
     usage_context = UsageContext.OPENAI_API_SERVER
     vllm_config = engine_args.create_engine_config(usage_context=usage_context)
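
Notes on the patterns above (illustrative sketches, not part of the patch series; any name that does not appear in the diffs is hypothetical).

PATCH 18 through PATCH 22 all wrestle with the same hazard: run_server_worker and build_async_engine_client_from_engine_args receive an `args` namespace and a `client_config` dict that the caller may reuse, so assigning or popping in place leaks state between API server workers. A minimal sketch of the copy-before-pop pattern that PATCH 22 settles on (the function name and surrounding setup are invented; only the `client_count`/`client_index` keys mirror the diffs):

def extract_client_identity(client_config=None):
    # Copy before popping so the caller's mapping is left intact;
    # mirrors "Don't mutate the input client_config" in PATCH 22.
    local = dict(client_config) if client_config else {}
    client_count = local.pop("client_count", 1)
    client_index = local.pop("client_index", 0)
    return client_count, client_index, local

shared = {"client_count": 3, "client_index": 1,
          "input_address": "tcp://127.0.0.1:5001"}
count, index, remainder = extract_client_identity(shared)
assert shared["client_count"] == 3           # caller's dict is unchanged
assert (count, index) == (3, 1)
assert remainder == {"input_address": "tcp://127.0.0.1:5001"}

The copy costs one shallow dict per server start-up and removes the mutation-ordering sensitivity that the temporary print debugging in PATCH 21 was chasing.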
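PATCH 16 and PATCH 20 together reduce the `_api_process_rank` contract to a single range test: `-1` marks an engine core process under API server scale-out, while API workers must carry a rank in `[0, _api_process_count)`. A standalone sketch of that invariant; the wrapper function is hypothetical, and the message tail beyond the lines quoted in PATCH 20 is assumed:

def check_api_process_rank(api_process_count: int,
                           api_process_rank: int) -> None:
    # -1 is reserved for engine core processes under API server
    # scale-out; API workers need a rank in [0, api_process_count).
    if not -1 <= api_process_rank < api_process_count:
        raise ValueError(
            "Invalid value of `_api_process_rank`. "
            f"Expected to be `-1` or `[0, {api_process_count})`, "
            f"but found: {api_process_rank}")

check_api_process_rank(4, -1)  # engine core: accepted
check_api_process_rank(4, 3)   # highest API server rank: accepted
try:
    check_api_process_rank(4, 4)
except ValueError:
    pass                       # out of range: rejected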
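PATCH 21 also swaps `is not None` for a truthiness test in core_client.py, so an empty `client_addresses` mapping now takes the same locally-managed-engines path as `None`. A minimal sketch of that behavior; the addresses and fallback values here are invented for illustration:

def resolve_engine_addresses(client_addresses=None):
    if client_addresses:  # {} and None both fall through to the defaults
        return (client_addresses["input_address"],
                client_addresses["output_address"])
    return ("inproc://engine-input", "inproc://engine-output")

assert resolve_engine_addresses({}) == resolve_engine_addresses(None)
assert resolve_engine_addresses(
    {"input_address": "tcp://127.0.0.1:5001",
     "output_address": "tcp://127.0.0.1:5011"},
) == ("tcp://127.0.0.1:5001", "tcp://127.0.0.1:5011")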