Skip to content

Commit

Permalink
[Model Monitoring] Add wait_for_deployment flag in create/deploy/up…
Browse files Browse the repository at this point in the history
…date APIs (#5433)
  • Loading branch information
jond01 committed Apr 28, 2024
1 parent 3bba8f5 commit 6a1e37e
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 47 deletions.
2 changes: 1 addition & 1 deletion mlrun/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ def update_model_monitoring_controller(
project: str,
base_period: int = 10,
image: str = "mlrun/mlrun",
):
) -> None:
pass

@abstractmethod
Expand Down
17 changes: 9 additions & 8 deletions mlrun/db/httpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3214,7 +3214,7 @@ def update_model_monitoring_controller(
project: str,
base_period: int = 10,
image: str = "mlrun/mlrun",
):
) -> None:
"""
Redeploy model monitoring application controller function.
Expand All @@ -3224,13 +3224,14 @@ def update_model_monitoring_controller(
:param image: The image of the model monitoring controller function.
By default, the image is mlrun/mlrun.
"""

params = {
"image": image,
"base_period": base_period,
}
path = f"projects/{project}/model-monitoring/model-monitoring-controller"
self.api_call(method="POST", path=path, params=params)
self.api_call(
method=mlrun.common.types.HTTPMethod.POST,
path=f"projects/{project}/model-monitoring/model-monitoring-controller",
params={
"base_period": base_period,
"image": image,
},
)

