diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index 5593f295..34471f2e 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -147,6 +147,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 14 }} readinessProbe: httpGet: @@ -203,6 +205,8 @@ data: - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - "forwarder.stream.routes=${FORWARDER_STREAM_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 14 }} readinessProbe: httpGet: @@ -611,6 +615,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 16 }} readinessProbe: httpGet: @@ -667,6 +673,8 @@ data: - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - "forwarder.stream.routes=${FORWARDER_STREAM_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" {{- $sync_forwarder_template_env | nindent 16 }} readinessProbe: httpGet: diff --git a/model-engine/model_engine_server/common/dtos/model_endpoints.py b/model-engine/model_engine_server/common/dtos/model_endpoints.py index 9911e426..0f5e76a0 100644 --- a/model-engine/model_engine_server/common/dtos/model_endpoints.py +++ b/model-engine/model_engine_server/common/dtos/model_endpoints.py @@ -73,8 +73,8 @@ class CreateModelEndpointV1Request(BaseModel): description=( "Max in-flight requests admitted by the HTTP forwarder container, " "independent of `per_worker` / autoscaling. 
def _strip_optional_set_pairs(template_str: str, substitution_kwargs: ResourceArguments) -> str:
    """Remove conditional `--set "...${KEY}..."` arg-pairs whose KEY is None.

    Templates can declare a config override like:
        - --set
        - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}"
    and this preprocessor drops the pair entirely when the matching kwarg is
    None, so the container falls back to its own config-file default.

    Args:
        template_str: Raw template text, before any `${KEY}` substitution.
        substitution_kwargs: Mapping of placeholder name to value. Only keys
            whose value is exactly None trigger stripping; keys missing from
            the mapping, and keys with any other value, leave the template
            untouched.

    Returns:
        The template with every matching two-line `--set` pair removed. The
        input is returned unchanged when no kwarg is None or nothing matches.
    """
    none_keys = [re.escape(name) for name, value in substitution_kwargs.items() if value is None]
    if not none_keys:
        return template_str

    # One alternation over all None-valued keys strips every pair in a single
    # scan instead of re-scanning the template once per key. The closing
    # `\}` keeps a key from matching a longer placeholder that merely starts
    # with the same characters (e.g. `A` vs `${AB}`).
    pattern = (
        r"^[ \t]*- --set\s*\n"
        r'^[ \t]*- "[^"]*\$\{(?:' + "|".join(none_keys) + r')\}[^"]*"'
        # Accept end-of-input as well as a newline so that a pair sitting on
        # the very last line of a template without a trailing newline is
        # still removed.
        r"\s*(?:\n|\Z)"
    )
    return re.sub(pattern, "", template_str, flags=re.MULTILINE)
filtered_kwargs = {k: v for k, v in substitution_kwargs.items() if v is not None} yaml_str = Template(template_str).safe_substitute(**filtered_kwargs) try: diff --git a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml index fbbf0a23..8d4c5228 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml +++ b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml @@ -620,6 +620,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -892,6 +894,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -1126,6 +1130,8 @@ data: - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" - --set - "forwarder.stream.routes=${FORWARDER_STREAM_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -1874,6 +1880,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -2154,6 +2162,8 @@ data: - "forwarder.stream.healthcheck_route=${HEALTHCHECK_ROUTE}" - --set - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" + - --set + - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" env: - name: DD_TRACE_ENABLED value: "${DD_TRACE_ENABLED}" @@ -2396,6 +2406,8 
# Minimal args-list fragment: one always-present `--set` pair followed by the
# optional `max_concurrency` pair and the next sibling key.
_OPTIONAL_SET_TEMPLATE = """\
            - --set
            - "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}"
            - --set
            - "max_concurrency=${FORWARDER_MAX_CONCURRENCY}"
          env:
"""


def test_strip_optional_set_pairs_drops_when_none():
    """A None-valued kwarg removes its own `--set` pair and nothing else."""
    kwargs = {"FORWARDER_MAX_CONCURRENCY": None, "FORWARDER_SYNC_ROUTES": "x"}
    rendered = _strip_optional_set_pairs(_OPTIONAL_SET_TEMPLATE, kwargs)

    assert "max_concurrency" not in rendered
    # The neighbouring pair has a value, so it must survive untouched,
    # as must the line that follows the stripped pair.
    assert "forwarder.sync.routes=${FORWARDER_SYNC_ROUTES}" in rendered
    assert "env:" in rendered


def test_strip_optional_set_pairs_keeps_when_set():
    """A kwarg with a real value leaves its `--set` pair in the template."""
    kwargs = {"FORWARDER_MAX_CONCURRENCY": 5, "FORWARDER_SYNC_ROUTES": "x"}
    rendered = _strip_optional_set_pairs(_OPTIONAL_SET_TEMPLATE, kwargs)

    assert "max_concurrency=${FORWARDER_MAX_CONCURRENCY}" in rendered


def test_strip_optional_set_pairs_handles_unknown_key():
    """A None-valued kwarg the template never references is a clean no-op."""
    kwargs = {"SOMETHING_NOT_IN_TEMPLATE": None}
    rendered = _strip_optional_set_pairs(_OPTIONAL_SET_TEMPLATE, kwargs)

    assert rendered == _OPTIONAL_SET_TEMPLATE