From ed038f5d0d53749d169bd1c4fafb6fa16c726077 Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Mon, 28 Aug 2023 16:42:05 -0700 Subject: [PATCH 01/14] apply patch file --- .../model_engine_server/common/config.py | 12 ++ .../use_cases/model_endpoint_use_cases.py | 5 + ...s_inference_autoscaling_metrics_gateway.py | 1 + .../k8s_endpoint_resource_delegate.py | 183 ++++++++++++++++-- .../gateways/resources/k8s_resource_types.py | 27 +++ .../service_template_config_map_circleci.yaml | 41 ++++ .../domain/test_model_endpoint_use_cases.py | 15 ++ 7 files changed, 265 insertions(+), 19 deletions(-) diff --git a/model-engine/model_engine_server/common/config.py b/model-engine/model_engine_server/common/config.py index deeb4477..67f90137 100644 --- a/model-engine/model_engine_server/common/config.py +++ b/model-engine/model_engine_server/common/config.py @@ -62,6 +62,18 @@ def from_yaml(cls, yaml_path): raw_data = yaml.safe_load(f) return HostedModelInferenceServiceConfig(**raw_data) + @property + def cache_redis_host_port(self) -> str: + # redis://redis.url:6379/ + # -> redis.url:6379 + return self.cache_redis_url.split("redis://")[1].split("/")[0] + + @property + def cache_redis_db_index(self) -> int: + # redis://redis.url:6379/ + # -> + return int(self.cache_redis_url.split("/")[-1]) + def read_default_config(): logger.info(f"Using config file path: `{SERVICE_CONFIG_PATH}`") diff --git a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py index 04e595d4..1633a72b 100644 --- a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py @@ -104,6 +104,11 @@ def validate_deployment_resources( max_workers: Optional[int], endpoint_type: ModelEndpointType, ) -> None: + if endpoint_type in [ModelEndpointType.STREAMING, ModelEndpointType.SYNC]: + # Special case for sync endpoints, where we can have 0, 1 min/max workers. + # Otherwise, fall through to the general case. + if min_workers == 0 and max_workers == 1: + return # TODO: we should be also validating the update request against the existing state in k8s (e.g. # so min_workers <= max_workers always) maybe this occurs already in update_model_endpoint. min_endpoint_size = 0 if endpoint_type == ModelEndpointType.ASYNC else 1 diff --git a/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py b/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py index 5761493e..a5bcc31e 100644 --- a/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/redis_inference_autoscaling_metrics_gateway.py @@ -31,6 +31,7 @@ def __init__( @staticmethod def _find_redis_key(endpoint_id: str): + # Keep in line with keda scaled object yaml return f"launch-endpoint-autoscaling:{endpoint_id}" async def _emit_metric(self, endpoint_id: str, expiry_time: int): diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 7006ca1f..75412f0b 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -196,6 +196,7 @@ def load_k8s_yaml(key: str, substitution_kwargs: ResourceArguments) -> Dict[str, yaml_str = Template(template_str).substitute(**substitution_kwargs) try: yaml_obj = yaml.safe_load(yaml_str) + print(yaml_obj) except: logger.exception("Could not load yaml string: %s", yaml_str) raise @@ -599,6 +600,45 @@ async def _create_vpa(vpa: Dict[str, Any], name: str) -> None: logger.exception("Got an exception when trying to apply the VerticalPodAutoscaler") raise + @staticmethod + async def _create_keda_scaled_object(scaled_object: Dict[str, Any], name: str) -> None: + # TODO test + custom_objects_api = get_kubernetes_custom_objects_client() + try: + await custom_objects_api.create_namespaced_custom_object( + group="keda.sh", + version="v1alpha1", + namespace=hmi_config.endpoint_namespace, + plural="scaledobjects", + body=scaled_object, + ) + except ApiException as exc: + if exc.status == 409: + logger.info(f"ScaledObject {name} already exists, replacing") + + # The async k8s client has a bug with patching custom objects, so we manually + # merge the new VPA with the old one and then replace the old one with the merged + # one. See _create_vpa for more details. + existing_scaled_object = await custom_objects_api.get_namespaced_custom_object( + group="keda.sh", + version="v1alpha1", + namespace=hmi_config.endpoint_namespace, + plural="scaledobjects", + name=name, + ) + new_scaled_object = deep_update(existing_scaled_object, scaled_object) + await custom_objects_api.replace_namespaced_custom_object( + group="keda.sh", + version="v1alpha1", + namespace=hmi_config.endpoint_namespace, + plural="scaledobjects", + name=name, + body=new_scaled_object, + ) + else: + logger.exception("Got an exception when trying to apply the ScaledObject") + raise + @staticmethod async def _create_destination_rule(destination_rule: Dict[str, Any], name: str) -> None: """ @@ -995,6 +1035,28 @@ async def _delete_hpa(endpoint_id: str, deployment_name: str) -> bool: return False return True + @staticmethod + async def _delete_keda_scaled_object(endpoint_id: str) -> bool: + custom_objects_client = get_kubernetes_custom_objects_client() + k8s_resource_group_name = _endpoint_id_to_k8s_resource_group_name(endpoint_id) + try: + await custom_objects_client.delete_namespaced_custom_object( + group="keda.sh", + version="v1alpha1", + namespace=hmi_config.endpoint_namespace, + plural="scaledobjects", + name=k8s_resource_group_name, + ) + except ApiException as e: + if e.status == 404: + logger.warning( + f"Trying to delete nonexistent ScaledObject {k8s_resource_group_name}" + ) + else: + logger.exception(f"Deletion of ScaledObject {k8s_resource_group_name} failed") + return False + return True + # --- Private higher level fns that interact with k8s @staticmethod @@ -1102,19 +1164,36 @@ async def _create_or_update_resources( else: api_version = "autoscaling/v2beta2" - hpa_arguments = get_endpoint_resource_arguments_from_request( - k8s_resource_group_name=k8s_resource_group_name, - request=request, - sqs_queue_name=sqs_queue_name_str, - sqs_queue_url=sqs_queue_url_str, - endpoint_resource_name="horizontal-pod-autoscaler", - api_version=api_version, - ) - hpa_template = load_k8s_yaml("horizontal-pod-autoscaler.yaml", hpa_arguments) - await self._create_hpa( - hpa=hpa_template, - name=k8s_resource_group_name, - ) + # TODO select between hpa and keda scaled object somehow + if request.build_endpoint_request.min_workers > 0: + hpa_arguments = get_endpoint_resource_arguments_from_request( + k8s_resource_group_name=k8s_resource_group_name, + request=request, + sqs_queue_name=sqs_queue_name_str, + sqs_queue_url=sqs_queue_url_str, + endpoint_resource_name="horizontal-pod-autoscaler", + api_version=api_version, + ) + hpa_template = load_k8s_yaml("horizontal-pod-autoscaler.yaml", hpa_arguments) + await self._create_hpa( + hpa=hpa_template, + name=k8s_resource_group_name, + ) + else: # min workers == 0, use keda + keda_scaled_object_arguments = get_endpoint_resource_arguments_from_request( + k8s_resource_group_name=k8s_resource_group_name, + request=request, + sqs_queue_name=sqs_queue_name_str, + sqs_queue_url=sqs_queue_url_str, + endpoint_resource_name="keda-scaled-object", + ) + keda_scaled_object_template = load_k8s_yaml( + "keda-scaled-object.yaml", keda_scaled_object_arguments + ) + await self._create_keda_scaled_object( + scaled_object=keda_scaled_object_template, + name=k8s_resource_group_name, + ) service_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, @@ -1204,6 +1283,18 @@ def _get_sync_autoscaling_params( per_worker=per_worker, ) + @staticmethod + def _get_sync_autoscaling_params_from_keda( + keda_config, + ) -> HorizontalAutoscalingEndpointParams: + # import pdb; pdb.set_trace() + spec = keda_config["spec"] + return dict( + max_workers=spec.get("maxReplicaCount"), + min_workers=spec.get("minReplicaCount"), + per_worker=1, # TODO dummy value, fill in when we autoscale from 0 to 1 + ) + async def _get_resources( self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType ) -> ModelEndpointInfraState: @@ -1232,10 +1323,36 @@ async def _get_resources( horizontal_autoscaling_params = self._get_async_autoscaling_params(deployment_config) elif endpoint_type in {ModelEndpointType.SYNC, ModelEndpointType.STREAMING}: autoscaling_client = get_kubernetes_autoscaling_client() - hpa_config = await autoscaling_client.read_namespaced_horizontal_pod_autoscaler( - k8s_resource_group_name, hmi_config.endpoint_namespace - ) - horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config) + custom_object_client = get_kubernetes_custom_objects_client() + try: + hpa_config = await autoscaling_client.read_namespaced_horizontal_pod_autoscaler( + k8s_resource_group_name, hmi_config.endpoint_namespace + ) + except ApiException as e: + if e.status == 404: + hpa_config = None + else: + raise e + try: + keda_scaled_object_config = await custom_object_client.get_namespaced_custom_object( + group="keda.sh", + version="v1alpha1", + namespace=hmi_config.endpoint_namespace, + plural="scaledobjects", + name=k8s_resource_group_name, + ) + except ApiException: + keda_scaled_object_config = None + if hpa_config is not None: + horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config) + elif keda_scaled_object_config is not None: + horizontal_autoscaling_params = self._get_sync_autoscaling_params_from_keda( + keda_scaled_object_config + ) + else: + raise ValueError( + f"Could not find autoscaling config for {endpoint_type}" + ) # TODO better error type else: raise ValueError(f"Unexpected endpoint type {endpoint_type}") @@ -1326,10 +1443,25 @@ async def _get_all_resources( vpas = [] else: raise + try: + keda_scaled_objects = ( + await custom_objects_client.list_namespaced_custom_object( + group="keda.sh", + version="v1alpha1", + namespace=hmi_config.endpoint_namespace, + plural="scaledobjects", + ) + )["items"] + except ApiException as e: + if e.status == 404: + keda_scaled_objects = [] + else: + raise deployments_by_name = {deployment.metadata.name: deployment for deployment in deployments} hpas_by_name = {hpa.metadata.name: hpa for hpa in hpas} vpas_by_name = {vpa["metadata"]["name"]: vpa for vpa in vpas} + keda_scaled_objects_by_name = {kso["metadata"]["name"]: kso for kso in keda_scaled_objects} all_config_maps = await self._get_all_config_maps() # can safely assume hpa with same name as deployment corresponds to the same Launch Endpoint logger.info(f"Orphaned hpas: {set(hpas_by_name).difference(set(deployments_by_name))}") @@ -1340,6 +1472,7 @@ async def _get_all_resources( try: hpa_config = hpas_by_name.get(name, None) vpa_config = vpas_by_name.get(name, None) + keda_scaled_object_config = keda_scaled_objects_by_name.get(name, None) common_params = self._get_common_endpoint_params(deployment_config) launch_container = self._get_launch_container(deployment_config) @@ -1355,9 +1488,14 @@ async def _get_all_resources( if hpa_config: # Assume it's a sync endpoint # TODO I think this is correct but only barely, it introduces a coupling between - # an HPA existing and an endpoint being a sync endpoint. The "more correct" + # an HPA (or keda SO) existing and an endpoint being a sync endpoint. The "more correct" # thing to do is to query the db to get the endpoints, but it doesn't belong here horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config) + elif keda_scaled_object_config: + # Also assume it's a sync endpoint + horizontal_autoscaling_params = self._get_sync_autoscaling_params_from_keda( + keda_scaled_object_config + ) else: horizontal_autoscaling_params = self._get_async_autoscaling_params( deployment_config @@ -1427,9 +1565,16 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) - service_delete_succeeded = await self._delete_service( endpoint_id=endpoint_id, deployment_name=deployment_name ) + # we should have created exactly one of an HPA or a keda scaled object hpa_delete_succeeded = await self._delete_hpa( endpoint_id=endpoint_id, deployment_name=deployment_name ) + if not hpa_delete_succeeded: + keda_scaled_object_succeeded = await self._delete_keda_scaled_object( + endpoint_id=endpoint_id + ) + else: + keda_scaled_object_succeeded = False await self._delete_vpa(endpoint_id=endpoint_id) destination_rule_delete_succeeded = await self._delete_destination_rule( @@ -1443,7 +1588,7 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) - deployment_delete_succeeded and config_map_delete_succeeded and service_delete_succeeded - and hpa_delete_succeeded + and (hpa_delete_succeeded or keda_scaled_object_succeeded) and destination_rule_delete_succeeded and virtual_service_delete_succeeded ) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 632ec7bf..6c0f9724 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -284,6 +284,14 @@ class HorizontalPodAutoscalerArguments(_BaseEndpointArguments): API_VERSION: str +class KedaScaledObjectArguments(_BaseEndpointArguments): + MIN_WORKERS: int + MAX_WORKERS: int + # CONCURRENCY: float # TODO add in when we scale from 1 -> N pods + REDIS_HOST_PORT: str + REDIS_DB_INDEX: str + + class UserConfigArguments(_BaseEndpointArguments): """Keyword-arguments for substituting into user-config templates.""" @@ -1089,6 +1097,25 @@ def get_endpoint_resource_arguments_from_request( MIN_WORKERS=build_endpoint_request.min_workers, MAX_WORKERS=build_endpoint_request.max_workers, ) + elif endpoint_resource_name == "keda-scaled-object": + return KedaScaledObjectArguments( + # Base resource arguments + RESOURCE_NAME=k8s_resource_group_name, + NAMESPACE=hmi_config.endpoint_namespace, + ENDPOINT_ID=model_endpoint_record.id, + ENDPOINT_NAME=model_endpoint_record.name, + TEAM=team, + PRODUCT=product, + CREATED_BY=created_by, + OWNER=owner, + GIT_TAG=GIT_TAG, + # Scaled Object arguments + MIN_WORKERS=build_endpoint_request.min_workers, + MAX_WORKERS=build_endpoint_request.max_workers, + # CONCURRENCY=build_endpoint_request.concurrency, + REDIS_HOST_PORT=hmi_config.cache_redis_host_port, + REDIS_DB_INDEX=hmi_config.cache_redis_db_index, + ) elif endpoint_resource_name == "service": # Use ClusterIP by default for sync endpoint. # In Circle CI, we use a NodePort to expose the service to CI. diff --git a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml index 85f312b8..48f0e924 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml +++ b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml @@ -2558,6 +2558,47 @@ data: target: type: Value averageValue: ${CONCURRENCY} + keda-scaled-object.yaml: |- + apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: circleci + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: circleci + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + spec: + scaleTargetRef: + name: ${RESOURCE_NAME} + pollingInterval: 5 + cooldownPeriod: 300 + minReplicaCount: ${MIN_WORKERS} + maxReplicaCount: ${MAX_WORKERS} + fallback: + failureThreshold: 3 + replicas: ${MIN_WORKERS} + triggers: + - type: redis + metadata: + address: ${REDIS_HOST_PORT} # Format must be host:port + passwordFromEnv: "" + listName: "launch-endpoint-autoscaling:${ENDPOINT_ID}" + listLength: "100" # something absurdly high so we don't scale past 1 pod + activationListLength: "0" + enableTLS: "false" + unsafeSsl: "false" + databaseIndex: "${REDIS_DB_INDEX}" service.yaml: |- apiVersion: v1 kind: Service diff --git a/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py b/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py index 1875d7d0..95901f8a 100644 --- a/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py +++ b/model-engine/tests/unit/domain/test_model_endpoint_use_cases.py @@ -69,6 +69,21 @@ async def test_create_model_endpoint_use_case_success( assert response_3.endpoint_creation_task_id assert isinstance(response_3, CreateModelEndpointV1Response) + # test special case where sync/streaming endpoint that has 0-1 min-max workers works + request = create_model_endpoint_request_sync.copy() + request.min_workers = 0 + request.max_workers = 1 + response_4 = await use_case.execute(user=user, request=request) + assert response_4.endpoint_creation_task_id + assert isinstance(response_4, CreateModelEndpointV1Response) + + request = create_model_endpoint_request_streaming.copy() + request.min_workers = 0 + request.max_workers = 1 + response_5 = await use_case.execute(user=user, request=request) + assert response_5.endpoint_creation_task_id + assert isinstance(response_5, CreateModelEndpointV1Response) + @pytest.mark.asyncio async def test_create_model_endpoint_use_case_raises_invalid_value_exception( From bd84d6431f5750f35a9ad0456a8b128d093ffdad Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Mon, 28 Aug 2023 16:47:31 -0700 Subject: [PATCH 02/14] add in yaml template --- .../service_template_config_map.yaml | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index 84c26206..ce8f3ae5 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -396,6 +396,36 @@ data: target: type: Value averageValue: ${CONCURRENCY} + # TODO maybe fix keda scaled object + keda-scaled-object.yaml: |- + apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + {{- $service_template_labels | nindent 8 }} + spec: + scaleTargetRef: + name: ${RESOURCE_NAME} + pollingInterval: 5 + cooldownPeriod: 300 + minReplicaCount: ${MIN_WORKERS} + maxReplicaCount: ${MAX_WORKERS} + fallback: + failureThreshold: 3 + replicas: ${MIN_WORKERS} + triggers: + - type: redis + metadata: + address: ${REDIS_HOST_PORT} # Format must be host:port + passwordFromEnv: "" + listName: "launch-endpoint-autoscaling:${ENDPOINT_ID}" + listLength: "100" # something absurdly high so we don't scale past 1 pod + activationListLength: "0" + enableTLS: "false" + unsafeSsl: "false" + databaseIndex: "${REDIS_DB_INDEX} service.yaml: |- apiVersion: v1 kind: Service From 1c7436968861428fefab5084c7dbe4c1e6ec5a6f Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Tue, 29 Aug 2023 15:53:16 -0700 Subject: [PATCH 03/14] add dockerfile root folder --- .../infra/repositories/ecr_docker_repository.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py b/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py index ca5f7469..f3216e49 100644 --- a/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py +++ b/model-engine/model_engine_server/infra/repositories/ecr_docker_repository.py @@ -25,6 +25,10 @@ def build_image(self, image_params: BuildImageRequest) -> BuildImageResponse: if image_params.requirements_folder: folders_to_include.append(image_params.requirements_folder) + dockerfile_root_folder = image_params.dockerfile.split("/")[0] + if dockerfile_root_folder not in folders_to_include: + folders_to_include.append(dockerfile_root_folder) + build_args = { "BASE_IMAGE": image_params.base_image, } From 300d92a76f746abe97462eb458fe8163bd2a6aba Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Tue, 29 Aug 2023 17:33:15 -0700 Subject: [PATCH 04/14] dang it typo --- charts/model-engine/templates/service_template_config_map.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index ce8f3ae5..f9286359 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -425,7 +425,7 @@ data: activationListLength: "0" enableTLS: "false" unsafeSsl: "false" - databaseIndex: "${REDIS_DB_INDEX} + databaseIndex: "${REDIS_DB_INDEX}" service.yaml: |- apiVersion: v1 kind: Service From 7376af091447863210c3a1224e0e1f8a4f924f17 Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Tue, 29 Aug 2023 17:59:07 -0700 Subject: [PATCH 05/14] change _delete_hpa --- .../infra/gateways/resources/k8s_endpoint_resource_delegate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 75412f0b..9e5d7d9f 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1027,7 +1027,7 @@ async def _delete_hpa(endpoint_id: str, deployment_name: str) -> bool: logger.exception( f"Deletion of HorizontalPodAutoscaler {k8s_resource_group_name} failed" ) - return False + return False else: logger.exception( f"Deletion of HorizontalPodAutoscaler {k8s_resource_group_name} failed" From 4d008446f3354350eaf053254ab232bf3c6a6cfc Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Wed, 30 Aug 2023 13:08:38 -0700 Subject: [PATCH 06/14] clean up todos --- charts/model-engine/templates/service_template_config_map.yaml | 1 - .../infra/gateways/resources/k8s_endpoint_resource_delegate.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index f9286359..42c03838 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -396,7 +396,6 @@ data: target: type: Value averageValue: ${CONCURRENCY} - # TODO maybe fix keda scaled object keda-scaled-object.yaml: |- apiVersion: keda.sh/v1alpha1 kind: ScaledObject diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 9e5d7d9f..285b97ff 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -196,7 +196,6 @@ def load_k8s_yaml(key: str, substitution_kwargs: ResourceArguments) -> Dict[str, yaml_str = Template(template_str).substitute(**substitution_kwargs) try: yaml_obj = yaml.safe_load(yaml_str) - print(yaml_obj) except: logger.exception("Could not load yaml string: %s", yaml_str) raise @@ -602,7 +601,6 @@ async def _create_vpa(vpa: Dict[str, Any], name: str) -> None: @staticmethod async def _create_keda_scaled_object(scaled_object: Dict[str, Any], name: str) -> None: - # TODO test custom_objects_api = get_kubernetes_custom_objects_client() try: await custom_objects_api.create_namespaced_custom_object( @@ -1164,7 +1162,6 @@ async def _create_or_update_resources( else: api_version = "autoscaling/v2beta2" - # TODO select between hpa and keda scaled object somehow if request.build_endpoint_request.min_workers > 0: hpa_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, From 3fb5e9980413ec17c87f79b589494f658e893e4c Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Wed, 30 Aug 2023 15:43:26 -0700 Subject: [PATCH 07/14] cleanup --- .../infra/gateways/resources/k8s_endpoint_resource_delegate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 285b97ff..b09ddb19 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1284,7 +1284,6 @@ def _get_sync_autoscaling_params( def _get_sync_autoscaling_params_from_keda( keda_config, ) -> HorizontalAutoscalingEndpointParams: - # import pdb; pdb.set_trace() spec = keda_config["spec"] return dict( max_workers=spec.get("maxReplicaCount"), From 957346466a61a18035f9b0ece12a3de52c8a5f03 Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Wed, 30 Aug 2023 15:47:22 -0700 Subject: [PATCH 08/14] don't change semantics of _delete_hpa --- .../resources/k8s_endpoint_resource_delegate.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index b09ddb19..29b7fb5b 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1025,7 +1025,7 @@ async def _delete_hpa(endpoint_id: str, deployment_name: str) -> bool: logger.exception( f"Deletion of HorizontalPodAutoscaler {k8s_resource_group_name} failed" ) - return False + return False else: logger.exception( f"Deletion of HorizontalPodAutoscaler {k8s_resource_group_name} failed" @@ -1565,12 +1565,9 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) - hpa_delete_succeeded = await self._delete_hpa( endpoint_id=endpoint_id, deployment_name=deployment_name ) - if not hpa_delete_succeeded: - keda_scaled_object_succeeded = await self._delete_keda_scaled_object( - endpoint_id=endpoint_id - ) - else: - keda_scaled_object_succeeded = False + keda_scaled_object_succeeded = await self._delete_keda_scaled_object( + endpoint_id=endpoint_id + ) await self._delete_vpa(endpoint_id=endpoint_id) destination_rule_delete_succeeded = await self._delete_destination_rule( From fcf22aee45b2a2994cd1fa64e507232e83dde36f Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Wed, 30 Aug 2023 15:48:29 -0700 Subject: [PATCH 09/14] comment --- .../infra/gateways/resources/k8s_endpoint_resource_delegate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 29b7fb5b..1eaeb99b 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -615,7 +615,7 @@ async def _create_keda_scaled_object(scaled_object: Dict[str, Any], name: str) - logger.info(f"ScaledObject {name} already exists, replacing") # The async k8s client has a bug with patching custom objects, so we manually - # merge the new VPA with the old one and then replace the old one with the merged + # merge the new ScaledObject with the old one and then replace the old one with the merged # one. See _create_vpa for more details. existing_scaled_object = await custom_objects_api.get_namespaced_custom_object( group="keda.sh", From d1410ee951802af4a5c8707c8facf533bffdcdf5 Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Wed, 30 Aug 2023 15:50:17 -0700 Subject: [PATCH 10/14] comment --- .../infra/gateways/resources/k8s_endpoint_resource_delegate.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 1eaeb99b..05b365ec 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1162,6 +1162,9 @@ async def _create_or_update_resources( else: api_version = "autoscaling/v2beta2" + # create exactly one of HPA or KEDA ScaledObject, depending if we request more than 0 min_workers + # Right now, keda only will support scaling from 0 to 1 + # TODO support keda scaling from 1 to N as well if request.build_endpoint_request.min_workers > 0: hpa_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, From 8ed151af3f22ca88d546ecd7c327e4f0bb60d6b0 Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Wed, 30 Aug 2023 15:52:33 -0700 Subject: [PATCH 11/14] better error type --- .../gateways/resources/k8s_endpoint_resource_delegate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 05b365ec..ce161195 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1349,9 +1349,9 @@ async def _get_resources( keda_scaled_object_config ) else: - raise ValueError( + raise EndpointResourceInfraException( f"Could not find autoscaling config for {endpoint_type}" - ) # TODO better error type + ) else: raise ValueError(f"Unexpected endpoint type {endpoint_type}") From b718df569575ca5ab28db004365320da4b484e6d Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Fri, 1 Sep 2023 10:07:49 -0700 Subject: [PATCH 12/14] comment --- .../gateways/resources/k8s_endpoint_resource_delegate.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index ce161195..33a598a9 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -617,6 +617,8 @@ async def _create_keda_scaled_object(scaled_object: Dict[str, Any], name: str) - # The async k8s client has a bug with patching custom objects, so we manually # merge the new ScaledObject with the old one and then replace the old one with the merged # one. See _create_vpa for more details. + # There is a setting `restoreToOriginalReplicaCount` in the keda ScaledObject that should be set to + # false which should make it safe to do this replace (as opposed to a patch) existing_scaled_object = await custom_objects_api.get_namespaced_custom_object( group="keda.sh", version="v1alpha1", @@ -1166,6 +1168,7 @@ async def _create_or_update_resources( # Right now, keda only will support scaling from 0 to 1 # TODO support keda scaling from 1 to N as well if request.build_endpoint_request.min_workers > 0: + # TODO delete keda scaled object if it exists hpa_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, request=request, @@ -1180,6 +1183,7 @@ async def _create_or_update_resources( name=k8s_resource_group_name, ) else: # min workers == 0, use keda + # TODO delete hpa if it exists keda_scaled_object_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, request=request, From 6dff17c7cf272c0a97016006617a6afa267a81be Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Fri, 1 Sep 2023 10:10:58 -0700 Subject: [PATCH 13/14] handle delete case --- .../gateways/resources/k8s_endpoint_resource_delegate.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 33a598a9..330aadd0 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1168,7 +1168,8 @@ async def _create_or_update_resources( # Right now, keda only will support scaling from 0 to 1 # TODO support keda scaling from 1 to N as well if request.build_endpoint_request.min_workers > 0: - # TODO delete keda scaled object if it exists + # Delete keda scaled object if it exists so exactly one of HPA or KEDA ScaledObject remains + await self._delete_keda_scaled_object(request.build_endpoint_request.model_endpoint_record.id) hpa_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, request=request, @@ -1183,7 +1184,8 @@ async def _create_or_update_resources( name=k8s_resource_group_name, ) else: # min workers == 0, use keda - # TODO delete hpa if it exists + # Delete hpa if it exists so exactly one of HPA or KEDA ScaledObject remains + await self._delete_hpa(request.build_endpoint_request.model_endpoint_record.id, k8s_resource_group_name) keda_scaled_object_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, request=request, From 0c5d26c80a0817f64544e4c2e09026594a05566d Mon Sep 17 00:00:00 2001 From: Sean Shi Date: Fri, 1 Sep 2023 10:40:05 -0700 Subject: [PATCH 14/14] shorter lol --- .../gateways/resources/k8s_endpoint_resource_delegate.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 330aadd0..56836596 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -1169,7 +1169,9 @@ async def _create_or_update_resources( # TODO support keda scaling from 1 to N as well if request.build_endpoint_request.min_workers > 0: # Delete keda scaled object if it exists so exactly one of HPA or KEDA ScaledObject remains - await self._delete_keda_scaled_object(request.build_endpoint_request.model_endpoint_record.id) + await self._delete_keda_scaled_object( + build_endpoint_request.model_endpoint_record.id + ) hpa_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, request=request, @@ -1185,7 +1187,9 @@ async def _create_or_update_resources( ) else: # min workers == 0, use keda # Delete hpa if it exists so exactly one of HPA or KEDA ScaledObject remains - await self._delete_hpa(request.build_endpoint_request.model_endpoint_record.id, k8s_resource_group_name) + await self._delete_hpa( + build_endpoint_request.model_endpoint_record.id, k8s_resource_group_name + ) keda_scaled_object_arguments = get_endpoint_resource_arguments_from_request( k8s_resource_group_name=k8s_resource_group_name, request=request,