In [None]:
from typing import Any, Dict, Literal, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import sidetable as stb
from catboost import CatBoostClassifier
from sklearn.metrics import (PrecisionRecallDisplay, RocCurveDisplay,
                             classification_report, confusion_matrix, f1_score,
                             precision_recall_curve, recall_score)
from sklearn.model_selection import (RepeatedStratifiedKFold, StratifiedKFold,
                                     train_test_split)
from sklearn.utils.class_weight import compute_class_weight

In [None]:
def remove_outliers_iqr_multi(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """
    Drop rows that fall outside the 1.5 * IQR fences of any listed column.

    For every column in ``cols`` the first and third quartiles are computed on
    the full input frame; a row is kept only if its value lies inside
    ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]`` (bounds inclusive) for all columns.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to filter.
    cols : list of str
        Names of the columns whose outliers should be removed.

    Returns
    -------
    pd.DataFrame
        The rows of ``df`` that pass the check for every column in ``cols``.

    Examples
    --------
    >>> df_filtered = remove_outliers_iqr_multi(df, ['col1', 'col2'])
    """
    keep = pd.Series(True, index=df.index)
    for name in cols:
        values = df[name]
        quartiles = values.quantile([0.25, 0.75])
        first_q, third_q = quartiles.iloc[0], quartiles.iloc[1]
        spread = third_q - first_q
        # Fences are derived from the unfiltered column, not from the running mask.
        inside = values.between(first_q - 1.5 * spread, third_q + 1.5 * spread)
        keep = keep & inside
    return df[keep]


def add_engineered_features(df: pd.DataFrame, inplace: bool = False) -> pd.DataFrame:
    """
    Add engineered features for GBDT models to the Bank Marketing DataFrame.

    The function reads the columns ``age``, ``contacts_per_campaign``,
    ``nb_previous_contact``, ``education``, ``marital_status``,
    ``last_contact_duration``, ``N_last_days``, ``cons_conf_index``,
    ``emp_var_rate``, ``cons_price_index``, ``euri_3_month``, ``month``,
    ``week_day``, ``previous_outcome``, ``nb_employees``, ``housing_loan`` and
    ``personal_loan``. It appends the derived columns and finally converts
    ``N_last_days`` to a categorical column in which the sentinel ``999`` is
    replaced by the label ``"no previous contacts"``.

    Parameters
    ----------
    df : pd.DataFrame
        Original Bank Marketing DataFrame.
    inplace : bool, optional
        If True, modify the input df in-place (the returned object is the very
        same frame). If False (default), work on and return a copy.

    Returns
    -------
    pd.DataFrame
        DataFrame with added features.
    """
    if not inplace:
        df = df.copy()

    # === Age binning ===
    df["age_group"] = pd.cut(
        df["age"],
        bins=[0, 30, 55, np.inf],  # cover all realistic ages
        labels=["young", "middle", "senior"],
        include_lowest=True
    )

    # === Contact intensity ===
    df["contact_intensity"] = df["contacts_per_campaign"] / (
        df["nb_previous_contact"] + 1
    )

    # === Education × Marital cross-feature ===
    df["education_marital"] = (
        df["education"].astype(str) + "_" + df["marital_status"].astype(str)
    )

    # === Was previously contacted ===
    df["was_previously_contacted"] = (df["nb_previous_contact"] > 0).astype(int)

    # === Long contact binary flag (more than 5 minutes) ===
    df["long_last_contact"] = (df["last_contact_duration"] > 300).astype(int)

    # === Recent contact flag (last 30 days) ===
    df["recently_contacted"] = (df["N_last_days"] < 30).astype(int)

    # === N_last_days == 999 → new binary flag + cleaned version ===
    df["never_contacted_before"] = (df["N_last_days"] == 999).astype(int)
    # The cleaned series is only an intermediate, so it is kept as a local
    # variable instead of a temporary column. Dropping a temporary column with
    # ``df = df.drop(...)`` would rebind ``df`` to a new object and silently
    # break ``inplace=True``.
    days_since_contact = df["N_last_days"].replace(999, np.nan)

    # === Economic stress proxy ===
    df["economic_pressure"] = -df["cons_conf_index"] * df["emp_var_rate"]

    # === Interest spread: CPI - Euribor ===
    df["interest_diff"] = df["cons_price_index"] - df["euri_3_month"]

    # === Month + weekday combined ===
    df["month_weekday"] = df["month"].astype(str) + "_" + df["week_day"].astype(str)

    # === Contact pressure: calls / (days since last + 1) ===
    # NaN for rows carrying the 999 sentinel.
    df["contact_pressure"] = df["contacts_per_campaign"] / (days_since_contact + 1)

    # === Log features (avoid skew) ===
    df["log_duration"] = np.log1p(df["last_contact_duration"])
    df["log_contacts"] = np.log1p(df["contacts_per_campaign"])
    df["log_previous"] = np.log1p(df["nb_previous_contact"])

    # === Month → Season mapping ===
    month_to_season = {
        "mar": "spring",
        "apr": "spring",
        "may": "spring",
        "jun": "summer",
        "jul": "summer",
        "aug": "summer",
        "sep": "autumn",
        "oct": "autumn",
        "nov": "autumn",
        "dec": "winter",
        "jan": "winter",
        "feb": "winter",
    }
    # Month values missing from the mapping become NaN.
    df["season"] = df["month"].map(month_to_season)

    # === Previous outcome was success ===
    df["prev_outcome_success"] = (df["previous_outcome"] == "success").astype(int)

    # === Marketing intensity per employee ===
    df["contact_to_employee_ratio"] = df["contacts_per_campaign"] / (
        df["nb_employees"] + 1
    )

    # === Stress per employee ===
    df["stress_per_employee"] = df["economic_pressure"] / (df["nb_employees"] + 1)

    # === Age per contact — a proxy for target segment targeting ===
    df["age_per_contact"] = df["age"] / (df["contacts_per_campaign"] + 1)

    # === Both loans flag ===
    df["both_loans"] = (
        (df["housing_loan"] == "yes") & (df["personal_loan"] == "yes")
    ).astype(int)

    # === Loan burden score (0, 1 or 2 active loans) ===
    df["loan_burden_score"] = (df["housing_loan"] == "yes").astype(int) + (
        df["personal_loan"] == "yes"
    ).astype(int)

    # === Risky contact (short but many calls) ===
    df["risky_contact"] = (
        (df["last_contact_duration"] < 90) & (df["contacts_per_campaign"] > 3)
    ).astype(int)

    # === Employment × Confidence — interaction signal ===
    df["emp_conf_product"] = df["emp_var_rate"] * df["cons_conf_index"]

    # === Volatility index ===
    df["volatility_score"] = (
        df["emp_var_rate"].abs()
        + df["cons_conf_index"].abs()
        + df["euri_3_month"].abs()
    )

    # === Recent effective contact score ===
    df["recent_contact_score"] = df["recently_contacted"] * df["long_last_contact"]

    # Convert to categorical (done last: the numeric comparisons above need
    # the original numeric column).
    df["N_last_days"] = df["N_last_days"].astype("category")

    # 1. Add new category
    df["N_last_days"] = df["N_last_days"].cat.add_categories("no previous contacts")

    # 2. Replace 999 with 'no previous contacts'
    df.loc[df["N_last_days"] == 999, "N_last_days"] = "no previous contacts"
    return df


def split_data(
    X: pd.DataFrame,
    y: pd.Series,
    val_size: float = 0.1,
    test_size: float = 0.1,
    random_state: int = 42,
    use_stratification: bool = False,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, pd.Series]:
    """
    Partition features and target into train, validation and test subsets.

    The test subset is carved out first; the validation subset is then taken
    from what remains, with its share rescaled so that ``val_size`` is still
    expressed as a fraction of the full dataset.

    Parameters
    ----------
    X : pd.DataFrame
        Feature matrix.
    y : pd.Series
        Target variable.
    val_size : float, optional
        Fraction of the full dataset assigned to validation, by default 0.1.
    test_size : float, optional
        Fraction of the full dataset assigned to test, by default 0.1.
    random_state : int, optional
        Seed passed to both splits, by default 42.
    use_stratification : bool, optional
        When True, both splits are stratified on the target. By default False.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.Series, pd.Series, pd.Series]
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.

    Raises
    ------
    ValueError
        If ``val_size + test_size`` is greater than or equal to 1.
    """
    if val_size + test_size >= 1:
        raise ValueError("val_size + test_size must be less than 1.")

    # Step 1: hold out the test subset from the complete data.
    X_rest, X_test, y_rest, y_test = train_test_split(
        X,
        y,
        test_size=test_size,
        random_state=random_state,
        stratify=y if use_stratification else None,
    )

    # Step 2: the remainder is only (1 - test_size) of the data, so rescale the
    # validation share before splitting it off.
    val_fraction_of_rest = val_size / (1 - test_size)
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest,
        y_rest,
        test_size=val_fraction_of_rest,
        random_state=random_state,
        stratify=y_rest if use_stratification else None,
    )

    return X_train, X_val, X_test, y_train, y_val, y_test

