Skip to content
35 changes: 35 additions & 0 deletions model-engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,41 @@ Run `mypy . --install-types` to set up mypy.
Most of the business logic in Model Engine should contain unit tests, located in
[`tests/unit`](./tests/unit). To run the tests, run `pytest`.

### Testing the http_forwarder

First, start an endpoint listening on port 5005 (for example, a vLLM server):
```sh
(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ export IMAGE=692474966980.dkr.ecr.us-west-2.amazonaws.com/vllm:0.10.1.1-rc2
(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ export MODEL=meta-llama/Meta-Llama-3.1-8B-Instruct && export MODEL_PATH=/data/model_files/$MODEL
(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ export REPO_PATH=/mnt/home/dmchoi/repos/scale
(llm-engine-vllm) ➜ vllm git:(dmchoi/vllm_batch_upgrade) ✗ docker kill vllm; docker rm vllm; docker run \
--runtime nvidia \
--shm-size=16gb \
--gpus '"device=0,1,2,3"' \
-v $MODEL_PATH:/workspace/model_files:ro \
-v ${REPO_PATH}/llm-engine/model-engine/model_engine_server/inference/vllm/vllm_server.py:/workspace/vllm_server.py \
-p 5005:5005 \
--name vllm \
${IMAGE} \
python -m vllm_server --model model_files --port 5005 --disable-log-requests --max-model-len 4096 --max-num-seqs 16 --enforce-eager
```

Then you can run the forwarder locally like this:
```sh
GIT_TAG=test python model_engine_server/inference/forwarding/http_forwarder.py \
--config model_engine_server/inference/configs/service--http_forwarder.yaml \
--num-workers 1 \
--set "forwarder.sync.extra_routes=['/v1/chat/completions','/v1/completions']" \
--set "forwarder.stream.extra_routes=['/v1/chat/completions','/v1/completions']" \
--set "forwarder.sync.healthcheck_route=/health" \
--set "forwarder.stream.healthcheck_route=/health"
```

Then you can send a request to the forwarder like this:
```sh
curl -X POST localhost:5000/v1/chat/completions -H "Content-Type: application/json" -d "{\"args\": {\"model\":\"$MODEL\", \"messages\":[{\"role\": \"user\", \"content\": \"Hey, what's the temperature in Paris right now?\"}],\"max_tokens\":100,\"temperature\":0.2,\"guided_regex\":\"Sean.*\"}}"
```

## Generating OpenAI types
We've decided to make our V2 APIs OpenAI compatible. We generate the
corresponding Pydantic models:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class RunnableImageLike(BaseModel, ABC):
protocol: Literal["http"] # TODO: add support for other protocols (e.g. grpc)
readiness_initial_delay_seconds: int = 120
extra_routes: List[str] = Field(default_factory=list)
routes: List[str] = Field(default_factory=list)
forwarder_type: Optional[str] = ForwarderType.DEFAULT.value
worker_command: Optional[List[str]] = None
worker_env: Optional[Dict[str, str]] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def get_forwarder_loader(destination_path: Optional[str] = None) -> LoadForwarde
config = get_config()["sync"]
if "extra_routes" in config:
del config["extra_routes"]
if "routes" in config:
del config["routes"]
if destination_path:
config["predict_route"] = destination_path
if "forwarder_type" in config:
Expand All @@ -55,6 +57,8 @@ def get_streaming_forwarder_loader(
config = get_config()["stream"]
if "extra_routes" in config:
del config["extra_routes"]
if "routes" in config:
del config["routes"]
if destination_path:
config["predict_route"] = destination_path
if "forwarder_type" in config:
Expand Down Expand Up @@ -276,14 +280,34 @@ async def init_app():
def healthcheck():
    """Return the static health-check response.

    Used as the endpoint for the GET ``/healthz`` and ``/readyz`` routes;
    it performs no checks of its own and always answers ``"OK"``.
    """
    return "OK"

def add_extra_sync_or_stream_routes(app: FastAPI):
"""Read extra_routes from config and dynamically add routes to app"""
def add_sync_or_stream_routes(app: FastAPI):
"""Read routes from config (both old extra_routes and new routes field) and dynamically add routes to app"""
config = get_config()
sync_forwarders: Dict[str, Forwarder] = dict()
stream_forwarders: Dict[str, StreamingForwarder] = dict()
for route in config.get("sync", {}).get("extra_routes", []):

# Gather all sync routes from extra_routes and routes fields
sync_routes_to_add = set()
sync_routes_to_add.update(config.get("sync", {}).get("extra_routes", []))
sync_routes_to_add.update(config.get("sync", {}).get("routes", []))

# predict_route = config.get("sync", {}).get("predict_route", None)
# if predict_route:
# sync_routes_to_add.add(predict_route)

# Gather all stream routes from extra_routes and routes fields
stream_routes_to_add = set()
stream_routes_to_add.update(config.get("stream", {}).get("extra_routes", []))
stream_routes_to_add.update(config.get("stream", {}).get("routes", []))

# stream_predict_route = config.get("stream", {}).get("predict_route", None)
# if stream_predict_route:
# stream_routes_to_add.add(stream_predict_route)

# Load forwarders for all routes
for route in sync_routes_to_add:
sync_forwarders[route] = load_forwarder(route)
for route in config.get("stream", {}).get("extra_routes", []):
for route in stream_routes_to_add:
stream_forwarders[route] = load_streaming_forwarder(route)

all_routes = set(list(sync_forwarders.keys()) + list(stream_forwarders.keys()))
Expand Down Expand Up @@ -327,7 +351,14 @@ def add_stream_passthrough_routes(app: FastAPI):
config = get_config()

passthrough_forwarders: Dict[str, PassthroughForwarder] = dict()
for route in config.get("stream", {}).get("extra_routes", []):

# Gather all routes from extra_routes and routes fields
stream_passthrough_routes_to_add = set()
stream_passthrough_routes_to_add.update(config.get("stream", {}).get("extra_routes", []))
stream_passthrough_routes_to_add.update(config.get("stream", {}).get("routes", []))

# Load passthrough forwarders for all routes
for route in stream_passthrough_routes_to_add:
passthrough_forwarders[route] = load_stream_passthrough_forwarder(route)

for route in passthrough_forwarders:
Expand All @@ -352,7 +383,13 @@ def add_sync_passthrough_routes(app: FastAPI):
config = get_config()

passthrough_forwarders: Dict[str, PassthroughForwarder] = dict()
for route in config.get("sync", {}).get("extra_routes", []):

# Gather all sync passthrough routes from both the legacy extra_routes field and the new routes field
sync_passthrough_routes_to_add = set()
sync_passthrough_routes_to_add.update(config.get("sync", {}).get("extra_routes", []))
sync_passthrough_routes_to_add.update(config.get("sync", {}).get("routes", []))

for route in sync_passthrough_routes_to_add:
passthrough_forwarders[route] = load_sync_passthrough_forwarder(route)

for route in passthrough_forwarders:
Expand Down Expand Up @@ -380,7 +417,7 @@ def add_extra_routes(app: FastAPI):
elif config.get("sync", {}).get("forwarder_type") == "passthrough":
add_sync_passthrough_routes(app)
else:
add_extra_sync_or_stream_routes(app)
add_sync_or_stream_routes(app)

app.add_api_route(path="/healthz", endpoint=healthcheck, methods=["GET"])
app.add_api_route(path="/readyz", endpoint=healthcheck, methods=["GET"])
Expand Down