# Part 2: Alias-Based Model Management with the MLflow Model Registry

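> **Important:** This notebook connects directly to the SQLite backend store (`sqlite:///mlflow.db`) and reads run artifacts from the local `mlruns/` directory created in Part 1, so run it from the same working directory. If you started the MLflow tracking server in Part 1, keep it running so you can inspect the registered models and aliases in the MLflow UI.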

This notebook uses MLflow experiment tracking to select the best runs, and the MLflow Model Registry to version them and manage a production workflow with aliases (e.g., 'staging', 'production').

## Tasks in this notebook:
1. Connect to the MLflow tracking server.
2. Find the best experiment runs.
3. Register models in the Model Registry.
4. Assign aliases to model versions.
5. Evaluate and compare model versions on a test set.
6. Promote the best model to production by updating the alias.

## 1. Connect to the MLflow tracking server

In [1]:
import os
import pickle
import logging

import pandas as pd
from sklearn.metrics import root_mean_squared_error  # requires scikit-learn >= 1.4

import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

# Hide scikit-learn log messages below WARNING
logging.getLogger("sklearn").setLevel(logging.WARNING)

# Use the same SQLite backend store as Part 1; MlflowClient() picks up this URI
MLFLOW_TRACKING_URI = "sqlite:///mlflow.db"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = MlflowClient()

2025/06/29 11:16:09 INFO mlflow.store.db.utils: Creating initial MLflow database tables...
2025/06/29 11:16:09 INFO mlflow.store.db.utils: Updating database tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.


In [2]:
# List all experiments and print their details for better visibility
experiments = client.search_experiments()
for exp in experiments:
    print(f"ID: {exp.experiment_id}, Name: {exp.name}, Artifact Location: {exp.artifact_location}, Lifecycle Stage: {exp.lifecycle_stage}")

ID: 1, Name: nyc-taxi-experiment, Artifact Location: /workspaces/mlops-zoomcamp/02-experiment-tracking/nyc-taxi-mlflow/mlruns/1, Lifecycle Stage: active
ID: 0, Name: Default, Artifact Location: mlflow-artifacts:/0, Lifecycle Stage: active


In [3]:
# Create the experiment only if it does not already exist
experiment_name = "nyc-taxi-mlflow-model-registry"
existing = client.get_experiment_by_name(experiment_name)
if existing is None:
    registry_experiment_id = client.create_experiment(name=experiment_name)
    print(f"Created experiment '{experiment_name}' (ID: {registry_experiment_id})")
else:
    print(f"Experiment '{experiment_name}' already exists (ID: {existing.experiment_id})")

## 2. Find the best experiment runs

In [4]:
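# Look up the Part 1 experiment by name instead of hard-coding its ID
source_experiment = client.get_experiment_by_name("nyc-taxi-experiment")

# Up to 5 active runs with val_rmse < 7, best (lowest) first
runs = client.search_runs(
    experiment_ids=[source_experiment.experiment_id],
    filter_string="metrics.val_rmse < 7",
    run_view_type=ViewType.ACTIVE_ONLY,
    max_results=5,
    order_by=["metrics.val_rmse ASC"],
)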

for run in runs:
    print(f"run id: {run.info.run_id}, val_rmse: {run.data.metrics['val_rmse']:.4f}")

run id: af52b66621a145c38bba730ce67a6df6, val_rmse: 5.4726
run id: 143e65b964534d6b9d630e5d0bae6f75, val_rmse: 5.4727
run id: 8522b9038eff4e91bb2ece6dedae7844, val_rmse: 5.5057
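The `search_runs` query above filters, sorts, and truncates on the tracking backend. As a rough mental model, here is the same selection done client-side over plain dictionaries. `select_best_runs` and the sample runs are hypothetical and are not MLflow objects; this sketch simply drops runs that never logged `val_rmse`.

```python
def select_best_runs(runs, metric="val_rmse", threshold=7.0, max_results=5):
    """Client-side analogue of the query: metric < threshold, ascending order, top max_results."""
    candidates = [
        r for r in runs
        if r["metrics"].get(metric) is not None and r["metrics"][metric] < threshold
    ]
    return sorted(candidates, key=lambda r: r["metrics"][metric])[:max_results]

# Hypothetical runs: plain dicts standing in for MLflow Run objects
toy_runs = [
    {"run_id": "a", "metrics": {"val_rmse": 5.51}},
    {"run_id": "b", "metrics": {"val_rmse": 7.20}},  # filtered out: not below 7
    {"run_id": "c", "metrics": {"val_rmse": 5.47}},
    {"run_id": "d", "metrics": {}},                  # filtered out: metric never logged
]

for r in select_best_runs(toy_runs):
    print(r["run_id"], r["metrics"]["val_rmse"])
```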


## 3. Register models in the Model Registry

In [5]:
logging.getLogger("mlflow").setLevel(logging.ERROR)

# Register each selected run as a new model version, skipping runs that are already registered
model_name = "nyc-taxi-regressor"
existing_versions = client.search_model_versions(f"name='{model_name}'")
runid_to_version = {mv.run_id: mv.version for mv in existing_versions}

for run in runs:
    run_id = run.info.run_id
    if run_id in runid_to_version:
        print(f"Run {run_id} is already registered as version {runid_to_version[run_id]}. Skipping.")
        continue
    # Assumes Part 1 logged the model under the artifact path "models"
    model_uri = f"runs:/{run_id}/models"
    result = mlflow.register_model(model_uri=model_uri, name=model_name)
    print(f"Registered run {run_id} as version {result.version}")

INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.


Registered run af52b66621a145c38bba730ce67a6df6 as version 1
Registered run 143e65b964534d6b9d630e5d0bae6f75 as version 2
Registered run 8522b9038eff4e91bb2ece6dedae7844 as version 3


Successfully registered model 'nyc-taxi-regressor'.
Created version '1' of model 'nyc-taxi-regressor'.
Registered model 'nyc-taxi-regressor' already exists. Creating a new version of this model...
Created version '2' of model 'nyc-taxi-regressor'.
Registered model 'nyc-taxi-regressor' already exists. Creating a new version of this model...
Created version '3' of model 'nyc-taxi-regressor'.


## 4. Assign aliases to model versions
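An alias is a named, mutable pointer to one specific model version (e.g., 'production', 'staging'). Each alias points to exactly one version of a registered model, while a single version can carry several aliases. Deployment code can load `models:/nyc-taxi-regressor@production` and pick up a new version as soon as the alias is moved, without any code change, which is what makes aliases useful in CI/CD workflows.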
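These semantics can be illustrated with a tiny in-memory model. `ToyAliasRegistry` is a hypothetical class written for this explanation, not MLflow's implementation: in this notebook, `client.set_registered_model_alias` plays the role of `set_alias`, and loading `models:/<name>@<alias>` plays the role of `resolve`.

```python
class ToyAliasRegistry:
    """Hypothetical in-memory stand-in for one registered model's aliases (not MLflow code)."""

    def __init__(self, versions):
        self.versions = set(versions)  # existing version numbers
        self.aliases = {}              # alias -> version: each alias points to exactly one version

    def set_alias(self, alias, version):
        if version not in self.versions:
            raise ValueError(f"Unknown version {version}")
        self.aliases[alias] = version  # re-setting an existing alias moves it to the new version

    def resolve(self, alias):
        return self.aliases[alias]     # the version a by-alias load would pick up

    def aliases_of(self, version):
        return sorted(a for a, v in self.aliases.items() if v == version)


toy_registry = ToyAliasRegistry(versions=[1, 2, 3])
toy_registry.set_alias("production", 1)
toy_registry.set_alias("staging", 2)
print(toy_registry.resolve("production"))  # 1
toy_registry.set_alias("production", 2)    # move the alias; consumers need no code change
print(toy_registry.resolve("production"))  # 2
print(toy_registry.aliases_of(2))          # ['production', 'staging']
```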

In [6]:
# Show only model versions with at least one alias assigned (not None or empty)
model_name = "nyc-taxi-regressor"
versions = client.search_model_versions(f"name='{model_name}'")

print(f"{'Version':<8} {'Run ID':<32} {'Aliases':<20} {'Status':<12}")
print("-" * 80)
for mv in versions:
    full_mv = client.get_model_version(model_name, mv.version)
    if full_mv.aliases:
        aliases = ', '.join(full_mv.aliases)
        print(f"{full_mv.version:<8} {full_mv.run_id:<32} {aliases:<20} {full_mv.status:<12}")


Version  Run ID                           Aliases              Status      
--------------------------------------------------------------------------------


In [7]:
# Assign the alias to the model version
new_alias = "production"
model_version = 1

client.set_registered_model_alias(
    name=model_name,
    alias=new_alias,
    version=model_version
)

print(f"Alias '{new_alias}' assigned to model version {model_version}.")

Alias 'production' assigned to model version 1.


In [8]:
# Assign the alias to the model version
new_alias = "staging"
model_version = 2

client.set_registered_model_alias(
    name=model_name,
    alias=new_alias,
    version=model_version
)

print(f"Alias '{new_alias}' assigned to model version {model_version}.")

Alias 'staging' assigned to model version 2.


In [9]:
# Re-query the registry and print all model versions with at least one alias assigned
versions = client.search_model_versions(f"name='{model_name}'")
print(f"{'Version':<8} {'Run ID':<32} {'Aliases':<20} {'Status':<12}")
print("-" * 80)
for mv in versions:
    full_mv = client.get_model_version(model_name, mv.version)
    if full_mv.aliases:
        aliases = ', '.join(full_mv.aliases)
        print(f"{full_mv.version:<8} {full_mv.run_id:<32} {aliases:<20} {full_mv.status:<12}")

Version  Run ID                           Aliases              Status      
--------------------------------------------------------------------------------
2        143e65b964534d6b9d630e5d0bae6f75 staging              READY       
1        af52b66621a145c38bba730ce67a6df6 production           READY       
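Instead of fetching every version, the alias mapping can also be read from the registered model itself. Assuming `client.get_registered_model(model_name).aliases` returns a dict mapping alias to version string (as in recent MLflow 2.x releases; verify against your installed version), only a small formatting step remains. That step is pure Python and is sketched here with a hard-coded dict; `alias_table` is a hypothetical helper.

```python
def alias_table(alias_to_version):
    """Invert {alias: version} into (version, 'alias1, alias2') rows, newest version first."""
    by_version = {}
    for alias, version in alias_to_version.items():
        by_version.setdefault(str(version), []).append(alias)
    rows = sorted(by_version.items(), key=lambda item: int(item[0]), reverse=True)
    return [(version, ", ".join(sorted(names))) for version, names in rows]

# Example input shaped like the aliases assigned above (hard-coded, not read from MLflow)
example_aliases = {"production": "1", "staging": "2"}
print(f"{'Version':<8} {'Aliases':<20}")
for version, names in alias_table(example_aliases):
    print(f"{version:<8} {names:<20}")
```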


## 5. Evaluate and compare model versions on a test set
Evaluate the models behind the 'production' and 'staging' aliases on a held-out test set and compare their RMSEs. The comparison decides, in step 6, whether the two aliases should be swapped.

**Automated Model Promotion Workflow:**

1. Load the test dataset (NYC Yellow Taxi data, March 2024).
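2. Load the `DictVectorizer` (preprocessor) artifact from the run behind the 'production' alias. Both versions are evaluated with this single preprocessor, which assumes they were trained on the same features and training data; if they were not, load each version's own preprocessor.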
3. Preprocess the test set for model input.
4. Evaluate both the 'production' and 'staging' models on the test set and compare their RMSEs.
5. If the 'staging' model has a lower RMSE than 'production', promote it to 'production' and demote the previous 'production' model to 'staging'. Otherwise, keep the current assignments.
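The decision rule in step 5 fits in a small pure function. `decide_aliases` and its `min_improvement` parameter are hypothetical additions for illustration: with the default `min_improvement=0.0` it reproduces the rule applied in step 6 (a tie keeps the current production model), while a positive margin avoids swapping aliases over differences that are likely noise.

```python
def decide_aliases(prod_version, stag_version, prod_rmse, stag_rmse, min_improvement=0.0):
    """Return the new alias assignment as {'production': version, 'staging': version}.

    Staging is promoted only when its RMSE is lower than production's by more than
    min_improvement. Ties, and gains within the margin, keep the current assignment.
    """
    if prod_rmse - stag_rmse > min_improvement:
        # Staging is better by enough: swap the two aliases
        return {"production": stag_version, "staging": prod_version}
    # Production is better, equal, or not worse by enough: keep the assignment
    return {"production": prod_version, "staging": stag_version}

print(decide_aliases("1", "2", prod_rmse=5.80, stag_rmse=5.70))  # staging promoted
print(decide_aliases("1", "2", prod_rmse=5.80, stag_rmse=5.79, min_improvement=0.05))  # kept
```

For the test RMSEs this notebook reports (5.7796 for production, 5.7797 for staging), the function keeps the current assignment regardless of the margin.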

**Note:** Assigning the 'production' alias only changes which version `models:/nyc-taxi-regressor@production` resolves to; it does not deploy anything. A serving system or CI/CD pipeline still has to load and deploy that version.

In [10]:
categorical = ['PU_DO']
numerical = ['trip_distance']

def read_data(filename):
    df = pd.read_parquet(filename)

    df['duration'] = df.tpep_dropoff_datetime - df.tpep_pickup_datetime
    df['duration'] = df.duration.dt.total_seconds() / 60

    df = df[(df.duration >= 1) & (df.duration <= 60)].copy()

    # Feature engineering: create PU_DO and cast to string
    df['PU_DO'] = df['PULocationID'].astype(str) + '_' + df['DOLocationID'].astype(str)
    df[categorical] = df[categorical].astype(str)

    return df, categorical, numerical

def preprocess(df, dv, categorical, numerical):
    dicts = df[categorical + numerical].to_dict(orient='records')
    return dv.transform(dicts)

def test_model(name, alias, X, y):
    """
    Load a model from the MLflow Model Registry by alias (e.g., 'production', 'staging'),
    predict on X, print the RMSE against y, and return it.
    """
    model = mlflow.pyfunc.load_model(f"models:/{name}@{alias}")
    y_pred = model.predict(X)
    rmse = root_mean_squared_error(y, y_pred)
    print(f"{alias.capitalize()} model RMSE on test dataset: {rmse:.4f}")
    return rmse
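`test_model` relies on scikit-learn's `root_mean_squared_error`. The metric itself is simple, as this standard-library sketch shows (`rmse_sketch` is a hypothetical helper, not part of the notebook). Because the target is trip duration in minutes, an RMSE of about 5.78 means the predictions are off by roughly 5.8 minutes in the root-mean-square sense.

```python
import math

def rmse_sketch(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    if len(y_true) == 0 or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# sqrt((4 + 4 + 0) / 3) is about 1.633
print(rmse_sketch([10.0, 20.0, 30.0], [12.0, 18.0, 30.0]))
```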

In [11]:
# Define the test data path for yellow taxi March 2024 data
TEST_PATH = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet"
df_test, categorical, numerical = read_data(TEST_PATH)

In [12]:
# Alias whose run provides the DictVectorizer ('production' or 'staging')
alias = 'production'

# Resolve the alias to the run that produced that model version
run_id = client.get_model_version_by_alias(model_name, alias).run_id

# Download the run's "models" artifact directory to a local path. Unlike a hard-coded
# "mlruns/<experiment_id>/<run_id>/..." path, this works from any working directory
# and with remote artifact stores.
artifact_dir = mlflow.artifacts.download_artifacts(run_id=run_id, artifact_path="models")
bin_files = sorted(f for f in os.listdir(artifact_dir) if f.endswith(".bin"))
if not bin_files:
    raise FileNotFoundError(f"No .bin artifact found for run_id {run_id} in {artifact_dir}")
artifact_file = os.path.join(artifact_dir, bin_files[0])

# The .bin file is expected to contain a pickled (DictVectorizer, model) tuple
with open(artifact_file, "rb") as f_in:
    dv_lr_tuple = pickle.load(f_in)
if isinstance(dv_lr_tuple, tuple) and len(dv_lr_tuple) == 2:
    dv = dv_lr_tuple[0]
else:
    raise ValueError(f"Expected a tuple (DictVectorizer, model) in {artifact_file}, got {type(dv_lr_tuple)}.")
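Reusing the fitted `DictVectorizer`, rather than fitting a new one on the test data, matters because its learned vocabulary fixes which column each feature lands in. The functions below are a simplified, hypothetical mimic of `DictVectorizer` in plain Python (dense lists instead of sparse matrices): string values become one-hot `key=value` columns, numeric values keep their key, feature names are sorted, and categories unseen at fit time are dropped at transform time. The location pairs are made-up examples.

```python
def toy_feature_name(key, value):
    """String values become one-hot 'key=value' features; numeric values keep the bare key."""
    return f"{key}={value}" if isinstance(value, str) else key

def toy_fit(records):
    """Mimic DictVectorizer.fit: collect feature names, sort them, assign column indices."""
    names = {toy_feature_name(k, v) for rec in records for k, v in rec.items()}
    return {name: i for i, name in enumerate(sorted(names))}

def toy_transform(records, vocab):
    """Mimic DictVectorizer.transform: features missing from vocab are ignored."""
    rows = []
    for rec in records:
        row = [0.0] * len(vocab)
        for k, v in rec.items():
            name = toy_feature_name(k, v)
            if name in vocab:
                row[vocab[name]] = 1.0 if isinstance(v, str) else float(v)
        rows.append(row)
    return rows

# Hypothetical records in the shape preprocess() builds: PU_DO plus trip_distance
toy_train = [
    {"PU_DO": "132_265", "trip_distance": 17.2},
    {"PU_DO": "161_236", "trip_distance": 1.1},
]
toy_test = [
    {"PU_DO": "161_236", "trip_distance": 2.0},  # category seen during fit
    {"PU_DO": "999_1", "trip_distance": 3.5},    # unseen category: its one-hot columns stay 0
]

toy_vocab = toy_fit(toy_train)
toy_matrix = toy_transform(toy_test, toy_vocab)
print(toy_vocab)   # {'PU_DO=132_265': 0, 'PU_DO=161_236': 1, 'trip_distance': 2}
print(toy_matrix)  # [[0.0, 1.0, 2.0], [0.0, 0.0, 3.5]]
```

If a second vectorizer were fitted on the test records instead, `PU_DO=161_236` would land in column 0 rather than column 1, so the model's coefficients would be applied to the wrong features.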

In [13]:
X_test = preprocess(df_test, dv, categorical, numerical)

In [14]:
# read_data() already computed the target (trip duration in minutes) and kept
# only trips of 1 to 60 minutes, so take it directly from df_test
y_test = df_test["duration"].values
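The regression target is trip duration in minutes, computed and filtered inside `read_data`. The same arithmetic on a few hypothetical timestamps, including the 1 to 60 minute filter, looks like this (`trip_duration_minutes` and `keep_trip` are illustrative helpers, not part of the notebook):

```python
from datetime import datetime

def trip_duration_minutes(pickup, dropoff):
    """Duration in minutes, as in read_data: (dropoff - pickup) in seconds divided by 60."""
    return (dropoff - pickup).total_seconds() / 60

def keep_trip(minutes, low=1, high=60):
    """Same bounds as read_data: keep trips from 1 to 60 minutes, inclusive."""
    return low <= minutes <= high

# Hypothetical (pickup, dropoff) pairs
example_trips = [
    (datetime(2024, 3, 1, 8, 0, 0), datetime(2024, 3, 1, 8, 12, 30)),   # 12.5 minutes
    (datetime(2024, 3, 1, 9, 0, 0), datetime(2024, 3, 1, 9, 0, 40)),    # under 1 minute
    (datetime(2024, 3, 1, 10, 0, 0), datetime(2024, 3, 1, 11, 30, 0)),  # 90 minutes
]

example_durations = [trip_duration_minutes(p, d) for p, d in example_trips]
example_kept = [m for m in example_durations if keep_trip(m)]
print(example_kept)  # [12.5]
```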

In [15]:
# Make sure the desired alias (e.g., 'production') is assigned to a model version before running this cell

result = test_model(model_name, 'production', X_test, y_test)

Production model RMSE on test dataset: 5.7796


In [16]:
# Make sure the desired alias (e.g., 'staging') is assigned to a model version before running this cell

result = test_model(model_name, 'staging', X_test, y_test)

Staging model RMSE on test dataset: 5.7797


## 6. Promote the best model to production by updating the alias

In [17]:
# Evaluate both aliases on the test set and look up the model version behind each one
prod_rmse = test_model(model_name, 'production', X_test, y_test)
stag_rmse = test_model(model_name, 'staging', X_test, y_test)

# Get model versions for each alias
prod_version = client.get_model_version_by_alias(model_name, 'production').version
stag_version = client.get_model_version_by_alias(model_name, 'staging').version

Production model RMSE on test dataset: 5.7796
Staging model RMSE on test dataset: 5.7797


In [18]:
if prod_rmse <= stag_rmse:
    print(f"Production model (version {prod_version}) is better or equal (RMSE: {prod_rmse:.4f} <= {stag_rmse:.4f}). No change.")
else:
    # Promote staging to production and production to staging
    client.set_registered_model_alias(name=model_name, alias='production', version=stag_version)
    client.set_registered_model_alias(name=model_name, alias='staging', version=prod_version)
    print(f"Staging model (version {stag_version}) promoted to production (RMSE: {stag_rmse:.4f} < {prod_rmse:.4f}).")
    print(f"Previous production model (version {prod_version}) demoted to staging.")

Production model (version 1) is better or equal (RMSE: 5.7796 <= 5.7797). No change.


In [19]:
# Re-query the registry and print all model versions with at least one alias assigned
versions = client.search_model_versions(f"name='{model_name}'")
print(f"{'Version':<8} {'Run ID':<32} {'Aliases':<20} {'Status':<12}")
print("-" * 80)
for mv in versions:
    full_mv = client.get_model_version(model_name, mv.version)
    if full_mv.aliases:
        aliases = ', '.join(full_mv.aliases)
        print(f"{full_mv.version:<8} {full_mv.run_id:<32} {aliases:<20} {full_mv.status:<12}")

Version  Run ID                           Aliases              Status      
--------------------------------------------------------------------------------
2        143e65b964534d6b9d630e5d0bae6f75 staging              READY       
1        af52b66621a145c38bba730ce67a6df6 production           READY       
