# Chapter 10 -- MLOps and Production ML
## *Python for AI/ML: A Complete Learning Journey*

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/timothy-watt/python-for-ai-ml/blob/main/CH10_MLOps_Production_ML.ipynb)
&nbsp;&nbsp;[![Back to TOC](https://img.shields.io/badge/Back_to-Table_of_Contents-1B3A5C?style=flat-square)](https://colab.research.google.com/github/timothy-watt/python-for-ai-ml/blob/main/Python_for_AIML_TOC.ipynb)

---

**Part:** 4 -- Production and Deployment  
**Prerequisites:** Chapter 6 (scikit-learn), Chapter 7 (PyTorch)  
**Estimated time:** 5-6 hours

---

### Learning Objectives

By the end of this chapter you will be able to:

- Explain the ML lifecycle and where MLOps fits within it
- Track experiments with MLflow: log parameters, metrics, and artefacts
- Version and register models in the MLflow Model Registry
- Serve a trained model as a REST API endpoint using FastAPI
- Write and run unit tests for ML preprocessing and prediction code
- Detect data drift by comparing training and production distributions
- Build a lightweight model monitoring dashboard
- Structure an ML project repository for collaboration and reproducibility

---

### Why MLOps?

Training a model is often only a small part of the work in a production ML system;
a common rule of thumb puts it at around 10%. The rest is everything around it:
tracking what you tried, packaging
the model so others can use it, serving predictions reliably, monitoring
for when the world changes and the model degrades, and reproducing results
six months later when a colleague asks why the model made a particular decision.

MLOps is the engineering discipline that makes ML systems maintainable.
This chapter gives you the core toolkit.

---

### Project Thread -- Chapter 10

We take the Chapter 6 salary regression model through a complete MLOps workflow:
instrument the training with MLflow, compare three model variants in the UI,
register the best model, wrap it in a FastAPI endpoint, write tests,
and build a drift monitor that would alert if production salary data
shifts away from the training distribution.


---

## Setup -- Install and Import


In [None]:
import subprocess
subprocess.run(['pip', 'install', 'mlflow', 'fastapi', 'uvicorn',
                'httpx', 'evidently', '-q'], check=False)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import warnings
warnings.filterwarnings('ignore')

import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature

from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import mean_absolute_error, r2_score, mean_squared_error

print(f'MLflow version:  {mlflow.__version__}')

plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.dpi']       = 110
plt.rcParams['axes.titlesize']   = 13
plt.rcParams['axes.titleweight'] = 'bold'

DATASET_URL  = 'https://raw.githubusercontent.com/timothy-watt/python-for-ai-ml/main/data/so_survey_2025_curated.csv'
RANDOM_STATE = 42


In [None]:
# Load and clean SO 2025 -- standard pipeline
df_raw = pd.read_csv(DATASET_URL)
df = df_raw.copy()
df['ConvertedCompYearly'] = pd.to_numeric(df['ConvertedCompYearly'], errors='coerce')
df = df.dropna(subset=['ConvertedCompYearly'])   # also drops values that failed to parse
Q1, Q3 = df['ConvertedCompYearly'].quantile([0.25, 0.75])
IQR = Q3 - Q1
df = df[
    (df['ConvertedCompYearly'] >= max(Q1 - 3*IQR, 5_000)) &
    (df['ConvertedCompYearly'] <= min(Q3 + 3*IQR, 600_000))
].copy()
if 'YearsCodePro' in df.columns:
    df['YearsCodePro'] = pd.to_numeric(df['YearsCodePro'], errors='coerce')
    df['YearsCodePro'] = df['YearsCodePro'].fillna(df['YearsCodePro'].median())
df['uses_python'] = df.get('LanguageHaveWorkedWith', pd.Series(dtype=str)).str.contains('Python', na=False).astype(int)
df['uses_sql']    = df.get('LanguageHaveWorkedWith', pd.Series(dtype=str)).str.contains('SQL', na=False).astype(int)
df['uses_js']     = df.get('LanguageHaveWorkedWith', pd.Series(dtype=str)).str.contains('JavaScript', na=False).astype(int)
df['uses_ai']     = df.get('AIToolCurrently', pd.Series(dtype=str)).notna().astype(int)
df['log_salary']  = np.log(df['ConvertedCompYearly'])
df = df.reset_index(drop=True)

FEATURE_COLS = [c for c in ['YearsCodePro','uses_python','uses_sql','uses_js','uses_ai']
                if c in df.columns]
X = df[FEATURE_COLS].copy()
for col in FEATURE_COLS:
    med = X[col].median()
    X[col] = X[col].fillna(med if pd.notna(med) else 0)
y = df['log_salary']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=RANDOM_STATE
)
print(f'Dataset ready: {len(df):,} rows')
print(f'Features: {FEATURE_COLS}')
print(f'Train: {len(X_train):,}  Test: {len(X_test):,}')


---

## Section 10.1 -- The ML Lifecycle

A production ML system has six phases that repeat in a continuous loop:

```
  Define        Collect       Train &      Evaluate     Deploy       Monitor
  Problem  -->  & Clean  -->  Experiment --> & Select --> & Serve --> & Retrain
    |           Data          (MLflow)       Model       (FastAPI)    (Drift)
    |_______________________________________________________________|  (loop)
```

Chapters 3-9 covered the middle phases in depth. This chapter fills in
**Experiment tracking**, **Deploy & Serve**, and **Monitor & Retrain** --
the phases most often skipped in tutorials but most important in practice.

### The core MLOps problems

**Reproducibility** -- can you recreate the exact model that went to production
six months ago? Without tracking, the answer is almost always no.

**Collaboration** -- when three people are running experiments simultaneously,
how do you compare results and agree on which model to deploy?

**Deployment gap** -- a model that works in a notebook often breaks when moved
to production because of subtle differences in data preprocessing.

**Model decay** -- the world changes. A model trained on 2024 data can gradually
become less accurate as 2025 data arrives. Monitoring aims to detect this before users notice.
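
One lightweight habit that supports reproducibility, independent of any tracking tool, is to fingerprint each training run: hash the exact training data together with the configuration and store the digest alongside the model. The sketch below uses only the standard library; the helper name `run_fingerprint` and the choice of SHA-256 over a canonical JSON encoding are illustrative assumptions, not part of MLflow.

```python
import hashlib
import json


def run_fingerprint(data_bytes: bytes, config: dict) -> str:
    """Hypothetical helper: a SHA-256 digest identifying one training run.

    The digest changes if either the raw training data or any
    configuration value (hyperparameters, feature list, seed) changes.
    """
    h = hashlib.sha256()
    h.update(data_bytes)
    # sort_keys makes the encoding independent of dict insertion order
    h.update(json.dumps(config, sort_keys=True).encode("utf-8"))
    return h.hexdigest()


data = b"YearsCodePro,uses_python\n10,1\n2,0\n"
cfg = {"model": "Ridge", "alpha": 1.0, "random_state": 42}

fp1 = run_fingerprint(data, cfg)
fp2 = run_fingerprint(data, {"random_state": 42, "alpha": 1.0, "model": "Ridge"})
fp3 = run_fingerprint(data, {**cfg, "alpha": 0.5})
print(fp1[:12], fp1 == fp2, fp1 == fp3)
```

Two runs with the same fingerprint used byte-identical data and the same configuration; changing either produces a different digest. Library versions are not captured and would need to be recorded separately.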


---

## Section 10.2 -- Experiment Tracking with MLflow

MLflow is one of the most widely used open-source MLOps platforms. Its core concept:
every training run is logged as a **run** inside a named **experiment**, recording
parameters, metrics, artefacts (model files, plots), and metadata. You can compare
runs in a UI and promote the best model to a registry.

MLflow has four classic components; this chapter uses the first three:
- **Tracking** -- log parameters and metrics during training
- **Models** -- save models in a standard format with an input/output schema
- **Model Registry** -- version models and manage their lifecycle (Staging → Production)
- **Projects** -- package code for reproducible execution (not covered in this chapter)
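
To make the tracking data model concrete, here is a toy, in-memory stand-in for what a tracking store holds: an experiment is a named collection of runs, and each run carries parameters (the inputs you chose) and metrics (the scores you measured). This is a hypothetical illustration using only the standard library, not MLflow's API; the classes `Run` and `Experiment` and the metric values are placeholders made up for the example.

```python
from dataclasses import dataclass, field
import uuid


@dataclass
class Run:
    name: str
    run_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    params: dict = field(default_factory=dict)   # inputs: hyperparameters, features
    metrics: dict = field(default_factory=dict)  # outputs: scores measured after training


@dataclass
class Experiment:
    name: str
    runs: list = field(default_factory=list)

    def start_run(self, name: str) -> Run:
        run = Run(name=name)
        self.runs.append(run)
        return run

    def best_run(self, metric: str, higher_is_better: bool = True) -> Run:
        # Compare every run on the same metric, like sorting a column in a tracking UI
        scored = [r for r in self.runs if metric in r.metrics]
        pick = max if higher_is_better else min
        return pick(scored, key=lambda r: r.metrics[metric])


exp = Experiment("toy_salary_regression")
for name, alpha, cv_r2 in [("ridge_a1", 1.0, 0.30), ("ridge_a10", 10.0, 0.28)]:  # placeholder numbers
    run = exp.start_run(name)
    run.params["alpha"] = alpha
    run.metrics["cv_r2_mean"] = cv_r2

best = exp.best_run("cv_r2_mean")
print(best.name, best.params)
```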


In [None]:
# 10.2.1 -- Configure MLflow tracking

import os

# In Colab we use a local SQLite tracking store
# In production this would point to a remote server
MLFLOW_DIR      = '/tmp/mlflow'
os.makedirs(MLFLOW_DIR, exist_ok=True)
mlflow.set_tracking_uri(f'sqlite:///{MLFLOW_DIR}/mlflow.db')

EXPERIMENT_NAME = 'so2025_salary_regression'
mlflow.set_experiment(EXPERIMENT_NAME)

print(f'MLflow tracking URI: {mlflow.get_tracking_uri()}')
print(f'Experiment: {EXPERIMENT_NAME}')


In [None]:
# 10.2.2 -- Log three model variants as separate MLflow runs
#
# Each run logs:
#   - Parameters: model hyperparameters and feature list
#   - Metrics:    CV R^2 and test R^2 (both on the log-salary scale),
#                 test MAE and RMSE (in USD, after converting back with exp)
#   - Artefacts:  the trained model with input/output schema

model_configs = [
    ('Ridge',            Ridge(alpha=1.0),
     {'alpha': 1.0}),
    ('RandomForest',     RandomForestRegressor(n_estimators=100, max_depth=8,
                                               random_state=RANDOM_STATE, n_jobs=-1),
     {'n_estimators': 100, 'max_depth': 8}),
    ('GradientBoosting', GradientBoostingRegressor(n_estimators=100, max_depth=4,
                                                    learning_rate=0.1,
                                                    random_state=RANDOM_STATE),
     {'n_estimators': 100, 'max_depth': 4, 'learning_rate': 0.1}),
]

run_results = []

for model_name, model, params in model_configs:
    pipe = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler',  StandardScaler()),
        ('model',   model),
    ])

    with mlflow.start_run(run_name=model_name):
        # Log parameters
        mlflow.log_param('model_type',    model_name)
        mlflow.log_param('features',      str(FEATURE_COLS))
        mlflow.log_param('train_size',    len(X_train))
        for k, v in params.items():
            mlflow.log_param(k, v)

        # Train
        pipe.fit(X_train, y_train)

        # Cross-validation on training set
        cv_scores = cross_val_score(pipe, X_train, y_train, cv=5, scoring='r2')
        mlflow.log_metric('cv_r2_mean', cv_scores.mean())
        mlflow.log_metric('cv_r2_std',  cv_scores.std())

        # Test set metrics
        y_pred_log = pipe.predict(X_test)
        y_pred_usd = np.exp(y_pred_log)
        y_true_usd = np.exp(y_test)
        test_r2  = r2_score(y_test, y_pred_log)
        test_mae = mean_absolute_error(y_true_usd, y_pred_usd)
        test_rmse = np.sqrt(mean_squared_error(y_true_usd, y_pred_usd))
        mlflow.log_metric('test_r2',   test_r2)
        mlflow.log_metric('test_mae',  test_mae)
        mlflow.log_metric('test_rmse', test_rmse)

        # Log the model with input/output schema
        signature = infer_signature(X_train, pipe.predict(X_train))
        mlflow.sklearn.log_model(
            pipe, artifact_path='model',
            signature=signature,
            input_example=X_train.iloc[:3]
        )

        run_id = mlflow.active_run().info.run_id
        run_results.append({
            'model': model_name, 'run_id': run_id,
            'cv_r2': cv_scores.mean(), 'test_r2': test_r2,
            'test_mae': test_mae
        })

    print(f'{model_name:<20} CV R2={cv_scores.mean():.4f}  '
          f'Test R2={test_r2:.4f}  MAE=${test_mae:,.0f}')

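results_df = pd.DataFrame(run_results)
# Select on cross-validated R^2, not test R^2: choosing by test score would
# leak the test set into model selection and make the test metrics optimistic
best_run   = results_df.loc[results_df['cv_r2'].idxmax()]
print(f'Best model: {best_run["model"]}  (run_id: {best_run["run_id"][:8]}...)')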


In [None]:
# 10.2.3 -- Query MLflow programmatically and visualise run comparison

client = mlflow.tracking.MlflowClient()
experiment = client.get_experiment_by_name(EXPERIMENT_NAME)
runs = client.search_runs(
    experiment_ids=[experiment.experiment_id],
    order_by=['metrics.test_r2 DESC']
)

print(f'Runs logged: {len(runs)}')
print()
print(f'{"Run name":<22} {"CV R2":>8} {"Test R2":>9} {"Test MAE":>12}')
print('-' * 55)
for run in runs:
    name = run.data.tags.get('mlflow.runName', run.info.run_id[:8])
    cv   = run.data.metrics.get('cv_r2_mean', 0)
    tr2  = run.data.metrics.get('test_r2', 0)
    mae  = run.data.metrics.get('test_mae', 0)
    print(f'{name:<22} {cv:>8.4f} {tr2:>9.4f} {mae:>12,.0f}')

# Bar chart comparison
fig, axes = plt.subplots(1, 2, figsize=(13, 4))
names = [r.data.tags.get('mlflow.runName', r.info.run_id[:8]) for r in runs]
r2s   = [r.data.metrics.get('test_r2', 0) for r in runs]
maes  = [r.data.metrics.get('test_mae', 0) for r in runs]

axes[0].bar(names, r2s, color='#2E75B6', edgecolor='white')
axes[0].set_ylabel('Test R^2 (higher = better)')
axes[0].set_title('Model Comparison: R^2')
for i, v in enumerate(r2s):
    axes[0].text(i, v + 0.002, f'{v:.4f}', ha='center', fontsize=9)

axes[1].bar(names, [m/1000 for m in maes], color='#E8722A', edgecolor='white')
axes[1].set_ylabel('Test MAE ($k, lower = better)')
axes[1].set_title('Model Comparison: MAE')
for i, v in enumerate(maes):
    axes[1].text(i, v/1000 + 0.3, f'${v/1000:.1f}k', ha='center', fontsize=9)

plt.suptitle('MLflow Experiment: SO 2025 Salary Regression',
             fontsize=13, fontweight='bold')
plt.tight_layout()
plt.show()


---

## Section 10.3 -- Model Registry: Versioning and Staging

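The MLflow Model Registry is a centralised store for model versions.
Each registered model version can move through stages:
`None → Staging → Production → Archived`.
(Since MLflow 2.9, stages are deprecated in favour of model version *aliases*
such as `@champion`, but the stage API still works and illustrates the same workflow.)

This stage progression is the handoff point between data scientists
(who produce models) and ML engineers (who deploy them). The registry
records when each version was created and transitioned, and lets you attach
descriptions and tags explaining why -- the basis of an audit trail.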
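
The stage workflow and its audit trail can be pictured as a small state machine. The sketch below is hypothetical and is not how MLflow is implemented: `ToyRegistry` and the `ALLOWED` transition table are assumptions chosen for illustration (MLflow itself lets you set any stage directly). Each accepted transition appends a record of which version moved, who moved it, when, and why.

```python
from datetime import datetime, timezone

# Hypothetical policy: which stage changes this toy registry accepts
ALLOWED = {
    "None": {"Staging", "Archived"},
    "Staging": {"Production", "Archived"},
    "Production": {"Archived"},
    "Archived": set(),
}


class ToyRegistry:
    def __init__(self):
        self.stage = {}       # version -> current stage
        self.audit_log = []   # append-only list of accepted transitions

    def register(self, version: int) -> None:
        self.stage[version] = "None"

    def transition(self, version: int, to: str, user: str, reason: str) -> None:
        current = self.stage[version]
        if to not in ALLOWED[current]:
            raise ValueError(f"v{version}: {current} -> {to} not allowed")
        self.stage[version] = to
        self.audit_log.append({
            "version": version, "from": current, "to": to,
            "user": user, "reason": reason,
            "at": datetime.now(timezone.utc).isoformat(),
        })


reg = ToyRegistry()
reg.register(1)
reg.transition(1, "Staging", user="data-scientist", reason="best CV R^2")
reg.transition(1, "Production", user="ml-engineer", reason="passed API tests")

try:
    reg.transition(1, "Staging", user="ml-engineer", reason="rollback attempt")
except ValueError as err:
    print("Rejected:", err)   # rejected transitions are not logged

for e in reg.audit_log:
    print(e["version"], e["from"], "->", e["to"], "by", e["user"], "|", e["reason"])
```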


In [None]:
# 10.3.1 -- Register the best model and transition to Staging

REGISTERED_MODEL_NAME = 'so2025_salary_predictor'

best_run_id = best_run['run_id']
model_uri   = f'runs:/{best_run_id}/model'

# Register the model
registered = mlflow.register_model(
    model_uri=model_uri,
    name=REGISTERED_MODEL_NAME
)

print(f'Registered model: {registered.name}')
print(f'Version:          {registered.version}')
print(f'Status:           {registered.status}')

# Transition to Staging
# (register_model already waits until the new version is READY, so no sleep
# is needed. Stage transitions are deprecated in MLflow >= 2.9 in favour of
# aliases, but they still work.)
client.transition_model_version_stage(
    name=REGISTERED_MODEL_NAME,
    version=registered.version,
    stage='Staging',
    archive_existing_versions=False
)
print('Transitioned to:  Staging')

# Retrieve model details from registry
model_details = client.get_registered_model(REGISTERED_MODEL_NAME)
print(f'Latest versions:  {[(v.version, v.current_stage) for v in model_details.latest_versions]}')


In [None]:
# 10.3.2 -- Load the registered model and make predictions
#
# This is how a serving system loads a model from the registry
# without needing to know the run ID or file path.

staging_uri = f'models:/{REGISTERED_MODEL_NAME}/Staging'
loaded_model = mlflow.sklearn.load_model(staging_uri)

# Make predictions with the loaded model
sample = X_test.iloc[:5]
preds_log = loaded_model.predict(sample)
preds_usd = np.exp(preds_log)

print('Predictions from registry-loaded model:')
print(f'{"Sample":>8}  {"Predicted":>14}  {"Actual":>14}')
print('-' * 38)
for i, (pred, true) in enumerate(zip(preds_usd, np.exp(y_test.iloc[:5]))):
    print(f'{i+1:>8}  ${pred:>13,.0f}  ${true:>13,.0f}')


---

## Section 10.4 -- Serving Predictions with FastAPI

A trained model sitting in a file is not useful to anyone who cannot run Python.
A REST API wraps the model so any application -- a web app, mobile app,
or another service -- can send a request and receive a prediction.

**FastAPI** is a popular modern framework for Python REST APIs: it validates
requests using type hints and Pydantic models, and auto-generates interactive
documentation at `/docs`.

We simulate the API here by writing the app code to a file and testing it in-process.
In production you would run `uvicorn salary_api:app --host 0.0.0.0 --port 8000`
from the directory containing `salary_api.py`.
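
From the caller's side, any language with an HTTP client can use the endpoint. As a stdlib-only sketch, assuming the server runs locally on port 8000 as described above, a Python client would build a JSON `POST` request like this. `build_predict_request` and `API_URL` are hypothetical names, and the request is only constructed here, not sent.

```python
import json
import urllib.request

API_URL = "http://localhost:8000/predict"   # assumption: local uvicorn server


def build_predict_request(profile: dict, url: str = API_URL) -> urllib.request.Request:
    """Hypothetical helper: build (but do not send) a POST /predict request."""
    body = json.dumps(profile).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Field names follow the DeveloperProfile request model
profile = {"years_code_pro": 10, "uses_python": 1, "uses_sql": 1,
           "uses_js": 0, "uses_ai": 1}
req = build_predict_request(profile)
print(req.get_method(), req.full_url, req.data.decode())

# With the server running, you would send it with:
#   with urllib.request.urlopen(req, timeout=5) as resp:
#       print(json.load(resp)["predicted_salary_usd"])
```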


In [None]:
# 10.4.1 -- Write the FastAPI app

API_CODE = '''
from contextlib import asynccontextmanager
import os

import mlflow.sklearn
import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

# The model is loaded once at startup -- not on every request
MODEL = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Runs once when the server starts (and when a TestClient context is entered)
    global MODEL
    MODEL = mlflow.sklearn.load_model(os.environ["MODEL_URI"])
    yield

app = FastAPI(
    title="SO 2025 Salary Predictor",
    description="Predicts annual developer salary from profile features.",
    version="1.0.0",
    lifespan=lifespan,
)

class DeveloperProfile(BaseModel):
    years_code_pro: float = Field(..., ge=0, le=50, description="Years of professional coding")
    uses_python:    int   = Field(..., ge=0, le=1,  description="1 if uses Python, else 0")
    uses_sql:       int   = Field(..., ge=0, le=1,  description="1 if uses SQL, else 0")
    uses_js:        int   = Field(..., ge=0, le=1,  description="1 if uses JavaScript, else 0")
    uses_ai:        int   = Field(..., ge=0, le=1,  description="1 if uses AI tools, else 0")

class SalaryPrediction(BaseModel):
    predicted_salary_usd: float
    predicted_salary_log: float
    model_version:        str

@app.get("/health")
def health():
    return {"status": "ok", "model_loaded": MODEL is not None}

@app.post("/predict", response_model=SalaryPrediction)
def predict(profile: DeveloperProfile):
    if MODEL is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    features = pd.DataFrame([{
        "YearsCodePro": profile.years_code_pro,
        "uses_python":  profile.uses_python,
        "uses_sql":     profile.uses_sql,
        "uses_js":      profile.uses_js,
        "uses_ai":      profile.uses_ai,
    }])
    log_pred = float(MODEL.predict(features)[0])
    return SalaryPrediction(
        predicted_salary_usd=round(np.exp(log_pred), 2),
        predicted_salary_log=round(log_pred, 6),
        model_version="1.0.0"
    )
'''

with open('/tmp/salary_api.py', 'w') as f:
    f.write(API_CODE)
print('FastAPI app written to /tmp/salary_api.py')
print()
print('To run in production:')
print(f'  export MLFLOW_TRACKING_URI="{mlflow.get_tracking_uri()}"')
print('  export MODEL_URI="models:/so2025_salary_predictor/Staging"')
print('  uvicorn salary_api:app --app-dir /tmp --host 0.0.0.0 --port 8000')
print()
print('Auto-generated docs available at: http://localhost:8000/docs')


In [None]:
# 10.4.2 -- Test the API in-process using FastAPI's TestClient
#
# TestClient runs the app inside this Python process -- no uvicorn server needed.
# Entering it as a context manager runs the app's startup hook, which loads
# the model from MODEL_URI (resolved via the tracking URI set in 10.2.1).

import os
import importlib.util
from fastapi.testclient import TestClient

os.environ['MODEL_URI'] = f'runs:/{best_run_id}/model'

# Import /tmp/salary_api.py as a module
spec = importlib.util.spec_from_file_location('salary_api', '/tmp/salary_api.py')
salary_api = importlib.util.module_from_spec(spec)
spec.loader.exec_module(salary_api)

# Request bodies use the API's field names (see DeveloperProfile)
test_profiles = [
    {'years_code_pro': 10, 'uses_python': 1, 'uses_sql': 1, 'uses_js': 0, 'uses_ai': 1},
    {'years_code_pro': 2,  'uses_python': 0, 'uses_sql': 1, 'uses_js': 1, 'uses_ai': 0},
    {'years_code_pro': 20, 'uses_python': 1, 'uses_sql': 0, 'uses_js': 0, 'uses_ai': 1},
]

with TestClient(salary_api.app) as api:
    print('Health check:', api.get('/health').json())
    print()
    print('API predictions:')
    print(f'{"Profile":<45} {"Predicted Salary":>18}')
    print('-' * 65)
    for p in test_profiles:
        resp = api.post('/predict', json=p)
        resp.raise_for_status()
        usd_pred = resp.json()['predicted_salary_usd']
        desc = (f"{p['years_code_pro']}yrs, "
                f"{'Python' if p['uses_python'] else 'no-Python'}, "
                f"{'SQL' if p['uses_sql'] else 'no-SQL'}, "
                f"{'AI' if p['uses_ai'] else 'no-AI'}")
        print(f'{desc:<45} ${usd_pred:>17,.0f}')

    # Pydantic validation: uses_python must be 0 or 1, so this should return HTTP 422
    bad = api.post('/predict', json={**test_profiles[0], 'uses_python': 5})
    print(f'\nInvalid input (uses_python=5) -> HTTP {bad.status_code}')


---

## Section 10.5 -- Unit Testing ML Code

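ML code is harder to test than ordinary software because the expected outputs
are learned from data rather than specified in advance -- you cannot assert that
`predict([5, 1, 0, 0, 1]) == 95000`, because the exact value depends on the
training data and random seeds. Instead, you test properties that should hold
whatever the exact numbers are: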

- **Data contracts:** does the preprocessing produce the expected shape and dtype?
- **Boundary conditions:** does the model return a finite positive number for valid input?
- **Regression tests:** does the retrained model perform at least as well as the baseline?
- **Data validation:** does the pipeline reject inputs that violate the schema?
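
The data-validation category needs no ML library at all. A stdlib sketch, assuming the same bounds that the FastAPI schema in Section 10.4 declares (0 to 50 years of experience, 0/1 flags): it returns a list of violations instead of raising, so a batch job could log or quarantine bad rows. `validate_profile`, `NUMERIC_RANGES`, and `BINARY_COLS` are hypothetical names.

```python
import math

# Hypothetical schema mirroring the bounds declared in the API's DeveloperProfile
NUMERIC_RANGES = {"YearsCodePro": (0.0, 50.0)}
BINARY_COLS = ("uses_python", "uses_sql", "uses_js", "uses_ai")
EXPECTED_COLS = (*NUMERIC_RANGES, *BINARY_COLS)


def validate_profile(row: dict) -> list:
    """Return a list of schema violations for one input row (empty list = valid)."""
    errors = []
    for col in EXPECTED_COLS:
        if col not in row:
            errors.append(f"missing column: {col}")
            continue
        val = row[col]
        if not isinstance(val, (int, float)) or math.isnan(val):
            errors.append(f"{col}: not a number ({val!r})")
        elif col in NUMERIC_RANGES:
            lo, hi = NUMERIC_RANGES[col]
            if not lo <= val <= hi:
                errors.append(f"{col}: {val} outside [{lo}, {hi}]")
        elif val not in (0, 1):
            errors.append(f"{col}: {val} is not 0 or 1")
    extra = sorted(set(row) - set(EXPECTED_COLS))
    if extra:
        errors.append(f"unexpected columns: {extra}")
    return errors


good = {"YearsCodePro": 7, "uses_python": 1, "uses_sql": 0, "uses_js": 1, "uses_ai": 0}
bad = {"YearsCodePro": -3, "uses_python": 2, "uses_sql": 0, "uses_ai": 0, "country": "NZ"}
print(validate_profile(good))
for err in validate_profile(bad):
    print(" -", err)
```

Each kind of violation maps directly to a unit test: a valid row yields an empty list, and every bad field yields exactly one message.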


In [None]:
# 10.5.1 -- Write and run ML unit tests without pytest infrastructure
# (pytest runs from the command line; we simulate it here inline)

def assert_equal(val, expected, msg=''):
    assert val == expected, f'FAIL: {msg} -- got {val}, expected {expected}'

def assert_true(condition, msg=''):
    assert condition, f'FAIL: {msg}'

def assert_close(val, expected, tol=0.01, msg=''):
    assert abs(val - expected) <= tol, (
        f'FAIL: {msg} -- got {val:.4f}, expected {expected:.4f} +/- {tol}'
    )

tests_passed = 0
tests_failed = 0

def run_test(name, fn):
    global tests_passed, tests_failed
    try:
        fn()
        print(f'  PASS  {name}')
        tests_passed += 1
    except Exception as e:
        print(f'  FAIL  {name}: {e}')
        tests_failed += 1

# -- Tests --

def test_feature_count():
    assert_equal(X_train.shape[1], len(FEATURE_COLS),
                 'Training feature count matches FEATURE_COLS')

def test_no_nulls_after_cleaning():
    assert_equal(int(X_train.isnull().sum().sum()), 0,
                 'No nulls in training features after cleaning')

def test_prediction_is_finite():
    sample = X_test.iloc[:10]
    preds  = loaded_model.predict(sample)
    assert_true(np.all(np.isfinite(preds)),
                'All predictions are finite numbers')

def test_prediction_in_plausible_range():
    sample   = X_test.iloc[:50]
    preds_usd = np.exp(loaded_model.predict(sample))
    assert_true(preds_usd.min() > 1_000,  'No predictions below $1,000')
    assert_true(preds_usd.max() < 2_000_000, 'No predictions above $2M')

def test_model_beats_mean_baseline():
    # A model that always predicts the mean has R^2 of about 0;
    # ours must beat it by a clear margin (here: R^2 > 0.1)
    test_r2 = r2_score(y_test, loaded_model.predict(X_test))
    assert_true(test_r2 > 0.1, f'Model R^2={test_r2:.4f} should exceed 0.1 (mean baseline ~ 0.0)')

def test_log_salary_target_range():
    # ln(5,000) ~ 8.52 and ln(600,000) ~ 13.30, given the outlier filter above
    assert_true(y_train.min() > 8.0,  'Min log salary above 8.0 (~$3,000)')
    assert_true(y_train.max() < 14.0, 'Max log salary below 14.0 (~$1.2M)')

print('Running ML unit tests...')
for name, fn in [
    ('Feature count matches FEATURE_COLS',    test_feature_count),
    ('No nulls in training data',             test_no_nulls_after_cleaning),
    ('Predictions are finite',                test_prediction_is_finite),
    ('Predictions in plausible salary range', test_prediction_in_plausible_range),
    ('Model beats mean baseline',             test_model_beats_mean_baseline),
    ('Log salary target in expected range',   test_log_salary_target_range),
]:
    run_test(name, fn)

print()
print(f'Results: {tests_passed} passed, {tests_failed} failed')

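
The regression-test idea from the list above (a retrained model should do at least as well as the one it replaces) is usually implemented by saving the current model's metrics and comparing against them with a small tolerance. A stdlib sketch, assuming metrics are stored as JSON and that the per-metric slack values below define a "regression" (both illustrative choices); `check_no_regression` and `RULES` are hypothetical names, and the metric values are placeholders.

```python
import json

# Hypothetical per-metric rules: (higher_is_better, allowed absolute slack)
RULES = {"test_r2": (True, 0.01), "test_mae": (False, 500.0)}


def check_no_regression(baseline: dict, candidate: dict, rules: dict = RULES) -> list:
    """Return the metrics on which `candidate` is worse than `baseline` beyond the slack."""
    failures = []
    for name, (higher_is_better, slack) in rules.items():
        b, c = baseline[name], candidate[name]
        worse = c < b - slack if higher_is_better else c > b + slack
        if worse:
            failures.append(f"{name}: baseline={b:,.4f} candidate={c:,.4f} (slack {slack})")
    return failures


# In practice the baseline would be read from a file saved with the current model
baseline = json.loads('{"test_r2": 0.40, "test_mae": 30000.0}')   # placeholder numbers
candidate_ok = {"test_r2": 0.395, "test_mae": 30200.0}
candidate_bad = {"test_r2": 0.35, "test_mae": 31000.0}

print(check_no_regression(baseline, candidate_ok))
print(check_no_regression(baseline, candidate_bad))
```

Using an absolute slack per metric keeps the check meaningful for metrics on different scales (R^2 versus dollars); a relative tolerance would be an equally reasonable design.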

---

## Section 10.6 -- Data Drift Detection

**Data drift** occurs when the statistical distribution of production data
diverges from the training data. A salary model trained in 2024 on developers
earning $50k-$200k can degrade if 2025 production data contains a different
salary range, a different country mix, or a different experience distribution.

Drift detection answers: *Is the data the model is seeing today
still similar enough to the data it was trained on?*

We implement a simple but effective approach using the **Population Stability
Index (PSI)** and the **Kolmogorov-Smirnov test** -- both widely used in
production monitoring systems.
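
For reference, write $e_i$ and $a_i$ for the fraction of training and production values that fall in bin $i$ of $B$ bins (the `psi()` function in the next cell builds the bins from training-data deciles), and $F_{\text{train}}$, $F_{\text{prod}}$ for the two empirical CDFs. The two statistics are:

$$
\mathrm{PSI} = \sum_{i=1}^{B} (a_i - e_i)\,\ln\frac{a_i}{e_i},
\qquad
D_{\mathrm{KS}} = \sup_{x}\,\bigl|F_{\text{train}}(x) - F_{\text{prod}}(x)\bigr|
$$

Every PSI term is non-negative, because $a_i - e_i$ and $\ln(a_i/e_i)$ always have the same sign, so PSI is zero only when the binned distributions match. Equivalently, PSI is the symmetrised Kullback-Leibler divergence $\mathrm{KL}(a\,\|\,e) + \mathrm{KL}(e\,\|\,a)$. The KS test converts $D_{\mathrm{KS}}$ and the two sample sizes into the p-value reported by `stats.ks_2samp`.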


In [None]:
# 10.6.1 -- Simulate production drift and detect it

from scipy import stats

# Simulate 'production' data arriving 12 months after training
# We inject drift into one feature: production developers are more senior
# (YearsCodePro shifted up by about 3 years on average)
np.random.seed(RANDOM_STATE)
n_prod = 500

prod_data = X_test.sample(n=n_prod, replace=True, random_state=RANDOM_STATE).copy()

# Inject drift: shift YearsCodePro upward (more senior developers in production)
if 'YearsCodePro' in prod_data.columns:
    prod_data['YearsCodePro'] = prod_data['YearsCodePro'] + np.random.normal(3, 1, n_prod)
    prod_data['YearsCodePro'] = prod_data['YearsCodePro'].clip(0, 50)

def psi(expected, actual, bins=10):
    """
    Population Stability Index.
    PSI < 0.1:  no significant drift
    PSI 0.1-0.2: moderate drift -- investigate
    PSI > 0.2:  significant drift -- retrain
    """
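    # Bin edges are the training-data deciles
    breakpoints = np.percentile(expected, np.linspace(0, 100, bins + 1))
    breakpoints = np.unique(breakpoints)    # remove duplicate edges
    if len(breakpoints) < 3:
        # Fewer than two bins (e.g. a binary 0/1 feature): PSI is not informative
        return 0.0
    # Clip production values into the training range so out-of-range values
    # land in the outer bins instead of being silently dropped by np.histogram
    actual = np.clip(actual, breakpoints[0], breakpoints[-1])
    exp_counts = np.histogram(expected, bins=breakpoints)[0] + 1e-6
    act_counts = np.histogram(actual,   bins=breakpoints)[0] + 1e-6
    exp_pct = exp_counts / exp_counts.sum()
    act_pct = act_counts / act_counts.sum()
    return float(np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct)))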

print('Data Drift Report')
print('=' * 55)
print(f'{"Feature":<20} {"KS p-value":>12} {"PSI":>8} {"Status"}')
print('-' * 55)

for col in FEATURE_COLS:
    train_vals = X_train[col].dropna().values
    prod_vals  = prod_data[col].dropna().values
    ks_stat, ks_p  = stats.ks_2samp(train_vals, prod_vals)
    psi_score = psi(train_vals, prod_vals)
    if psi_score > 0.2 or ks_p < 0.05:
        status = 'DRIFT DETECTED'
    elif psi_score > 0.1:
        status = 'Monitor'
    else:
        status = 'OK'
    print(f'{col:<20} {ks_p:>12.4f} {psi_score:>8.4f} {status}')

print()
print('PSI thresholds: < 0.1 = stable, 0.1-0.2 = investigate, > 0.2 = retrain')

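
A quick way to build intuition for the PSI thresholds is to run the calculation on synthetic data where the answer is known: two samples from the same distribution should score near zero, and a large shift should clear 0.2. The sketch below re-implements the decile-binned PSI from the cell above with NumPy, with production values clipped into the training range so that none fall outside the bins. The sample sizes and the one-standard-deviation shift are arbitrary illustrative choices.

```python
import numpy as np


def psi(expected, actual, bins=10):
    """Population Stability Index with bin edges at the training deciles."""
    edges = np.unique(np.percentile(expected, np.linspace(0, 100, bins + 1)))
    if len(edges) < 3:
        return 0.0                                  # fewer than two bins: not informative
    actual = np.clip(actual, edges[0], edges[-1])   # keep out-of-range values in end bins
    e = np.histogram(expected, bins=edges)[0] + 1e-6
    a = np.histogram(actual, bins=edges)[0] + 1e-6
    e, a = e / e.sum(), a / a.sum()
    return float(np.sum((a - e) * np.log(a / e)))


rng = np.random.default_rng(0)
train = rng.normal(loc=0.0, scale=1.0, size=20_000)
same = rng.normal(loc=0.0, scale=1.0, size=5_000)     # no drift
shifted = rng.normal(loc=1.0, scale=1.0, size=5_000)  # mean shifted by 1 sd

print(f"PSI, same distribution: {psi(train, same):.4f}")
print(f"PSI, shifted by 1 sd:   {psi(train, shifted):.4f}")
```

With no real drift, the score reflects only sampling noise and stays far below 0.1; the shifted sample lands well above 0.2 because the top decile alone gains a large share of the production values.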

In [None]:
# 10.6.2 -- Visualise drift: training vs production distributions

drift_cols = [c for c in FEATURE_COLS if c == 'YearsCodePro' or
              X_train[c].nunique() > 2]
if not drift_cols:
    drift_cols = FEATURE_COLS[:2]

n_cols = len(drift_cols)
fig, axes = plt.subplots(1, n_cols, figsize=(6 * n_cols, 4))
if n_cols == 1:
    axes = [axes]

for ax, col in zip(axes, drift_cols):
    ax.hist(X_train[col].dropna(), bins=30, alpha=0.5,
            color='#2E75B6', density=True, label='Training')
    ax.hist(prod_data[col].dropna(), bins=30, alpha=0.5,
            color='#E8722A', density=True, label='Production (simulated)')
    psi_val = psi(X_train[col].dropna().values, prod_data[col].dropna().values)
    ax.set_title(f'{col}\nPSI={psi_val:.3f}')
    ax.set_xlabel(col)
    ax.set_ylabel('Density')
    ax.legend(fontsize=9)

plt.suptitle('Drift Detection: Training vs Simulated Production Data',
             fontsize=13, fontweight='bold')
plt.tight_layout()
plt.show()

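
The KS test is designed for continuous distributions, and the decile-based `psi()` above returns 0.0 for 0/1 features, so binary flags such as `uses_python` are better compared by their proportions. One standard option, sketched here with only the standard library, is a pooled two-proportion z-test. The helper name `two_proportion_ztest` and the counts are illustrative assumptions, not survey figures. With very large training sets even tiny, practically irrelevant differences become "significant", so report the size of the change (`diff`) alongside the p-value.

```python
import math


def two_proportion_ztest(k1: int, n1: int, k2: int, n2: int):
    """Two-sided z-test for H0: p1 == p2, using the pooled proportion.

    k = number of 1s, n = sample size. Returns (p2 - p1, z, p-value).
    """
    p1, p2 = k1 / n1, k2 / n2
    pooled = (k1 + k2) / (n1 + n2)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n1 + 1 / n2))
    if se == 0:
        return p2 - p1, 0.0, 1.0                 # both samples all 0s or all 1s
    z = (p2 - p1) / se
    p_value = math.erfc(abs(z) / math.sqrt(2))   # equals 2 * (1 - Phi(|z|))
    return p2 - p1, z, p_value


# Illustrative counts: share of Python users in training vs production
diff, z, p = two_proportion_ztest(k1=4_000, n1=10_000, k2=230, n2=500)
print(f"diff={diff:+.3f}  z={z:.2f}  p={p:.4f}")
```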

---

## Chapter 10 Summary

### Key Takeaways

- **MLflow** is a widely used open-source experiment tracker. Log params and
  metrics with `mlflow.log_param()` / `mlflow.log_metric()`. Always log inside
  a `with mlflow.start_run():` context so runs are cleanly scoped.
- **`infer_signature`** captures the input/output schema of your model.
  When the model is loaded as a pyfunc (`mlflow.pyfunc.load_model`), MLflow
  enforces this schema, so mismatched column names or dtypes raise errors
  instead of failing silently.
- **The Model Registry** decouples training from deployment. Data scientists
  push to Staging; ML engineers promote to Production. Version timestamps,
  descriptions, and tags provide an audit trail of what was promoted and when.
- **FastAPI** wraps models as REST APIs with automatic validation (Pydantic)
  and auto-generated docs at `/docs`. Load the model once at startup,
  not on every request.
- **ML unit tests** focus on data contracts, boundary conditions, and
  regression baselines -- not exact output values. Run them in CI on
  every commit that touches training code.
- **PSI > 0.2** is a common rule-of-thumb threshold for triggering a retrain.
  The KS test gives a complementary p-value-based signal for continuous features.
  Monitor every feature that the model uses, not just the target.

### Project Thread Status

| Task | Status |
|------|--------|
| Three model variants tracked in MLflow | Done |
| Best model registered and staged | Done |
| FastAPI prediction endpoint written | Done |
| Six ML unit tests written and passing | Done |
| Drift detection with PSI and KS test | Done |

---

### What's Next: Chapter 11 -- Computer Vision with PyTorch

Chapter 11 applies the PyTorch training loop from Chapter 7 to image data:
CNNs, transfer learning with a pre-trained ResNet, and feature map visualisation.
Images are the third major data modality after tabular (Ch 3-6) and text (Ch 8).

After Chapter 11, the appendices cover:
- **Appendix D** -- Reinforcement learning: Q-learning and DQN on CartPole
- **Appendix E** -- SQL for data scientists: sqlite3, pandas.read_sql, window functions
- **Appendix F** -- Git and GitHub for ML: branching, nbstripout, DVC

---

*End of Chapter 10 -- Python for AI/ML*  
[![Back to TOC](https://img.shields.io/badge/Back_to-Table_of_Contents-1B3A5C?style=flat-square)](https://colab.research.google.com/github/timothy-watt/python-for-ai-ml/blob/main/Python_for_AIML_TOC.ipynb)
