Skip to content

Commit

Permalink
[Model Monitoring] Test and Fix for Model-monitoring data-path: support LLM/deep-learning models (#5289)
Browse files Browse the repository at this point in the history
  • Loading branch information
davesh0812 committed Mar 20, 2024
1 parent 2af3d96 commit 0cfbc03
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 7 deletions.
7 changes: 0 additions & 7 deletions mlrun/model_monitoring/controller.py
Expand Up @@ -445,13 +445,6 @@ def model_endpoint_process(
m_fs = fstore.get_feature_set(
endpoint[mm_constants.EventFieldType.FEATURE_SET_URI]
)
labels = endpoint[mm_constants.EventFieldType.LABEL_NAMES]
if labels:
if isinstance(labels, str):
labels = json.loads(labels)
for label in labels:
if label not in list(m_fs.spec.features.keys()):
m_fs.add_feature(fstore.Feature(name=label, value_type="float"))

for application in applications_names:
batch_window = batch_window_generator.get_batch_window(
Expand Down
2 changes: 2 additions & 0 deletions mlrun/model_monitoring/stream_processing.py
Expand Up @@ -587,6 +587,8 @@ def do(self, event):
for key in [
EventFieldType.FEATURES,
EventFieldType.NAMED_FEATURES,
EventFieldType.PREDICTION,
EventFieldType.NAMED_PREDICTIONS,
]:
event.pop(key, None)

Expand Down
56 changes: 56 additions & 0 deletions tests/system/model_monitoring/assets/models.py
@@ -0,0 +1,56 @@
# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mlrun.serving


class OneToOne(mlrun.serving.V2ModelServer):
    """
    Test serving class whose predict method emits exactly one result per input.
    """

    def load(self):
        pass

    def predict(self, body: dict) -> list:
        points = body.get("inputs")
        first = points[0]
        nested = isinstance(first, list)
        # Single image: a flattened 20x30 RGB image arrives as 600 pixel rows.
        if nested and len(points) == 600:
            return 3
        # Two images: a pair of 600-pixel flattened images.
        if nested and len(points) == 2 and len(first) == 600:
            return [2, 2]
        # Regular batch of vectors (or strings): one scalar output per point.
        if nested or (isinstance(first, str) and isinstance(points, list)):
            return [point[0] for point in points]
        # Fallback: a flat single point — echo its first element.
        return first


class OneToMany(mlrun.serving.V2ModelServer):
    """
    Test serving class whose predict method emits 5 output ports per input.
    """

    def load(self):
        pass

    def predict(self, body: dict) -> list:
        points = body.get("inputs")
        first = points[0]
        # Batch of vectors (or strings): fan each point out to 5 outputs.
        if isinstance(first, list) or (
            isinstance(first, str) and isinstance(points, list)
        ):
            return [[point[0], point[0], 3.0, "a", 5] for point in points]
        # Flat single point: one 5-port output row.
        return [first, first, 3.0, "a", 5]
170 changes: 170 additions & 0 deletions tests/system/model_monitoring/test_app.py
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent
import concurrent.futures
import json
import pickle
import time
Expand All @@ -32,7 +34,9 @@
import mlrun
import mlrun.common.schemas.model_monitoring.constants as mm_constants
import mlrun.feature_store
import mlrun.feature_store as fstore
import mlrun.model_monitoring.api
from mlrun.datastore.targets import ParquetTarget
from mlrun.model_monitoring import TrackingPolicy
from mlrun.model_monitoring.application import ModelMonitoringApplicationBase
from mlrun.model_monitoring.applications.histogram_data_drift import (
Expand Down Expand Up @@ -450,3 +454,169 @@ def test_enable_model_monitoring(self) -> None:
]
== "1m"
)


@TestMLRunSystem.skip_test_if_env_not_configured
@pytest.mark.enterprise
class TestAllKindOfServing(TestMLRunSystem):
    """System test: model-monitoring data path across model input/output shapes.

    Deploys one serving function per model flavor (int/str/image inputs,
    one-to-one and one-to-many outputs — classes defined in assets/models.py),
    invokes each endpoint, then verifies that the monitoring feature set's
    parquet target recorded both the expected column schema (features ``f*``
    plus predictions ``p*``) and all of the events.
    """

    project_name = "test-mm-serving"
    # Set image to "<repo>/mlrun:<tag>" for local testing
    image: typing.Optional[str] = None

    @classmethod
    def custom_setup_class(cls) -> None:
        """Build the table of model scenarios shared by all test steps.

        Each entry maps a model name to the serving-function name, the serving
        class to instantiate, one sample data point, and the column schema the
        monitoring parquet is expected to contain for that model.
        """
        # 20x30 RGB image flattened into 600 [r, g, b] pixel rows.
        random_rgb_image_list = (
            np.random.randint(0, 256, (20, 30, 3), dtype=np.uint8)
            .reshape(-1, 3)
            .tolist()
        )
        cls.models = {
            "int_one_to_one": {
                "name": "serving_1",
                "model_name": "int_one_to_one",
                "class_name": "OneToOne",
                "data_point": [1, 2, 3],
                "schema": ["f0", "f1", "f2", "p0"],
            },
            "int_one_to_many": {
                "name": "serving_2",
                "model_name": "int_one_to_many",
                "class_name": "OneToMany",
                "data_point": [1, 2, 3],
                "schema": ["f0", "f1", "f2", "p0", "p1", "p2", "p3", "p4"],
            },
            "str_one_to_one": {
                "name": "serving_3",
                "model_name": "str_one_to_one",
                "class_name": "OneToOne",
                "data_point": "input_str",
                "schema": ["f0", "p0"],
            },
            "str_one_to_many": {
                "name": "serving_4",
                "model_name": "str_one_to_many",
                "class_name": "OneToMany",
                "data_point": "input_str",
                "schema": ["f0", "p0", "p1", "p2", "p3", "p4"],
            },
            "img_one_to_one": {
                "name": "serving_5",
                "model_name": "img_one_to_one",
                "class_name": "OneToOne",
                "data_point": random_rgb_image_list,
                # One feature column per pixel row plus a single prediction.
                "schema": [f"f{i}" for i in range(600)] + ["p0"],
            },
            "int_and_str_one_to_one": {
                "name": "serving_6",
                "model_name": "int_and_str_one_to_one",
                "class_name": "OneToOne",
                "data_point": [1, "a", 3],
                "schema": ["f0", "f1", "f2", "p0"],
            },
        }

    def _log_model(self, model_name) -> None:
        """Log a model artifact under *model_name* from the shared assets dir."""
        self.project.log_model(
            model_name,
            model_dir=str((Path(__file__).parent / "assets").absolute()),
            model_file="model.pkl",
        )

    @classmethod
    def _deploy_model_serving(
        cls, name: str, model_name: str, class_name: str, **kwargs
    ) -> mlrun.runtimes.nuclio.serving.ServingRuntime:
        """Create and deploy a tracked serving function for one model.

        Extra keys from the model table (``data_point``, ``schema``) are
        absorbed by **kwargs and ignored here.
        """
        serving_fn = mlrun.code_to_function(
            project=cls.project_name,
            name=name,
            filename=f"{str((Path(__file__).parent / 'assets').absolute())}/models.py",
            kind="serving",
        )
        serving_fn.add_model(
            model_name,
            model_path=f"store://models/{cls.project_name}/{model_name}:latest",
            class_name=class_name,
        )
        # Enable model-monitoring tracking on this serving function.
        serving_fn.set_tracking()
        if cls.image is not None:
            serving_fn.spec.image = serving_fn.spec.build.image = cls.image

        serving_fn.deploy()
        return typing.cast(mlrun.runtimes.nuclio.serving.ServingRuntime, serving_fn)

    def _test_endpoint(self, model_name, feature_set_uri) -> dict[str, typing.Any]:
        """Invoke one endpoint and check its monitoring parquet output.

        Sends a single-point request followed by a two-point request (3 events
        total), waits for the parquet batching window to flush, then reads the
        feature set's first target and returns schema/row-count verdicts.
        """
        model_dict = self.models[model_name]
        serving_fn = self.project.get_function(model_dict.get("name"))
        data_point = model_dict.get("data_point")

        serving_fn.invoke(
            f"v2/models/{model_name}/infer",
            json.dumps(
                {"inputs": [data_point]},
            ),
        )
        serving_fn.invoke(
            f"v2/models/{model_name}/infer",
            json.dumps({"inputs": [data_point, data_point]}),
        )
        # Allow the stream processor's parquet batch to flush before reading.
        time.sleep(
            mlrun.mlconf.model_endpoint_monitoring.parquet_batching_timeout_secs + 10
        )

        offline_response_df = ParquetTarget(
            name="temp",
            path=fstore.get_feature_set(feature_set_uri).spec.targets[0].path,
        ).as_df()

        # All expected feature/prediction columns must appear in the parquet.
        is_schema_saved = set(model_dict.get("schema")).issubset(
            offline_response_df.columns
        )
        # 1 event from the first invoke + 2 from the second.
        has_all_the_events = offline_response_df.shape[0] == 3

        return {
            "model_name": model_name,
            "is_schema_saved": is_schema_saved,
            "has_all_the_events": has_all_the_events,
        }

    def test_all(self) -> None:
        """Deploy all model flavors concurrently and verify each endpoint."""
        self.project.enable_model_monitoring(
            image=self.image or "mlrun/mlrun",
            base_period=1,
            deploy_histogram_data_drift_app=False,
        )
        # Fan out the (slow) serving deployments across a thread pool.
        futures = []
        with ThreadPoolExecutor() as executor:
            for model_name, model_dict in self.models.items():
                self._log_model(model_name)
                future = executor.submit(self._deploy_model_serving, **model_dict)
                futures.append(future)

            for future in concurrent.futures.as_completed(futures):
                future.result()

        # Fan out the per-endpoint invocation + parquet verification.
        futures_2 = []
        with ThreadPoolExecutor() as executor:
            self.db = mlrun.model_monitoring.get_model_endpoint_store(
                project=self.project_name
            )
            endpoints = self.db.list_model_endpoints()
            for endpoint in endpoints:
                future = executor.submit(
                    self._test_endpoint,
                    # Endpoint model is "<name>:<tag>"; keep only the name.
                    model_name=endpoint[mm_constants.EventFieldType.MODEL].split(":")[
                        0
                    ],
                    feature_set_uri=endpoint[
                        mm_constants.EventFieldType.FEATURE_SET_URI
                    ],
                )
                futures_2.append(future)

            for future in concurrent.futures.as_completed(futures_2):
                res_dict = future.result()
                assert res_dict[
                    "is_schema_saved"
                ], f"For {res_dict['model_name']} the schema of parquet is missing columns"

                assert res_dict["has_all_the_events"], "Not all the events were saved"

0 comments on commit 0cfbc03

Please sign in to comment.