Skip to content

[MLI-6876] feat(api): add forwarder_max_concurrency to model endpoint create/update#835

Merged
diazagasatya merged 7 commits into
mainfrom
feat/MLI-6876-forwarder-max-concurrency-request-schema
May 28, 2026
Merged

[MLI-6876] feat(api): add forwarder_max_concurrency to model endpoint create/update#835
diazagasatya merged 7 commits into
mainfrom
feat/MLI-6876-forwarder-max-concurrency-request-schema

Conversation

@diazagasatya
Copy link
Copy Markdown
Collaborator

@diazagasatya diazagasatya commented May 27, 2026

Note: this PR supersedes #834 — same code, moved from my fork (diazagasatya/llm-engine) to an upstream branch now that I have write access, so CircleCI can run. All review comments + Lily's approval still apply to the same 3 commits below.

Summary

Adds an optional forwarder_max_concurrency: Optional[int] field to the Launch model-endpoint create and update API. When set, this propagates as a FORWARDER_MAX_CONCURRENCY env var on the generated K8s deployment; the downstream deployer uses it to set LIRA's ForwarderDeployOptions.forwarder_max_concurrency, overriding the HTTP forwarder container's --concurrency flag independently of pod.concurrency.

Default None preserves existing behavior end-to-end.

Why

Followup to Scale-internal MLI-5366 (LIRA layer) and the public-SDK chain (MLI-6875, MLI-6874). MLI-5366 added the field to LIRA's deploy schema; this PR exposes it at the API boundary so the SDK chain can plumb it from public callers all the way to the rendered K8s manifest.

Changes

