Skip to content

Commit

Permalink
[Model Monitoring] Add TDEngine Connector (#5592)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eyal-Danieli committed May 22, 2024
1 parent 1198ed4 commit 2e01eb2
Show file tree
Hide file tree
Showing 29 changed files with 993 additions and 102 deletions.
1 change: 1 addition & 0 deletions dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def extra_requirements() -> dict[str, list[str]]:
"distributed~=2023.9.0",
],
"alibaba-oss": ["ossfs==2023.12.0", "oss2==2.18.1"],
"tdengine": ["taos-ws-py~=0.3.2"],
}

# see above why we are excluding google-cloud
Expand Down
1 change: 1 addition & 0 deletions extras-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ databricks-sdk~=0.13.0
sqlalchemy~=1.4
dask~=2023.9.0
distributed~=2023.9.0
taos-ws-py~=0.3.2
2 changes: 1 addition & 1 deletion mlrun/common/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@
ModelMonitoringMode,
ModelMonitoringStoreKinds,
MonitoringFunctionNames,
MonitoringTSDBTables,
PrometheusEndpoints,
TimeSeriesConnector,
TSDBTarget,
V3IOTSDBTables,
)
from .notification import (
Notification,
Expand Down
1 change: 1 addition & 0 deletions mlrun/common/schemas/client_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ClientSpec(pydantic.BaseModel):
sql_url: typing.Optional[str]
model_endpoint_monitoring_store_type: typing.Optional[str]
model_endpoint_monitoring_endpoint_store_connection: typing.Optional[str]
model_monitoring_tsdb_connection: typing.Optional[str]
ce: typing.Optional[dict]
# not passing them as one object as it possible client user would like to override only one of the params
calculate_artifact_hash: typing.Optional[str]
Expand Down
3 changes: 2 additions & 1 deletion mlrun/common/schemas/model_monitoring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
ModelMonitoringMode,
ModelMonitoringStoreKinds,
MonitoringFunctionNames,
MonitoringTSDBTables,
ProjectSecretKeys,
PrometheusEndpoints,
PrometheusMetric,
ResultData,
SchedulingKeys,
TDEngineSuperTables,
TimeSeriesConnector,
TSDBTarget,
V3IOTSDBTables,
VersionedModel,
WriterEvent,
WriterEventKind,
Expand Down
12 changes: 11 additions & 1 deletion mlrun/common/schemas/model_monitoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class EventFieldType:
DRIFT_DETECTED_THRESHOLD = "drift_detected_threshold"
POSSIBLE_DRIFT_THRESHOLD = "possible_drift_threshold"
SAMPLE_PARQUET_PATH = "sample_parquet_path"
TIME = "time"
TABLE_COLUMN = "table_column"


class FeatureSetFeatures(MonitoringStrEnum):
Expand Down Expand Up @@ -171,6 +173,7 @@ class ProjectSecretKeys:
PIPELINES_ACCESS_KEY = "MODEL_MONITORING_PIPELINES_ACCESS_KEY"
KAFKA_BROKERS = "KAFKA_BROKERS"
STREAM_PATH = "STREAM_PATH"
TSDB_CONNECTION = "TSDB_CONNECTION"


class ModelMonitoringStoreKinds:
Expand Down Expand Up @@ -230,12 +233,18 @@ class MonitoringFunctionNames(MonitoringStrEnum):
WRITER = "model-monitoring-writer"


class MonitoringTSDBTables(MonitoringStrEnum):
class V3IOTSDBTables(MonitoringStrEnum):
APP_RESULTS = "app-results"
METRICS = "metrics"
EVENTS = "events"


class TDEngineSuperTables(MonitoringStrEnum):
APP_RESULTS = "app_results"
METRICS = "metrics"
PREDICTIONS = "predictions"


@dataclass
class FunctionURI:
project: str
Expand Down Expand Up @@ -339,6 +348,7 @@ class ControllerPolicy:

class TSDBTarget:
V3IO_TSDB = "v3io-tsdb"
TDEngine = "tdengine"
PROMETHEUS = "prometheus"
APP_RESULTS_TABLE = "app-results"
V3IO_BE = "tsdb"
Expand Down
2 changes: 2 additions & 0 deletions mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,9 @@
# See mlrun.model_monitoring.db.stores.ObjectStoreFactory for available options
"store_type": "v3io-nosql",
"endpoint_store_connection": "",
# See mlrun.model_monitoring.db.tsdb.ObjectTSDBFactory for available options
"tsdb_connector_type": "v3io-tsdb",
"tsdb_connection": "",
},
"secret_stores": {
# Use only in testing scenarios (such as integration tests) to avoid using k8s for secrets (will use in-memory
Expand Down
4 changes: 4 additions & 0 deletions mlrun/db/httpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ def connect(self, secrets=None):
server_cfg.get("model_endpoint_monitoring_endpoint_store_connection")
or config.model_endpoint_monitoring.endpoint_store_connection
)
config.model_endpoint_monitoring.tsdb_connection = (
server_cfg.get("model_monitoring_tsdb_connection")
or config.model_endpoint_monitoring.tsdb_connection
)
config.packagers = server_cfg.get("packagers") or config.packagers
server_data_prefixes = server_cfg.get("feature_store_data_prefixes") or {}
for prefix in ["default", "nosql", "redisnosql"]:
Expand Down
41 changes: 35 additions & 6 deletions mlrun/model_monitoring/db/tsdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ObjectTSDBFactory(enum.Enum):
"""Enum class to handle the different TSDB connector type values for storing real time metrics"""

