## 0. Environment Setup

As necessary, install the required Python packages

In [None]:
# Install required libraries
!pip install pandas numpy scikit-learn matplotlib seaborn mlflow flask scipy



In [None]:
# Core data and ML libraries
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import matplotlib.pyplot as plt
import seaborn as sns
import json
import pickle
import os
import datetime
import sqlite3
from pathlib import Path
import shutil
import logging
import uuid

# For model registry and experiment tracking
import mlflow

# For model serving
import flask
from flask import Flask, request, jsonify

# For drift detection and monitoring
import scipy.stats as stats

### 1. Data Intake & Feature Management

#### 1.1 Data Collection

In [None]:
# Function to download and load the dataset
def get_raw_data():
    """
    Downloads the telco churn dataset or loads it if already exists
    Returns the raw dataframe
    """
    # In a real scenario, this might be pulling from a database or API
    # For this example, we'll use a local CSV file or download it if it doesn't exist

    # Create data directory if it doesn't exist
    data_dir = Path("data")
    data_dir.mkdir(exist_ok=True)

    raw_data_path = data_dir / "telco_churn_raw.csv"

    # Check if we already have the file
    if not raw_data_path.exists():
        # If not, we'll create a simple synthetic dataset
        print("Creating synthetic telco churn dataset...")

        # Create synthetic data
        np.random.seed(42)
        n_samples = 1000

        # Generate synthetic features
        data = {
            'customer_id': [f'CUST-{i:05d}' for i in range(n_samples)],
            'gender': np.random.choice(['Male', 'Female'], size=n_samples),
            'senior_citizen': np.random.choice([0, 1], size=n_samples),
            'partner': np.random.choice(['Yes', 'No'], size=n_samples),
            'dependents': np.random.choice(['Yes', 'No'], size=n_samples),
            'tenure': np.random.randint(0, 72, size=n_samples),
            'phone_service': np.random.choice(['Yes', 'No'], size=n_samples),
            'multiple_lines': np.random.choice(['Yes', 'No', 'No phone service'], size=n_samples),
            'internet_service': np.random.choice(['DSL', 'Fiber optic', 'No'], size=n_samples),
            'online_security': np.random.choice(['Yes', 'No', 'No internet service'], size=n_samples),
            'online_backup': np.random.choice(['Yes', 'No', 'No internet service'], size=n_samples),
            'tech_support': np.random.choice(['Yes', 'No', 'No internet service'], size=n_samples),
            'streaming_tv': np.random.choice(['Yes', 'No', 'No internet service'], size=n_samples),
            'streaming_movies': np.random.choice(['Yes', 'No', 'No internet service'], size=n_samples),
            'contract': np.random.choice(['Month-to-month', 'One year', 'Two year'], size=n_samples),
            'paperless_billing': np.random.choice(['Yes', 'No'], size=n_samples),
            'payment_method': np.random.choice(['Electronic check', 'Mailed check', 'Bank transfer', 'Credit card'], size=n_samples),
            'monthly_charges': np.random.uniform(20, 120, size=n_samples),
            'total_charges': np.random.uniform(100, 8000, size=n_samples),
        }

        # Churn is more likely for month-to-month contracts and high monthly charges
        probabilities = []
        for i in range(n_samples):
            prob = 0.2  # Base probability
            if data['contract'][i] == 'Month-to-month':
                prob += 0.2
            if data['monthly_charges'][i] > 80:
                prob += 0.15
            if data['tenure'][i] < 12:
                prob += 0.15
            probabilities.append(min(prob, 0.9))

        data['churn'] = np.random.binomial(1, probabilities)
        data['churn'] = ['Yes' if x == 1 else 'No' for x in data['churn']]

        # Create DataFrame
        df = pd.DataFrame(data)

        # Save to CSV
        df.to_csv(raw_data_path, index=False)
        print(f"Dataset saved to {raw_data_path}")
    else:
        print(f"Loading dataset from {raw_data_path}")
        df = pd.DataFrame(pd.read_csv(raw_data_path))

    # Record data intake in our log
    logging.basicConfig(filename='mlops_pipeline.log', level=logging.INFO)
    logging.info(f"Data ingested at {datetime.datetime.now()}: {len(df)} records")

    return df

# Load the raw data
raw_data = get_raw_data()

# Display first few rows
print(f"Loaded {len(raw_data)} rows")
raw_data.head()

Loading dataset from data/telco_churn_raw.csv
Loaded 1000 rows


Unnamed: 0,customer_id,gender,senior_citizen,partner,dependents,tenure,phone_service,multiple_lines,internet_service,online_security,online_backup,tech_support,streaming_tv,streaming_movies,contract,paperless_billing,payment_method,monthly_charges,total_charges,churn
0,CUST-00000,Male,1,Yes,No,6,No,Yes,No,Yes,Yes,No internet service,Yes,Yes,Two year,No,Bank transfer,26.875095,4951.885351,No
1,CUST-00001,Female,0,No,No,34,No,No phone service,Fiber optic,No internet service,Yes,No,No,No,One year,Yes,Credit card,70.739409,1586.779855,No
2,CUST-00002,Male,0,No,No,54,Yes,No phone service,No,Yes,Yes,No internet service,No,Yes,Month-to-month,No,Credit card,52.891978,2353.696176,No
3,CUST-00003,Male,0,No,No,13,No,No phone service,Fiber optic,Yes,No,Yes,No,No internet service,One year,Yes,Electronic check,70.831169,3859.820909,No
4,CUST-00004,Male,0,No,Yes,9,No,No phone service,No,Yes,No internet service,No internet service,Yes,No,Month-to-month,Yes,Credit card,22.397545,7639.333018,Yes


#### 1.2 Data Versioning

In [None]:
def create_versioned_dataset(df, version=None):
    """
    Create a versioned snapshot of the dataset

    Args:
        df: DataFrame to version
        version: Version string (if None, a timestamp will be used)

    Returns:
        version: The version string used
        path: Path to the saved versioned data
    """
    # Create versioned data directory
    versioned_data_dir = Path("data/versioned")
    versioned_data_dir.mkdir(exist_ok=True, parents=True)

    # Generate version if not provided
    if version is None:
        version = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    # Save versioned data
    versioned_path = versioned_data_dir / f"telco_data_v{version}.csv"
    df.to_csv(versioned_path, index=False)

    # Save version metadata
    metadata = {
        "version": version,
        "timestamp": datetime.datetime.now().isoformat(),
        "num_rows": len(df),
        "num_features": len(df.columns),
        "feature_names": list(df.columns),
        "description": f"Telco churn data version {version}"
    }

    metadata_path = versioned_data_dir / f"telco_data_v{version}_metadata.json"
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f, indent=2)

    print(f"Created versioned dataset {version} at {versioned_path}")
    logging.info(f"Created versioned dataset {version} with {len(df)} records at {datetime.datetime.now()}")

    return version, versioned_path

