In [4]:
import os
import sys

import mlrun
import pandas as pd
from sklearn.datasets import load_iris
%config Completer.use_jedi = False

In [5]:
# Clone the project sources from git into ./context and load them as the
# working MLRun project (user_project=True is forwarded to MLRun as-is).
project = mlrun.load_project(
    name="load-project",
    url="git://github.com/amit-elbaz/load_project.git",
    context="./context",
    clone=True,
    user_project=True,
)

In [None]:
# Print the loaded project's YAML representation.
print(project.to_yaml())

### Regular jobs

In [None]:
# Cron expression for each schedule label; the label is embedded in the run name.
# Dict order is the submission order.
JOB_SCHEDULES = {
    # Every 30 minutes. NOTE: "30 * * * *" would fire only once an hour
    # (at minute 30), which is not what the "30m" label promises.
    "30m": "*/30 * * * *",
    # Every hour, on the hour.
    "1h": "0 * * * *",
    # Every 4 hours, on the hour.
    "4h": "0 */4 * * *",
    # Every day at midnight.
    "24h": "0 0 */1 * *",
    # Midnight on every 3rd day-of-month (1, 4, 7, ...); the step restarts at
    # each month boundary, so the gap there can be shorter than 3 days.
    "3d": "0 0 */3 * *",
}

# Suffixes of the project functions to schedule: normal_job_<unit>.
JOB_UNITS = ("sec", "min", "hour")

# Submit three copies of every (schedule, function) combination.
for i in range(3):
    job_num = str(i)
    for label, cron in JOB_SCHEDULES.items():
        for unit in JOB_UNITS:
            project.run_function(
                f"normal_job_{unit}",
                name=f"sched_job_{unit}_{label}_{job_num}",
                schedule=cron,
                watch=False,
            )

### Spark

In [None]:
# Build a Spark-kind MLRun function from spark_jobs_func.py (handler: spark_handler),
# with scikit-learn as an extra requirement, and apply MLRun's auto-mount modifier to it.
spark_func = mlrun.code_to_function(name="spark-read",
                                    kind="spark",
                                    handler="spark_handler",
                                    filename="spark_jobs_func.py", requirements=["scikit-learn"]
                                   ).apply(mlrun.auto_mount())
# Resource requests: 1 CPU and 1G of memory, for the executors and for the driver.
spark_func.with_executor_requests(cpu="1",mem="1G")
spark_func.with_driver_requests(cpu="1",mem="1G")
# CPU limits: 1 CPU for the driver and for the executors (no memory limit is set here).
spark_func.with_driver_limits(cpu="1")
spark_func.with_executor_limits(cpu="1")
# NOTE(review): presumably applies the Iguazio-platform Spark configuration — confirm against the MLRun docs.
spark_func.with_igz_spark()
spark_func.spec.image_pull_policy = "Always"
# NOTE(review): replicas presumably controls the number of Spark executors — confirm.
spark_func.spec.replicas = 2

In [None]:
# Deploy the Spark function before it is run in the next cell.
spark_func.deploy()

In [None]:
# Run the Spark function on a cron schedule: "0 * * * *" is minute 0 of every hour.
spark_func.run(schedule="0 * * * *")

### Dask

In [None]:
# Define a Dask-kind MLRun function named "dask-cluster" on the mlrun/ml-models image.
dask_cluster = mlrun.new_function("dask-cluster", kind='dask', image='mlrun/ml-models')
dask_cluster.apply(mlrun.mount_v3io())        # add volume mounts
dask_cluster.spec.service_type = "NodePort"   # open interface to the dask UI dashboard
dask_cluster.spec.replicas = 1             # define one container
# Hand the MLRun DB path and the project name to the cluster through environment variables.
# os.environ[...] raises KeyError when MLRUN_DBPATH is not set in this notebook's environment.
dask_cluster.set_env("MLRUN_DBPATH",os.environ["MLRUN_DBPATH"])
dask_cluster.set_env("MLRUN_DEFAULT_PROJECT",project.name)
# Save the function definition; the returned value is passed later as "dask_cluster_uri".
uri = dask_cluster.save()

In [None]:
# Hourly hyper-parameter run of dask_func over the listed values of "i",
# using the "list" strategy, one run at a time, on the cluster saved as `uri`.
project.run_function(
    "dask_func",
    hyperparams={"i": [1, 10, 20, 30, 40]},
    schedule="0 * * * *",
    hyper_param_options={
        "strategy": "list",
        "parallel_runs": 1,
        "dask_cluster_uri": uri,
        "teardown_dask": True,
    },
)

