In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from imblearn.over_sampling import SMOTENC
from imblearn.combine import SMOTEENN
from imblearn.over_sampling import BorderlineSMOTE
from imblearn.over_sampling import SMOTE
from imblearn.over_sampling import ADASYN

In [None]:
# Mounting google drive if it is already not mounted
def load_data(verbose_level, ip_file_path, ip_sheet_name):
  googleDriveFolder = '/content/drive'

  # Loading cleaded excel data
  df_orig = pd.read_excel(ip_file_path, sheet_name=ip_sheet_name)

  if verbose_level > 0:
    print("Ip File Path: " + ip_file_path + "\n SheetName: " + ip_sheet_name)
    print(f"df_orig.shape: {df_orig.shape}")
  if verbose_level > 1:
    display(df_orig.head())

  return df_orig


# SMOTE type2: Apply SMOTEENN only to training data
def ApplySMOTEENN(X_train, y_train):
    sm = SMOTEENN(random_state=42)
    X_train_res, y_train_res = sm.fit_resample(X_train, y_train)
    return X_train_res, y_train_res

# Main preprocessing function
def data_preprocessing(verbose_level, X, y):
    # -------------------------------------
    # STEP 1: Train / Validation / Test Split (60/20/20)
    # -------------------------------------
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.4, stratify=y, random_state=42
    )

    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42
    )

    # -------------------------------------
    # STEP 2: Apply SMOTEENN on training data only
    # -------------------------------------
    X_train_res, y_train_res = ApplyADASYN(X_train.copy(), y_train.copy())
    X_train_res = pd.DataFrame(X_train_res, columns=X.columns)

    # -------------------------------------
    # STEP 3: Scale features
    # -------------------------------------
    scaler = StandardScaler()
    X_train_res_scaled = scaler.fit_transform(X_train_res)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)

    # Convert to DataFrames
    X_train_res_scaled = pd.DataFrame(X_train_res_scaled, columns=X.columns)
    X_val_scaled = pd.DataFrame(X_val_scaled, columns=X.columns)
    X_test_scaled = pd.DataFrame(X_test_scaled, columns=X.columns)

    # -------------------------------------
    # STEP 4: Verbose summary
    # -------------------------------------
    if verbose_level > 0:
      print(f"\nOriginal data shape: {X.shape}")

      print(f"\nBefore SMOTE => (Train shape: {X_train.shape}), "
            f"(Validation shape: {X_val.shape}), (Test shape: {X_test.shape})")

      print(f"After SMOTE  => (Train shape: {X_train_res.shape}), "
            f"(Validation shape: {X_val.shape}), (Test shape: {X_test.shape})")

      print(f"After Scaling => (Train shape: {X_train_res_scaled.shape}), "
            f"(Validation shape: {X_val_scaled.shape}), (Test shape: {X_test_scaled.shape})")


    # -------------------------------------
    # STEP 5: Return data
    # -------------------------------------
    return {
        "X_train": X_train,
        "y_train": y_train,
        "X_val": X_val,
        "y_val": y_val,
        "X_test": X_test,
        "y_test": y_test,

        "X_train_res": X_train_res,
        "y_train_res": y_train_res,

        "X_train_res_scaled": X_train_res_scaled,
        "X_val_scaled": X_val_scaled,
        "X_test_scaled": X_test_scaled,

        "features": X.columns.tolist()
    }


# SMOTE type1: Apply SMOTE only to training data
def ApplySMOTE(X_train, y_train):
  sm = SMOTE(random_state=42)
  X_res, y_res = sm.fit_resample(X_train, y_train)
  return X_res, y_res

# SMOTE type2: Apply SMOTEENN only to training data
def ApplySMOTEENN(X_train, y_train):
  sm = SMOTEENN(random_state=42)
  X_train_res, y_train_res = sm.fit_resample(X_train, y_train)
  return X_train_res, y_train_res

# SMOTE type3: Apply ApplyADASYN only to training data
def ApplyADASYN(X_train, y_train):
  adasyn = ADASYN(random_state=42)
  X_res, y_res = adasyn.fit_resample(X_train, y_train)
  return X_res, y_res

# SMOTE type4: Apply ApplyBorderlineSMOTE only to training data
def ApplyBorderlineSMOTE(X_train, y_train, kind='borderline-1'):
  sm = BorderlineSMOTE(kind=kind, random_state=42)
  X_res, y_res = sm.fit_resample(X_train, y_train)
  return X_res, y_res

# SMOTE type1: Apply SMOTENC only to training data
def ApplySMOTENC(X_train, y_train, cat_indices):
  sm = SMOTENC(categorical_features=cat_indices, random_state=42)
  X_res, y_res = sm.fit_resample(X_train, y_train)
  return X_res, y_res


