Skip to content

Commit

Permalink
[Model Monitoring] Employ the TSDB abstraction in the metrics endpoin…
Browse files Browse the repository at this point in the history
…ts (#5620)
  • Loading branch information
jond01 committed May 26, 2024
1 parent 94261fa commit 559850f
Show file tree
Hide file tree
Showing 14 changed files with 499 additions and 412 deletions.
1 change: 0 additions & 1 deletion mlrun/common/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@
ModelMonitoringStoreKinds,
MonitoringFunctionNames,
PrometheusEndpoints,
TimeSeriesConnector,
TSDBTarget,
V3IOTSDBTables,
)
Expand Down
9 changes: 6 additions & 3 deletions mlrun/common/schemas/model_monitoring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# flake8: noqa - this is until we take care of the F401 violations with respect to __all__ & sphinx

from .constants import (
ControllerPolicy,
Expand All @@ -30,13 +28,15 @@
ModelMonitoringMode,
ModelMonitoringStoreKinds,
MonitoringFunctionNames,
PredictionsQueryConstants,
ProjectSecretKeys,
PrometheusEndpoints,
PrometheusMetric,
ResultData,
ResultKindApp,
SchedulingKeys,
SpecialApps,
TDEngineSuperTables,
TimeSeriesConnector,
TSDBTarget,
V3IOTSDBTables,
VersionedModel,
Expand All @@ -59,7 +59,10 @@
ModelEndpointList,
ModelEndpointMetadata,
ModelEndpointMonitoringMetric,
ModelEndpointMonitoringMetricNoData,
ModelEndpointMonitoringMetricType,
ModelEndpointMonitoringMetricValues,
ModelEndpointMonitoringResultValues,
ModelEndpointSpec,
ModelEndpointStatus,
)
7 changes: 0 additions & 7 deletions mlrun/common/schemas/model_monitoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ class EventKeyMetrics:
REAL_TIME = "real_time"


class TimeSeriesConnector:
TSDB = "tsdb"


class ModelEndpointTarget:
V3IO_NOSQL = "v3io-nosql"
SQL = "sql"
Expand Down Expand Up @@ -350,9 +346,6 @@ class TSDBTarget:
V3IO_TSDB = "v3io-tsdb"
TDEngine = "tdengine"
PROMETHEUS = "prometheus"
APP_RESULTS_TABLE = "app-results"
V3IO_BE = "tsdb"
V3IO_RATE = "1/s"