### Nuclio

In [None]:
# Deploy the project's nuclio function three times, tagged "0", "1" and "2".
for job_num in map(str, range(3)):
    project.deploy_function("nuclio_func", tag=job_num)


### Serving

In [None]:
models_dir = mlrun.get_sample_path("models/serving/")

# Suffix of the sklearn sample file name: the MLRun version with dots replaced
# by underscores (anything after the first "-" is dropped) on Python 3.8+,
# or the literal "3.7" on older interpreters.
# The whole (major, minor) tuple is compared, not just the minor number.
suffix = (
    mlrun.__version__.split("-")[0].replace(".", "_")
    if sys.version_info >= (3, 8)
    else "3.7"
)


# --- scikit-learn model server ---
framework_sklearn = "sklearn"
kwargs = {}

serving_class = "mlrun.frameworks.sklearn.SklearnModelServer"
model_path = models_dir + f"sklearn-{suffix}.pkl"
image = "mlrun/mlrun"

# Log the sample model to the project, then serve the logged artifact by its URI.
model_object = project.log_model(f"{framework_sklearn}-model", model_file=model_path, **kwargs)
serving_sklearn = mlrun.new_function("serving_sklearn", image=image, kind="serving", requirements=["scikit-learn"])
serving_sklearn.add_model(
    framework_sklearn, model_path=model_object.uri, class_name=serving_class, to_list=True
)
project.deploy_function(serving_sklearn)


# --- Keras model server ---
# serving_class, model_path, image and model_object are re-bound below for the Keras model.
framework_keras = "keras"
serving_class = "mlrun.frameworks.tf_keras.TFKerasModelServer"
model_path = models_dir + "keras.h5"
image = "mlrun/ml-models"  # or mlrun/ml-models-gpu when using GPUs
kwargs["labels"] = {"model-format": "h5"}

model_object = project.log_model(f"{framework_keras}-model", model_file=model_path, **kwargs)
serving_tensorf = mlrun.new_function("serving_tensorf", image=image, kind="serving", requirements=["tensorflow"])
serving_tensorf.add_model(
    framework_keras, model_path=model_object.uri, class_name=serving_class, to_list=True
)

project.deploy_function(serving_tensorf)

## Workflow

In [None]:
# Schedule the workflow defined in workflow.py with cron "0 * * * *"
# (minute 0 of every hour).
project.run(
    workflow_path="workflow.py",
    watch=True,
    schedule="0 * * * *",
)

## ------------------------------------------------------------------------------------------------------------------------

## Log models

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pickle

def create_iris_model(model_path="iris_model.pkl"):
    """Train a RandomForestClassifier on the iris dataset and pickle it to disk.

    Args:
        model_path: Where to write the pickled model. Defaults to
            "iris_model.pkl" in the current working directory.

    Returns:
        A ``(train_set, model_path)`` tuple. ``train_set`` is a DataFrame with
        the four iris feature columns (all rows, no target column);
        ``model_path`` is the path the model was written to.
    """
    iris = load_iris()
    train_set = pd.DataFrame(
        iris["data"],
        columns=["sepal_length_cm", "sepal_width_cm", "petal_length_cm", "petal_width_cm"],
    )

    # Hold out 20% of the rows; only the training split is used to fit the model.
    X_train, _, y_train, _ = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42
    )

    # Fixed random_state keeps the trained model reproducible between runs.
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Save the trained model as a pickle file.
    with open(model_path, "wb") as f:
        pickle.dump(model, f)

    return train_set, model_path
    

def log_n_models(context, number_of_models):
    """Train one iris model and log it several times under numbered names.

    Args:
        context: Object whose ``log_model`` method is used to log each model
            (presumably an MLRun context — confirm against the caller).
        number_of_models: How many models to log. An int ``n`` logs
            ``model_num_0`` .. ``model_num_{n-1}``; an iterable of suffixes is
            also accepted and used as-is.
    """
    train_set, model_path = create_iris_model()

    # A plain count cannot be iterated directly, so turn it into a range.
    if isinstance(number_of_models, int):
        model_nums = range(number_of_models)
    else:
        model_nums = number_of_models

    for num in model_nums:
        # Every logged model points at the same pickle file and training set.
        model_name = f"model_num_{num}"
        context.log_model(model_name, model_file=model_path, training_set=train_set, framework="sklearn")
        

