Merged
29 changes: 29 additions & 0 deletions charts/model-engine/templates/service_template_config_map.yaml
@@ -399,6 +399,35 @@ data:
      target:
        type: Value
        averageValue: ${CONCURRENCY}
  keda-scaled-object.yaml: |-
    apiVersion: keda.sh/v1alpha1
    kind: ScaledObject
    metadata:
      name: ${RESOURCE_NAME}
      namespace: ${NAMESPACE}
      labels:
        {{- $service_template_labels | nindent 8 }}
    spec:
      scaleTargetRef:
        name: ${RESOURCE_NAME}
      pollingInterval: 5
      cooldownPeriod: 300
      minReplicaCount: ${MIN_WORKERS}
      maxReplicaCount: ${MAX_WORKERS}
      fallback:
        failureThreshold: 3
        replicas: ${MIN_WORKERS}
      triggers:
        - type: redis
          metadata:
            address: ${REDIS_HOST_PORT} # Format must be host:port
            passwordFromEnv: ""
            listName: "launch-endpoint-autoscaling:${ENDPOINT_ID}"
            listLength: "100" # something absurdly high so we don't scale past 1 pod
            activationListLength: "0"
            enableTLS: "false"
            unsafeSsl: "false"
            databaseIndex: "${REDIS_DB_INDEX}"
  service.yaml: |-
    apiVersion: v1
    kind: Service
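Aside, for readers of the template above (not part of the diff): KEDA's redis list scaler derives desired replicas from the current list length and the listLength target, clamps to [minReplicaCount, maxReplicaCount], and activates scale-from-zero whenever the list grows past activationListLength. A minimal sketch of that arithmetic under the values in this template, mirroring KEDA's documented behavior rather than its actual code:

import math

def desired_replicas(queue_len: int, list_length: int = 100,
                     activation_list_length: int = 0,
                     min_replicas: int = 0, max_replicas: int = 1) -> int:
    # Inactive scaler: fall back to minReplicaCount (0 here, i.e. scale to zero)
    if queue_len <= activation_list_length:
        return min_replicas
    # Active: ceil(queue / target), clamped to the replica bounds
    return max(min_replicas, min(max_replicas, math.ceil(queue_len / list_length)))

assert desired_replicas(0) == 0    # idle endpoint scales to zero
assert desired_replicas(1) == 1    # any queued request wakes one pod
assert desired_replicas(500) == 1  # listLength=100 plus maxReplicaCount pin us at 1

This is why the comment calls listLength "absurdly high": together with maxReplicaCount it restricts the ScaledObject to the 0-to-1 transition for now.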
12 changes: 12 additions & 0 deletions model-engine/model_engine_server/common/config.py
@@ -63,6 +63,18 @@ def from_yaml(cls, yaml_path):
        raw_data = yaml.safe_load(f)
        return HostedModelInferenceServiceConfig(**raw_data)

    @property
    def cache_redis_host_port(self) -> str:
        # redis://redis.url:6379/<db_index>
        # -> redis.url:6379
        return self.cache_redis_url.split("redis://")[1].split("/")[0]

    @property
    def cache_redis_db_index(self) -> int:
        # redis://redis.url:6379/<db_index>
        # -> <db_index>
        return int(self.cache_redis_url.split("/")[-1])


def read_default_config():
    logger.info(f"Using config file path: `{SERVICE_CONFIG_PATH}`")
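A worked example of the two properties above (standalone sketch; the hostname is invented). Note the host:port form matches what the KEDA redis trigger's address field requires, and that the parsing assumes the URL always carries an explicit trailing db index (int() raises otherwise):

cache_redis_url = "redis://some-redis-host:6379/15"

host_port = cache_redis_url.split("redis://")[1].split("/")[0]
db_index = int(cache_redis_url.split("/")[-1])

assert host_port == "some-redis-host:6379"
assert db_index == 15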
@@ -104,6 +104,11 @@ def validate_deployment_resources(
    max_workers: Optional[int],
    endpoint_type: ModelEndpointType,
) -> None:
    if endpoint_type in [ModelEndpointType.STREAMING, ModelEndpointType.SYNC]:
        # Special case for sync endpoints, which may have min_workers=0, max_workers=1.
        # Otherwise, fall through to the general case.
        if min_workers == 0 and max_workers == 1:
            return
    # TODO: we should also be validating the update request against the existing state in
    # k8s (e.g. so that min_workers <= max_workers always); maybe this already occurs in
    # update_model_endpoint.
    min_endpoint_size = 0 if endpoint_type == ModelEndpointType.ASYNC else 1
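For illustration, a hypothetical standalone restatement of the rule above (function name and the max_workers bound are invented for the sketch; the real validator takes more parameters and the rest of its body is elided from this diff):

def workers_allowed(min_workers: int, max_workers: int, is_async: bool) -> bool:
    # Scale-from-zero carve-out: sync/streaming endpoints may use exactly (0, 1),
    # which the KEDA path in this PR handles.
    if not is_async and (min_workers, max_workers) == (0, 1):
        return True
    floor = 0 if is_async else 1  # async endpoints could already scale to zero
    return floor <= min_workers <= max_workers

assert workers_allowed(0, 1, is_async=False)      # new KEDA-backed case
assert not workers_allowed(0, 2, is_async=False)  # sync endpoints otherwise need min >= 1
assert workers_allowed(0, 2, is_async=True)       # async floor stays 0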
@@ -31,6 +31,7 @@ def __init__(

    @staticmethod
    def _find_redis_key(endpoint_id: str):
        # Keep in line with keda scaled object yaml
        return f"launch-endpoint-autoscaling:{endpoint_id}"

    async def _emit_metric(self, endpoint_id: str, expiry_time: int):
@@ -599,6 +599,46 @@ async def _create_vpa(vpa: Dict[str, Any], name: str) -> None:
            logger.exception("Got an exception when trying to apply the VerticalPodAutoscaler")
            raise

    @staticmethod
    async def _create_keda_scaled_object(scaled_object: Dict[str, Any], name: str) -> None:
        custom_objects_api = get_kubernetes_custom_objects_client()
        try:
            await custom_objects_api.create_namespaced_custom_object(
                group="keda.sh",
                version="v1alpha1",
                namespace=hmi_config.endpoint_namespace,
                plural="scaledobjects",
                body=scaled_object,
            )
        except ApiException as exc:
            if exc.status == 409:
                logger.info(f"ScaledObject {name} already exists, replacing")

                # The async k8s client has a bug with patching custom objects, so we manually
                # merge the new ScaledObject with the old one and then replace the old one with
                # the merged one. See _create_vpa for more details.
                # The `restoreToOriginalReplicaCount` setting in the KEDA ScaledObject should be
                # left as false, which should make this replace (as opposed to a patch) safe.
                existing_scaled_object = await custom_objects_api.get_namespaced_custom_object(
                    group="keda.sh",
                    version="v1alpha1",
                    namespace=hmi_config.endpoint_namespace,
                    plural="scaledobjects",
                    name=name,
                )
                new_scaled_object = deep_update(existing_scaled_object, scaled_object)
                await custom_objects_api.replace_namespaced_custom_object(
                    group="keda.sh",
                    version="v1alpha1",
                    namespace=hmi_config.endpoint_namespace,
                    plural="scaledobjects",
                    name=name,
                    body=new_scaled_object,
                )
            else:
                logger.exception("Got an exception when trying to apply the ScaledObject")
                raise

Review thread on the replace_namespaced_custom_object call above:

Member: Should we do replace? Or patch? I just remember that replacing caused rolling restarts to not work, which doesn't apply here, but if it's all the same, maybe just patch?

Author (Contributor): I saw that other resources using the custom_objects_api apparently needed to use replace, so I'm doing that here as well. Separately, there's an option restoreToOriginalReplicaCount, which defaults to false and which I haven't set, so what should happen is that the deployment doesn't get affected as the ScaledObject gets deleted; we shouldn't get downtime here at least.
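The merge-then-replace path leans on deep_update doing a recursive dict merge in which the new ScaledObject's values win. A minimal sketch of the assumed semantics (the real helper in the repo may differ), which also shows why merging matters for a replace: live-object fields such as metadata.resourceVersion are preserved, so the API server's optimistic-concurrency check can pass:

from typing import Any, Dict

def deep_update(base: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
    # Recursively overlay `new` onto `base`; values from `new` win on conflict.
    merged = dict(base)
    for key, value in new.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_update(merged[key], value)
        else:
            merged[key] = value
    return merged

old = {"metadata": {"resourceVersion": "42"}, "spec": {"minReplicaCount": 1}}
new = {"spec": {"minReplicaCount": 0, "maxReplicaCount": 1}}
assert deep_update(old, new) == {
    "metadata": {"resourceVersion": "42"},  # preserved from the live object
    "spec": {"minReplicaCount": 0, "maxReplicaCount": 1},
}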

    @staticmethod
    async def _create_destination_rule(destination_rule: Dict[str, Any], name: str) -> None:
        """
@@ -995,6 +1035,28 @@ async def _delete_hpa(endpoint_id: str, deployment_name: str) -> bool:
                return False
        return True

    @staticmethod
    async def _delete_keda_scaled_object(endpoint_id: str) -> bool:
        custom_objects_client = get_kubernetes_custom_objects_client()
        k8s_resource_group_name = _endpoint_id_to_k8s_resource_group_name(endpoint_id)
        try:
            await custom_objects_client.delete_namespaced_custom_object(
                group="keda.sh",
                version="v1alpha1",
                namespace=hmi_config.endpoint_namespace,
                plural="scaledobjects",
                name=k8s_resource_group_name,
            )
        except ApiException as e:
            if e.status == 404:
                logger.warning(
                    f"Trying to delete nonexistent ScaledObject {k8s_resource_group_name}"
                )
            else:
                logger.exception(f"Deletion of ScaledObject {k8s_resource_group_name} failed")
                return False
        return True

    # --- Private higher level fns that interact with k8s

    @staticmethod
@@ -1102,19 +1164,46 @@ async def _create_or_update_resources(
        else:
            api_version = "autoscaling/v2beta2"

-        hpa_arguments = get_endpoint_resource_arguments_from_request(
-            k8s_resource_group_name=k8s_resource_group_name,
-            request=request,
-            sqs_queue_name=sqs_queue_name_str,
-            sqs_queue_url=sqs_queue_url_str,
-            endpoint_resource_name="horizontal-pod-autoscaler",
-            api_version=api_version,
-        )
-        hpa_template = load_k8s_yaml("horizontal-pod-autoscaler.yaml", hpa_arguments)
-        await self._create_hpa(
-            hpa=hpa_template,
-            name=k8s_resource_group_name,
-        )
+        # create exactly one of HPA or KEDA ScaledObject, depending on whether we request
+        # more than 0 min_workers
+        # Right now, KEDA will only support scaling from 0 to 1
+        # TODO support keda scaling from 1 to N as well
+        if request.build_endpoint_request.min_workers > 0:
+            # Delete keda scaled object if it exists so exactly one of HPA or KEDA
+            # ScaledObject remains
+            await self._delete_keda_scaled_object(
+                build_endpoint_request.model_endpoint_record.id
+            )
+            hpa_arguments = get_endpoint_resource_arguments_from_request(
+                k8s_resource_group_name=k8s_resource_group_name,
+                request=request,
+                sqs_queue_name=sqs_queue_name_str,
+                sqs_queue_url=sqs_queue_url_str,
+                endpoint_resource_name="horizontal-pod-autoscaler",
+                api_version=api_version,
+            )
+            hpa_template = load_k8s_yaml("horizontal-pod-autoscaler.yaml", hpa_arguments)
+            await self._create_hpa(
+                hpa=hpa_template,
+                name=k8s_resource_group_name,
+            )
+        else:  # min_workers == 0, use keda
+            # Delete hpa if it exists so exactly one of HPA or KEDA ScaledObject remains
+            await self._delete_hpa(
+                build_endpoint_request.model_endpoint_record.id, k8s_resource_group_name
+            )
+            keda_scaled_object_arguments = get_endpoint_resource_arguments_from_request(
+                k8s_resource_group_name=k8s_resource_group_name,
+                request=request,
+                sqs_queue_name=sqs_queue_name_str,
+                sqs_queue_url=sqs_queue_url_str,
+                endpoint_resource_name="keda-scaled-object",
+            )
+            keda_scaled_object_template = load_k8s_yaml(
+                "keda-scaled-object.yaml", keda_scaled_object_arguments
+            )
+            await self._create_keda_scaled_object(
+                scaled_object=keda_scaled_object_template,
+                name=k8s_resource_group_name,
+            )

        service_arguments = get_endpoint_resource_arguments_from_request(
            k8s_resource_group_name=k8s_resource_group_name,
@@ -1204,6 +1293,17 @@ def _get_sync_autoscaling_params(
            per_worker=per_worker,
        )

    @staticmethod
    def _get_sync_autoscaling_params_from_keda(
        keda_config,
    ) -> HorizontalAutoscalingEndpointParams:
        spec = keda_config["spec"]
        return dict(
            max_workers=spec.get("maxReplicaCount"),
            min_workers=spec.get("minReplicaCount"),
            per_worker=1,  # TODO dummy value, fill in when we autoscale from 0 to 1
        )
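For reference, an example of what the helper above would extract from a fetched ScaledObject (field values invented to match the template in this PR):

keda_config = {
    "spec": {
        "scaleTargetRef": {"name": "launch-endpoint-id-example"},  # hypothetical name
        "minReplicaCount": 0,
        "maxReplicaCount": 1,
    }
}
# _get_sync_autoscaling_params_from_keda(keda_config) would return:
# {"max_workers": 1, "min_workers": 0, "per_worker": 1}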

    async def _get_resources(
        self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType
    ) -> ModelEndpointInfraState:
@@ -1232,10 +1332,36 @@ async def _get_resources(
            horizontal_autoscaling_params = self._get_async_autoscaling_params(deployment_config)
        elif endpoint_type in {ModelEndpointType.SYNC, ModelEndpointType.STREAMING}:
            autoscaling_client = get_kubernetes_autoscaling_client()
-            hpa_config = await autoscaling_client.read_namespaced_horizontal_pod_autoscaler(
-                k8s_resource_group_name, hmi_config.endpoint_namespace
-            )
-            horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config)
+            custom_object_client = get_kubernetes_custom_objects_client()
+            try:
+                hpa_config = await autoscaling_client.read_namespaced_horizontal_pod_autoscaler(
+                    k8s_resource_group_name, hmi_config.endpoint_namespace
+                )
+            except ApiException as e:
+                if e.status == 404:
+                    hpa_config = None
+                else:
+                    raise e
+            try:
+                keda_scaled_object_config = await custom_object_client.get_namespaced_custom_object(
+                    group="keda.sh",
+                    version="v1alpha1",
+                    namespace=hmi_config.endpoint_namespace,
+                    plural="scaledobjects",
+                    name=k8s_resource_group_name,
+                )
+            except ApiException:
+                keda_scaled_object_config = None
+            if hpa_config is not None:
+                horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config)
+            elif keda_scaled_object_config is not None:
+                horizontal_autoscaling_params = self._get_sync_autoscaling_params_from_keda(
+                    keda_scaled_object_config
+                )
+            else:
+                raise EndpointResourceInfraException(
+                    f"Could not find autoscaling config for {endpoint_type}"
+                )
        else:
            raise ValueError(f"Unexpected endpoint type {endpoint_type}")

@@ -1326,10 +1452,25 @@ async def _get_all_resources(
                vpas = []
            else:
                raise
        try:
            keda_scaled_objects = (
                await custom_objects_client.list_namespaced_custom_object(
                    group="keda.sh",
                    version="v1alpha1",
                    namespace=hmi_config.endpoint_namespace,
                    plural="scaledobjects",
                )
            )["items"]
        except ApiException as e:
            if e.status == 404:
                keda_scaled_objects = []
            else:
                raise

        deployments_by_name = {deployment.metadata.name: deployment for deployment in deployments}
        hpas_by_name = {hpa.metadata.name: hpa for hpa in hpas}
        vpas_by_name = {vpa["metadata"]["name"]: vpa for vpa in vpas}
        keda_scaled_objects_by_name = {kso["metadata"]["name"]: kso for kso in keda_scaled_objects}
        all_config_maps = await self._get_all_config_maps()
        # can safely assume hpa with same name as deployment corresponds to the same Launch Endpoint
        logger.info(f"Orphaned hpas: {set(hpas_by_name).difference(set(deployments_by_name))}")
@@ -1340,6 +1481,7 @@
            try:
                hpa_config = hpas_by_name.get(name, None)
                vpa_config = vpas_by_name.get(name, None)
                keda_scaled_object_config = keda_scaled_objects_by_name.get(name, None)
                common_params = self._get_common_endpoint_params(deployment_config)
                launch_container = self._get_launch_container(deployment_config)

@@ -1355,9 +1497,14 @@
                if hpa_config:
                    # Assume it's a sync endpoint
                    # TODO I think this is correct but only barely, it introduces a coupling between
-                    # an HPA existing and an endpoint being a sync endpoint. The "more correct"
+                    # an HPA (or keda SO) existing and an endpoint being a sync endpoint. The "more correct"
                    # thing to do is to query the db to get the endpoints, but it doesn't belong here
                    horizontal_autoscaling_params = self._get_sync_autoscaling_params(hpa_config)
+                elif keda_scaled_object_config:
+                    # Also assume it's a sync endpoint
+                    horizontal_autoscaling_params = self._get_sync_autoscaling_params_from_keda(
+                        keda_scaled_object_config
+                    )
                else:
                    horizontal_autoscaling_params = self._get_async_autoscaling_params(
                        deployment_config
@@ -1427,9 +1574,13 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) -
        service_delete_succeeded = await self._delete_service(
            endpoint_id=endpoint_id, deployment_name=deployment_name
        )
        # we should have created exactly one of an HPA or a keda scaled object
        hpa_delete_succeeded = await self._delete_hpa(
            endpoint_id=endpoint_id, deployment_name=deployment_name
        )
        keda_scaled_object_succeeded = await self._delete_keda_scaled_object(
            endpoint_id=endpoint_id
        )
        await self._delete_vpa(endpoint_id=endpoint_id)

        destination_rule_delete_succeeded = await self._delete_destination_rule(
@@ -1443,7 +1594,7 @@ async def _delete_resources_sync(self, endpoint_id: str, deployment_name: str) -
            deployment_delete_succeeded
            and config_map_delete_succeeded
            and service_delete_succeeded
-            and hpa_delete_succeeded
+            and (hpa_delete_succeeded or keda_scaled_object_succeeded)
            and destination_rule_delete_succeeded
            and virtual_service_delete_succeeded
        )
@@ -284,6 +284,14 @@ class HorizontalPodAutoscalerArguments(_BaseEndpointArguments):
    API_VERSION: str


class KedaScaledObjectArguments(_BaseEndpointArguments):
    MIN_WORKERS: int
    MAX_WORKERS: int
    # CONCURRENCY: float  # TODO add in when we scale from 1 -> N pods
    REDIS_HOST_PORT: str
    REDIS_DB_INDEX: str


class UserConfigArguments(_BaseEndpointArguments):
    """Keyword-arguments for substituting into user-config templates."""
@@ -1089,6 +1097,25 @@ def get_endpoint_resource_arguments_from_request(
            MIN_WORKERS=build_endpoint_request.min_workers,
            MAX_WORKERS=build_endpoint_request.max_workers,
        )
    elif endpoint_resource_name == "keda-scaled-object":
        return KedaScaledObjectArguments(
            # Base resource arguments
            RESOURCE_NAME=k8s_resource_group_name,
            NAMESPACE=hmi_config.endpoint_namespace,
            ENDPOINT_ID=model_endpoint_record.id,
            ENDPOINT_NAME=model_endpoint_record.name,
            TEAM=team,
            PRODUCT=product,
            CREATED_BY=created_by,
            OWNER=owner,
            GIT_TAG=GIT_TAG,
            # Scaled Object arguments
            MIN_WORKERS=build_endpoint_request.min_workers,
            MAX_WORKERS=build_endpoint_request.max_workers,
            # CONCURRENCY=build_endpoint_request.concurrency,
            REDIS_HOST_PORT=hmi_config.cache_redis_host_port,
            REDIS_DB_INDEX=hmi_config.cache_redis_db_index,
        )
    elif endpoint_resource_name == "service":
        # Use ClusterIP by default for sync endpoint.
        # In Circle CI, we use a NodePort to expose the service to CI.
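Closing note on how KedaScaledObjectArguments presumably meets keda-scaled-object.yaml: the ${...} tokens in the template suggest load_k8s_yaml performs shell-style placeholder substitution, which string.Template can emulate for illustration (a sketch under that assumption; the resource name is invented, and the real helper may behave differently):

from string import Template
import yaml

template_text = """
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ${RESOURCE_NAME}
spec:
  minReplicaCount: ${MIN_WORKERS}
  maxReplicaCount: ${MAX_WORKERS}
"""

# Hypothetical subset of the arguments built above
args = {"RESOURCE_NAME": "launch-endpoint-id-abc123", "MIN_WORKERS": 0, "MAX_WORKERS": 1}
scaled_object = yaml.safe_load(Template(template_text).substitute(args))
assert scaled_object["spec"]["minReplicaCount"] == 0
assert scaled_object["metadata"]["name"] == "launch-endpoint-id-abc123"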