In [2]:
import numpy as np
from scipy.special import expit
import os
import random
random.seed(111)
import pandas as pd
from scipy import stats
from sklearn.preprocessing import LabelEncoder
import traceback
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import f1_score, roc_auc_score, average_precision_score
from scipy.stats import ttest_rel
import matplotlib.pyplot as plt


#### **Implementation of the base algorithm for the Gradient Boosting Trees with direct inspiration in XGBoost**
- We based our implementation on existing open-source code available at https://github.com/rushter/MLAlgorithms

In [2]:
class BaseEstimator:
    y_required = True
    fit_required = True

    def _setup_input(self, X, y=None):
        """Ensure inputs to an estimator are in the expected format.

        Ensures X and y are stored as numpy ndarrays by converting from an
        array-like object if necessary. Enables estimators to define whether
        they require a set of y target values or not with y_required, e.g.
        kmeans clustering requires no target labels and is fit against only X.

        Parameters
        ----------
        X : array-like
            Feature dataset.
        y : array-like
            Target values. By default is required, but if y_required = false
            then may be omitted.
        """
        if not isinstance(X, np.ndarray):
            X = np.array(X)

        if X.size == 0:
            raise ValueError("Got an empty matrix.")

        if X.ndim == 1:
            self.n_samples, self.n_features = 1, X.shape
        else:
            self.n_samples, self.n_features = X.shape[0], np.prod(X.shape[1:])

        self.X = X

        if self.y_required:
            if y is None:
                raise ValueError("Missed required argument y")

            if not isinstance(y, np.ndarray):
                y = np.array(y)

            if y.size == 0:
                raise ValueError("The targets array must be no-empty.")

        self.y = y

    def fit(self, X, y=None):
        self._setup_input(X, y)

    def predict(self, X=None):
        if not isinstance(X, np.ndarray):
            X = np.array(X)

        if self.X is not None or not self.fit_required:
            return self._predict(X)
        else:
            raise ValueError("You must call `fit` before `predict`")

    def _predict(self, X=None):
        raise NotImplementedError()


In [None]:
def split(X, y, value):
    left_mask = X < value
    right_mask = X >= value
    return y[left_mask], y[right_mask]

def get_split_mask(X, column, value):
    left_mask = X[:, column] < value
    right_mask = X[:, column] >= value
    return left_mask, right_mask

def split_dataset(X, target, column, value, return_X=True):
    left_mask, right_mask = get_split_mask(X, column, value)

    left, right = {}, {}
    for key in target.keys():
        left[key] = target[key][left_mask]
        right[key] = target[key][right_mask]

    if return_X:
        left_X, right_X = X[left_mask], X[right_mask]
        return left_X, right_X, left, right
    else:
        return left, right

def xgb_criterion(y, left, right, loss):
    left = loss.gain(left["actual"], left["y_pred"])
    right = loss.gain(right["actual"], right["y_pred"])
    initial = loss.gain(y["actual"], y["y_pred"])
    gain = left + right - initial
    return gain
    