def create_model_servings(context, number_of_servings, number_of_models):
    """Deploy several serving functions, each routing to the same set of logged models.

    Args:
        context: Not used by this function; kept so the signature stays unchanged.
        number_of_servings: How many serving functions to deploy. An int ``n``
            yields ``serving-func0`` .. ``serving-func{n-1}``; an iterable of
            suffixes is also accepted.
        number_of_models: How many models to add to each serving function. An
            int ``m`` adds ``model_num_0`` .. ``model_num_{m-1}``; an iterable
            of suffixes is also accepted.
    """
    # Use the current MLRun project rather than a notebook-level global.
    current_project = mlrun.get_current_project()
    project_name = current_project.name

    # A plain count cannot be iterated directly, so turn it into a range.
    if isinstance(number_of_servings, int):
        serving_nums = range(number_of_servings)
    else:
        serving_nums = number_of_servings

    # Materialized once because it is reused for every serving function;
    # a one-shot iterator would be exhausted after the first one.
    if isinstance(number_of_models, int):
        model_nums = list(range(number_of_models))
    else:
        model_nums = list(number_of_models)

    for serving_num in serving_nums:
        serving_fn = current_project.set_function(
            "hub://v2_model_server",
            f"serving-func{serving_num}",
            kind="serving",
            image="mlrun/mlrun",
        )
        serving_fn.apply(mlrun.auto_mount())

        # Add the models to the serving function's routing spec.
        for model_num in model_nums:
            model_name = f"model_num_{model_num}"
            # The model key is the first positional argument of add_model.
            serving_fn.add_model(model_name, model_path=f"store://models/{project_name}/{model_name}:latest")

        # (OPTIONAL) Create a tracking policy
        # tracking_policy = {'default_batch_intervals':"*/1 * * * *", "default_batch_image":"mlrun/mlrun:1.6.0-rc15", "stream_image":"mlrun/mlrun:1.6.0-rc15", "default_controller_image":"mlrun/mlrun:1.6.0-rc15"}

        # Enable model monitoring (if you specified tracking_policy, pass it to the 'tracking_policy' param).
        serving_fn.set_tracking()

        # Deploy through the function's own deploy() method.
        serving_fn.deploy()
        
        

# Enable model monitoring for the project with base_period=2.
# NOTE(review): base_period is presumably expressed in minutes — confirm against the MLRun docs.
project.enable_model_monitoring(base_period=2)





In [None]:
import evidently
from evidently.renderers.notebook_utils import determine_template
from evidently.report import Report
from evidently.suite.base_suite import Suite
from evidently.utils.dashboard import TemplateParams
from evidently.ui.workspace import Workspace
from evidently.ui.base import Project
from evidently.metrics import (
        ColumnDriftMetric,
        ColumnSummaryMetric,
        DatasetDriftMetric,
        DatasetMissingValuesMetric,
    )
from evidently.test_preset import DataDriftTestPreset
from evidently.test_suite import TestSuite
from evidently.ui.dashboards import (
        CounterAgg,
        DashboardConfig,
        DashboardPanelCounter,
        DashboardPanelPlot,
        PanelValue,
        PlotType,
        ReportFilter,
    )

from typing import Optional
from uuid import UUID


def create_demo_project(workspace_path: str) -> tuple[Workspace, Project]:
    """Create an Evidently workspace at ``workspace_path`` with a demo project in it.

    Returns:
        The ``(workspace, project)`` pair.
    """
    demo_workspace = Workspace.create(workspace_path)
    return demo_workspace, _create_evidently_project(demo_workspace)


