Skip to content

Commit

Permalink
[Model Monitoring] Implement TDEngine read metrics API (#5642)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eyal-Danieli committed May 30, 2024
1 parent 7d1bfc6 commit fc2d6db
Show file tree
Hide file tree
Showing 11 changed files with 621 additions and 273 deletions.
186 changes: 152 additions & 34 deletions mlrun/model_monitoring/db/tsdb/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import pandas as pd

import mlrun.common.schemas.model_monitoring as mm_schemas
import mlrun.model_monitoring.db.tsdb.helpers
import mlrun.model_monitoring.helpers
from mlrun.utils import logger


class TSDBConnector(ABC):
Expand Down Expand Up @@ -99,39 +102,6 @@ def get_model_endpoint_real_time_metrics(
"""
pass

@abstractmethod
def get_records(
self,
table: str,
start: str,
end: str,
columns: typing.Optional[list[str]] = None,
filter_query: str = "",
) -> pd.DataFrame:
"""
Getting records from TSDB data collection.
:param table: Table name, e.g. 'metrics', 'app_results'.
:param start: The start time of the metrics.
If using V3IO, can be represented by a string containing an RFC 3339 time, a Unix
timestamp in milliseconds, a relative time (`'now'` or `'now-[0-9]+[mhd]'`, where
`m` = minutes, `h` = hours, `'d'` = days, and `'s'` = seconds), or 0 for the earliest
time.
If using TDEngine, can be represented by datetime.
:param end: The end time of the metrics.
If using V3IO, can be represented by a string containing an RFC 3339 time, a Unix
timestamp in milliseconds, a relative time (`'now'` or `'now-[0-9]+[mhd]'`, where
`m` = minutes, `h` = hours, `'d'` = days, and `'s'` = seconds), or 0 for the earliest
time.
If using TDEngine, can be represented by datetime.
:param columns: Columns to include in the result.
:param filter_query: Optional filter expression as a string. The filter structure depends on the TSDB
connector type.
:return: DataFrame with the provided attributes from the data collection.
:raise: MLRunNotFoundError if the provided table wasn't found.
"""

def create_tables(self) -> None:
"""
Create the TSDB tables using the TSDB connector. At the moment we support 3 types of tables:
Expand Down Expand Up @@ -182,6 +152,8 @@ def read_predictions(
start: datetime,
end: datetime,
aggregation_window: typing.Optional[str] = None,
agg_funcs: typing.Optional[list[str]] = None,
limit: typing.Optional[int] = None,
) -> typing.Union[
mm_schemas.ModelEndpointMonitoringMetricValues,
mm_schemas.ModelEndpointMonitoringMetricNoData,
Expand All @@ -193,7 +165,15 @@ def read_predictions(
:param endpoint_id: The model endpoint identifier.
:param start: The start time of the query.
:param end: The end time of the query.
:param aggregation_window: On what time window length should the invocations be aggregated.
:param aggregation_window: On what time window length should the invocations be aggregated. If provided,
the `agg_funcs` must be provided as well. Provided as a string in the format of '1m',
'1h', etc.
:param agg_funcs: List of aggregation functions to apply on the invocations. If provided, the
`aggregation_window` must be provided as well. Provided as a list of strings in
the format of ['sum', 'avg', 'count', ...]
:param limit: The maximum number of records to return.
:raise mlrun.errors.MLRunInvalidArgumentError: If only one of `aggregation_window` and `agg_funcs` is provided.
:return: Metric values object or no data object.
"""

Expand All @@ -209,3 +189,141 @@ def read_prediction_metric_for_endpoint_if_exists(
:return: `None` if the invocations metric does not exist, otherwise return the
corresponding metric object.
"""

@staticmethod
def df_to_metrics_values(
    *,
    df: pd.DataFrame,
    metrics: list[mm_schemas.ModelEndpointMonitoringMetric],
    project: str,
) -> list[
    typing.Union[
        mm_schemas.ModelEndpointMonitoringMetricValues,
        mm_schemas.ModelEndpointMonitoringMetricNoData,
    ]
]:
    """
    Parse a time-indexed DataFrame of metrics read from the TSDB into a list
    of per-metric values objects, one per distinct (application, metric) pair.
    Requested metrics that do not appear in the DataFrame are represented by
    no-data objects.
    """
    # Requested metrics not yet matched against the DataFrame contents.
    pending = {metric.full_name: metric for metric in metrics}

    parsed: list[
        typing.Union[
            mm_schemas.ModelEndpointMonitoringMetricValues,
            mm_schemas.ModelEndpointMonitoringMetricNoData,
        ]
    ] = []

    if df.empty:
        logger.debug("No metrics", missing_metrics=pending.keys())
        groups = []
    else:
        groups = df.groupby(
            [
                mm_schemas.WriterEvent.APPLICATION_NAME,
                mm_schemas.MetricData.METRIC_NAME,
            ],
            observed=False,
        )

    for (app_name, metric_name), group_df in groups:
        full_name = mlrun.model_monitoring.helpers._compose_full_name(
            project=project,
            app=app_name,
            name=metric_name,
            type=mm_schemas.ModelEndpointMonitoringMetricType.METRIC,
        )
        # Pair each timestamp (index) with its metric value.
        values = list(
            zip(
                group_df.index,
                group_df[mm_schemas.MetricData.METRIC_VALUE],
            )
        )
        parsed.append(
            mm_schemas.ModelEndpointMonitoringMetricValues(
                full_name=full_name,
                values=values,  # pyright: ignore[reportArgumentType]
            )
        )
        del pending[full_name]

    # Whatever is left was requested but never seen in the DataFrame.
    parsed.extend(
        mm_schemas.ModelEndpointMonitoringMetricNoData(
            full_name=metric.full_name,
            type=mm_schemas.ModelEndpointMonitoringMetricType.METRIC,
        )
        for metric in pending.values()
    )

    return parsed

@staticmethod
def df_to_results_values(
    *,
    df: pd.DataFrame,
    metrics: list[mm_schemas.ModelEndpointMonitoringMetric],
    project: str,
) -> list[
    typing.Union[
        mm_schemas.ModelEndpointMonitoringResultValues,
        mm_schemas.ModelEndpointMonitoringMetricNoData,
    ]
]:
    """
    Parse a time-indexed DataFrame of results read from the TSDB into a list
    of per-result values objects, one per distinct (application, result) pair.
    Requested results that do not appear in the DataFrame are represented by
    no-data objects; the project's invocations metric is excluded from the
    no-data list.
    """
    # Requested results not yet matched against the DataFrame contents.
    pending = {metric.full_name: metric for metric in metrics}

    parsed: list[
        typing.Union[
            mm_schemas.ModelEndpointMonitoringResultValues,
            mm_schemas.ModelEndpointMonitoringMetricNoData,
        ]
    ] = []

    if df.empty:
        groups = []
        logger.debug("No results", missing_results=pending.keys())
    else:
        groups = df.groupby(
            [
                mm_schemas.WriterEvent.APPLICATION_NAME,
                mm_schemas.ResultData.RESULT_NAME,
            ],
            observed=False,
        )

    for (app_name, result_name), group_df in groups:
        kind = mlrun.model_monitoring.db.tsdb.helpers._get_result_kind(group_df)
        full_name = mlrun.model_monitoring.helpers._compose_full_name(
            project=project, app=app_name, name=result_name
        )
        # Each sample carries (timestamp, value, status).
        values = list(
            zip(
                group_df.index,
                group_df[mm_schemas.ResultData.RESULT_VALUE],
                group_df[mm_schemas.ResultData.RESULT_STATUS],
            )
        )
        parsed.append(
            mm_schemas.ModelEndpointMonitoringResultValues(
                full_name=full_name,
                result_kind=kind,
                values=values,  # pyright: ignore[reportArgumentType]
            )
        )
        del pending[full_name]

    invocations_fqn = mlrun.model_monitoring.helpers.get_invocations_fqn(project)
    for metric in pending.values():
        # The invocations metric is handled separately and is not a "result",
        # so it must not be reported here as missing data.
        if metric.full_name == invocations_fqn:
            continue
        parsed.append(
            mm_schemas.ModelEndpointMonitoringMetricNoData(
                full_name=metric.full_name,
                type=mm_schemas.ModelEndpointMonitoringMetricType.RESULT,
            )
        )

    return parsed
30 changes: 30 additions & 0 deletions mlrun/model_monitoring/db/tsdb/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd

import mlrun.common.schemas.model_monitoring as mm_schemas
from mlrun.utils import logger


def _get_result_kind(result_df: pd.DataFrame) -> mm_schemas.ResultKindApp:
    """
    Return the result kind of a single result's DataFrame.

    The DataFrame is expected to hold rows of one application result, so the
    kind column should contain exactly one distinct value. If several kinds
    are found (inconsistent data), a warning is logged and the first distinct
    kind is returned.

    :param result_df: DataFrame of a single result, including the kind,
                      application-name and result-name columns.
    :return: The result kind of the provided result.
    """
    unique_kinds = result_df[mm_schemas.ResultData.RESULT_KIND].unique()
    if len(unique_kinds) > 1:
        logger.warning(
            "The result has more than one kind",
            kinds=list(unique_kinds),
            # Log scalar identifiers, not whole pandas Series objects, so the
            # log record stays a readable one-liner. Within a single result's
            # DataFrame these columns are constant, so the first row suffices.
            application_name=result_df[mm_schemas.WriterEvent.APPLICATION_NAME].iloc[
                0
            ],
            result_name=result_df[mm_schemas.ResultData.RESULT_NAME].iloc[0],
        )
    return unique_kinds[0]
63 changes: 48 additions & 15 deletions mlrun/model_monitoring/db/tsdb/tdengine/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import datetime
from dataclasses import dataclass
from io import StringIO
from typing import Union
from typing import Optional, Union

import mlrun.common.schemas.model_monitoring as mm_schemas
import mlrun.common.types
Expand Down Expand Up @@ -95,7 +95,7 @@ def _delete_subtable_query(
values: dict[str, Union[str, int, float, datetime.datetime]],
) -> str:
values = " AND ".join(
f"{val} like '{values[val]}'" for val in self.tags if val in values
f"{val} LIKE '{values[val]}'" for val in self.tags if val in values
)
if not values:
raise mlrun.errors.MLRunInvalidArgumentError(
Expand All @@ -114,7 +114,7 @@ def _get_subtables_query(
values: dict[str, Union[str, int, float, datetime.datetime]],
) -> str:
values = " AND ".join(
f"{val} like '{values[val]}'" for val in self.tags if val in values
f"{val} LIKE '{values[val]}'" for val in self.tags if val in values
)
if not values:
raise mlrun.errors.MLRunInvalidArgumentError(
Expand All @@ -125,33 +125,65 @@ def _get_subtables_query(
@staticmethod
def _get_records_query(
    table: str,
    start: datetime.datetime,
    end: datetime.datetime,
    columns_to_filter: list[str] = None,
    filter_query: Optional[str] = None,
    interval: Optional[str] = None,
    limit: int = 0,
    agg_funcs: Optional[list] = None,
    sliding_window_step: Optional[str] = None,
    timestamp_column: str = "time",
    database: str = _MODEL_MONITORING_DATABASE,
) -> str:
    """
    Build a TDEngine SELECT statement over the given table.

    :param table:               Table (or super-table) to query.
    :param start:               Inclusive lower bound on `timestamp_column`; skipped if falsy.
    :param end:                 Inclusive upper bound on `timestamp_column`; skipped if falsy.
    :param columns_to_filter:   Columns to select; all columns (`*`) when omitted.
    :param filter_query:        Extra WHERE expression, ANDed with the time bounds.
    :param interval:            TDEngine INTERVAL() window; requires `agg_funcs`.
    :param limit:               LIMIT clause value; 0 means no limit.
    :param agg_funcs:           Aggregation functions applied to each filtered column;
                                requires `columns_to_filter`.
    :param sliding_window_step: TDEngine SLIDING() step; requires `interval`.
    :param timestamp_column:    Name of the time column used for the range filter.
    :param database:            Database that holds the table.
    :raise mlrun.errors.MLRunInvalidArgumentError: On inconsistent argument combinations.
    :return: The SQL query string, terminated with ';'.
    """
    if agg_funcs and not columns_to_filter:
        raise mlrun.errors.MLRunInvalidArgumentError(
            "`columns_to_filter` must be provided when using aggregate functions"
        )

    # An interval window is meaningless without aggregation functions.
    if interval and not agg_funcs:
        raise mlrun.errors.MLRunInvalidArgumentError(
            "`agg_funcs` must be provided when using interval"
        )

    if sliding_window_step and not interval:
        raise mlrun.errors.MLRunInvalidArgumentError(
            "`interval` must be provided when using sliding window"
        )

    with StringIO() as query:
        query.write("SELECT ")
        if interval:
            # Expose the window boundaries alongside the aggregates.
            query.write("_wstart, _wend, ")
        if agg_funcs:
            query.write(
                ", ".join(
                    f"{a}({col})" for a in agg_funcs for col in columns_to_filter
                )
            )
        elif columns_to_filter:
            query.write(", ".join(columns_to_filter))
        else:
            query.write("*")
        query.write(f" FROM {database}.{table}")

        # Join the conditions explicitly so no dangling " AND " is emitted
        # when only a prefix of (filter_query, start, end) is provided.
        # NOTE(review): values are interpolated directly into the SQL string;
        # callers must not pass untrusted input here.
        conditions = []
        if filter_query:
            conditions.append(filter_query)
        if start:
            conditions.append(f"{timestamp_column} >= '{start}'")
        if end:
            conditions.append(f"{timestamp_column} <= '{end}'")
        if conditions:
            query.write(" WHERE " + " AND ".join(conditions))

        if interval:
            query.write(f" INTERVAL({interval})")
        if sliding_window_step:
            query.write(f" SLIDING({sliding_window_step})")
        if limit:
            query.write(f" LIMIT {limit}")
        query.write(";")
        return query.getvalue()


@dataclass
Expand All @@ -170,6 +202,7 @@ class AppResultTable(TDEngineSchema):
mm_schemas.WriterEvent.ENDPOINT_ID: _TDEngineColumn.BINARY_64,
mm_schemas.WriterEvent.APPLICATION_NAME: _TDEngineColumn.BINARY_64,
mm_schemas.ResultData.RESULT_NAME: _TDEngineColumn.BINARY_64,
mm_schemas.ResultData.RESULT_KIND: _TDEngineColumn.INT,
}
database = _MODEL_MONITORING_DATABASE

Expand Down
Loading

0 comments on commit fc2d6db

Please sign in to comment.