# Imports

In [1]:
from ucimlrepo import fetch_ucirepo

In [2]:
import shap

import pandas  as pd
import numpy   as np
import seaborn as sns

import matplotlib.pyplot as plt

from sklearn.base              import BaseEstimator, ClassifierMixin
from sklearn.metrics           import roc_curve, precision_recall_curve
from sklearn.inspection        import permutation_importance
from sklearn.calibration       import calibration_curve
from sklearn.model_selection   import learning_curve, StratifiedKFold, KFold, train_test_split
from sklearn.feature_selection import SequentialFeatureSelector

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from scipy.stats  import ks_2samp
from sklearn.base import BaseEstimator

from sklearn.metrics import (
    r2_score, 
    mean_absolute_error, 
    mean_absolute_percentage_error, 
    root_mean_squared_error, 
    explained_variance_score, 
    f1_score, 
    make_scorer,
    balanced_accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    roc_auc_score,
    brier_score_loss,
    median_absolute_error
)

In [4]:
import optuna
from lightgbm import LGBMClassifier
from sklearn.model_selection import cross_validate

## Functions

In [5]:
def ks_score(y_true: pd.Series, y_prob: pd.Series) -> float:
    
    pos_prob = y_prob[y_true == 1]
    neg_prob = y_prob[y_true == 0]
    
    if len(pos_prob) == 0 or len(neg_prob) == 0:
        return 0.0
    
    result = ks_2samp(pos_prob, neg_prob)
    
    return result.statistic

ks_scorer = make_scorer(ks_score, greater_is_better=True, response_method="predict_proba")

def get_eval_scoring(scoring, return_func=True):
    
    scorers = {
        'r2': 'r2',
        'ks': ks_scorer,
        'brier': 'neg_brier_score',
        'roc_auc': 'roc_auc',
        'explained_variance': 'explained_variance',
        'mean_absolute_error': 'neg_mean_absolute_error',
        'median_absolute_error': 'neg_median_absolute_error',
        'root_mean_squared_error': 'neg_root_mean_squared_error',
        'mean_absolute_percentage_error': 'neg_mean_absolute_percentage_error'
    }

    functions = {
        'r2': r2_score,
        'ks': ks_score,
        'brier': lambda y_true, y_pred: -1 * brier_score_loss(y_true, y_pred),
        'roc_auc': roc_auc_score,
        'explained_variance': explained_variance_score,
        'mean_absolute_error': lambda y_true, y_pred: -1 * mean_absolute_error(y_true, y_pred),
        'median_absolute_error': lambda y_true, y_pred: -1 * median_absolute_error(y_true, y_pred),
        'root_mean_squared_error': lambda y_true, y_pred: -1 * root_mean_squared_error(y_true, y_pred),
        'mean_absolute_percentage_error': lambda y_true, y_pred: -1 * mean_absolute_percentage_error(y_true, y_pred)
    }
    
    if scoring not in scorers:
        raise ValueError(f"Metric '{scoring}' is not supported.")

    if scoring not in functions:
        raise ValueError(f"Metric '{scoring}' is not supported.")

    return functions[scoring] if return_func else scorers[scoring]

def analyze_model(model_name: str, model: BaseEstimator, results: dict, X_train: pd.DataFrame, y_train: pd.DataFrame, y_test: pd.DataFrame, target: str) -> None:
    
    print(f"{model_name} Results")

    display(summarize_metric_results(results))

    pred_col = f"{model_name}_pred"
    plot_residuals(y_test, pred_col, f"{model_name} (Test Dataset)", target)
    plot_pred_vs_true(y_test, pred_col, f"{model_name} (Test Dataset)", target)
    plot_error_by_quantile(y_test, pred_col, f"{model_name} (Test Dataset)", target)
    plot_feature_importance(model)
    plot_permutation_importance(model, X_train, y_train[target])
    plot_shap_summary(model, X_train)

def get_regressor_metrics(y: pd.DataFrame, pred_col: str, target: str) -> dict[str, float]:
        
    y_true = y[target]
    y_pred = y[pred_col]
    
    return {
        'R2': r2_score(y_true, y_pred),
        'MAE': mean_absolute_error(y_true, y_pred),
        'MadAE': median_absolute_error(y_true, y_pred),
        'MAPE': mean_absolute_percentage_error(y_true, y_pred),
        'RMSE': root_mean_squared_error(y_true, y_pred),
        'Explained Variance': explained_variance_score(y_true, y_pred)
    }

