Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Model Monitoring] Add TDEngine Connector #5592

Merged
merged 24 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def extra_requirements() -> dict[str, list[str]]:
"distributed~=2023.9.0",
],
"alibaba-oss": ["ossfs==2023.12.0", "oss2==2.18.1"],
"tdengine": ["taos-ws-py~=0.3.2"],
}

# see above why we are excluding google-cloud
Expand Down
1 change: 1 addition & 0 deletions extras-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ databricks-sdk~=0.13.0
sqlalchemy~=1.4
dask~=2023.9.0
distributed~=2023.9.0
taos-ws-py~=0.3.2
2 changes: 1 addition & 1 deletion mlrun/common/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@
ModelMonitoringMode,
ModelMonitoringStoreKinds,
MonitoringFunctionNames,
MonitoringTSDBTables,
PrometheusEndpoints,
TimeSeriesConnector,
TSDBTarget,
V3IOTSDBTables,
)
from .notification import (
Notification,
Expand Down
1 change: 1 addition & 0 deletions mlrun/common/schemas/client_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ClientSpec(pydantic.BaseModel):
sql_url: typing.Optional[str]
model_endpoint_monitoring_store_type: typing.Optional[str]
model_endpoint_monitoring_endpoint_store_connection: typing.Optional[str]
model_monitoring_tsdb_connection: typing.Optional[str]
ce: typing.Optional[dict]
# not passing them as one object as it possible client user would like to override only one of the params
calculate_artifact_hash: typing.Optional[str]
Expand Down
3 changes: 2 additions & 1 deletion mlrun/common/schemas/model_monitoring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
ModelMonitoringMode,
ModelMonitoringStoreKinds,
MonitoringFunctionNames,
MonitoringTSDBTables,
ProjectSecretKeys,
PrometheusEndpoints,
PrometheusMetric,
ResultData,
SchedulingKeys,
TDEngineSuperTables,
TimeSeriesConnector,
TSDBTarget,
V3IOTSDBTables,
VersionedModel,
WriterEvent,
WriterEventKind,
Expand Down
12 changes: 11 additions & 1 deletion mlrun/common/schemas/model_monitoring/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class EventFieldType:
DRIFT_DETECTED_THRESHOLD = "drift_detected_threshold"
POSSIBLE_DRIFT_THRESHOLD = "possible_drift_threshold"
SAMPLE_PARQUET_PATH = "sample_parquet_path"
TIME = "time"
TABLE_COLUMN = "table_column"


class FeatureSetFeatures(MonitoringStrEnum):
Expand Down Expand Up @@ -171,6 +173,7 @@ class ProjectSecretKeys:
PIPELINES_ACCESS_KEY = "MODEL_MONITORING_PIPELINES_ACCESS_KEY"
KAFKA_BROKERS = "KAFKA_BROKERS"
STREAM_PATH = "STREAM_PATH"
TSDB_CONNECTION = "TSDB_CONNECTION"


class ModelMonitoringStoreKinds:
Expand Down Expand Up @@ -230,12 +233,18 @@ class MonitoringFunctionNames(MonitoringStrEnum):
WRITER = "model-monitoring-writer"


class MonitoringTSDBTables(MonitoringStrEnum):
class V3IOTSDBTables(MonitoringStrEnum):
APP_RESULTS = "app-results"
METRICS = "metrics"
EVENTS = "events"


class TDEngineSuperTables(MonitoringStrEnum):
APP_RESULTS = "app_results"
METRICS = "metrics"
PREDICTIONS = "predictions"


@dataclass
class FunctionURI:
project: str
Expand Down Expand Up @@ -339,6 +348,7 @@ class ControllerPolicy:

class TSDBTarget:
V3IO_TSDB = "v3io-tsdb"
TDEngine = "tdengine"
PROMETHEUS = "prometheus"
APP_RESULTS_TABLE = "app-results"
V3IO_BE = "tsdb"
Expand Down
2 changes: 2 additions & 0 deletions mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,9 @@
# See mlrun.model_monitoring.db.stores.ObjectStoreFactory for available options
"store_type": "v3io-nosql",
"endpoint_store_connection": "",
# See mlrun.model_monitoring.db.tsdb.ObjectTSDBFactory for available options
"tsdb_connector_type": "v3io-tsdb",
"tsdb_connection": "",
},
"secret_stores": {
# Use only in testing scenarios (such as integration tests) to avoid using k8s for secrets (will use in-memory
Expand Down
4 changes: 4 additions & 0 deletions mlrun/db/httpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ def connect(self, secrets=None):
server_cfg.get("model_endpoint_monitoring_endpoint_store_connection")
or config.model_endpoint_monitoring.endpoint_store_connection
)
config.model_endpoint_monitoring.tsdb_connection = (
server_cfg.get("model_monitoring_tsdb_connection")
or config.model_endpoint_monitoring.tsdb_connection
)
config.packagers = server_cfg.get("packagers") or config.packagers
server_data_prefixes = server_cfg.get("feature_store_data_prefixes") or {}
for prefix in ["default", "nosql", "redisnosql"]:
Expand Down
41 changes: 35 additions & 6 deletions mlrun/model_monitoring/db/tsdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ObjectTSDBFactory(enum.Enum):
"""Enum class to handle the different TSDB connector type values for storing real time metrics"""

v3io_tsdb = "v3io-tsdb"
tdengine = "tdengine"

