# Advanced Regression Pipeline


This notebook builds on the LightGBM pipeline to compare three regression algorithms using data from the local database. It includes data loading, preprocessing, model training, evaluation, and visualization.

In [1]:
import os
import json
import math
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple

## Environment
from dotenv import load_dotenv

## Core Scientific Stack
import numpy as np
import pandas as pd

## Visualization
import matplotlib.pyplot as plt
import seaborn as sns

## Database
import psycopg2

## Machine Learning / Preprocessing (scikit-learn)
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import OneHotEncoder, StandardScaler, PolynomialFeatures
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline, Pipeline as SkPipeline
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.linear_model import LinearRegression
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

## Gradient Boosting Libraries
import lightgbm as lgb
from lightgbm import LGBMRegressor
from catboost import CatBoostRegressor
import xgboost as xgb
from xgboost import XGBRegressor

## Deep Learning / Tabular
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from pytorch_tabnet.tab_model import TabNetRegressor

## Optimization & Persistence
import optuna
import joblib

# Load environment variables from .env file
load_dotenv(override=True)

# Setup model directory (handle notebook environment where __file__ is undefined)
try:
    PROJECT_ROOT = Path(__file__).resolve().parent
except NameError:
    # Fallback: assume notebook is inside src; go up one directory if so
    cwd = Path.cwd().resolve()
    if (cwd / 'supervised.ipynb').exists() or (cwd / 'unsupervised.ipynb').exists():
        PROJECT_ROOT = cwd
    else:
        for parent in cwd.parents:
            if (parent / 'requirements.txt').exists() or (parent / 'README.md').exists():
                PROJECT_ROOT = parent / 'src'
                break
        else:
            PROJECT_ROOT = cwd  # final fallback

MODEL_DIR = (PROJECT_ROOT / '..' / 'supervised').resolve()
MODEL_DIR.mkdir(parents=True, exist_ok=True)
print(f"Models will be saved to: {MODEL_DIR}")

def save_model(model, name: str, extra: dict | None = None):
    """Utility to persist models and optional metadata alongside them.
    Saves model as joblib plus a companion JSON with metadata/hyperparams."""
    timestamp = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
    base_name = f"{name}_{timestamp}"
    model_path = MODEL_DIR / f"{base_name}.joblib"
    meta_path = MODEL_DIR / f"{base_name}.json"
    joblib.dump(model, model_path)
    meta = {'model_name': name, 'saved_utc': timestamp}
    if extra:
        meta.update(extra)
    with open(meta_path, 'w') as f:
        json.dump(meta, f, indent=2)
    print(f"Saved model -> {model_path.name}; metadata -> {meta_path.name}")

# Database connection parameters
db_params = {
    "host": os.getenv("LOCAL_HOST"),
    "user": os.getenv("LOCAL_USER"),
    "password": os.getenv("LOCAL_PW"),
    "port": os.getenv("LOCAL_PORT"),
    "dbname": os.getenv("LOCAL_DB")
}

# Display versions
print('LightGBM version:', lgb.__version__)
print('XGBoost version:', xgb.__version__)

Models will be saved to: D:\docs\MADS\696-Milestone 2\supervised
LightGBM version: 4.6.0
XGBoost version: 3.0.5


In [2]:
# Check CUDA availability
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Device count: {torch.cuda.device_count()}")
    print(f"Current device: {torch.cuda.current_device()}")
    print(f"Device name: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA is NOT available - PyTorch will use CPU only")
    print("To enable GPU training, install PyTorch with CUDA support:")
    print("  conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia")
    print("  or")
    print("  pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118")

PyTorch version: 2.5.1
CUDA available: True
CUDA version: 12.1
Device count: 1
Current device: 0
Device name: NVIDIA GeForce GTX 1660 Ti


### Auto Load / Conditional Training
If a previously saved optimized model exists in `src/supervised`, the notebook will load the most recent artifact (by timestamp in filename) and skip retraining unless `FORCE_RETRAIN=True`.

In [3]:
# Auto-load previously saved optimized models (XGBoost / RandomForest / SVR / LinearRegression / Polynomial / MLP / TabNet)
from pathlib import Path as _Path
import json as _json

# Initialize placeholders if not already present
globals().setdefault('FORCE_RETRAIN', True)

# Only set to None if not defined to avoid clobbering models loaded earlier in session
if 'xgb_model' not in globals():
    xgb_model = None
if 'rf_model' not in globals():
    rf_model = None
if 'mlp_model' not in globals():
    mlp_model = None
if 'tabnet_model' not in globals():
    tabnet_model = None  # Will delay loading until wrapper class defined
if 'svr_model' not in globals():
    svr_model = None
if 'lr_model' not in globals():
    lr_model = None
if 'poly_model' not in globals():
    poly_model = None

MODEL_GLOB_PATTERNS = {
    'xgb_model': 'xgboost_opt_*.joblib',
    'rf_model': 'random_forest_opt_*.joblib', 
    'mlp_model': 'mlp_opt_*.joblib', 
    # 'tabnet_model': DEFERRED - skip here, load after wrapper class defined
    'svr_model': 'svr_opt_*.joblib',
    'lr_model': 'linear_regression_opt_*.joblib',
    'poly_model': 'poly_reg_opt_*.joblib'
}

loaded_flags = {}
for var, pattern in MODEL_GLOB_PATTERNS.items():
    if globals().get(var) is not None:
        loaded_flags[var] = 'pre-existing'
        continue
    matches = sorted(MODEL_DIR.glob(pattern))
    if not matches:
        loaded_flags[var] = 'not found'
        continue
    latest = matches[-1]
    try:
        globals()[var] = joblib.load(latest)
        meta_file = latest.with_suffix('.json')
        if meta_file.exists():
            with open(meta_file) as f:
                globals()[f"{var}_meta"] = _json.load(f)
        loaded_flags[var] = f"loaded {latest.name}"
    except Exception as e:
        loaded_flags[var] = f"failed: {e}";
        globals()[var] = None

loaded_flags['tabnet_model'] = 'deferred (class not yet defined)'

print("Auto-load status:")
for k,v in loaded_flags.items():
    print(f"  {k}: {v}")
print("FORCE_RETRAIN=", FORCE_RETRAIN)

Auto-load status:
  xgb_model: loaded xgboost_opt_refit_20251008T110355Z.joblib
  rf_model: loaded random_forest_opt_20251011T193747Z.joblib
  mlp_model: failed: Can't get attribute 'TorchMLPRegressor' on <module '__main__'>
  svr_model: loaded svr_opt_20251011T185830Z.joblib
  lr_model: loaded linear_regression_opt_20251011T185830Z.joblib
  poly_model: loaded poly_reg_opt_20251011T191654Z.joblib
  tabnet_model: deferred (class not yet defined)
FORCE_RETRAIN= True


In [4]:
# Connect to database and load data
try:
    conn = psycopg2.connect(**db_params)
    print("Database connection successful")
    sql_query = "SELECT * FROM dev.base_data;"
    df = pd.read_sql_query(sql_query, conn)
    conn.close()
    print("Golden data loaded into DataFrame:")
    print(df.info())
except Exception as e:
    print(f"An error occurred: {e}")

Database connection successful


  df = pd.read_sql_query(sql_query, conn)


Golden data loaded into DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 23038 entries, 0 to 23037
Data columns (total 22 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   school_name             23038 non-null  object 
 1   school_type             23038 non-null  object 
 2   teachers_fte            22550 non-null  float64
 3   enrollment              22863 non-null  float64
 4   grade_eight_enrollment  21613 non-null  float64
 5   math_counts             22507 non-null  float64
 6   math_high_pct           22507 non-null  float64
 7   math_low_pct            19960 non-null  float64
 8   read_counts             22386 non-null  float64
 9   read_high_pct           22386 non-null  float64
 10  read_low_pct            19907 non-null  float64
 11  pct_hhi_150k_200k       23038 non-null  float64
 12  pct_hhi_220k_plus       23038 non-null  float64
 13  avg_natwalkind          23038 non-null  float64
 14  tot

## 3. Data Splitting: Train, Validation, Test
Split the dataset into train, validation, and test sets, ensuring proper handling of the target variable.

In [5]:
df.columns

Index(['school_name', 'school_type', 'teachers_fte', 'enrollment',
       'grade_eight_enrollment', 'math_counts', 'math_high_pct',
       'math_low_pct', 'read_counts', 'read_high_pct', 'read_low_pct',
       'pct_hhi_150k_200k', 'pct_hhi_220k_plus', 'avg_natwalkind',
       'total_10_14', 'pct_10_14', 'pct_female_10_14', 'total_pop',
       'hhi_150k_200k', 'hhi_220k_plus', 'schools_in_zip', 'dup_rank'],
      dtype='object')

In [6]:
# Define target and drop missing
TARGET = 'math_high_pct' if 'math_high_pct' in df.columns else 'target'
data = df.dropna().reset_index(drop=True)
data = data.set_index('school_name')

# Split features and target
feature_cols = [c for c in data.columns if c != TARGET and c != 'dup_rank' and c != 'math_low_pct']
X = data[feature_cols]
y = data[TARGET]

# Train/validation/test split
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.40, random_state=42)
X_valid, X_test, y_valid, y_test = train_test_split(X_temp, y_temp, test_size=0.50, random_state=42)
print(f'Train shape: {X_train.shape}, Validation shape: {X_valid.shape}, Test shape: {X_test.shape}')