In [None]:
class Tree(object):
    """Recursive implementation of decision tree."""

    def __init__(self, criterion=None, n_classes=None):
        self.impurity = None
        self.threshold = None
        self.column_index = None
        self.outcome = None
        self.criterion = criterion
        self.loss = None
        self.n_classes = n_classes  # Only for classification

        self.left_child = None
        self.right_child = None

    @property
    def is_terminal(self):
        return not bool(self.left_child and self.right_child)

    def _find_splits(self, X):
        """Find all possible split values."""
        split_values = set()

        # Get unique values in a sorted order
        x_unique = list(np.unique(X))
        for i in range(1, len(x_unique)):
            # Find a point between two values
            average = (x_unique[i - 1] + x_unique[i]) / 2.0
            split_values.add(average)

        return list(split_values)

    def _find_best_split(self, X, target, n_features):
        """Find best feature and value for a split. Greedy algorithm."""

        # Sample random subset of features
        n_features = min(n_features, X.shape[1])  
        subset = random.sample(list(range(0, X.shape[1])), n_features)
        max_gain, max_col, max_val = None, None, None

        for column in subset:
            split_values = self._find_splits(X[:, column])
            for value in split_values:
            
                left, right = split_dataset(X, target, column, value, return_X=False)
            
                if len(left["y"]) == 0 or len(right["y"]) == 0:
                    continue
                gain = xgb_criterion(target, left, right, self.loss)

                if (max_gain is None) or (gain > max_gain):
                    max_col, max_val, max_gain = column, value, gain

        return max_col, max_val, max_gain


    def _train(self, X, target, max_features=None, min_samples_split=10, max_depth=None, minimum_gain=0.0001):

        try:
            # Stops if few samples or zero depth
            assert X.shape[0] > min_samples_split
            assert max_depth > 0

            if max_features is None:
                max_features = X.shape[1]

            column, value, gain = self._find_best_split(X, target, max_features)

            if gain is None or gain <= minimum_gain:
                raise AssertionError("Insuficient gain for split")

            self.column_index = column
            self.threshold = value
            self.impurity = gain

            # Dataset split
            left_X, right_X, left_target, right_target = split_dataset(X, target, column, value)


            # Recursively grows children
            self.left_child = Tree(self.criterion, self.n_classes)
            self.left_child.loss = self.loss
            self.left_child._train(
                left_X, left_target, max_features, min_samples_split, max_depth - 1, minimum_gain
            )

            self.right_child = Tree(self.criterion, self.n_classes)
            self.right_child.loss = self.loss
            self.right_child._train(
                right_X, right_target, max_features, min_samples_split, max_depth - 1, minimum_gain
            )

        except AssertionError:
            self._calculate_leaf_value(target)


    def train(self, X, target, max_features=None, min_samples_split=10, max_depth=None, minimum_gain=0.01, loss=None):
        """Build a decision tree from training set.

        Parameters
        ----------

        X : array-like
            Feature dataset.
        target : dictionary or array-like
            Target values.
        max_features : int or None
            The number of features to consider when looking for the best split.
        min_samples_split : int
            The minimum number of samples required to split an internal node.
        max_depth : int
            Maximum depth of the tree.
        minimum_gain : float, default 0.01
            Minimum gain required for splitting.
        loss : function, default None
            Loss function for gradient boosting.
        """

        

        if not isinstance(target, dict):
            target = {"y": target}

        
        self.loss = loss

        self.n_classes = len(np.unique(target['y']))

        self._train(X, target, max_features=max_features, min_samples_split=min_samples_split,
                    max_depth=max_depth, minimum_gain=minimum_gain)


    def _calculate_leaf_value(self, targets):
        """Find optimal value for leaf."""
        self.outcome = self.loss.approximate(targets["actual"], targets["y_pred"])

    def predict_row(self, row):
        """Predict single row."""
        if not self.is_terminal:
            if row.iloc[self.column_index] < self.threshold:
                return self.left_child.predict_row(row)
            else:
                return self.right_child.predict_row(row)
        return self.outcome

    def predict(self, X):
    # To garantee that X is a dataset
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        result = np.zeros(X.shape[0])
        for i in range(X.shape[0]):
            result[i] = self.predict_row(X.iloc[i, :])
        return result

In [None]:
class Loss:
    """Base class for loss functions."""

    def __init__(self, regularization=1.0):
        self.regularization = regularization

    def grad(self, actual, predicted):
        """First order gradient."""
        raise NotImplementedError()

    def hess(self, actual, predicted):
        """Second order gradient."""
        raise NotImplementedError()

    def approximate(self, actual, predicted):
        """Approximate leaf value."""
        return self.grad(actual, predicted).sum() / (self.hess(actual, predicted).sum() + self.regularization)

    def transform(self, pred):
        """Transform predictions values."""
        return pred

    def gain(self, actual, predicted):
        """Calculate gain for split search."""
        nominator = self.grad(actual, predicted).sum() ** 2
        denominator = self.hess(actual, predicted).sum() + self.regularization
        return 0.5 * (nominator / denominator)



class LogisticLoss(Loss):
    """Logistic loss."""

    def grad(self, actual, predicted):
        return actual * expit(-actual * predicted)

    def hess(self, actual, predicted):
        expits = expit(predicted)
        return expits * (1 - expits)

    def transform(self, output):
        # Apply logistic (sigmoid) function to the output
        return expit(output)