In [None]:
# Load the raw dataset; the relative path is resolved against the current working directory.
df = pd.read_csv("bank_dataset (3) (1) (1) (3) (1) (2) (1) (1) (1).csv")

In [None]:
# Preview the first rows of the raw data.
df.head()

### EDA

In [None]:
# Column dtypes and non-null counts.
df.info()

In [None]:
# Missing-value summary per column (sidetable `.stb` accessor).
df.stb.missing(style=True)

In [None]:
# Frequency table of the target classes, to check class balance.
df.stb.freq(["target"], style=True)

In [None]:
# Frequency table of occupation / marital status combinations.
df.stb.freq(["occupation", "marital_status"], style=True)

In [None]:
# Frequency table of education levels.
df.stb.freq(["education"], style=True)

In [None]:
# Per-column count / unique-value summary of the raw data (sidetable).
df.stb.counts()

In [None]:
# One count plot per string (object-dtype) column; tick labels are rotated so
# long category names stay readable.
object_columns = df.select_dtypes(include=["object"]).columns
for col in object_columns:
    sns.countplot(data=df, x=col)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

In [None]:
# One filled kernel-density plot per int64/float64 column.
numeric_columns = df.select_dtypes(include=["int64", "float64"]).columns
for col in numeric_columns:
    column_values = df[col]
    sns.kdeplot(x=column_values, fill=True)
    plt.show()