Train shape: (11367, 18), Validation shape: (3789, 18), Test shape: (3789, 18)


## 4. Feature Engineering and Preprocessing Pipeline
Identify numeric and categorical features, set up StandardScaler and OneHotEncoder, and build a ColumnTransformer pipeline.

In [7]:
# Identify numeric and categorical features
numeric_features = X_train.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = X_train.select_dtypes(include=['category', 'object']).columns.tolist()

print('Numeric features:', numeric_features)
print('Categorical features:', categorical_features)

# Preprocessing pipeline
numeric_transformer = Pipeline(steps=[('scaler', StandardScaler())])
categorical_transformer = Pipeline(steps=[('onehot', OneHotEncoder(handle_unknown='ignore'))])
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'
)

# Fit preprocessor
preprocessor.fit(X_train)
X_train_enc = preprocessor.transform(X_train)
X_valid_enc = preprocessor.transform(X_valid)
X_test_enc = preprocessor.transform(X_test)

Numeric features: ['teachers_fte', 'enrollment', 'grade_eight_enrollment', 'math_counts', 'read_counts', 'read_high_pct', 'read_low_pct', 'pct_hhi_150k_200k', 'pct_hhi_220k_plus', 'avg_natwalkind', 'total_10_14', 'pct_10_14', 'pct_female_10_14', 'total_pop', 'hhi_150k_200k', 'hhi_220k_plus', 'schools_in_zip']
Categorical features: ['school_type']


## Model Optimization function

In [8]:
# Reusable Optuna-based optimizer for cross-validated hyperparameter tuning
from typing import Callable, Dict, Tuple

def optimize_model_with_optuna(
                                model_name: str,
                                estimator_builder: Callable[[Dict], object],
                                param_space_fn: Callable[[optuna.trial.Trial], Dict],
                                X,
                                y,
                                scoring: str = 'neg_root_mean_squared_error',
                                cv: int = 3,
                                n_trials: int = 5,
                                direction: str = 'minimize',
                                random_state: int = 42,
                                n_jobs: int = -1,
                            ) -> Tuple[optuna.study.Study, Dict]:
    """Optimize a model's hyperparameters using Optuna and cross-validation.

    Args:
        model_name: Name used to label the Optuna study
        estimator_builder: Callable that receives a params dict and returns an unfitted estimator
        param_space_fn: Callable that maps an Optuna trial to a hyperparameter dictionary
        X, y: Training features and targets used for cross-validation
        scoring: scikit-learn scoring string guiding optimization
        cv: Number of cross-validation folds
        n_trials: Number of Optuna trials to run
        direction: 'minimize' or 'maximize' depending on the objective
        random_state: Seed for the Optuna sampler
        n_jobs: Parallelism for cross_val_score

    Returns:
        The completed Optuna study and the best hyperparameters discovered.
    """
    sampler = optuna.samplers.TPESampler(seed=random_state)
    study = optuna.create_study(study_name=f"{model_name}_opt", direction=direction, sampler=sampler)

    def objective(trial: optuna.trial.Trial) -> float:
        params = param_space_fn(trial)
        estimator = estimator_builder(params)
        scores = cross_val_score(estimator, X, y, cv=cv, scoring=scoring, n_jobs=n_jobs)
        mean_score = np.mean(scores)
        normalized_score = -mean_score if scoring.startswith('neg') else mean_score
        return normalized_score if direction == 'minimize' else -normalized_score

    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    return study, study.best_params

## Model metrics

## LINEAR MODELS


In [None]:
# Linear & Kernel-based Models: SVR + Polynomial Regression (with Optuna tuning + CV)
def build_svr_estimator(params: Dict) -> SVR:
    base = {'kernel': params.get('kernel', 'rbf')}
    # Map params safely
    for k in ['C','epsilon','gamma','degree']:
        if k in params:
            base[k] = params[k]
    return SVR(**base)

def svr_param_space(trial: optuna.trial.Trial) -> Dict:
    kernel = trial.suggest_categorical('kernel', ['rbf','poly','sigmoid'])
    params = {
        'kernel': kernel,
        'C': trial.suggest_float('C', 0.5, 10, log=True),
        'epsilon': trial.suggest_float('epsilon', 0.05, 0.3),
    }
    if kernel in ['rbf','sigmoid']:
        params['gamma'] = trial.suggest_float('gamma', 0.1, 0.5, log=True)
    if kernel == 'poly':
        params['degree'] = trial.suggest_int('degree', 2, 5)
        params['gamma'] = trial.suggest_float('gamma', 0.1, 1, log=True)
    return params

def build_poly_estimator(params: Dict):
    degree = params.get('degree', 2)
    include_bias = params.get('include_bias', False)
    interaction_only = params.get('interaction_only', False)
    return SkPipeline([
        ('poly', PolynomialFeatures(degree=degree, include_bias=include_bias, interaction_only=interaction_only)),
        ('lr', LinearRegression())
    ])

def poly_param_space(trial: optuna.trial.Trial) -> Dict:
    return {
        'degree': trial.suggest_int('degree', 2, 5),
        'include_bias': False,
        'interaction_only': trial.suggest_categorical('interaction_only', [False, True])
    }

# Run / reuse SVR optimization
if svr_model is None or FORCE_RETRAIN:
    print('[SVR] Starting Optuna optimization...')
    svr_study, svr_best_params = optimize_model_with_optuna(
        model_name='SVR',
        estimator_builder=build_svr_estimator,
        param_space_fn=svr_param_space,
        X=X_train_enc,
        y=y_train,
        scoring='neg_root_mean_squared_error',
        cv=3,
        n_trials=3,
        direction='minimize',
        random_state=42,
        n_jobs=-1,
    )
    print('Best SVR params:', svr_best_params)
    svr_model = build_svr_estimator(svr_best_params)
    svr_model.fit(X_train_enc, y_train)
    svr_valid_pred = svr_model.predict(X_valid_enc)
    svr_test_pred = svr_model.predict(X_test_enc)
    save_model(svr_model, 'svr_opt', {'best_params': svr_best_params})
else:
    print('[SVR] Using preloaded optimized model; generating predictions.')
    svr_valid_pred = svr_model.predict(X_valid_enc)
    svr_test_pred = svr_model.predict(X_test_enc)

# Baseline Linear Regression (also optionally re-optimized via polynomial)
if lr_model is None or FORCE_RETRAIN:
    # Keep a simple baseline linear regression for reference
    lr_model = LinearRegression()
    lr_model.fit(X_train_enc, y_train)
    save_model(lr_model, 'linear_regression_opt', {'params': lr_model.get_params(), 'baseline': True})
    lr_valid_pred = lr_model.predict(X_valid_enc)
    lr_test_pred = lr_model.predict(X_test_enc)
elif lr_model is not None:
    lr_valid_pred = lr_model.predict(X_valid_enc)
    lr_test_pred = lr_model.predict(X_test_enc)