class GradientBoosting(BaseEstimator):
    """Gradient boosting trees with Taylor's expansion approximation (as in xgboost)."""
    #classe central, implementa o algoritmo de boosting

    def __init__(self, n_estimators=100, learning_rate=0.1, max_features=10, max_depth=2, min_samples_split=10, loss = None):
        self.min_samples_split = min_samples_split
        self.learning_rate = learning_rate #quão forte cada nova árvore influencia o modelo
        self.max_depth = max_depth
        self.max_features = max_features
        self.n_estimators = n_estimators #nr de árvores (iterações de boosting)
        self.trees = [] #lista de árvores treinadas
        self.loss = loss #função de perda(classificação)

    def fit(self, X, y=None): 
        self._setup_input(X, y) #vem da BaseEstimator, prepara os dados
        self.y_mean = np.mean(y) #inicializa a média dos valores de y
        self._train() #onde acontece o boosting

    def _train(self): #coração do boosting
        # Initialize model with zeros
        y_pred = np.zeros(self.n_samples, np.float32)

        for n in range(self.n_estimators):

            residuals = self.loss.grad(self.y, y_pred) #calcula o gradiente(resíduo)
            tree = Tree(criterion=xgb_criterion) #cria uma nova tree(árvore de decisão para classificação)
            # Pass multiple target values to the tree learner
            targets = { #define os targets especiais
                # Residual values
                "y": residuals,
                # Actual target values
                "actual": self.y,
                # Predictions from previous step
                "y_pred": y_pred,
            }
            tree.train( #treina a arvore com os dados anteriores
                self.X,
                targets,
                max_features=self.max_features,
                min_samples_split=self.min_samples_split,
                max_depth=self.max_depth,
                loss=self.loss,
            )

            #prediz com a nova árvore e atualiza y_pred
            #Cada nova árvore corrige um pouco o erro das anteriores.
            predictions = tree.predict(self.X)
            y_pred += self.learning_rate * predictions
            self.trees.append(tree)

    def _predict(self, X=None):
        y_pred = np.zeros(X.shape[0], np.float32)

        for i, tree in enumerate(self.trees):
            #soma as previsões de todas as árvores
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred

    def predict(self, X=None):
        #aplica transform, importante na classificação:
        #transforma a predição bruta com a função logística sigmoid
        #para obter probabilidade
        return self.loss.transform(self._predict(X))


class GradientBoostingClassifier(GradientBoosting):
    def __init__(self, **kwargs):
        super().__init__(loss=LogisticLoss(), **kwargs)

    def fit(self, X, y=None):
        y = (y * 2) - 1  # transforma {0, 1} em {-1, 1}
        super().fit(X, y)

#### **Challenge chosen: Class Imbalance**

We pre-processed all 50 datasets so that:

- No dataset was eliminated
- Numerical attributes: missing values are filled with the mean
- Categorical attributes: missing values are filled with the mode and then applied Label Encoder

All testing of our algorithm is done on the pre-processed (cleaned) datasets.

In [None]:
DATA_DIR = "class_imbalance/class_imbalance"
CLEANED_DIR = "cleaned_datasets"

os.makedirs(CLEANED_DIR, exist_ok=True)

def preprocess_data(df):
    df = df.copy()
    df.dropna(axis=1, how='all', inplace=True)

    for col in df.select_dtypes(include=[np.number]).columns:
        df[col] = df[col].fillna(df[col].mean())

    for col in df.select_dtypes(include='object').columns:
        df[col] = df[col].fillna(df[col].mode()[0])

    for col in df.select_dtypes(include='object').columns:
        le = LabelEncoder()
        df[col] = le.fit_transform(df[col].astype(str))

    return df

def convert_target_to_zero_one(y):
    """Converte target binário qualquer (ex: [-1, 1]) para [0, 1]"""
    classes = np.unique(y)
    if len(classes) == 2 and not np.array_equal(classes, [0, 1]):
        mapping = {classes[0]: 0, classes[1]: 1}
        return y.map(mapping)
    return y

def save_cleaned_datasets():
    for filename in os.listdir(DATA_DIR):
        if filename.endswith(".csv"):
            try:
                file_path = os.path.join(DATA_DIR, filename)
                df = pd.read_csv(file_path)
                df_clean = preprocess_data(df)

                # Verifica se o target é binário
                y = df_clean.iloc[:, -1]
                if len(np.unique(y)) != 2:
                    print(f"Ignorado {filename}: target não é binário")
                    continue

                # Converte target para [0, 1] se necessário
                df_clean.iloc[:, -1] = convert_target_to_zero_one(df_clean.iloc[:, -1])

                clean_name = filename.replace("dataset_", "").replace(".csv", "") + "_cleaned.csv"
                save_path = os.path.join(CLEANED_DIR, clean_name)
                df_clean.to_csv(save_path, index=False)
                print(f"[SALVO] {save_path}")

            except Exception as e:
                print(f"Erro ao processar {filename}: {e}")
                traceback.print_exc()

if __name__ == "__main__":
    save_cleaned_datasets()

#### **Testing the base algorithm**

- To test the algorithm, we divided it into 3 folds, keeping the original proportion of classes in each fold (that is why it is stratified).  

- The algorithm is trained with 2/3 of the data and tested on the remaining 1/3, repeating the process 3 times, every time with a different fold.

- Then, for each fold we calculated the f1_score, ROC AUC and PR AUC. These result's mean if given as the final result for the dataset.

In [None]:
CLEANED_DIR = "cleaned_datasets"
RESULTS_FILE = "resultados_gbm_antes.csv" 


