[Model Monitoring] Test and Fix for Model-monitoring data-path: support LLM/deep-learning models #5289

Merged · 6 commits · Mar 20, 2024
7 changes: 0 additions & 7 deletions mlrun/model_monitoring/controller.py
@@ -445,13 +445,6 @@ def model_endpoint_process(
m_fs = fstore.get_feature_set(
endpoint[mm_constants.EventFieldType.FEATURE_SET_URI]
)
labels = endpoint[mm_constants.EventFieldType.LABEL_NAMES]
if labels:
if isinstance(labels, str):
labels = json.loads(labels)
for label in labels:
if label not in list(m_fs.spec.features.keys()):
m_fs.add_feature(fstore.Feature(name=label, value_type="float"))

for application in applications_names:
batch_window = batch_window_generator.get_batch_window(
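(Context: the removed block registered every label column on the monitoring feature set with a hard-coded value_type="float"; presumably that numeric assumption is what broke for the LLM/deep-learning outputs this PR sets out to support.)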
2 changes: 2 additions & 0 deletions mlrun/model_monitoring/stream_processing.py
@@ -587,6 +587,8 @@ def do(self, event):
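# Strip the raw feature/prediction fields from the event; the two prediction keys below are the new additions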
for key in [
EventFieldType.FEATURES,
EventFieldType.NAMED_FEATURES,
EventFieldType.PREDICTION,
EventFieldType.NAMED_PREDICTIONS,
]:
event.pop(key, None)

56 changes: 56 additions & 0 deletions tests/system/model_monitoring/assets/models.py
@@ -0,0 +1,56 @@
# Copyright 2024 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mlrun.serving


class OneToOne(mlrun.serving.V2ModelServer):
"""
In this class, the predict method returns one result for each input.
"""

def load(self):
pass

def predict(self, body: dict) -> list:
inputs = body.get("inputs")
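# Branch on the shape of "inputs": a bare flattened image (600 pixel rows),
# a batch of two such images, a batch of rows or strings, or a single scalar row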
if isinstance(inputs[0], list) and len(inputs) == 600: # single image
outputs = 3
elif isinstance(inputs[0], list) and len(inputs) == 2 and len(inputs[0]) == 600:
outputs = [2, 2]
elif isinstance(inputs[0], list) or (
isinstance(inputs[0], str) and isinstance(inputs, list)
):
outputs = [inp[0] for inp in inputs]
else:
outputs = inputs[0]
return outputs


class OneToMany(mlrun.serving.V2ModelServer):
"""
In this class, the predict method returns a five-element output for each input.
"""

def load(self):
pass

def predict(self, body: dict) -> list:
inputs = body.get("inputs")
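# Each input row yields a fixed five-element output of mixed types (ints, a float, a string)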
if isinstance(inputs[0], list) or (
isinstance(inputs[0], str) and isinstance(inputs, list)
):
outputs = [[inp[0], inp[0], 3.0, "a", 5] for inp in inputs]
else:
outputs = [inputs[0], inputs[0], 3.0, "a", 5]
return outputs
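
For reference, a rough sketch of the input-to-output contract these stubs implement for the payload shapes that test_app.py sends (traced by hand from the branches above; illustrative only, since the test invokes the models through the deployed serving function rather than calling predict directly):

# OneToOne.predict({"inputs": [[1, 2, 3]]})    -> [1]                    (first element per row)
# OneToOne.predict({"inputs": ["input_str"]})  -> ["i"]                  (first character per string)
# OneToMany.predict({"inputs": [[1, 2, 3]]})   -> [[1, 1, 3.0, "a", 5]]  (five outputs per row)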
170 changes: 170 additions & 0 deletions tests/system/model_monitoring/test_app.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent
import concurrent.futures
import json
import pickle
import time
@@ -32,7 +34,9 @@
import mlrun
import mlrun.common.schemas.model_monitoring.constants as mm_constants
import mlrun.feature_store
import mlrun.feature_store as fstore
import mlrun.model_monitoring.api
from mlrun.datastore.targets import ParquetTarget
from mlrun.model_monitoring import TrackingPolicy
from mlrun.model_monitoring.application import ModelMonitoringApplicationBase
from mlrun.model_monitoring.applications.histogram_data_drift import (
@@ -450,3 +454,169 @@ def test_enable_model_monitoring(self) -> None:
]
== "1m"
)


@TestMLRunSystem.skip_test_if_env_not_configured
@pytest.mark.enterprise
class TestAllKindOfServing(TestMLRunSystem):
project_name = "test-mm-serving"
# Set image to "<repo>/mlrun:<tag>" for local testing
image: typing.Optional[str] = None

@classmethod
def custom_setup_class(cls) -> None:
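# A 20x30 RGB image, flattened into 600 [r, g, b] pixel rows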
random_rgb_image_list = (
np.random.randint(0, 256, (20, 30, 3), dtype=np.uint8)
.reshape(-1, 3)
.tolist()
)
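# Each entry: serving-function name, model class, sample payload, and the
# feature/prediction columns ("f<i>"/"p<i>") expected in the monitoring Parquet output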
cls.models = {
"int_one_to_one": {
"name": "serving_1",
"model_name": "int_one_to_one",
"class_name": "OneToOne",
"data_point": [1, 2, 3],
"schema": ["f0", "f1", "f2", "p0"],
},
"int_one_to_many": {
"name": "serving_2",
"model_name": "int_one_to_many",
"class_name": "OneToMany",
"data_point": [1, 2, 3],
"schema": ["f0", "f1", "f2", "p0", "p1", "p2", "p3", "p4"],
},
"str_one_to_one": {
"name": "serving_3",
"model_name": "str_one_to_one",
"class_name": "OneToOne",
"data_point": "input_str",
"schema": ["f0", "p0"],
},
"str_one_to_many": {
"name": "serving_4",
"model_name": "str_one_to_many",
"class_name": "OneToMany",
"data_point": "input_str",
"schema": ["f0", "p0", "p1", "p2", "p3", "p4"],
},
"img_one_to_one": {
"name": "serving_5",
"model_name": "img_one_to_one",
"class_name": "OneToOne",
"data_point": random_rgb_image_list,
"schema": [f"f{i}" for i in range(600)] + ["p0"],
},
"int_and_str_one_to_one": {
"name": "serving_6",
"model_name": "int_and_str_one_to_one",
"class_name": "OneToOne",
"data_point": [1, "a", 3],
"schema": ["f0", "f1", "f2", "p0"],
},
}

def _log_model(self, model_name) -> None:
self.project.log_model(
model_name,
model_dir=str((Path(__file__).parent / "assets").absolute()),
model_file="model.pkl",
)

@classmethod
def _deploy_model_serving(
cls, name: str, model_name: str, class_name: str, **kwargs
) -> mlrun.runtimes.nuclio.serving.ServingRuntime:
serving_fn = mlrun.code_to_function(
project=cls.project_name,
name=name,
filename=f"{str((Path(__file__).parent / 'assets').absolute())}/models.py",
kind="serving",
)
serving_fn.add_model(
model_name,
model_path=f"store://models/{cls.project_name}/{model_name}:latest",
class_name=class_name,
)
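# Enable model-monitoring tracking for this serving function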
serving_fn.set_tracking()
if cls.image is not None:
serving_fn.spec.image = serving_fn.spec.build.image = cls.image

serving_fn.deploy()
return typing.cast(mlrun.runtimes.nuclio.serving.ServingRuntime, serving_fn)

def _test_endpoint(self, model_name, feature_set_uri) -> dict[str, typing.Any]:
model_dict = self.models[model_name]
serving_fn = self.project.get_function(model_dict.get("name"))
data_point = model_dict.get("data_point")

serving_fn.invoke(
f"v2/models/{model_name}/infer",
json.dumps(
{"inputs": [data_point]},
),
)
serving_fn.invoke(
f"v2/models/{model_name}/infer",
json.dumps({"inputs": [data_point, data_point]}),
)
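# Give the monitoring stream time to flush the batched events to the Parquet target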
time.sleep(
mlrun.mlconf.model_endpoint_monitoring.parquet_batching_timeout_secs + 10
)

offline_response_df = ParquetTarget(
name="temp",
path=fstore.get_feature_set(feature_set_uri).spec.targets[0].path,
).as_df()

is_schema_saved = set(model_dict.get("schema")).issubset(
offline_response_df.columns
)
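# Three rows are expected: one data point from the first invocation plus two from the second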
has_all_the_events = offline_response_df.shape[0] == 3

return {
"model_name": model_name,
"is_schema_saved": is_schema_saved,
"has_all_the_events": has_all_the_events,
}

def test_all(self) -> None:
self.project.enable_model_monitoring(
image=self.image or "mlrun/mlrun",
base_period=1,
deploy_histogram_data_drift_app=False,
)
futures = []
with ThreadPoolExecutor() as executor:
for model_name, model_dict in self.models.items():
self._log_model(model_name)
future = executor.submit(self._deploy_model_serving, **model_dict)
futures.append(future)

for future in concurrent.futures.as_completed(futures):
future.result()

futures_2 = []
with ThreadPoolExecutor() as executor:
self.db = mlrun.model_monitoring.get_model_endpoint_store(
project=self.project_name
)
endpoints = self.db.list_model_endpoints()
for endpoint in endpoints:
future = executor.submit(
self._test_endpoint,
model_name=endpoint[mm_constants.EventFieldType.MODEL].split(":")[
0
],
feature_set_uri=endpoint[
mm_constants.EventFieldType.FEATURE_SET_URI
],
)
futures_2.append(future)

for future in concurrent.futures.as_completed(futures_2):
res_dict = future.result()
assert res_dict[
"is_schema_saved"
], f"For {res_dict['model_name']} the schema of parquet is missing columns"

assert res_dict["has_all_the_events"], "Not all the events were saved"