# Register machine learning model

This notebook outlines a workflow for registering a machine learning model from an MLflow run. A `python_function` MLflow model object will be created to perform classification, drift detection, and outlier detection.


#### Import dependencies, define notebook parameters and constants


In [None]:
import os
import json
import yaml
import joblib
import mlflow
import pandas as pd
import importlib.metadata

from mlflow.tracking import MlflowClient
from alibi_detect.od import IForest
from alibi_detect.cd import TabularDrift

In [None]:
# notebook parameters, declared as (widget name, default value) pairs
widget_defaults = (
    ("model_name", "credit-default-uci-custom"),
    ("experiment_name", "/online-inference-containers-examples"),
    ("curated_dataset_table", "hive_metastore.default.credit_default_uci_curated"),
)

# create one text widget per parameter, in the order listed above
for widget_name, default_value in widget_defaults:
    dbutils.widgets.text(widget_name, default_value)

In [None]:
# target column
TARGET = ["default_payment_next_month"]

# categorical feature columns: three named columns followed by the
# numbered repayment status columns (suffixes 1 through 6)
CATEGORICAL_FEATURES = ["sex", "education", "marriage"] + [
    f"repayment_status_{index}" for index in range(1, 7)
]

# numeric feature columns: two named columns followed by the numbered
# bill amount and payment amount columns (suffixes 1 through 6 each)
NUMERIC_FEATURES = (
    ["credit_limit", "age"]
    + [f"bill_amount_{index}" for index in range(1, 7)]
    + [f"payment_amount_{index}" for index in range(1, 7)]
)

# all features, categorical columns first
FEATURES = [*CATEGORICAL_FEATURES, *NUMERIC_FEATURES]

# sample request payload for inference: a single record keyed by feature name
INPUT_SAMPLE = [
    dict(
        sex="male",
        education="university",
        marriage="married",
        repayment_status_1="duly_paid",
        repayment_status_2="duly_paid",
        repayment_status_3="duly_paid",
        repayment_status_4="duly_paid",
        repayment_status_5="no_delay",
        repayment_status_6="no_delay",
        credit_limit=18000.0,
        age=33.0,
        bill_amount_1=764.95,
        bill_amount_2=2221.95,
        bill_amount_3=1131.85,
        bill_amount_4=5074.85,
        bill_amount_5=3448.0,
        bill_amount_6=1419.95,
        payment_amount_1=2236.5,
        payment_amount_2=1137.55,
        payment_amount_3=5084.55,
        payment_amount_4=111.65,
        payment_amount_5=306.9,
        payment_amount_6=805.65,
    )
]

# sample response for inference
OUTPUT_SAMPLE = dict(predictions=[0.02])

#### Build drift and outlier detectors and write models


In [None]:
# load the curated table named by the notebook widget into a pandas dataframe
curated_table_name = dbutils.widgets.get("curated_dataset_table")
df = spark.read.table(curated_table_name).toPandas()

# build drift model on all feature columns, categorical columns first, so the
# categorical columns occupy positions 0 .. len(CATEGORICAL_FEATURES) - 1
reference_columns = [*CATEGORICAL_FEATURES, *NUMERIC_FEATURES]
# every categorical column index maps to None
# (presumably alibi-detect then infers the categories from the reference data - confirm)
categories_per_feature = dict.fromkeys(range(len(CATEGORICAL_FEATURES)))
drift = TabularDrift(
    df[reference_columns].values,
    p_val=0.05,
    categories_per_feature=categories_per_feature,
)

# build outlier model on the numeric feature columns only
# NOTE(review): threshold=0.95 is compared against the isolation forest outlier
# score; if those scores stay within roughly [-0.5, 0.5] no row would ever be
# flagged. Confirm the intended threshold (e.g. whether it should be inferred
# from a percentile of the training scores instead).
numeric_reference = df[NUMERIC_FEATURES].values
outlier = IForest(threshold=0.95)
outlier.fit(numeric_reference)

In [None]:
# read the best run id published under the "train_model" task key
# (debugValue is presumably the fallback when not running inside a job - confirm)
best_run_id = dbutils.jobs.taskValues.get(
    taskKey="train_model",
    key="best_run_id",
    debugValue="your-run-id",
)

# load best model from the run's "model" artifact
best_model_uri = f"runs:/{best_run_id}/model"
classifier = mlflow.pyfunc.load_model(best_model_uri)

# write drift model and outlier model side by side in one local directory
models_dir = "/tmp/models"
os.makedirs(models_dir, exist_ok=True)
for detector, file_name in ((drift, "drift.pkl"), (outlier, "outlier.pkl")):
    joblib.dump(detector, f"{models_dir}/{file_name}")

# write classifier model: download the run's "model" artifact into a
# "classifier" directory next to the detectors
client = MlflowClient()
classifier_model_path = f"{models_dir}/classifier"
os.makedirs(classifier_model_path, exist_ok=True)
client.download_artifacts(best_run_id, "model", classifier_model_path)

#### Create custom MLFlow Pyfunc model