# Polynomial Regression optimization
if poly_model is None or FORCE_RETRAIN:
    print('[PolynomialRegression] Starting Optuna optimization...')
    poly_study, poly_best_params = optimize_model_with_optuna(
        model_name='PolynomialRegression',
        estimator_builder=build_poly_estimator,
        param_space_fn=poly_param_space,
        X=X_train_enc,
        y=y_train,
        scoring='neg_root_mean_squared_error',
        cv=3,
        n_trials=3,
        direction='minimize',
        random_state=42,
        n_jobs=-1,
    )
    print('Best Polynomial Regression params:', poly_best_params)
    poly_model = build_poly_estimator(poly_best_params)
    poly_model.fit(X_train_enc, y_train)
    poly_valid_pred = poly_model.predict(X_valid_enc)
    poly_test_pred = poly_model.predict(X_test_enc)
    save_model(poly_model, 'poly_reg_opt', {'best_params': poly_best_params})
else:
    print('[PolynomialRegression] Using preloaded optimized model; generating predictions.')
    poly_valid_pred = poly_model.predict(X_valid_enc)
    poly_test_pred = poly_model.predict(X_test_enc)

[I 2025-10-11 16:02:21,066] A new study created in memory with name: SVR_opt


[SVR] Starting Optuna optimization...


  0%|          | 0/3 [00:00<?, ?it/s]

[I 2025-10-11 16:02:28,609] Trial 0 finished with value: 21.65850568247251 and parameters: {'kernel': 'poly', 'C': 3.0049873591901566, 'epsilon': 0.08900466011060913, 'degree': 2, 'gamma': 0.1143098387631322}. Best is trial 0 with value: 21.65850568247251.
[I 2025-10-11 16:02:36,048] Trial 1 finished with value: 20.70295950569489 and parameters: {'kernel': 'rbf', 'C': 0.5318033256270142, 'epsilon': 0.2924774630404986, 'gamma': 0.3818145165896869}. Best is trial 1 with value: 20.70295950569489.
[I 2025-10-11 16:02:36,048] Trial 1 finished with value: 20.70295950569489 and parameters: {'kernel': 'rbf', 'C': 0.5318033256270142, 'epsilon': 0.2924774630404986, 'gamma': 0.3818145165896869}. Best is trial 1 with value: 20.70295950569489.
[I 2025-10-11 16:02:41,615] Trial 2 finished with value: 19.044401625690607 and parameters: {'kernel': 'rbf', 'C': 1.2439367209907215, 'epsilon': 0.18118910790805948, 'gamma': 0.2004087187654156}. Best is trial 2 with value: 19.044401625690607.
Best SVR param

[I 2025-10-11 16:02:53,313] A new study created in memory with name: PolynomialRegression_opt


Saved model -> svr_opt_20251011T200253Z.joblib; metadata -> svr_opt_20251011T200253Z.json
Saved model -> linear_regression_opt_20251011T200253Z.joblib; metadata -> linear_regression_opt_20251011T200253Z.json
[PolynomialRegression] Starting Optuna optimization...


  0%|          | 0/3 [00:00<?, ?it/s]

# NEURAL NETWORKS


## MLP

In [None]:
# Torch MLP Regressor with sklearn API

def _ensure_dense_np(X):
    return X.toarray() if hasattr(X, 'toarray') else X

class TorchMLPRegressor(BaseEstimator, RegressorMixin):
    """PyTorch MLP for regression with sklearn API and CUDA support."""
    
    def __init__(self, hidden_layer_sizes=(128, 64), activation='relu', 
                 learning_rate_init=1e-3, alpha=0.0, batch_size=128, max_iter=100,
                 early_stopping=True, validation_fraction=0.1, n_iter_no_change=10,
                 random_state=42, verbose=False, device=None):
        self.hidden_layer_sizes = hidden_layer_sizes
        self.activation = activation
        self.learning_rate_init = learning_rate_init
        self.alpha = alpha
        self.batch_size = batch_size
        self.max_iter = max_iter
        self.early_stopping = early_stopping
        self.validation_fraction = validation_fraction
        self.n_iter_no_change = n_iter_no_change
        self.random_state = random_state
        self.verbose = verbose
        self.device = device

    def _build_network(self, in_features):
        layers = []
        act_fn = nn.ReLU if self.activation == 'relu' else nn.Tanh
        prev = in_features
        for h in self.hidden_layer_sizes:
            layers.extend([nn.Linear(prev, h), act_fn()])
            prev = h
        layers.append(nn.Linear(prev, 1))
        return nn.Sequential(*layers)

    def fit(self, X, y):
        # Setup
        torch.manual_seed(self.random_state)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(self.random_state)
        
        X_np = _ensure_dense_np(X).astype('float32')
        y_np = (y.values if hasattr(y, 'values') else y).astype('float32').reshape(-1, 1)
        
        # Train/val split
        if self.early_stopping and 0 < self.validation_fraction < 0.5:
            val_size = max(1, int(len(X_np) * self.validation_fraction))
            idx = np.random.RandomState(self.random_state).permutation(len(X_np))
            X_train, X_val = X_np[idx[val_size:]], X_np[idx[:val_size]]
            y_train, y_val = y_np[idx[val_size:]], y_np[idx[:val_size]]
        else:
            X_train, y_train, X_val, y_val = X_np, y_np, None, None
        
        # Device and model
        device = self.device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self._device_ = device
        self.model_ = self._build_network(X_np.shape[1]).to(device)
        self.optimizer_ = torch.optim.Adam(self.model_.parameters(), 
                                          lr=self.learning_rate_init, 
                                          weight_decay=self.alpha)
        criterion = nn.MSELoss()
        
        # Training
        train_loader = DataLoader(
            TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train)),
            batch_size=self.batch_size, shuffle=True
        )
        
        best_val, best_state, patience = math.inf, None, 0
        for epoch in range(1, self.max_iter + 1):
            self.model_.train()
            for xb, yb in train_loader:
                xb, yb = xb.to(device), yb.to(device)
                self.optimizer_.zero_grad()
                criterion(self.model_(xb), yb).backward()
                self.optimizer_.step()
            
            # Validation
            if X_val is not None:
                self.model_.eval()
                with torch.no_grad():
                    val_loss = criterion(
                        self.model_(torch.from_numpy(X_val).to(device)),
                        torch.from_numpy(y_val).to(device)
                    ).item()
                
                if val_loss < best_val - 1e-9:
                    best_val, patience = val_loss, 0
                    best_state = {k: v.cpu().clone() for k, v in self.model_.state_dict().items()}
                else:
                    patience += 1
                
                if patience >= self.n_iter_no_change:
                    break
        
        if best_state:
            self.model_.load_state_dict(best_state)
        self.n_features_in_ = X_np.shape[1]
        return self

    def predict(self, X):
        check_is_fitted(self, ['model_'])
        X_np = _ensure_dense_np(X).astype('float32')
        self.model_.eval()
        with torch.no_grad():
            preds = self.model_(torch.from_numpy(X_np).to(self._device_))
        return preds.cpu().numpy().ravel()

# Helper functions
def build_mlp_estimator(params):
    params = params.copy()
    params.pop('hl1', None)
    params.pop('hl2', None)
    return TorchMLPRegressor(**params)

def mlp_param_space(trial):
    return {
        'hidden_layer_sizes': tuple(sorted([
            trial.suggest_int('hl1', 64, 256, step=32),
            trial.suggest_int('hl2', 32, 192, step=32)
        ], reverse=True)),
        'learning_rate_init': trial.suggest_float('learning_rate_init', 1e-4, 1e-2, log=True),
        'alpha': trial.suggest_float('alpha', 1e-6, 1e-2, log=True),
        'batch_size': trial.suggest_categorical('batch_size', [64, 128, 256]),
        'max_iter': trial.suggest_int('max_iter', 150, 600, step=75),
        'n_iter_no_change': trial.suggest_int('n_iter_no_change', 5, 25, step=5),
        'early_stopping': True,
        'validation_fraction': 0.15,
        'activation': trial.suggest_categorical('activation', ['relu', 'tanh']),
        'random_state': 42,
        'verbose': False,
    }

# Training
if mlp_model is None or FORCE_RETRAIN:
    print(f"[MLP] Starting optimization on {'CUDA' if torch.cuda.is_available() else 'CPU'}")
    mlp_study, mlp_best_params = optimize_model_with_optuna(
        model_name='TorchMLPRegressor',
        estimator_builder=build_mlp_estimator,
        param_space_fn=mlp_param_space,
        X=X_train_enc, y=y_train,
        scoring='neg_root_mean_squared_error',
        cv=3, 
        n_trials=5, 
        direction='minimize',
        random_state=42, n_jobs=-1,
    )
    print(f'[MLP] Best params: {mlp_best_params}')
    mlp_model = build_mlp_estimator(mlp_best_params)
    mlp_model.fit(X_train_enc, y_train)
    mlp_valid_pred = mlp_model.predict(X_valid_enc)
    mlp_test_pred = mlp_model.predict(X_test_enc)
    save_model(mlp_model, 'mlp_opt', {'best_params': mlp_best_params, 'framework': 'torch'})
else:
    print("[MLP] Refitting preloaded model")
    mlp_model.fit(X_train_enc, y_train)
    mlp_valid_pred = mlp_model.predict(X_valid_enc)
    mlp_test_pred = mlp_model.predict(X_test_enc)
    save_model(mlp_model, 'mlp_opt_refit', {'refit': True, 'framework': 'torch'})


## TABNET

In [None]:
# Simplified TabNet Implementation
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_is_fitted

def prepare_data_for_tabnet(X, y=None):
    """Convert data to TabNet-compatible format."""
    X_dense = X.toarray().astype(np.float32) if hasattr(X, 'toarray') else np.asarray(X, dtype=np.float32)
    if y is not None:
        y_reshaped = np.asarray(y).reshape(-1, 1) if np.asarray(y).ndim == 1 else np.asarray(y)
        return X_dense, y_reshaped
    return X_dense

class SimpleTabNetWrapper(BaseEstimator, RegressorMixin):
    """Simplified TabNet wrapper with sensible defaults."""
    
    def __init__(self, n_d=16, n_a=16, n_steps=5, gamma=1.3, lambda_sparse=1e-4, 
                 lr=0.02, max_epochs=100, patience=20, batch_size=1024, seed=42):
        self.n_d = n_d
        self.n_a = n_a
        self.n_steps = n_steps
        self.gamma = gamma
        self.lambda_sparse = lambda_sparse
        self.lr = lr
        self.max_epochs = max_epochs
        self.patience = patience
        self.batch_size = batch_size
        self.seed = seed
        self.model_ = None

    def fit(self, X, y):
        X_prep, y_prep = prepare_data_for_tabnet(X, y)
        
        # Create internal validation split (15% for early stopping)
        n_samples = X_prep.shape[0]
        val_size = max(1, int(n_samples * 0.15))
        rng = np.random.default_rng(self.seed)
        indices = rng.permutation(n_samples)
        
        train_idx, val_idx = indices[val_size:], indices[:val_size]
        X_train, y_train = X_prep[train_idx], y_prep[train_idx]
        X_val, y_val = X_prep[val_idx], y_prep[val_idx]
        
        self.model_ = TabNetRegressor(
            n_d=self.n_d, n_a=self.n_a, n_steps=self.n_steps,
            gamma=self.gamma, lambda_sparse=self.lambda_sparse,
            optimizer_params={'lr': self.lr}, seed=self.seed
        )
        
        self.model_.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            eval_metric=['rmse'],
            max_epochs=self.max_epochs,
            patience=self.patience,
            batch_size=self.batch_size,
            virtual_batch_size=128
        )
        return self

    def predict(self, X):
        check_is_fitted(self, 'model_')
        X_prep = prepare_data_for_tabnet(X)
        return self.model_.predict(X_prep).ravel()

def get_tabnet_param_space(trial):
    """Simplified parameter space for TabNet optimization."""
    return {
        'n_d': trial.suggest_categorical('n_d', [8, 16, 24]),
        'n_a': trial.suggest_categorical('n_a', [8, 16, 24]),
        'n_steps': trial.suggest_int('n_steps', 3, 6),
        'gamma': trial.suggest_float('gamma', 1.0, 1.8),
        'lambda_sparse': trial.suggest_float('lambda_sparse', 1e-6, 1e-3, log=True),
        'lr': trial.suggest_float('lr', 1e-3, 2e-2, log=True),
        'max_epochs': 100,
        'patience': 20
    }

def load_or_train_tabnet():
    """Load existing TabNet model or train new one."""
    tabnet_files = sorted(MODEL_DIR.glob('tabnet_opt_*.joblib'))
    
    if tabnet_files and not FORCE_RETRAIN:
        try:
            model = joblib.load(tabnet_files[-1])
            print(f"[TabNet] Loaded existing model: {tabnet_files[-1].name}")
            return model
        except Exception as e:
            print(f"[TabNet] Failed to load model: {e}")
    
    # Train new model
    print("[TabNet] Training new model with Optuna optimization")
    study, best_params = optimize_model_with_optuna(
        model_name='TabNetRegressor',
        estimator_builder=lambda params: SimpleTabNetWrapper(**params),
        param_space_fn=get_tabnet_param_space,
        X=X_train_enc, y=y_train,
        scoring='neg_root_mean_squared_error',
        cv=3, 
        n_trials=5, 
        direction='minimize',
        random_state=42, n_jobs=-1
    )
    
    model = SimpleTabNetWrapper(**best_params)
    model.fit(X_train_enc, y_train)
    
    # Save model and metadata
    save_model(model, 'tabnet_opt', {'best_params': best_params})
    print(f"[TabNet] Best parameters: {best_params}")
    
    return model

# Execute TabNet training/loading
tabnet_model = load_or_train_tabnet()

# Generate predictions
tabnet_valid_pred = tabnet_model.predict(X_valid_enc)
tabnet_test_pred = tabnet_model.predict(X_test_enc)

# 5. Tree-based ensemble models

## XGBOOST

In [None]:
# Hyperparameter optimization and training for ensemble models
def build_xgb_estimator(params: Dict) -> XGBRegressor:
    base_params = {
        'random_state': 42,
        'device': 'cuda',
        'verbosity': 0,
        'tree_method': 'gpu_hist'
    }
    base_params.update(params)
    return XGBRegressor(**base_params)


def xgb_param_space(trial: optuna.trial.Trial) -> Dict:
    return {
        'n_estimators': trial.suggest_int('n_estimators', 200, 600),
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        'subsample': trial.suggest_float('subsample', 0.6, 0.9),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 0.8),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 1e-1, log=True),
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10, log=True)
    }

if xgb_model is None or FORCE_RETRAIN:
    print("[XGBoost] No preloaded model (or FORCE_RETRAIN=True). Starting Optuna optimization...")
    xgb_study, xgb_best_params = optimize_model_with_optuna(
        model_name='XGBoost',
        estimator_builder=build_xgb_estimator,
        param_space_fn=xgb_param_space,
        X=X_train_enc,
        y=y_train,
        scoring='neg_root_mean_squared_error',
        cv=3,
        n_trials=5,
        direction='minimize',
        random_state=42,
        n_jobs=-1
    )
    print('Best XGBoost params:', xgb_best_params)
    # Fit model with optimized hyperparameters
    xgb_model = build_xgb_estimator(xgb_best_params)
    xgb_model.fit(X_train_enc, y_train)
    xgb_valid_pred = xgb_model.predict(X_valid_enc)
    xgb_test_pred = xgb_model.predict(X_test_enc)
    save_model(xgb_model, 'xgboost_opt', {'best_params': xgb_best_params})
else:
    # Refit even when preloaded to ensure alignment with current data & preprocessing
    print('[XGBoost] Preloaded model found; refitting on current data.')
    # Try to pull previously stored best params from metadata if available
    reuse_params = None
    try:
        if 'xgb_model_meta' in globals() and isinstance(xgb_model_meta, dict):
            reuse_params = xgb_model_meta.get('best_params')
    except Exception:
        reuse_params = None
    if reuse_params is None:
        # Fall back to current model's parameters (filter to search space + core)
        try:
            current = xgb_model.get_params()
            reuse_keys = {'n_estimators','max_depth','learning_rate','subsample','colsample_bytree','reg_alpha','reg_lambda'}
            reuse_params = {k: v for k, v in current.items() if k in reuse_keys}
        except Exception:
            reuse_params = {}
    # Rebuild a fresh estimator to avoid any internal state carry-over
    xgb_model = build_xgb_estimator(reuse_params)
    xgb_model.fit(X_train_enc, y_train)
    xgb_valid_pred = xgb_model.predict(X_valid_enc)
    xgb_test_pred = xgb_model.predict(X_test_enc)
    # Save refit artifact
    save_model(xgb_model, 'xgboost_opt_refit', {'refit': True, 'best_params': reuse_params})

# RANDOM FOREST

In [None]:
def build_rf_estimator(params: Dict) -> RandomForestRegressor:
    base_params = {
        'random_state': 42,
        'n_jobs': -1
    }
    base_params.update(params)
    return RandomForestRegressor(**base_params)


def rf_param_space(trial: optuna.trial.Trial) -> Dict:
    return {
        'n_estimators': trial.suggest_int('n_estimators', 200, 800),
        'max_depth': trial.suggest_int('max_depth', 5, 15),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
        'max_features': trial.suggest_float('max_features', 0.4, 0.9)
    }

if rf_model is None or FORCE_RETRAIN:
    print("[RandomForest] No preloaded model (or FORCE_RETRAIN=True). Starting Optuna optimization...")
    rf_study, rf_best_params = optimize_model_with_optuna(
        model_name='RandomForest',
        estimator_builder=build_rf_estimator,
        param_space_fn=rf_param_space,
        X=X_train_enc,
        y=y_train,
        scoring='neg_root_mean_squared_error',
        cv=3,
        n_trials=5,
        direction='minimize',
        random_state=42,
        n_jobs=-1,
    )
    print('Best Random Forest params:', rf_best_params)
    rf_model = build_rf_estimator(rf_best_params)
    rf_model.fit(X_train_enc, y_train)
    rf_valid_pred = rf_model.predict(X_valid_enc)
    rf_test_pred = rf_model.predict(X_test_enc)
    save_model(rf_model, 'random_forest_opt', {'best_params': rf_best_params})
else:
    print("[RandomForest] Using preloaded optimized model. Skipping training.")
    rf_valid_pred = rf_model.predict(X_valid_enc)
    rf_test_pred = rf_model.predict(X_test_enc)


## 6. Compare Model Performance
Evaluate predictions from each model using RMSE, MAE, and R¬≤ metrics.



In [None]:
# Helper functions for metrics

def adjusted_r2_score(r2, n, p):
    return 1 - (1 - r2) * ((n - 1) / (n - p - 1))

def regression_metrics(y_true, y_pred, X=None):
    try:
        # Try using root_mean_squared_error if available (sklearn >= 1.4)
        from sklearn.metrics import root_mean_squared_error
        rmse = root_mean_squared_error(y_true, y_pred)
    except ImportError:
        # Fall back to mean_squared_error with squared=False for older versions
        rmse = mean_squared_error(y_true, y_pred, squared=False)
    
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    metrics = {'RMSE': rmse, 'MAE': mae, 'R2': r2}
    if X is not None:
        n = X.shape[0]
        p = X.shape[1]
        adj_r2 = adjusted_r2_score(r2, n, p)
        metrics['Adj_R2'] = adj_r2
    return metrics


In [None]:
# Comprehensive Model Performance Comparison & Best Model Selection (All Methods)
import json as _json_summary
from copy import deepcopy

# Utility to compute metrics if predictions exist and metrics missing
def _ensure_metrics(prefix: str, display_name: str, Xv, Xt):
    g = globals()
    valid_pred_var = f'{prefix}_valid_pred'
    test_pred_var = f'{prefix}_test_pred'
    valid_metrics_var = f'{prefix}_metrics'  # normalized name for validation metrics
    test_metrics_var = f'{prefix}_test_metrics'
    # Accept older naming conventions (e.g., rf_metrics already used)
    if valid_pred_var in g:
        if valid_metrics_var not in g or g.get(valid_metrics_var) is None:
            try:
                g[valid_metrics_var] = regression_metrics(y_valid, g[valid_pred_var], Xv)
            except Exception as e:
                print(f'[WARN] Could not compute validation metrics for {display_name}: {e}')
                g[valid_metrics_var] = None
    if test_pred_var in g:
        if test_metrics_var not in g or g.get(test_metrics_var) is None:
            try:
                g[test_metrics_var] = regression_metrics(y_test, g[test_pred_var], Xt)
            except Exception as e:
                print(f'[WARN] Could not compute test metrics for {display_name}: {e}')
                g[test_metrics_var] = None
    return (g.get(valid_metrics_var), g.get(test_metrics_var))

# Recompute core ensemble metrics if missing (guards for partial executions)
# This path is now superseded by _ensure_metrics but kept for clarity
if 'xgb_metrics' not in globals() and 'xgb_valid_pred' in globals():
    xgb_metrics = regression_metrics(y_valid, xgb_valid_pred, X_valid_enc)
if 'rf_metrics' not in globals() and 'rf_valid_pred' in globals():
    rf_metrics = regression_metrics(y_valid, rf_valid_pred, X_valid_enc)
if 'xgb_test_metrics' not in globals() and 'xgb_test_pred' in globals():
    xgb_test_metrics = regression_metrics(y_test, xgb_test_pred, X_test_enc)
if 'rf_test_metrics' not in globals() and 'rf_test_pred' in globals():
    rf_test_metrics = regression_metrics(y_test, rf_test_pred, X_test_enc)

# Model families to probe (prefix aligns with variable naming pattern)
MODEL_PREFIXES = [
    ('XGBoost', 'xgb'),
    ('RandomForest', 'rf'),
    ('SVR', 'svr'),
    ('PolynomialRegression', 'poly'),
    ('LinearRegression', 'lr'),
    ('TorchMLP', 'mlp'),
    ('TabNet', 'tabnet')
]

candidates = []
for display_name, prefix in MODEL_PREFIXES:
    valid_pred_var = f'{prefix}_valid_pred'
    test_pred_var = f'{prefix}_test_pred'
    if valid_pred_var in globals():
        v_metrics, t_metrics = _ensure_metrics(prefix, display_name, X_valid_enc, X_test_enc)
        if v_metrics is not None:
            candidates.append({'Model': display_name, 'Valid': deepcopy(v_metrics), 'Test': deepcopy(t_metrics) if t_metrics else None})

# (Legacy metrics variables that don't match prefix pattern already handled above)
if not candidates:
    print('No model metrics available yet. Run earlier training cells first.')
else:
    # Build a long-form DataFrame
    rows = []
    for c in candidates:
        for ds in ['Valid','Test']:
            if c[ds] is None:
                continue
            row = {'Model': c['Model'], 'Dataset': ds}
            row.update(c[ds])
            rows.append(row)
    all_results_df = pd.DataFrame(rows).drop_duplicates(subset=['Model','Dataset']).set_index(['Model','Dataset']).sort_index()

    display(all_results_df)

    # Ranking based on Validation metrics only (composite)
    val_df = all_results_df.xs('Valid', level='Dataset').copy()
    metric_cols = [m for m in ['RMSE','MAE','R2','Adj_R2'] if m in val_df.columns]
    if not metric_cols:
        print('No numeric metrics found for ranking.')
    else:
        ranks = {}
        for m in metric_cols:
            ascending = m in ['RMSE','MAE']  # lower better
            ranks[m + '_rank'] = val_df[m].rank(ascending=ascending, method='min')
        rank_df = pd.DataFrame(ranks)
        val_ranked = val_df.join(rank_df)
        rank_cols = [c for c in val_ranked.columns if c.endswith('_rank')]
        val_ranked['avg_rank'] = val_ranked[rank_cols].mean(axis=1)
        best_model_name = val_ranked['avg_rank'].idxmin()
        best_model_avg_rank = val_ranked.loc[best_model_name, 'avg_rank']
        best_model_metrics_valid = val_df.loc[best_model_name]
        try:
            best_test_metrics = all_results_df.loc[(best_model_name, 'Test')].to_dict()
        except KeyError:
            best_test_metrics = None
        print('\n=== Validation Ranking (lower avg_rank better) ===')
        display(round(val_ranked.sort_values('avg_rank'), 2))

        print(f"\nOverall Best Model (Validation composite rank): {best_model_name} | avg_rank={best_model_avg_rank:.2f}")
        print('\nBest Model Validation Metrics:')
        for k,v in best_model_metrics_valid.items():
            if not k.endswith('_rank') and k != 'avg_rank':
                print(f'  {k}: {v:.4f}' if isinstance(v,(int,float)) else f'  {k}: {v}')
        if best_test_metrics:
            print('\nBest Model Test Metrics:')
            for k,v in best_test_metrics.items():
                print(f'  {k}: {v:.4f}' if isinstance(v,(int,float)) else f'  {k}: {v}')

        # Metric-wise winners
        metric_winners = {}
        for m in metric_cols:
            ascending = m in ['RMSE','MAE']
            winner = val_df[m].idxmin() if ascending else val_df[m].idxmax()
            metric_winners[m] = {
                'model': winner,
                'value': val_df.loc[winner, m]
            }
        print('\n=== Metric-wise Best (Validation) ===')
        for m, info in metric_winners.items():
            print(f"  {m}: {info['model']} -> {info['value']:.4f}")

        # Persist summary JSON
        summary_path = MODEL_DIR / 'model_comparison_summary.json'
        summary_payload = {
            'models_evaluated': [c['Model'] for c in candidates],
            'validation_table': val_df.to_dict(orient='index'),
            'ranking': val_ranked[['avg_rank']].to_dict(orient='index'),
            'metric_winners': {m: {'model': v['model'], 'value': float(v['value'])} for m,v in metric_winners.items()},
            'overall_best_model': {
                'name': best_model_name,
                'validation_metrics': {k: float(v) for k,v in best_model_metrics_valid.items() if isinstance(v,(int,float))},
                'test_metrics': {k: float(v) if isinstance(v,(int,float)) else v for k,v in (best_test_metrics or {}).items()},
                'avg_rank': float(best_model_avg_rank)
            }
        }
        try:
            with open(summary_path, 'w') as f:
                _json_summary.dump(summary_payload, f, indent=2)
            print(f"\nSaved model comparison summary -> {summary_path.name}")
        except Exception as e:
            print(f"Could not save summary JSON: {e}")

