
# MLflow Quick Test: Train → Log → Promote

This notebook lets you smoke-test your MLflow stack **without Spark** using synthetic data.

**What it does:**
1. Configures `MLFLOW_TRACKING_URI`
2. Creates a synthetic binary classification dataset
3. Trains **LogisticRegression** and **XGBoost** pipelines
4. Logs metrics and the model to MLflow (`credit_risk_training` experiment)
5. Promotes the best run for a chosen `train_date` into the **Model Registry**

> Tip: If you're running via Docker Compose, set the tracking URI to your MLflow service host (e.g., `http://a2-mlflow:5000`). If you're testing locally, use `http://localhost:5000`.


In [2]:

# If needed, uncomment to install packages (skip if your env already has them)
%pip install -q mlflow scikit-learn xgboost pandas numpy


[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pyopenssl 23.2.0 requires cryptography!=40.0.0,!=40.0.1,<42,>=38.0.0, but you have cryptography 46.0.3 which is incompatible.[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


In [3]:

import os, json, math, time, numpy as np, pandas as pd
from datetime import datetime
import mlflow
from mlflow.tracking import MlflowClient

# ========= CONFIG =========
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")
EXPERIMENT_NAME     = "credit_risk_training"
MODEL_NAME          = "credit_risk_model"

# Use *today* as the training date tag by default (YYYY-MM-DD)
TRAIN_DATE = datetime.now().strftime("%Y-%m-%d")

print("Tracking URI :", MLFLOW_TRACKING_URI)
print("Experiment   :", EXPERIMENT_NAME)
print("Model Name   :", MODEL_NAME)
print("Train Date   :", TRAIN_DATE)

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)
client = MlflowClient()


Tracking URI : http://localhost:5000
Experiment   : credit_risk_training
Model Name   : credit_risk_model
Train Date   : 2025-11-01


MlflowException: API request to http://localhost:5000/api/2.0/mlflow/experiments/get-by-name failed with exception HTTPConnectionPool(host='localhost', port=5000): Max retries exceeded with url: /api/2.0/mlflow/experiments/get-by-name?experiment_name=credit_risk_training (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f42e0bd1690>: Failed to establish a new connection: [Errno 111] Connection refused'))

In [None]:

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
)

def cls_metrics(y_true, y_proba, thr=0.5):
    y_pred = (y_proba >= thr).astype(int)
    return {
        "accuracy":  accuracy_score(y_true, y_pred),
        "precision_micro": precision_score(y_true, y_pred, average="micro", zero_division=0),
        "precision_macro": precision_score(y_true, y_pred, average="macro", zero_division=0),
        "precision_weighted": precision_score(y_true, y_pred, average="weighted", zero_division=0),
        "recall_micro": recall_score(y_true, y_pred, average="micro", zero_division=0),
        "recall_macro": recall_score(y_true, y_pred, average="macro", zero_division=0),
        "recall_weighted": recall_score(y_true, y_pred, average="weighted", zero_division=0),
        "f1_micro": f1_score(y_true, y_pred, average="micro", zero_division=0),
        "f1_macro": f1_score(y_true, y_pred, average="macro", zero_division=0),
        "f1_weighted": f1_score(y_true, y_pred, average="weighted", zero_division=0),
    }

def resolve_logged_model_path(client: MlflowClient, run_id: str) -> str:
    """Find the artifact subpath that contains an MLflow model for this run."""
    run = client.get_run(run_id)
    hist_tag = run.data.tags.get("mlflow.log-model.history")
    # 1) Try history tag (most robust)
    if hist_tag:
        try:
            hist = json.loads(hist_tag)
            for entry in reversed(hist):
                cand = entry.get("artifact_path")
                if not cand:
                    continue
                try:
                    items = client.list_artifacts(run_id, cand)
                    if any(x.path.endswith("MLmodel") for x in items):
                        return cand
                except Exception:
                    pass
        except Exception:
            pass
    # 2) Fallback: scan root
    root = client.list_artifacts(run_id)
    for it in root:
        if it.is_dir:
            children = client.list_artifacts(run_id, it.path)
            if any(ch.path.endswith("MLmodel") for ch in children):
                return it.path
    # 3) Nothing found
    raise RuntimeError(f"No MLflow model folder (with MLmodel) found under run {run_id}.")


In [None]:

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

X, y = make_classification(
    n_samples=8000, n_features=30, n_informative=12, n_redundant=6,
    weights=[0.7, 0.3], random_state=42
)
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)
X_val,   X_test, y_val, y_test   = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

X_train.shape, X_val.shape, X_test.shape


In [None]:

from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from mlflow.models import infer_signature

pipe_lr = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler",  StandardScaler()),
    ("clf",     LogisticRegression(max_iter=1000, class_weight="balanced"))
])
pipe_lr.fit(X_train, y_train)

# Eval
import numpy as np
proba_val  = pipe_lr.predict_proba(X_val)[:,1]
proba_test = pipe_lr.predict_proba(X_test)[:,1]

auc_val  = roc_auc_score(y_val,  proba_val)
auc_test = roc_auc_score(y_test, proba_test)
gini_val  = 2*auc_val  - 1
gini_test = 2*auc_test - 1

