diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml
index fa52f51f..ad9b7f62 100644
--- a/charts/model-engine/templates/service_template_config_map.yaml
+++ b/charts/model-engine/templates/service_template_config_map.yaml
@@ -399,6 +399,35 @@ data:
             target:
               type: Value
               averageValue: ${CONCURRENCY}
+  keda-scaled-object.yaml: |-
+    apiVersion: keda.sh/v1alpha1
+    kind: ScaledObject
+    metadata:
+      name: ${RESOURCE_NAME}
+      namespace: ${NAMESPACE}
+      labels:
+        {{- $service_template_labels | nindent 8 }}
+    spec:
+      scaleTargetRef:
+        name: ${RESOURCE_NAME}
+      pollingInterval: 5
+      cooldownPeriod: 300
+      minReplicaCount: ${MIN_WORKERS}
+      maxReplicaCount: ${MAX_WORKERS}
+      fallback:
+        failureThreshold: 3
+        replicas: ${MIN_WORKERS}
+      triggers:
+        - type: redis
+          metadata:
+            address: ${REDIS_HOST_PORT}  # Format must be host:port
+            passwordFromEnv: ""
+            listName: "launch-endpoint-autoscaling:${ENDPOINT_ID}"
+            listLength: "100"  # something absurdly high so we don't scale past 1 pod
+            activationListLength: "0"
+            enableTLS: "false"
+            unsafeSsl: "false"
+            databaseIndex: "${REDIS_DB_INDEX}"
   service.yaml: |-
     apiVersion: v1
     kind: Service
diff --git a/model-engine/model_engine_server/common/config.py b/model-engine/model_engine_server/common/config.py
index 250a51e6..25cf5453 100644
--- a/model-engine/model_engine_server/common/config.py
+++ b/model-engine/model_engine_server/common/config.py
@@ -63,6 +63,18 @@ def from_yaml(cls, yaml_path):
             raw_data = yaml.safe_load(f)
         return HostedModelInferenceServiceConfig(**raw_data)
 
+    @property
+    def cache_redis_host_port(self) -> str:
+        # redis://redis.url:6379/<db_index>
+        # -> redis.url:6379
+        return self.cache_redis_url.split("redis://")[1].split("/")[0]
+
+    @property
+    def cache_redis_db_index(self) -> int:
+        # redis://redis.url:6379/<db_index>
+        # -> <db_index>
+        return int(self.cache_redis_url.split("/")[-1])
+
 
 def read_default_config():
     logger.info(f"Using config file path: `{SERVICE_CONFIG_PATH}`")
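
The two properties above slice the cache Redis URL with plain string splits rather than a URL parser. A quick sanity-check sketch (not part of the PR) of what they return, and of the edge case they assume away:

    url = "redis://redis.url:6379/0"
    host_port = url.split("redis://")[1].split("/")[0]  # -> "redis.url:6379"
    db_index = int(url.split("/")[-1])                  # -> 0
    assert host_port == "redis.url:6379" and db_index == 0
    # A URL with a trailing slash and no db index ("redis://redis.url:6379/")
    # would make int("") raise ValueError, so the configured URL is assumed to
    # always carry an explicit db index.
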
diff --git a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py
index 04e595d4..1633a72b 100644
--- a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py
+++ b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py
@@ -104,6 +104,11 @@ def validate_deployment_resources(
     max_workers: Optional[int],
     endpoint_type: ModelEndpointType,
 ) -> None:
+    if endpoint_type in [ModelEndpointType.STREAMING, ModelEndpointType.SYNC]:
+        # Special case for sync/streaming endpoints, where min_workers == 0 and
+        # max_workers == 1 are allowed. Otherwise, fall through to the general case.
+        if min_workers == 0 and max_workers == 1:
+            return
     # TODO: we should be also validating the update request against the existing state in k8s (e.g.
     # so min_workers <= max_workers always) maybe this occurs already in update_model_endpoint.
     min_endpoint_size = 0 if endpoint_type == ModelEndpointType.ASYNC else 1
diff --git a/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py b/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py
index 5761493e..a5bcc31e 100644
--- a/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py
+++ b/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py
@@ -31,6 +31,7 @@ def __init__(
 
     @staticmethod
    def _find_redis_key(endpoint_id: str):
+        # Keep in line with the KEDA ScaledObject yaml's listName
         return f"launch-endpoint-autoscaling:{endpoint_id}"
 
     async def _emit_metric(self, endpoint_id: str, expiry_time: int):
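
For context on how this key is consumed: KEDA's redis scaler polls the length of the listName configured in the ScaledObject, while this gateway writes to the same key. A rough sketch of the producer side, assuming a redis.asyncio client and that _emit_metric pushes an expiring marker (the gateway's actual implementation is not shown in this diff):

    import redis.asyncio as aioredis

    async def emit_autoscaling_metric(client: aioredis.Redis, endpoint_id: str, expiry_s: int) -> None:
        # Must match the ScaledObject's listName exactly, or KEDA sees an empty list.
        key = f"launch-endpoint-autoscaling:{endpoint_id}"
        await client.rpush(key, "1")        # one list entry ~= one in-flight request signal
        await client.expire(key, expiry_s)  # let the key age out so replicas can scale back to zero
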
diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py
index 7006ca1f..56836596 100644
--- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py
+++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py
@@ -599,6 +599,46 @@ async def _create_vpa(vpa: Dict[str, Any], name: str) -> None:
             logger.exception("Got an exception when trying to apply the VerticalPodAutoscaler")
             raise
 
+    @staticmethod
+    async def _create_keda_scaled_object(scaled_object: Dict[str, Any], name: str) -> None:
+        custom_objects_api = get_kubernetes_custom_objects_client()
+        try:
+            await custom_objects_api.create_namespaced_custom_object(
+                group="keda.sh",
+                version="v1alpha1",
+                namespace=hmi_config.endpoint_namespace,
+                plural="scaledobjects",
+                body=scaled_object,
+            )
+        except ApiException as exc:
+            if exc.status == 409:
+                logger.info(f"ScaledObject {name} already exists, replacing")
+
+                # The async k8s client has a bug with patching custom objects, so we manually
+                # merge the new ScaledObject with the old one and then replace the old one with the merged
+                # one. See _create_vpa for more details.
+                # There is a setting `restoreToOriginalReplicaCount` in the KEDA ScaledObject that should be
+                # set to false, which should make it safe to do this replace (as opposed to a patch).
+                existing_scaled_object = await custom_objects_api.get_namespaced_custom_object(
+                    group="keda.sh",
+                    version="v1alpha1",
+                    namespace=hmi_config.endpoint_namespace,
+                    plural="scaledobjects",
+                    name=name,
+                )
+                new_scaled_object = deep_update(existing_scaled_object, scaled_object)
+                await custom_objects_api.replace_namespaced_custom_object(
+                    group="keda.sh",
+                    version="v1alpha1",
+                    namespace=hmi_config.endpoint_namespace,
+                    plural="scaledobjects",
+                    name=name,
+                    body=new_scaled_object,
+                )
+            else:
+                logger.exception("Got an exception when trying to apply the ScaledObject")
+                raise
+
     @staticmethod
     async def _create_destination_rule(destination_rule: Dict[str, Any], name: str) -> None:
         """
@@ -995,6 +1035,28 @@ async def _delete_hpa(endpoint_id: str, deployment_name: str) -> bool:
             return False
         return True
 
+    @staticmethod
+    async def _delete_keda_scaled_object(endpoint_id: str) -> bool:
+        custom_objects_client = get_kubernetes_custom_objects_client()
+        k8s_resource_group_name = _endpoint_id_to_k8s_resource_group_name(endpoint_id)
+        try:
+            await custom_objects_client.delete_namespaced_custom_object(
+                group="keda.sh",
+                version="v1alpha1",
+                namespace=hmi_config.endpoint_namespace,
+                plural="scaledobjects",
+                name=k8s_resource_group_name,
+            )
+        except ApiException as e:
+            if e.status == 404:
+                logger.warning(
+                    f"Trying to delete nonexistent ScaledObject {k8s_resource_group_name}"
+                )
+            else:
+                logger.exception(f"Deletion of ScaledObject {k8s_resource_group_name} failed")
+                return False
+        return True
+
     # --- Private higher level fns that interact with k8s
 
     @staticmethod
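
The 409 branch above leans on deep_update to merge the freshly rendered ScaledObject into the live one before replacing it. deep_update is assumed here to be a recursive dict merge along these lines (a sketch of the semantics only; the real helper is imported elsewhere in this module):

    def deep_update(base: dict, overrides: dict) -> dict:
        merged = dict(base)
        for k, v in overrides.items():
            if isinstance(v, dict) and isinstance(merged.get(k), dict):
                merged[k] = deep_update(merged[k], v)  # recurse into nested specs
            else:
                merged[k] = v  # scalars/lists from the new object win
        return merged

    old = {"spec": {"minReplicaCount": 0, "pollingInterval": 5}}
    new = {"spec": {"minReplicaCount": 1}}
    assert deep_update(old, new) == {"spec": {"minReplicaCount": 1, "pollingInterval": 5}}
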
@@ -1102,19 +1164,46 @@ async def _create_or_update_resources(
         else:
             api_version = "autoscaling/v2beta2"
 
-        hpa_arguments = get_endpoint_resource_arguments_from_request(
-            k8s_resource_group_name=k8s_resource_group_name,
-            request=request,
-            sqs_queue_name=sqs_queue_name_str,
-            sqs_queue_url=sqs_queue_url_str,
-            endpoint_resource_name="horizontal-pod-autoscaler",
-            api_version=api_version,
-        )
-        hpa_template = load_k8s_yaml("horizontal-pod-autoscaler.yaml", hpa_arguments)
-        await self._create_hpa(
-            hpa=hpa_template,
-            name=k8s_resource_group_name,
-        )
+        # Create exactly one of an HPA or a KEDA ScaledObject, depending on whether min_workers > 0.
+        # Right now, KEDA will only support scaling from 0 to 1.
+        # TODO: support KEDA scaling from 1 to N as well
+        if request.build_endpoint_request.min_workers > 0:
+            # Delete the KEDA ScaledObject if it exists so exactly one of HPA or ScaledObject remains
+            await self._delete_keda_scaled_object(
+                build_endpoint_request.model_endpoint_record.id
+            )
+            hpa_arguments = get_endpoint_resource_arguments_from_request(
+                k8s_resource_group_name=k8s_resource_group_name,
+                request=request,
+                sqs_queue_name=sqs_queue_name_str,
+                sqs_queue_url=sqs_queue_url_str,
+                endpoint_resource_name="horizontal-pod-autoscaler",
+                api_version=api_version,
+            )
+            hpa_template = load_k8s_yaml("horizontal-pod-autoscaler.yaml", hpa_arguments)
+            await self._create_hpa(
+                hpa=hpa_template,
+                name=k8s_resource_group_name,
+            )
+        else:  # min_workers == 0, use KEDA
+            # Delete the HPA if it exists so exactly one of HPA or ScaledObject remains
+            await self._delete_hpa(
+                build_endpoint_request.model_endpoint_record.id, k8s_resource_group_name
+            )
+            keda_scaled_object_arguments = get_endpoint_resource_arguments_from_request(
+                k8s_resource_group_name=k8s_resource_group_name,
+                request=request,
+                sqs_queue_name=sqs_queue_name_str,
+                sqs_queue_url=sqs_queue_url_str,
+                endpoint_resource_name="keda-scaled-object",
+            )
+            keda_scaled_object_template = load_k8s_yaml(
+                "keda-scaled-object.yaml", keda_scaled_object_arguments
+            )
+            await self._create_keda_scaled_object(
+                scaled_object=keda_scaled_object_template,
+                name=k8s_resource_group_name,
+            )
 
         service_arguments = get_endpoint_resource_arguments_from_request(
             k8s_resource_group_name=k8s_resource_group_name,
@@ -1204,6 +1293,17 @@ def _get_sync_autoscaling_params(
             per_worker=per_worker,
         )
 
+    @staticmethod
+    def _get_sync_autoscaling_params_from_keda(
+        keda_config,
+    ) -> HorizontalAutoscalingEndpointParams:
+        spec = keda_config["spec"]
+        return dict(
+            max_workers=spec.get("maxReplicaCount"),
+            min_workers=spec.get("minReplicaCount"),
+            per_worker=1,  # TODO dummy value, fill in when we autoscale from 0 to 1
+        )
+
     async def _get_resources(
         self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType
     ) -> ModelEndpointInfraState:
@@ -1232,10 +1332,36 @@ async def _get_resources(
             horizontal_autoscaling_params = self._get_async_autoscaling_params(deployment_config)
         elif endpoint_type in {ModelEndpointType.SYNC, ModelEndpointType.STREAMING}:
             autoscaling_client = get_kubernetes_autoscaling_client()
-            hpa_config = await autoscaling_client.read_namespaced_horizontal_pod_autoscaler(
-                k8s_resource_group_name, hmi_config.endpoint_namespace
-            )
-            horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config)
+            custom_object_client = get_kubernetes_custom_objects_client()
+            try:
+                hpa_config = await autoscaling_client.read_namespaced_horizontal_pod_autoscaler(
+                    k8s_resource_group_name, hmi_config.endpoint_namespace
+                )
+            except ApiException as e:
+                if e.status == 404:
+                    hpa_config = None
+                else:
+                    raise e
+            try:
+                keda_scaled_object_config = await custom_object_client.get_namespaced_custom_object(
+                    group="keda.sh",
+                    version="v1alpha1",
+                    namespace=hmi_config.endpoint_namespace,
+                    plural="scaledobjects",
+                    name=k8s_resource_group_name,
+                )
+            except ApiException:
+                keda_scaled_object_config = None
+            if hpa_config is not None:
+                horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config)
+            elif keda_scaled_object_config is not None:
+                horizontal_autoscaling_params = self._get_sync_autoscaling_params_from_keda(
+                    keda_scaled_object_config
+                )
+            else:
+                raise EndpointResourceInfraException(
+                    f"Could not find autoscaling config for {endpoint_type}"
+                )
         else:
             raise ValueError(f"Unexpected endpoint type {endpoint_type}")
 
@@ -1326,10 +1452,25 @@ async def _get_all_resources(
                 vpas = []
             else:
                 raise
+        try:
+            keda_scaled_objects = (
+                await custom_objects_client.list_namespaced_custom_object(
+                    group="keda.sh",
+                    version="v1alpha1",
+                    namespace=hmi_config.endpoint_namespace,
+                    plural="scaledobjects",
+                )
+            )["items"]
+        except ApiException as e:
+            if e.status == 404:
+                keda_scaled_objects = []
+            else:
+                raise
 
         deployments_by_name = {deployment.metadata.name: deployment for deployment in deployments}
         hpas_by_name = {hpa.metadata.name: hpa for hpa in hpas}
         vpas_by_name = {vpa["metadata"]["name"]: vpa for vpa in vpas}
+        keda_scaled_objects_by_name = {kso["metadata"]["name"]: kso for kso in keda_scaled_objects}
         all_config_maps = await self._get_all_config_maps()
         # can safely assume hpa with same name as deployment corresponds to the same Launch Endpoint
         logger.info(f"Orphaned hpas: {set(hpas_by_name).difference(set(deployments_by_name))}")
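
Given the ScaledObject spec rendered earlier, _get_sync_autoscaling_params_from_keda reduces to a small dict mapping. An illustrative input/output pair (values are hypothetical):

    keda_config = {"spec": {"minReplicaCount": 0, "maxReplicaCount": 1}}
    spec = keda_config["spec"]
    params = dict(
        max_workers=spec.get("maxReplicaCount"),  # 1
        min_workers=spec.get("minReplicaCount"),  # 0
        per_worker=1,  # dummy value, mirroring the TODO above
    )
    assert params == {"max_workers": 1, "min_workers": 0, "per_worker": 1}
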
@@ -1340,6 +1481,7 @@ async def _get_all_resources(
             try:
                 hpa_config = hpas_by_name.get(name, None)
                 vpa_config = vpas_by_name.get(name, None)
+                keda_scaled_object_config = keda_scaled_objects_by_name.get(name, None)
                 common_params = self._get_common_endpoint_params(deployment_config)
 
                 launch_container = self._get_launch_container(deployment_config)
@@ -1355,9 +1497,14 @@ async def _get_all_resources(
                 if hpa_config:
                     # Assume it's a sync endpoint
                     # TODO I think this is correct but only barely, it introduces a coupling between
-                    # an HPA existing and an endpoint being a sync endpoint. The "more correct"
+                    # an HPA (or KEDA ScaledObject) existing and an endpoint being a sync endpoint. The "more correct"
                     # thing to do is to query the db to get the endpoints, but it doesn't belong here
                     horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config)
+                elif keda_scaled_object_config:
+                    # Also assume it's a sync endpoint
+                    horizontal_autoscaling_params = self._get_sync_autoscaling_params_from_keda(
+                        keda_scaled_object_config
+                    )
                 else:
                     horizontal_autoscaling_params = self._get_async_autoscaling_params(
                         deployment_config
@@ -1427,9 +1574,13 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) -> bool:
         service_delete_succeeded = await self._delete_service(
             endpoint_id=endpoint_id, deployment_name=deployment_name
         )
+        # We should have created exactly one of an HPA or a KEDA ScaledObject
         hpa_delete_succeeded = await self._delete_hpa(
             endpoint_id=endpoint_id, deployment_name=deployment_name
         )
+        keda_scaled_object_succeeded = await self._delete_keda_scaled_object(
+            endpoint_id=endpoint_id
+        )
         await self._delete_vpa(endpoint_id=endpoint_id)
 
         destination_rule_delete_succeeded = await self._delete_destination_rule(
@@ -1443,7 +1594,7 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) -> bool:
             deployment_delete_succeeded
             and config_map_delete_succeeded
             and service_delete_succeeded
-            and hpa_delete_succeeded
+            and (hpa_delete_succeeded or keda_scaled_object_succeeded)
             and destination_rule_delete_succeeded
             and virtual_service_delete_succeeded
         )
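
Taken together, the create and delete paths maintain a single invariant: each sync or streaming endpoint owns exactly one of an HPA or a KEDA ScaledObject, chosen by min_workers. A compressed sketch of that decision (the helper name is hypothetical; the real logic lives in _create_or_update_resources above):

    def select_autoscaler(min_workers: int):
        """Return (resource_to_create, resource_to_delete_if_present)."""
        if min_workers > 0:
            return "horizontal-pod-autoscaler", "keda-scaled-object"
        return "keda-scaled-object", "horizontal-pod-autoscaler"  # scale-from-zero case

    assert select_autoscaler(0) == ("keda-scaled-object", "horizontal-pod-autoscaler")
    assert select_autoscaler(2) == ("horizontal-pod-autoscaler", "keda-scaled-object")
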
diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py
index 632ec7bf..6c0f9724 100644
--- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py
+++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py
@@ -284,6 +284,14 @@ class HorizontalPodAutoscalerArguments(_BaseEndpointArguments):
     API_VERSION: str
 
 
+class KedaScaledObjectArguments(_BaseEndpointArguments):
+    MIN_WORKERS: int
+    MAX_WORKERS: int
+    # CONCURRENCY: float  # TODO add in when we scale from 1 -> N pods
+    REDIS_HOST_PORT: str
+    REDIS_DB_INDEX: str
+
+
 class UserConfigArguments(_BaseEndpointArguments):
     """Keyword-arguments for substituting into user-config templates."""
 
@@ -1089,6 +1097,25 @@ def get_endpoint_resource_arguments_from_request(
             MIN_WORKERS=build_endpoint_request.min_workers,
             MAX_WORKERS=build_endpoint_request.max_workers,
         )
+    elif endpoint_resource_name == "keda-scaled-object":
+        return KedaScaledObjectArguments(
+            # Base resource arguments
+            RESOURCE_NAME=k8s_resource_group_name,
+            NAMESPACE=hmi_config.endpoint_namespace,
+            ENDPOINT_ID=model_endpoint_record.id,
+            ENDPOINT_NAME=model_endpoint_record.name,
+            TEAM=team,
+            PRODUCT=product,
+            CREATED_BY=created_by,
+            OWNER=owner,
+            GIT_TAG=GIT_TAG,
+            # Scaled Object arguments
+            MIN_WORKERS=build_endpoint_request.min_workers,
+            MAX_WORKERS=build_endpoint_request.max_workers,
+            # CONCURRENCY=build_endpoint_request.concurrency,
+            REDIS_HOST_PORT=hmi_config.cache_redis_host_port,
+            REDIS_DB_INDEX=str(hmi_config.cache_redis_db_index),
+        )
     elif endpoint_resource_name == "service":
         # Use ClusterIP by default for sync endpoint.
         # In Circle CI, we use a NodePort to expose the service to CI.
diff --git a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml
index 85f312b8..48f0e924 100644
--- a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml
+++ b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml
@@ -2558,6 +2558,47 @@ data:
             target:
               type: Value
               averageValue: ${CONCURRENCY}
+  keda-scaled-object.yaml: |-
+    apiVersion: keda.sh/v1alpha1
+    kind: ScaledObject
+    metadata:
+      name: ${RESOURCE_NAME}
+      namespace: ${NAMESPACE}
+      labels:
+        user_id: ${OWNER}
+        team: ${TEAM}
+        product: ${PRODUCT}
+        created_by: ${CREATED_BY}
+        owner: ${OWNER}
+        env: circleci
+        managed-by: model-engine
+        use_scale_launch_endpoint_network_policy: "true"
+        tags.datadoghq.com/env: circleci
+        tags.datadoghq.com/version: ${GIT_TAG}
+        tags.datadoghq.com/service: ${ENDPOINT_NAME}
+        endpoint_id: ${ENDPOINT_ID}
+        endpoint_name: ${ENDPOINT_NAME}
+    spec:
+      scaleTargetRef:
+        name: ${RESOURCE_NAME}
+      pollingInterval: 5
+      cooldownPeriod: 300
+      minReplicaCount: ${MIN_WORKERS}
+      maxReplicaCount: ${MAX_WORKERS}
+      fallback:
+        failureThreshold: 3
+        replicas: ${MIN_WORKERS}
+      triggers:
+        - type: redis
+          metadata:
+            address: ${REDIS_HOST_PORT}  # Format must be host:port
+            passwordFromEnv: ""
+            listName: "launch-endpoint-autoscaling:${ENDPOINT_ID}"
+            listLength: "100"  # something absurdly high so we don't scale past 1 pod
+            activationListLength: "0"
+            enableTLS: "false"
+            unsafeSsl: "false"
+            databaseIndex: "${REDIS_DB_INDEX}"
   service.yaml: |-
     apiVersion: v1
     kind: Service
diff --git a/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py b/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py
index d2277ab3..8ca5dd61 100644
--- a/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py
+++ b/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py
@@ -29,6 +29,10 @@ def build_image(self, image_params: BuildImageRequest) -> BuildImageResponse:
         if image_params.requirements_folder:
             folders_to_include.append(image_params.requirements_folder)
 
+        dockerfile_root_folder = image_params.dockerfile.split("/")[0]
+        if dockerfile_root_folder not in folders_to_include:
+            folders_to_include.append(dockerfile_root_folder)
+
         build_args = {
             "BASE_IMAGE": image_params.base_image,
         }
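
The new folder logic keys off the first path segment of image_params.dockerfile. A quick behavioral sketch (illustrative paths), including the degenerate case of a Dockerfile at the build-context root, where the "folder" is the filename itself:

    assert "model-engine/Dockerfile".split("/")[0] == "model-engine"
    assert "a/b/Dockerfile".split("/")[0] == "a"        # only the top-level folder is included
    assert "Dockerfile".split("/")[0] == "Dockerfile"   # no directory component: appended as-is
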
diff --git a/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py b/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py
index 1875d7d0..95901f8a 100644
--- a/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py
+++ b/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py
@@ -69,6 +69,21 @@ async def test_create_model_endpoint_use_case_success(
     assert response_3.endpoint_creation_task_id
     assert isinstance(response_3, CreateModelEndpointV1Response)
 
+    # Test the special case where a sync/streaming endpoint with 0 min and 1 max workers is accepted
+    request = create_model_endpoint_request_sync.copy()
+    request.min_workers = 0
+    request.max_workers = 1
+    response_4 = await use_case.execute(user=user, request=request)
+    assert response_4.endpoint_creation_task_id
+    assert isinstance(response_4, CreateModelEndpointV1Response)
+
+    request = create_model_endpoint_request_streaming.copy()
+    request.min_workers = 0
+    request.max_workers = 1
+    response_5 = await use_case.execute(user=user, request=request)
+    assert response_5.endpoint_creation_task_id
+    assert isinstance(response_5, CreateModelEndpointV1Response)
+
 
 @pytest.mark.asyncio
 async def test_create_model_endpoint_use_case_raises_invalid_value_exception(
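
For completeness, the validation these tests exercise through the use case can also be hit directly; a hedged sketch (the use-case import path follows the file layout in this diff, while ModelEndpointType is assumed to be importable from domain.entities):

    from model_engine_server.domain.entities import ModelEndpointType
    from model_engine_server.domain.use_cases.model_endpoint_use_cases import (
        validate_deployment_resources,
    )

    # Allowed: the new scale-from-zero special case for sync/streaming endpoints.
    validate_deployment_resources(
        min_workers=0, max_workers=1, endpoint_type=ModelEndpointType.SYNC
    )
    # Any other min_workers == 0 combination on a sync endpoint falls through to the
    # general check, which still requires at least one worker.
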