In [None]:
def export_merged_importance_matrics(importance_df_lr, importance_df_tabnet, importance_df_xgb):
  import pandas as pd
  import os
  from openpyxl import load_workbook
  from pandas import ExcelWriter
  # Step 1: Rename columns based on actual names
  importance_df_lr_renamed = importance_df_lr.rename(columns={
      'mean_shap': 'mean_shap_lr',
      'mean_abs_shap': 'mean_abs_shap_lr'
  })

  # Do the same for the others — print their columns first if unsure
  importance_df_tabnet_renamed = importance_df_tabnet.rename(columns={
      'mean_shap': 'mean_shap_tabnet',
      'mean_abs_shap': 'mean_abs_shap_tabnet'
  })

  importance_df_xgb_renamed = importance_df_xgb.rename(columns={
      'mean_shap': 'mean_shap_xgb',
      'mean_abs_shap': 'mean_abs_shap_xgb'
  })

  # Step 2: Convert to numeric
  for df, cols in [
      (importance_df_lr_renamed, ['mean_shap_lr', 'mean_abs_shap_lr']),
      (importance_df_tabnet_renamed, ['mean_shap_tabnet', 'mean_abs_shap_tabnet']),
      (importance_df_xgb_renamed, ['mean_shap_xgb', 'mean_abs_shap_xgb'])
  ]:
      for col in cols:
          df[col] = pd.to_numeric(df[col], errors='coerce')

  # Step 3: Merge
  merged_df = importance_df_lr_renamed.merge(
      importance_df_tabnet_renamed, on='feature', how='outer'
  ).merge(
      importance_df_xgb_renamed, on='feature', how='outer'
  )

  # Step 4: Round all numeric columns
  # merged_df = merged_df.round(4)

  # Step 5: Reorder columns
  merged_importance_df = merged_df[[
      'feature',
      'mean_shap_lr', 'mean_abs_shap_lr',
      'mean_shap_tabnet', 'mean_abs_shap_tabnet',
      'mean_shap_xgb', 'mean_abs_shap_xgb'
  ]]

  # Step 6: Export to Excel (overwrite or update sheet)
  output_path = "../op_data/merged_importance_matrics.xlsx"
  sheet_name = "importance_matrix_all"

  # Optionally: ensure directory exists
  os.makedirs(os.path.dirname(output_path), exist_ok=True)

  with ExcelWriter(output_path, engine='openpyxl', mode='w') as writer:
      merged_importance_df.to_excel(writer, sheet_name=sheet_name, index=False)

  print(f"✅ Exported sheet '{sheet_name}' to: {output_path}")


In [None]:
def build_model_metric_summary_df(results_all: dict[str, dict]) -> pd.DataFrame:
    summary_rows = []

    metrics = ["precision", "recall", "f1-score", "support"]
    label_mapping = {
        "0": "Class0",
        "1": "Class1",
        "macro avg": "MacroAvg",
        "weighted avg": "WeightedAvg"
    }

    for model_name, result in results_all.items():
        report = result.get("report_df")
        if not isinstance(report, pd.DataFrame):
            raise TypeError(f"'report_df' for model '{model_name}' must be a pandas DataFrame.")

        best_f1_thres = result.get("best_threshold", np.nan)
        accuracy = result.get("accuracy", np.nan)
        auc_roc = result.get("auc", np.nan)

        row = {
            "ModelName": model_name,
            "Best_F1_Thres": best_f1_thres,
            "Accuracy": accuracy,
            "AUC-ROC": auc_roc,
        }

        for label, name in label_mapping.items():
            for metric in metrics:
                colname = f"{metric.capitalize()}_{name}"
                value = report.loc[label, metric] if label in report.index else np.nan
                row[colname] = value

        summary_rows.append(row)

    summary_df = pd.DataFrame(summary_rows)

    column_order = [
        "ModelName", "Best_F1_Thres", "Accuracy", "AUC-ROC",
        "Precision_Class0", "Precision_Class1", "Precision_MacroAvg", "Precision_WeightedAvg",
        "Recall_Class0", "Recall_Class1", "Recall_MacroAvg", "Recall_WeightedAvg",
        "F1-score_Class0", "F1-score_Class1", "F1-score_MacroAvg", "F1-score_WeightedAvg",
        "Support_Class0", "Support_Class1", "Support_MacroAvg", "Support_WeightedAvg"
    ]

    existing_columns = [col for col in column_order if col in summary_df.columns]
    summary_df = summary_df[existing_columns]

    return summary_df