### MLFlow model registry

The key idea in this notebook is to do the same done in the ui.
We take the last four logged experiments in duration-prediction_custom.ipynb

start mlflow ui: $ mlflow ui --backend-store-uri sqlite:///mlflow.db

In [48]:
from mlflow.tracking import MlflowClient
from mlflow import register_model, set_tracking_uri
import mlflow
import os

MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
client.get_experiment(1)

<Experiment: artifact_location='/home/yezer/projects/mlops-zoomcamp/02-experiment-tracking/mlruns/1', creation_time=1747818150363, experiment_id='1', last_update_time=1747818150363, lifecycle_stage='active', name='NYC-taxi-duration-prediction', tags={}>

Create a new experiment

to completely remove an experiment, apart from:
experiment = client.get_experiment_by_name(new_exp)
client.delete_experiment(experiment_id=experiment.experiment_id)

we need in command line:
$ mlflow gc --backend-store-uri sqlite:///mlflow.db

In [2]:
new_exp = "fake-exp-02"
exps = []
client.search_experiments()
for experiment in client.search_experiments():
    print(f"experiment: {experiment.name}")
    exps.append(experiment.name)

experiment = client.get_experiment_by_name(new_exp)

if new_exp not in exps:
    client.create_experiment(new_exp)
else:
    print(f"exp: {new_exp} exists")

experiment: fake-exp-02
experiment: NYC-taxi-duration-prediction
exp: fake-exp-02 exists


In [3]:
from mlflow.entities import ViewType

runs = client.search_runs(
    experiment_ids="1",
    filter_string="metrics.rmse < 10.0 and tags.developer = 'yezer'",
    order_by=["metrics.rmse ASC"],
    run_view_type=ViewType.ACTIVE_ONLY,
)
for run in runs:
    print(f"run id: {run.info.run_id}, rmse: {run.data.metrics['rmse']:.4f}, {run.info.run_name}")

run id: 907ceeaca0a140b587ba34fa2afbe49f, rmse: 5.3333, nervous-squirrel-736
run id: b02425fbf51e4fd9920677535f5a0f01, rmse: 5.3611, classy-penguin-714
run id: 6e60d9492a0a48928ec46375026969bc, rmse: 6.0004, gaudy-sloth-127
run id: 90edc445a6a0470b8fc5babbed0f338c, rmse: 8.5877, incongruous-gnu-622


In [11]:
# register models with commands. SO far, the models are logged but not registered.
run_id = "2cc0ea6b57ee46648d49a29ef374eb83"
model_uri = f"runs:/{run_id}/models_mlflow"  # go to run annd check if the path exists
set_tracking_uri(MLFLOW_TRACKING_URI)
result = register_model(
    model_uri=model_uri,
    name="nyc-taxi-regressor",
)

# we can add an aliases in ui

Registered model 'nyc-taxi-regressor' already exists. Creating a new version of this model...
Created version '2' of model 'nyc-taxi-regressor'.


In [38]:
model_name = "nyc-taxi-regressor"
versions = client.search_model_versions(filter_string=f"name='{model_name}'")
print(versions)
for version in versions:
    print(f"version: {version.version}, tags: {version.tags} run_id:{version.run_id}")