v3io_tsdb = "v3io-tsdb"
tdengine = "tdengine"

def to_tsdb_connector(self, project: str, **kwargs) -> TSDBConnector:
"""
Expand All @@ -43,6 +44,13 @@ def to_tsdb_connector(self, project: str, **kwargs) -> TSDBConnector:

return V3IOTSDBConnector(project=project, **kwargs)

# Assuming TDEngine connector if connector type is not V3IO TSDB.
# Update these lines once there are more than two connector types.

from .tdengine.tdengine_connector import TDEngineConnector

return TDEngineConnector(project=project, **kwargs)

@classmethod
def _missing_(cls, value: typing.Any):
"""A lookup function to handle an invalid value.
Expand All @@ -54,18 +62,39 @@ def _missing_(cls, value: typing.Any):
)


def get_tsdb_connector(project: str, **kwargs) -> TSDBConnector:
def get_tsdb_connector(
project: str,
tsdb_connector_type: str = "",
secret_provider: typing.Callable = None,
**kwargs,
) -> TSDBConnector:
"""
Get the TSDB connector type based on mlrun.config.model_endpoint_monitoring.tsdb_connector_type.
Get TSDB connector object.
:param project: The name of the project.
:param tsdb_connector_type: The type of the TSDB connector. See mlrun.model_monitoring.db.tsdb.ObjectTSDBFactory
for available options.
:param secret_provider: An optional secret provider to get the connection string secret.
:return: `TSDBConnector` object. The main goal of this object is to handle different operations on the
TSDB connector such as updating drift metrics or write application record result.
"""