In [None]:
# Pairwise relationships between the numeric columns, coloured by target.
# Plotting is done on a reproducible random sample of at most 5000 rows to keep
# it fast; the cap at len(...) avoids the ValueError that `sample` raises when
# asked for more rows than the frame holds.
numeric_cols = df.select_dtypes(include=["int64", "float64"]).columns.tolist()
pairplot_data = df[numeric_cols + ["target"]]
pairplot_data = pairplot_data.sample(n=min(5000, len(pairplot_data)), random_state=42)
sns.pairplot(
    pairplot_data,
    hue="target",
    diag_kind="kde",
)

In [None]:
# N_last_days = 999 is a sentinel meaning that the customer has no previous contacts;
# summarise only those rows.
df[df["N_last_days"] == 999].stb.counts()

In [None]:
# Add new features.
# NOTE: `df` is rebound to the transformed frame, so re-running this cell would
# apply the feature engineering to already-engineered data (N_last_days is
# categorical by then); reload the CSV first if a re-run is needed.
df = add_engineered_features(df)

#### Final dataframe

In [None]:
# Preview the frame after feature engineering.
df.head()

In [None]:
# Per-column count / unique-value summary after feature engineering (sidetable).
df.stb.counts()

### Modeling

In [None]:
# Inspect which columns currently have object/category dtype. This is only a
# preview: the list is rebuilt (without the target column) in the next cell.
categorial_features = df.select_dtypes(include=["object", "category"]).columns.tolist()
categorial_features

In [None]:
# Columns CatBoost should treat as categorical: every object/category column
# except the target, plus the engineered binary/ordinal flags (stored as ints).
# Filtering instead of `.remove("target")` keeps the cell re-runnable after the
# target has been popped from `df`.
categorial_features = [
    col
    for col in df.select_dtypes(include=["object", "category"]).columns
    if col != "target"
]
categorial_features.extend(["age_group", "education_marital", "month_weekday", "season", "was_previously_contacted",
                            "long_last_contact", "recently_contacted", "never_contacted_before", "prev_outcome_success",
                            "both_loans", "loan_burden_score", "risky_contact", "recent_contact_score"])