[<ModelVersion: aliases=[], creation_timestamp=1747925245796, current_stage='None', description='', last_updated_timestamp=1747925245796, name='nyc-taxi-regressor', run_id='90edc445a6a0470b8fc5babbed0f338c', run_link='', source='/home/yezer/projects/mlops-zoomcamp/02-experiment-tracking/mlruns/1/90edc445a6a0470b8fc5babbed0f338c/artifacts/models_mlflow', status='READY', status_message=None, tags={'status': 'production', 'validation_status': 'approved'}, user_id=None, version=4>, <ModelVersion: aliases=[], creation_timestamp=1747924659402, current_stage='None', description='', last_updated_timestamp=1747924659402, name='nyc-taxi-regressor', run_id='907ceeaca0a140b587ba34fa2afbe49f', run_link='', source='/home/yezer/projects/mlops-zoomcamp/02-experiment-tracking/mlruns/1/907ceeaca0a140b587ba34fa2afbe49f/artifacts/models_mlflow', status='READY', status_message=None, tags={'status': 'stage', 'validation_status': 'approved'}, user_id=None, version=3>, <ModelVersion: aliases=[], creation_time

In [44]:
# It is important to delete the alias before deleting the model version
# whether not the model can see and list the aliases of former registered models
client.delete_registered_model_alias(name="nyc-taxi-regressor", alias="tontito")

### Comparing versions of registered models with new data, to select the new champion model.

steps
1. Load the test dataset, which corresponds to the NYC Green Taxi data from the month of March 2023.
2. Download the DictVectorizer that was fitted using the training data and saved to MLflow as an artifact, and load it with pickle.
3. Preprocess the test set using the DictVectorizer so we can properly feed the regressors.
4. Make predictions on the test set using the model versions that are currently tagged and compare their performance.
Based on the results, update the tag model version accordingly.

In [47]:
import pandas as pd
from sklearn.metrics import root_mean_squared_error
import shutil
import pickle
from mlflow.tracking import MlflowClient

MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)


def read_dataframe(filename):
    df = pd.read_parquet(filename)

    df.lpep_dropoff_datetime = pd.to_datetime(df.lpep_dropoff_datetime)
    df.lpep_pickup_datetime = pd.to_datetime(df.lpep_pickup_datetime)

    df["duration"] = df.lpep_dropoff_datetime - df.lpep_pickup_datetime
    df.duration = df.duration.apply(lambda td: td.total_seconds() / 60)

    df = df[(df.duration >= 1) & (df.duration <= 60)]

    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df


def preprocess(df, dv):
    df["PU_DO"] = df["PULocationID"] + "_" + df["DOLocationID"]
    cat = ["PU_DO"]
    num = ["trip_distance"]
    target = "duration"
    df_dicts = df[cat + num].to_dict(orient="records")
    X_ = dv.transform(df_dicts)
    y_ = df[target].values
    return X_, y_


def test_model(model_path, X_test, y_test):
    model = mlflow.pyfunc.load_model(model_path)
    y_pred = model.predict(X_test)
    return {"rmse": root_mean_squared_error(y_test, y_pred)}

##### get metrics over the registered models over the new data

In [70]:
model_name = "nyc-taxi-regressor"
data_path = "TAXI_DATA_FOLDER/green_tripdata_2023-03.parquet"

model_ranking = {}
registered_model = client.get_registered_model(model_name)
for key, value in registered_model.aliases.items():
    retrieved_model = client.get_model_version(model_name, value)
    # we only check the validated models, defined in tags
    if retrieved_model.tags.get("validation_status") == "approved":
        print(
            f"\nAPPROVED | Aliases: {key} -- Version: {value} tags: {retrieved_model.tags} run_id: {retrieved_model.run_id}"
        )
        model_path = client.get_model_version_download_uri(
            name=model_name, version=value
        )
        if os.path.exists("preprocessor"):
            shutil.rmtree("preprocessor")
        os.makedirs("preprocessor", exist_ok=True)
        client.download_artifacts(
            run_id=retrieved_model.run_id,
            path="preprocessor/preprocessor.b",
            dst_path="preprocessor",
        )
        with open("preprocessor/preprocessor/preprocessor.b", "rb") as f_in:
            dv = pickle.load(f_in)
        df = read_dataframe(data_path)
        X_test, y_test = preprocess(df, dv)
        rmse = test_model(model_path=model_path, X_test=X_test, y_test=y_test)
        model_ranking[key] = {"run_id": retrieved_model.run_id, "version": value, "rmse": rmse}
    else:
        print(
            f"\nNOT APPROVED | Aliases: {key} -- Version: {value} Tags: {retrieved_model.tags}"
        )

