Skip to content

Commit

Permalink
[Model Monitoring] Support alerting with different result types (#5621)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eyal-Danieli committed May 26, 2024
1 parent f1f1b8f commit 2f37ac8
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 52 deletions.
1 change: 1 addition & 0 deletions mlrun/common/schemas/model_monitoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ class ResultKindApp(Enum):
concept_drift = 1
model_performance = 2
system_performance = 3
custom = 4


class ResultStatusApp(IntEnum):
Expand Down
47 changes: 36 additions & 11 deletions mlrun/model_monitoring/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
HistogramDataDriftApplicationConstants,
MetricData,
ResultData,
ResultKindApp,
ResultStatusApp,
WriterEvent,
WriterEventKind,
Expand Down Expand Up @@ -117,26 +118,49 @@ def __init__(self, project: str, tsdb_secret_provider=None) -> None:
)
self._endpoints_records = {}

@staticmethod
def _generate_event_on_drift(
entity_id: str, drift_status: str, event_value: dict, project_name: str
self,
entity_id: str,
result_status: int,
event_value: dict,
project_name: str,
result_kind: int,
) -> None:
logger.info("Sending an event")
entity = mlrun.common.schemas.alert.EventEntities(
kind=alert_objects.EventEntityKind.MODEL_ENDPOINT_RESULT,
project=project_name,
ids=[entity_id],
)
event_kind = (
alert_objects.EventKind.DATA_DRIFT_DETECTED
if drift_status == ResultStatusApp.detected.value
else alert_objects.EventKind.DATA_DRIFT_SUSPECTED

event_kind = self._generate_alert_event_kind(
result_status=result_status, result_kind=result_kind
)

event_data = mlrun.common.schemas.Event(
kind=event_kind, entity=entity, value_dict=event_value
kind=alert_objects.EventKind(value=event_kind),
entity=entity,
value_dict=event_value,
)
mlrun.get_run_db().generate_event(event_kind, event_data)

@staticmethod
def _generate_alert_event_kind(
result_kind: int, result_status: int
) -> alert_objects.EventKind:
"""Generate the required Event Kind format for the alerting system"""
if result_kind == ResultKindApp.custom.value:
# Custom kind is represented as an anomaly detection
event_kind = "mm_app_anomaly"
else:
event_kind = ResultKindApp(value=result_kind).name

if result_status == ResultStatusApp.detected.value:
event_kind = f"{event_kind}_detected"
else:
event_kind = f"{event_kind}_suspected"
return alert_objects.EventKind(value=event_kind)

@staticmethod
def _reconstruct_event(event: _RawEvent) -> tuple[_AppResultEvent, WriterEventKind]:
"""
Expand Down Expand Up @@ -209,14 +233,15 @@ def do(self, event: _RawEvent) -> None:
"result_value": event[ResultData.RESULT_VALUE],
}
self._generate_event_on_drift(
get_result_instance_fqn(
entity_id=get_result_instance_fqn(
event[WriterEvent.ENDPOINT_ID],
event[WriterEvent.APPLICATION_NAME],
event[ResultData.RESULT_NAME],
),
event[ResultData.RESULT_STATUS],
event_value,
self.project,
result_status=event[ResultData.RESULT_STATUS],
event_value=event_value,
project_name=self.project,
result_kind=event[ResultData.RESULT_KIND],
)

if (
Expand Down
158 changes: 117 additions & 41 deletions tests/system/alerts/test_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import time
import typing

import deepdiff
import pytest

import mlrun
Expand Down Expand Up @@ -83,30 +84,121 @@ def test_job_failure_alert(self):
nuclio_function_url, expected_notifications
)

@staticmethod
def _generate_events(
endpoint_id: str, result_name: str
) -> list[dict[str, typing.Any]]:
data_drift_example = {
mm_constants.WriterEvent.ENDPOINT_ID: endpoint_id,
mm_constants.WriterEvent.APPLICATION_NAME: mm_constants.HistogramDataDriftApplicationConstants.NAME,
mm_constants.WriterEvent.START_INFER_TIME: "2023-09-11T12:00:00",
mm_constants.WriterEvent.END_INFER_TIME: "2023-09-11T12:01:00",
mm_constants.WriterEvent.EVENT_KIND: "result",
mm_constants.WriterEvent.DATA: json.dumps(
{
mm_constants.ResultData.RESULT_NAME: result_name,
mm_constants.ResultData.RESULT_KIND: mm_constants.ResultKindApp.data_drift.value,
mm_constants.ResultData.RESULT_VALUE: 0.5,
mm_constants.ResultData.RESULT_STATUS: mm_constants.ResultStatusApp.detected.value,
mm_constants.ResultData.RESULT_EXTRA_DATA: json.dumps(
{"threshold": 0.3}
),
mm_constants.ResultData.CURRENT_STATS: "",
}
),
}

concept_drift_example = {
mm_constants.WriterEvent.ENDPOINT_ID: endpoint_id,
mm_constants.WriterEvent.APPLICATION_NAME: mm_constants.HistogramDataDriftApplicationConstants.NAME,
mm_constants.WriterEvent.START_INFER_TIME: "2023-09-11T12:00:00",
mm_constants.WriterEvent.END_INFER_TIME: "2023-09-11T12:01:00",
mm_constants.WriterEvent.EVENT_KIND: "result",
mm_constants.WriterEvent.DATA: json.dumps(
{
mm_constants.ResultData.RESULT_NAME: result_name,
mm_constants.ResultData.RESULT_KIND: mm_constants.ResultKindApp.concept_drift.value,
mm_constants.ResultData.RESULT_VALUE: 0.9,
mm_constants.ResultData.RESULT_STATUS: mm_constants.ResultStatusApp.potential_detection.value,
mm_constants.ResultData.RESULT_EXTRA_DATA: json.dumps(
{"threshold": 0.7}
),
mm_constants.ResultData.CURRENT_STATS: "",
}
),
}

anomaly_example = {
mm_constants.WriterEvent.ENDPOINT_ID: endpoint_id,
mm_constants.WriterEvent.APPLICATION_NAME: mm_constants.HistogramDataDriftApplicationConstants.NAME,
mm_constants.WriterEvent.START_INFER_TIME: "2023-09-11T12:00:00",
mm_constants.WriterEvent.END_INFER_TIME: "2023-09-11T12:01:00",
mm_constants.WriterEvent.EVENT_KIND: "result",
mm_constants.WriterEvent.DATA: json.dumps(
{
mm_constants.ResultData.RESULT_NAME: result_name,
mm_constants.ResultData.RESULT_KIND: mm_constants.ResultKindApp.custom.value,
mm_constants.ResultData.RESULT_VALUE: 0.9,
mm_constants.ResultData.RESULT_STATUS: mm_constants.ResultStatusApp.detected.value,
mm_constants.ResultData.RESULT_EXTRA_DATA: json.dumps(
{"threshold": 0.4}
),
mm_constants.ResultData.CURRENT_STATS: "",
}
),
}

return [data_drift_example, concept_drift_example, anomaly_example]

def _generate_alerts(
self, nuclio_function_url: str, result_endpoint_fqn
) -> list[str]:
"""Generate alerts for the different result kind and return data from the expected notifications."""
expected_notifications = []
alerts_kind_to_test = [
alert_objects.EventKind.DATA_DRIFT_DETECTED,
alert_objects.EventKind.CONCEPT_DRIFT_SUSPECTED,
alert_objects.EventKind.MM_APP_ANOMALY_DETECTED,
]

for alert_kind in alerts_kind_to_test:
alert_name = f"drift_webhook_{alert_kind.value}"
alert_summary = "Model is drifting"
self._create_alert_config(
self.project_name,
alert_name,
alert_objects.EventEntityKind.MODEL_ENDPOINT_RESULT,
result_endpoint_fqn,
alert_summary,
alert_kind,
self._generate_drift_notifications(
nuclio_function_url, alert_kind.value
),
)
expected_notifications.extend(
[
f"first drift of {alert_kind.value}",
f"second drift of {alert_kind.value}",
]
)
return expected_notifications

def test_drift_detection_alert(self):
"""
validate that an alert is sent in case of a model drift detection
validate that an alert is sent with different result kind and different detection result
"""
# enable model monitoring - deploy writer function
self.project.enable_model_monitoring(image=self.image or "mlrun/mlrun")
# deploy nuclio func for storing notifications, to validate an alert notifications were sent on drift detection
nuclio_function_url = notification_helpers.deploy_notification_nuclio(
self.project, self.image
)

# create an alert with two webhook notifications
alert_name = "drift_webhook"
alert_summary = "Model is drifting"
endpoint_id = "demo-endpoint"
notifications = self._generate_drift_notifications(nuclio_function_url)
self._create_alert_config(
self.project_name,
alert_name,
alert_objects.EventEntityKind.MODEL_ENDPOINT_RESULT,
get_default_result_instance_fqn(endpoint_id),
alert_summary,
alert_objects.EventKind.DATA_DRIFT_DETECTED,
notifications,

# generate alerts for the different result kind and return text from the expected notifications that will be
# used later to validate that the notifications were sent as expected
expected_notifications = self._generate_alerts(
nuclio_function_url, get_default_result_instance_fqn(endpoint_id)
)

# waits for the writer function to be deployed
Expand All @@ -131,30 +223,12 @@ def test_drift_detection_alert(self):
result_name = (
mm_constants.HistogramDataDriftApplicationConstants.GENERAL_RESULT_NAME
)
data = {
mm_constants.WriterEvent.ENDPOINT_ID: endpoint_id,
mm_constants.WriterEvent.APPLICATION_NAME: mm_constants.HistogramDataDriftApplicationConstants.NAME,
mm_constants.WriterEvent.START_INFER_TIME: "2023-09-11T12:00:00",
mm_constants.WriterEvent.END_INFER_TIME: "2023-09-11T12:01:00",
mm_constants.WriterEvent.EVENT_KIND: "result",
mm_constants.WriterEvent.DATA: json.dumps(
{
mm_constants.ResultData.RESULT_NAME: result_name,
mm_constants.ResultData.RESULT_KIND: mm_constants.ResultKindApp.data_drift.value,
mm_constants.ResultData.RESULT_VALUE: 0.5,
mm_constants.ResultData.RESULT_STATUS: mm_constants.ResultStatusApp.detected.value,
mm_constants.ResultData.RESULT_EXTRA_DATA: {"threshold": 0.3},
mm_constants.ResultData.CURRENT_STATS: "",
}
),
}
output_stream.push([data])

output_stream.push(self._generate_events(endpoint_id, result_name))

# wait for the nuclio function to check for the stream inputs
time.sleep(10)

# Validate that the notifications were sent on the drift
expected_notifications = ["first drift", "second drift"]
self._validate_notifications_on_nuclio(
nuclio_function_url, expected_notifications
)
Expand Down Expand Up @@ -182,7 +256,7 @@ def _generate_failure_notifications(nuclio_function_url):
return [alert_objects.AlertNotification(notification=notification)]

@staticmethod
def _generate_drift_notifications(nuclio_function_url):
def _generate_drift_notifications(nuclio_function_url, result_kind):
first_notification = mlrun.common.schemas.Notification(
kind="webhook",
name="drift",
Expand All @@ -194,7 +268,7 @@ def _generate_drift_notifications(nuclio_function_url):
"url": nuclio_function_url,
"override_body": {
"operation": "add",
"data": "first drift",
"data": f"first drift of {result_kind}",
},
},
secret_params={
Expand All @@ -212,7 +286,7 @@ def _generate_drift_notifications(nuclio_function_url):
"url": nuclio_function_url,
"override_body": {
"operation": "add",
"data": "second drift",
"data": f"second drift of {result_kind}",
},
},
secret_params={
Expand Down Expand Up @@ -252,7 +326,9 @@ def _create_alert_config(

@staticmethod
def _validate_notifications_on_nuclio(nuclio_function_url, expected_notifications):
for notification in notification_helpers.get_notifications_from_nuclio_and_reset_notification_cache(
nuclio_function_url
):
assert notification in expected_notifications
sent_notifications = list(
notification_helpers.get_notifications_from_nuclio_and_reset_notification_cache(
nuclio_function_url
)
)
assert deepdiff.DeepDiff(sent_notifications, expected_notifications) == {}

0 comments on commit 2f37ac8

Please sign in to comment.