## 6.1 Model Family Comparison

Compare the best model from each family (Linear, Neural Network, Tree-based) to understand:
- Which family performs best for this dataset
- Trade-offs between model complexity and performance  
- Family-specific strengths and weaknesses

In [None]:
# Model Family Comparison - Simplified
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import json

# Define model families
MODEL_FAMILIES = {
    'Linear': ['SVR', 'PolynomialRegression', 'LinearRegression'],
    'Neural Network': ['TorchMLP', 'TabNet'],
    'Tree-based': ['XGBoost', 'RandomForest', 'LightGBM']
}

if 'all_results_df' not in globals() or all_results_df.empty:
    print("‚ö†Ô∏è  Run model comparison cell first")
else:
    val_df = all_results_df.xs('Valid', level='Dataset').copy()
    family_results = []
    
    # Find best model per family
    for family, models in MODEL_FAMILIES.items():
        available = [m for m in models if m in val_df.index]
        if not available:
            continue
        
        best_model = val_df.loc[available, 'RMSE'].idxmin()
        val_metrics = val_df.loc[best_model]
        
        try:
            test_metrics = all_results_df.loc[(best_model, 'Test')]
            gap = abs(test_metrics['RMSE'] - val_metrics['RMSE']) / val_metrics['RMSE'] * 100
        except KeyError:
            test_metrics = None
            gap = np.nan
        
        family_results.append({
            'Family': family,
            'Model': best_model,
            'Val_RMSE': val_metrics['RMSE'],
            'Val_R¬≤': val_metrics['R2'],
            'Test_RMSE': test_metrics['RMSE'] if test_metrics is not None else np.nan,
            'Test_R¬≤': test_metrics['R2'] if test_metrics is not None else np.nan,
            'Gap_%': gap
        })
    
    # Display results
    if family_results:
        df_compare = pd.DataFrame(family_results).set_index('Family')
        
        print("="*70)
        print("FAMILY CHAMPIONS COMPARISON")
        print("="*70)
        display(df_compare.round(4))
        
        best_family = df_compare['Val_RMSE'].idxmin()
        print(f"\nüèÜ Best Family: {best_family} ({df_compare.loc[best_family, 'Model']})")
        print(f"   RMSE: {df_compare.loc[best_family, 'Val_RMSE']:.4f} | R¬≤: {df_compare.loc[best_family, 'Val_R¬≤']:.4f}")
        
        # Visualization
        fig, axes = plt.subplots(1, 2, figsize=(12, 4))
        
        # RMSE comparison
        x = np.arange(len(df_compare))
        width = 0.35
        ax1 = axes[0]
        ax1.bar(x - width/2, df_compare['Val_RMSE'], width, label='Validation', color='steelblue', alpha=0.8)
        ax1.bar(x + width/2, df_compare['Test_RMSE'], width, label='Test', color='coral', alpha=0.8)
        ax1.set_ylabel('RMSE')
        ax1.set_title('RMSE by Family')
        ax1.set_xticks(x)
        ax1.set_xticklabels(df_compare.index, rotation=15, ha='right')
        ax1.legend()
        ax1.grid(axis='y', alpha=0.3)
        
        # R¬≤ comparison
        ax2 = axes[1]
        ax2.bar(x - width/2, df_compare['Val_R¬≤'], width, label='Validation', color='forestgreen', alpha=0.8)
        ax2.bar(x + width/2, df_compare['Test_R¬≤'], width, label='Test', color='orange', alpha=0.8)
        ax2.set_ylabel('R¬≤ Score')
        ax2.set_title('R¬≤ by Family')
        ax2.set_xticks(x)
        ax2.set_xticklabels(df_compare.index, rotation=15, ha='right')
        ax2.legend()
        ax2.grid(axis='y', alpha=0.3)
        ax2.set_ylim([0, 1])
        
        plt.tight_layout()
        plt.show()
        
        # Save summary
        summary = {
            'best_family': best_family,
            'best_model': df_compare.loc[best_family, 'Model'],
            'family_comparison': df_compare.to_dict(orient='index')
        }
        
        try:
            def convert(obj):
                if isinstance(obj, (np.integer, np.floating)):
                    return float(obj)
                if isinstance(obj, dict):
                    return {k: convert(v) for k, v in obj.items()}
                return obj
            
            with open(MODEL_DIR / 'family_comparison_summary.json', 'w') as f:
                json.dump(convert(summary), f, indent=2)
            print(f"\n‚úÖ Saved to family_comparison_summary.json")
        except Exception as e:
            print(f"‚ö†Ô∏è  Save failed: {e}")
    else:
        print("‚ùå No models found for comparison")

In [None]:
# XGBoost Best Model - Compact Summary
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt

# Calculate key metrics
train_pred = xgb_model.predict(X_train_enc)
metrics = pd.DataFrame({
    'Dataset': ['Training', 'Validation', 'Test'],
    'RMSE': [
        np.sqrt(mean_squared_error(y_train, train_pred)),
        np.sqrt(mean_squared_error(y_valid, xgb_valid_pred)),
        np.sqrt(mean_squared_error(y_test, xgb_test_pred))
    ],
    'R¬≤': [
        r2_score(y_train, train_pred),
        r2_score(y_valid, xgb_valid_pred),
        r2_score(y_test, xgb_test_pred)
    ]
}).round(4).set_index('Dataset')

# Top features
top_features = pd.DataFrame({
    'Feature': feature_names,
    'Importance': xgb_model.feature_importances_
}).nlargest(5, 'Importance').round(4)

print("üöÄ XGBoost Model Summary")
print("=" * 40)
print(f"üìä Performance: R¬≤ = {metrics.loc['Validation', 'R¬≤']:.3f}, RMSE = {metrics.loc['Validation', 'RMSE']:.3f}")
print(f"‚öôÔ∏è  Config: {xgb_model.n_estimators} trees, depth={xgb_model.max_depth}, lr={xgb_model.learning_rate}")
print(f"üéØ Top Feature: {top_features.iloc[0]['Feature']} ({top_features.iloc[0]['Importance']:.3f})")

display(metrics)
display(top_features.reset_index(drop=True))

# Compact visualization
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Performance comparison
metrics.plot(kind='bar', ax=axes[0], width=0.8)
axes[0].set_title('Model Performance')
axes[0].set_ylabel('Score')
axes[0].legend()
axes[0].grid(axis='y', alpha=0.3)

# Feature importance
axes[1].barh(top_features['Feature'], top_features['Importance'])
axes[1].set_title('Top 5 Features')
axes[1].set_xlabel('Importance')
axes[1].grid(axis='x', alpha=0.3)

plt.tight_layout()
plt.show()

## 6.2 Best Model Deep Dive: Feature Importance, Ablation, Sensitivity & Trade‚Äëoffs

**This section automatically targets the currently best performing model** (based on composite validation rank from the model comparison cell) and provides comprehensive analysis:

### Analysis Components:
1. **Best Model Selection** - Loads the best model from summary JSON and extracts feature names
2. **Feature Importance** - Computes importance using native methods or permutation importance
3. **Sensitivity Analysis** - Perturbs top features to measure RMSE impact
4. **Ablation Study** - Progressive removal of top features to measure performance degradation
5. **Trade‚Äëoff Metrics** - Complexity vs. performance analysis

### Execution Order (Run cells in this sequence):
1. Run the **Best Model Selection** cell first (creates `best_model_obj` & `feature_names`)
2. Run the **Feature Importance** cell (creates `fi_df`)
3. Run the **Sensitivity Analysis** cell (uses both variables above)
4. Run the **Ablation Analysis** cell (uses both variables above)

‚ö†Ô∏è **Important**: Make sure you've run the model comparison cell (section 6) before running these analysis cells.

### Model Sensitivity & Trade-off Analysis
This section perturbs the top-ranked features (by model-specific importance) over a small value grid (quantiles or binary levels) to approximate a *local partial dependence* style effect on validation RMSE.

What it does:
- Selects the top N features by importance.
- For each feature, replaces that column in the validation set with grid values while holding others fixed.
- Recomputes RMSE to estimate how sensitive performance is to that feature.
- Summarizes a trade-off metric: RMSE range / importance (higher means the feature causes relatively more performance swing per unit of importance weight).

Caveats:
- This is a one-at-a-time perturbation (no interaction effects captured).
- For highly correlated features, effects may be inflated or unstable.
- For models with strong non-linear interactions, full PDPs or SHAP may provide deeper insight.

Next improvements (optional):
- Replace simple perturbation with true PDP via averaging predictions over the original distribution.
- Add SHAP interaction values for top features.
- Log plots and tables to MLflow or other tracking system.


In [None]:
# Select best model object + extract feature names
from sklearn.inspection import permutation_importance
best_model_name_runtime = None
best_model_obj = None

# If previous cell stored summary JSON, load it to get best model name
summary_path = MODEL_DIR / 'model_comparison_summary.json'
if summary_path.exists():
    try:
        with open(summary_path) as f:
            _summary_json = json.load(f)
        best_model_name_runtime = _summary_json.get('overall_best_model', {}).get('name')
        print(f"[BestModel] Loaded best model from summary JSON: {best_model_name_runtime}")
    except Exception as e:
        print(f"[BestModel] Could not read summary JSON: {e}")
else:
    print('[BestModel] Summary JSON not found. Run comparison cell first.')

# Map display name to the underlying variable (heuristic)
_MODEL_VAR_MAP = {
    'XGBoost': 'xgb_model',
    'RandomForest': 'rf_model',
    'TorchMLP': 'mlp_model',
    'SVR': 'svr_model',
    'PolynomialRegression': 'poly_model',
    'LinearRegression': 'lr_model',
    'TabNet': 'tabnet_model',
}

if best_model_name_runtime and best_model_name_runtime in _MODEL_VAR_MAP:
    var_name = _MODEL_VAR_MAP[best_model_name_runtime]
    best_model_obj = globals().get(var_name)
    if best_model_obj is None:
        print(f"[BestModel] Variable '{var_name}' not found or is None; ensure model cell was executed.")
else:
    print('[BestModel] Could not determine model variable mapping.')

# Extract feature names after preprocessing
def get_feature_names(preprocessor):
    feat_names = []
    try:
        # Numeric
        for name, trans, cols in preprocessor.transformers_:
            if name == 'remainder':
                continue
            if name == 'num':
                feat_names.extend(cols)
            elif name == 'cat':
                # OneHotEncoder inside pipeline
                try:
                    ohe = trans.named_steps['onehot'] if hasattr(trans, 'named_steps') else trans
                    cats = ohe.get_feature_names_out(cols)
                    feat_names.extend(cats)
                except Exception as e:
                    print('[FeatureNames] OHE extraction failed:', e)
    except Exception as e:
        print('[FeatureNames] Fallback feature name generation:', e)
        # Fallback to generic idx names
        feat_names = [f'f{i}' for i in range(X_train_enc.shape[1])]
    if len(feat_names) != X_train_enc.shape[1]:
        # Align length via fallback if mismatch
        print('[FeatureNames] Length mismatch; falling back to positional feature names.')
        feat_names = [f'f{i}' for i in range(X_train_enc.shape[1])]
    return feat_names

feature_names = get_feature_names(preprocessor)
print(f"Extracted {len(feature_names)} feature names.")
if best_model_name_runtime:
    print(f"Proceeding with best model: {best_model_name_runtime}")
else:
    print('Best model not established yet. Subsequent cells may fail.')

In [None]:
# Ablation Analysis: neutralize/remove top-K features and measure RMSE delta
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd

if best_model_obj is None or 'fi_df' not in globals():
    print('Prerequisites missing: ensure best model & feature importance computed.')
else:
    base_preds = best_model_obj.predict(X_valid_enc)
    base_rmse = np.sqrt(mean_squared_error(y_valid, base_preds))
    print(f'Base validation RMSE: {base_rmse:.4f}')

    top_features = fi_df.head(30)['feature'].tolist()  # consider top 30 for ablation scope
    ablation_results = []
    # Create a mapping feature_name -> column index for efficient masking
    feat_index_map = {f: i for i, f in enumerate(feature_names)}

    X_valid_dense = X_valid_enc.toarray() if hasattr(X_valid_enc, 'toarray') else np.asarray(X_valid_enc)
    for k in [1,2,3,5,10,15,20,25,30]:
        subset = top_features[:k]
        cols = [feat_index_map[f] for f in subset if f in feat_index_map]
        if not cols:
            continue
        X_mod = X_valid_dense.copy()
        # Neutralization strategy: set to column mean (could also zero)
        for c in cols:
            col_mean = X_mod[:, c].mean()
            X_mod[:, c] = col_mean
        preds_mod = best_model_obj.predict(X_mod)
        rmse_mod = mean_squared_error(y_valid, preds_mod, squared=False)
        ablation_results.append({'k_removed': k, 'rmse': rmse_mod, 'delta_rmse': rmse_mod - base_rmse})
    ablation_df = pd.DataFrame(ablation_results)
    display(ablation_df)

    plt.figure(figsize=(7,4))
    plt.plot(ablation_df['k_removed'], ablation_df['delta_rmse'], marker='o')
    plt.axhline(0, color='gray', linestyle='--')
    plt.xlabel('Number of Top Features Neutralized')
    plt.ylabel('RMSE Delta vs Base')
    plt.title(f'Ablation Impact - {best_model_name_runtime}')
    plt.grid(alpha=0.3)
    plt.show()

In [None]:
# Feature Importance (native if available, else permutation)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.inspection import permutation_importance
from sklearn.metrics import mean_squared_error

if best_model_obj is None:
    print('Best model object not available; run previous cells.')
else:
    # Try native importances
    importances = None
    importance_type = None
    try:
        if hasattr(best_model_obj, 'feature_importances_'):
            importances = getattr(best_model_obj, 'feature_importances_')
            importance_type = 'feature_importances_'
        elif hasattr(best_model_obj, 'coef_'):
            coef = getattr(best_model_obj, 'coef_')
            # Flatten if necessary
            importances = np.abs(coef.ravel())
            importance_type = 'coef_'
    except Exception as e:
        print('[Importance] Native extraction failed:', e)

    if importances is None or len(importances) != len(feature_names):
        print('[Importance] Falling back to permutation importance (n_repeats=10).')
        try:
            perm = permutation_importance(best_model_obj, X_valid_enc, y_valid, n_repeats=10, scoring='neg_root_mean_squared_error', n_jobs=-1, random_state=42)
            importances = np.maximum(0, perm.importances_mean)
            importance_type = 'permutation_neg_rmse'
        except Exception as e:
            print('[Importance] Permutation importance failed:', e)
            importances = None

    if importances is not None:
        fi_df = pd.DataFrame({'feature': feature_names, 'importance': importances})
        fi_df = fi_df.sort_values('importance', ascending=False).reset_index(drop=True)
        display(fi_df.head(25))
        # Plot
        top_plot = fi_df.head(25).iloc[::-1]
        plt.figure(figsize=(8, max(4, 0.3*len(top_plot))))
        plt.barh(top_plot['feature'], top_plot['importance'], color='steelblue')
        plt.title(f'Top Feature Importances ({importance_type}) - {best_model_name_runtime}')
        plt.xlabel('Importance')
        plt.tight_layout()
        plt.show()
    else:
        print('Could not compute feature importance.')