File Change
dtos/model_endpoints.py Add field to CreateModelEndpointV1Request + UpdateModelEndpointV1Request (Optional[int], gt=0, le=20 to match LIRA's FORWARDER_MAX_CONCURRENCY_LIMIT)
dtos/endpoint_builder.py Add to BuildEndpointRequest so it flows through the celery task to the service builder
domain/services/model_endpoint_service.py Add to abstract service interface for both create and update methods
domain/use_cases/model_endpoint_use_cases.py Extract from the request in both create and update use cases, pass to service
infra/services/live_model_endpoint_service.py Thread kwarg through to the infra gateway calls (create + update)
infra/gateways/model_endpoint_infra_gateway.py + live_model_endpoint_infra_gateway.py Accept the kwarg and set it on BuildEndpointRequest
infra/gateways/resources/k8s_resource_types.py Declare on _DeploymentArguments TypedDict; inject as env var at all 11 template-substitution sites that already set CONCURRENT_REQUESTS_PER_WORKER
infra/gateways/resources/k8s_endpoint_resource_delegate.py Strip None-valued kwargs before Template.substitute() so Optional fields don't render as the literal string "None" (review fix from #834)
tests/unit/conftest.py Update fake gateway and fake service signatures so existing tests still pass under the new abstract interface
specs/openapi-3.0.json, specs/openapi.json, specs/metadata.json Regenerated via python model-engine/scripts/generate_openapi_schemas.py model-engine/specs/

NOT in this PR (downstream coordination)

The deployer that reads the new FORWARDER_MAX_CONCURRENCY env var off the K8s deployment and sets ForwarderDeployOptions.forwarder_max_concurrency lives outside this repo (likely in scaleapi/models/model-engine-internal/ or a dedicated deployer service). Worth confirming that bridge works as expected once this lands and the rest of the chain (MLI-6875, MLI-6874) is live.

Test plan

  • CI: existing unit tests pass (fake gateway/service signatures updated to match the new abstract interface)
  • After follow-up PRs ship: create a test endpoint with forwarder_max_concurrency=5 and confirm via kubectl get deploy <name> -o yaml | grep -A2 -- '--concurrency' that the forwarder containers reflect the override while the user_service container is unchanged

Greptile Summary

This PR adds an optional forwarder_max_concurrency field to the model endpoint create and update APIs, propagating it as a FORWARDER_MAX_CONCURRENCY env var on the generated K8s deployment so LIRA's HTTP forwarder container concurrency can be tuned independently of per_worker. The field is threaded end-to-end through DTOs, domain entities, service/gateway interfaces, and the K8s resource-type substitution layer.

  • API layer: forwarder_max_concurrency: Optional[int] (gt=0, le=20) added to both CreateModelEndpointV1Request and UpdateModelEndpointV1Request; constraints match LIRA's FORWARDER_MAX_CONCURRENCY_LIMIT and default None preserves existing behavior.
  • K8s resource layer: FORWARDER_MAX_CONCURRENCY declared on _BaseDeploymentArguments and injected at all 11 template-substitution sites; load_k8s_yaml now uses Template.safe_substitute after stripping None entries.
  • Update fallback: The if forwarder_max_concurrency is None guard in update_model_endpoint_infra is a no-op \u2014 forwarder_max_concurrency is never read back from the running K8s deployment, so the fallback always resolves to None and the value is silently dropped on any update that omits it.

Confidence Score: 3/5

The create path is safe, but the update path silently loses forwarder_max_concurrency on any update that omits the field because the delegate never reads it back from the running deployment.

The fallback added to preserve forwarder_max_concurrency across updates always resolves to None because _get_resources_from_deployment_type and _get_resources_from_lws_type never populate the field when reconstructing ModelEndpointDeploymentState from K8s. Once downstream templates inject ${FORWARDER_MAX_CONCURRENCY}, any update call that omits the parameter will silently strip the env var from the deployment.

live_model_endpoint_infra_gateway.py (update fallback) and k8s_endpoint_resource_delegate.py (_get_resources_from_deployment_type / _get_resources_from_lws_type)

Important Files Changed

Filename Overview
model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py Adds forwarder_max_concurrency to create/update paths; the update fallback reads from a field never populated by the K8s delegate, making it a silent no-op.
model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py Switches to safe_substitute with None-filtering; ModelEndpointDeploymentState construction does not yet read forwarder_max_concurrency back from K8s.
model-engine/model_engine_server/common/dtos/model_endpoints.py Adds forwarder_max_concurrency: Optional[int] (gt=0, le=20) to create and update request DTOs.
model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py Declares FORWARDER_MAX_CONCURRENCY: Optional[int] and injects at all 11 substitution sites.
model-engine/model_engine_server/domain/entities/model_endpoint_entity.py Adds forwarder_max_concurrency: Optional[int] to ModelEndpointDeploymentState.
model-engine/tests/unit/conftest.py Updates fake gateway/service signatures and all fixture deployment states.

Comments Outside Diff (1)

  1. model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py, line 150-215 (link)

    P1 forwarder_max_concurrency not preserved across updates

    All comparable fields in update_model_endpoint_infra fall back to infra_state when None is passed (e.g., concurrent_requests_per_worker at line 156-159), but forwarder_max_concurrency has no such fallback. A caller that sends PUT /endpoint/{id} without specifying forwarder_max_concurrency (the default) will silently reset the value to None in BuildEndpointRequest, even when the endpoint was originally created with a non-None value. This differs from the established contract for every other comparable field, and will produce incorrect K8s manifests once the downstream templates start injecting ${FORWARDER_MAX_CONCURRENCY}. The root cause is that forwarder_max_concurrency was not added to ModelEndpointDeploymentState (domain entity), so infra_state.deployment_state has no forwarder_max_concurrency to fall back to.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py
    Line: 150-215
    
    Comment:
    **`forwarder_max_concurrency` not preserved across updates**
    
    All comparable fields in `update_model_endpoint_infra` fall back to `infra_state` when `None` is passed (e.g., `concurrent_requests_per_worker` at line 156-159), but `forwarder_max_concurrency` has no such fallback. A caller that sends `PUT /endpoint/{id}` without specifying `forwarder_max_concurrency` (the default) will silently reset the value to `None` in `BuildEndpointRequest`, even when the endpoint was originally created with a non-`None` value. This differs from the established contract for every other comparable field, and will produce incorrect K8s manifests once the downstream templates start injecting `${FORWARDER_MAX_CONCURRENCY}`. The root cause is that `forwarder_max_concurrency` was not added to `ModelEndpointDeploymentState` (domain entity), so `infra_state.deployment_state` has no `forwarder_max_concurrency` to fall back to.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Cursor Fix in Claude Code Fix in Codex

Fix All in Cursor Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
model-engine/model_engine_server/infra/gateways/live_model_endpoint_infra_gateway.py:160-161
**Fallback reads from a field never populated by the K8s delegate**

A fallback was added here to preserve `forwarder_max_concurrency` across updates when the caller omits it, but `infra_state.deployment_state.forwarder_max_concurrency` is always `None` in practice. Neither `_get_resources_from_deployment_type` (line 2282 of `k8s_endpoint_resource_delegate.py`) nor `_get_resources_from_lws_type` sets `forwarder_max_concurrency` when constructing `ModelEndpointDeploymentState` — the field just keeps its default `None`. As a result the fallback is a no-op: every update that omits `forwarder_max_concurrency` will silently reset it to `None`, stripping the env var from the deployment once downstream templates reference `${FORWARDER_MAX_CONCURRENCY}`.

Reviews (4): Last reviewed commit: "fix: use Template.safe_substitute in loa..." | Re-trigger Greptile

… create/update

Adds an optional `forwarder_max_concurrency: Optional[int]` field to the
Launch model-endpoint create and update API. When set, this propagates as
a `FORWARDER_MAX_CONCURRENCY` env var on the generated K8s deployment, and
the downstream deployer uses it to set LIRA's
`ForwarderDeployOptions.forwarder_max_concurrency` — overriding the HTTP
forwarder container's `--concurrency` flag independently of `pod.concurrency`.

Why
---
Followup to MLI-5366 (LIRA layer) and the public-SDK chain (MLI-6875,
MLI-6874). MLI-5366 added the field to LIRA's deploy schema; this PR
exposes it at the API boundary so the SDK chain can plumb it from public
callers all the way to the rendered K8s manifest.

Scope
-----
- `dtos/model_endpoints.py`: add field to `CreateModelEndpointV1Request`
  and `UpdateModelEndpointV1Request` (Optional[int], gt=0, le=20 to match
  LIRA's FORWARDER_MAX_CONCURRENCY_LIMIT).
- `dtos/endpoint_builder.py`: add to `BuildEndpointRequest` so it flows
  through the celery task to the service builder.
- `domain/services/model_endpoint_service.py`: add to abstract service
  interface for both create and update methods.
- `domain/use_cases/model_endpoint_use_cases.py`: extract from the request
  in both create and update use cases, pass to service.
- `infra/services/live_model_endpoint_service.py`: thread kwarg through
  to the infra gateway calls (create + update).
- `infra/gateways/model_endpoint_infra_gateway.py` (abstract) +
  `live_model_endpoint_infra_gateway.py` (concrete): accept the kwarg and
  set it on `BuildEndpointRequest`.
- `infra/gateways/resources/k8s_resource_types.py`: declare on the
  `_DeploymentArguments` TypedDict; inject as env var at all 11
  template-substitution sites that already set `CONCURRENT_REQUESTS_PER_WORKER`.
- `tests/unit/conftest.py`: update fake gateway and fake service signatures
  so existing tests still pass under the new abstract interface.

Default `None` preserves existing behavior — only deploys that explicitly
set the field see the override.

NOT in this PR (follow-up coordination needed)
----------------------------------------------
- OpenAPI spec regeneration (`model-engine/specs/openapi-3.0.json` and
  `openapi.json`). The pydantic models are the source of truth, but the
  regen script requires a full model-engine venv to run. Reviewer or
  CI should run: `python model-engine/scripts/generate_openapi_schemas.py
  model-engine/specs/`
- The downstream deployer that reads `FORWARDER_MAX_CONCURRENCY` env var
  and sets `ForwarderDeployOptions.forwarder_max_concurrency` lives outside
  this repo (likely `scaleapi/models/model-engine-internal/` or a
  dedicated deployer service). Verify that bridge works as expected once
  this lands and the rest of the chain (MLI-6875, MLI-6874) is live.
Runs `python model-engine/scripts/generate_openapi_schemas.py model-engine/specs/`
to regenerate openapi.json and openapi-3.0.json from the updated pydantic
models. `forwarder_max_concurrency` now appears in both
CreateModelEndpointV1Request and UpdateModelEndpointV1Request schemas
with the correct constraints (Optional[int], gt=0, le=20).

Note: the diff is larger than the field addition alone because some
unrelated drift between the source pydantic models and the committed
spec was also resolved by this regen — those changes were already
implicit in master's pydantic models, just hadn't been emitted to JSON
yet.
… in K8s manifests

Code-review fix. `Template.substitute(**kwargs)` stringifies every value
via str() before substitution, so an Optional kwarg passed as None renders
as the literal string "None" in the rendered manifest — a K8s env var
with value "None" is worse than absent and would break the
"default preserves existing behavior" guarantee.

Now strips None-valued entries from the substitution dict before
Template.substitute(). No existing template references the currently-
Optional kwargs (e.g. forwarder_max_concurrency), so the strict
substitute() semantics are preserved.
diazagasatya and others added 3 commits May 27, 2026 11:13
`test_resource_arguments_type_and_add_datadog_env_to_main_container`
iterates over a TypedDict's type annotations and looks each up in a
type → default value map. The map had Optional[str] but not Optional[int],
so my new `FORWARDER_MAX_CONCURRENCY: Optional[int]` field broke 10
parametrized cases with `KeyError: typing.Optional[int]`.

Adds Optional[int] → 1 to the map, mirroring the Optional[str] → 'foo'
entry.
…dd Update description

Two code-review fixes from PR #835.

1. **Preserve forwarder_max_concurrency across updates** (the bigger fix)
   All comparable fields in update_model_endpoint_infra fall back to
   infra_state.deployment_state when None is passed — but
   forwarder_max_concurrency had no such fallback, so any PUT without
   the field would reset it to None even if the endpoint was created
   with a value.

   Adds:
   - forwarder_max_concurrency: Optional[int] field on
     ModelEndpointDeploymentState (domain entity)
   - Fallback branch in update_model_endpoint_infra mirroring the
     concurrent_requests_per_worker pattern
   - Populated the new field in the create-path ModelEndpointDeploymentState
     construction (fake gateways in conftest, both sites)
   - Update-in-place handler for the new field in the fake gateway

2. **Add description to UpdateModelEndpointV1Request field**
   Mirrors the description on CreateModelEndpointV1Request for OpenAPI
   consistency.

Also regenerates the OpenAPI spec to reflect both changes.
ModelEndpointDeploymentState gained a new Optional field, so the
serialized response now includes "forwarder_max_concurrency": None.
Update the 8 expected JSON fixtures to match the new shape so
test_get_model_endpoint_by_id_success (and its peers) pass again.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
substitute() raises KeyError for any ${KEY} not in the kwargs, so
combining it with the None-filtering meant a future template
referencing ${FORWARDER_MAX_CONCURRENCY} would crash the service
builder for every pre-existing endpoint (where forwarder_max_concurrency
defaults to None). safe_substitute renders the unresolved token
literally, which surfaces as a loud K8s/container deploy error
instead of a KeyError inside the celery task.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@diazagasatya diazagasatya requested a review from a team May 27, 2026 18:57
@diazagasatya diazagasatya merged commit efbbe3c into main May 28, 2026
7 checks passed
@diazagasatya diazagasatya deleted the feat/MLI-6876-forwarder-max-concurrency-request-schema branch May 28, 2026 16:59
diazagasatya added a commit to scaleapi/launch-python-client that referenced this pull request May 29, 2026
…75) (#180)

Regenerates the API client from the latest llm-engine OpenAPI schema and exposes `forwarder_max_concurrency` on the user-facing client. Companion to scaleapi/llm-engine#835 (MLI-6876).

This regen also picks up four months of additive schema drift since #176 — `queue_message_timeout_seconds` and `task_expires_seconds` on the Create/Update model endpoint variants.

Verified non-breaking:
- 0 paths/operations/schemas removed
- 0 fields removed or type-changed
- 0 fields newly required
- 20/20 unit tests pass; black, ruff, pylint, mypy, isort all clean

Wrapper changes (`launch/client.py`):
- `create_model_endpoint` and `edit_model_endpoint`: add `forwarder_max_concurrency` kwarg + docstring
- `update_if_exists` branch in `create_model_endpoint` forwards the kwarg
- `create_llm_model_endpoint` intentionally untouched (upstream LLM schemas don't expose the field)

Version bumped to 0.4.2.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants