From 6d84d229239c1f1bb2f3e4d46946294ef55b996f Mon Sep 17 00:00:00 2001 From: Diaz Agasatya Date: Thu, 28 May 2026 17:49:33 -0700 Subject: [PATCH 1/2] [MLI-6891] feat: wire forwarder_max_concurrency through K8s template MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `--set "max_concurrency=${FORWARDER_MAX_CONCURRENCY}"` to all 6 http-forwarder containers in the service template (CircleCI + chart variants). The flag is rendered conditionally — when the kwarg is None, a new `_strip_optional_set_pairs` preprocessor in `load_k8s_yaml` drops the `--set` arg-pair entirely, so existing endpoints fall back to the forwarder's config-file default (100) with zero behavior change. Also fixes the description on `forwarder_max_concurrency` to reflect the actual fallback (config-file default, not per_worker). Follow-up to MLI-6876 (#835). Without this, the API field is silently dropped at the manifest-rendering layer. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../service_template_config_map.yaml | 8 ++++ .../common/dtos/model_endpoints.py | 8 ++-- .../k8s_endpoint_resource_delegate.py | 40 ++++++++++++++++--- .../service_template_config_map_circleci.yaml | 12 ++++++ .../test_k8s_endpoint_resource_delegate.py | 35 ++++++++++++++++ 5 files changed, 94 insertions(+), 9 deletions(-) diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index 5593f295..34471f2e 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -147,6 +147,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 14 }} readinessProbe: httpGet: @@ -203,6 +205,8 @@ data: - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - "forwarder.stream.routes=${FORWARDER_STREAM_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 14 }} readinessProbe: httpGet: @@ -611,6 +615,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 16 }} readinessProbe: httpGet: @@ -667,6 +673,8 @@ data: - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - "forwarder.stream.routes=${FORWARDER_STREAM_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 16 }} readinessProbe: httpGet: diff --git a/model-engine/model_engine_server/common/dtos/model_endpoints.py b/model-engine/model_engine_server/common/dtos/model_endpoints.py index 9911e426..0f5e76a0 100644 --- a/model-engine/model_engine_server/common/dtos/model_endpoints.py +++ b/model-engine/model_engine_server/common/dtos/model_endpoints.py @@ -73,8 +73,8 @@ class CreateModelEndpointV1Request(BaseModel): description=( "Max in-flight requests admitted by the HTTP forwarder container, " "independent of `per_worker` / autoscaling. When None (default), the " - "forwarder inherits its `--concurrency` flag from `per_worker` (current " - "behavior). Upper bound matches LIRA's FORWARDER_MAX_CONCURRENCY_LIMIT." + "forwarder uses its config-file default (100). Upper bound matches " + "LIRA's FORWARDER_MAX_CONCURRENCY_LIMIT." ), ) labels: Dict[str, str] @@ -122,8 +122,8 @@ class UpdateModelEndpointV1Request(BaseModel): description=( "Max in-flight requests admitted by the HTTP forwarder container, " "independent of `per_worker` / autoscaling. When None (default), the " - "forwarder inherits its `--concurrency` flag from `per_worker` (current " - "behavior). Upper bound matches LIRA's FORWARDER_MAX_CONCURRENCY_LIMIT." + "forwarder uses its config-file default (100). Upper bound matches " + "LIRA's FORWARDER_MAX_CONCURRENCY_LIMIT." ), ) labels: Optional[Dict[str, str]] = None diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index da4c98fa..bc9a5c9d 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1,5 +1,6 @@ import datetime import os +import re from string import Template from typing import Any, Dict, List, Optional, Tuple, cast @@ -234,6 +235,28 @@ def k8s_yaml_exists(key: str) -> bool: return key in config_map_str["data"] +def _strip_optional_set_pairs( + template_str: str, substitution_kwargs: ResourceArguments +) -> str: + """Remove conditional `--set "...${KEY}..."` arg-pairs whose KEY is None. + + Templates can declare a config override like: + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" + and this preprocessor drops the pair entirely when the matching kwarg is + None, so the container falls back to its own config-file default. + """ + for kwarg_name, value in substitution_kwargs.items(): + if value is not None: + continue + pattern = ( + r"^[ \t]*- --set\s*\n" + r'^[ \t]*- "[^"]*\$\{' + re.escape(kwarg_name) + r"\}[^\"]*\"\s*\n" + ) + template_str = re.sub(pattern, "", template_str, flags=re.MULTILINE) + return template_str + + def load_k8s_yaml(key: str, substitution_kwargs: ResourceArguments) -> Dict[str, Any]: if LAUNCH_SERVICE_TEMPLATE_FOLDER is not None: with open(os.path.join(LAUNCH_SERVICE_TEMPLATE_FOLDER, key), "r") as f: @@ -243,11 +266,18 @@ def load_k8s_yaml(key: str, substitution_kwargs: ResourceArguments) -> Dict[str, config_map_str = yaml.safe_load(f.read()) template_str = config_map_str["data"][key] - # Strip None-valued entries so they don't stringify to the literal "None" - # in the rendered manifest. Use safe_substitute so that a template - # referencing ${KEY} with a None-valued kwarg renders ${KEY} literally - # and surfaces a loud K8s/container error at deploy time, rather than a - # KeyError deep inside the service-builder celery task. + # For kwargs whose value is None, strip any `--set "...${KEY}..."` arg-pair + # from the template so the conditional config override isn't rendered at + # all. This lets a forwarder/main container opt-in to a config override + # only when the caller actually set the value; otherwise the container + # falls back to its own config-file default. + template_str = _strip_optional_set_pairs(template_str, substitution_kwargs) + + # Strip remaining None-valued entries so they don't stringify to "None". + # Use safe_substitute so that a template referencing ${KEY} with a + # None-valued kwarg renders ${KEY} literally and surfaces a loud + # K8s/container error at deploy time, rather than a KeyError deep + # inside the service-builder celery task. filtered_kwargs = {k: v for k, v in substitution_kwargs.items() if v is not None} yaml_str = Template(template_str).safe_substitute(**filtered_kwargs) try: diff --git a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml index fbbf0a23..8d4c5228 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml +++ b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml @@ -620,6 +620,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -892,6 +894,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -1126,6 +1130,8 @@ data: - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - "forwarder.stream.routes=${FORWARDER_STREAM_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -1874,6 +1880,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -2154,6 +2162,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -2396,6 +2406,8 @@ data: - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - "forwarder.stream.routes=${FORWARDER_STREAM_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" diff --git a/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py b/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py index 53537bd8..57049247 100644 --- a/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py +++ b/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py @@ -23,6 +23,7 @@ DATADOG_ENV_VAR, MODEL_CACHE_VOLUME_NAME, K8SEndpointResourceDelegate, + _strip_optional_set_pairs, add_datadog_env_to_container, add_pod_metadata_env_to_container, get_main_container_from_deployment_template, @@ -153,6 +154,40 @@ def test_k8s_yaml_exists(): ), "image-cache-abc9001.yaml should not exist" +_OPTIONAL_SET_TEMPLATE = """\ + - --set + - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" + env: +""" + + +def test_strip_optional_set_pairs_drops_when_none(): + out = _strip_optional_set_pairs( + _OPTIONAL_SET_TEMPLATE, {"FORWARDER_MAX_CONCURRENCY": None, "FORWARDER_SYNC_ROUTES": "x"} + ) + assert "max_concurrency" not in out + # Adjacent kwargs that are set must be untouched + assert "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" in out + assert "env:" in out + + +def test_strip_optional_set_pairs_keeps_when_set(): + out = _strip_optional_set_pairs( + _OPTIONAL_SET_TEMPLATE, {"FORWARDER_MAX_CONCURRENCY": 5, "FORWARDER_SYNC_ROUTES": "x"} + ) + assert 'max_concurrency=${FORWARDER_MAX_CONCURRENCY}' in out + + +def test_strip_optional_set_pairs_handles_unknown_key(): + # A kwarg that doesn't appear in the template — no-op, no exceptions. + out = _strip_optional_set_pairs( + _OPTIONAL_SET_TEMPLATE, {"SOMETHING_NOT_IN_TEMPLATE": None} + ) + assert out == _OPTIONAL_SET_TEMPLATE + + def _render_service_template_config_map(extra_args: Optional[List[str]] = None) -> str: if shutil.which("helm") is None: pytest.skip("helm is not installed") From e50c5f19cb6b6fe11d38d6d9bc572b7db0fa664f Mon Sep 17 00:00:00 2001 From: Diaz Agasatya Date: Thu, 28 May 2026 17:59:28 -0700 Subject: [PATCH 2/2] [MLI-6891] style: apply black 24.8.0 formatting Co-Authored-By: Claude Opus 4.7 (1M context) --- .../gateways/resources/k8s_endpoint_resource_delegate.py | 4 +--- .../resources/test_k8s_endpoint_resource_delegate.py | 6 ++---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index bc9a5c9d..f906b4f8 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -235,9 +235,7 @@ def k8s_yaml_exists(key: str) -> bool: return key in config_map_str["data"] -def _strip_optional_set_pairs( - template_str: str, substitution_kwargs: ResourceArguments -) -> str: +def _strip_optional_set_pairs(template_str: str, substitution_kwargs: ResourceArguments) -> str: """Remove conditional `--set "...${KEY}..."` arg-pairs whose KEY is None. Templates can declare a config override like: diff --git a/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py b/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py index 57049247..ca8e9398 100644 --- a/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py +++ b/model-engine/tests/unit/infra/gateways/resources/test_k8s_endpoint_resource_delegate.py @@ -177,14 +177,12 @@ def test_strip_optional_set_pairs_keeps_when_set(): out = _strip_optional_set_pairs( _OPTIONAL_SET_TEMPLATE, {"FORWARDER_MAX_CONCURRENCY": 5, "FORWARDER_SYNC_ROUTES": "x"} ) - assert 'max_concurrency=${FORWARDER_MAX_CONCURRENCY}' in out + assert "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" in out def test_strip_optional_set_pairs_handles_unknown_key(): # A kwarg that doesn't appear in the template — no-op, no exceptions. - out = _strip_optional_set_pairs( - _OPTIONAL_SET_TEMPLATE, {"SOMETHING_NOT_IN_TEMPLATE": None} - ) + out = _strip_optional_set_pairs(_OPTIONAL_SET_TEMPLATE, {"SOMETHING_NOT_IN_TEMPLATE": None}) assert out == _OPTIONAL_SET_TEMPLATE