def run_experiment_cleaned(file_path):
    dataset_name = os.path.basename(file_path)
    print(f"\nProcessando: {dataset_name}")

    try:
        df = pd.read_csv(file_path)
        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]

        unique_classes = np.unique(y)
        if len(unique_classes) != 2:
            print(f"Dataset ignorado ({dataset_name}): Target não é binário")
            return dataset_name, np.nan, np.nan, np.nan

        # Definir a classe minoritária como positiva
        class_counts = pd.Series(y).value_counts()
        minor_class = class_counts.idxmin()

        min_class_count = class_counts.min()
        n_splits = min(3, min_class_count)
        if n_splits < 2:
            print(f"Dataset ignorado ({dataset_name}): Classe minoritária muito pequena para validação cruzada")
            return dataset_name, np.nan, np.nan, np.nan

        cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
        f1_scores = []
        auc_scores = []
        pr_auc_scores = []

        for fold, (train_idx, test_idx) in enumerate(cv.split(X, y)):
            X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

            model = GradientBoostingClassifier(n_estimators=50, max_depth=2, min_samples_split=5)
            model.fit(X_train, y_train)

            y_pred_proba = model.predict(X_test)
            y_pred = (y_pred_proba >= 0.5).astype(int)

            try:
                f1 = f1_score(y_test, y_pred, pos_label=minor_class, average='binary')
            except Exception as e:
                print(f"[F1 ERROR] {e}")
                f1 = np.nan
            f1_scores.append(f1)

            try:
                auc = roc_auc_score((y_test == minor_class).astype(int), y_pred_proba)
            except Exception as e:
                print(f"[AUC ERROR] {e}")
                auc = np.nan
            auc_scores.append(auc)

            try:
                pr_auc = average_precision_score((y_test == minor_class).astype(int), y_pred_proba)
            except Exception as e:
                print(f"[PR AUC ERROR] {e}")
                pr_auc = np.nan
            pr_auc_scores.append(pr_auc)

        return dataset_name, np.nanmean(f1_scores), np.nanmean(auc_scores), np.nanmean(pr_auc_scores)

    except Exception as e:
        print(f"Erro ao processar {dataset_name}: {e}")
        traceback.print_exc()
        return dataset_name, np.nan, np.nan, np.nan


def main():
    results = []

    for filename in os.listdir(CLEANED_DIR):
        if filename.endswith(".csv"):
            path = os.path.join(CLEANED_DIR, filename)
            result = run_experiment_cleaned(path)
            results.append(result)

    df_results = pd.DataFrame(results, columns=[
        "Dataset", 
        "F1 Score (minor class)", 
        "ROC AUC", 
        "PR AUC"
    ])
    df_results.to_csv(RESULTS_FILE, index=False)
    print(f"\nExperimentos finalizados. Resultados salvos em '{RESULTS_FILE}'.")


if __name__ == "__main__":
    main()

#### **Applying changes to make the algorithm more robust to class imbalance**

- **Hellinger Distance Gain:** Replaced default gain function in the first tree with Hellinger gain, encourages splits that highlight minority patterns. Used only in the first tree to guide initial splits while keeping the full power of XGBoost in subsequent trees.

In [9]:
def hellinger_gain(y, splits):
    def hellinger(p, q):
        return np.sqrt(np.sum((np.sqrt(p) - np.sqrt(q)) ** 2)) / np.sqrt(2)

    def class_prob(y):
        counts = np.bincount(y.astype(int),minlength=2)  #(para as classes 0 e 1), y está em float então usamos em int
        total = counts.sum()
        return counts / total if total > 0 else np.zeros_like(counts)

    p = class_prob(y)
    gain = 0.0
    for split in splits:
        q = class_prob(split)
        gain += len(split) / len(y) * hellinger(p, q)

    return gain

- **Class Weights:** Assigns directly in the loss function higher weight to errors on the minority class, based on inverse class frequencies.

In [10]:
def xgb_criterion(y, left, right, loss, class_weights=None):
    left_gain = loss.gain(left["actual"], left["y_pred"], class_weights)
    right_gain = loss.gain(right["actual"], right["y_pred"], class_weights)
    initial_gain = loss.gain(y["actual"], y["y_pred"], class_weights)
    return left_gain + right_gain - initial_gain

In [None]:
#Updating the Tree Class to accept weighted trees