# Several of the names above are already returned by select_dtypes; drop the
# duplicates (order preserved) so each feature is passed to CatBoost only once.
categorial_features = list(dict.fromkeys(categorial_features))
categorial_features

In [None]:
# Encode the "yes"/"no" target as 1/0.
target_dict = {"yes": 1, "no": 0}
encoded_target = df["target"].map(target_dict)
# `map` turns any label missing from target_dict into NaN (this also happens if
# the cell is run twice); fail loudly here rather than in a later cell.
if encoded_target.isna().any():
    raise ValueError("target contains values other than 'yes'/'no'.")
df["target"] = encoded_target.astype(int)

In [None]:
# Detach the target from the feature frame (`pop` removes the column from `df`)
# and store it as an integer Series.
y = df.pop("target").astype(int)

In [None]:
# Preview the feature frame; the target column has been removed by `pop`.
df.head()

In [None]:
# Stratified 70 / 10 / 20 split into train / validation / test.
X_train, X_val, X_test, y_train, y_val, y_test = split_data(
    df,
    y,
    test_size=0.2,
    val_size=0.1,
    random_state=42,
    use_stratification=True,
)

In [None]:
# Stack train and validation rows back together (row-wise) into one training set.
X_train_full = pd.concat((X_train, X_val))
y_train_full = pd.concat((y_train, y_val))

In [None]:
# Class proportions in the validation target.
y_val.value_counts(normalize=True)

In [None]:
# Class proportions in the combined train + validation target.
y_train_full.value_counts(normalize=True)

In [None]:
# dtype and non-null count of the combined training target.
y_train_full.info()

In [None]:
# "Balanced" class weights for the training target, as a plain Python list
# ordered like `classes`.
classes = np.unique(y_train_full)
class_weights = compute_class_weight(
    "balanced", classes=classes, y=y_train_full
).tolist()
class_weights