def get_best_threshold_for_f1(y_true: pd.Series, y_probs: pd.Series) -> float:
    
    thresholds = np.linspace(0, 1, 200)
    best_thresh = 0
    best_f1 = 0
    
    for thresh in thresholds:
        
        y_pred = (y_probs >= thresh).astype(int)
        f1 = f1_score(y_true, y_pred)
        
        if f1 > best_f1:
            
            best_f1 = f1
            best_thresh = thresh
            
    return best_thresh

def get_classifier_metrics(y: pd.DataFrame, model_name: str, target: str) -> dict[str, float]:
        
    best_threshold = get_best_threshold_for_f1(y[target], y[f'{model_name}_prob'])

    y[f'{model_name}_pred'] = 0
    y.loc[y[f'{model_name}_prob'] >= best_threshold, f'{model_name}_pred'] = 1
    
    return {
        'Treshold': best_threshold,
        'Balanced Accuracy': balanced_accuracy_score(y[target], y[f'{model_name}_pred']),
        'Precision': precision_score(y[target], y[f'{model_name}_pred']),
        'Recall': recall_score(y[target], y[f'{model_name}_pred']),
        'F1': f1_score(y[target], y[f'{model_name}_pred']),
        'AUC': roc_auc_score(y[target], y[f'{model_name}_prob']),
        'KS': ks_score(y[target], y[f'{model_name}_prob']),
        'Brier': brier_score_loss(y[target], y[f'{model_name}_prob'])
    }

def get_default_model(random_state) -> LGBMClassifier:
    
    return LGBMClassifier(random_state=random_state, verbose=-1, objective='binary')

def get_params_objective(trial, random_state=42) -> dict:

    return {
        'objective': trial.suggest_categorical('objective', ['binary']),
        'boosting_type': trial.suggest_categorical('boosting_type', ['gbdt']),
        'metric': trial.suggest_categorical('metric', ['auc', 'binary_logloss']),
        'num_leaves': trial.suggest_int('num_leaves', 20, 300),
        'max_depth': trial.suggest_int('max_depth', 3, 15),
        'learning_rate': trial.suggest_float('learning_rate', 1e-3, 0.1, log=True),
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000, step=100),
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
        'subsample': trial.suggest_float('subsample', 0.5, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
        'lambda_l1': trial.suggest_float('lambda_l1', 1e-8, 10.0, log=True),
        'lambda_l2': trial.suggest_float('lambda_l2', 1e-8, 10.0, log=True),
        'random_state': trial.suggest_categorical('random_state', [random_state]),
        'verbose': trial.suggest_categorical('verbose', [-1])
}


def prob_to_score(probs: pd.Series, min_score: int = 0, max_score: int = 1000, inverse: bool = False) -> np.array:
    
    """
    Convert probabilities to scores in intervals [min_score, max_score].

    Args:
        probs (array-like): List of probabilities between 0 and 1.
        min_score (int): Scor min (default=1).
        max_score (int): Scor max (default=1000).
        inverse (bool): If True, probability is high => score is lower (ex: risk).

    Returns:
        np.array: Scores between min_score and max_score.
    """

    probs = np.asarray(probs)

    if inverse:
        probs = 1 - probs
    
    scores = min_score + (max_score - min_score) * probs
    
    return np.round(scores).astype(int)

def calc_rating_limits(y: pd.Series, n_ratings: int = 5, min_score: int = 0, max_score: int = 1000) -> list[float]:
    
    scores = np.asarray(y)
    
    quantiles = np.linspace(0, 1, n_ratings + 1)[1:-1]
    inner_thresholds = np.quantile(scores, quantiles)
    
    thresholds = [min_score] + inner_thresholds.tolist() + [max_score]

    return thresholds

def apply_ratings(y: pd.Series, thresholds: list[float], labels: list[str] = None) -> pd.Series:
  
    return pd.cut(y, bins=thresholds, labels=labels, include_lowest=True)

def plot_rating_distribution(y: pd.Series, model_name: str, target: str) -> None:

    """
    Plot the distribution of ratings.

    Args:
        df (pd.DataFrame): DataFrame with the rating and score.
    """
    
    y.groupby([f'{model_name}_rating'], observed=True)[target].count().plot(kind='bar')
    
    plt.title(f'Rating Distribution - {model_name}')
    plt.xlabel('Rating')
    plt.ylabel('Count')
    plt.xticks(rotation=45)
    plt.show()