# Get store type value from ObjectTSDBFactory enum class
tsdb_connector_type = ObjectTSDBFactory(
mlrun.mlconf.model_endpoint_monitoring.tsdb_connector_type
tsdb_connection_string = mlrun.model_monitoring.helpers.get_tsdb_connection_string(
secret_provider=secret_provider
)

if tsdb_connection_string and tsdb_connection_string.startswith("taosws"):
tsdb_connector_type = mlrun.common.schemas.model_monitoring.TSDBTarget.TDEngine
kwargs["connection_string"] = tsdb_connection_string

# Set the default TSDB connector type if no connection has been set
tsdb_connector_type = (
tsdb_connector_type
or mlrun.mlconf.model_endpoint_monitoring.tsdb_connector_type
)

# Get connector type value from ObjectTSDBFactory enum class
tsdb_connector_factory = ObjectTSDBFactory(tsdb_connector_type)

# Convert into TSDB connector object
return tsdb_connector_type.to_tsdb_connector(project=project, **kwargs)
return tsdb_connector_factory.to_tsdb_connector(project=project, **kwargs)
39 changes: 24 additions & 15 deletions mlrun/model_monitoring/db/tsdb/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#


import typing
from abc import ABC

import pandas as pd
Expand All @@ -22,6 +23,8 @@


class TSDBConnector(ABC):
type: str = ""

def __init__(self, project: str):
"""
Initialize a new TSDB connector. The connector is used to interact with the TSDB and store monitoring data.
Expand Down Expand Up @@ -75,8 +78,8 @@ def get_model_endpoint_real_time_metrics(
self,
endpoint_id: str,
metrics: list[str],
start: str = "now-1h",
end: str = "now",
start: str,
end: str,
) -> dict[str, list[tuple[str, float]]]:
"""
Getting real time metrics from the TSDB. There are pre-defined metrics for model endpoints such as
Expand All @@ -100,34 +103,40 @@ def get_model_endpoint_real_time_metrics(
def get_records(
self,
table: str,
columns: list[str] = None,
start: str,
end: str,
columns: typing.Optional[list[str]] = None,
filter_query: str = "",
start: str = "now-1h",
end: str = "now",
) -> pd.DataFrame:
"""
Getting records from TSDB data collection.
:param table: Table name, e.g. 'metrics', 'app_results'.
:param start: The start time of the metrics.
If using V3IO, can be represented by a string containing an RFC 3339 time, a Unix
timestamp in milliseconds, a relative time (`'now'` or `'now-[0-9]+[mhd]'`, where
`m` = minutes, `h` = hours, `'d'` = days, and `'s'` = seconds), or 0 for the earliest
time.
If using TDEngine, can be represented by datetime.
:param end: The end time of the metrics.
If using V3IO, can be represented by a string containing an RFC 3339 time, a Unix
timestamp in milliseconds, a relative time (`'now'` or `'now-[0-9]+[mhd]'`, where
`m` = minutes, `h` = hours, `'d'` = days, and `'s'` = seconds), or 0 for the earliest
time.
If using TDEngine, can be represented by datetime.
:param columns: Columns to include in the result.
:param filter_query: Optional filter expression as a string. The filter structure depends on the TSDB
connector type.
:param start: The start time of the metrics. Can be represented by a string containing an RFC
3339 time, a Unix timestamp in milliseconds, a relative time (`'now'` or
`'now-[0-9]+[mhd]'`, where `m` = minutes, `h` = hours, `'d'` = days, and `'s'`
= seconds), or 0 for the earliest time.
:param end: The end time of the metrics. Can be represented by a string containing an RFC
3339 time, a Unix timestamp in milliseconds, a relative time (`'now'` or
`'now-[0-9]+[mhd]'`, where `m` = minutes, `h` = hours, `'d'` = days, and `'s'`
= seconds), or 0 for the earliest time.
:return: DataFrame with the provided attributes from the data collection.
:raise: MLRunNotFoundError if the provided table wasn't found.
"""
pass

def create_tsdb_application_tables(self) -> None:
def create_tables(self) -> None:
"""
Create the application tables using the TSDB connector. At the moment we support 2 types of application tables:
Create the TSDB tables using the TSDB connector. At the moment we support 3 types of tables:
- app_results: a detailed result that includes status, kind, extra data, etc.
- metrics: a basic key value that represents a numeric metric.
- predictions: latency of each prediction.
"""
15 changes: 15 additions & 0 deletions mlrun/model_monitoring/db/tsdb/tdengine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .tdengine_connector import TDEngineConnector
Loading

0 comments on commit 2e01eb2

Please sign in to comment.