def _create_evidently_project(workspace: Workspace, id: Optional[UUID] = None, project_name="default", project_description="my description") -> Project:
    """Create (or register under a given id) an Evidently project and set up its dashboard.

    Args:
        workspace: Workspace the project is created in.
        id: Optional project id. When given, a ``Project`` is built with that id
            and added to the workspace; otherwise the workspace creates one.
        project_name: Name of the project (also used as the dashboard name
            when ``id`` is given).
        project_description: Description stored on the project.

    Returns:
        The saved project, with four dashboard panels added.
    """

    def _empty_filter() -> ReportFilter:
        # A fresh filter per panel, with empty metadata and tag values.
        return ReportFilter(metadata_values={}, tag_values=[])

    if not id:
        project = workspace.create_project(project_name)
    else:
        project = workspace.add_project(
            Project(
                name=project_name,
                description=project_description,
                dashboard=DashboardConfig(name=project_name, panels=[]),
                id=id,
            )  # pyright: ignore[reportGeneralTypeIssues]
        )
    project.description = project_description

    add_panel = project.dashboard.add_panel

    # Title-only counter panel.
    add_panel(
        DashboardPanelCounter(
            filter=_empty_filter(),
            agg=CounterAgg.NONE,
            title="Income Dataset (iris)",
        )  # pyright: ignore[reportGeneralTypeIssues]
    )

    # Counter: sum of the number of rows in the current data.
    rows_value = PanelValue(
        metric_id="DatasetMissingValuesMetric",
        field_path=DatasetMissingValuesMetric.fields.current.number_of_rows,
        legend="count",
    )
    add_panel(
        DashboardPanelCounter(
            title="Model Calls",
            filter=_empty_filter(),
            value=rows_value,
            text="count",
            agg=CounterAgg.SUM,
            size=1,
        )  # pyright: ignore[reportGeneralTypeIssues]
    )

    # Counter: last reported share of drifted columns.
    drift_share_value = PanelValue(
        metric_id="DatasetDriftMetric",
        field_path="share_of_drifted_columns",
        legend="share",
    )
    add_panel(
        DashboardPanelCounter(
            title="Share of Drifted Features",
            filter=_empty_filter(),
            value=drift_share_value,
            text="share",
            agg=CounterAgg.LAST,
            size=1,
        )  # pyright: ignore[reportGeneralTypeIssues]
    )

    # Line plot: drift share and missing-values share.
    add_panel(
        DashboardPanelPlot(
            title="Dataset Quality",
            filter=_empty_filter(),
            values=[
                PanelValue(
                    metric_id="DatasetDriftMetric",
                    field_path="share_of_drifted_columns",
                    legend="Drift Share",
                ),
                PanelValue(
                    metric_id="DatasetMissingValuesMetric",
                    field_path=DatasetMissingValuesMetric.fields.current.share_of_missing_values,
                    legend="Missing Values Share",
                ),
            ],
            plot_type=PlotType.LINE,
        )  # pyright: ignore[reportGeneralTypeIssues]
    )

    project.save()
    return project




In [None]:
# A v3io path for the Evidently workspace, for example "/v3io/bigdata/evidently-folder".
# TODO: fill this in before running — the empty string is passed as-is to
# create_demo_project() as the workspace path.
evidently_project_path = ""
workspace, project_evid = create_demo_project(evidently_project_path)

In [None]:
%%writefile custom_evidently_app.py
import datetime
import pandas as pd
from sklearn.datasets import load_iris
from mlrun.common.schemas.model_monitoring.constants import (
    ResultKindApp,
    ResultStatusApp,
)
from mlrun.model_monitoring.application import ModelMonitoringApplicationResult
from mlrun.model_monitoring.evidently_application import (
    _HAS_EVIDENTLY,
    EvidentlyModelMonitoringApplicationBase,
)
if _HAS_EVIDENTLY:
    from evidently.metrics import (
        ColumnDriftMetric,
        ColumnSummaryMetric,
        DatasetDriftMetric,
        DatasetMissingValuesMetric,
    )
    from evidently.metric_preset import DataQualityPreset
    from evidently.report import Report
    from evidently.test_preset import DataDriftTestPreset
    from evidently.test_suite import TestSuite



    