# Create a versioned snapshot of our raw data
initial_version, versioned_raw_path = create_versioned_dataset(raw_data)

Created versioned dataset 20250512_172103 at data/versioned/telco_data_v20250512_172103.csv


#### 1.3 Data Cleaning and Validation

In [None]:
def clean_and_validate_data(df, version):
    """
    Clean and validate the data

    Args:
        df: Raw DataFrame
        version: Version string for tracking

    Returns:
        cleaned_df: Cleaned DataFrame
        validation_report: Dictionary with validation results
    """
    print(f"Cleaning and validating data version {version}...")

    # Create a copy to avoid modifying the original
    df_cleaned = df.copy()

    # Track cleaning operations
    cleaning_ops = []
    validation_issues = []

    # Check for and handle missing values
    missing_values = df_cleaned.isnull().sum()
    if missing_values.sum() > 0:
        cleaning_ops.append(f"Found {missing_values.sum()} missing values")
        for col in missing_values[missing_values > 0].index:
            cleaning_ops.append(f"Column {col} has {missing_values[col]} missing values")

            # Handle missing values based on data type
            if df_cleaned[col].dtype == 'object':
                # For categorical, fill with mode
                df_cleaned[col] = df_cleaned[col].fillna(df_cleaned[col].mode()[0])
                cleaning_ops.append(f"Filled missing values in {col} with mode: {df_cleaned[col].mode()[0]}")
            else:
                # For numerical, fill with median
                df_cleaned[col] = df_cleaned[col].fillna(df_cleaned[col].median())
                cleaning_ops.append(f"Filled missing values in {col} with median: {df_cleaned[col].median()}")

    # Handle TotalCharges - convert to numeric if string
    if df_cleaned['total_charges'].dtype == 'object':
        cleaning_ops.append("Converting total_charges to numeric")
        df_cleaned['total_charges'] = pd.to_numeric(df_cleaned['total_charges'], errors='coerce')
        # Fill any new missing values
        df_cleaned['total_charges'] = df_cleaned['total_charges'].fillna(df_cleaned['total_charges'].median())

    # Data validation checks

    # 1. Check for negative values in numerical columns that should be positive
    for col in ['tenure', 'monthly_charges', 'total_charges']:
        neg_values = (df_cleaned[col] < 0).sum()
        if neg_values > 0:
            validation_issues.append(f"Found {neg_values} negative values in {col}")
            # Replace with absolute value
            df_cleaned[col] = df_cleaned[col].abs()
            cleaning_ops.append(f"Converted {neg_values} negative values to positive in {col}")

    # 2. Check for logical consistency
    if 'tenure' in df_cleaned.columns and 'total_charges' in df_cleaned.columns:
        inconsistent = ((df_cleaned['tenure'] > 0) & (df_cleaned['total_charges'] <= 0)).sum()
        if inconsistent > 0:
            validation_issues.append(f"Found {inconsistent} rows with tenure > 0 but total_charges <= 0")
            # Fix by setting total_charges to at least monthly_charges
            mask = (df_cleaned['tenure'] > 0) & (df_cleaned['total_charges'] <= 0)
            df_cleaned.loc[mask, 'total_charges'] = df_cleaned.loc[mask, 'monthly_charges']
            cleaning_ops.append(f"Fixed {inconsistent} rows with inconsistent tenure and total_charges")

    # 3. Check categorical variable validity
    for col in df_cleaned.select_dtypes(include=['object']).columns:
        if col == 'customer_id':  # Skip ID column
            continue

        # Count unique values
        unique_values = df_cleaned[col].unique()
        cleaning_ops.append(f"Column {col} has {len(unique_values)} unique values: {unique_values}")

        # If binary Yes/No column has other values
        if set(df_cleaned[col].unique()) - set(['Yes', 'No', 'No internet service', 'No phone service']):
            unexpected = set(df_cleaned[col].unique()) - set(['Yes', 'No', 'No internet service', 'No phone service'])
            validation_issues.append(f"Column {col} has unexpected values: {unexpected}")

    # Create validation report
    validation_report = {
        "data_version": version,
        "timestamp": datetime.datetime.now().isoformat(),
        "num_rows_before": len(df),
        "num_rows_after": len(df_cleaned),
        "cleaning_operations": cleaning_ops,
        "validation_issues": validation_issues,
        "columns": list(df_cleaned.columns),
        "dtypes": {col: str(df_cleaned[col].dtype) for col in df_cleaned.columns}
    }

    # Save validation report
    validation_dir = Path("data/validation_reports")
    validation_dir.mkdir(exist_ok=True, parents=True)

    validation_path = validation_dir / f"validation_report_v{version}.json"
    with open(validation_path, 'w') as f:
        json.dump(validation_report, f, indent=2)

    print(f"Data cleaning and validation complete. Report saved to {validation_path}")

    return df_cleaned, validation_report

# Clean and validate our data
cleaned_data, validation_report = clean_and_validate_data(raw_data, initial_version)

# Display validation results summary
print("\nValidation Summary:")
print(f"- Cleaning operations: {len(validation_report['cleaning_operations'])}")
print(f"- Validation issues: {len(validation_report['validation_issues'])}")
if validation_report['validation_issues']:
    for issue in validation_report['validation_issues']:
        print(f"  - {issue}")

Cleaning and validating data version 20250512_172103...
Data cleaning and validation complete. Report saved to data/validation_reports/validation_report_v20250512_172103.json

Validation Summary:
- Cleaning operations: 15
- Validation issues: 4
  - Column gender has unexpected values: {'Male', 'Female'}
  - Column internet_service has unexpected values: {'DSL', 'Fiber optic'}
  - Column contract has unexpected values: {'One year', 'Two year', 'Month-to-month'}
  - Column payment_method has unexpected values: {'Electronic check', 'Bank transfer', 'Mailed check', 'Credit card'}


#### 1.4 Feature Engineering and Storage

