# Dataset preparation

In [None]:
!pip install imbalanced-learn smote-variants optuna

In [None]:
import imblearn.datasets as imb_datasets
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
import numpy as np

import os
import inspect
import smote_variants

# Names of the imbalanced-learn benchmark datasets used in the experiments
data_List = [
    "ecoli",
    "satimage",
    "pen_digits",
    "sick_euthyroid",
    "libras_move",
    "car_eval_4",
    "wine_quality",
    "ozone_level",
    "mammography",
    "optical_digits",
]

# fetch_datasets() returns the whole benchmark collection, so call it once
# instead of once per dataset. A failure here is reported and leaves the
# collection empty, so every dataset below is then reported as not loadable.
try:
    all_datasets = imb_datasets.fetch_datasets()
except Exception as e:
    print(f"Could not fetch datasets: {e}")
    all_datasets = {}

# Build one stratified 80/20 train/test split per dataset
train_test_List = []
for data_name in data_List:
    try:
        dataset = all_datasets[data_name]
        X, y = dataset['data'], dataset['target']

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Rename class -1 to 0 if present
        y_train = np.where(y_train == -1, 0, y_train)
        y_test = np.where(y_test == -1, 0, y_test)

        train_test_List.append((data_name, (X_train, X_test, y_train, y_test)))
        print(f"Loaded {data_name}")
    except Exception as e:
        # Best effort: a dataset that cannot be prepared is skipped, not fatal.
        print(f"Could not load {data_name}: {e}")

# Oversampling methods

In [5]:
# Oversamplers to compare, as (name, constructor kwargs) pairs.
# - 'SONA' is the function defined in this notebook.
# - None means "no oversampling": the original training data is used as is.
# - Every other name is looked up as a class on the smote_variants module.
oversampling_methods = [
    ('SONA', {}),
    None,
    ('SMOTE', {}),
    ('Borderline_SMOTE2', {}),
    ('Safe_Level_SMOTE', {}),
    ('polynom_fit_SMOTE_poly', {}),
    ('SMOTE_IPF', {}),
]

# Classifiers to evaluate, as (name, placeholder) pairs. The second element is
# ignored by the loops below; hyperparameters come from the Optuna search.
classifiers = [
    ('Logistic regression', {}),
    ('XGBoost', {}),
    ('KNeighborsClassifier', {}),
    ('DecisionTreeClassifier', {}),
    ('MLPClassifier', {}),
    ('SVC', {}),
    ('RandomForestClassifier', {}),
]

In [6]:
from scipy.spatial.distance import cdist