In [None]:
def optimize_catboost(
    X: pd.DataFrame,
    y: pd.Series,
    n_splits: int = 5,
    n_repeats: Optional[int] = None,
    n_trials: int = 50,
    random_state: int = 42,
    max_iterations: int = 1000,
    optimize_metric: Literal["F1", "Recall"] = "F1",
    cat_features: Optional[list[str]] = None,
) -> Tuple[optuna.Study, Dict[str, Any], pd.DataFrame]:
    """
    Tune CatBoost hyperparameters for imbalanced binary classification with Optuna.

    Every trial trains one model per CV fold (StratifiedKFold, or
    RepeatedStratifiedKFold when ``n_repeats`` is given) using
    ``auto_class_weights="Balanced"`` and is scored by the mean F1 / recall of
    the positive class on the held-out folds. After the search, the best
    parameters are re-run through the same CV scheme to collect per-iteration
    metric curves.

    Parameters
    ----------
    X : pd.DataFrame
        Feature matrix.
    y : pd.Series
        Binary target; must not contain NaN.
    n_splits : int, optional
        Number of CV folds, by default 5.
    n_repeats : int or None, optional
        If truthy, use RepeatedStratifiedKFold with this many repeats.
    n_trials : int, optional
        Number of Optuna trials, by default 50.
    random_state : int, optional
        Seed for the CV splitter and CatBoost, by default 42.
    max_iterations : int, optional
        Number of boosting iterations trained in every fit, by default 1000.
    optimize_metric : {"F1", "Recall"}, optional
        Metric to maximise, by default "F1".
    cat_features : list of str or None, optional
        Categorical feature names passed to CatBoost. When None (default), the
        module-level ``categorial_features`` list is used.

    Returns
    -------
    study : optuna.Study
        The finished study.
    best_params : dict
        Best trial parameters merged with the fixed training parameters.
    cv_results : pd.DataFrame
        One row per boosting iteration with mean/std (ddof=1) across folds of
        the optimised metric and of Logloss, for train and validation data.

    Raises
    ------
    ValueError
        If ``optimize_metric`` is not "F1" or "Recall".
    AssertionError
        If ``y`` contains NaN values.
    """

    import warnings

    # NOTE: this installs a process-wide filter; UserWarnings stay silenced
    # after the function returns.
    warnings.filterwarnings("ignore", category=UserWarning)

    if optimize_metric not in ["F1", "Recall"]:
        raise ValueError("optimize_metric must be either 'F1' or 'Recall'.")

    assert not y.isnull().any(), "Target y contains NaN values."

    cat_cols = categorial_features if cat_features is None else list(cat_features)

    # Parameters shared by every fit. Class balancing lives here so that the
    # search trials and the final CV pass train the same kind of model.
    fixed_params: Dict[str, Any] = {
        "loss_function": "Logloss",
        "eval_metric": optimize_metric,
        "custom_metric": ["Logloss"],
        "iterations": max_iterations,
        "use_best_model": False,
        "metric_period": 1,
        "auto_class_weights": "Balanced",
        "verbose": 0,
        "random_seed": random_state,
        "allow_writing_files": False,
    }

    def _make_splitter():
        """Build the CV splitter; a fixed seed yields the same folds on every call."""
        if n_repeats:
            return RepeatedStratifiedKFold(
                n_splits=n_splits, n_repeats=n_repeats, random_state=random_state
            )
        return StratifiedKFold(
            n_splits=n_splits, shuffle=True, random_state=random_state
        )

    def _logloss_curve(metrics: Dict[str, Any]) -> Any:
        """Return the Logloss history, falling back to 'TotalLoss' only if needed."""
        # A dict.get() default is evaluated eagerly, so the fallback key must
        # not be looked up unless "Logloss" is really missing.
        if "Logloss" in metrics:
            return metrics["Logloss"]
        return metrics["TotalLoss"]

    def objective(trial: optuna.Trial) -> float:
        """Mean held-out score across CV folds for one sampled configuration."""
        params = {
            **fixed_params,
            "depth": trial.suggest_int("depth", 3, 14),
            "learning_rate": trial.suggest_float("learning_rate", 1e-4, 0.4, log=True),
            "l2_leaf_reg": trial.suggest_int("l2_leaf_reg", 1, 100, log=True),
            "random_strength": trial.suggest_float(
                "random_strength", 1e-2, 10.0, log=True
            ),
            "border_count": trial.suggest_int("border_count", 32, 256),
        }

        scoring_func = recall_score if optimize_metric == "Recall" else f1_score

        fold_scores = []
        for train_idx, val_idx in _make_splitter().split(X, y):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

            model = CatBoostClassifier(**params, cat_features=cat_cols)
            model.fit(X_train, y_train, eval_set=(X_val, y_val))

            y_pred = model.predict(X_val)
            fold_scores.append(scoring_func(y_val, y_pred, pos_label=1))

        return float(np.mean(fold_scores))

    # Create and configure Optuna study
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

    # Copy so that the merge below does not mutate the trial's own dict.
    best_params = dict(study.best_trial.params)
    best_params.update(fixed_params)

    def cross_val_iterations(params: Dict[str, Any], main_metric: str) -> pd.DataFrame:
        """Manual CV to gather iteration-wise statistics."""
        train_metric_folds, test_metric_folds = [], []
        train_logloss_folds, test_logloss_folds = [], []

        for train_idx, val_idx in _make_splitter().split(X, y):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

            model = CatBoostClassifier(**params, cat_features=cat_cols)
            model.fit(X_train, y_train, eval_set=(X_val, y_val))

            results = model.get_evals_result()
            train_key, test_key = "learn", "validation"

            # NOTE(review): assumes the evals dict exposes the metric under the
            # plain name (e.g. "F1") — confirm against the installed CatBoost.
            train_metric_folds.append(results[train_key][main_metric])
            test_metric_folds.append(results[test_key][main_metric])
            train_logloss_folds.append(_logloss_curve(results[train_key]))
            test_logloss_folds.append(_logloss_curve(results[test_key]))

        n_iters = len(train_metric_folds[0])

        def _per_iteration(folds, reducer, **kwargs):
            # Aggregate the i-th value of every fold, for each iteration i.
            return [reducer([fold[i] for fold in folds], **kwargs) for i in range(n_iters)]

        return pd.DataFrame(
            {
                "iteration": list(range(1, n_iters + 1)),
                f"train-{main_metric}-mean": _per_iteration(train_metric_folds, np.mean),
                f"train-{main_metric}-std": _per_iteration(train_metric_folds, np.std, ddof=1),
                f"test-{main_metric}-mean": _per_iteration(test_metric_folds, np.mean),
                f"test-{main_metric}-std": _per_iteration(test_metric_folds, np.std, ddof=1),
                "train-Logloss-mean": _per_iteration(train_logloss_folds, np.mean),
                "train-Logloss-std": _per_iteration(train_logloss_folds, np.std, ddof=1),
                "test-Logloss-mean": _per_iteration(test_logloss_folds, np.mean),
                "test-Logloss-std": _per_iteration(test_logloss_folds, np.std, ddof=1),
            }
        )

    # Final CV pass with the best configuration
    cv_results = cross_val_iterations(best_params, optimize_metric)

    return study, best_params, cv_results

