Skip to content

Commit

Permalink
[API] Add model endpoint list metrics/results endpoint (#5484)
Browse files Browse the repository at this point in the history
  • Loading branch information
jond01 committed May 5, 2024
1 parent 1e63c3a commit ecff614
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 52 deletions.
2 changes: 2 additions & 0 deletions mlrun/common/schemas/model_monitoring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
ModelEndpoint,
ModelEndpointList,
ModelEndpointMetadata,
ModelEndpointMonitoringMetric,
ModelEndpointMonitoringMetricType,
ModelEndpointSpec,
ModelEndpointStatus,
)
3 changes: 3 additions & 0 deletions mlrun/common/schemas/model_monitoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ class ModelMonitoringAppLabel:
KEY = "mlrun__type"
VAL = "mlrun__model-monitoring-application"

def __str__(self) -> str:
return f"{self.KEY}={self.VAL}"


class ControllerPolicy:
BASE_PERIOD = "base_period"
Expand Down
14 changes: 13 additions & 1 deletion mlrun/common/schemas/model_monitoring/model_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import enum
import json
Expand All @@ -21,6 +20,7 @@
from pydantic.main import Extra

import mlrun.common.model_monitoring
import mlrun.common.types

from ..object import ObjectKind, ObjectSpec, ObjectStatus
from .constants import (
Expand Down Expand Up @@ -292,6 +292,18 @@ class ModelEndpointList(BaseModel):
endpoints: list[ModelEndpoint] = []


class ModelEndpointMonitoringMetricType(mlrun.common.types.StrEnum):
RESULT = "result"


class ModelEndpointMonitoringMetric(BaseModel):
project: str
app: str
type: ModelEndpointMonitoringMetricType
name: str
full_name: str


def _mapping_attributes(
base_model: BaseModel,
flattened_dictionary: dict,
Expand Down
67 changes: 65 additions & 2 deletions mlrun/model_monitoring/db/stores/v3io_kv/kv_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
Expand Down Expand Up @@ -41,7 +40,7 @@ class KVStoreBase(mlrun.model_monitoring.db.StoreBase):
client and usually the KV table can be found under v3io:///users/pipelines/project-name/model-endpoints/endpoints/.
"""

def __init__(self, project: str, access_key: str):
def __init__(self, project: str, access_key: typing.Optional[str] = None) -> None:
super().__init__(project=project)
# Initialize a V3IO client instance
self.access_key = access_key or os.environ.get("V3IO_ACCESS_KEY")
Expand Down Expand Up @@ -703,3 +702,67 @@ def get_v3io_monitoring_apps_container(project_name: str) -> str:
@staticmethod
def _get_monitoring_schedules_container(project_name: str) -> str:
return f"users/pipelines/{project_name}/monitoring-schedules/functions"

def _extract_metrics_from_items(
self, app_items: list[dict[str, str]]
) -> list[mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetric]:
metrics: list[
mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetric
] = []
for app_item in app_items:
# See https://www.iguazio.com/docs/latest-release/services/data-layer/reference/system-attributes/#sys-attr-__name
app_name = app_item.pop("__name")
if app_name == ".#schema":
continue
for result_name in app_item:
metrics.append(
mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetric(
project=self.project,
app=app_name,
type=mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetricType.RESULT,
name=result_name,
full_name=".".join(
[
self.project,
app_name,
mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetricType.RESULT,
result_name,
]
),
)
)
return metrics

def get_model_endpoint_metrics(
self, endpoint_id: str
) -> list[mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetric]:
"""Get model monitoring results and metrics on the endpoint"""
metrics: list[
mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetric
] = []
container = self.get_v3io_monitoring_apps_container(self.project)
try:
response = self.client.kv.scan(container=container, table_path=endpoint_id)
except v3io.dataplane.response.HttpResponseError as err:
if err.status_code == HTTPStatus.NOT_FOUND:
logger.warning(
"Attempt getting metrics and results - no data. Check the "
"project name, endpoint, or wait for the applications to start.",
container=container,
table_path=endpoint_id,
)
return []
raise

while True:
metrics.extend(self._extract_metrics_from_items(response.output.items))
if response.output.last:
break
# TODO: Use AIO client: `v3io.aio.dataplane.client.Client`
response = self.client.kv.scan(
container=container,
table_path=endpoint_id,
marker=response.output.next_marker,
)

return metrics
11 changes: 7 additions & 4 deletions mlrun/model_monitoring/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from v3io_frames.frames_pb2 import IGNORE

import mlrun.common.model_monitoring
import mlrun.common.schemas
import mlrun.common.schemas.alert as alert_constants
import mlrun.model_monitoring
import mlrun.model_monitoring.db.stores
Expand Down Expand Up @@ -180,16 +181,17 @@ def _update_tsdb(self, event: _AppResultEvent) -> None:

@staticmethod
def _generate_event_on_drift(
uid: str, drift_status: str, event_value: dict, project_name: str
):
model_endpoint: str, drift_status: str, event_value: dict, project_name: str
) -> None:
if (
drift_status == ResultStatusApp.detected
or drift_status == ResultStatusApp.potential_detection
):
logger.info("Sending an alert")
entity = {
"kind": alert_constants.EventEntityKind.MODEL,
"project": project_name,
"id": uid,
"model_endpoint": model_endpoint,
}
event_kind = (
alert_constants.EventKind.DRIFT_DETECTED
Expand Down Expand Up @@ -230,6 +232,8 @@ def do(self, event: _RawEvent) -> None:
logger.info("Starting to write event", event=event)
self._update_tsdb(event)
self._update_kv_db(event)
logger.info("Completed event DB writes")

_Notifier(event=event, notification_pusher=self._custom_notifier).notify()

if mlrun.mlconf.alerts.mode == mlrun.common.schemas.alert.AlertsModes.enabled:
Expand All @@ -251,4 +255,3 @@ def do(self, event: _RawEvent) -> None:
event_value,
self.project,
)
logger.info("Completed event DB writes")
4 changes: 1 addition & 3 deletions mlrun/projects/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3660,9 +3660,7 @@ def list_model_monitoring_functions(
:returns: List of function objects.
"""

model_monitoring_labels_list = [
f"{mm_constants.ModelMonitoringAppLabel.KEY}={mm_constants.ModelMonitoringAppLabel.VAL}"
]
model_monitoring_labels_list = [str(mm_constants.ModelMonitoringAppLabel())]
if labels:
model_monitoring_labels_list += labels
return self.list_functions(
Expand Down
43 changes: 41 additions & 2 deletions server/api/api/endpoints/model_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
from http import HTTPStatus
from typing import Optional
from typing import Literal, Optional

from fastapi import APIRouter, Depends, Query
from fastapi.concurrency import run_in_threadpool
from sqlalchemy.orm import Session

import mlrun.common.schemas
import mlrun.model_monitoring.db.stores.v3io_kv.kv_store
import server.api.api.deps
import server.api.crud
import server.api.utils.auth.verifier
Expand Down Expand Up @@ -317,3 +318,41 @@ async def get_model_endpoint(
end=end,
feature_analysis=feature_analysis,
)


@router.get(
"/{endpoint_id}/metrics",
response_model=list[
mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetric
],
)
async def get_model_endpoint_monitoring_metrics(
project: str,
endpoint_id: str,
auth_info: mlrun.common.schemas.AuthInfo = Depends(
server.api.api.deps.authenticate_request
),
type: Literal["results"] = "results",
) -> list[mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetric]:
"""
:param project: The name of the project.
:param endpoint_id: The unique id of the model endpoint.
:param auth_info: The auth info of the request.
:param type: The type of the metrics to return. Currently, only "results"
is supported.
:returns: A list of the application results for this model endpoint.
"""
await server.api.utils.auth.verifier.AuthVerifier().query_project_resource_permissions(
mlrun.common.schemas.AuthorizationResourceTypes.model_endpoint,
project_name=project,
resource_name=endpoint_id,
action=mlrun.common.schemas.AuthorizationAction.read,
auth_info=auth_info,
)
return await run_in_threadpool(
mlrun.model_monitoring.db.stores.v3io_kv.kv_store.KVStoreBase(
project=project
).get_model_endpoint_metrics,
endpoint_id=endpoint_id,
)
67 changes: 28 additions & 39 deletions server/api/api/endpoints/model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from dataclasses import dataclass
from typing import Annotated
from typing import Annotated, Optional

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
Expand All @@ -35,6 +35,29 @@ class _CommonParams:
project: str
auth_info: mlrun.common.schemas.AuthInfo
db_session: Session
model_monitoring_access_key: Optional[str] = None

def __post_init__(self) -> None:
if not mlrun.mlconf.is_ce_mode():
# Get V3IO Access Key
self.model_monitoring_access_key = process_model_monitoring_secret(
self.db_session,
self.project,
mlrun.common.schemas.model_monitoring.ProjectSecretKeys.ACCESS_KEY,
)


async def _verify_authorization(
project: str, auth_info: mlrun.common.schemas.AuthInfo
) -> None:
"""Verify project authorization"""
await server.api.utils.auth.verifier.AuthVerifier().query_project_resource_permissions(
resource_type=mlrun.common.schemas.AuthorizationResourceTypes.function,
project_name=project,
resource_name=mlrun.common.schemas.model_monitoring.MonitoringFunctionNames.APPLICATION_CONTROLLER,
action=mlrun.common.schemas.AuthorizationAction.store,
auth_info=auth_info,
)


async def _common_parameters(
Expand All @@ -52,13 +75,7 @@ async def _common_parameters(
:param db_session: A session that manages the current dialog with the database.
:returns: A `_CommonParameters` object that contains the input data.
"""
await server.api.utils.auth.verifier.AuthVerifier().query_project_resource_permissions(
resource_type=mlrun.common.schemas.AuthorizationResourceTypes.function,
project_name=project,
resource_name=mlrun.common.schemas.model_monitoring.MonitoringFunctionNames.APPLICATION_CONTROLLER,
action=mlrun.common.schemas.AuthorizationAction.store,
auth_info=auth_info,
)
await _verify_authorization(project=project, auth_info=auth_info)
return _CommonParams(
project=project,
auth_info=auth_info,
Expand Down Expand Up @@ -89,21 +106,11 @@ async def enable_model_monitoring(
By default, the image is mlrun/mlrun.
:param deploy_histogram_data_drift_app: If true, deploy the default histogram-based data drift application.
"""

model_monitoring_access_key = None
if not mlrun.mlconf.is_ce_mode():
# Generate V3IO Access Key
model_monitoring_access_key = process_model_monitoring_secret(
commons.db_session,
commons.project,
mlrun.common.schemas.model_monitoring.ProjectSecretKeys.ACCESS_KEY,
)

MonitoringDeployment(
project=commons.project,
auth_info=commons.auth_info,
db_session=commons.db_session,
model_monitoring_access_key=model_monitoring_access_key,
model_monitoring_access_key=commons.model_monitoring_access_key,
).deploy_monitoring_functions(
image=image,
base_period=base_period,
Expand All @@ -128,15 +135,6 @@ async def update_model_monitoring_controller(
function, which is a real time nuclio functino, will be deployed with the same
image. By default, the image is mlrun/mlrun.
"""

model_monitoring_access_key = None
if not mlrun.mlconf.is_ce_mode():
# Generate V3IO Access Key
model_monitoring_access_key = process_model_monitoring_secret(
commons.db_session,
commons.project,
mlrun.common.schemas.model_monitoring.ProjectSecretKeys.ACCESS_KEY,
)
try:
# validate that the model monitoring stream has not yet been deployed
mlrun.runtimes.nuclio.function.get_nuclio_deploy_status(
Expand All @@ -156,7 +154,7 @@ async def update_model_monitoring_controller(
project=commons.project,
auth_info=commons.auth_info,
db_session=commons.db_session,
model_monitoring_access_key=model_monitoring_access_key,
model_monitoring_access_key=commons.model_monitoring_access_key,
).deploy_model_monitoring_controller(
controller_image=image,
base_period=base_period,
Expand All @@ -175,18 +173,9 @@ def deploy_histogram_data_drift_app(
:param commons: The common parameters of the request.
:param image: The image of the application, defaults to "mlrun/mlrun".
"""
model_monitoring_access_key = None
if not mlrun.mlconf.is_ce_mode():
# Generate V3IO Access Key
model_monitoring_access_key = process_model_monitoring_secret(
commons.db_session,
commons.project,
mlrun.common.schemas.model_monitoring.ProjectSecretKeys.ACCESS_KEY,
)

MonitoringDeployment(
project=commons.project,
auth_info=commons.auth_info,
db_session=commons.db_session,
model_monitoring_access_key=model_monitoring_access_key,
model_monitoring_access_key=commons.model_monitoring_access_key,
).deploy_histogram_data_drift_app(image=image)
1 change: 0 additions & 1 deletion tests/model_monitoring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
13 changes: 13 additions & 0 deletions tests/model_monitoring/test_stores/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

0 comments on commit ecff614

Please sign in to comment.