In [None]:
def engineer_features(df, version):
    """
    Apply feature engineering transformations

    Args:
        df: Cleaned DataFrame
        version: Version string for tracking

    Returns:
        features_df: DataFrame with engineered features
        feature_metadata: Dictionary with feature metadata
    """
    print(f"Engineering features for data version {version}...")

    # Create a copy to avoid modifying the original
    features_df = df.copy()

    # Track feature engineering operations
    feature_ops = []

    # 1. Convert binary categorical variables to 0/1
    binary_columns = ['partner', 'dependents', 'phone_service', 'paperless_billing']
    for col in binary_columns:
        if col in features_df.columns:
            features_df[col] = features_df[col].map({'Yes': 1, 'No': 0})
            feature_ops.append(f"Converted {col} to binary 0/1")

    # 2. Create tenure-related features
    if 'tenure' in features_df.columns:
        # Tenure in years
        features_df['tenure_years'] = features_df['tenure'] / 12
        feature_ops.append("Created tenure_years feature")

        # Tenure bins
        tenure_bins = [0, 12, 24, 36, 48, 60, 72]
        tenure_labels = ['0-1 year', '1-2 years', '2-3 years', '3-4 years', '4-5 years', '5+ years']
        features_df['tenure_group'] = pd.cut(features_df['tenure'], bins=tenure_bins, labels=tenure_labels, right=False)
        feature_ops.append("Created tenure_group feature with 6 bins")

    # 3. Create price-related features
    if 'monthly_charges' in features_df.columns and 'tenure' in features_df.columns:
        # Average charge per month of tenure
        mask = features_df['tenure'] > 0  # Avoid division by zero
        features_df['avg_monthly_charges'] = 0
        features_df.loc[mask, 'avg_monthly_charges'] = features_df.loc[mask, 'total_charges'] / features_df.loc[mask, 'tenure']
        feature_ops.append("Created avg_monthly_charges feature")

        # Monthly charges bin
        charge_bins = [0, 35, 70, 105, float('inf')]
        charge_labels = ['Low', 'Medium', 'High', 'Very High']
        features_df['monthly_charges_category'] = pd.cut(features_df['monthly_charges'], bins=charge_bins, labels=charge_labels)
        feature_ops.append("Created monthly_charges_category feature with 4 bins")

    # 4. Services count feature
    service_columns = ['phone_service', 'multiple_lines', 'internet_service', 'online_security',
                       'online_backup', 'tech_support', 'streaming_tv', 'streaming_movies']

    # Initialize services count
    features_df['services_count'] = 0

    # Count 'Yes' values
    for col in service_columns:
        if col in features_df.columns:
            features_df['services_count'] += (features_df[col] == 'Yes').astype(int)

    feature_ops.append("Created services_count feature")

    # 5. Contract type as ordinal
    if 'contract' in features_df.columns:
        contract_map = {'Month-to-month': 0, 'One year': 1, 'Two year': 2}
        features_df['contract_type_code'] = features_df['contract'].map(contract_map)
        feature_ops.append("Created contract_type_code feature")

    # 6. Target encoding for churn prediction
    features_df['churn_binary'] = features_df['churn'].map({'Yes': 1, 'No': 0})
    feature_ops.append("Created churn_binary feature for target")

    # Create feature metadata
    feature_metadata = {
        "data_version": version,
        "feature_version": f"{version}_feat",
        "timestamp": datetime.datetime.now().isoformat(),
        "num_rows": len(features_df),
        "num_features": len(features_df.columns),
        "feature_engineering_ops": feature_ops,
        "numerical_features": list(features_df.select_dtypes(include=['int64', 'float64']).columns),
        "categorical_features": list(features_df.select_dtypes(include=['object']).columns),
        "binary_features": [col for col in features_df.columns if features_df[col].nunique() == 2],
        "target_feature": "churn_binary"
    }

    # Save engineered features
    features_dir = Path("data/features")
    features_dir.mkdir(exist_ok=True, parents=True)

    features_path = features_dir / f"telco_features_v{version}.csv"
    features_df.to_csv(features_path, index=False)

    # Save feature metadata
    metadata_path = features_dir / f"telco_features_v{version}_metadata.json"
    with open(metadata_path, 'w') as f:
        json.dump(feature_metadata, f, indent=2)

    print(f"Feature engineering complete. Features saved to {features_path}")
    logging.info(f"Feature engineering completed for version {version} at {datetime.datetime.now()}")

    return features_df, feature_metadata

# Engineer features
features_df, feature_metadata = engineer_features(cleaned_data, initial_version)

# Display feature summary
print("\nFeature Engineering Summary:")
print(f"- Original features: {len(raw_data.columns)}")
print(f"- Engineered features: {len(features_df.columns)}")
print(f"- New features added: {len(features_df.columns) - len(raw_data.columns)}")

# Display sample of engineered features
features_df.head()

Engineering features for data version 20250512_172103...
Feature engineering complete. Features saved to data/features/telco_features_v20250512_172103.csv

Feature Engineering Summary:
- Original features: 20
- Engineered features: 27
- New features added: 7


 8.48814780e+02 1.36938389e+02 1.10943446e+02 5.17629125e+01
 5.70463733e+01 8.41303614e+01 1.20460558e+01 1.14457683e+02
 1.01104262e+02 8.70262056e+01 1.30602476e+01 2.38914956e+02
 2.70803064e+02 1.02516846e+02 4.63917025e+02 2.49873981e+02
 2.03965064e+02 4.17602759e+02 6.44385998e+01 2.22358440e+01
 1.80014994e+02 3.43760298e+01 3.14326101e+02 6.61805600e+01
 8.49066538e+01 1.40251790e+02 7.63303226e+01 2.66563425e+02
 1.49680606e+01 2.24232317e+03 6.58049809e+01 1.56820644e+02
 3.40020337e+02 7.07533128e+02 6.61154918e+00 5.88138021e+01
 5.13699511e+01 3.83920897e+00 3.00652727e+01 7.80332896e+01
 6.01750777e+01 1.79179444e+02 1.18562718e+02 9.30114595e+01
 1.19169246e+02 5.33772740e+01 8.81026888e+01 3.74656846e+01
 1.34437002e+02 6.20168181e+01 4.27105391e+02 8.80690727e+01
 7.13486086e+01 4.65187119e+01 4.39615368e+02 6.23159973e+03
 1.87744375e+02 3.34915232e+01 2.52596840e+01 9.37778097e+01
 1.24791469e+02 5.02368096e+01 1.09242920e+02 9.91139527e+01
 7.93038323e+01 1.211986

