In [None]:
import optuna
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
plt.rcParams["image.cmap"] = "viridis"
plt.rcParams['font.family'] = 'Liberation Sans'
plt.rcParams["font.size"] = 14
import scipy.stats as stats
from scipy.stats import shapiro
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Lasso, ElasticNet
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.feature_selection import VarianceThreshold
from sklearn.model_selection import cross_val_score, train_test_split, StratifiedKFold
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, HistGradientBoostingRegressor
from sklearn.feature_selection import SelectFromModel
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, root_mean_squared_error
import seaborn as sns
import shap
import joblib
shap.initjs()

In [None]:
def create_pipe(trial, X_t):
    # Preprocessing hyperparameters
    # Feature Selection / Engineering
    feature_selector = trial.suggest_categorical('feature_selector', ['none', 'lasso', 'elasticnet'])
    if feature_selector == 'none':
        selector = None
    elif feature_selector == 'lasso':
        selector = SelectFromModel(
            Lasso(alpha=trial.suggest_float('lasso_alpha', 1e-4, 1.0, log=True)),
                max_features=trial.suggest_int('lasso_max_features', 50, 300),
                threshold='mean'
        )

    elif feature_selector == 'elasticnet':
        selector = SelectFromModel(
            ElasticNet(
        alpha=trial.suggest_float('alpha', 1e-4, 1.0, log=True),
        l1_ratio=trial.suggest_float('l1_ratio', 0.1, 1.0)),
        threshold='mean'
        )

    # Scaler
    scaler_name = trial.suggest_categorical('scaler', ['standard', 'minmax', 'robust'])
    scalers = {
    'standard': StandardScaler(),
    'minmax': MinMaxScaler(),
    'robust': RobustScaler()
    }
    scaler = scalers[scaler_name]

    # Suggest model type
    model_name = trial.suggest_categorical('model', ['RandomForest', 'DecisionTree', 'XGBoost', 'HistGradientBosting', 'LightGBM'])
    

    if model_name == 'RandomForest':
        model = RandomForestRegressor(
            n_estimators=trial.suggest_int('rf_n_estimators', 10, 300),
            criterion=trial.suggest_categorical('rf_criterion', ['gini', 'entropy']),
            max_depth=trial.suggest_int('rf_max_depth', 3, 30),
            min_samples_split=trial.suggest_int('rf_min_samples_split', 2, 10),
            min_samples_leaf=trial.suggest_int('rf_min_samples_leaf', 1, 35),
            bootstrap=trial.suggest_categorical('rf_bootstrap', [True, False]),
            random_state=42
        )

    elif model_name == 'DecisionTree':
        model = DecisionTreeRegressor(
            criterion=trial.suggest_categorical('dt_criterion', ['gini', 'entropy']),
            splitter=trial.suggest_categorical('dt_splitter', ['best', 'random']),
            max_depth=trial.suggest_int('dt_max_depth', 3, 30),
            min_samples_split=trial.suggest_int('dt_min_samples_split', 2, 10),
            min_samples_leaf=trial.suggest_int('dt_min_samples_leaf', 1, 35),
            random_state=42
        )

    elif model_name == 'XGBoost':
        model = XGBRegressor(
            learning_rate=trial.suggest_float('xgb_learning_rate', 0.01, 1.0, log=True),
            n_estimators=trial.suggest_int('xgb_n_estimators', 20, 200),
            max_depth=trial.suggest_int('xgb_max_depth', 3, 30),
            booster=trial.suggest_categorical('xgb_booster', ['gbtree', 'gblinear', 'dart']),
            alpha=trial.suggest_float('xgb_alpha', 0.0, 1.0),
            lambda_=trial.suggest_float('xgb_lambda', 0.0, 1.0),
            random_state=42,
        )

    elif model_name == 'HistGradientBosting':
        model = HistGradientBoostingRegressor(
            warm_start=True,
            learning_rate=trial.suggest_float('hgb_learning_rate', 0.01, 1.0, log=True),
            max_iter=trial.suggest_int('hgb_max_iter', 100, 1000),
            max_depth=trial.suggest_int('hgb_max_depth', 3, 40),
            max_bins=trial.suggest_int('hgb_max_bins', 128, 256)
            random_state=42
        )

    elif model_name == 'LightGBM':
        model = LGBMRegressor(
            verbosity=-1,
            learning_rate=trial.suggest_float('lgb_learning_rate', 0.01, 1.0, log=True),
            n_estimators=trial.suggest_int('lgb_n_estimators', 20, 500),
            num_leaves=trial.suggest_int('lgb_num_leaves', 20, 300),
            max_depth=trial.suggest_int('lgb_max_depth', -1, 15)
            feature_fraction=trial.suggest_float('lgb_feature_fraction', 0.4, 1.0),
            random_state=42
        )

    # Assemble the pipeline
    steps = [
        ('scaler', scaler),
    ]
    if feature_selector != 'none':
        steps.append(('feature_selector', selector))
    steps.append(('model', model))

    pipeline = Pipeline(steps)
    return pipeline

In [None]:
# Outer loop for nested cross validation
outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
for train_idx, test_idx in outer_cv.split(X, y):
    X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
    y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
    print(X_train.shape, X_test.shape)

In [None]:
# Optuna for hyperparameter optimization
def objective(trial):
    try:
        # StratifiedKFold cross-validation
        skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        
        # Initialize lists to store metrics
        mean_squared_error_scores = []
        mean_absolute_error_scores = []
        root_mean_squared_error_scores = []
        r2_score_scores = []
        
        # Initialize the split
        for train_idx, test_idx in skf.split(X_train, y_train):
            X_t, X_val = X_train.iloc[train_idx], X_train.iloc[test_idx]
            y_t, y_val = y_train.iloc[train_idx], y_train.iloc[test_idx]
            
            # Create pipeline based on trial hyperparameters
            pipeline = create_pipe(trial, X_t)
            
            # Fit the model
            pipeline.fit(X_t, y_t)
            
            # Predict the validation set
            y_val_pred = pipeline.predict(X_val)
            
            # Compute metrics
            mse = mean_squared_error(y_val, y_val_pred)
            rmse = np.sqrt(mse)
            
            mean_squared_error_scores.append(mse)
            mean_absolute_error_scores.append(mean_absolute_error(y_val, y_val_pred))
            root_mean_squared_error_scores.append(rmse)
            r2_score_scores.append(r2_score(y_val, y_val_pred))
        
        # Compute mean of all scores
        trial.set_user_attr('MSE', np.mean(mean_squared_error_scores))
        trial.set_user_attr('MAE', np.mean(mean_absolute_error_scores))
        trial.set_user_attr('RMSE', np.mean(root_mean_squared_error_scores))
        trial.set_user_attr('R2', np.mean(r2_score_scores))
        
        # Return RMSE as the optimization target (lower is better)
        return np.mean(root_mean_squared_error_scores)

    except ValueError as e:
        trial.set_user_attr("error", str(e))
        raise optuna.exceptions.TrialPruned()


In [None]:
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=500, n_jobs=35, show_progress_bar=True)