Skip to content

Commit

Permalink
[Model Monitoring] Avoid failing on missing predictions TSDB table (#5706)
Browse files Browse the repository at this point in the history

* [Model Monitoring] Avoid failing on missing predictions TSDB table

[ML-6628](https://iguazio.atlassian.net/browse/ML-6628)

Return an empty dataframe instead.

* Add type annotation

* Fix issue with `read_prediction_metric_for_endpoint_if_exists()`

* Fix case of returning unexpected `None`
  • Loading branch information
gtopper committed Jun 5, 2024
1 parent 6623463 commit 8ab508a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 21 deletions.
32 changes: 19 additions & 13 deletions mlrun/model_monitoring/db/tsdb/v3io/v3io_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,18 +418,24 @@ def _get_records(
# Frames client expects the aggregators to be a comma-separated string
agg_funcs = ",".join(agg_funcs)
table_path = self.tables[table]
df = self._frames_client.read(
backend=_TSDB_BE,
table=table_path,
start=start,
end=end,
columns=columns,
filter=filter_query,
aggregation_window=interval,
aggregators=agg_funcs,
step=sliding_window_step,
**kwargs,
)
try:
df = self._frames_client.read(
backend=_TSDB_BE,
table=table_path,
start=start,
end=end,
columns=columns,
filter=filter_query,
aggregation_window=interval,
aggregators=agg_funcs,
step=sliding_window_step,
**kwargs,
)
except v3io_frames.ReadError as err:
if "No TSDB schema file found" in str(err):
return pd.DataFrame()
else:
raise err

if limit:
df = df.head(limit)
Expand Down Expand Up @@ -620,7 +626,7 @@ def read_prediction_metric_for_endpoint_if_exists(
predictions = self.read_predictions(
endpoint_id=endpoint_id, start="0", end="now", limit=1
)
if predictions:
if predictions.data:
return mm_schemas.ModelEndpointMonitoringMetric(
project=self.project,
app=mm_schemas.SpecialApps.MLRUN_INFRA,
Expand Down
7 changes: 5 additions & 2 deletions server/api/api/endpoints/model_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,11 @@ async def _get_metrics_values_params(


async def _wrap_coroutine_in_list(x):
    """Await ``x`` and return its result wrapped in a list.

    A ``None`` result yields an empty list instead of ``[None]``, so that
    callers concatenating these lists never end up with ``None`` entries.

    :param x: The awaitable to resolve.
    :returns: ``[result]`` when the awaited result is not ``None``,
              otherwise ``[]``.
    """
    result = await x
    # Compare against None explicitly: falsy results such as 0 or an empty
    # container are still legitimate values and must be kept.
    return [] if result is None else [result]


@router.get(
Expand Down
16 changes: 10 additions & 6 deletions tests/system/model_monitoring/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def _test_tsdb_record(cls, ep_id: str) -> None:
), "The TSDB saved metrics are different than expected"

@classmethod
def _test_predictions_table(cls, ep_id: str) -> None:
def _test_predictions_table(cls, ep_id: str, should_be_empty: bool = False) -> None:
if cls._tsdb_storage.type == mm_constants.TSDBTarget.V3IO_TSDB:
predictions_df: pd.DataFrame = cls._tsdb_storage._get_records(
table=mm_constants.FileTargetKind.PREDICTIONS, start="0", end="now"
Expand All @@ -219,10 +219,13 @@ def _test_predictions_table(cls, ep_id: str) -> None:
start=datetime.min,
end=datetime.now().astimezone(),
)
assert not predictions_df.empty, "No TSDB predictions data"
assert (
predictions_df.endpoint_id == ep_id
).all(), "The endpoint IDs are different than expected"
if should_be_empty:
assert predictions_df.empty, "Predictions should be empty"
else:
assert not predictions_df.empty, "No TSDB predictions data"
assert (
predictions_df.endpoint_id == ep_id
).all(), "The endpoint IDs are different than expected"

@classmethod
def _test_apps_parquet(
Expand Down Expand Up @@ -255,7 +258,6 @@ def _test_v3io_records(
cls._test_results_kv_record(ep_id)
cls._test_metrics_kv_record(ep_id)
cls._test_tsdb_record(ep_id)
cls._test_predictions_table(ep_id)

@classmethod
def _test_api_get_metrics(
Expand Down Expand Up @@ -539,6 +541,7 @@ def test_app_flow(self, with_training_set: bool) -> None:

ep_id = self._get_model_endpoint_id()
self._test_v3io_records(ep_id=ep_id, inputs=inputs, outputs=outputs)
self._test_predictions_table(ep_id)

if with_training_set:
self._test_api(ep_id=ep_id, app_data=_DefaultDataDriftAppData)
Expand Down Expand Up @@ -690,6 +693,7 @@ def test_inference_feature_set(self) -> None:
self._test_v3io_records(
self.endpoint_id, inputs=set(self.columns), outputs=set(self.y_name)
)
self._test_predictions_table(self.endpoint_id, should_be_empty=True)


@TestMLRunSystem.skip_test_if_env_not_configured
Expand Down

0 comments on commit 8ab508a

Please sign in to comment.