def plot_rating(y: pd.Series, model_name: str, target: str) -> None:
    
    """
    Plot the average score for each rating.

    Args:
        df (pd.DataFrame): DataFrame with the rating and score.
    """
    
    y.groupby([f'{model_name}_rating'], observed=True)[target].mean().plot(kind='bar')
    
    plt.title(f'Rating vs Target - {model_name}')
    plt.xlabel('Rating')
    plt.ylabel('Average Target')
    plt.xticks(rotation=45)
    plt.show()

def summarize_metric_results(results: dict[str, dict[str, float]]) -> pd.DataFrame:
        
    rows = []
    
    for dataset, metrics_dict in results.items():
        row = {"Dataset": dataset}
        row.update(metrics_dict)
        rows.append(row)
    
    return pd.DataFrame(rows)

def detect_model_type(model) -> str:
    
    name = type(model).__name__.lower()
    
    if 'classifier' in name or hasattr(model, 'predict_proba'):
        return 'classifier'
    
    elif 'regressor' in name or (hasattr(model, 'predict') and not hasattr(model, 'predict_proba')):
        return 'regressor'
    
    return 'unknown'

def analyze_model(model_name: str, model: BaseEstimator, results: dict, features: list, X_train: pd.DataFrame, y_train: pd.DataFrame, y_test: pd.DataFrame, target: str, scoring: str) -> None:
    
    print(f"{model_name} Results")

    display(summarize_metric_results(results))

    if detect_model_type(model) == 'regressor':
        pred_col = f"{model_name}_pred"
        
        plot_residuals(y_test, pred_col, target)
        plot_pred_vs_true(y_test, pred_col, target)
        plot_error_by_quantile(y_test, pred_col, target)

    elif detect_model_type(model) == 'classifier':
        pred_col = f"{model_name}_pred"
        prob_col = f"{model_name}_prob"
        
        plot_rating(y_test, model_name, target)
        plot_rating_distribution(y_test, model_name, target)
        plot_roc_curve(y_test, prob_col, target)
        plot_precision_recall_curve(y_test, prob_col, target)
        plot_calibration_curve(y_test, model_name, target, strategy='uniform')

    else:
        raise ValueError("Model needs to be a classifier or regressor.")

    plot_learning_curve(model, X_train, y_train, target, scoring=scoring)
    if hasattr(model, 'feature_importances_'):
        plot_feature_importance(model)
    plot_permutation_importance(model, features, X_train, y_train[target], scoring=scoring)
    plot_shap_summary(model, features, X_train)

def plot_roc_curve(y: pd.DataFrame, prob_col: str, target: str) -> None:
    
    fpr, tpr, _ = roc_curve(y[target], y[prob_col])

    plt.plot(fpr, tpr)
    plt.title('ROC AUC Curve')
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.grid(True)
    plt.show()

def plot_precision_recall_curve(y: pd.DataFrame, prob_col: str, target: str):

    precision, recall, _ = precision_recall_curve(y[target], y[prob_col])

    plt.plot(precision, recall)
    plt.title('Precision Recall Curve')
    plt.xlabel('Precision')
    plt.ylabel('Recall')
    plt.grid(True)
    plt.show()