Unnamed: 0,customer_id,gender,senior_citizen,partner,dependents,tenure,phone_service,multiple_lines,internet_service,online_security,...,monthly_charges,total_charges,churn,tenure_years,tenure_group,avg_monthly_charges,monthly_charges_category,services_count,contract_type_code,churn_binary
0,CUST-00000,Male,1,1,0,6,0,Yes,No,Yes,...,26.875095,4951.885351,No,0.5,0-1 year,825.314225,Low,5,2,0
1,CUST-00001,Female,0,0,0,34,0,No phone service,Fiber optic,No internet service,...,70.739409,1586.779855,No,2.833333,2-3 years,46.669996,High,1,1,0
2,CUST-00002,Male,0,0,0,54,1,No phone service,No,Yes,...,52.891978,2353.696176,No,4.5,4-5 years,43.586966,Medium,3,0,0
3,CUST-00003,Male,0,0,0,13,0,No phone service,Fiber optic,Yes,...,70.831169,3859.820909,No,1.083333,1-2 years,296.909301,High,2,1,0
4,CUST-00004,Male,0,0,1,9,0,No phone service,No,Yes,...,22.397545,7639.333018,Yes,0.75,0-1 year,848.81478,Low,2,0,1


#### 1.5 Feature Store Setup

In [None]:
def setup_feature_store():
    """
    Set up a SQLite feature store for versioned feature management

    Returns:
        conn: SQLite connection
    """
    # Create feature store directory
    feature_store_dir = Path("feature_store")
    feature_store_dir.mkdir(exist_ok=True)

    # Connect to SQLite database
    conn = sqlite3.connect('feature_store/feature_store.db')

    # Create features table
    cursor = conn.cursor()
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS features (
        id INTEGER PRIMARY KEY,
        feature_name TEXT NOT NULL,
        feature_version TEXT NOT NULL,
        data_version TEXT NOT NULL,
        feature_type TEXT NOT NULL,
        created_at TEXT NOT NULL,
        description TEXT,
        stats TEXT,
        UNIQUE(feature_name, feature_version)
    )
    ''')

    # Create feature_values table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS feature_values (
        id INTEGER PRIMARY KEY,
        entity_id TEXT NOT NULL,
        feature_id INTEGER NOT NULL,
        value TEXT NOT NULL,
        timestamp TEXT NOT NULL,
        FOREIGN KEY (feature_id) REFERENCES features(id)
    )
    ''')

    # Create feature_sets table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS feature_sets (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        version TEXT NOT NULL,
        created_at TEXT NOT NULL,
        description TEXT,
        UNIQUE(name, version)
    )
    ''')

    # Create feature_set_features table (many-to-many relationship)
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS feature_set_features (
        feature_set_id INTEGER NOT NULL,
        feature_id INTEGER NOT NULL,
        PRIMARY KEY (feature_set_id, feature_id),
        FOREIGN KEY (feature_set_id) REFERENCES feature_sets(id),
        FOREIGN KEY (feature_id) REFERENCES features(id)
    )
    ''')

    conn.commit()
    print("Feature store database initialized")

    return conn