In [None]:
class CustomModel(mlflow.pyfunc.PythonModel):
    """
    Custom model for classification, outlier detection and feature drift detection.

    The model expects an ``artifacts_path`` artifact directory containing:

    - ``classifier/model/model.pkl``: classifier exposing ``predict_proba``
    - ``drift.pkl``: drift detector applied to all features
    - ``outlier.pkl``: outlier detector applied to the numeric features
    """

    def __init__(
        self, categorical_feature_names: list[str], numeric_feature_names: list[str]
    ):
        """
        Store the feature column names used to select model inputs.

        Args:
            categorical_feature_names: Names of the categorical feature columns.
            numeric_feature_names: Names of the numeric feature columns.
        """
        self.categorical_features = categorical_feature_names
        self.numeric_features = numeric_feature_names
        # categorical columns first, then numeric columns
        self.all_features = categorical_feature_names + numeric_feature_names

    def load_context(self, context):
        """
        Load the classifier, drift detector and outlier detector with joblib.

        Args:
            context: MLflow model context; ``context.artifacts["artifacts_path"]``
                is the directory holding the three serialized models.
        """
        artifacts_path = context.artifacts["artifacts_path"]

        self.classifier = joblib.load(
            os.path.join(artifacts_path, "classifier/model/model.pkl")
        )
        self.drift = joblib.load(os.path.join(artifacts_path, "drift.pkl"))
        self.outliers = joblib.load(os.path.join(artifacts_path, "outlier.pkl"))

    def predict(self, context, model_input):
        """
        Score a batch of records and report outliers and feature drift.

        Args:
            context: MLflow model context (unused here).
            model_input: Any input accepted by ``pd.DataFrame``; it must provide
                every column in ``self.all_features``.

        Returns:
            A dict with three keys: ``predictions`` (column 1 of the classifier's
            ``predict_proba`` output, as a list), ``outliers`` (the outlier
            detector's ``is_outlier`` result, as a list) and
            ``feature_drift_batch`` (feature name mapped to ``1 - p_val`` from
            the drift detector, computed over the whole input batch).
        """
        # convert to pandas dataframe
        df = pd.DataFrame(model_input)

        # generate predictions, drift results, and outlier results
        predictions = self.classifier.predict_proba(df[self.all_features])[
            :, 1
        ].tolist()
        drift_results = self.drift.predict(df[self.all_features].values)
        outlier_results = self.outliers.predict(df[self.numeric_features].values)

        # label the drift scores with the instance's own feature list (the same
        # columns, in the same order, that were passed to the drift detector)
        # rather than notebook-level globals, so the labels cannot disagree
        # with the scored columns
        drift_scores = (1 - drift_results["data"]["p_val"]).tolist()

        # format response
        response = {
            "predictions": predictions,
            "outliers": outlier_results["data"]["is_outlier"].tolist(),
            "feature_drift_batch": dict(zip(self.all_features, drift_scores)),
        }

        return response

#### Register custom MLFlow model


In [None]:
# load base conda file downloaded alongside the classifier model
with open("/tmp/models/classifier/model/conda.yaml", "r") as f:
    base_conda_env = yaml.safe_load(f)

# define extra pip dependencies, pinned to the versions installed in this environment
extra_pip_dependencies = [
    f"{library}=={importlib.metadata.version(library)}"
    for library in ["alibi-detect", "joblib", "numpy", "pandas"]
]

# build the updated conda environment without mutating base_conda_env:
# a shallow dict.copy() would share the nested "dependencies" list and its
# pip section, so assigning into the copy would also change the base.
# As before, the pip section is assumed to be the last "dependencies" entry.
*conda_dependencies, pip_section = base_conda_env["dependencies"]
updated_conda_env = {
    **base_conda_env,
    "dependencies": [
        *conda_dependencies,
        {**pip_section, "pip": pip_section["pip"] + extra_pip_dependencies},
    ],
}

In [None]:
# log the custom model under the experiment named by the notebook widget
mlflow.set_experiment(dbutils.widgets.get("experiment_name"))

# the `with` block ends the run when it exits, so no explicit
# mlflow.end_run() call is needed inside it
with mlflow.start_run(run_name="credit-default-uci-register") as run:
    # create instance of custom model
    model_artifact = CustomModel(
        categorical_feature_names=CATEGORICAL_FEATURES,
        numeric_feature_names=NUMERIC_FEATURES,
    )

    # log model together with the local directory holding the classifier,
    # drift and outlier artifacts; signature=False presumably disables
    # signature inference from the input example - confirm
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=model_artifact,
        artifacts={"artifacts_path": "/tmp/models"},
        conda_env=updated_conda_env,
        input_example=INPUT_SAMPLE,
        signature=False,
    )

In [None]:
# read the best run id published under the "train_model" task key
best_run_id = dbutils.jobs.taskValues.get(
    taskKey="train_model",
    key="best_run_id",
    debugValue="your-run-id",
)

# register the logged custom model in the MLflow model registry, tagging the
# new version with the run id of the classifier it wraps
logged_model_uri = f"runs:/{run.info.run_id}/model"
registry_model_name = dbutils.widgets.get("model_name")
registered_model = mlflow.register_model(
    logged_model_uri,
    registry_model_name,
    tags={"best_classifier_model_run_id": best_run_id},
)

#### Return notebook outputs


In [None]:
# collect the registered model's name and version as the notebook result
notebook_result = {
    "MODEL_NAME": registered_model.name,
    "MODEL_VERSION": registered_model.version,
}

# serialize the result under an "output" key and exit the notebook with it
json_output = json.dumps({"output": notebook_result})
dbutils.notebook.exit(json_output)