class HistogramDataDriftApplicationConstants:
Expand Down
2 changes: 1 addition & 1 deletion mlrun/model_monitoring/db/tsdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _missing_(cls, value: typing.Any):
def get_tsdb_connector(
project: str,
tsdb_connector_type: str = "",
secret_provider: typing.Callable = None,
secret_provider: typing.Optional[typing.Callable] = None,
**kwargs,
) -> TSDBConnector:
"""
Expand Down
81 changes: 75 additions & 6 deletions mlrun/model_monitoring/db/tsdb/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import typing
from abc import ABC
from abc import ABC, abstractmethod
from datetime import datetime

import pandas as pd

import mlrun.common.schemas.model_monitoring.constants as mm_constants
import mlrun.common.schemas.model_monitoring as mm_schemas


class TSDBConnector(ABC):
Expand Down Expand Up @@ -59,7 +58,7 @@ def apply_monitoring_stream_steps(self, graph):
def write_application_event(
self,
event: dict,
kind: mm_constants.WriterEventKind = mm_constants.WriterEventKind.RESULT,
kind: mm_schemas.WriterEventKind = mm_schemas.WriterEventKind.RESULT,
) -> None:
"""
Write a single application or metric to TSDB.
Expand Down Expand Up @@ -100,6 +99,7 @@ def get_model_endpoint_real_time_metrics(
"""
pass

@abstractmethod
def get_records(
self,
table: str,
Expand Down Expand Up @@ -131,7 +131,6 @@ def get_records(
:return: DataFrame with the provided attributes from the data collection.
:raise: MLRunNotFoundError if the provided table wasn't found.
"""
pass

def create_tables(self) -> None:
"""
Expand All @@ -140,3 +139,73 @@ def create_tables(self) -> None:
- metrics: a basic key value that represents a numeric metric.
- predictions: latency of each prediction.
"""

@abstractmethod
def read_metrics_data(
self,
*,
endpoint_id: str,
start: datetime,
end: datetime,
metrics: list[mm_schemas.ModelEndpointMonitoringMetric],
type: typing.Literal["metrics", "results"],
) -> typing.Union[
list[
typing.Union[
mm_schemas.ModelEndpointMonitoringResultValues,
mm_schemas.ModelEndpointMonitoringMetricNoData,
],
],
list[
typing.Union[
mm_schemas.ModelEndpointMonitoringMetricValues,
mm_schemas.ModelEndpointMonitoringMetricNoData,
],
],
]:
"""
Read metrics OR results from the TSDB and return as a list.
:param endpoint_id: The model endpoint identifier.
:param start: The start time of the query.
:param end: The end time of the query.
:param metrics: The list of metrics to get the values for.
:param type: "metrics" or "results" - the type of each item in metrics.
:return: A list of result values or a list of metric values.
"""

@abstractmethod
def read_predictions(
self,
*,
endpoint_id: str,
start: datetime,
end: datetime,
aggregation_window: typing.Optional[str] = None,
) -> typing.Union[
mm_schemas.ModelEndpointMonitoringMetricValues,
mm_schemas.ModelEndpointMonitoringMetricNoData,
]:
"""
Read the "invocations" metric for the provided model endpoint in the given time range,
and return the metric values if any, otherwise signify with the "no data" object.
:param endpoint_id: The model endpoint identifier.
:param start: The start time of the query.
:param end: The end time of the query.
:param aggregation_window: On what time window length should the invocations be aggregated.
:return: Metric values object or no data object.
"""

@abstractmethod
def read_prediction_metric_for_endpoint_if_exists(
self, endpoint_id: str
) -> typing.Optional[mm_schemas.ModelEndpointMonitoringMetric]:
"""
Read the "invocations" metric for the provided model endpoint, and return the metric object
if it exists.
:param endpoint_id: The model endpoint identifier.
:return: `None` if the invocations metric does not exist, otherwise return the
corresponding metric object.
"""
45 changes: 43 additions & 2 deletions mlrun/model_monitoring/db/tsdb/tdengine/tdengine_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import typing
from datetime import datetime

import pandas as pd
import taosws

import mlrun.common.schemas.model_monitoring as mm_schemas
import mlrun.model_monitoring.db
import mlrun.model_monitoring.db.tsdb.tdengine.schemas as tdengine_schemas
import mlrun.model_monitoring.db.tsdb.tdengine.stream_graph_steps
from mlrun.model_monitoring.db import TSDBConnector
Expand Down Expand Up @@ -229,3 +228,45 @@ def get_records(
columns.append(column.name())

return pd.DataFrame(query_result, columns=columns)

def read_metrics_data(
self,
*,
endpoint_id: str,
start: datetime,
end: datetime,
metrics: list[mm_schemas.ModelEndpointMonitoringMetric],
type: typing.Literal["metrics", "results"],
) -> typing.Union[
list[
typing.Union[
mm_schemas.ModelEndpointMonitoringResultValues,
mm_schemas.ModelEndpointMonitoringMetricNoData,
],
],
list[
typing.Union[
mm_schemas.ModelEndpointMonitoringMetricValues,
mm_schemas.ModelEndpointMonitoringMetricNoData,
],
],
]:
raise NotImplementedError

def read_predictions(
self,
*,
endpoint_id: str,
start: datetime,
end: datetime,
aggregation_window: typing.Optional[str] = None,
) -> typing.Union[
mm_schemas.ModelEndpointMonitoringMetricValues,
mm_schemas.ModelEndpointMonitoringMetricNoData,
]:
raise NotImplementedError

def read_prediction_metric_for_endpoint_if_exists(
self, endpoint_id: str
) -> typing.Optional[mm_schemas.ModelEndpointMonitoringMetric]:
raise NotImplementedError
Loading

0 comments on commit 559850f

Please sign in to comment.