In [None]:
# Run the hyperparameter search (20 trials, 3-fold CV, 2000 boosting iterations
# per fit, maximising F1) and keep the study, the best parameters and the
# iteration-wise CV table.
study_obj, best_params, cv_table = optimize_catboost(
    X_train_full,
    y_train_full,
    n_splits=3,
    n_trials=20,
    max_iterations=2000,
    optimize_metric="F1",
    random_state=42,
)

In [None]:
# Print results
print("Best F1:", study_obj.best_value)

# Pick the boosting iteration with the highest mean validation F1.
# `idxmax()` returns the 0-based row label, whereas the number of trees to train
# is stored in the 1-based "iteration" column, so read the count from there.
best_row = cv_table["test-F1-mean"].idxmax()
best_params["iterations"] = int(cv_table.loc[best_row, "iteration"])
print("Best parameters:", best_params)

In [None]:
# Build the final classifier from the tuned parameters. Only the keys listed
# below are taken from best_params; class balancing and the seed are set
# explicitly.
tuned_keys = (
    "loss_function",
    "eval_metric",
    "iterations",
    "depth",
    "learning_rate",
    "l2_leaf_reg",
    "random_strength",
    "border_count",
)
model = CatBoostClassifier(
    cat_features=categorial_features,
    auto_class_weights="Balanced",
    random_seed=42,
    **{key: best_params[key] for key in tuned_keys},
)

In [None]:
# Fit the final model on the combined train + validation data.
model.fit(X_train_full, y_train_full)

In [None]:
# Hard class predictions and positive-class probabilities (column 1 of
# predict_proba) for the held-out test set.
pred = model.predict(X_test)
pred_proba = model.predict_proba(X_test)[:, 1]

In [None]:
# Per-class precision / recall / F1 on the test set.
print(classification_report(y_test, pred))

In [None]:
# Confusion matrix on the test set (rows: true class, columns: predicted class).
confusion_matrix(y_test, pred)

In [None]:
# ROC curve on the test set, built from the positive-class probabilities.
# The result is stored as `roc_display` rather than `display`, so IPython's
# built-in `display()` function is not shadowed for the rest of the notebook.
roc_display = RocCurveDisplay.from_predictions(
    y_test,
    pred_proba,
    color="darkorange",
    plot_chance_level=True,
)
_ = roc_display.ax_.set(xlabel="False Positive Rate", ylabel="True Positive Rate")

In [None]:
# Precision-recall curve of the fitted model on the test set.
# Stored as `pr_display` so IPython's built-in `display()` is not shadowed, and
# labelled after the model actually plotted (a CatBoost classifier, not a Pipeline).
pr_display = PrecisionRecallDisplay.from_estimator(
    model, X_test, y_test, name="CatBoost", plot_chance_level=True
)
_ = pr_display.ax_.set_title("2-class Precision-Recall curve")

In [None]:
precision, recall, thresholds = precision_recall_curve(y_test, pred_proba)
# precision and recall hold one more entry than thresholds; pad the thresholds
# with 1.0 so all three arrays can share one x-axis.
thresholds = np.concatenate([thresholds, [1.0]])

plt.figure(figsize=(12, 6))
for curve_values, curve_label, curve_color in (
    (precision, "Precision", "blue"),
    (recall, "Recall", "green"),
):
    plt.plot(thresholds, curve_values, label=curve_label, color=curve_color)

plt.title("Precision and Recall vs Thresholds")
plt.xlabel("Thresholds")
plt.ylabel("Value")
plt.grid()
plt.legend()
plt.show()