diff --git a/model-engine/README.md b/model-engine/README.md index 3f6b9579..385afd9a 100644 --- a/model-engine/README.md +++ b/model-engine/README.md @@ -42,6 +42,41 @@ Run `mypy . --install-types` to set up mypy. Most of the business logic in Model Engine should contain unit tests, located in [`tests/unit`](./tests/unit). To run the tests, run `pytest`. +### Testing the http_forwarder + +First have some endpoint running on port 5005 +```sh +(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ export IMAGE=692474966980.dkr.ecr.us-west-2.amazonaws.com/vllm:0.10.1.1-rc2 +(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ export MODEL=meta-llama/Meta-Llama-3.1-8B-Instruct && export MODEL_PATH=/data/model_files/$MODEL +(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ export REPO_PATH=/mnt/home/dmchoi/repos/scale +(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ docker kill vllm; docker rm vllm; docker run \ + --runtime nvidia \ + --shm-size=16gb \ + --gpus '"device=0,1,2,3"' \ + -v $MODEL_PATH:/workspace/model_files:ro \ + -v ${REPO_PATH}/llm-engine/model-engine/model_engine_server/inference/vllm/vllm_server.py:/workspace/vllm_server.py \ + -p 5005:5005 \ + --name vllm \ + ${IMAGE} \ + python -m vllm_server --model model_files --port 5005 --disable-log-requests --max-model-len 4096 --max-num-seqs 16 --enforce-eager +``` + +Then you can run the forwarder locally like this +```sh +GIT_TAG=test python model_engine_server/inference/forwarding/http_forwarder.py \ + --config model_engine_server/inference/configs/service--http_forwarder.yaml \ + --num-workers 1 \ + --set "forwarder.sync.extra_routes=['/v1/chat/completions','/v1/completions']" \ + --set "forwarder.stream.extra_routes=['/v1/chat/completions','/v1/completions']" \ + --set "forwarder.sync.healthcheck_route=/health" \ + --set "forwarder.stream.healthcheck_route=/health" +``` + +Then you can hit the forwarder like this +```sh + curl -X POST localhost:5000/v1/chat/completions -H 
"Content-Type: application/json" -d "{\"args\": {\"model\":\"$MODEL\", \"messages\":[{\"role\": \"system\", \"content\": \"Hey, what's the temperature in Paris right now?\"}],\"max_tokens\":100,\"temperature\":0.2,\"guided_regex\":\"Sean.*\"}}" +``` + ## Generating OpenAI types We've decided to make our V2 APIs OpenAI compatible. We generate the corresponding Pydantic models: diff --git a/model-engine/model_engine_server/domain/entities/model_bundle_entity.py b/model-engine/model_engine_server/domain/entities/model_bundle_entity.py index 172f84f2..2a5a4863 100644 --- a/model-engine/model_engine_server/domain/entities/model_bundle_entity.py +++ b/model-engine/model_engine_server/domain/entities/model_bundle_entity.py @@ -163,6 +163,7 @@ class RunnableImageLike(BaseModel, ABC): protocol: Literal["http"] # TODO: add support for other protocols (e.g. grpc) readiness_initial_delay_seconds: int = 120 extra_routes: List[str] = Field(default_factory=list) + routes: List[str] = Field(default_factory=list) forwarder_type: Optional[str] = ForwarderType.DEFAULT.value worker_command: Optional[List[str]] = None worker_env: Optional[Dict[str, str]] = None diff --git a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py index d0d9a1bd..eb32b180 100644 --- a/model-engine/model_engine_server/inference/forwarding/http_forwarder.py +++ b/model-engine/model_engine_server/inference/forwarding/http_forwarder.py @@ -41,6 +41,8 @@ def get_forwarder_loader(destination_path: Optional[str] = None) -> LoadForwarde config = get_config()["sync"] if "extra_routes" in config: del config["extra_routes"] + if "routes" in config: + del config["routes"] if destination_path: config["predict_route"] = destination_path if "forwarder_type" in config: @@ -55,6 +57,8 @@ def get_streaming_forwarder_loader( config = get_config()["stream"] if "extra_routes" in config: del config["extra_routes"] + if "routes" in 
config: + del config["routes"] if destination_path: config["predict_route"] = destination_path if "forwarder_type" in config: @@ -276,14 +280,34 @@ async def init_app(): def healthcheck(): return "OK" - def add_extra_sync_or_stream_routes(app: FastAPI): - """Read extra_routes from config and dynamically add routes to app""" + def add_sync_or_stream_routes(app: FastAPI): + """Read routes from config (both old extra_routes and new routes field) and dynamically add routes to app""" config = get_config() sync_forwarders: Dict[str, Forwarder] = dict() stream_forwarders: Dict[str, StreamingForwarder] = dict() - for route in config.get("sync", {}).get("extra_routes", []): + + # Gather all sync routes from extra_routes and routes fields + sync_routes_to_add = set() + sync_routes_to_add.update(config.get("sync", {}).get("extra_routes", [])) + sync_routes_to_add.update(config.get("sync", {}).get("routes", [])) + + # predict_route = config.get("sync", {}).get("predict_route", None) + # if predict_route: + # sync_routes_to_add.add(predict_route) + + # Gather all stream routes from extra_routes and routes fields + stream_routes_to_add = set() + stream_routes_to_add.update(config.get("stream", {}).get("extra_routes", [])) + stream_routes_to_add.update(config.get("stream", {}).get("routes", [])) + + # stream_predict_route = config.get("stream", {}).get("predict_route", None) + # if stream_predict_route: + # stream_routes_to_add.add(stream_predict_route) + + # Load forwarders for all routes + for route in sync_routes_to_add: sync_forwarders[route] = load_forwarder(route) - for route in config.get("stream", {}).get("extra_routes", []): + for route in stream_routes_to_add: stream_forwarders[route] = load_streaming_forwarder(route) all_routes = set(list(sync_forwarders.keys()) + list(stream_forwarders.keys())) @@ -327,7 +351,14 @@ def add_stream_passthrough_routes(app: FastAPI): config = get_config() passthrough_forwarders: Dict[str, PassthroughForwarder] = dict() - for route in 
config.get("stream", {}).get("extra_routes", []): + + # Gather all routes from extra_routes and routes fields + stream_passthrough_routes_to_add = set() + stream_passthrough_routes_to_add.update(config.get("stream", {}).get("extra_routes", [])) + stream_passthrough_routes_to_add.update(config.get("stream", {}).get("routes", [])) + + # Load passthrough forwarders for all routes + for route in stream_passthrough_routes_to_add: passthrough_forwarders[route] = load_stream_passthrough_forwarder(route) for route in passthrough_forwarders: @@ -352,7 +383,13 @@ def add_sync_passthrough_routes(app: FastAPI): config = get_config() passthrough_forwarders: Dict[str, PassthroughForwarder] = dict() - for route in config.get("sync", {}).get("extra_routes", []): + + # Gather all routes from extra_routes and routes fields + sync_passthrough_routes_to_add = set() + sync_passthrough_routes_to_add.update(config.get("sync", {}).get("extra_routes", [])) + sync_passthrough_routes_to_add.update(config.get("sync", {}).get("routes", [])) + + for route in sync_passthrough_routes_to_add: passthrough_forwarders[route] = load_sync_passthrough_forwarder(route) for route in passthrough_forwarders: @@ -380,7 +417,7 @@ def add_extra_routes(app: FastAPI): elif config.get("sync", {}).get("forwarder_type") == "passthrough": add_sync_passthrough_routes(app) else: - add_extra_sync_or_stream_routes(app) + add_sync_or_stream_routes(app) app.add_api_route(path="/healthz", endpoint=healthcheck, methods=["GET"]) app.add_api_route(path="/readyz", endpoint=healthcheck, methods=["GET"])