class Tree(object):

    def __init__(self, criterion=None, n_classes=None,class_weights=None):
        self.impurity = None
        self.threshold = None
        self.column_index = None
        self.outcome = None
        self.criterion = criterion
        self.loss = None
        self.n_classes = n_classes 
        self.class_weights=class_weights #here is the new parameter

        self.left_child = None
        self.right_child = None

    @property
    def is_terminal(self):
        return not bool(self.left_child and self.right_child)

    def _find_splits(self, X):
        """Find all possible split values."""
        split_values = set()

        # Get unique values in a sorted order
        x_unique = list(np.unique(X))
        for i in range(1, len(x_unique)):
            # Find a point between two values
            average = (x_unique[i - 1] + x_unique[i]) / 2.0
            split_values.add(average)

        return list(split_values)

    def _find_best_split(self, X, target, n_features):
        """Find best feature and value for a split. Greedy algorithm."""

        # Sample random subset of features
        n_features = min(n_features, X.shape[1]) 
        subset = random.sample(list(range(0, X.shape[1])), n_features)
        max_gain, max_col, max_val = None, None, None

        for column in subset:
            split_values = self._find_splits(X[:, column])
            for value in split_values:
            
                left, right = split_dataset(X, target, column, value, return_X=False)
            
                if len(left["y"]) == 0 or len(right["y"]) == 0:
                    continue
                gain = xgb_criterion(target, left, right, self.loss,self.class_weights)

                if (max_gain is None) or (gain > max_gain):
                    max_col, max_val, max_gain = column, value, gain

        return max_col, max_val, max_gain


    def _train(self, X, target, max_features=None, min_samples_split=10, max_depth=None, minimum_gain=0.0001, class_weights=None):

        try:
            # Interrompe se poucas amostras ou profundidade zerada
            assert X.shape[0] > min_samples_split
            assert max_depth > 0

            if max_features is None:
                max_features = X.shape[1]

            column, value, gain = self._find_best_split(X, target, max_features)

            if gain is None or gain <= minimum_gain:
                raise AssertionError("Ganho insuficiente para split")

            self.column_index = column
            self.threshold = value
            self.impurity = gain

            # Divisão do dataset
            left_X, right_X, left_target, right_target = split_dataset(X, target, column, value)


            # Cresce filhos recursivamente
            self.left_child = Tree(self.criterion, self.n_classes,self.class_weights)
            self.left_child.loss = self.loss
            self.left_child._train(
                left_X, left_target, max_features, min_samples_split, max_depth - 1, minimum_gain, self.class_weights
            )

            self.right_child = Tree(self.criterion, self.n_classes, self.class_weights)
            self.right_child.loss = self.loss
            self.right_child._train(
                right_X, right_target, max_features, min_samples_split, max_depth - 1, minimum_gain, self.class_weights
            )

        except AssertionError:
            self._calculate_leaf_value(target)


    def train(self, X, target, max_features=None, min_samples_split=10, max_depth=None, minimum_gain=0.01, loss=None,class_weights=None):
        """Build a decision tree from training set.

        Parameters
        ----------

        X : array-like
            Feature dataset.
        target : dictionary or array-like
            Target values.
        max_features : int or None
            The number of features to consider when looking for the best split.
        min_samples_split : int
            The minimum number of samples required to split an internal node.
        max_depth : int
            Maximum depth of the tree.
        minimum_gain : float, default 0.01
            Minimum gain required for splitting.
        loss : function, default None
            Loss function for gradient boosting.
        """

        if not isinstance(target, dict):
            target = {"y": target}

        self.loss = loss

        self.n_classes = len(np.unique(target['y']))

        self._train(X, target, max_features=max_features, min_samples_split=min_samples_split,
                max_depth=max_depth, minimum_gain=minimum_gain, class_weights=class_weights)

    def _calculate_leaf_value(self, targets):
        """Find optimal value for leaf."""
        self.outcome = self.loss.approximate(targets["actual"], targets["y_pred"])

    def predict_row(self, row):
        """Predict single row."""
        if not self.is_terminal:
            if row.iloc[self.column_index] < self.threshold:
                return self.left_child.predict_row(row)
            else:
                return self.right_child.predict_row(row)
        return self.outcome

    def predict(self, X):
    # Garante que X seja sempre um DataFrame
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)

        result = np.zeros(X.shape[0])
        for i in range(X.shape[0]):
            result[i] = self.predict_row(X.iloc[i, :])
        return result

In [None]:
#Updating the Loss Classes for the new changes

class Loss:
    """Base class for loss functions."""

    def __init__(self, regularization=1.0):
        self.regularization = regularization

    def grad(self, actual, predicted): #gradiente da perda (1ª derivada)
        """First order gradient."""
        raise NotImplementedError()

    def hess(self, actual, predicted): # hessiana (2ª derivada)
        """Second order gradient."""
        raise NotImplementedError()

    def approximate(self, actual, predicted): # valor ótimo da folha baseado nas derivadas (como no XGBoost)
        """Approximate leaf value."""
        return self.grad(actual, predicted).sum() / (self.hess(actual, predicted).sum() + self.regularization)

    def transform(self, pred):
        """Transform predictions values."""
        return pred

    def gain(self, actual, predicted,class_weights=None): #mede quanto "ganho de informação" temos ao fazer um split
        """Calculate gain for split search."""
        nominator = self.grad(actual, predicted).sum() ** 2
        denominator = self.hess(actual, predicted).sum() + self.regularization
        return 0.5 * (nominator / denominator)


