From 5111db5b308b3d57b731ef626990c31aa754ffd8 Mon Sep 17 00:00:00 2001 From: meher-m Date: Tue, 23 Sep 2025 20:30:26 +0000 Subject: [PATCH 01/31] initial changes --- .../use_cases/llm_model_endpoint_use_cases.py | 8 +- .../configs/service--http_forwarder.yaml | 8 +- .../inference/forwarding/http_forwarder.py | 113 +++++++++++------- .../inference/vllm/build_and_upload_image.sh | 1 + .../inference/vllm/vllm_server.py | 110 +---------------- ...eaming_model_endpoint_inference_gateway.py | 13 +- ...e_sync_model_endpoint_inference_gateway.py | 13 +- 7 files changed, 104 insertions(+), 162 deletions(-) diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py index b9083c3e7..7f64032aa 100644 --- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py @@ -1017,8 +1017,8 @@ async def create_vllm_bundle( protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", - predict_route="/predict", - streaming_predict_route="/stream", + predict_route="/v1/completions", + streaming_predict_route="/v1/completions", extra_routes=[ OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH, @@ -1099,8 +1099,8 @@ async def create_vllm_multinode_bundle( protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", - predict_route="/predict", - streaming_predict_route="/stream", + predict_route="/v1/completions", + streaming_predict_route="/v1/completions", extra_routes=[OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH], env=common_vllm_envs, worker_command=worker_command, diff --git a/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml b/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml index 56641fa03..dbb7bae10 100644 --- a/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml +++ b/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml @@ -3,8 +3,8 @@ forwarder: user_port: 5005 user_hostname: "localhost" use_grpc: false - predict_route: "/predict" - healthcheck_route: "/readyz" + predict_route: "/v1/completions" + healthcheck_route: "/health" batch_route: null model_engine_unwrap: true serialize_results_as_string: true @@ -13,8 +13,8 @@ forwarder: stream: user_port: 5005 user_hostname: "localhost" - predict_route: "/stream" - healthcheck_route: "/readyz" + predict_route: "/v1/completions" + healthcheck_route: "/health" batch_route: null model_engine_unwrap: true serialize_results_as_string: false diff --git a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py index d0d9a1bd5..627efc5b0 100644 --- a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py +++ b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py @@ -145,52 +145,49 @@ def sanitize_response_headers(headers: dict, force_cache_bust: bool = False) -> return lower_headers -async def predict( +async def completions_endpoint( request: EndpointPredictV1Request, background_tasks: BackgroundTasks, - forwarder: Forwarder = Depends(load_forwarder), + sync_forwarder: Forwarder = Depends(load_forwarder), + stream_forwarder: StreamingForwarder = Depends(load_streaming_forwarder), limiter: MultiprocessingConcurrencyLimiter = 
Depends(get_concurrency_limiter), ): + """OpenAI-compatible completions endpoint that handles both sync and streaming requests.""" with limiter: try: - response = await forwarder.forward(request.model_dump()) - if forwarder.post_inference_hooks_handler: - background_tasks.add_task( - forwarder.post_inference_hooks_handler.handle, request, response - ) - return response + payload = request.model_dump() except Exception: logger.error(f"Failed to decode payload from: {request}") raise + # Determine if this is a streaming request + is_stream = payload.get("args", {}).get("stream", False) if hasattr(payload.get("args", {}), 'get') else payload.get("stream", False) -async def stream( - request: EndpointPredictV1Request, - forwarder: StreamingForwarder = Depends(load_streaming_forwarder), - limiter: MultiprocessingConcurrencyLimiter = Depends(get_concurrency_limiter), -): - with limiter: - try: - payload = request.model_dump() - except Exception: - logger.error(f"Failed to decode payload from: {request}") - raise - else: - logger.debug(f"Received request: {payload}") + if is_stream: + # Handle streaming request + logger.debug(f"Received streaming request: {payload}") - responses = forwarder.forward(payload) - # We fetch the first response to check if upstream request was successful - # If it was not, this will raise the corresponding HTTPException - # If it was, we will proceed to the event generator - initial_response = await responses.__anext__() + responses = stream_forwarder.forward(payload) + # We fetch the first response to check if upstream request was successful + # If it was not, this will raise the corresponding HTTPException + # If it was, we will proceed to the event generator + initial_response = await responses.__anext__() - async def event_generator(): - yield {"data": orjson.dumps(initial_response).decode("utf-8")} + async def event_generator(): + yield {"data": orjson.dumps(initial_response).decode("utf-8")} - async for response in responses: - yield {"data": orjson.dumps(response).decode("utf-8")} + async for response in responses: + yield {"data": orjson.dumps(response).decode("utf-8")} - return EventSourceResponse(event_generator()) + return EventSourceResponse(event_generator()) + else: + # Handle sync request + response = await sync_forwarder.forward(payload) + if sync_forwarder.post_inference_hooks_handler: + background_tasks.add_task( + sync_forwarder.post_inference_hooks_handler.handle, request, response + ) + return response async def passthrough_stream( @@ -296,10 +293,8 @@ def get_sync_forwarder(route=route): def get_stream_forwarder(route=route): return stream_forwarders.get(route) - # This route is a catch-all for any requests that don't match the /predict or /stream routes - # It will treat the request as a streaming request if the "stream" body parameter is set to true - # NOTE: it is important for this to be defined AFTER the /predict and /stream endpoints - # because FastAPI will match the first route that matches the request path + # This route handles requests for extra routes defined in configuration + # It will treat the request as a streaming request if the "stream" parameter is set to true in request args async def predict_or_stream( request: EndpointPredictV1Request, background_tasks: BackgroundTasks, @@ -307,12 +302,48 @@ async def predict_or_stream( stream_forwarder: StreamingForwarder = Depends(get_stream_forwarder), limiter=Depends(get_concurrency_limiter), ): + """Handles requests for extra routes, routing to sync or streaming based on args.""" if not 
request.args: raise Exception("Request has no args") - if request.args.root.get("stream", False) and stream_forwarder: - return await stream(request, stream_forwarder, limiter) - elif request.args.root.get("stream") is not True and sync_forwarder: - return await predict(request, background_tasks, sync_forwarder, limiter) + + is_stream = request.args.root.get("stream", False) + + if is_stream and stream_forwarder: + # Handle streaming request using consolidated logic + with limiter: + try: + payload = request.model_dump() + except Exception: + logger.error(f"Failed to decode payload from: {request}") + raise + + logger.debug(f"Received streaming request: {payload}") + + responses = stream_forwarder.forward(payload) + initial_response = await responses.__anext__() + + async def event_generator(): + yield {"data": orjson.dumps(initial_response).decode("utf-8")} + async for response in responses: + yield {"data": orjson.dumps(response).decode("utf-8")} + + return EventSourceResponse(event_generator()) + + elif not is_stream and sync_forwarder: + # Handle sync request using consolidated logic + with limiter: + try: + payload = request.model_dump() + except Exception: + logger.error(f"Failed to decode payload from: {request}") + raise + + response = await sync_forwarder.forward(payload) + if sync_forwarder.post_inference_hooks_handler: + background_tasks.add_task( + sync_forwarder.post_inference_hooks_handler.handle, request, response + ) + return response else: raise Exception("No forwarder configured for this route") @@ -384,8 +415,8 @@ def add_extra_routes(app: FastAPI): app.add_api_route(path="/healthz", endpoint=healthcheck, methods=["GET"]) app.add_api_route(path="/readyz", endpoint=healthcheck, methods=["GET"]) - app.add_api_route(path="/predict", endpoint=predict, methods=["POST"]) - app.add_api_route(path="/stream", endpoint=stream, methods=["POST"]) + # Legacy /predict and /stream endpoints removed - using /v1/completions + app.add_api_route(path="/v1/completions", endpoint=completions_endpoint, methods=["POST"]) add_extra_routes(app) return app diff --git a/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh b/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh index 0026e74aa..3e6f56728 100755 --- a/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh +++ b/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh @@ -3,6 +3,7 @@ set -eo pipefail # Build and push vLLM docker image to AWS ECR. +# Now uses vLLM's official OpenAI-compatible endpoints (/v1/completions) instead of custom legacy endpoints /predict and /stream. 
# # Usage: VLLM_VERSION=0.10.0 ./build_and_upload_image.sh vllm|vllm_batch|vllm_batch_v2 diff --git a/model-engine/model_engine_server/inference/vllm/vllm_server.py b/model-engine/model_engine_server/inference/vllm/vllm_server.py index d75640423..8e40c686a 100644 --- a/model-engine/model_engine_server/inference/vllm/vllm_server.py +++ b/model-engine/model_engine_server/inference/vllm/vllm_server.py @@ -1,17 +1,11 @@ import asyncio import code -import json import os -import signal import subprocess import traceback from logging import Logger -from typing import AsyncGenerator, Dict, List, Optional import vllm.envs as envs -from fastapi import APIRouter, BackgroundTasks, Request -from fastapi.responses import Response, StreamingResponse -from vllm.engine.async_llm_engine import AsyncEngineDeadError from vllm.engine.protocol import EngineClient from vllm.entrypoints.launcher import serve_http from vllm.entrypoints.openai.api_server import ( @@ -24,10 +18,7 @@ ) from vllm.entrypoints.openai.cli_args import make_arg_parser from vllm.entrypoints.openai.tool_parsers import ToolParserManager -from vllm.outputs import CompletionOutput -from vllm.sampling_params import SamplingParams -from vllm.sequence import Logprob -from vllm.utils import FlexibleArgumentParser, random_uuid +from vllm.utils import FlexibleArgumentParser logger = Logger("vllm_server") @@ -36,88 +27,8 @@ TIMEOUT_KEEP_ALIVE = 5 # seconds. TIMEOUT_TO_PREVENT_DEADLOCK = 1 # seconds -router = APIRouter() - - -@router.post("/predict") -@router.post("/stream") -async def generate(request: Request) -> Response: - """Generate completion for the request. - - The request should be a JSON object with the following fields: - - prompt: the prompt to use for the generation. - - stream: whether to stream the results or not. - - other fields: the sampling parameters (See `SamplingParams` for details). - """ - # check health before accepting request and fail fast if engine isn't healthy - try: - await engine_client.check_health() - - request_dict = await request.json() - prompt = request_dict.pop("prompt") - stream = request_dict.pop("stream", False) - - sampling_params = SamplingParams(**request_dict) - - request_id = random_uuid() - - results_generator = engine_client.generate(prompt, sampling_params, request_id) - - async def abort_request() -> None: - await engine_client.abort(request_id) - - if stream: - # Streaming case - async def stream_results() -> AsyncGenerator[str, None]: - last_output_text = "" - async for request_output in results_generator: - log_probs = format_logprobs(request_output) - ret = { - "text": request_output.outputs[-1].text[len(last_output_text) :], - "count_prompt_tokens": len(request_output.prompt_token_ids), - "count_output_tokens": len(request_output.outputs[0].token_ids), - "log_probs": ( - log_probs[-1] if log_probs and sampling_params.logprobs else None - ), - "finished": request_output.finished, - } - last_output_text = request_output.outputs[-1].text - yield f"data:{json.dumps(ret)}\n\n" - - background_tasks = BackgroundTasks() - # Abort the request if the client disconnects. 
-            background_tasks.add_task(abort_request)
-
-            return StreamingResponse(stream_results(), background=background_tasks)
-
-        # Non-streaming case
-        final_output = None
-        tokens = []
-        last_output_text = ""
-        async for request_output in results_generator:
-            tokens.append(request_output.outputs[-1].text[len(last_output_text) :])
-            last_output_text = request_output.outputs[-1].text
-            if await request.is_disconnected():
-                # Abort the request if the client disconnects.
-                await engine_client.abort(request_id)
-                return Response(status_code=499)
-            final_output = request_output
-
-        assert final_output is not None
-        prompt = final_output.prompt
-        ret = {
-            "text": final_output.outputs[0].text,
-            "count_prompt_tokens": len(final_output.prompt_token_ids),
-            "count_output_tokens": len(final_output.outputs[0].token_ids),
-            "log_probs": format_logprobs(final_output),
-            "tokens": tokens,
-        }
-        return Response(content=json.dumps(ret))
-
-    except AsyncEngineDeadError as e:
-        logger.error(f"The vllm engine is dead, exiting the pod: {e}")
-        os.kill(os.getpid(), signal.SIGINT)
-        raise e
+# Legacy endpoints /predict and /stream removed - using vLLM's native OpenAI-compatible endpoints instead
+# All requests now go through /v1/completions, /v1/chat/completions, etc.
 
 
 def get_gpu_free_memory():
@@ -171,20 +82,6 @@ def debug(sig, frame):
     i.interact(message)
 
 
-def format_logprobs(
-    request_output: CompletionOutput,
-) -> Optional[List[Dict[int, float]]]:
-    """Given a request output, format the logprobs if they exist."""
-    output_logprobs = request_output.outputs[0].logprobs
-    if output_logprobs is None:
-        return None
-
-    def extract_logprobs(logprobs: Dict[int, Logprob]) -> Dict[int, float]:
-        return {k: v.logprob for k, v in logprobs.items()}
-
-    return [extract_logprobs(logprobs) for logprobs in output_logprobs]
-
-
 def parse_args(parser: FlexibleArgumentParser):
     parser = make_arg_parser(parser)
     parser.add_argument("--attention-backend", type=str, help="The attention backend to use")
@@ -220,7 +117,6 @@ async def run_server_worker(
     vllm_config = await engine_client.get_vllm_config()
     await init_app_state(engine_client, vllm_config, app.state, args)
 
-    app.include_router(router)
 
     logger.info("Starting vLLM API server %d on %s", server_index, listen_address)
     shutdown_task = await serve_http(
diff --git a/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py b/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py
index 91fc78ec3..b539bc6ce 100644
--- a/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py
+++ b/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py
@@ -48,7 +48,7 @@
 
 
 def _get_streaming_endpoint_url(
-    service_name: str, path: str = "/stream", manually_resolve_dns: bool = False
+    service_name: str, path: str = "/v1/completions", manually_resolve_dns: bool = False
 ) -> str:
     if CIRCLECI:
         # Circle CI: a NodePort is used to expose the service
@@ -212,7 +212,7 @@ async def streaming_predict(
     ) -> AsyncIterable[SyncEndpointPredictV1Response]:
         deployment_url = _get_streaming_endpoint_url(
             topic,
-            path=predict_request.destination_path or "/stream",
+            path=predict_request.destination_path or "/v1/completions",
             manually_resolve_dns=manually_resolve_dns,
         )
 
@@ -227,9 +227,16 @@ async def streaming_predict(
             if predict_request.num_retries is None
             else predict_request.num_retries
         )
+
+        # Ensure streaming requests are properly marked
+        
payload_json = predict_request.model_dump(exclude_none=True) + if "args" not in payload_json: + payload_json["args"] = {} + payload_json["args"]["stream"] = True # Explicitly set stream=True for streaming requests + response = self.make_request_with_retries( request_url=deployment_url, - payload_json=predict_request.model_dump(exclude_none=True), + payload_json=payload_json, timeout_seconds=timeout_seconds, num_retries=num_retries, endpoint_name=endpoint_name or topic, diff --git a/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py b/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py index 8274d5b88..1e5b08a48 100644 --- a/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py @@ -46,7 +46,7 @@ def _get_sync_endpoint_url( - service_name: str, destination_path: str = "/predict", manually_resolve_dns: bool = False + service_name: str, destination_path: str = "/v1/completions", manually_resolve_dns: bool = False ) -> str: if CIRCLECI: # Circle CI: a NodePort is used to expose the service @@ -238,7 +238,7 @@ async def predict( ) -> SyncEndpointPredictV1Response: deployment_url = _get_sync_endpoint_url( topic, - destination_path=predict_request.destination_path or "/predict", + destination_path=predict_request.destination_path or "/v1/completions", manually_resolve_dns=manually_resolve_dns, ) @@ -253,9 +253,16 @@ async def predict( if predict_request.num_retries is None else predict_request.num_retries ) + + # Ensure sync requests are properly marked + payload_json = predict_request.model_dump(exclude_none=True) + if "args" not in payload_json: + payload_json["args"] = {} + payload_json["args"]["stream"] = False # Explicitly set stream=False for sync requests + response = await self.make_request_with_retries( request_url=deployment_url, - payload_json=predict_request.model_dump(exclude_none=True), + payload_json=payload_json, timeout_seconds=timeout_seconds, num_retries=num_retries, endpoint_name=endpoint_name or topic, From 71b5d994b72eaf2eae222bdb78f53c46b5a463ce Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 18:18:03 +0000 Subject: [PATCH 02/31] reverting some forwarder changes that aren't needed --- .../use_cases/llm_model_endpoint_use_cases.py | 12 +- .../configs/service--http_forwarder.yaml | 8 +- .../inference/forwarding/http_forwarder.py | 115 +++++++----------- .../inference/vllm/build_and_upload_image.sh | 1 - 4 files changed, 52 insertions(+), 84 deletions(-) diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py index 72013e4d5..352b7a060 100644 --- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py @@ -1017,9 +1017,9 @@ async def create_vllm_bundle( protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", - predict_route="/v1/completions", - streaming_predict_route="/v1/completions", - extra_routes=[ + predict_route="/predict", + streaming_predict_route="/stream", + routes=[ OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH, ], @@ -1099,9 +1099,9 @@ async def create_vllm_multinode_bundle( protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", - 
predict_route="/v1/completions", - streaming_predict_route="/v1/completions", - extra_routes=[OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH], + predict_route="/predict", + streaming_predict_route="/stream", + routes=[OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH], env=common_vllm_envs, worker_command=worker_command, worker_env=common_vllm_envs, diff --git a/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml b/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml index dbb7bae10..56641fa03 100644 --- a/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml +++ b/model-engine/model_engine_server/inference/configs/service--http_forwarder.yaml @@ -3,8 +3,8 @@ forwarder: user_port: 5005 user_hostname: "localhost" use_grpc: false - predict_route: "/v1/completions" - healthcheck_route: "/health" + predict_route: "/predict" + healthcheck_route: "/readyz" batch_route: null model_engine_unwrap: true serialize_results_as_string: true @@ -13,8 +13,8 @@ forwarder: stream: user_port: 5005 user_hostname: "localhost" - predict_route: "/v1/completions" - healthcheck_route: "/health" + predict_route: "/stream" + healthcheck_route: "/readyz" batch_route: null model_engine_unwrap: true serialize_results_as_string: false diff --git a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py index fe1bf84ff..2f6451c8c 100644 --- a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py +++ b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py @@ -149,49 +149,52 @@ def sanitize_response_headers(headers: dict, force_cache_bust: bool = False) -> return lower_headers -async def completions_endpoint( +async def predict( request: EndpointPredictV1Request, background_tasks: BackgroundTasks, - sync_forwarder: Forwarder = Depends(load_forwarder), - stream_forwarder: StreamingForwarder = Depends(load_streaming_forwarder), + forwarder: Forwarder = Depends(load_forwarder), limiter: MultiprocessingConcurrencyLimiter = Depends(get_concurrency_limiter), ): - """OpenAI-compatible completions endpoint that handles both sync and streaming requests.""" with limiter: try: - payload = request.model_dump() + response = await forwarder.forward(request.model_dump()) + if forwarder.post_inference_hooks_handler: + background_tasks.add_task( + forwarder.post_inference_hooks_handler.handle, request, response + ) + return response except Exception: logger.error(f"Failed to decode payload from: {request}") raise - # Determine if this is a streaming request - is_stream = payload.get("args", {}).get("stream", False) if hasattr(payload.get("args", {}), 'get') else payload.get("stream", False) - if is_stream: - # Handle streaming request - logger.debug(f"Received streaming request: {payload}") +async def stream( + request: EndpointPredictV1Request, + forwarder: StreamingForwarder = Depends(load_streaming_forwarder), + limiter: MultiprocessingConcurrencyLimiter = Depends(get_concurrency_limiter), +): + with limiter: + try: + payload = request.model_dump() + except Exception: + logger.error(f"Failed to decode payload from: {request}") + raise + else: + logger.debug(f"Received request: {payload}") - responses = stream_forwarder.forward(payload) - # We fetch the first response to check if upstream request was successful - # If it was not, this will raise the corresponding HTTPException - # If it was, we will proceed to the event 
generator - initial_response = await responses.__anext__() + responses = forwarder.forward(payload) + # We fetch the first response to check if upstream request was successful + # If it was not, this will raise the corresponding HTTPException + # If it was, we will proceed to the event generator + initial_response = await responses.__anext__() - async def event_generator(): - yield {"data": orjson.dumps(initial_response).decode("utf-8")} + async def event_generator(): + yield {"data": orjson.dumps(initial_response).decode("utf-8")} - async for response in responses: - yield {"data": orjson.dumps(response).decode("utf-8")} + async for response in responses: + yield {"data": orjson.dumps(response).decode("utf-8")} - return EventSourceResponse(event_generator()) - else: - # Handle sync request - response = await sync_forwarder.forward(payload) - if sync_forwarder.post_inference_hooks_handler: - background_tasks.add_task( - sync_forwarder.post_inference_hooks_handler.handle, request, response - ) - return response + return EventSourceResponse(event_generator()) async def passthrough_stream( @@ -309,8 +312,10 @@ def get_sync_forwarder(route=route): def get_stream_forwarder(route=route): return stream_forwarders.get(route) - # This route handles requests for extra routes defined in configuration - # It will treat the request as a streaming request if the "stream" parameter is set to true in request args + # This route is a catch-all for any requests that don't match the /predict or /stream routes + # It will treat the request as a streaming request if the "stream" body parameter is set to true + # NOTE: it is important for this to be defined AFTER the /predict and /stream endpoints + # because FastAPI will match the first route that matches the request path async def predict_or_stream( request: EndpointPredictV1Request, background_tasks: BackgroundTasks, @@ -318,48 +323,12 @@ async def predict_or_stream( stream_forwarder: StreamingForwarder = Depends(get_stream_forwarder), limiter=Depends(get_concurrency_limiter), ): - """Handles requests for extra routes, routing to sync or streaming based on args.""" if not request.args: raise Exception("Request has no args") - - is_stream = request.args.root.get("stream", False) - - if is_stream and stream_forwarder: - # Handle streaming request using consolidated logic - with limiter: - try: - payload = request.model_dump() - except Exception: - logger.error(f"Failed to decode payload from: {request}") - raise - - logger.debug(f"Received streaming request: {payload}") - - responses = stream_forwarder.forward(payload) - initial_response = await responses.__anext__() - - async def event_generator(): - yield {"data": orjson.dumps(initial_response).decode("utf-8")} - async for response in responses: - yield {"data": orjson.dumps(response).decode("utf-8")} - - return EventSourceResponse(event_generator()) - - elif not is_stream and sync_forwarder: - # Handle sync request using consolidated logic - with limiter: - try: - payload = request.model_dump() - except Exception: - logger.error(f"Failed to decode payload from: {request}") - raise - - response = await sync_forwarder.forward(payload) - if sync_forwarder.post_inference_hooks_handler: - background_tasks.add_task( - sync_forwarder.post_inference_hooks_handler.handle, request, response - ) - return response + if request.args.root.get("stream", False) and stream_forwarder: + return await stream(request, stream_forwarder, limiter) + elif request.args.root.get("stream") is not True and sync_forwarder: + return 
await predict(request, background_tasks, sync_forwarder, limiter) else: raise Exception("No forwarder configured for this route") @@ -444,8 +413,8 @@ def add_extra_routes(app: FastAPI): app.add_api_route(path="/healthz", endpoint=healthcheck, methods=["GET"]) app.add_api_route(path="/readyz", endpoint=healthcheck, methods=["GET"]) - # Legacy /predict and /stream endpoints removed - using /v1/completions - app.add_api_route(path="/v1/completions", endpoint=completions_endpoint, methods=["POST"]) + app.add_api_route(path="/predict", endpoint=predict, methods=["POST"]) + app.add_api_route(path="/stream", endpoint=stream, methods=["POST"]) add_extra_routes(app) return app @@ -478,4 +447,4 @@ def entrypoint(): # pragma: no cover if __name__ == "__main__": - entrypoint() + entrypoint() \ No newline at end of file diff --git a/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh b/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh index 3e6f56728..0026e74aa 100755 --- a/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh +++ b/model-engine/model_engine_server/inference/vllm/build_and_upload_image.sh @@ -3,7 +3,6 @@ set -eo pipefail # Build and push vLLM docker image to AWS ECR. -# Now uses vLLM's official OpenAI-compatible endpoints (/v1/completions) instead of custom legacy endpoints /predict and /stream. # # Usage: VLLM_VERSION=0.10.0 ./build_and_upload_image.sh vllm|vllm_batch|vllm_batch_v2 From 8d30ab30b902bf76711e69d0e212ec6835b3977a Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 18:23:02 +0000 Subject: [PATCH 03/31] remove some other unneeded stuff --- .../inference/vllm/vllm_server.py | 60 +------------------ ...eaming_model_endpoint_inference_gateway.py | 12 +--- ...e_sync_model_endpoint_inference_gateway.py | 12 +--- 3 files changed, 8 insertions(+), 76 deletions(-) diff --git a/model-engine/model_engine_server/inference/vllm/vllm_server.py b/model-engine/model_engine_server/inference/vllm/vllm_server.py index 8e40c686a..7065a54b5 100644 --- a/model-engine/model_engine_server/inference/vllm/vllm_server.py +++ b/model-engine/model_engine_server/inference/vllm/vllm_server.py @@ -14,6 +14,7 @@ init_app_state, load_log_config, maybe_register_tokenizer_info_endpoint, + run_server, setup_server, ) from vllm.entrypoints.openai.cli_args import make_arg_parser @@ -88,64 +89,6 @@ def parse_args(parser: FlexibleArgumentParser): return parser.parse_args() -async def run_server(args, **uvicorn_kwargs) -> None: - """Run a single-worker API server.""" - listen_address, sock = setup_server(args) - await run_server_worker(listen_address, sock, args, **uvicorn_kwargs) - - -async def run_server_worker( - listen_address, sock, args, client_config=None, **uvicorn_kwargs -) -> None: - """Run a single API server worker.""" - - if args.tool_parser_plugin and len(args.tool_parser_plugin) > 3: - ToolParserManager.import_tool_parser(args.tool_parser_plugin) - - server_index = client_config.get("client_index", 0) if client_config else 0 - - # Load logging config for uvicorn if specified - log_config = load_log_config(args.log_config_file) - if log_config is not None: - uvicorn_kwargs["log_config"] = log_config - - global engine_client - - async with build_async_engine_client(args, client_config=client_config) as engine_client: - maybe_register_tokenizer_info_endpoint(args) - app = build_app(args) - - vllm_config = await engine_client.get_vllm_config() - await init_app_state(engine_client, vllm_config, app.state, args) - - 
logger.info("Starting vLLM API server %d on %s", server_index, listen_address) - shutdown_task = await serve_http( - app, - sock=sock, - enable_ssl_refresh=args.enable_ssl_refresh, - host=args.host, - port=args.port, - log_level=args.uvicorn_log_level, - # NOTE: When the 'disable_uvicorn_access_log' value is True, - # no access log will be output. - access_log=not args.disable_uvicorn_access_log, - timeout_keep_alive=envs.VLLM_HTTP_TIMEOUT_KEEP_ALIVE, - ssl_keyfile=args.ssl_keyfile, - ssl_certfile=args.ssl_certfile, - ssl_ca_certs=args.ssl_ca_certs, - ssl_cert_reqs=args.ssl_cert_reqs, - h11_max_incomplete_event_size=args.h11_max_incomplete_event_size, - h11_max_header_count=args.h11_max_header_count, - **uvicorn_kwargs, - ) - - # NB: Await server shutdown only after the backend context is exited - try: - await shutdown_task - finally: - sock.close() - - if __name__ == "__main__": check_unknown_startup_memory_usage() @@ -153,4 +96,5 @@ async def run_server_worker( args = parse_args(parser) if args.attention_backend is not None: os.environ["VLLM_ATTENTION_BACKEND"] = args.attention_backend + # Using vllm's run_server asyncio.run(run_server(args)) diff --git a/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py b/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py index b539bc6ce..86151922f 100644 --- a/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/live_streaming_model_endpoint_inference_gateway.py @@ -48,7 +48,7 @@ def _get_streaming_endpoint_url( - service_name: str, path: str = "/v1/completions", manually_resolve_dns: bool = False + service_name: str, path: str = "/stream", manually_resolve_dns: bool = False ) -> str: if CIRCLECI: # Circle CI: a NodePort is used to expose the service @@ -212,7 +212,7 @@ async def streaming_predict( ) -> AsyncIterable[SyncEndpointPredictV1Response]: deployment_url = _get_streaming_endpoint_url( topic, - path=predict_request.destination_path or "/v1/completions", + path=predict_request.destination_path or "/stream", manually_resolve_dns=manually_resolve_dns, ) @@ -228,15 +228,9 @@ async def streaming_predict( else predict_request.num_retries ) - # Ensure streaming requests are properly marked - payload_json = predict_request.model_dump(exclude_none=True) - if "args" not in payload_json: - payload_json["args"] = {} - payload_json["args"]["stream"] = True # Explicitly set stream=True for streaming requests - response = self.make_request_with_retries( request_url=deployment_url, - payload_json=payload_json, + payload_json=predict_request.model_dump(exclude_none=True), timeout_seconds=timeout_seconds, num_retries=num_retries, endpoint_name=endpoint_name or topic, diff --git a/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py b/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py index 1e5b08a48..e65c30440 100644 --- a/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/live_sync_model_endpoint_inference_gateway.py @@ -46,7 +46,7 @@ def _get_sync_endpoint_url( - service_name: str, destination_path: str = "/v1/completions", manually_resolve_dns: bool = False + service_name: str, destination_path: str = "/predict", manually_resolve_dns: bool = False ) -> str: if CIRCLECI: # Circle CI: a 
NodePort is used to expose the service @@ -238,7 +238,7 @@ async def predict( ) -> SyncEndpointPredictV1Response: deployment_url = _get_sync_endpoint_url( topic, - destination_path=predict_request.destination_path or "/v1/completions", + destination_path=predict_request.destination_path or "/predict", manually_resolve_dns=manually_resolve_dns, ) @@ -254,15 +254,9 @@ async def predict( else predict_request.num_retries ) - # Ensure sync requests are properly marked - payload_json = predict_request.model_dump(exclude_none=True) - if "args" not in payload_json: - payload_json["args"] = {} - payload_json["args"]["stream"] = False # Explicitly set stream=False for sync requests - response = await self.make_request_with_retries( request_url=deployment_url, - payload_json=payload_json, + payload_json=predict_request.model_dump(exclude_none=True), timeout_seconds=timeout_seconds, num_retries=num_retries, endpoint_name=endpoint_name or topic, From 22a0cf93341a43d90ea87014659335f0935bd1cc Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 18:52:44 +0000 Subject: [PATCH 04/31] not sure --- .../infra/gateways/resources/k8s_resource_types.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 1f1075372..35943a379 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -807,8 +807,8 @@ def get_endpoint_resource_arguments_from_request( # Runnable Image Arguments MAIN_ENV=main_env, COMMAND=flavor.streaming_command, - PREDICT_ROUTE=flavor.predict_route, - STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, + # PREDICT_ROUTE=flavor.predict_route, + # STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, HEALTHCHECK_ROUTE=flavor.healthcheck_route, READINESS_INITIAL_DELAY=flavor.readiness_initial_delay_seconds, INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, @@ -817,7 +817,9 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes, + # FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -913,7 +915,8 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes, + # FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, From 28cea65c214155af49c698a6654d5a472fad6da4 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 19:01:30 +0000 Subject: [PATCH 05/31] adding cpu --- .../inference/forwarding/http_forwarder.py | 2 +- .../infra/gateways/resources/k8s_resource_types.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py 
b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py index 2f6451c8c..73a0ecd9f 100644 --- a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py +++ b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py @@ -447,4 +447,4 @@ def entrypoint(): # pragma: no cover if __name__ == "__main__": - entrypoint() \ No newline at end of file + entrypoint() diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 35943a379..a7a5bcc3d 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -769,7 +769,8 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -818,7 +819,7 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes, - FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes + flavor.extra_routes, # FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments @@ -868,7 +869,7 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, + FORWARDER_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -915,7 +916,7 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, # FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments From 6ea87b47797582793e13dbd8bccfc7951a9d61d1 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 19:39:31 +0000 Subject: [PATCH 06/31] add column --- .../model_engine_server/db/models/hosted_model_inference.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/model-engine/model_engine_server/db/models/hosted_model_inference.py b/model-engine/model_engine_server/db/models/hosted_model_inference.py index 4076b1295..c5b8247f6 100644 --- a/model-engine/model_engine_server/db/models/hosted_model_inference.py +++ b/model-engine/model_engine_server/db/models/hosted_model_inference.py @@ -146,6 +146,7 @@ class Bundle(Base): runnable_image_env = Column(JSON, nullable=True) runnable_image_protocol = Column(Text, nullable=True) runnable_image_readiness_initial_delay_seconds = Column(Integer, 
nullable=True) + runnable_image_routes = Column(ARRAY(Text), nullable=True) runnable_image_extra_routes = Column(ARRAY(Text), nullable=True) runnable_image_forwarder_type = Column(Text, nullable=True) runnable_image_worker_command = Column(ARRAY(Text), nullable=True) @@ -209,6 +210,7 @@ def __init__( runnable_image_env: Optional[Dict[str, Any]] = None, runnable_image_protocol: Optional[str] = None, runnable_image_readiness_initial_delay_seconds: Optional[int] = None, + runnable_image_routes: Optional[List[str]] = None, runnable_image_extra_routes: Optional[List[str]] = None, runnable_image_forwarder_type: Optional[str] = None, runnable_image_worker_command: Optional[List[str]] = None, @@ -268,6 +270,7 @@ def __init__( self.runnable_image_healthcheck_route = runnable_image_healthcheck_route self.runnable_image_env = runnable_image_env self.runnable_image_protocol = runnable_image_protocol + self.runnable_image_routes = runnable_image_routes self.runnable_image_extra_routes = runnable_image_extra_routes self.runnable_image_forwarder_type = runnable_image_forwarder_type self.runnable_image_worker_command = runnable_image_worker_command From 5d8a634d502744b2a55a0853280179a4f4f827b7 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 19:42:07 +0000 Subject: [PATCH 07/31] add file for db model change --- ..._25_1940-221aa19d3f32_add_routes_column.py | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py new file mode 100644 index 000000000..ded028342 --- /dev/null +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py @@ -0,0 +1,32 @@ +"""add routes column + +Revision ID: 221aa19d3f32 +Revises: e580182d6bfd +Create Date: 2025-09-25 19:40:24.927198 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '221aa19d3f32' +down_revision = 'e580182d6bfd' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + 'bundles', + sa.Column('routes', sa.ARRAY(sa.Text), nullable=True), + schema='hosted_model_inference', + ) + + +def downgrade() -> None: + op.drop_column( + 'bundles', + 'routes', + schema='hosted_model_inference', + ) From 5229c5586d3db73fe11312dfd6b5db81cd8b0bfb Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 19:43:32 +0000 Subject: [PATCH 08/31] update readme instructions --- model-engine/model_engine_server/db/migrations/README | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/db/migrations/README b/model-engine/model_engine_server/db/migrations/README index 34a0d901b..a27fd1cc1 100644 --- a/model-engine/model_engine_server/db/migrations/README +++ b/model-engine/model_engine_server/db/migrations/README @@ -4,7 +4,7 @@ We introduce alembic by 1. dumping the current db schemas into 'initial.sql' via pg_dump ``` -pg_dump -h $HOST -U postgres -O -s -d $DB_NAME -n hosted_model_inference -n model -f initial.sql +pg_dump -h $HOST -U postgres -O -s -d $DB_NAME -n hosted_model_inference -n model -f initial.sql ``` 2. 
writing an initial revision that reads and applies intial.sql script @@ -19,6 +19,9 @@ alembic revision -m “initial” alembic stamp fa3267c80731 ``` +# Steps to add a column to the db model + +Steps can be found here: https://alembic.sqlalchemy.org/en/latest/tutorial.html#running-our-second-migration # Test db migration from scratch From 1b7414c12f38fc807778312e5156ff174e24dab6 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 19:45:50 +0000 Subject: [PATCH 09/31] fix column name --- .../2025_09_25_1940-221aa19d3f32_add_routes_column.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py index ded028342..43d20a4fa 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py @@ -19,7 +19,7 @@ def upgrade() -> None: op.add_column( 'bundles', - sa.Column('routes', sa.ARRAY(sa.Text), nullable=True), + sa.Column('runnable_image_routes', sa.ARRAY(sa.Text), nullable=True), schema='hosted_model_inference', ) @@ -27,6 +27,6 @@ def upgrade() -> None: def downgrade() -> None: op.drop_column( 'bundles', - 'routes', + 'runnable_image_routes', schema='hosted_model_inference', ) From 494ea1e249ca703e320c0c43c01cafffd221e092 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 19:51:28 +0000 Subject: [PATCH 10/31] reformat --- .../2025_09_25_1940-221aa19d3f32_add_routes_column.py | 3 +-- .../infra/gateways/resources/k8s_resource_types.py | 8 ++++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py index 43d20a4fa..50de5f543 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py @@ -5,9 +5,8 @@ Create Date: 2025-09-25 19:40:24.927198 """ -from alembic import op import sqlalchemy as sa - +from alembic import op # revision identifiers, used by Alembic. 
revision = '221aa19d3f32' diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index a7a5bcc3d..be6d4125e 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -770,7 +770,9 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, - FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes + flavor.extra_routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + + flavor.routes + + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -819,7 +821,9 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes, - FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes + flavor.extra_routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + + flavor.routes + + flavor.extra_routes, # FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments From 43b5054060422a44c6207a87c29a3255f8c3c50f Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 19:58:12 +0000 Subject: [PATCH 11/31] remove unused commits --- .../inference/vllm/vllm_server.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/model-engine/model_engine_server/inference/vllm/vllm_server.py b/model-engine/model_engine_server/inference/vllm/vllm_server.py index 7065a54b5..c576a3384 100644 --- a/model-engine/model_engine_server/inference/vllm/vllm_server.py +++ b/model-engine/model_engine_server/inference/vllm/vllm_server.py @@ -5,20 +5,9 @@ import traceback from logging import Logger -import vllm.envs as envs from vllm.engine.protocol import EngineClient -from vllm.entrypoints.launcher import serve_http -from vllm.entrypoints.openai.api_server import ( - build_app, - build_async_engine_client, - init_app_state, - load_log_config, - maybe_register_tokenizer_info_endpoint, - run_server, - setup_server, -) +from vllm.entrypoints.openai.api_server import run_server from vllm.entrypoints.openai.cli_args import make_arg_parser -from vllm.entrypoints.openai.tool_parsers import ToolParserManager from vllm.utils import FlexibleArgumentParser logger = Logger("vllm_server") From a2e4b507f6612619b43c35f72d43a145d7399343 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 20:10:25 +0000 Subject: [PATCH 12/31] fix --- .../infra/gateways/resources/k8s_resource_types.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index be6d4125e..bd59ebc0c 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -810,8 +810,6 @@ def get_endpoint_resource_arguments_from_request( # Runnable Image Arguments MAIN_ENV=main_env, COMMAND=flavor.streaming_command, - # 
PREDICT_ROUTE=flavor.predict_route, - # STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, HEALTHCHECK_ROUTE=flavor.healthcheck_route, READINESS_INITIAL_DELAY=flavor.readiness_initial_delay_seconds, INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, @@ -820,11 +818,10 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes + flavor.extra_routes, - # FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -873,7 +870,7 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -921,7 +918,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, - # FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, From 8c4930f00f171a0c8cd4044b1667f5dae4da31f8 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 20:13:48 +0000 Subject: [PATCH 13/31] fix readme --- model-engine/model_engine_server/db/migrations/README | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/db/migrations/README b/model-engine/model_engine_server/db/migrations/README index a27fd1cc1..7863d56d2 100644 --- a/model-engine/model_engine_server/db/migrations/README +++ b/model-engine/model_engine_server/db/migrations/README @@ -19,7 +19,7 @@ alembic revision -m “initial” alembic stamp fa3267c80731 ``` -# Steps to add a column to the db model +# Steps to make generic database schema changes Steps can be found here: https://alembic.sqlalchemy.org/en/latest/tutorial.html#running-our-second-migration From ff8766bb4d9c685907ab2488de255f27643605c3 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 20:22:47 +0000 Subject: [PATCH 14/31] fix types --- .../infra/gateways/resources/k8s_resource_types.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index bd59ebc0c..18331a738 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -148,6 +148,7 @@ class _StreamingDeploymentArguments(TypedDict): FORWARDER_PORT: int STREAMING_PREDICT_ROUTE: str FORWARDER_WORKER_COUNT: int + FORWARDER_STREAMING_ROUTES: List[str] class _RunnableImageDeploymentArguments(_BaseDeploymentArguments): @@ -163,7 +164,7 @@ class 
_RunnableImageDeploymentArguments(_BaseDeploymentArguments): FORWARDER_CPUS_LIMIT: float FORWARDER_MEMORY_LIMIT: str FORWARDER_STORAGE_LIMIT: str - FORWARDER_EXTRA_ROUTES: List[str] + FORWARDER_SYNC_ROUTES: List[str] FORWARDER_TYPE: Optional[str] USER_CONTAINER_PORT: int From 4b2103effd41719286cdb3225105be65c2d16d2d Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 20:36:11 +0000 Subject: [PATCH 15/31] leave the existing variables for backwards compatibility --- .../infra/gateways/resources/k8s_resource_types.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 18331a738..0cea21f60 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -165,6 +165,7 @@ class _RunnableImageDeploymentArguments(_BaseDeploymentArguments): FORWARDER_MEMORY_LIMIT: str FORWARDER_STORAGE_LIMIT: str FORWARDER_SYNC_ROUTES: List[str] + FORWARDER_EXTRA_ROUTES: List[str] FORWARDER_TYPE: Optional[str] USER_CONTAINER_PORT: int @@ -811,6 +812,8 @@ def get_endpoint_resource_arguments_from_request( # Runnable Image Arguments MAIN_ENV=main_env, COMMAND=flavor.streaming_command, + PREDICT_ROUTE=flavor.predict_route, + STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, HEALTHCHECK_ROUTE=flavor.healthcheck_route, READINESS_INITIAL_DELAY=flavor.readiness_initial_delay_seconds, INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, From 29877c372fc12a4fa0d036c5308521c9f90d1e53 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 20:44:59 +0000 Subject: [PATCH 16/31] edit types --- .../infra/gateways/resources/k8s_resource_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 0cea21f60..9b12afdd5 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -140,6 +140,7 @@ class _SyncRunnableImageDeploymentArguments(TypedDict): FORWARDER_PORT: int FORWARDER_WORKER_COUNT: int + FORWARDER_SYNC_ROUTES: List[str] class _StreamingDeploymentArguments(TypedDict): @@ -164,7 +165,6 @@ class _RunnableImageDeploymentArguments(_BaseDeploymentArguments): FORWARDER_CPUS_LIMIT: float FORWARDER_MEMORY_LIMIT: str FORWARDER_STORAGE_LIMIT: str - FORWARDER_SYNC_ROUTES: List[str] FORWARDER_EXTRA_ROUTES: List[str] FORWARDER_TYPE: Optional[str] USER_CONTAINER_PORT: int From 24cc3937d1570e447af6ce45d6e9e6eb1771e550 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 20:54:57 +0000 Subject: [PATCH 17/31] remove EXTRA ROUTES completely. 
its not used by the async or triton enhanced routes and we replaced it with ROUTES for the sync and streaming routes --- .../infra/gateways/resources/k8s_resource_types.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 9b12afdd5..86bb59c1c 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -165,7 +165,6 @@ class _RunnableImageDeploymentArguments(_BaseDeploymentArguments): FORWARDER_CPUS_LIMIT: float FORWARDER_MEMORY_LIMIT: str FORWARDER_STORAGE_LIMIT: str - FORWARDER_EXTRA_ROUTES: List[str] FORWARDER_TYPE: Optional[str] USER_CONTAINER_PORT: int @@ -666,7 +665,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Async Deployment Arguments CELERY_S3_BUCKET=s3_bucket, @@ -717,7 +715,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Async Deployment Arguments CELERY_S3_BUCKET=s3_bucket, @@ -970,7 +967,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Async Deployment Arguments CELERY_S3_BUCKET=s3_bucket, @@ -1029,7 +1025,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Async Deployment Arguments CELERY_S3_BUCKET=s3_bucket, @@ -1090,7 +1085,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -1145,7 +1139,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, @@ -1206,7 +1199,6 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_EXTRA_ROUTES=flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Arguments FORWARDER_PORT=FORWARDER_PORT, From 68fd91d5d2f2e78f27c11077463b6f2a4d42cb5e Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 21:11:11 +0000 Subject: [PATCH 18/31] adding FORWARDER_SYNC_ROUTES and FORWARDER_STREAMING_ROUTES to the tritonenhanced and lws ones that need 
it based on type. wont get used though --- .../infra/gateways/resources/k8s_resource_types.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 86bb59c1c..7b6d27357 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -149,6 +149,7 @@ class _StreamingDeploymentArguments(TypedDict): FORWARDER_PORT: int STREAMING_PREDICT_ROUTE: str FORWARDER_WORKER_COUNT: int + FORWARDER_SYNC_ROUTES: List[str] FORWARDER_STREAMING_ROUTES: List[str] @@ -1089,6 +1090,7 @@ def get_endpoint_resource_arguments_from_request( # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, # Triton Deployment Arguments TRITON_MODEL_REPOSITORY=flavor.triton_model_repository, TRITON_CPUS=str(flavor.triton_num_cpu), @@ -1143,6 +1145,7 @@ def get_endpoint_resource_arguments_from_request( # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, # GPU Deployment Arguments GPU_TYPE=build_endpoint_request.gpu_type.value, GPUS=build_endpoint_request.gpus, @@ -1203,6 +1206,10 @@ def get_endpoint_resource_arguments_from_request( # Streaming Arguments FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + + flavor.routes + + flavor.extra_routes, # GPU Arguments GPU_TYPE=build_endpoint_request.gpu_type.value, GPUS=build_endpoint_request.gpus, From 8b623b32061f9381abfb96040f99ddd7b7604a57 Mon Sep 17 00:00:00 2001 From: meher-m Date: Thu, 25 Sep 2025 21:27:00 +0000 Subject: [PATCH 19/31] change to pass unit tests --- .../service_template_config_map_circleci.yaml | 64 ++++++++----------- 1 file changed, 28 insertions(+), 36 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml index 6b5610442..59aa6df04 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml +++ b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml @@ -167,8 +167,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -440,8 +440,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -660,8 +660,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -929,8 +929,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: 
config-volume mountPath: /opt/.aws/config @@ -1104,17 +1104,13 @@ data: - --num-workers - "${FORWARDER_WORKER_COUNT}" - --set - - "forwarder.sync.predict_route=${PREDICT_ROUTE}" - - --set - - "forwarder.stream.predict_route=${STREAMING_PREDICT_ROUTE}" - - --set - "forwarder.sync.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - - "forwarder.sync.extra_routes=${FORWARDER_EXTRA_ROUTES}" + - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - - "forwarder.stream.extra_routes=${FORWARDER_EXTRA_ROUTES}" + - "forwarder.stream.routes=${FORWARDER_STREAMING_ROUTES}" - --set - "forwarder.sync.forwarder_type=${FORWARDER_TYPE}" - --set @@ -1162,8 +1158,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -1398,8 +1394,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -1679,8 +1675,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -1907,8 +1903,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -2184,8 +2180,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -2367,17 +2363,13 @@ data: - --num-workers - "${FORWARDER_WORKER_COUNT}" - --set - - "forwarder.sync.predict_route=${PREDICT_ROUTE}" - - --set - - "forwarder.stream.predict_route=${STREAMING_PREDICT_ROUTE}" - - --set - "forwarder.sync.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - - "forwarder.sync.extra_routes=${FORWARDER_EXTRA_ROUTES}" + - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - - "forwarder.stream.extra_routes=${FORWARDER_EXTRA_ROUTES}" + - "forwarder.stream.routes=${FORWARDER_STREAMING_ROUTES}" - --set - "forwarder.sync.forwarder_type=${FORWARDER_TYPE}" - --set @@ -2425,8 +2417,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config @@ -2638,8 +2630,8 @@ data: apiVersion: leaderworkerset.x-k8s.io/v1 kind: LeaderWorkerSet metadata: - name: ${RESOURCE_NAME} - namespace: ${NAMESPACE} + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} labels: user_id: ${OWNER} team: ${TEAM} @@ -2780,8 +2772,8 @@ data: cpu: ${FORWARDER_CPUS_LIMIT} memory: ${FORWARDER_MEMORY_LIMIT} ephemeral-storage: ${FORWARDER_STORAGE_LIMIT} - - + + volumeMounts: - name: config-volume mountPath: /opt/.aws/config From cbb0e0518c953d2f8c5aecc7b44a9c4949698b4d Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 15:48:15 +0000 Subject: [PATCH 20/31] update orm --- .../infra/repositories/db_model_bundle_repository.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py b/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py index 033b1971f..7072d2cac 100644 --- 
a/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py +++ b/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py @@ -146,6 +146,7 @@ def translate_model_bundle_orm_to_model_bundle( env=model_bundle_orm.runnable_image_env, protocol=model_bundle_orm.runnable_image_protocol, readiness_initial_delay_seconds=model_bundle_orm.runnable_image_readiness_initial_delay_seconds, + routes=model_bundle_orm.runnable_image_routes, extra_routes=model_bundle_orm.runnable_image_extra_routes, forwarder_type=model_bundle_orm.runnable_image_forwarder_type, worker_command=model_bundle_orm.runnable_image_worker_command, @@ -218,6 +219,7 @@ def translate_kwargs_to_model_bundle_orm( runnable_image_readiness_initial_delay_seconds=flavor_dict.get( "readiness_initial_delay_seconds" ), + runnable_image_routes=flavor_dict.get("routes"), runnable_image_extra_routes=flavor_dict.get("extra_routes"), runnable_image_forwarder_type=flavor_dict.get("forwarder_type"), runnable_image_worker_command=flavor_dict.get("worker_command"), From fd3ad7a95f95729399ccf0ab888c9d16dc1fa2e8 Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 16:34:29 +0000 Subject: [PATCH 21/31] test change --- .../use_cases/llm_model_endpoint_use_cases.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py index 352b7a060..ce6054241 100644 --- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py @@ -858,6 +858,7 @@ async def create_sglang_bundle( # pragma: no cover protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", + # TODO: do i need to change these also? 
predict_route="/predict", streaming_predict_route="/stream", extra_routes=[ @@ -1016,9 +1017,9 @@ async def create_vllm_bundle( streaming_command=command, protocol="http", readiness_initial_delay_seconds=10, - healthcheck_route="/health", - predict_route="/predict", - streaming_predict_route="/stream", + healthcheck_route="/ping", + predict_route="/v1/completions", + streaming_predict_route="/v1/completions", routes=[ OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH, @@ -1098,9 +1099,9 @@ async def create_vllm_multinode_bundle( streaming_command=leader_command, protocol="http", readiness_initial_delay_seconds=10, - healthcheck_route="/health", - predict_route="/predict", - streaming_predict_route="/stream", + healthcheck_route="/ping", + predict_route="/v1/completions", + streaming_predict_route="/v1/completions", routes=[OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH], env=common_vllm_envs, worker_command=worker_command, From 9b59d4d35ce7459daeb3e54504faabcda5a30341 Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 17:00:12 +0000 Subject: [PATCH 22/31] change test bundle --- integration_tests/rest_api_utils.py | 4 ++++ .../domain/use_cases/llm_model_endpoint_use_cases.py | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/integration_tests/rest_api_utils.py b/integration_tests/rest_api_utils.py index 7db937dc6..fc51e99ac 100644 --- a/integration_tests/rest_api_utils.py +++ b/integration_tests/rest_api_utils.py @@ -107,6 +107,10 @@ def my_model(**keyword_args): }, "protocol": "http", "readiness_initial_delay_seconds": 20, + "healthcheck_route": "/health", + "predict_route": "/predict", + "streaming_predict_route": "/stream", + "routes": ["/predict", "/stream"], }, } diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py index ce6054241..55b3c331d 100644 --- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py @@ -858,7 +858,6 @@ async def create_sglang_bundle( # pragma: no cover protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", - # TODO: do i need to change these also? 
predict_route="/predict", streaming_predict_route="/stream", extra_routes=[ From fa1a5b32910ebdb26c16e9c658fe436f1a7584b1 Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 17:23:27 +0000 Subject: [PATCH 23/31] add debug logs --- integration_tests/rest_api_utils.py | 4 ++++ .../domain/use_cases/llm_model_endpoint_use_cases.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/integration_tests/rest_api_utils.py b/integration_tests/rest_api_utils.py index fc51e99ac..65c950401 100644 --- a/integration_tests/rest_api_utils.py +++ b/integration_tests/rest_api_utils.py @@ -369,6 +369,7 @@ def create_model_endpoint( ) -> Dict[str, Any]: create_model_endpoint_request = create_model_endpoint_request.copy() replace_model_bundle_name_with_id(create_model_endpoint_request, user_id, "v1") + print(f"DEBUG: Creating endpoint with request: {create_model_endpoint_request}") response = requests.post( f"{BASE_PATH}/v1/model-endpoints", json=create_model_endpoint_request, @@ -376,8 +377,11 @@ def create_model_endpoint( auth=(user_id, ""), timeout=DEFAULT_NETWORK_TIMEOUT_SEC, ) + print(f"DEBUG: Endpoint creation response status: {response.status_code}") if not response.ok: + print(f"ERROR: Endpoint creation failed: {response.content}") raise ValueError(response.content) + print(f"SUCCESS: Endpoint created successfully") return response.json() diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py index 55b3c331d..f632ad91a 100644 --- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py @@ -1016,7 +1016,7 @@ async def create_vllm_bundle( streaming_command=command, protocol="http", readiness_initial_delay_seconds=10, - healthcheck_route="/ping", + healthcheck_route="/health", predict_route="/v1/completions", streaming_predict_route="/v1/completions", routes=[ @@ -1098,7 +1098,7 @@ async def create_vllm_multinode_bundle( streaming_command=leader_command, protocol="http", readiness_initial_delay_seconds=10, - healthcheck_route="/ping", + healthcheck_route="/health", predict_route="/v1/completions", streaming_predict_route="/v1/completions", routes=[OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH], From edcf542007f517498fec96ef0341dada17b6f718 Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 21:00:58 +0000 Subject: [PATCH 24/31] trying to fix --- integration_tests/rest_api_utils.py | 1 + .../infra/repositories/db_model_bundle_repository.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/integration_tests/rest_api_utils.py b/integration_tests/rest_api_utils.py index 65c950401..c677bf2bb 100644 --- a/integration_tests/rest_api_utils.py +++ b/integration_tests/rest_api_utils.py @@ -111,6 +111,7 @@ def my_model(**keyword_args): "predict_route": "/predict", "streaming_predict_route": "/stream", "routes": ["/predict", "/stream"], + "extra_routes": ["/predict", "/stream"], }, } diff --git a/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py b/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py index 7072d2cac..29a0a1982 100644 --- a/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py +++ b/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py @@ -141,13 +141,13 @@ def 
translate_model_bundle_orm_to_model_bundle( repository=model_bundle_orm.runnable_image_repository, tag=model_bundle_orm.runnable_image_tag, command=model_bundle_orm.runnable_image_command, - predict_route=model_bundle_orm.runnable_image_predict_route, - healthcheck_route=model_bundle_orm.runnable_image_healthcheck_route, + predict_route=model_bundle_orm.runnable_image_predict_route or "/predict", + healthcheck_route=model_bundle_orm.runnable_image_healthcheck_route or "/readyz", env=model_bundle_orm.runnable_image_env, protocol=model_bundle_orm.runnable_image_protocol, readiness_initial_delay_seconds=model_bundle_orm.runnable_image_readiness_initial_delay_seconds, - routes=model_bundle_orm.runnable_image_routes, - extra_routes=model_bundle_orm.runnable_image_extra_routes, + routes=model_bundle_orm.runnable_image_routes or [], + extra_routes=model_bundle_orm.runnable_image_extra_routes or [], forwarder_type=model_bundle_orm.runnable_image_forwarder_type, worker_command=model_bundle_orm.runnable_image_worker_command, worker_env=model_bundle_orm.runnable_image_worker_env, From 3397278f177d48c4e99686cb0707628a0286d9eb Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 22:37:03 +0000 Subject: [PATCH 25/31] changes --- .../service_template_config_map.yaml | 20 ++++++------------- integration_tests/rest_api_utils.py | 10 +++++++++- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index 4780e29dc..f1cee8c4d 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -181,10 +181,6 @@ data: - --set - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - - "forwarder.sync.extra_routes=${FORWARDER_EXTRA_ROUTES}" - - --set - - "forwarder.stream.extra_routes=${FORWARDER_EXTRA_ROUTES}" - - --set - "forwarder.sync.forwarder_type=${FORWARDER_TYPE}" - --set - "forwarder.stream.forwarder_type=${FORWARDER_TYPE}" @@ -370,7 +366,7 @@ data: name: {{ $service_template_aws_config_map_name }} {{- else }} name: {{ $aws_config_map_name }} - {{- end }} + {{- end }} {{- end }} - name: user-config configMap: @@ -487,15 +483,15 @@ data: threshold: "${CONCURRENCY}" metricName: request_concurrency_average query: sum(rate(istio_request_duration_milliseconds_sum{destination_workload="${RESOURCE_NAME}"}[2m])) / 1000 - serverAddress: ${PROMETHEUS_SERVER_ADDRESS} + serverAddress: ${PROMETHEUS_SERVER_ADDRESS} {{- range $device := tuple "gpu" }} {{- range $mode := tuple "streaming"}} leader-worker-set-{{ $mode }}-{{ $device }}.yaml: |- apiVersion: leaderworkerset.x-k8s.io/v1 kind: LeaderWorkerSet metadata: - name: ${RESOURCE_NAME} - namespace: ${NAMESPACE} + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} labels: {{- $service_template_labels | nindent 8 }} spec: @@ -617,10 +613,6 @@ data: - --set - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - - "forwarder.sync.extra_routes=${FORWARDER_EXTRA_ROUTES}" - - --set - - "forwarder.stream.extra_routes=${FORWARDER_EXTRA_ROUTES}" - - --set - "forwarder.sync.forwarder_type=${FORWARDER_TYPE}" - --set - "forwarder.stream.forwarder_type=${FORWARDER_TYPE}" @@ -748,7 +740,7 @@ data: name: {{ $service_template_aws_config_map_name }} {{- else }} name: {{ $aws_config_map_name }} - {{- end }} + {{- end }} {{- end }} - name: user-config configMap: @@ -856,7 +848,7 @@ data: name: {{ $service_template_aws_config_map_name }} 
{{- else }} name: {{ $aws_config_map_name }} - {{- end }} + {{- end }} {{- end }} - name: user-config configMap: diff --git a/integration_tests/rest_api_utils.py b/integration_tests/rest_api_utils.py index c677bf2bb..1f97009a3 100644 --- a/integration_tests/rest_api_utils.py +++ b/integration_tests/rest_api_utils.py @@ -614,7 +614,15 @@ def get_model_endpoint(name: str, user_id: str) -> Dict[str, Any]: ) if not response.ok: raise ValueError(response.content) - return response.json()["model_endpoints"][0] + + endpoints = response.json()["model_endpoints"] + print(f"DEBUG: Looking for endpoint name='{name}', found {len(endpoints)} endpoints:") + for ep in endpoints: + print(f" - name='{ep['name']}', status='{ep['status']}'") + + if not endpoints: + raise ValueError(f"No endpoint found with name '{name}'") + return endpoints[0] @retry(stop=stop_after_attempt(6), wait=wait_fixed(1)) From c849a5e8dcde89d2988132c90322028aabceafbb Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 23:00:38 +0000 Subject: [PATCH 26/31] cleanup debug code --- integration_tests/rest_api_utils.py | 14 +------------- integration_tests/test_endpoints.py | 2 +- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/integration_tests/rest_api_utils.py b/integration_tests/rest_api_utils.py index 1f97009a3..eea7e638d 100644 --- a/integration_tests/rest_api_utils.py +++ b/integration_tests/rest_api_utils.py @@ -370,7 +370,6 @@ def create_model_endpoint( ) -> Dict[str, Any]: create_model_endpoint_request = create_model_endpoint_request.copy() replace_model_bundle_name_with_id(create_model_endpoint_request, user_id, "v1") - print(f"DEBUG: Creating endpoint with request: {create_model_endpoint_request}") response = requests.post( f"{BASE_PATH}/v1/model-endpoints", json=create_model_endpoint_request, @@ -378,11 +377,8 @@ def create_model_endpoint( auth=(user_id, ""), timeout=DEFAULT_NETWORK_TIMEOUT_SEC, ) - print(f"DEBUG: Endpoint creation response status: {response.status_code}") if not response.ok: - print(f"ERROR: Endpoint creation failed: {response.content}") raise ValueError(response.content) - print(f"SUCCESS: Endpoint created successfully") return response.json() @@ -614,15 +610,7 @@ def get_model_endpoint(name: str, user_id: str) -> Dict[str, Any]: ) if not response.ok: raise ValueError(response.content) - - endpoints = response.json()["model_endpoints"] - print(f"DEBUG: Looking for endpoint name='{name}', found {len(endpoints)} endpoints:") - for ep in endpoints: - print(f" - name='{ep['name']}', status='{ep['status']}'") - - if not endpoints: - raise ValueError(f"No endpoint found with name '{name}'") - return endpoints[0] + return response.json()["model_endpoints"][0] @retry(stop=stop_after_attempt(6), wait=wait_fixed(1)) diff --git a/integration_tests/test_endpoints.py b/integration_tests/test_endpoints.py index 9e63fb91c..609dc6faa 100644 --- a/integration_tests/test_endpoints.py +++ b/integration_tests/test_endpoints.py @@ -132,7 +132,7 @@ def test_sync_model_endpoint(capsys): print(f"Creating {create_endpoint_request['name']} model endpoint...") create_model_endpoint(create_endpoint_request, user) - ensure_n_ready_endpoints_short(1, user) + ensure_n_ready_endpoints_short(1, user) # TODO: changed to long in ci for now. 
print(f"Updating {create_endpoint_request['name']} model endpoint...") update_model_endpoint( From ca603dd551f2e03ae8be9df1f5ca43b066d5af12 Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 23:07:31 +0000 Subject: [PATCH 27/31] reformat --- integration_tests/test_endpoints.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/test_endpoints.py b/integration_tests/test_endpoints.py index 609dc6faa..00871e03b 100644 --- a/integration_tests/test_endpoints.py +++ b/integration_tests/test_endpoints.py @@ -132,7 +132,7 @@ def test_sync_model_endpoint(capsys): print(f"Creating {create_endpoint_request['name']} model endpoint...") create_model_endpoint(create_endpoint_request, user) - ensure_n_ready_endpoints_short(1, user) # TODO: changed to long in ci for now. + ensure_n_ready_endpoints_short(1, user) # TODO: changed to long in ci for now. print(f"Updating {create_endpoint_request['name']} model endpoint...") update_model_endpoint( From 9a5f7895b3f05f414dca1b4a8efb8008cf1ec44c Mon Sep 17 00:00:00 2001 From: meher-m Date: Fri, 26 Sep 2025 23:27:39 +0000 Subject: [PATCH 28/31] remove 1 --- integration_tests/test_endpoints.py | 2 +- .../infra/repositories/db_model_bundle_repository.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/integration_tests/test_endpoints.py b/integration_tests/test_endpoints.py index 00871e03b..9e63fb91c 100644 --- a/integration_tests/test_endpoints.py +++ b/integration_tests/test_endpoints.py @@ -132,7 +132,7 @@ def test_sync_model_endpoint(capsys): print(f"Creating {create_endpoint_request['name']} model endpoint...") create_model_endpoint(create_endpoint_request, user) - ensure_n_ready_endpoints_short(1, user) # TODO: changed to long in ci for now. + ensure_n_ready_endpoints_short(1, user) print(f"Updating {create_endpoint_request['name']} model endpoint...") update_model_endpoint( diff --git a/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py b/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py index 29a0a1982..7072d2cac 100644 --- a/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py +++ b/model-engine/model_engine_server/infra/repositories/db_model_bundle_repository.py @@ -141,13 +141,13 @@ def translate_model_bundle_orm_to_model_bundle( repository=model_bundle_orm.runnable_image_repository, tag=model_bundle_orm.runnable_image_tag, command=model_bundle_orm.runnable_image_command, - predict_route=model_bundle_orm.runnable_image_predict_route or "/predict", - healthcheck_route=model_bundle_orm.runnable_image_healthcheck_route or "/readyz", + predict_route=model_bundle_orm.runnable_image_predict_route, + healthcheck_route=model_bundle_orm.runnable_image_healthcheck_route, env=model_bundle_orm.runnable_image_env, protocol=model_bundle_orm.runnable_image_protocol, readiness_initial_delay_seconds=model_bundle_orm.runnable_image_readiness_initial_delay_seconds, - routes=model_bundle_orm.runnable_image_routes or [], - extra_routes=model_bundle_orm.runnable_image_extra_routes or [], + routes=model_bundle_orm.runnable_image_routes, + extra_routes=model_bundle_orm.runnable_image_extra_routes, forwarder_type=model_bundle_orm.runnable_image_forwarder_type, worker_command=model_bundle_orm.runnable_image_worker_command, worker_env=model_bundle_orm.runnable_image_worker_env, From 6affb1a3039f4fa5933c332469cb51fc5b33c268 Mon Sep 17 00:00:00 2001 From: meher-m Date: Sat, 27 Sep 2025 00:17:24 +0000 Subject: [PATCH 29/31] 
remove 2 --- integration_tests/rest_api_utils.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/integration_tests/rest_api_utils.py b/integration_tests/rest_api_utils.py index eea7e638d..7db937dc6 100644 --- a/integration_tests/rest_api_utils.py +++ b/integration_tests/rest_api_utils.py @@ -107,11 +107,6 @@ def my_model(**keyword_args): }, "protocol": "http", "readiness_initial_delay_seconds": 20, - "healthcheck_route": "/health", - "predict_route": "/predict", - "streaming_predict_route": "/stream", - "routes": ["/predict", "/stream"], - "extra_routes": ["/predict", "/stream"], }, } From b25e7a7c21e864ac372c9f98d2e0b33739987d09 Mon Sep 17 00:00:00 2001 From: meher-m Date: Sat, 27 Sep 2025 00:45:43 +0000 Subject: [PATCH 30/31] revert 3 --- .../domain/use_cases/llm_model_endpoint_use_cases.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py index f632ad91a..352b7a060 100644 --- a/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/llm_model_endpoint_use_cases.py @@ -1017,8 +1017,8 @@ async def create_vllm_bundle( protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", - predict_route="/v1/completions", - streaming_predict_route="/v1/completions", + predict_route="/predict", + streaming_predict_route="/stream", routes=[ OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH, @@ -1099,8 +1099,8 @@ async def create_vllm_multinode_bundle( protocol="http", readiness_initial_delay_seconds=10, healthcheck_route="/health", - predict_route="/v1/completions", - streaming_predict_route="/v1/completions", + predict_route="/predict", + streaming_predict_route="/stream", routes=[OPENAI_CHAT_COMPLETION_PATH, OPENAI_COMPLETION_PATH], env=common_vllm_envs, worker_command=worker_command, From dcca5a8a0b5456e6cc95da36ca7df4cd07058229 Mon Sep 17 00:00:00 2001 From: meher-m Date: Mon, 29 Sep 2025 16:17:30 +0000 Subject: [PATCH 31/31] reorder params --- .../gateways/resources/k8s_resource_types.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 7b6d27357..6f0d3133c 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -760,7 +760,6 @@ def get_endpoint_resource_arguments_from_request( MAIN_ENV=main_env, COMMAND=flavor.streaming_command, PREDICT_ROUTE=flavor.predict_route, - STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, HEALTHCHECK_ROUTE=flavor.healthcheck_route, READINESS_INITIAL_DELAY=flavor.readiness_initial_delay_seconds, INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, @@ -769,14 +768,15 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, - FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] - + flavor.routes - + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments 
FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + + flavor.routes + + flavor.extra_routes, ) elif endpoint_resource_name == "deployment-runnable-image-streaming-gpu": assert isinstance(flavor, StreamingEnhancedRunnableImageFlavor) @@ -811,7 +811,6 @@ def get_endpoint_resource_arguments_from_request( MAIN_ENV=main_env, COMMAND=flavor.streaming_command, PREDICT_ROUTE=flavor.predict_route, - STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, HEALTHCHECK_ROUTE=flavor.healthcheck_route, READINESS_INITIAL_DELAY=flavor.readiness_initial_delay_seconds, INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, @@ -820,14 +819,15 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, - FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] - + flavor.routes - + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Streaming Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, + FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + + flavor.routes + + flavor.extra_routes, # GPU Deployment Arguments GPU_TYPE=build_endpoint_request.gpu_type.value, GPUS=build_endpoint_request.gpus, @@ -872,11 +872,11 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, ) elif endpoint_resource_name == "deployment-runnable-image-sync-gpu": assert isinstance(flavor, RunnableImageLike) @@ -919,11 +919,11 @@ def get_endpoint_resource_arguments_from_request( FORWARDER_MEMORY_LIMIT=FORWARDER_MEMORY_USAGE, FORWARDER_STORAGE_LIMIT=FORWARDER_STORAGE_USAGE, USER_CONTAINER_PORT=USER_CONTAINER_PORT, - FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, FORWARDER_TYPE=flavor.forwarder_type, # Sync Deployment Arguments FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, # GPU Deployment Arguments GPU_TYPE=build_endpoint_request.gpu_type.value, GPUS=build_endpoint_request.gpus, @@ -1193,7 +1193,6 @@ def get_endpoint_resource_arguments_from_request( MAIN_ENV=main_env, COMMAND=flavor.streaming_command, PREDICT_ROUTE=flavor.predict_route, - STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, HEALTHCHECK_ROUTE=flavor.healthcheck_route, READINESS_INITIAL_DELAY=flavor.readiness_initial_delay_seconds, INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, @@ -1206,6 +1205,7 @@ def get_endpoint_resource_arguments_from_request( # Streaming Arguments 
FORWARDER_PORT=FORWARDER_PORT, FORWARDER_WORKER_COUNT=FORWARDER_WORKER_COUNT, + STREAMING_PREDICT_ROUTE=flavor.streaming_predict_route, FORWARDER_SYNC_ROUTES=[flavor.predict_route] + flavor.routes + flavor.extra_routes, FORWARDER_STREAMING_ROUTES=[flavor.streaming_predict_route] + flavor.routes