model_ranking


APPROVED | Aliases: challenger1 -- Version: 3 tags: {'validation_status': 'approved', 'status': 'stage'} run_id: 907ceeaca0a140b587ba34fa2afbe49f


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]


NOT APPROVED | Aliases: challenger2 -- Version: 2 Tags: {'validation_status': 'pending', 'status': 'stage'}

APPROVED | Aliases: challenger3 -- Version: 4 tags: {'validation_status': 'approved', 'status': 'production'} run_id: 90edc445a6a0470b8fc5babbed0f338c


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]


APPROVED | Aliases: champion -- Version: 1 tags: {'validation_status': 'approved', 'status': 'archived'} run_id: b02425fbf51e4fd9920677535f5a0f01


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

{'challenger1': {'run_id': '907ceeaca0a140b587ba34fa2afbe49f',
  'version': 3,
  'rmse': {'rmse': 5.580916358639859}},
 'challenger3': {'run_id': '90edc445a6a0470b8fc5babbed0f338c',
  'version': 4,
  'rmse': {'rmse': 8.831252416938195}},
 'champion': {'run_id': 'b02425fbf51e4fd9920677535f5a0f01',
  'version': 1,
  'rmse': {'rmse': 5.594412005390787}}}

In [76]:
# Ensure 'champion' alias exists and get its data
if "champion" not in model_ranking:
    print("No 'champion' model found in model_ranking. Skipping alias interchange.")
else:
    champion_data = model_ranking["champion"]
    champion_rmse = champion_data["rmse"]["rmse"]
    champion_version = champion_data["version"]
    print(f"Current Champion: Alias 'champion', Version {champion_version}, RMSE: {champion_rmse:.4f}")

    best_challenger_alias = None
    # Initialize with a value slightly higher than champion's RMSE to find any challenger that is better
    # Or, more simply, initialize with champion's RMSE and look for strictly smaller RMSE.
    min_rmse_among_challengers = champion_rmse
    best_challenger_version = None
    best_challenger_run_id = None  # For more informative print statements

    # Iterate over all models in model_ranking to find the best one that is not the current champion
    for alias, data in model_ranking.items():
        if alias == "champion":
            continue  # Skip the current champion itself

        current_challenger_rmse = data["rmse"]["rmse"]
        # We are looking for a model with strictly less RMSE
        if current_challenger_rmse < min_rmse_among_challengers:
            min_rmse_among_challengers = current_challenger_rmse
            best_challenger_alias = alias
            best_challenger_version = data["version"]
            best_challenger_run_id = data["run_id"]

    if best_challenger_alias:
        print(
            f"\nNew best model identified: Alias '{best_challenger_alias}', \
            Version {best_challenger_version}, RMSE: {min_rmse_among_challengers:.4f}"
        )
        print(
            f"This model is better than the current Champion (Version {champion_version}, \
                RMSE: {champion_rmse:.4f})."
        )

        print("\nInterchanging aliases:")
        print(f"1. Promoting Version {best_challenger_version} (formerly '{best_challenger_alias}') to 'champion'.")
        client.set_registered_model_alias(name=model_name, alias="champion", version=best_challenger_version)

        print(f"2. Assigning alias '{best_challenger_alias}' to former Champion (Version {champion_version}).")
        client.set_registered_model_alias(name=model_name, alias=best_challenger_alias, version=champion_version)

        print("\nAlias interchange complete.")


Current Champion: Alias 'champion', Version 1, RMSE: 5.5944

New best model identified: Alias 'challenger1',             Version 3, RMSE: 5.5809
This model is better than the current Champion (Version 1,                 RMSE: 5.5944).

Interchanging aliases:
1. Promoting Version 3 (formerly 'challenger1') to 'champion'.
2. Assigning alias 'challenger1' to former Champion (Version 1).