In [None]:
# Sensitivity Analysis (Partial Dependence style) & Trade-offs Summary
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
from pathlib import Path
from sklearn.metrics import mean_squared_error

# Preconditions & graceful exit
missing = []
for name in ['best_model_obj', 'fi_df', 'feature_names', 'X_valid_enc', 'y_valid', 'MODEL_DIR']:
    if name not in globals():
        missing.append(name)

if missing:
    print('Prerequisites missing; run earlier cells first. Missing:', missing)
else:
    if best_model_obj is None:
        print('best_model_obj is None - cannot run sensitivity analysis.')
    else:
        # Parameters
        TOP_N_SENS = 5
        TOP_N_SENS = min(TOP_N_SENS, len(fi_df))
        top_sens_features = fi_df.head(TOP_N_SENS)['feature'].tolist()

        # Ensure dense matrix for perturbations
        X_valid_dense = X_valid_enc.toarray() if hasattr(X_valid_enc, 'toarray') else np.asarray(X_valid_enc)
        base_preds = best_model_obj.predict(X_valid_dense)
        base_rmse = mean_squared_error(y_valid, base_preds, squared=False)
        sens_records = []

        for feat in top_sens_features:
            if feat not in feature_names:
                continue
            col_idx = feature_names.index(feat)
            col_values = X_valid_dense[:, col_idx]
            unique_vals = np.unique(col_values)
            # Build grid: handle near-constant / binary / continuous
            if len(unique_vals) <= 2:
                grid = unique_vals
            else:
                # Use robust quantiles; drop duplicates if feature has low variance
                grid = np.unique(np.quantile(col_values, [0.05, 0.25, 0.5, 0.75, 0.95]))
            for gv in grid:
                X_mod = X_valid_dense.copy()
                X_mod[:, col_idx] = gv
                try:
                    preds_mod = best_model_obj.predict(X_mod)
                    rmse_mod = mean_squared_error(y_valid, preds_mod, squared=False)
                except Exception as e:
                    rmse_mod = np.nan
                sens_records.append({
                    'feature': feat,
                    'value': float(gv),
                    'rmse': rmse_mod,
                    'delta_rmse': (rmse_mod - base_rmse) if not np.isnan(rmse_mod) else np.nan
                })

        sens_df = pd.DataFrame(sens_records)
        print('\n=== Sensitivity Analysis Results ===')
        display(sens_df.head(20))

        # Plot curves
        if not sens_df.empty:
            n_feat = len(top_sens_features)
            fig, axes = plt.subplots(n_feat, 1, figsize=(7, 3 * n_feat), sharex=False)
            if n_feat == 1:
                axes = [axes]
            for ax, feat in zip(axes, top_sens_features):
                sub = sens_df[sens_df['feature'] == feat].sort_values('value')
                if sub.empty:
                    ax.set_title(f'Sensitivity: {feat} (no data)')
                    continue
                ax.plot(sub['value'], sub['rmse'], marker='o')
                ax.axhline(base_rmse, color='gray', linestyle='--', label='Base RMSE')
                ax.set_title(f'Sensitivity: {feat}')
                ax.set_xlabel('Value')
                ax.set_ylabel('RMSE')
                ax.legend()
                ax.grid(alpha=0.3)
            plt.tight_layout()
            plt.show()
        else:
            print('No sensitivity records to plot.')

        # Trade-offs summary
        trade_rows = []
        for feat in top_sens_features:
            sub = sens_df[sens_df['feature'] == feat]
            if sub.empty:
                continue
            rmse_range = sub['rmse'].max() - sub['rmse'].min()
            imp_row = fi_df.loc[fi_df['feature'] == feat, 'importance']
            imp_val = float(imp_row.iloc[0]) if not imp_row.empty else np.nan
            trade_rows.append({
                'feature': feat,
                'importance': imp_val,
                'rmse_range': rmse_range,
                'sensitivity_score': rmse_range / (imp_val + 1e-9) if not np.isnan(imp_val) else np.nan
            })
        trade_df = pd.DataFrame(trade_rows).sort_values('sensitivity_score', ascending=False)
        print('\n=== Trade-offs Analysis ===')
        print('(Higher sensitivity_score => larger RMSE movement per unit importance)')
        display(trade_df)

        # Persist artifacts
        analysis_payload = {
            'feature_importances': fi_df.to_dict(orient='records') if 'fi_df' in globals() else None,
            'ablation': ablation_df.to_dict(orient='records') if 'ablation_df' in globals() else None,
            'sensitivity': sens_df.to_dict(orient='records'),
            'tradeoffs': trade_df.to_dict(orient='records'),
            'best_model': best_model_name_runtime if 'best_model_name_runtime' in globals() else None,
            'base_validation_rmse': float(base_rmse)
        }
        analysis_path = MODEL_DIR / 'best_model_explainability.json'
        try:
            with open(analysis_path, 'w') as f:
                json.dump(analysis_payload, f, indent=2)
            print(f'\n‚úì Saved explainability analysis -> {analysis_path.name}')
        except Exception as e:
            print(f'Failed to save explainability JSON: {e}')

In [None]:
# Learning curves for ensemble models
from sklearn.model_selection import learning_curve
from sklearn.base import clone

def plot_learning_curve(estimator, X, y, ax, title, cv=5, scoring='neg_root_mean_squared_error'):
    train_sizes, train_scores, valid_scores = learning_curve(
        estimator,
        X,
        y,
        cv=cv,
        scoring=scoring,
        n_jobs=-1,
        train_sizes=np.linspace(0.1, 1.0, 6),
        shuffle=True,
        random_state=42
    )
    train_rmse = -train_scores.mean(axis=1)
    valid_rmse = -valid_scores.mean(axis=1)

    ax.plot(train_sizes, train_rmse, label='Train RMSE', marker='o')
    ax.fill_between(train_sizes, train_rmse - train_scores.std(axis=1), train_rmse + train_scores.std(axis=1), alpha=0.2)
    ax.plot(train_sizes, valid_rmse, label='Validation RMSE', marker='s')
    ax.fill_between(train_sizes, valid_rmse - valid_scores.std(axis=1), valid_rmse + valid_scores.std(axis=1), alpha=0.2)
    ax.set_title(title)
    ax.set_xlabel('Training Examples')
    ax.set_ylabel('RMSE')
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.4)

xgb_curve_model = clone(xgb_model)
rf_curve_model = clone(rf_model)


fig, axes = plt.subplots(1, 2, figsize=(14, 5), sharey=True)
plot_learning_curve(xgb_curve_model, X_train_enc, y_train, axes[0], 'XGBoost Learning Curve')
plot_learning_curve(rf_curve_model, X_train_enc, y_train, axes[1], 'Random Forest Learning Curve')
plt.tight_layout()
plt.show()

In [None]:
# Load and display the model comparison summary
import json
from pathlib import Path

summary_file = MODEL_DIR / 'model_comparison_summary.json'

if summary_file.exists():
    with open(summary_file, 'r') as f:
        summary = json.load(f)
    
    print(f"üìä Model Comparison Summary")
    print(f"{'='*60}")
    print(f"\nüìÅ File location: {summary_file}")
    print(f"\nüèÜ Overall Best Model: {summary['overall_best_model']['name']}")
    print(f"   Average Rank: {summary['overall_best_model']['avg_rank']:.2f}")
    
    print(f"\nüìà Validation Metrics:")
    for metric, value in summary['overall_best_model']['validation_metrics'].items():
        print(f"   {metric}: {value:.4f}")
    
    print(f"\nüéØ Test Metrics:")
    for metric, value in summary['overall_best_model']['test_metrics'].items():
        if isinstance(value, (int, float)):
            print(f"   {metric}: {value:.4f}")
    
    print(f"\nü•á Metric-wise Winners:")
    for metric, info in summary['metric_winners'].items():
        print(f"   {metric}: {info['model']} ({info['value']:.4f})")
    
    print(f"\nüìã Models Evaluated: {', '.join(summary['models_evaluated'])}")
    
    # Display full JSON in a pretty format
    print(f"\n\n{'='*60}")
    print("Full Summary (JSON):")
    print(json.dumps(summary, indent=2))
else:
    print(f"‚ùå Summary file not found at: {summary_file}")
    print("Run the model comparison cell first to generate the summary.")