m_val  = cls_metrics(y_val,  proba_val)
m_test = cls_metrics(y_test, proba_test)

print(f"LR  AUC[val]={auc_val:.4f}  AUC[test]={auc_test:.4f}")

# Log to MLflow
mlflow.set_experiment(EXPERIMENT_NAME)
with mlflow.start_run(run_name=f"logreg_{TRAIN_DATE.replace('-','_')}"):
    mlflow.log_metrics({
        "auc_test": float(auc_test),
        "gini_test": float(gini_test),
        "accuracy_test": float(m_test["accuracy"]),
        "precision_weighted_test": float(m_test["precision_weighted"]),
        "recall_weighted_test": float(m_test["recall_weighted"]),
        "f1_weighted_test": float(m_test["f1_weighted"]),
        "auc_val": float(auc_val),
        "gini_val": float(gini_val),
    })
    mlflow.set_tags({"train_date": TRAIN_DATE, "flavor": "logreg", "source": "notebook"})
    sig = infer_signature(X_test[:5], pipe_lr.predict_proba(X_test[:5])[:,1])

    # Use sklearn flavor; name is optional, but keeps it consistent
    mlflow.sklearn.log_model(
        sk_model=pipe_lr,
        name="model",
        signature=sig,
        input_example=X_test[:5]
    )

print("Logged LogisticRegression run.")


In [None]:

import xgboost as xgb

pipe_xgb = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("clf", xgb.XGBClassifier(
        objective="binary:logistic", eval_metric="auc",
        n_estimators=200, learning_rate=0.1, max_depth=4,
        subsample=0.8, colsample_bytree=0.8, random_state=42, tree_method="hist"
    ))
])
pipe_xgb.fit(X_train, y_train)

proba_val_x  = pipe_xgb.predict_proba(X_val)[:,1]
proba_test_x = pipe_xgb.predict_proba(X_test)[:,1]
auc_val_x  = roc_auc_score(y_val,  proba_val_x)
auc_test_x = roc_auc_score(y_test, proba_test_x)
gini_val_x  = 2*auc_val_x  - 1
gini_test_x = 2*auc_test_x - 1
m_test_x = cls_metrics(y_test, proba_test_x)

print(f"XGB AUC[val]={auc_val_x:.4f}  AUC[test]={auc_test_x:.4f}")

with mlflow.start_run(run_name=f"xgboost_{TRAIN_DATE.replace('-','_')}"):
    mlflow.log_metrics({
        "auc_test": float(auc_test_x),
        "gini_test": float(gini_test_x),
        "accuracy_test": float(m_test_x["accuracy"]),
        "precision_weighted_test": float(m_test_x["precision_weighted"]),
        "recall_weighted_test": float(m_test_x["recall_weighted"]),
        "f1_weighted_test": float(m_test_x["f1_weighted"]),
        "auc_val": float(auc_val_x),
        "gini_val": float(gini_val_x),
    })
    mlflow.set_tags({"train_date": TRAIN_DATE, "flavor": "xgboost", "source": "notebook"})
    sig_x = infer_signature(X_test[:5], pipe_xgb.predict_proba(X_test[:5])[:,1])

    mlflow.sklearn.log_model(
        sk_model=pipe_xgb,
        name="model",
        signature=sig_x,
        input_example=X_test[:5]
    )

print("Logged XGBoost run.")


In [None]:

exp = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
df = mlflow.search_runs(
    experiment_ids=[exp.experiment_id],
    filter_string=f"tags.train_date = '{TRAIN_DATE}' and attributes.status = 'FINISHED'",
    order_by=["metrics.auc_test DESC"]
)
print("Found runs for", TRAIN_DATE)
display(df[["run_id","tags.flavor","metrics.auc_test","end_time"]].head(10))


In [None]:

# Pick best by auc_test and register+promote to Production
df2 = mlflow.search_runs(
    experiment_ids=[exp.experiment_id],
    filter_string=f"tags.train_date = '{TRAIN_DATE}' and attributes.status = 'FINISHED'",
    order_by=["metrics.auc_test DESC"],
    max_results=50,
)
if df2.empty:
    raise RuntimeError("No finished runs found for train_date")

best = df2.iloc[0]
run_id = best.run_id
metric = float(best["metrics.auc_test"])
print("Best run:", run_id, "auc_test=", metric)

artifact_path = resolve_logged_model_path(client, run_id)
src_uri = f"runs:/{run_id}/{artifact_path}"
print("Using src_uri:", src_uri)

# Create or reuse a model version
existing = [mv for mv in client.search_model_versions(f"name='{MODEL_NAME}'")
            if mv.run_id == run_id and mv.source == src_uri]
if existing:
    mv = sorted(existing, key=lambda m: int(m.version))[-1]
    print("Reusing model version:", mv.version)
else:
    mv = mlflow.register_model(src_uri, MODEL_NAME)
    print("Registered new version:", mv.version)

# Transition to Production (archive old)
client.transition_model_version_stage(
    name=MODEL_NAME,
    version=mv.version,
    stage="Production",
    archive_existing_versions=True
)
print(f"Promoted {MODEL_NAME} v{mv.version} -> Production")