def enable_model_monitoring(
self,
Expand Down
57 changes: 47 additions & 10 deletions mlrun/projects/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2024,12 +2024,24 @@ def _instantiate_model_monitoring_function(

return resolved_function_name, function_object, func

def _wait_for_functions_deployment(self, function_names: list[str]) -> None:
    """
    Wait, one function at a time, for the given functions to be deployed on the backend.

    :param function_names: A list of function names.
    """
    for name in function_names:
        # The cast only informs type checkers that the fetched object is a RemoteRuntime.
        runtime = typing.cast(RemoteRuntime, self.get_function(key=name))
        run_db = runtime._get_db()
        runtime._wait_for_function_deployment(db=run_db)

def enable_model_monitoring(
self,
default_controller_image: str = "mlrun/mlrun",
base_period: int = 10,
image: str = "mlrun/mlrun",
*,
deploy_histogram_data_drift_app: bool = True,
wait_for_deployment: bool = False,
) -> None:
"""
Deploy model monitoring application controller, writer and stream functions.
Expand All @@ -2039,14 +2051,16 @@ def enable_model_monitoring(
The stream function goal is to monitor the log of the data stream. It is triggered when a new log entry
is detected. It processes the new events into statistics that are then written to statistics databases.
:param default_controller_image: Deprecated.
:param base_period: The time period in minutes in which the model monitoring controller
function is triggered. By default, the base period is 10 minutes.
:param image: The image of the model monitoring controller, writer, monitoring
stream & histogram data drift functions, which are real time nuclio
functions. By default, the image is mlrun/mlrun.
:param deploy_histogram_data_drift_app: If true, deploy the default histogram-based data drift application.
:param wait_for_deployment: If true, return only after the deployment is done on the backend.
Otherwise, deploy the model monitoring infrastructure in the
background, including the histogram data drift app if selected.
"""
if default_controller_image != "mlrun/mlrun":
# TODO: Remove this in 1.9.0
Expand All @@ -2064,37 +2078,55 @@ def enable_model_monitoring(
deploy_histogram_data_drift_app=deploy_histogram_data_drift_app,
)

if wait_for_deployment:
deployment_functions = mm_constants.MonitoringFunctionNames.list()
if deploy_histogram_data_drift_app:
deployment_functions.append(
mm_constants.HistogramDataDriftApplicationConstants.NAME
)
self._wait_for_functions_deployment(deployment_functions)

def deploy_histogram_data_drift_app(
self,
*,
image: str = "mlrun/mlrun",
db: Optional[mlrun.db.RunDBInterface] = None,
wait_for_deployment: bool = False,
) -> None:
"""
Deploy the histogram data drift application.
:param image: The image on which the application will run.
:param db: An optional DB object.
:param image: The image on which the application will run.
:param db: An optional DB object.
:param wait_for_deployment: If true, return only after the deployment is done on the backend.
Otherwise, deploy the application in the background.
"""
if db is None:
db = mlrun.db.get_run_db(secrets=self._secrets)
db.deploy_histogram_data_drift_app(project=self.name, image=image)

if wait_for_deployment:
self._wait_for_functions_deployment(
[mm_constants.HistogramDataDriftApplicationConstants.NAME]
)

def update_model_monitoring_controller(
self,
base_period: int = 10,
image: str = "mlrun/mlrun",
*,
wait_for_deployment: bool = False,
) -> None:
"""
Redeploy model monitoring application controller functions.
:param base_period: The time period in minutes in which the model monitoring controller function
is triggered. By default, the base period is 10 minutes.
:param image: The image of the model monitoring controller, writer & monitoring
stream functions, which are real time nuclio functions.
By default, the image is mlrun/mlrun.
:returns: model monitoring controller job as a dictionary.
:param base_period: The time period in minutes in which the model monitoring controller function
is triggered. By default, the base period is 10 minutes.
:param image: The image of the model monitoring controller, writer & monitoring
stream functions, which are real time nuclio functions.
By default, the image is mlrun/mlrun.
:param wait_for_deployment: If true, return only after the deployment is done on the backend.
Otherwise, deploy the controller in the background.
"""
db = mlrun.db.get_run_db(secrets=self._secrets)
db.update_model_monitoring_controller(
Expand All @@ -2103,6 +2135,11 @@ def update_model_monitoring_controller(
image=image,
)

if wait_for_deployment:
self._wait_for_functions_deployment(
[mm_constants.MonitoringFunctionNames.APPLICATION_CONTROLLER]
)

def disable_model_monitoring(
self, *, delete_histogram_data_drift_app: bool = True
) -> None:
Expand Down
16 changes: 15 additions & 1 deletion tests/projects/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2077,7 +2077,7 @@ class TestModelMonitoring:
@staticmethod
@pytest.fixture
def project() -> mlrun.projects.MlrunProject:
return unittest.mock.Mock() # spec_set=mlrun.projects.MlrunProject)
return unittest.mock.Mock()

@staticmethod
@pytest.mark.parametrize(
Expand Down Expand Up @@ -2111,3 +2111,17 @@ def test_disable(
assert (
deleted_fns == expected_deleted_fns
), "The deleted functions are different than expexted"

@staticmethod
def test_enable_wait_for_deployment(project: mlrun.projects.MlrunProject) -> None:
    """
    Check that enabling model monitoring with ``wait_for_deployment=True`` and without the
    histogram data drift app waits exactly once, for the infrastructure functions only.
    """
    wait_patch = unittest.mock.patch.object(
        project, "_wait_for_functions_deployment", autospec=True
    )
    with wait_patch as wait_mock:
        mlrun.projects.MlrunProject.enable_model_monitoring(
            project,
            deploy_histogram_data_drift_app=False,
            wait_for_deployment=True,
        )

    wait_mock.assert_called_once()
    # The first positional argument of the single call is the list of awaited function names.
    awaited_functions = wait_mock.call_args_list[0].args[0]
    assert (
        awaited_functions == mm_consts.MonitoringFunctionNames.list()
    ), "Expected to wait for the infra functions"
17 changes: 4 additions & 13 deletions tests/system/model_monitoring/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,21 +472,13 @@ def test_enable_model_monitoring(self) -> None:
image=self.image or "mlrun/mlrun"
)

self.project.enable_model_monitoring(image=self.image or "mlrun/mlrun")
self.project.enable_model_monitoring(
image=self.image or "mlrun/mlrun", wait_for_deployment=True
)

controller = self.project.get_function(
key=mm_constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
)
writer = self.project.get_function(
key=mm_constants.MonitoringFunctionNames.WRITER
)
stream = self.project.get_function(
key=mm_constants.MonitoringFunctionNames.STREAM
)

controller._wait_for_function_deployment(db=controller._get_db())
writer._wait_for_function_deployment(db=writer._get_db())
stream._wait_for_function_deployment(db=stream._get_db())
assert (
controller.spec.config["spec.triggers.cron_interval"]["attributes"][
"interval"
Expand All @@ -495,13 +487,12 @@ def test_enable_model_monitoring(self) -> None:
)

self.project.update_model_monitoring_controller(
image=self.image or "mlrun/mlrun", base_period=1
image=self.image or "mlrun/mlrun", base_period=1, wait_for_deployment=True
)
controller = self.project.get_function(
key=mm_constants.MonitoringFunctionNames.APPLICATION_CONTROLLER,
ignore_cache=True,
)
controller._wait_for_function_deployment(db=controller._get_db())
assert (
controller.spec.config["spec.triggers.cron_interval"]["attributes"][
"interval"
Expand Down
16 changes: 2 additions & 14 deletions tests/system/model_monitoring/test_model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,13 +820,9 @@ def test_batch_drift(self):
base_period=1,
deploy_histogram_data_drift_app=True,
**({} if self.image is None else {"image": self.image}),
wait_for_deployment=True,
)

controller = self.project.get_function(
key=mm_constants.MonitoringFunctionNames.APPLICATION_CONTROLLER
)

controller._wait_for_function_deployment(db=controller._get_db())
# Generate a dataframe that will be written as a monitoring parquet
# This dataframe is basically replacing the result set that is being generated through the batch infer function
infer_results_df = pd.DataFrame(
Expand Down Expand Up @@ -1133,13 +1129,6 @@ def _log_model(self) -> str:
)
return model.uri

def _wait_for_deployments(self) -> None:
    """Wait for every monitoring function, plus the histogram data drift app, to be deployed."""
    awaited_names = mm_constants.MonitoringFunctionNames.list() + [
        mm_constants.HistogramDataDriftApplicationConstants.NAME
    ]
    for name in awaited_names:
        deployed_fn = self.project.get_function(key=name)
        deployed_fn._wait_for_function_deployment(db=deployed_fn._get_db())

@classmethod
def _test_v3io_tsdb_record(cls) -> None:
tsdb_client = ModelMonitoringWriter._get_v3io_frames_client(
Expand Down Expand Up @@ -1169,10 +1158,9 @@ def test_record(self) -> None:
base_period=1,
deploy_histogram_data_drift_app=True,
**({} if self.image is None else {"image": self.image}),
wait_for_deployment=True,
)

self._wait_for_deployments()

model_uri = self._log_model()

mlrun.model_monitoring.api.record_results(
Expand Down

0 comments on commit 6a1e37e

Please sign in to comment.