def SONA(X, y, min_label, new_label=0, random_state=None):
    """Oversample the minority class to a 1:1 ratio with synthetic points.

    For every synthetic point a seed minority sample ``i`` is drawn with
    probability proportional to ``1 / (neg_border[i] + 1)``, where
    ``neg_border[i]`` counts the majority samples whose nearest minority
    sample is ``i``. A partner minority sample ``j`` is then drawn with
    probability proportional to the inverse distance from ``i``. The synthetic
    point is placed at ``x_i + alpha * u * min(neg_radius[i], ||x_j - x_i||)``
    with ``u`` the unit vector from ``x_i`` to ``x_j`` and ``alpha`` uniform in
    [0, 1). ``neg_radius[i]`` is the distance from ``i`` to the closest
    majority sample that is not the nearest majority neighbour of any
    minority sample.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Feature matrix.
    y : array-like of shape (n_samples,)
        Labels; every sample whose label differs from ``min_label`` is treated
        as majority.
    min_label : scalar
        Label of the class to oversample.
    new_label : scalar, default 0
        Offset added to ``min_label`` to label the synthetic rows; the default
        gives them the minority label itself.
    random_state : int or None, default None
        ``None`` draws from NumPy's global random state (the previous
        behaviour). Any other value seeds a private ``RandomState`` so the
        result is repeatable.

    Returns
    -------
    tuple of (ndarray, ndarray)
        The original rows followed by ``n_majority - n_minority`` synthetic
        rows, and the matching labels. When the minority class is not smaller
        than the rest, the inputs are returned unchanged.

    Raises
    ------
    ValueError
        If no sample carries ``min_label``.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    # The np.random module and RandomState expose the same choice/random_sample API.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    X_gen_min = X[y == min_label]
    X_gen_maj = X[y != min_label]

    maj_size = len(X_gen_maj)
    minor_size = len(X_gen_min)

    if minor_size == 0:
        raise ValueError(f"SONA: no samples with label {min_label!r} to oversample")

    synthese_len = maj_size - minor_size  # assume to 1:1
    if synthese_len <= 0:
        # Already balanced: stacking an empty list of points would fail.
        return X, y

    # One distance matrix serves both directions (rows: minority, cols: majority).
    dist_gen_min2maj = cdist(X_gen_min, X_gen_maj)

    ## Negative border: per minority point, how many majority points have it as nearest
    neg_border = np.bincount(dist_gen_min2maj.argmin(axis=0), minlength=minor_size)

    ## Positive border: per majority point, how many minority points have it as nearest
    pos_border = np.bincount(dist_gen_min2maj.argmin(axis=1), minlength=maj_size)

    ## Find radius: distance to the closest majority point with an empty positive border
    neg_radius = np.zeros(minor_size)
    free_majority = np.flatnonzero(pos_border == 0)
    if free_majority.size:
        neg_radius = dist_gen_min2maj[:, free_majority].min(axis=1)

    prop_min = 1 / (neg_border + 1)
    prop_min = prop_min / np.sum(prop_min)

    # Inverse-distance partner weights, computed once for all seeds. Zero
    # distances (the point itself and exact duplicates) get weight 0.
    dist_min2min = cdist(X_gen_min, X_gen_min)
    with np.errstate(divide='ignore'):
        inv_dist = np.nan_to_num(1 / dist_min2min, nan=0, posinf=0, neginf=0)
    inv_dist_total = inv_dist.sum(axis=1)

    syn_list = []
    for _ in range(synthese_len):
        i = rng.choice(minor_size, p=prop_min)

        if inv_dist_total[i] == 0:
            # No distinct minority neighbour exists, so no direction can be
            # formed; fall back to duplicating the seed point.
            syn_list.append(X_gen_min[i])
            continue

        j = rng.choice(minor_size, p=inv_dist[i] / inv_dist_total[i])

        direction_vector = X_gen_min[j] - X_gen_min[i]
        norm_v = np.linalg.norm(direction_vector)
        direction_vector = direction_vector / norm_v

        alpha = rng.random_sample()

        # Never step past the partner point nor beyond the seed's radius.
        syn_x = X_gen_min[i] + alpha * direction_vector * min(neg_radius[i], norm_v)
        syn_list.append(syn_x)

    return (
        np.vstack([X, syn_list]),
        np.hstack([y, np.repeat(min_label + new_label, len(syn_list))]),
    )

In [7]:
def oversampling_only(oversampler_config, X_train, X_test, y_train, y_test):
    """Resample the training split with the configured oversampler.

    Parameters
    ----------
    oversampler_config : tuple or None
        ``(name, kwargs)`` or ``None``. ``None`` returns the training data
        unchanged; ``"SONA"`` runs the notebook's ``SONA`` function with label
        1 as the minority class; any other name is looked up as a class on the
        ``smote_variants`` module, built with ``kwargs`` and applied through
        its ``sample`` method.
    X_train, y_train
        Training features and labels to resample.
    X_test, y_test
        Unused; accepted so the signature stays unchanged for callers.

    Returns
    -------
    tuple
        ``(X_train_resampled, y_train_resampled)``.
    """
    # No oversampler configured: keep the original training data.
    if oversampler_config is None:
        return X_train, y_train

    oversampler_class_name, oversampler_params = oversampler_config

    if oversampler_class_name == "SONA":
        X_train_resampled, y_train_resampled = SONA(X_train, y_train, 1)
    else:
        # Use the module name imported at the top of the notebook rather than
        # an alias that only exists once a later cell has run.
        oversampler_instance = getattr(smote_variants, oversampler_class_name)(**oversampler_params)
        X_train_resampled, y_train_resampled = oversampler_instance.sample(X_train, y_train)

    return X_train_resampled, y_train_resampled

# Hyperparameter tuning

In [12]:
import imblearn.datasets as imb_datasets
from sklearn.model_selection import train_test_split
import smote_variants as sv
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score, roc_auc_score
from sklearn.metrics import precision_score, recall_score, f1_score
import pandas as pd
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from xgboost import XGBClassifier

def get_classifier_params(trial, classifier_name):
    """Sample one hyperparameter configuration for ``classifier_name``.

    Each supported classifier has its own search space. Values are drawn from
    ``trial`` through its ``suggest_*`` methods, and a few settings are fixed
    rather than searched (``verbosity``, ``max_iter``, ``probability``).

    Raises:
        ValueError: if ``classifier_name`` has no search space defined.
    """
    # Each entry is a thunk, so only the selected classifier's parameters are
    # requested from the trial.
    search_spaces = {
        'Logistic regression': lambda: {
            'C': trial.suggest_float('C', 1e-4, 1e2, log=True),
            'solver': trial.suggest_categorical('solver', ['liblinear', 'lbfgs']),
        },
        'XGBoost': lambda: {
            'n_estimators': trial.suggest_int('n_estimators', 50, 200),
            'max_depth': trial.suggest_int('max_depth', 3, 10),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'verbosity': 0,
        },
        'KNeighborsClassifier': lambda: {
            'n_neighbors': trial.suggest_int('n_neighbors', 3, 20),
            'weights': trial.suggest_categorical('weights', ['uniform', 'distance']),
            'metric': trial.suggest_categorical('metric', ['euclidean', 'manhattan', 'minkowski']),
        },
        'DecisionTreeClassifier': lambda: {
            'max_depth': trial.suggest_int('max_depth', 3, 20),
            'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
            'criterion': trial.suggest_categorical('criterion', ['gini', 'entropy']),
        },
        'MLPClassifier': lambda: {
            'hidden_layer_sizes': trial.suggest_categorical('hidden_layer_sizes', [(50,), (100,), (50, 50)]),
            'activation': trial.suggest_categorical('activation', ['tanh', 'relu']),
            'alpha': trial.suggest_float('alpha', 1e-4, 1e-1, log=True),
            'learning_rate_init': trial.suggest_float('learning_rate_init', 1e-3, 1e-1, log=True),
            'max_iter': 500,
        },
        'SVC': lambda: {
            'C': trial.suggest_float('C', 0.1, 100, log=True),
            'kernel': trial.suggest_categorical('kernel', ['rbf', 'linear', 'poly']),
            'gamma': trial.suggest_categorical('gamma', ['scale', 'auto']),
            'probability': True,
        },
        'RandomForestClassifier': lambda: {
            'n_estimators': trial.suggest_int('n_estimators', 50, 200),
            'max_depth': trial.suggest_int('max_depth', 3, 20),
            'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
        },
    }

    build_space = search_spaces.get(classifier_name)
    if build_space is None:
        raise ValueError(f"Unknown classifier: {classifier_name}")
    return build_space()

def get_classifier_instance(classifier_name, params):
    """Build the classifier registered under ``classifier_name``.

    Parameters
    ----------
    classifier_name : str
        One of the names handled below (the entries of ``classifiers`` plus
        ``'GaussianNB'``).
    params : dict
        Keyword arguments for the classifier constructor. The dict is not
        modified.

    Returns
    -------
    A new, unfitted classifier instance.

    Raises
    ------
    ValueError
        If ``classifier_name`` is not recognised.
    """
    # Settings that get_classifier_params fixes rather than searches. Optuna's
    # study.best_params only records values drawn through trial.suggest_*, so
    # parameters restored from a finished study lack these. Without them SVC
    # has no predict_proba and MLPClassifier falls back to its default
    # max_iter. They are applied here as defaults; explicit values in `params`
    # take precedence.
    fixed_params = {
        'XGBoost': {'verbosity': 0},
        'MLPClassifier': {'max_iter': 500},
        'SVC': {'probability': True},
    }
    params = {**fixed_params.get(classifier_name, {}), **params}

    if classifier_name == 'Logistic regression':
        return LogisticRegression(**params)
    elif classifier_name == 'XGBoost':
        return XGBClassifier(**params)
    elif classifier_name == 'KNeighborsClassifier':
        return KNeighborsClassifier(**params)
    elif classifier_name == 'DecisionTreeClassifier':
        return DecisionTreeClassifier(**params)
    elif classifier_name == 'MLPClassifier':
        return MLPClassifier(**params)
    elif classifier_name == 'SVC':
        return SVC(**params)
    elif classifier_name == 'RandomForestClassifier':
        return RandomForestClassifier(**params)
    elif classifier_name == 'GaussianNB':
        return GaussianNB(**params)
    else:
        raise ValueError(f"Unknown classifier: {classifier_name}")

In [None]:
import optuna

def objective(trial, classifier_name, X_train, y_train):
    """Optuna objective: mean cross-validated weighted F1 of one configuration.

    Draws hyperparameters for ``classifier_name`` from ``trial``, wraps the
    resulting classifier in a StandardScaler pipeline and scores it with
    shuffled 5-fold stratified cross-validation (fixed seed 42) on the given
    training data.

    Returns:
        The mean of the per-fold ``f1_weighted`` scores.
    """
    sampled_params = get_classifier_params(trial, classifier_name)

    model = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', get_classifier_instance(classifier_name, sampled_params)),
    ])

    folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    fold_scores = cross_val_score(
        model,
        X_train,
        y_train,
        cv=folds,
        scoring='f1_weighted',
        n_jobs=-1,
    )
    return fold_scores.mean()

print("Objective function 'objective' defined successfully.")

In [None]:
# One record per (dataset, classifier) pair with the best configuration found
results = []

for data_name, (X_train, X_test, y_train, y_test) in train_test_List:
    print(f"\nProcessing dataset: {data_name}")
    for classifier_name, _ in classifiers:
        print(f"  Optimizing for classifier: {classifier_name}")

        # Create an Optuna study for each classifier and dataset. The sampler
        # is seeded so the sequence of suggested hyperparameters is repeatable.
        # Classifiers that are themselves randomised can still make the
        # resulting scores vary between runs.
        study = optuna.create_study(
            direction='maximize',
            sampler=optuna.samplers.TPESampler(seed=42),
        )
        study.optimize(
            lambda trial: objective(trial, classifier_name, X_train, y_train),
            n_trials=10,
            show_progress_bar=True,
        )

        # Store the best results
        results.append({
            'dataset': data_name,
            'classifier': classifier_name,
            'best_f1_score': study.best_value,
            'best_params': study.best_params
        })

# Convert results to a Pandas DataFrame for summary and persist them so the
# evaluation stage can reload the tuned parameters from disk
results_df = pd.DataFrame(results)
results_df.to_csv("best_param.csv")

print("Hyperparameter optimization complete. Results stored in 'results_df'.")

# Training & Experiments

In [None]:
import ast

def _roc_auc_or_nan(pipeline, X_test, y_test, dataset_name, classifier_name):
    """Return the weighted ROC AUC of a fitted pipeline on the test split.

    Returns NaN (after printing a warning) when the classifier has no
    ``predict_proba``, when ``y_test`` holds fewer than two classes, when the
    probability matrix does not match the number of test classes, or when
    ``roc_auc_score`` raises ``ValueError``.
    """
    classifier = pipeline.named_steps['classifier']
    if not hasattr(classifier, 'predict_proba'):
        print(f"Warning: Classifier {classifier_name} does not support predict_proba. ROC AUC will be NaN.")
        return np.nan

    y_proba = pipeline.predict_proba(X_test)
    unique_classes = np.unique(y_test)

    if len(unique_classes) < 2:
        print(f"Warning: Cannot compute ROC AUC for {dataset_name} with classifier {classifier_name} as y_test contains only one class.")
        return np.nan

    if len(unique_classes) == 2:
        # Binary case: score with the probability column of the larger label,
        # located through the class order learned by the classifier.
        pos_class_label = unique_classes[-1]
        pos_class_idx = np.where(classifier.classes_ == pos_class_label)[0][0]
        try:
            return roc_auc_score(y_test, y_proba[:, pos_class_idx], average='weighted')
        except ValueError as e:
            print(f"Warning: Could not compute ROC AUC for binary case {dataset_name} with {classifier_name}: {e}")
            return np.nan

    # Multi-class case: roc_auc_score needs one probability column per class.
    if y_proba.ndim != 2 or y_proba.shape[1] != len(unique_classes):
        print(f"Warning: Unexpected y_proba shape for multi-class classification in {dataset_name} with {classifier_name}. ROC AUC will be NaN.")
        return np.nan
    try:
        return roc_auc_score(y_test, y_proba, multi_class='ovr', average='weighted', labels=unique_classes)
    except ValueError as e:
        print(f"Warning: Could not compute ROC AUC for multi-class case {dataset_name} with {classifier_name}: {e}")
        return np.nan


# Number of resample/fit/score repetitions per configuration
N_REPEATS = 7

# Tuned hyperparameters written by the Optuna stage
results_df = pd.read_csv("best_param.csv")
param_dict = results_df.set_index(['dataset', 'classifier'])['best_params'].to_dict()

evaluation_results = []  # Re-initialize the evaluation_results list

for dataset_name, data_tuple in train_test_List:
    X_train, X_test, y_train, y_test = data_tuple
    print("dataset_name:", dataset_name)

    for oversampler_config in oversampling_methods:
        if oversampler_config is not None:
            oversampler_class_name, oversampler_params = oversampler_config
        else:
            oversampler_class_name = "None"
            oversampler_params = {}

        print("\t oversampler_config:", oversampler_config)

        for classifier_name, _ in classifiers:
            print("\t\t classifier_name:", classifier_name)

            # The CSV stores each parameter dict as its string representation;
            # convert it back to a dictionary.
            best_params_str = param_dict[(dataset_name, classifier_name)]
            best_params = ast.literal_eval(best_params_str)

            accuracy_list = []
            precision_list = []
            recall_list = []
            f1_list = []
            roc_auc_list = []

            # Repeat the whole resample/fit/score cycle so that run-to-run
            # randomness shows up as a spread of scores.
            for _repeat in range(N_REPEATS):
                X_train2, y_train2 = oversampling_only(oversampler_config, X_train, X_test, y_train, y_test)

                classifier = get_classifier_instance(classifier_name, best_params)
                pipeline = Pipeline(
                    [('scaler', StandardScaler()), ('classifier', classifier)]
                )

                # Train on the (possibly oversampled) training data
                pipeline.fit(X_train2, y_train2)

                # Score on the untouched test split
                y_pred = pipeline.predict(X_test)

                accuracy_list.append(accuracy_score(y_test, y_pred))
                precision_list.append(precision_score(y_test, y_pred, average='weighted', zero_division=0))
                recall_list.append(recall_score(y_test, y_pred, average='weighted', zero_division=0))
                f1_list.append(f1_score(y_test, y_pred, average='weighted', zero_division=0))
                roc_auc_list.append(
                    _roc_auc_or_nan(pipeline, X_test, y_test, dataset_name, classifier_name)
                )

            # Store results: one record per configuration, each metric as the
            # list of per-repeat values
            evaluation_results.append({
                'dataset': dataset_name,
                'classifier': classifier_name,
                'oversamping': oversampler_class_name,
                'accuracy': accuracy_list,
                'precision': precision_list,
                'recall': recall_list,
                'f1_score': f1_list,
                'roc_auc': roc_auc_list,
            })

print("Model evaluation complete. Results stored in 'evaluation_results'.")

In [None]:
# Collect the per-configuration records into one table. Each metric column
# holds the list of per-repeat values gathered by the evaluation loop.
evaluation_df = pd.DataFrame(evaluation_results)
print("Evaluation DataFrame created successfully:")
display(evaluation_df)
# Persist the table next to the notebook for later analysis.
evaluation_df.to_csv("SONA_results.csv")