class LogisticLoss(Loss): #usada para classificação binária (com a função logística/sigmoid)
    """Logistic loss."""
    
    def __init__(self, class_weights=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights
        
    def _safe_index(self, actual):
        # Garante que os valores estejam em {0, 1}
        if np.any((actual != 0) & (actual != 1)):
            return ((actual + 1) // 2).astype(int)
        else:
            return actual.astype(int)

    def grad(self, actual, predicted):
        # Pesos de classe para cada instância
        class_weight = self.class_weights[self._safe_index(actual)]
        return class_weight * actual * expit(-actual * predicted)
    
    def hess(self, actual, predicted):
        expits = expit(predicted)
        class_weight = self.class_weights[self._safe_index(actual)]
        return class_weight * expits * (1 - expits)

    def transform(self, output):
        # Apply logistic (sigmoid) function to the output
        return expit(output)


class GradientBoosting(BaseEstimator):
    """Gradient boosting trees with Taylor's expansion approximation (as in xgboost)."""
    #classe central, implementa o algoritmo de boosting

    def __init__(self, n_estimators=100, learning_rate=0.1, max_features=10, max_depth=2, min_samples_split=10):
        self.min_samples_split = min_samples_split
        self.learning_rate = learning_rate #quão forte cada nova árvore influencia o modelo
        self.max_depth = max_depth
        self.max_features = max_features
        self.n_estimators = n_estimators #nr de árvores (iterações de boosting)
        self.trees = [] #lista de árvores treinadas
        self.loss = None #função de perda, definida depois(classificação)

    def fit(self, X, y=None): 
        self._setup_input(X, y) #vem da BaseEstimator, prepara os dados
        self.y_mean = np.mean(y) #inicializa a média dos valores de y
        self._train() #onde acontece o boosting


    def _train(self): #coração do boosting
        # Initialize model with zeros
        y_pred = np.zeros(self.n_samples, np.float32)
        
        for n in range(self.n_estimators):
            residuals = self.loss.grad(self.y, y_pred) #calcula o gradiente(resíduo)
            
            if n==0:
                if self.use_hellinger:
                    criterion = hellinger_gain #primeira árvore usa hellinger
                else:
                    self.y = (self.y * 2) - 1 #não queremos usar hellinger de todo
                    criterion = lambda y, l, r, loss: xgb_criterion(y, l, r, loss, class_weights=self.loss.class_weights) #as outras usam o critério baseado no loss

                
            else:
                criterion = lambda y, l, r, loss: xgb_criterion(y, l, r, loss, class_weights=self.loss.class_weights) #as outras usam o critério baseado no loss
                
            tree = Tree(criterion=criterion, class_weights=self.loss.class_weights)
            # Pass multiple target values to the tree learner
            targets = { #define os targets especiais
                # Residual values
                "y": residuals,
                # Actual target values
                "actual": self.y,
                # Predictions from previous step
                "y_pred": y_pred,
            }
            tree.train( #treina a arvore com os dados anteriores
                self.X,
                targets,
                max_features=self.max_features,
                min_samples_split=self.min_samples_split,
                max_depth=self.max_depth,
                loss=self.loss,
            )

            #prediz com a nova árvore e atualiza y_pred
            #Cada nova árvore corrige um pouco o erro das anteriores.
            predictions = tree.predict(self.X)
            y_pred += self.learning_rate * predictions
            self.trees.append(tree)
            
            # Na primeira árvore, usa y na forma {0,1}, a partir daí usa y {-1,1}
            if n == 0 and self.use_hellinger:
                self.y = (self.y * 2) - 1

    def _predict(self, X=None): 
        y_pred = np.zeros(X.shape[0], np.float32)

        for i, tree in enumerate(self.trees):
            #soma as previsões de todas as árvores
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred

    def predict(self, X=None):
        #aplica transform, importante na classificação:
        #transforma a predição bruta com a função logística sigmoid
        #para obter probabilidade
        return self.loss.transform(self._predict(X))
        
class GradientBoostingClassifier(GradientBoosting):
    def __init__(self, *, use_weights=True, use_hellinger=True, **kwargs):
        super().__init__(**kwargs)
        self.use_weights = use_weights
        self.use_hellinger = use_hellinger
        
    def fit(self, X, y=None):
        y = y.astype(int)
        
        if self.use_weights:
            # calcular pesos para cada classe
            class_weights = self.calculate_class_weights(y)
        else:
            # peso uniforme(neutro) 
            class_weights = np.ones(2, dtype=float)
        self.loss = LogisticLoss(class_weights=class_weights)

        # Passar os pesos como argumento
        super(GradientBoostingClassifier, self).fit(X, y)
        
    def calculate_class_weights(self, y):
        """Calcula pesos inversamente proporcionais à frequência das classes."""
        class_counts = np.bincount(y)
        total_samples = len(y)
        # Pesos inversamente proporcionais à frequência das classes
        weights = total_samples / (len(class_counts) * class_counts)
        return weights


#### **Testing the modified algorithm**

- To do so, we used the same logic as above
- Just by changing the parameters "use_weights" and "use_splits" of the GradientBoostingClassifier function we could test the algorithm with just one of both modifications and save it in the respective csv

In [None]:
CLEANED_DIR = "cleaned_datasets"
RESULTS_FILE = "resultados_gbm_ignorar.csv"

def run_experiment_cleaned(file_path):
    dataset_name = os.path.basename(file_path)
    print(f"\nProcessando: {dataset_name}")

    try:
        df = pd.read_csv(file_path)
        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]

        unique_classes = np.unique(y)
        if len(unique_classes) != 2:
            print(f"Dataset ignorado ({dataset_name}): Target não é binário")
            return dataset_name, np.nan, np.nan

        # Definir a classe minoritária como positiva
        class_counts = pd.Series(y).value_counts()
        minor_class = class_counts.idxmin()

        min_class_count = class_counts.min()
        n_splits = min(3, min_class_count)
        if n_splits < 2:
            print(f"Dataset ignorado ({dataset_name}): Classe minoritária muito pequena para validação cruzada")
            return dataset_name, np.nan, np.nan

        cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
        f1_scores = []
        auc_scores = []
        pr_auc_scores = []

        for fold, (train_idx, test_idx) in enumerate(cv.split(X, y)):
            X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

            model = GradientBoostingClassifier(n_estimators=50, max_depth=2, min_samples_split=5,use_weights=True, use_hellinger=True)
            model.fit(X_train, y_train)

            y_pred_proba = model.predict(X_test)
            y_pred = (y_pred_proba >= 0.5).astype(int)

            # F1 usando a classe minoritária como positiva
            try:
                f1 = f1_score(y_test, y_pred, pos_label=minor_class, average='binary')
            except Exception as e:
                print(f"[F1 ERROR] {e}")
                f1 = np.nan
            f1_scores.append(f1)

            try:
                auc = roc_auc_score((y_test == minor_class).astype(int), y_pred_proba)
            except Exception as e:
                print(f"[AUC ERROR] {e}")
                auc = np.nan
            auc_scores.append(auc)

            try:
                pr_auc = average_precision_score((y_test == minor_class).astype(int), y_pred_proba)
            except Exception as e:
                print(f"[PR AUC ERROR] {e}")
                pr_auc = np.nan
            pr_auc_scores.append(pr_auc)

        return dataset_name, np.nanmean(f1_scores), np.nanmean(auc_scores), np.nanmean(pr_auc_scores)

    except Exception as e:
        print(f"Erro ao processar {dataset_name}: {e}")
        traceback.print_exc()
        return dataset_name, np.nan, np.nan


def main():
    results = []

    for filename in os.listdir(CLEANED_DIR):
        if filename.endswith(".csv"):
            path = os.path.join(CLEANED_DIR, filename)
            result = run_experiment_cleaned(path)
            results.append(result)

    df_results = pd.DataFrame(results, columns=["Dataset", "F1 Score (minor class)", "ROC AUC", "PR AUC"])
    df_results.to_csv(RESULTS_FILE, index=False)
    print(f"\nExperimentos finalizados. Resultados salvos em '{RESULTS_FILE}'.")


if __name__ == "__main__":
    main()

#### **Comparing our modified algorithm with the one available in sklearn**

- To complement our work, we tested the sklearn algorithm on our 50 datasets and generated the results for the same metrics as above 

In [None]:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold


cleaned_dir = "cleaned_datasets"
sklearn_csv = "sklearn_comparison_results.csv"


def evaluate_sklearn(dataset_path):
    df = pd.read_csv(dataset_path)
    X = df.iloc[:, :-1]
    y = df.iloc[:, -1].astype(int)
    
    # Class imbalance handling
    class_counts = np.bincount(y)
    minor_class = np.argmin(class_counts)
    
    # Match your original stratified 3-fold logic
    n_splits = min(3, class_counts.min())
    if n_splits < 2:
        return os.path.basename(dataset_path), np.nan, np.nan, np.nan

    cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    model = GradientBoostingClassifier(
        n_estimators=50,
        max_depth=2,
        min_samples_split=5,
        learning_rate=0.1,
        subsample=0.8,
        random_state=111,
        verbose=0
    )
    f1_scores, roc_auc_scores, pr_auc_scores = [], [], []
    
    for train_idx, test_idx in cv.split(X, y):
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
        
        model.fit(X_train, y_train)
        y_proba = model.predict_proba(X_test)[:, 1]
        y_pred = (y_proba >= 0.5).astype(int)
        
        f1 = f1_score(y_test, y_pred, pos_label=minor_class)
        roc_auc = roc_auc_score(y_test, y_proba)
        pr_auc = average_precision_score(y_test, y_proba)
        
        f1_scores.append(f1)
        roc_auc_scores.append(roc_auc)
        pr_auc_scores.append(pr_auc)
    
    return (
        os.path.basename(dataset_path),
        np.nanmean(f1_scores),
        np.nanmean(roc_auc_scores),
        np.nanmean(pr_auc_scores)
    )

def run_comparison(cleaned_dir, output_csv):
    results = []
    for filename in os.listdir(cleaned_dir):
        if filename.endswith(".csv"):
            path = os.path.join(cleaned_dir, filename)
            result = evaluate_sklearn(path)
            results.append(result)
    df = pd.DataFrame(results, columns=["Dataset", "F1 Score (minor class)", "ROC AUC", "PR AUC"])
    df.to_csv(output_csv, index=False)
    print(f"[SAVED] {output_csv}")


#### **Comparison between the different results**

- We compute not only the difference mean, median and Std Deviation between the results before and after the modifications but also the p-value for the paired t-test
- We also use the results from the sklearn algorithm to check wheter our is statistically better or not

In [None]:
#df_antigo = pd.read_csv("resultados_gbm_antes.csv")
#df_novo = pd.read_csv("resultados_gbm_splits.csv")
#df_novo = pd.read_csv("resultados_gbm_weights.csv")
df_novo = pd.read_csv("resultados_gbm_weights_splits.csv")
df_antigo = pd.read_csv("sklearn_comparison_results.csv")

#Make sure the datasets have the same order
df_antigo = df_antigo.sort_values("Dataset").reset_index(drop=True)
df_novo = df_novo.sort_values("Dataset").reset_index(drop=True)


assert all(df_antigo["Dataset"] == df_novo["Dataset"]), "Datasets não estão alinhados!"

#Calculate the differences
df_diff = df_novo[["F1 Score (minor class)", "ROC AUC", "PR AUC"]] - df_antigo[["F1 Score (minor class)", "ROC AUC", "PR AUC"]]

df_diff["Dataset"] = df_novo["Dataset"]


print("\n=== Difference Mean ===")
print(df_diff[["F1 Score (minor class)", "ROC AUC", "PR AUC"]].mean())

print("\n=== Difference Median ===")
print(df_diff[["F1 Score (minor class)", "ROC AUC", "PR AUC"]].median())

print("\n=== Difference Std Deviation ===")
print(df_diff[["F1 Score (minor class)", "ROC AUC", "PR AUC"]].std())

print("\n=== Nº of instances that the new algorithm as better results ===")
print((df_diff[["F1 Score (minor class)", "ROC AUC", "PR AUC"]] > 0).sum())

print("\n=== Nº of cases that the new  algorithm has worst results ===")
print((df_diff[["F1 Score (minor class)", "ROC AUC", "PR AUC"]] < 0).sum())

#Statistical testing
print("\n=== Paired t-test (p-value) ===")
for col in ["F1 Score (minor class)", "ROC AUC", "PR AUC"]:
    t_stat, p_value = ttest_rel(df_novo[col], df_antigo[col])
    print(f"{col}: p = {p_value:.4f}")

#Plot the results
df_diff[["F1 Score (minor class)", "ROC AUC", "PR AUC"]].plot(kind="box", title="Difference Boxplot")
plt.ylabel("Difference (new - old)")
plt.grid(True)
plt.tight_layout()
plt.show()

# Scatter plot comparativo 
plt.figure(figsize=(12, 4))
for i, col in enumerate(["F1 Score (minor class)", "ROC AUC", "PR AUC"]):
    plt.subplot(1, 3, i+1)
    plt.scatter(df_antigo[col], df_novo[col], alpha=0.7)
    plt.plot([0, 1], [0, 1], "--", color="gray")
    plt.xlabel("Antigo")
    plt.ylabel("Novo")
    plt.title(col)
    plt.grid(True)
plt.tight_layout()
plt.show()