
Commit

[Model Monitoring] Add invocations metric (#5581)
gtopper committed May 22, 2024
1 parent 9345658 commit b08f60b
Showing 5 changed files with 243 additions and 28 deletions.
9 changes: 9 additions & 0 deletions mlrun/common/schemas/model_monitoring/constants.py
@@ -358,3 +358,12 @@ class TSDBTarget:
class HistogramDataDriftApplicationConstants:
NAME = "histogram-data-drift"
GENERAL_RESULT_NAME = "general_drift"


class PredictionsQueryConstants:
DEFAULT_AGGREGATION_GRANULARITY = "10m"
INVOCATIONS = "invocations"


class SpecialApps:
MLRUN_INFRA = "mlrun-infra"
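For context, a minimal sketch (not part of this diff) of how the new constants might be consumed: the infra app name and metric name identify the invocations series, and the default granularity can serve as the TSDB aggregation window. Nothing below beyond the constant values themselves is taken from the commit.

import mlrun.common.schemas.model_monitoring.constants as mm_constants

# Reserved application slot for infrastructure-owned metrics.
app = mm_constants.SpecialApps.MLRUN_INFRA  # "mlrun-infra"
# Name of the per-endpoint invocations metric.
name = mm_constants.PredictionsQueryConstants.INVOCATIONS  # "invocations"
# Default aggregation window used when querying predictions.
window = mm_constants.PredictionsQueryConstants.DEFAULT_AGGREGATION_GRANULARITY  # "10m"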
85 changes: 84 additions & 1 deletion mlrun/model_monitoring/db/v3io_tsdb_reader.py
@@ -17,7 +17,7 @@

from datetime import datetime
from io import StringIO
from typing import Literal, Union
from typing import Literal, Optional, Union

import pandas as pd

@@ -32,6 +32,7 @@
ModelEndpointMonitoringMetricValues,
ModelEndpointMonitoringResultValues,
_compose_full_name,
_ModelEndpointMonitoringMetricValuesBase,
)
from mlrun.model_monitoring.db.stores.v3io_kv.kv_store import KVStoreBase
from mlrun.model_monitoring.db.tsdb.v3io.v3io_connector import _TSDB_BE
@@ -188,6 +189,8 @@ def df_to_results_values(
del metrics_without_data[full_name]

for metric in metrics_without_data.values():
if metric.full_name == get_invocations_fqn(project):
continue
metrics_values.append(
ModelEndpointMonitoringMetricNoData(
full_name=metric.full_name,
@@ -250,3 +253,83 @@ def df_to_metrics_values(
)

return metrics_values


def get_invocations_fqn(project: str):
return mlrun.common.schemas.model_monitoring.model_endpoints._compose_full_name(
project=project,
app=mm_constants.SpecialApps.MLRUN_INFRA,
name=mlrun.common.schemas.model_monitoring.constants.PredictionsQueryConstants.INVOCATIONS,
type=mlrun.common.schemas.model_monitoring.ModelEndpointMonitoringMetricType.METRIC,
)


def read_predictions(
*,
project: str,
endpoint_id: str,
start: Optional[Union[datetime, str]] = None,
end: Optional[Union[datetime, str]] = None,
aggregation_window: Optional[str] = None,
limit: Optional[int] = None,
) -> _ModelEndpointMonitoringMetricValuesBase:
client = mlrun.utils.v3io_clients.get_frames_client(
address=mlrun.mlconf.v3io_framesd,
container="users",
)
frames_client_kwargs = {}
if aggregation_window:
frames_client_kwargs["step"] = aggregation_window
frames_client_kwargs["aggregation_window"] = aggregation_window
if limit:
frames_client_kwargs["limit"] = limit
df: pd.DataFrame = client.read(
backend=_TSDB_BE,
table=f"pipelines/{project}/model-endpoints/predictions",
columns=["latency"],
filter=f"endpoint_id=='{endpoint_id}'",
start=start,
end=end,
aggregators="count",
**frames_client_kwargs,
)

full_name = get_invocations_fqn(project)

if df.empty:
return ModelEndpointMonitoringMetricNoData(
full_name=full_name,
type=ModelEndpointMonitoringMetricType.METRIC,
)

return ModelEndpointMonitoringMetricValues(
full_name=full_name,
values=list(
zip(
df.index,
df["count(latency)"],
)
),
)


def read_prediction_metric_for_endpoint_if_exists(
*,
project: str,
endpoint_id: str,
) -> Optional[ModelEndpointMonitoringMetric]:
predictions = read_predictions(
project=project,
endpoint_id=endpoint_id,
start="0",
end="now",
        limit=1,  # Read a single record; we only need to know whether any data exists for this endpoint_id
)
if predictions:
return ModelEndpointMonitoringMetric(
project=project,
app=mm_constants.SpecialApps.MLRUN_INFRA,
type=ModelEndpointMonitoringMetricType.METRIC,
name=mlrun.common.schemas.model_monitoring.constants.PredictionsQueryConstants.INVOCATIONS,
full_name=get_invocations_fqn(project),
)
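A hedged usage sketch of the new reader helpers (not part of the commit), assuming a reachable V3IO frames backend; the project name and endpoint id below are hypothetical placeholders:

from datetime import datetime, timezone

from mlrun.model_monitoring.db.v3io_tsdb_reader import (
    get_invocations_fqn,
    read_predictions,
)

project = "my-project"  # hypothetical
endpoint_id = "70450e1ef7cc9506d42369aeeb056eaaaa0bb8bd"  # hypothetical

# Fully qualified name of the invocations metric for the project.
print(get_invocations_fqn(project))  # "my-project.mlrun-infra.metric.invocations"

# Per-window invocation counts (count over the latency column) for one endpoint.
values = read_predictions(
    project=project,
    endpoint_id=endpoint_id,
    start=datetime(2024, 4, 1, tzinfo=timezone.utc),
    end=datetime.now(tz=timezone.utc),
    aggregation_window="10m",
)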
81 changes: 60 additions & 21 deletions server/api/api/endpoints/model_endpoints.py
@@ -14,6 +14,7 @@

import asyncio
import json
from collections.abc import Coroutine
from dataclasses import dataclass
from datetime import datetime, timedelta
from http import HTTPStatus
@@ -24,7 +25,10 @@
from sqlalchemy.orm import Session

import mlrun.common.schemas as schemas
import mlrun.common.schemas.model_monitoring.constants as mm_constants
import mlrun.common.schemas.model_monitoring.model_endpoints
import mlrun.common.schemas.model_monitoring.model_endpoints as mm_endpoints
import mlrun.model_monitoring.db.stores.v3io_kv.kv_store
import mlrun.model_monitoring.db.v3io_tsdb_reader
import mlrun.utils.helpers
import server.api.api.deps
@@ -370,6 +374,17 @@ async def get_model_endpoint_monitoring_metrics(
)
)
)
tasks.append(
asyncio.create_task(
_wrap_coroutine_in_list(
run_in_threadpool(
mlrun.model_monitoring.db.v3io_tsdb_reader.read_prediction_metric_for_endpoint_if_exists,
project=project,
endpoint_id=endpoint_id,
)
)
)
)
await asyncio.wait(tasks)
metrics: list[mm_endpoints.ModelEndpointMonitoringMetric] = []
for task in tasks:
@@ -401,14 +416,14 @@ async def _get_metrics_values_data(
"""
Verify authorization, validate parameters and initialize the parameters.
:param project: The name of the project.
:param endpoint_id: The unique id of the model endpoint.
:param name: The full names of the requested results. At least one is required.
:param start: Start and end times are optional, and must be timezone aware.
:param end: See the `start` parameter.
:param auth_info: The auth info of the request.
:param project: The name of the project.
:param endpoint_id: The unique id of the model endpoint.
:param name: The full names of the requested results. At least one is required.
:param start: Start and end times are optional, and must be timezone aware.
:param end: See the `start` parameter.
:param auth_info: The auth info of the request.
:return: _MetricsValuesData object with the validated data.
:return: _MetricsValuesParams object with the validated data.
"""
await _verify_model_endpoint_read_permission(
project=project, endpoint_id=endpoint_id, auth_info=auth_info
@@ -451,6 +466,10 @@ async def _get_metrics_values_data(
)


async def _wrap_coroutine_in_list(x):
return [await x]


@router.get(
"/{endpoint_id}/metrics-values",
response_model=list[
@@ -475,25 +494,45 @@ async def get_model_endpoint_monitoring_metrics_values(
:returns: A list of the results values for this model endpoint.
"""
tasks: list[asyncio.Task] = []
coroutines: list[Coroutine] = []

invocations_full_name = (
mlrun.model_monitoring.db.v3io_tsdb_reader.get_invocations_fqn(params.project)
)

for metrics, type in [(params.results, "results"), (params.metrics, "metrics")]:
if metrics:
tasks.append(
asyncio.Task(
run_in_threadpool(
mlrun.model_monitoring.db.v3io_tsdb_reader.read_metrics_data,
project=params.project,
endpoint_id=params.endpoint_id,
start=params.start,
end=params.end,
metrics=metrics,
type=type,
metrics_without_invocations = list(
filter(
lambda metric: metric.full_name != invocations_full_name, metrics
)
)
if len(metrics_without_invocations) != len(metrics):
coroutines.append(
_wrap_coroutine_in_list(
run_in_threadpool(
mlrun.model_monitoring.db.v3io_tsdb_reader.read_predictions,
project=params.project,
endpoint_id=params.endpoint_id,
start=params.start,
end=params.end,
aggregation_window=mm_constants.PredictionsQueryConstants.DEFAULT_AGGREGATION_GRANULARITY,
)
)
)
coroutines.append(
run_in_threadpool(
mlrun.model_monitoring.db.v3io_tsdb_reader.read_metrics_data,
project=params.project,
endpoint_id=params.endpoint_id,
start=params.start,
end=params.end,
metrics=metrics_without_invocations,
type=type,
)
)
await asyncio.wait(tasks)

metrics_values = []
for task in tasks:
metrics_values.extend(task.result())
for result in await asyncio.gather(*coroutines):
metrics_values.extend(result)
return metrics_values
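The handler change above fans the TSDB reads out as coroutines and merges their results with asyncio.gather; a self-contained sketch of that pattern (independent of MLRun, all names illustrative):

import asyncio

async def _wrap_coroutine_in_list(x):
    # Normalize a single-object coroutine so every gathered result is a list.
    return [await x]

async def read_invocations():
    return "invocations-series"

async def read_metrics():
    return ["metric-a", "metric-b"]

async def main():
    coroutines = [_wrap_coroutine_in_list(read_invocations()), read_metrics()]
    merged = []
    for result in await asyncio.gather(*coroutines):
        merged.extend(result)
    return merged

print(asyncio.run(main()))  # ['invocations-series', 'metric-a', 'metric-b']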
62 changes: 59 additions & 3 deletions tests/model_monitoring/test_stores/test_v3io.py
@@ -33,9 +33,14 @@
ModelEndpointMonitoringMetricNoData,
ModelEndpointMonitoringMetricType,
ModelEndpointMonitoringResultValues,
_MetricPoint,
)
from mlrun.model_monitoring.db.stores.v3io_kv.kv_store import KVStoreBase
from mlrun.model_monitoring.db.v3io_tsdb_reader import _get_sql_query, read_metrics_data
from mlrun.model_monitoring.db.v3io_tsdb_reader import (
_get_sql_query,
read_metrics_data,
read_predictions,
)


@pytest.fixture(params=["default-project"])
@@ -373,7 +378,7 @@ def tsdb_df() -> pd.DataFrame:
return pd.DataFrame.from_records(
[
(
pd.Timestamp("2024-04-02 18:00:28+0000", tz="UTC"),
pd.Timestamp("2024-04-02 18:00:28", tz="UTC"),
"histogram-data-drift",
"70450e1ef7cc9506d42369aeeb056eaaaa0bb8bd",
0,
@@ -383,7 +388,7 @@ def tsdb_df() -> pd.DataFrame:
"2024-04-02 17:59:28.000000+00:00",
),
(
pd.Timestamp("2024-04-02 18:00:28+0000", tz="UTC"),
pd.Timestamp("2024-04-02 18:00:28", tz="UTC"),
"histogram-data-drift",
"70450e1ef7cc9506d42369aeeb056eaaaa0bb8bd",
0,
@@ -407,6 +412,24 @@ def tsdb_df() -> pd.DataFrame:
)


@pytest.fixture
def predictions_df() -> pd.DataFrame:
return pd.DataFrame.from_records(
[
(
pd.Timestamp("2024-04-02 18:00:00", tz="UTC"),
5,
),
(pd.Timestamp("2024-04-02 18:01:00", tz="UTC"), 10),
],
index="time",
columns=[
"time",
"count(latency)",
],
)


@pytest.fixture
def _mock_frames_client(tsdb_df: pd.DataFrame) -> Iterator[None]:
frames_client_mock = Mock()
@@ -418,6 +441,17 @@ def _mock_frames_client(tsdb_df: pd.DataFrame) -> Iterator[None]:
yield


@pytest.fixture
def _mock_frames_client_predictions(predictions_df: pd.DataFrame) -> Iterator[None]:
frames_client_mock = Mock()
frames_client_mock.read = Mock(return_value=predictions_df)

with patch.object(
mlrun.utils.v3io_clients, "get_frames_client", return_value=frames_client_mock
):
yield


@pytest.mark.usefixtures("_mock_frames_client")
def test_read_results_data() -> None:
data = read_metrics_data(
@@ -453,3 +487,25 @@ def test_read_results_data() -> None:
counter = Counter([type(values) for values in data])
assert counter[ModelEndpointMonitoringResultValues] == 2
assert counter[ModelEndpointMonitoringMetricNoData] == 1


@pytest.mark.usefixtures("_mock_frames_client_predictions")
def test_read_predictions() -> None:
result = read_predictions(
project="fictitious-one",
endpoint_id="70450e1ef7cc9506d42369aeeb056eaaaa0bb8bd",
start=datetime.fromtimestamp(0),
end=datetime.now(),
aggregation_window="1m",
)
assert result.full_name == "fictitious-one.mlrun-infra.metric.invocations"
assert result.values == [
_MetricPoint(
timestamp=pd.Timestamp("2024-04-02 18:00:00", tz="UTC"),
value=5.0,
),
_MetricPoint(
timestamp=pd.Timestamp("2024-04-02 18:01:00", tz="UTC"),
value=10.0,
),
]
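The asserted full name appears to follow a dot-joined <project>.<app>.<type>.<name> convention (as composed by _compose_full_name); a one-line check under that assumption:

# Assumed composition rule, matching the value asserted in test_read_predictions above.
parts = ["fictitious-one", "mlrun-infra", "metric", "invocations"]
assert ".".join(parts) == "fictitious-one.mlrun-infra.metric.invocations"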