def plot_learning_curve(model: BaseEstimator, X: pd.DataFrame, y: pd.DataFrame, target: str, scoring: str, cv: int = 5):

    if isinstance(model, ClassifierMixin):
        splitter = StratifiedKFold(cv, shuffle=True, random_state=42)
    else:
        splitter = KFold(cv, shuffle=True, random_state=42)
    
    train_sizes_abs, train_scores, val_scores = learning_curve(
        model, X, y[target],
        train_sizes=np.linspace(0.1, 1.0, 5),
        cv=splitter,
        scoring=scoring
    )

    train_scores_mean = np.mean(train_scores, axis=1)
    val_scores_mean = np.mean(val_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    val_scores_std = np.std(val_scores, axis=1)

    # plt.figure(figsize=(10, 6))
    plt.plot(train_sizes_abs, train_scores_mean, 'o-', color='r', label='Train')
    plt.fill_between(
        train_sizes_abs, train_scores_mean - train_scores_std, 
        train_scores_mean + train_scores_std, alpha=0.1, color='r')

    plt.plot(train_sizes_abs, val_scores_mean, 'o-', color='b', label='Validation')
    plt.fill_between(
        train_sizes_abs, val_scores_mean - val_scores_std,
        val_scores_mean + val_scores_std, alpha=0.1, color='b')

    plt.title("Learning Curve")
    plt.xlabel("Train Size")
    plt.ylabel(f"{scoring}")
    plt.legend(loc="best")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def plot_calibration_curve(y: pd.DataFrame, model_name: str, target: str, n_bins=11, strategy='uniform') -> None:

    prob_true, prob_pred = calibration_curve(y[target], y[f"{model_name}_prob"], n_bins=n_bins, strategy=strategy)

    # plt.figure(figsize=(8, 6))
    plt.plot(prob_pred, prob_true, marker='o', label='Model')
    plt.plot([0, 1], [0, 1], linestyle='--', label='Perfectly calibrated', color='gray')
    plt.xlabel('Predicted Probability')
    plt.ylabel('Observed Frequency')
    plt.title('Calibration Curve')
    plt.legend(loc='best')
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def plot_feature_importance(model: BaseEstimator) -> None:

    feature_names = model.feature_name_ if hasattr(model, 'feature_name_') else model.feature_names_ # for catboost
    
    df_imp = pd.DataFrame(model.feature_importances_, feature_names).reset_index()
    df_imp.columns = ["Variable", "Importance"]
    df_imp = df_imp.sort_values("Importance", ascending=False)

    sns.barplot(x="Importance", y="Variable", color="#006e9cff", data=df_imp[:20])

    plt.title(f"Importance of Variables")
    plt.show()

def plot_permutation_importance(model: BaseEstimator, features: list, X: pd.DataFrame, y: pd.DataFrame, scoring: str) -> None:

    permu_results = permutation_importance(model, X[features], y, scoring=scoring, n_repeats=5, random_state=42)

    sorted_importances_idx = permu_results.importances_mean.argsort()
    
    df_results = pd.DataFrame(permu_results.importances[sorted_importances_idx].T, columns=X.columns[sorted_importances_idx])
    
    ax = df_results.plot.box(vert=False, whis=10, patch_artist=True, boxprops={'facecolor':'skyblue', 'color':'blue'})
    ax.axvline(x=0, color="k", linestyle="--")
    ax.set_xlabel(f"Decrease in {scoring}")

    plt.show()

def plot_shap_summary(model: BaseEstimator, features: list, X: pd.DataFrame) -> None:

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X[features])
    
    shap.summary_plot(shap_values, X[features])