def register_features_in_store(conn, features_df, feature_metadata):
    """
    Register features in the feature store

    Args:
        conn: SQLite connection
        features_df: DataFrame with features
        feature_metadata: Feature metadata dictionary
    """
    cursor = conn.cursor()

    # Create a feature set
    feature_set_name = "telco_churn_features"
    feature_set_version = feature_metadata['feature_version']
    created_at = datetime.datetime.now().isoformat()
    description = f"Telco churn prediction features version {feature_set_version}"

    cursor.execute('''
    INSERT OR REPLACE INTO feature_sets (name, version, created_at, description)
    VALUES (?, ?, ?, ?)
    ''', (feature_set_name, feature_set_version, created_at, description))

    feature_set_id = cursor.lastrowid

    # Register each feature
    for feature_name in features_df.columns:
        if feature_name == 'customer_id':
            continue  # Skip entity ID column

        # Determine feature type
        if feature_name in feature_metadata['numerical_features']:
            feature_type = 'NUMERIC'
        elif feature_name in feature_metadata['binary_features']:
            feature_type = 'BINARY'
        else:
            feature_type = 'CATEGORICAL'

        # Calculate basic stats
        if feature_type == 'NUMERIC':
            stats = {
                'min': float(features_df[feature_name].min()),
                'max': float(features_df[feature_name].max()),
                'mean': float(features_df[feature_name].mean()),
                'median': float(features_df[feature_name].median()),
                'std': float(features_df[feature_name].std())
            }
        else:
            value_counts = features_df[feature_name].value_counts().to_dict()
            stats = {
                'unique_values': len(value_counts),
                'value_counts': {str(k): int(v) for k, v in value_counts.items()}
            }

        stats_json = json.dumps(stats)

        # Insert feature metadata
        cursor.execute('''
        INSERT OR REPLACE INTO features
        (feature_name, feature_version, data_version, feature_type, created_at, description, stats)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            feature_name,
            feature_set_version,
            feature_metadata['data_version'],
            feature_type,
            created_at,
            f"Feature {feature_name} for telco churn prediction",
            stats_json
        ))

        feature_id = cursor.lastrowid

        # Link feature to feature set
        cursor.execute('''
        INSERT OR REPLACE INTO feature_set_features (feature_set_id, feature_id)
        VALUES (?, ?)
        ''', (feature_set_id, feature_id))

        # For demo purposes, only store a sample of feature values
        if feature_name in ['tenure', 'monthly_charges', 'churn_binary', 'services_count']:
            # Store feature values for each entity (customer)
            for _, row in features_df.sample(min(100, len(features_df))).iterrows():
                entity_id = row['customer_id']
                value = str(row[feature_name])

                cursor.execute('''
                INSERT INTO feature_values (entity_id, feature_id, value, timestamp)
                VALUES (?, ?, ?, ?)
                ''', (entity_id, feature_id, value, created_at))

    conn.commit()
    print(f"Registered {len(features_df.columns) - 1} features in feature store under set '{feature_set_name}' version '{feature_set_version}'")

# Set up feature store
conn = setup_feature_store()

# Register features
register_features_in_store(conn, features_df, feature_metadata)

# Query to verify feature registration
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM features")
feature_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM feature_sets")
feature_set_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM feature_values")
value_count = cursor.fetchone()[0]

print(f"\nFeature Store Summary:")
print(f"- Registered features: {feature_count}")
print(f"- Feature sets: {feature_set_count}")
print(f"- Sample feature values stored: {value_count}")

# Check a few registered features
cursor.execute("SELECT feature_name, feature_type, stats FROM features LIMIT 5")
for row in cursor.fetchall():
    name, type_, stats = row
    print(f"- Feature: {name}, Type: {type_}, Stats: {stats[:60]}...")

Feature store database initialized
Registered 26 features in feature store under set 'telco_churn_features' version '20250512_172103_feat'

Feature Store Summary:
- Registered features: 52
- Feature sets: 2
- Sample feature values stored: 800
- Feature: gender, Type: BINARY, Stats: {"unique_values": 2, "value_counts": {"Female": 510, "Male":...
- Feature: senior_citizen, Type: NUMERIC, Stats: {"min": 0.0, "max": 1.0, "mean": 0.474, "median": 0.0, "std"...
- Feature: partner, Type: NUMERIC, Stats: {"min": 0.0, "max": 1.0, "mean": 0.501, "median": 1.0, "std"...
- Feature: dependents, Type: NUMERIC, Stats: {"min": 0.0, "max": 1.0, "mean": 0.476, "median": 0.0, "std"...
- Feature: tenure, Type: NUMERIC, Stats: {"min": 0.0, "max": 71.0, "mean": 35.672, "median": 36.0, "s...


### 2. Experimentation & Training

#### 2.1 Set Up MLflow for Experiment Tracking

In [None]:
def setup_mlflow():
    """
    Set up MLflow for experiment tracking
    """
    # Create MLflow directory
    mlflow_dir = Path("mlruns")
    mlflow_dir.mkdir(exist_ok=True)

    # Set tracking URI to local directory
    mlflow.set_tracking_uri(f"file:{os.path.abspath(mlflow_dir)}")

    # Create experiment if it doesn't exist
    experiment_name = "telco_churn_prediction"

    experiment = mlflow.get_experiment_by_name(experiment_name)
    if experiment is None:
        experiment_id = mlflow.create_experiment(experiment_name)
        print(f"Created new MLflow experiment '{experiment_name}' with ID {experiment_id}")
    else:
        experiment_id = experiment.experiment_id
        print(f"Using existing MLflow experiment '{experiment_name}' with ID {experiment_id}")

    mlflow.set_experiment(experiment_name)

    return experiment_id

# Set up MLflow
experiment_id = setup_mlflow()

Using existing MLflow experiment 'telco_churn_prediction' with ID 409164801491470435


#### 2.2 Create Data Splits

In [None]:
def create_data_splits(features_df, test_size=0.2, val_size=0.25, random_state=42):
    """
    Create reproducible train/validation/test splits

    Args:
        features_df: DataFrame with features
        test_size: Proportion of data to use for test set
        val_size: Proportion of training data to use for validation
        random_state: Random seed for reproducibility

    Returns:
        splits: Dictionary with X_train, X_val, X_test, y_train, y_val, y_test
    """

    # Make a copy to avoid modifying the original
    df = features_df.copy()

    # Define features and target
    X = df.drop(['churn', 'churn_binary'], axis=1)
    y = df['churn_binary']

    # First split: training+validation vs test
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )

    # Second split: training vs validation
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val, test_size=val_size, random_state=random_state, stratify=y_train_val
    )

    # Create splits dictionary
    splits = {
        'X_train': X_train,
        'X_val': X_val,
        'X_test': X_test,
        'y_train': y_train,
        'y_val': y_val,
        'y_test': y_test
    }

    # Log split sizes
    print(f"Split sizes:")
    print(f"- Training: {len(X_train)} samples ({len(X_train) / len(df):.1%})")
    print(f"- Validation: {len(X_val)} samples ({len(X_val) / len(df):.1%})")
    print(f"- Test: {len(X_test)} samples ({len(X_test) / len(df):.1%})")

    splits_dir = Path("data/splits")
    splits_dir.mkdir(exist_ok=True, parents=True)

    # Save split indices for reproducibility
    split_indices = {
        'train_indices': X_train.index.tolist(),
        'val_indices': X_val.index.tolist(),
        'test_indices': X_test.index.tolist(),
        'random_state': random_state,
        'test_size': test_size,
        'val_size': val_size,
        'timestamp': datetime.datetime.now().isoformat()
    }

    with open(splits_dir / f"split_indices_{initial_version}.json", 'w') as f:
        json.dump(split_indices, f, indent=2)

    return splits

# Create train/validation/test splits
data_splits = create_data_splits(features_df)

Split sizes:
- Training: 600 samples (60.0%)
- Validation: 200 samples (20.0%)
- Test: 200 samples (20.0%)


#### 2.3 Create Feature Preprocessing Pipeline

In [None]:
def create_preprocessing_pipeline(X_train):
    """
    Create a scikit-learn preprocessing pipeline

    Args:
        X_train: Training features DataFrame

    Returns:
        preprocessor: ColumnTransformer preprocessing pipeline
    """
    # Identify column types
    categorical_cols = X_train.select_dtypes(include=['object', 'category']).columns.tolist()
    numerical_cols = X_train.select_dtypes(include=['int64', 'float64']).columns.tolist()

    # Remove customer_id from features
    if 'customer_id' in categorical_cols:
        categorical_cols.remove('customer_id')
    if 'customer_id' in numerical_cols:
        numerical_cols.remove('customer_id')

    # Create preprocessing steps for each column type
    numerical_transformer = Pipeline(steps=[
        ('scaler', StandardScaler())
    ])

    # The following line was incorrectly indented
    categorical_transformer = Pipeline(steps=[
        ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)) # Changed 'sparse' to 'sparse_output'
    ])

    # Combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, numerical_cols),
            ('cat', categorical_transformer, categorical_cols)
        ],
        remainder='drop'  # Drop any columns not specified (like customer_id)
    )

    print(f"Created preprocessing pipeline with:")
    print(f"- {len(numerical_cols)} numerical features: {numerical_cols[:5]}")
    print(f"- {len(categorical_cols)} categorical features: {categorical_cols[:5]}")

    return preprocessor

# Create preprocessing pipeline
preprocessor = create_preprocessing_pipeline(data_splits['X_train'])

Created preprocessing pipeline with:
- 12 numerical features: ['senior_citizen', 'partner', 'dependents', 'tenure', 'phone_service']
- 12 categorical features: ['gender', 'multiple_lines', 'internet_service', 'online_security', 'online_backup']


#### 2.4 Train Models with Experiment Tracking

In [None]:
def train_and_evaluate_model(splits, preprocessor, model_params, run_name=None):
    """
    Train and evaluate a model with experiment tracking

    Args:
        splits: Dictionary with data splits
        preprocessor: Preprocessing pipeline
        model_params: Parameters for RandomForestClassifier
        run_name: Name for MLflow run

    Returns:
        model_pipeline: Trained model pipeline
        metrics: Evaluation metrics
    """
    # Extract splits
    X_train = splits['X_train']
    X_val = splits['X_val']
    X_test = splits['X_test']
    y_train = splits['y_train']
    y_val = splits['y_val']
    y_test = splits['y_test']

    # Create and train model pipeline
    model = RandomForestClassifier(**model_params)
    model_pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])

    # Start MLflow run
    with mlflow.start_run(run_name=run_name) as run:
        run_id = run.info.run_id
        print(f"Started MLflow run '{run_name}' with ID {run_id}")

        # Log data versions and features
        mlflow.log_param("data_version", initial_version)
        mlflow.log_param("n_samples_train", len(X_train))
        mlflow.log_param("n_features", X_train.shape[1])

        # Log feature names
        feature_names = list(X_train.columns)
        mlflow.log_param("feature_count", len(feature_names))

        # Log model parameters
        for param, value in model_params.items():
            mlflow.log_param(param, value)

        # Train the model and time it
        start_time = datetime.datetime.now()
        model_pipeline.fit(X_train, y_train)
        train_time = (datetime.datetime.now() - start_time).total_seconds()
        mlflow.log_metric("training_time_seconds", train_time)

        # Evaluate on training set
        y_train_pred = model_pipeline.predict(X_train)
        y_train_prob = model_pipeline.predict_proba(X_train)[:, 1]

        train_metrics = {
            "train_accuracy": accuracy_score(y_train, y_train_pred),
            "train_precision": precision_score(y_train, y_train_pred),
            "train_recall": recall_score(y_train, y_train_pred),
            "train_f1": f1_score(y_train, y_train_pred),
            "train_roc_auc": roc_auc_score(y_train, y_train_prob)
        }

        # Evaluate on validation set
        y_val_pred = model_pipeline.predict(X_val)
        y_val_prob = model_pipeline.predict_proba(X_val)[:, 1]

        val_metrics = {
            "val_accuracy": accuracy_score(y_val, y_val_pred),
            "val_precision": precision_score(y_val, y_val_pred),
            "val_recall": recall_score(y_val, y_val_pred),
            "val_f1": f1_score(y_val, y_val_pred),
            "val_roc_auc": roc_auc_score(y_val, y_val_prob)
        }

        # Evaluate on test set
        y_test_pred = model_pipeline.predict(X_test)
        y_test_prob = model_pipeline.predict_proba(X_test)[:, 1]

        test_metrics = {
            "test_accuracy": accuracy_score(y_test, y_test_pred),
            "test_precision": precision_score(y_test, y_test_pred),
            "test_recall": recall_score(y_test, y_test_pred),
            "test_f1": f1_score(y_test, y_test_pred),
            "test_roc_auc": roc_auc_score(y_test, y_test_prob)
        }

        # Combine all metrics
        metrics = {**train_metrics, **val_metrics, **test_metrics}

        # Log metrics to MLflow
        for metric_name, metric_value in metrics.items():
            mlflow.log_metric(metric_name, metric_value)

        # Log the model
        mlflow.sklearn.log_model(model_pipeline, "model")


        # Log feature importances
        if hasattr(model, 'feature_importances_'):
            # Get column names after preprocessing
            preprocessor_output_feature_names = []
            for name, trans, cols in preprocessor.transformers_:
                if hasattr(trans, 'get_feature_names_out'):
                    preprocessor_output_feature_names.extend(trans.get_feature_names_out(cols))
                else:
                    preprocessor_output_feature_names.extend(cols)

            # Ensure feature names and importances have the same length
            # This is done by selecting the feature names corresponding to the
            # features used by the model (based on the length of feature_importances_)
            preprocessor_output_feature_names = preprocessor_output_feature_names[:len(model.feature_importances_)]

            # Match feature importances with names
            importance_df = pd.DataFrame({
                'feature': preprocessor_output_feature_names,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)

            # Match feature importances with names
            importance_df = pd.DataFrame({
                'feature': preprocessor_output_feature_names,
                'importance': model.feature_importances_
            }).sort_values('importance', ascending=False)

            # Save feature importances
            importance_path = f"feature_importances_{run_id}.csv"
            importance_df.to_csv(importance_path, index=False)
            mlflow.log_artifact(importance_path)

            # Create and log feature importance plot
            plt.figure(figsize=(10, 6))
            top_features = importance_df.head(15)
            sns.barplot(x='importance', y='feature', data=top_features)
            plt.title('Top 15 Feature Importances')
            plt.tight_layout()

            plot_path = f"feature_importance_plot_{run_id}.png"
            plt.savefig(plot_path)
            mlflow.log_artifact(plot_path)
            plt.close()

            # Clean up local files
            os.remove(importance_path)
            os.remove(plot_path)

        # Create and log confusion matrix
        cm = np.zeros((2, 2))
        cm[0, 0] = np.sum((y_val == 0) & (y_val_pred == 0))
        cm[0, 1] = np.sum((y_val == 0) & (y_val_pred == 1))
        cm[1, 0] = np.sum((y_val == 1) & (y_val_pred == 0))
        cm[1, 1] = np.sum((y_val == 1) & (y_val_pred == 1))

        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='g', cmap='Blues',
                    xticklabels=['No Churn', 'Churn'],
                    yticklabels=['No Churn', 'Churn'])
        plt.xlabel('Predicted')
        plt.ylabel('Actual')
        plt.title('Confusion Matrix (Validation Set)')
        plt.tight_layout()

        cm_path = f"confusion_matrix_{run_id}.png"
        plt.savefig(cm_path)
        mlflow.log_artifact(cm_path)
        plt.close()

        # Clean up local file
        os.remove(cm_path)

        print(f"Model training and evaluation complete. MLflow run ID: {run_id}")
        print(f"Validation metrics: accuracy={val_metrics['val_accuracy']:.4f}, f1={val_metrics['val_f1']:.4f}, roc_auc={val_metrics['val_roc_auc']:.4f}")

    return model_pipeline, metrics, run_id

# Train multiple model variants
models = []

# Model 1: Default RandomForest
model_params_1 = {
    'n_estimators': 100,
    'max_depth': None,
    'min_samples_split': 2,
    'min_samples_leaf': 1,
    'random_state': 42
}

model_1, metrics_1, run_id_1 = train_and_evaluate_model(
    data_splits,
    preprocessor,
    model_params_1,
    run_name="RandomForest-Default"
)
models.append(("RandomForest-Default", model_1, metrics_1, run_id_1))

# Model 2: Tuned RandomForest
model_params_2 = {
    'n_estimators': 200,
    'max_depth': 10,
    'min_samples_split': 5,
    'min_samples_leaf': 2,
    'random_state': 42
}

model_2, metrics_2, run_id_2 = train_and_evaluate_model(
    data_splits,
    preprocessor,
    model_params_2,
    run_name="RandomForest-Tuned"
)
models.append(("RandomForest-Tuned", model_2, metrics_2, run_id_2))

# Model 3: RandomForest with feature reduction focus
model_params_3 = {
    'n_estimators': 150,
    'max_depth': 8,
    'min_samples_split': 10,
    'min_samples_leaf': 4,
    'max_features': 'sqrt',
    'random_state': 42
}

model_3, metrics_3, run_id_3 = train_and_evaluate_model(
    data_splits,
    preprocessor,
    model_params_3,
    run_name="RandomForest-FeatureReduction"
)
models.append(("RandomForest-FeatureReduction", model_3, metrics_3, run_id_3))

# Compare models
comparison_df = pd.DataFrame([
    {
        'model_name': model_name,
        'val_accuracy': metrics['val_accuracy'],
        'val_precision': metrics['val_precision'],
        'val_recall': metrics['val_recall'],
        'val_f1': metrics['val_f1'],
        'val_roc_auc': metrics['val_roc_auc'],
        'run_id': run_id
    }
    for model_name, _, metrics, run_id in models
])

print("\nModel Comparison:")
comparison_df

Started MLflow run 'RandomForest-Default' with ID 4bce6674e96248deaa3eabf58da39748


The format of the columns of the 'remainder' transformer in ColumnTransformer.transformers_ will change in version 1.7 to match the format of the other transformers.
At the moment the remainder columns are stored as indices (of type int). With the same ColumnTransformer configuration, in the future they will be stored as column names (of type str).



Model training and evaluation complete. MLflow run ID: 4bce6674e96248deaa3eabf58da39748
Validation metrics: accuracy=0.6650, f1=0.3093, roc_auc=0.6231
Started MLflow run 'RandomForest-Tuned' with ID 7c320ab775db4de1a352fd8060601f20


The format of the columns of the 'remainder' transformer in ColumnTransformer.transformers_ will change in version 1.7 to match the format of the other transformers.
At the moment the remainder columns are stored as indices (of type int). With the same ColumnTransformer configuration, in the future they will be stored as column names (of type str).



Model training and evaluation complete. MLflow run ID: 7c320ab775db4de1a352fd8060601f20
Validation metrics: accuracy=0.6650, f1=0.3093, roc_auc=0.6356
Started MLflow run 'RandomForest-FeatureReduction' with ID a7cfc0187a3b41e7bb015c8d0925ec12


The format of the columns of the 'remainder' transformer in ColumnTransformer.transformers_ will change in version 1.7 to match the format of the other transformers.
At the moment the remainder columns are stored as indices (of type int). With the same ColumnTransformer configuration, in the future they will be stored as column names (of type str).



Model training and evaluation complete. MLflow run ID: a7cfc0187a3b41e7bb015c8d0925ec12
Validation metrics: accuracy=0.6650, f1=0.3093, roc_auc=0.6423

Model Comparison:


Unnamed: 0,model_name,val_accuracy,val_precision,val_recall,val_f1,val_roc_auc,run_id
0,RandomForest-Default,0.665,0.555556,0.214286,0.309278,0.623077,4bce6674e96248deaa3eabf58da39748
1,RandomForest-Tuned,0.665,0.555556,0.214286,0.309278,0.635604,7c320ab775db4de1a352fd8060601f20
2,RandomForest-FeatureReduction,0.665,0.555556,0.214286,0.309278,0.642308,a7cfc0187a3b41e7bb015c8d0925ec12


#### 2.5 Select Best Model and Register in Model Registry

In [None]:
def register_best_model(models):
    """
    Select the best model based on validation metrics and register in model registry

    Args:
        models: List of (name, model, metrics, run_id) tuples

    Returns:
        best_model: Best model object
        best_run_id: MLflow run ID of best model
    """
    # Create a sorted list based on validation F1 score
    sorted_models = sorted(models, key=lambda x: x[2]['val_f1'], reverse=True)

    # Get the best model
    best_model_name, best_model, best_metrics, best_run_id = sorted_models[0]

    print(f"Best model: {best_model_name}")
    print(f"Validation F1 score: {best_metrics['val_f1']:.4f}")
    print(f"Validation ROC AUC: {best_metrics['val_roc_auc']:.4f}")

    # Register the model in MLflow model registry
    model_uri = f"runs:/{best_run_id}/model"
    registered_model_name = "telco_churn_predictor"

    registered_model = mlflow.register_model(model_uri, registered_model_name)
    print(f"Registered model '{registered_model_name}' version {registered_model.version}")

    # Create model directory
    models_dir = Path("models")
    models_dir.mkdir(exist_ok=True)

    # Save the model locally
    model_path = models_dir / f"{registered_model_name}_v{registered_model.version}.pkl"
    with open(model_path, 'wb') as f:
        pickle.dump(best_model, f)

    # Save model metadata
    model_metadata = {
        "model_name": registered_model_name,
        "model_version": registered_model.version,
        "run_id": best_run_id,
        "data_version": initial_version,
        "metrics": best_metrics,
        "timestamp": datetime.datetime.now().isoformat(),
        "description": f"Telco churn prediction model version {registered_model.version}"
    }

    metadata_path = models_dir / f"{registered_model_name}_v{registered_model.version}_metadata.json"
    with open(metadata_path, 'w') as f:
        json.dump(model_metadata, f, indent=2)

    return best_model, best_run_id, registered_model.version

# Register best model
best_model, best_run_id, model_version = register_best_model(models)

Best model: RandomForest-Default
Validation F1 score: 0.3093
Validation ROC AUC: 0.6231
Registered model 'telco_churn_predictor' version 2


Registered model 'telco_churn_predictor' already exists. Creating a new version of this model...
Created version '2' of model 'telco_churn_predictor'.


### 3. Automated Validation (CI)

#### 3.1 Create Validation Test Suite

In [None]:
def create_validation_test_suite():
    """
    Create a validation test suite for the ML pipeline

    Returns:
        test_suite: Dictionary of test functions
    """
    # Dictionary to store test functions
    test_suite = {}

    # Test 1: Feature pipeline validation
    def test_feature_pipeline(X_sample, preprocessor):
        """Test that the feature pipeline correctly transforms data"""
        try:
            # Transform a sample
            X_transformed = preprocessor.transform(X_sample)

            # Check output shape and type
            assert X_transformed is not None, "Preprocessor output is None"
            assert X_transformed.shape[0] == X_sample.shape[0], "Output has wrong number of samples"
            assert not np.isnan(X_transformed).any(), "Output contains NaN values"
            assert not np.isinf(X_transformed).any(), "Output contains infinite values"

            return True, "Feature pipeline validation passed"
        except Exception as e:
            return False, f"Feature pipeline validation failed: {str(e)}"

    test_suite["test_feature_pipeline"] = test_feature_pipeline

    # Test 2: Model prediction validation
    def test_model_predictions(model, X_sample):
        """Test that the model produces valid predictions"""
        try:
            # Generate predictions
            predictions = model.predict(X_sample)
            probabilities = model.predict_proba(X_sample)

            # Check predictions
            assert predictions is not None, "Predictions are None"
            assert predictions.shape[0] == X_sample.shape[0], "Wrong number of predictions"
            assert set(np.unique(predictions)).issubset({0, 1}), "Predictions are not binary"
            assert probabilities.shape == (X_sample.shape[0], 2), "Probability shape is incorrect"
            assert np.allclose(np.sum(probabilities, axis=1), 1.0), "Probabilities don't sum to 1"
            assert np.all(probabilities >= 0) and np.all(probabilities <= 1), "Probabilities outside [0,1]"

            return True, "Model prediction validation passed"
        except Exception as e:
            return False, f"Model prediction validation failed: {str(e)}"

    test_suite["test_model_predictions"] = test_model_predictions

    # Test 3: Feature importance validation
    def test_feature_importance(model):
        """Test that feature importances are available and valid"""
        try:
            # Get the classifier from the pipeline
            classifier = model.named_steps['classifier']

            # Check feature importances
            assert hasattr(classifier, 'feature_importances_'), "Model has no feature_importances_ attribute"
            importances = classifier.feature_importances_
            assert importances is not None, "Feature importances are None"
            assert len(importances) > 0, "No feature importances available"
            assert np.all(importances >= 0), "Negative feature importances found"
            assert np.isclose(np.sum(importances), 1.0), "Feature importances don't sum to 1"

            return True, "Feature importance validation passed"
        except Exception as e:
            return False, f"Feature importance validation failed: {str(e)}"

    test_suite["test_feature_importance"] = test_feature_importance

    # Test 4: Model serialization validation
    def test_model_serialization(model):
        """Test that the model can be serialized and deserialized"""
        try:
            # Serialize model
            serialized = pickle.dumps(model)

            # Deserialize model
            deserialized_model = pickle.loads(serialized)

            # Check if model works after deserialization
            assert deserialized_model is not None, "Deserialized model is None"
            assert hasattr(deserialized_model, 'predict'), "Deserialized model has no predict method"
            assert hasattr(deserialized_model, 'predict_proba'), "Deserialized model has no predict_proba method"

            return True, "Model serialization validation passed"
        except Exception as e:
            return False, f"Model serialization validation failed: {str(e)}"

    test_suite["test_model_serialization"] = test_model_serialization

    # Test 5: Lightweight retraining validation
    def test_lightweight_retraining(model, X_sample, y_sample):
        """Test that the model can be retrained with new data"""
        try:
            # Clone the model to avoid modifying the original
            model_copy = pickle.loads(pickle.dumps(model))

            # Fit on a small sample
            model_copy.fit(X_sample, y_sample)

            # Check predictions
            predictions = model_copy.predict(X_sample)
            assert predictions is not None, "Predictions after retraining are None"
            assert predictions.shape[0] == X_sample.shape[0], "Wrong number of predictions after retraining"

            return True, "Lightweight retraining validation passed"
        except Exception as e:
            return False, f"Lightweight retraining validation failed: {str(e)}"

    test_suite["test_lightweight_retraining"] = test_lightweight_retraining

    return test_suite

# Create validation test suite
test_suite = create_validation_test_suite()

#### 3.2 Run Validation Tests

In [None]:
def run_validation_tests(test_suite, model, preprocessor, data_splits):
    """
    Run all validation tests and report results

    Args:
        test_suite: Dictionary of test functions
        model: Model to validate
        preprocessor: Preprocessor pipeline
        data_splits: Data splits dictionary

    Returns:
        results: Dictionary of test results
    """
    print(f"Running validation tests for model...")

    # Create test dataset (small sample)
    X_sample = data_splits['X_val'].sample(min(100, len(data_splits['X_val'])), random_state=42)
    y_sample = data_splits['y_val'].loc[X_sample.index]

    # Run all tests
    results = {}

    # Test 1: Feature pipeline validation
    success, message = test_suite["test_feature_pipeline"](X_sample, preprocessor)
    results["Feature Pipeline"] = {"success": success, "message": message}
    print(f"- Feature Pipeline Test: {'âœ“' if success else 'âœ—'} {message}")

    # Test 2: Model prediction validation
    success, message = test_suite["test_model_predictions"](model, X_sample)
    results["Model Predictions"] = {"success": success, "message": message}
    print(f"- Model Predictions Test: {'âœ“' if success else 'âœ—'} {message}")

    # Test 3: Feature importance validation
    success, message = test_suite["test_feature_importance"](model)
    results["Feature Importance"] = {"success": success, "message": message}
    print(f"- Feature Importance Test: {'âœ“' if success else 'âœ—'} {message}")

    # Test 4: Model serialization validation
    success, message = test_suite["test_model_serialization"](model)
    results["Model Serialization"] = {"success": success, "message": message}
    print(f"- Model Serialization Test: {'âœ“' if success else 'âœ—'} {message}")

    # Test 5: Lightweight retraining validation
    success, message = test_suite["test_lightweight_retraining"](model, X_sample, y_sample)
    results["Lightweight Retraining"] = {"success": success, "message": message}
    print(f"- Lightweight Retraining Test: {'âœ“' if success else 'âœ—'} {message}")

    # Calculate overall success rate
    success_count = sum(1 for test in results.values() if test["success"])
    success_rate = success_count / len(results)
    print(f"\nValidation summary: {success_count}/{len(results)} tests passed ({success_rate:.0%})")

    # Save test results
    validation_dir = Path("validation")
    validation_dir.mkdir(exist_ok=True)

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    results_path = validation_dir / f"validation_results_{timestamp}.json"

    with open(results_path, 'w') as f:
        json.dump({
            "timestamp": timestamp,
            "model_version": model_version,
            "data_version": initial_version,
            "success_rate": success_rate,
            "tests": {name: {"success": result["success"], "message": result["message"]}
                      for name, result in results.items()}
        }, f, indent=2)

    print(f"Test results saved to {results_path}")

    if success_rate == 1.0:
        print("All validation tests passed! Model is ready for deployment.")
    else:
        print("Some validation tests failed. Review issues before deployment.")

    return results

# Run validation tests
validation_results = run_validation_tests(test_suite, best_model, preprocessor, data_splits)

Running validation tests for model...
- Feature Pipeline Test: âœ“ Feature pipeline validation passed
- Model Predictions Test: âœ“ Model prediction validation passed
- Feature Importance Test: âœ“ Feature importance validation passed
- Model Serialization Test: âœ“ Model serialization validation passed
- Lightweight Retraining Test: âœ“ Lightweight retraining validation passed

Validation summary: 5/5 tests passed (100%)
Test results saved to validation/validation_results_20250512_172144.json
All validation tests passed! Model is ready for deployment.