def to_tsdb_connector(self, project: str, **kwargs) -> TSDBConnector:
"""
Expand All @@ -43,6 +44,13 @@ def to_tsdb_connector(self, project: str, **kwargs) -> TSDBConnector:

return V3IOTSDBConnector(project=project, **kwargs)

# Assuming TDEngine connector if connector type is not V3IO TSDB.
# Update these lines once there are more than two connector types.

from .tdengine.tdengine_connector import TDEngineConnector

return TDEngineConnector(project=project, **kwargs)

@classmethod
def _missing_(cls, value: typing.Any):
"""A lookup function to handle an invalid value.
Expand All @@ -54,18 +62,39 @@ def _missing_(cls, value: typing.Any):
)


def get_tsdb_connector(project: str, **kwargs) -> TSDBConnector:
def get_tsdb_connector(
project: str,
tsdb_connector_type: str = "",
secret_provider: typing.Callable = None,
**kwargs,
) -> TSDBConnector:
"""
Get the TSDB connector type based on mlrun.config.model_endpoint_monitoring.tsdb_connector_type.
Get TSDB connector object.
:param project: The name of the project.
:param tsdb_connector_type: The type of the TSDB connector. See mlrun.model_monitoring.db.tsdb.ObjectTSDBFactory
for available options.
:param secret_provider: An optional secret provider to get the connection string secret.

:return: `TSDBConnector` object. The main goal of this object is to handle different operations on the
TSDB connector such as updating drift metrics or write application record result.
"""

# Get store type value from ObjectTSDBFactory enum class
tsdb_connector_type = ObjectTSDBFactory(
mlrun.mlconf.model_endpoint_monitoring.tsdb_connector_type
tsdb_connection_string = mlrun.model_monitoring.helpers.get_tsdb_connection_string(
secret_provider=secret_provider
)

if tsdb_connection_string and tsdb_connection_string.startswith("taosws"):
tsdb_connector_type = mlrun.common.schemas.model_monitoring.TSDBTarget.TDEngine
kwargs["connection_string"] = tsdb_connection_string

# Set the default TSDB connector type if no connection has been set
tsdb_connector_type = (
tsdb_connector_type
or mlrun.mlconf.model_endpoint_monitoring.tsdb_connector_type
)

# Get connector type value from ObjectTSDBFactory enum class
tsdb_connector_factory = ObjectTSDBFactory(tsdb_connector_type)

# Convert into TSDB connector object
return tsdb_connector_type.to_tsdb_connector(project=project, **kwargs)
return tsdb_connector_factory.to_tsdb_connector(project=project, **kwargs)
39 changes: 24 additions & 15 deletions mlrun/model_monitoring/db/tsdb/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#


import typing
from abc import ABC

import pandas as pd
Expand All @@ -22,6 +23,8 @@


class TSDBConnector(ABC):
type: str = ""

def __init__(self, project: str):
"""
Initialize a new TSDB connector. The connector is used to interact with the TSDB and store monitoring data.
Expand Down Expand Up @@ -75,8 +78,8 @@ def get_model_endpoint_real_time_metrics(
self,
endpoint_id: str,
metrics: list[str],
start: str = "now-1h",
end: str = "now",
start: str,
end: str,
) -> dict[str, list[tuple[str, float]]]:
"""
Getting real time metrics from the TSDB. There are pre-defined metrics for model endpoints such as
Expand All @@ -100,34 +103,40 @@ def get_model_endpoint_real_time_metrics(
def get_records(
self,
table: str,
columns: list[str] = None,
start: str,
end: str,
columns: typing.Optional[list[str]] = None,
filter_query: str = "",
start: str = "now-1h",
end: str = "now",
) -> pd.DataFrame:
"""
Getting records from TSDB data collection.
:param table: Table name, e.g. 'metrics', 'app_results'.
:param start: The start time of the metrics.
If using V3IO, can be represented by a string containing an RFC 3339 time, a Unix
timestamp in milliseconds, a relative time (`'now'` or `'now-[0-9]+[mhd]'`, where
`m` = minutes, `h` = hours, `'d'` = days, and `'s'` = seconds), or 0 for the earliest
time.
If using TDEngine, can be represented by datetime.
:param end: The end time of the metrics.
If using V3IO, can be represented by a string containing an RFC 3339 time, a Unix
timestamp in milliseconds, a relative time (`'now'` or `'now-[0-9]+[mhd]'`, where
`m` = minutes, `h` = hours, `'d'` = days, and `'s'` = seconds), or 0 for the earliest
time.
If using TDEngine, can be represented by datetime.
:param columns: Columns to include in the result.
:param filter_query: Optional filter expression as a string. The filter structure depends on the TSDB
connector type.
:param start: The start time of the metrics. Can be represented by a string containing an RFC
3339 time, a Unix timestamp in milliseconds, a relative time (`'now'` or
`'now-[0-9]+[mhd]'`, where `m` = minutes, `h` = hours, `'d'` = days, and `'s'`
= seconds), or 0 for the earliest time.
:param end: The end time of the metrics. Can be represented by a string containing an RFC
3339 time, a Unix timestamp in milliseconds, a relative time (`'now'` or
`'now-[0-9]+[mhd]'`, where `m` = minutes, `h` = hours, `'d'` = days, and `'s'`
= seconds), or 0 for the earliest time.


:return: DataFrame with the provided attributes from the data collection.
:raise: MLRunNotFoundError if the provided table wasn't found.
"""
pass

def create_tsdb_application_tables(self) -> None:
def create_tables(self) -> None:
"""
Create the application tables using the TSDB connector. At the moment we support 2 types of application tables:
Create the TSDB tables using the TSDB connector. At the moment we support 3 types of tables:
- app_results: a detailed result that includes status, kind, extra data, etc.
- metrics: a basic key value that represents a numeric metric.
- predictions: latency of each prediction.
"""
15 changes: 15 additions & 0 deletions mlrun/model_monitoring/db/tsdb/tdengine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .tdengine_connector import TDEngineConnector
Loading
Loading