class AutoMLLGBMClassifierCV:

    def __init__(
        self, X_train: pd.DataFrame, y_train: pd.DataFrame, X_test: pd.DataFrame, y_test: pd.DataFrame, best_features: list[str], 
        target: str, scoring: str, n_trials: int = 50, cv: int = 5, random_state: int = 42):

        self.X_train = X_train
        self.y_train = y_train
        self.X_test = X_test
        self.y_test = y_test
        self.best_features = best_features
        self.target = target
        self.n_trials = n_trials
        self.cv = cv
        self.scorer = get_eval_scoring(scoring, return_func=False)
        self.random_state = random_state

    def _cross_validate(self, model: LGBMClassifier, features: list[str]) -> None:
        
        cv_results = cross_validate(
            estimator=model, 
            X=self.X_train[features], 
            y=self.y_train[self.target], 
            cv=self.cv,
            scoring={
                'balanced_accuracy': 'balanced_accuracy', 
                'precision': 'precision', 
                'recall': 'recall', 
                'f1': 'f1', 
                'roc_auc': 'roc_auc', 
                'ks': ks_scorer, 
                'brier': 'neg_brier_score'
            }
        )

        return {
            'Treshold': 0.5,
            'Balanced Accuracy': cv_results['test_balanced_accuracy'].mean(),
            'Precision': cv_results['test_precision'].mean(),
            'Recall': cv_results['test_recall'].mean(),
            'F1': cv_results['test_f1'].mean(),
            'AUC': cv_results['test_roc_auc'].mean(),
            'KS': cv_results['test_ks'].mean(),
            'Brier': np.abs(cv_results['test_brier'].mean())
        }

    def _train_model(self, model_name: str, features: list[str], model: LGBMClassifier) -> tuple[LGBMClassifier, dict[str, dict[str, float]]]:

        model.fit(self.X_train[features], self.y_train[self.target])
        
        self.y_train[f'{model_name}_prob'] = model.predict_proba(self.X_train[features])[:, 1]
        self.y_train[f'{model_name}_score'] = prob_to_score(self.y_train[f'{model_name}_prob'], inverse=True)
        self.train_rating_limits = calc_rating_limits(self.y_train[f'{model_name}_score'])
        self.y_train[f'{model_name}_rating'] = apply_ratings(self.y_train[f'{model_name}_score'], self.train_rating_limits)
        
        self.y_test[f'{model_name}_prob'] = model.predict_proba(self.X_test[features])[:, 1]
        self.y_test[f'{model_name}_score'] = prob_to_score(self.y_test[f'{model_name}_prob'], inverse=True)
        self.y_test[f'{model_name}_rating'] = apply_ratings(self.y_test[f'{model_name}_score'], self.train_rating_limits)
        
        results = {
            'Train CV': self._cross_validate(model, features),
            'Test': get_classifier_metrics(self.y_test, model_name, self.target)
        }
        
        return model, results

    def _train_base_model(self) -> dict[str, dict[str, float]]:
        
        model = get_default_model(random_state=self.random_state)
        
        return self._train_model('base_model', self.X_train.columns.tolist(), model)

    def _train_best_feature_model(self) -> dict[str, dict[str, float]]:
        
        model = get_default_model(random_state=self.random_state)
        
        return self._train_model('best_feature_model', self.best_features, model)

    def _get_best_params(self) -> dict:

        def objective(trial):
            
            params = get_params_objective(trial, random_state=self.random_state)
    
            cv_results = cross_validate(
                estimator=LGBMClassifier(**params), cv=self.cv, scoring=self.scorer,
                X=self.X_train[self.best_features], y=self.y_train[self.target])
    
            return cv_results['test_score'].mean()
    
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        
        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=self.n_trials)
    
        return study.best_params

    def _train_best_params_model(self) -> dict[str, dict[str, float]]:
        
        best_params = self._get_best_params()
        self.best_params_model = LGBMClassifier(**best_params)
        
        return self._train_model('best_params_model', self.best_features, self.best_params_model)

    def train(self) -> None:
        
        self.base_model, self.base_model_results = self._train_base_model()
        self.best_feature_model, self.best_feature_model_results = self._train_best_feature_model()
        self.best_params_model, self.best_params_model_results = self._train_best_params_model()

    def get_metrics(self) -> pd.DataFrame:

        model_results = {
            "Base Model": self.base_model_results,
            "Best Feature Model": self.best_feature_model_results,
            "Best Params Model": self.best_params_model_results,
        }

        summary_frames = [summarize_metric_results(results).assign(Model=name) for name, results in model_results.items()]

        return pd.concat(summary_frames, ignore_index=True)

    def get_result_analysis(self) -> None:
    
        analyze_model(
            "base_model", 
            self.base_model, 
            self.base_model_results, 
            self.X_train.columns.tolist(), 
            self.X_train, 
            self.y_train, 
            self.y_test, 
            self.target, 
            self.scorer
        )
        analyze_model(
            "best_feature_model", 
            self.best_feature_model, 
            self.best_feature_model_results, 
            self.best_features, 
            self.X_train, 
            self.y_train, 
            self.y_test, 
            self.target, 
            self.scorer
        )
        analyze_model(
            "best_params_model", 
            self.best_params_model, 
            self.best_params_model_results,
            self.best_features, 
            self.X_train, 
            self.y_train, 
            self.y_test, 
            self.target, 
            self.scorer
        )

## Loading Dataset

In [6]:
df = fetch_ucirepo(id=350) 

X = df.data.features
df_aux = df.variables
X.columns = df_aux.loc[~df_aux['name'].isin(['ID', 'Y']), 'description'].values

y = df.data.targets

In [7]:
y['Y'].value_counts(1)

Y
0    0.7788
1    0.2212
Name: proportion, dtype: float64

In [8]:
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.6, stratify=y)

# Feature Engineering

## Feature Selection

In [None]:
sfs = SequentialFeatureSelector(
    estimator=LGBMClassifier(verbosity=-1), 
    n_features_to_select='auto',
    direction='forward',
    scoring='roc_auc', 
    cv=3, 
    n_jobs=-1
)

sfs.fit(X_train, y_train)

In [None]:
X_train.loc[:, sfs.get_support()].columns.tolist()

In [None]:
selected_columns = ['LIMIT_BAL', 'EDUCATION', 'PAY_0', 'PAY_2', 'PAY_4', 'PAY_5', 'BILL_AMT1', 'BILL_AMT3', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT6']

# Machine Learning

In [None]:
auto_lgbm = AutoMLLGBMClassifierCV(
    X_train=X_train,
    y_train=y_train,
    X_test=X_test,
    y_test=y_test,
    best_features=selected_columns,
    target='Y',
    scoring='brier',
    random_state=42
)

In [None]:
auto_lgbm.train()

In [None]:
auto_lgbm.get_metrics()

In [None]:
auto_lgbm.get_result_analysis()

In [None]:
y_test.head()