class CustomEvidentlyMonitoringApp(EvidentlyModelMonitoringApplicationBase):
    """Monitoring application that runs Evidently reports against the iris features."""

    name = "my-custom-evidently-class"

    def _lazy_init(self, *args, **kwargs) -> None:
        """Run the base-class lazy initialization, then load the reference data."""
        super()._lazy_init(*args, **kwargs)
        self._init_iris_data()

    def _init_iris_data(self) -> None:
        """Store the feature column names and the iris features as ``train_set``."""
        iris_bunch = load_iris()
        self.columns = [
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
        ]
        self.train_set = pd.DataFrame(iris_bunch.data, columns=self.columns)

    def do_tracking(
        self,
        application_name: str,
        sample_df_stats: pd.DataFrame,
        feature_stats: pd.DataFrame,
        sample_df: pd.DataFrame,
        start_infer_time: pd.Timestamp,
        end_infer_time: pd.Timestamp,
        latest_request: pd.Timestamp,
        endpoint_id: str,
        output_stream_uri: str,
    ) -> ModelMonitoringApplicationResult:
        """Build, store and log the Evidently objects for one monitoring window.

        Only ``sample_df``, ``start_infer_time``, ``end_infer_time`` and
        ``endpoint_id`` are read here. The returned result always carries the
        fixed value 0.5 with status ``potential_detection``.
        """
        logger = self.context.logger
        logger.info("Running evidently app")

        # Keep only the feature columns known to the reference set.
        current_df = sample_df[self.columns]

        # Data-drift report: built, run and stored in the Evidently workspace.
        drift_report = self.create_report(current_df, end_infer_time)
        self.evidently_workspace.add_report(self.evidently_project_id, drift_report)

        # Data-quality report: run here and logged below (not added to the workspace).
        quality_report = Report(metrics=[DataQualityPreset()])
        quality_report.run(reference_data=self.train_set, current_data=current_df)

        # Data-drift test suite: built, run and stored in the Evidently workspace.
        drift_suite = self.create_test_suite(current_df, end_infer_time)
        self.evidently_workspace.add_test_suite(self.evidently_project_id, drift_suite)

        # Log the objects in iguazio
        to_log = (
            (drift_report, f"report_{str(end_infer_time)}"),
            (drift_suite, f"suite_{str(end_infer_time)}"),
            (quality_report, f"data_quality_report_{str(end_infer_time)}"),
        )
        for evidently_object, artifact_name in to_log:
            self.log_evidently_object(evidently_object, artifact_name)

        # Log the dashboard in iguazio, using a timestamp one minute past the window end.
        dashboard_time = end_infer_time + datetime.timedelta(minutes=1)
        self.log_project_dashboard(None, dashboard_time)

        logger.info("Logged evidently objects")

        return ModelMonitoringApplicationResult(
            application_name=self.name,
            endpoint_id=endpoint_id,
            start_infer_time=start_infer_time,
            end_infer_time=end_infer_time,
            result_name="data_drift_test",
            result_value=0.5,
            result_kind=ResultKindApp.data_drift,
            result_status=ResultStatusApp.potential_detection,
        )

    def create_report(
        self, sample_df: pd.DataFrame, schedule_time: pd.Timestamp
    ) -> "Report":
        """Build and run an Evidently report of ``sample_df`` against ``train_set``.

        The report holds the dataset-level drift and missing-values metrics,
        followed by a drift metric (wasserstein stat test) and a summary metric
        for each feature column, and is timestamped with ``schedule_time``.
        """
        report_metrics = [DatasetDriftMetric(), DatasetMissingValuesMetric()]
        for column in self.columns:
            report_metrics += [
                ColumnDriftMetric(column_name=column, stattest="wasserstein"),
                ColumnSummaryMetric(column_name=column),
            ]

        report = Report(metrics=report_metrics, timestamp=schedule_time)
        report.run(reference_data=self.train_set, current_data=sample_df)
        return report

    def create_test_suite(
        self, sample_df: pd.DataFrame, schedule_time: pd.Timestamp
    ) -> "TestSuite":
        """Build and run a data-drift test suite of ``sample_df`` against ``train_set``."""
        suite = TestSuite(tests=[DataDriftTestPreset()], timestamp=schedule_time)
        suite.run(reference_data=self.train_set, current_data=sample_df)
        return suite

In [None]:
# Register custom_evidently_app.py as a model monitoring function of the project,
# pointing it at the Evidently workspace and project created earlier.
model_monitoring_func = project.set_model_monitoring_function(
    func="custom_evidently_app.py",
    application_class="CustomEvidentlyMonitoringApp",
    name="custom-evidently-class",
    image="mlrun/mlrun",
    requirements=["evidently==0.4.11"],
    evidently_workspace_path=workspace.path,
    evidently_project_id=str(project_evid.id),
)

# Apply MLRun's auto-mount modifier, then deploy the function.
model_monitoring_func.apply(mlrun.auto_mount())
model_monitoring_func.deploy()

## Log dataset

In [None]:
%% writefile func.py
import random
import numpy as np

def func_dataset(context, num_rows, num_columns):
    """Log a DataFrame of random values as a dataset.

    Args:
        context: Object whose ``log_dataset(key, df)`` method receives the frame.
        num_rows: Number of rows to generate.
        num_columns: Number of columns to generate (named Column_0, Column_1, ...).

    Returns:
        Always 1.
    """
    # Random values from np.random.rand, shaped (num_rows, num_columns).
    values = np.random.rand(num_rows, num_columns)
    column_names = ["Column_" + str(idx) for idx in range(num_columns)]
    frame = pd.DataFrame(values, columns=column_names)

    # The dataset key gets a random integer suffix between 0 and num_rows (inclusive).
    dataset_key = "mydf_" + str(random.randint(0, num_rows))
    context.log_dataset(dataset_key, frame)

    return 1