# 0. Initial setup|

In [None]:
# Display and Copy
from IPython.display import display
import copy
import json

# Plotting
import matplotlib.pyplot as plt
import seaborn as sns

# Numerical and Scientific Computing
import numpy as np
import pandas as pd
from scipy.optimize import minimize_scalar
from scipy.spatial.distance import euclidean, cityblock
from scipy.stats import mannwhitneyu
from scipy.special import expit

# Machine Learning
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import average_precision_score, make_scorer, precision_score, precision_recall_curve, matthews_corrcoef
from sklearn.model_selection import GridSearchCV, train_test_split, StratifiedKFold, KFold
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.semi_supervised import SelfTrainingClassifier, LabelSpreading, LabelPropagation

# Genetic Algorithms
from sklearn_genetic import GASearchCV
from sklearn_genetic.callbacks import ConsecutiveStopping, ProgressBar
from sklearn_genetic.space import Categorical as GeneticCategorical, Integer

# Automated Machine Learning
from tpot import TPOTClassifier

# OS Operations
import os

# 1. Functions and parameters

## 1.1. Initial parametrization

In [None]:
# Global Parameters
n_samples = 10000 # size of the full population.
n_features = 18 # number of unimodal features.
n_multimodal = 2 # number of multimodal features.
optim_hp = True # defines if the optimal parameters will be used. If false, the default HP will be applied.

## 1.2 Optimization methods

### 1.2.1 Scorer: Mean Reciprocal Rank

In [None]:
def mean_reciprocal_rank(y_true, y_scores, decay_factor=0.5, top_k=100):
    """
    Calculate Mean Reciprocal Rank (MRR) with linear decay of ranks limited to top_k.
    
    Args:
    - y_true (array-like): True binary labels.
    - y_scores (array-like): Predicted scores or probabilities.
    - decay_factor (float): Linear decay factor for reciprocal ranks. Default is 0.5.
    - top_k (int): Top K positions to consider for MRR calculation. Default is 100.
    
    Returns:
    - mrr (float): Mean Reciprocal Rank score limited to top_k.
    """
    df = pd.DataFrame({'y_true': y_true, 'y_scores': y_scores})
    df = df.sort_values(by='y_scores', ascending=False).reset_index(drop=True)
    
    # Limit to top_k positions
    df = df.head(top_k)
    
    ranks = (df['y_true'] == 1).cumsum()  # Cumulative sum of relevant items within top_k
    rr = (df['y_true'] == 1) / (1 + decay_factor * ranks)  # Reciprocal ranks with linear decay within top_k
    mrr = rr.sum() / len(df)  # Mean of the decayed reciprocal ranks within top_k
    
    return mrr

# Define the custom scorer
mrr_scorer = make_scorer(mean_reciprocal_rank, greater_is_better=True, response_method="predict_proba")

### 1.2.2 Genetic algorithm for hyperparameters optimization (Random Forest)

In [None]:
def initialize_models(random_state, optim_hp=optim_hp):
    """
    Initialize base model and genetic algorithm model with cross-validation.

    Args:
    - random_state (int): Random state.
    - optim_hp (bool): Flag to toggle hyperparameter optimization.

    Returns:
    - base_model (Pipeline): Base model pipeline.
    - model (GASearchCV or Pipeline): Model with or without hyperparameter optimization.
    - cv_stratified (StratifiedKFold): Stratified K-Fold cross-validation strategy.
    """
    
    # Define the base model using a Random Forest within a pipeline
    base_model = make_pipeline(
        RandomForestClassifier(
            bootstrap=True, 
            class_weight='balanced_subsample', 
            random_state=random_state, 
            n_jobs=-1
        )
    )

    # Define the Stratified K-Fold cross-validator
    cv_stratified = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)

    # Parameters grid for the random forest
    param_grid = {
        'randomforestclassifier__n_estimators': Integer(10, 200, random_state=random_state),
        'randomforestclassifier__max_features': GeneticCategorical(np.arange(0.05, 0.625, 0.025).tolist()),
        'randomforestclassifier__max_depth': Integer(2, 20, random_state=random_state),
        'randomforestclassifier__min_samples_split': Integer(2, 10, random_state=random_state),
        'randomforestclassifier__min_samples_leaf': Integer(1, 10, random_state=random_state)
    }

    # Initialize GASearchCV for hyperparameter optimization
    model = GASearchCV(
        estimator=base_model,
        param_grid=param_grid,
        scoring=mrr_scorer,  # Custom scorer defined earlier
        population_size=50,
        generations=40,
        mutation_probability=0.1,
        n_jobs=-1,
        verbose=True,
        cv=cv_stratified
    )

    # Optionally disable hyperparameter optimization
    if not optim_hp:
        model = base_model

    return base_model, model, cv_stratified

# Define stopping rules and visualisation options
callback_pr = ProgressBar() # shows a progress bar
callback_st = ConsecutiveStopping(generations=1, metric='fitness') # stops if fitness stays the same
callback = [callback_pr, callback_st]



### 1.2.3 Genetic algorithm and grid search for semi-supervised methods

Necessary to work with sample with unlabelled class (-1)

In [None]:
def optim_par(X, y, random_state, cv_stratified, method, base_model=''):
    """
    Optimizes parameters for different semi-supervised learning methods.

    Args:
    - X: Feature matrix.
    - y: Target vector.
    - random_state (int): Random state for reproducibility.
    - cv_stratified (StratifiedKFold): Cross-validation strategy.
    - method (str): Method for optimization ('label_spreading', 'self-learning', 'lab_propag').
    - base_model (str): Base model for self-learning. Default is ''.

    Returns:
    - best_parameters (dict): Dictionary with best parameters found.
    """

    # Label Spreading Method
    if method == 'label_spreading':
        # Create the TPOTClassifier with custom configuration
        tpot = TPOTClassifier(
            generations=20,  # Number of generations to run the algorithm
            population_size=20,  # Size of the population
            verbosity=2,  # Verbosity level
            config_dict={
                'sklearn.semi_supervised.LabelSpreading': {
                    'gamma': [0.1, 0.5],
                    'alpha': [0.1, 0.2, 0.5]
                }
            },
            scoring=mrr_scorer,  # Custom scorer
            early_stop=1, # stops if no best fitness if found in 2 consecutive generations
            cv=cv_stratified,  # Number of cross-validation folds
            random_state=random_state
        )
        
        # Fit the TPOT model on the data
        tpot.fit(X, y)
        
        # Extract the best parameters
        best_parameters = tpot.fitted_pipeline_.steps[-1][1].get_params()

    # Self-Learning Method
    elif method == 'self-learning':
        # Define the self-training classifier
        self_training = SelfTrainingClassifier(base_estimator=base_model)
        
        # Define the parameter grid
        param_grid = [
            {'criterion': ['threshold'], 'threshold': list(np.arange(0.1, 1.0, 0.1))},
            {'criterion': ['k_best'], 'k_best': [3, 5, 10]}
        ]

        # Create the GridSearchCV object
        grid_search = GridSearchCV(
            estimator=self_training,
            param_grid=param_grid,
            scoring=mrr_scorer,  # Custom scorer
            cv=cv_stratified,  # Cross-validation strategy
            verbose=2, 
            n_jobs=-1
        )

        # Fit the GridSearchCV object to the data
        grid_search.fit(X, y)
        
        # Extract the best parameters
        best_parameters = grid_search.best_params_

    # Label Propagation Method
    elif method == 'lab_propag':
        # Define the label propagation classifier
        lab_propag_model = LabelPropagation(kernel='rbf', max_iter=50, n_jobs=-1, tol=0.0001)
        
        # Define the parameter grid
        param_grid = {
            'gamma': [0.1, 0.2, 0.5, 1]
        }

        # Create the GridSearchCV object
        grid_search = GridSearchCV(
            estimator=lab_propag_model,
            param_grid=param_grid,
            scoring=mrr_scorer,  # Custom scorer
            cv=cv_stratified,  # Cross-validation strategy
            verbose=2, 
            n_jobs=-1
        )

        # Fit the GridSearchCV object to the data
        grid_search.fit(X, y)
        
        # Extract the best parameters
        best_parameters = grid_search.best_params_

    # Print the best parameters found
    print("Best parameters found: ", best_parameters)

    return best_parameters

## 1.3. Performance measures

### 1.3.1. Precision on the top 100

In [None]:
def top_k_precision_scorer(y_true, y_proba, k=100):
    """
    Calculate the precision on the top K instances with the highest predicted probability for class 1.

    In:
    - y_proba: The predicted probabilities for class 1.
    - k: The number of top instances to consider (default is 100).

    Out:
    - precision: The precision score on the top K instances.
    """

    # Get the indices of the top K instances with the highest probability for class 1
    top_k_indices = np.argsort(y_proba)[-k:]

    # Get the true labels and predicted labels for the top K instances
    y_true_top_k = y_true[top_k_indices]

    # Calculate precision score on the top K instances
    precision = (sum(y_true_top_k == 1)) / k

    return precision

### 1.3.2. Bias in the sample

Evaluates how distant the centroids are between the sample and the population, for class 1.

In [None]:
def measure_bias_in_sample(X_test, X_pred, y_test, y_pred, verbose=False):
    """
    Measure the bias in predictions by calculating the distance between centroids of 
    true class 1 and predicted class 1 instances using Euclidean and Manhattan distances.

    Parameters:
    - X_test: DataFrame containing true instances (features).
    - X_pred: DataFrame containing predicted instances (features).
    - y_test: Series or array containing the true labels (0s and 1s).
    - y_pred: Series or array containing the predicted labels (0s and 1s).
    - verbose: Boolean flag for verbose output (default is False).

    Returns:
    - manhattan_distance: The Manhattan distance between centroids.
    - euclidean_distance: The Euclidean distance between centroids.
    """

    # Ensure y_test and y_pred are pandas Series
    y_test = pd.Series(y_test)
    y_pred = pd.Series(y_pred)

    # Filter X_test and X_pred where y_test is 1 and where y_pred is 1
    X_true = X_test[y_test == 1]
    X_pred_true = X_pred[y_pred == 1]

    # Initialize scaler
    scaler = StandardScaler()
    
    # Fit scaler on X_true
    scaler.fit(X_true)

    # Scale X_true and X_pred_true
    X_true_scaled = pd.DataFrame(scaler.transform(X_true))
    X_pred_true_scaled = pd.DataFrame(scaler.transform(X_pred_true))

    # Calculate centroids for all true class 1 instances
    centroid_true_all = X_true_scaled.mean().values

    # Calculate centroids for the predicted class 1 instances
    centroid_pred_true = X_pred_true_scaled.mean().values

    # Calculate Euclidean distance between the centroids
    euclidean_distance = euclidean(centroid_true_all, centroid_pred_true)

    # Calculate Manhattan distance between the centroids
    manhattan_distance = cityblock(centroid_true_all, centroid_pred_true)

    if verbose:
        print(f"Euclidean distance between centroids: {euclidean_distance}")
        print(f"Manhattan distance between centroids: {manhattan_distance}")

    return manhattan_distance, euclidean_distance

### 1.3.3. Prediction bias

Calculates how distant the centroids are between the top-100 predicted class 1 and the real class 1 cases in the population.

In [None]:
def measure_bias(X, y_test, y_pred_prob, verbose=False):
    """
    Measure the bias in predictions by calculating the distance between centroids of 
    true class 1 and top-100 predicted class 1 instances using Euclidean and Manhattan distances.

    Parameters:
    - X: DataFrame containing the full list of features.
    - y_test: Series or array containing the true labels (0s and 1s).
    - y_pred_prob: Series or array containing the predicted probabilities for class 1.
    - verbose: Boolean flag for verbose output (default is False).

    Returns:
    - manhattan_distance: The Manhattan distance between centroids.
    - euclidean_distance: The Euclidean distance between centroids.
    """

    # Ensure y_test and y_pred_prob are pandas Series
    y_test = pd.Series(y_test)
    y_pred_prob = pd.Series(y_pred_prob)

    # Identify the indices of the 100 highest predicted probabilities
    top_100_indices = np.argsort(y_pred_prob)[-100:]
    
    # Create y_pred with the top 100 indices set to 1
    y_pred = pd.Series(np.zeros_like(y_pred_prob))
    y_pred.iloc[top_100_indices] = 1
    
    # Convert X to DataFrame if it's a NumPy array
    if isinstance(X, np.ndarray):
        feature_names = [f'feature_{i}' for i in range(X.shape[1])]
    X = pd.DataFrame(X, columns=feature_names)

    # Scale the features
    scaler = StandardScaler()
    X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=X.columns)

    # Filter scaled X where y_test is 1 and where y_pred is 1
    X_pred_top_100 = X_scaled[y_pred == 1]
    X_true_all = X_scaled[y_test == 1]

    # Handle NaN values after filtering
    X_pred_top_100 = X_pred_top_100.dropna()
    X_true_all = X_true_all.dropna()

    # Calculate centroids for all true class 1 instances
    centroid_true_all = X_true_all.mean().values

    # Calculate centroids for the top 100 predicted class 1 instances
    centroid_pred_top_100 = X_pred_top_100.mean().values

    # Calculate Euclidean distance between the centroids
    euclidean_distance = euclidean(centroid_true_all, centroid_pred_top_100)

    # Calculate Manhattan distance between the centroids
    manhattan_distance = cityblock(centroid_true_all, centroid_pred_top_100)

    if verbose:
        print(f"Euclidean distance between centroids: {euclidean_distance}")
        print(f"Manhattan distance between centroids: {manhattan_distance}")

    return manhattan_distance, euclidean_distance

### 1.3.4. Generate performance measures

In [None]:
def test_performance(X_train, y_train, X_test, y_test, model_i, std_model, verbose=False):
    """
    Returns several performance measures of a model trained in X_train and tested in X_test.

    Parameters:
    - X_train: DataFrame or array-like, training features.
    - y_train: Series or array-like, training labels.
    - X_test: DataFrame or array-like, test features.
    - y_test: Series or array-like, test labels.
    - model_i: Initialized machine learning model.
    - std_model: Standardized model for hyperparameter optimization.
    - verbose: Boolean flag for verbose output (default is False).

    Returns:
    - result: Dictionary containing performance metrics.
    - best_hp: Dictionary of best hyperparameters (if hyperparameter optimization is performed).
    """

    best_hp = []

    # Hyperparameter optimization
    if optim_hp and (type(model_i) == type(std_model)):
        model_i.fit(X_train, y_train, callbacks=callback)
        best_hp = model_i.best_params_
        best_hp = {key.replace('randomforestclassifier__', ''): value for key, value in best_hp.items()}
        if verbose:
            print("Best Hyperparameters for the Random Forest:", best_hp)
    else:
        model_i.fit(X_train, y_train)

    # Predict probabilities and labels on the test set
    y_pred_prob = model_i.predict_proba(X_test)[:, 1]
    y_pred = model_i.predict(X_test)
    
    # Calculate performance metrics
    prec_score = precision_score(y_test, y_pred, zero_division=0)
    auprc_score = average_precision_score(y_test, y_pred_prob)
    top_50 = top_k_precision_scorer(y_test, y_pred_prob, k=50)
    top_100 = top_k_precision_scorer(y_test, y_pred_prob, k=100)
    
    # Calculate bias on the feature used for selection bias
    bias = measure_bias(X_test, y_test, y_pred_prob)[1]

    if verbose:
        print("Precision score:", prec_score)
        print("AUPRC score:", auprc_score)
        print("Precision on top 50:", top_50)
        print("Precision on top 100:", top_100)
        print("Prediction bias:", bias)

    # Construct result dictionary
    result = {
        'pr': prec_score,
        'AUPRC': auprc_score,
        'p.50': top_50,
        'p.100': top_100,
        'Bias effect': bias
    }

    return result, best_hp

# 2. Simulations

## 2.1. Population

In [None]:
def create_pop(random_state, size=n_samples, feat=n_features, multimodal = n_multimodal, ratio=0.5, verbose = True):
    """
    Creates simulated populations.

    In:
    - Size, number of features, ratio of one of the classes. 
    
    Out: 
    - A current population, used to extract (biased) samples;
    - A future and similar population, used as test set; the feature with the highest correlation with the target.
    """

    # Set random seed for reproducibility
    np.random.seed(random_state)

    # Define simulation parameters
    se = 2  # Standard deviation of error term
    samples = size * 2

    # Generate predictor variables (features) X
    # Generate random means and standard deviations for features
    means = np.random.rand(feat)  # Random means between 0 and 1 (inclusive)
    #means = np.zeros(feat)  # To make the data less complex
    stds = np.random.rand(feat) * 0.3 + 0.8  # Standard deviations between 0.8 and 1.1
    #stds = np.ones(feat) # To make the data less complex
    # All features follow a normal distribution
    X = np.random.normal(loc=means, scale=stds, size=(samples, feat))

    for _ in range(multimodal):
        # Generates a random number 'modes' from 2 to 4 for the number of modes
        modes = np.random.randint(2, 4)
        # Creates random means
        mode_means = np.random.rand(modes)
        
        # Samples from all normal distributions to get a single multimodal feature
        mode_samples = np.hstack([np.random.normal(loc=mean, scale=1, size=(samples, 1)) for mean in mode_means])
        multimodal_feature = np.random.choice(mode_samples.flatten(), size=samples).reshape(samples, 1)
        
        X = np.hstack((X, multimodal_feature))

    # Define random coefficients (excluding intercept)
    coefficients = np.random.rand(feat + multimodal)

    # Define error term
    error_t = np.random.normal(0, se, samples)

    # Function to find the optimal intercept
    def objective(intercept):
        eta = np.dot(X, coefficients) + intercept + error_t
        probabilities = expit(eta)
        y = (probabilities > 0.5).astype(int)
        actual_ratio = y.mean()
        return (actual_ratio - ratio) ** 2

    # Find the intercept that minimizes the difference from the desired ratio
    result = minimize_scalar(objective)
    intercept = result.x

    # Generate linear predictor (eta) with the optimal intercept
    eta = np.dot(X, coefficients) + intercept + error_t

    # Apply link function (logit) to obtain probabilities
    probabilities = expit(eta)

    # Generate class labels directly from the probabilities
    y = (probabilities > 0.5).astype(int)

    # Divides the data into two groups: current population and future population
    # The `train_size` parameter specifies the size of the current population
    X_cur_pop, X_fut_pop, y_cur_pop, y_fut_pop = train_test_split(
        X, y,
        train_size=size,  # Size of current population
        stratify=y,  # Ensure stratified sampling to preserve class ratios
        random_state=random_state
    )

    if verbose:
        # Ensure the class imbalance is as desired
        actual_ratio = y.mean()
        print("Desired class ratio:", ratio)
        print("Actual class ratio:", actual_ratio)
        # Print statements to check internal states
        #print("Max correlation:", max_abs_corr_value)

    # Return results
    return (X_cur_pop, X_fut_pop, y_cur_pop, y_fut_pop)

## 2.2. Plots and summaries

In [None]:
def plot_population(X, y):
    """
    Plots the population, differentiating by colour according to the class.
    Dimension reduction (PCA) is applied the allow the presentation in a two-dimentional plot.
    Two principal components are sufficient because there are only two informative features in the similated population.

    In:
    - X, a set of features.
    - y, the correspondent class sabels.
    
    Out:
    - A plot of the population, printed.
    """

    # Instantiate a PCA object and fit it on X_cur_pop to get 2 principal components (2 informative features)
    y_copy = np.copy(y)
    X_copy = np.copy(X)
    
    if X.shape[1] > 2:
        pca = PCA(n_components=2)
        X_cur_pop_pca = pca.fit_transform(X_copy)

        # Order X_cur_pop_pca and y_copy by y_copy
        sorted_indices = np.argsort(y_copy)
        X_cur_pop_pca = X_cur_pop_pca[sorted_indices]
        y_copy = y_copy[sorted_indices]

        # Create a scatter plot of the 2 principal components
        # The first principal component (PC1) is in the first column of X_cur_pop_pca
        # The second principal component (PC2) is in the second column of X_cur_pop_pca

        # Use seaborn for a nicer plot
        plt.figure(figsize=(8, 6))
        sns.scatterplot(
            x=X_cur_pop_pca[:, 0],  # PC1
            y=X_cur_pop_pca[:, 1],  # PC2
            hue=y_copy,  # Color points based on y
            palette={-1.: 'grey', 0.: 'blue', 1.: 'red'}, # -1 means unlabeled.
            s=1
        )
        # Add plot labels
        plt.xlabel('PC1')
        plt.ylabel('PC2')
        plt.title('PCA of X colored by target feature')
    else:
        # Order X_cur_pop_pca and y_copy by y_copy
        sorted_indices = np.argsort(y_copy)
        X_sorted = X_copy[sorted_indices]
        y_sorted = y_copy[sorted_indices]

        # Use seaborn for a nicer plot
        plt.figure(figsize=(8, 6))
        sns.scatterplot(
            x=X_sorted[:, 0],  # Feature 1
            y=X_sorted[:, 1],  # Feature 2
            hue=y_sorted,  # Color points based on y
            palette={-1.: 'grey', 0.: 'blue', 1.: 'red'}, # -1 means unlabeled.
            s=1
        )
        # Add plot labels
        plt.xlabel('Feature 1')
        plt.ylabel('Feature 2')

    # Display the plot
    plt.show()

In [None]:
def describe_pop(X, y, full_X, plot_pop = False):
    """
    Prints a summary of a population or sample.

    In:
    - X (ndarray or DataFrame): Features.
    - y (ndarray or Series): Target feature.
    - full_X (ndarray or DataFrame): The complete population for context.

    Out:
    - Plots the population in two dimensions
    - Prints summary information about the population or sample.
    """
    # NUMERIC SUMMARY:

    # Calculate the number of features and observations
    num_features = X.shape[1]
    num_observations = X.shape[0]
    
    # Calculate the percentage of the population represented by the sample
    full_population_size = full_X.shape[0]
    population_percentage = (num_observations / full_population_size) * 100
    
    # Calculate the number and percentage of infractors and compliers
    num_infractors = sum(y == 1)
    num_compliers = sum(y == 0)
    infractor_percentage = (num_infractors / num_observations) * 100
    complier_percentage = (num_compliers / num_observations) * 100
    
    # Print summary information
    print(f"Number of features: {num_features}")
    print(f"Number of labeled observations: {num_observations} ({population_percentage:.2f}% of {full_population_size})")
    print(f"Violators: {num_infractors} ({infractor_percentage:.2f}%)")
    print(f"Compliers: {num_compliers} ({complier_percentage:.2f}%)")

    # PLOTS:
    if plot_pop:
        plot_population(X, y)

## 2.3. Sample

Simulates the labelling of a sample of cases, with or without labelling bias.

In [None]:
def label_cases(X, # full population of features
                y, # full population of correspondent targets
                random_state,
                sample_size=0.05, # percentage of the population in the sample
                sample_bias=0.5, # percentage of cases of class 1
                prop_random=0.2,# proportion of the sample composed with random sampling
                verbose=False,
                ):
    
    """
    This function labels a sample of the data based on specified biases and random sampling.

    Parameters:
    X : array-like, shape (n_samples, n_features)
        The input samples.
    y : array-like, shape (n_samples,)
        The target values.
    random_state : int
        The random seed for reproducibility.
    sample_size : float, optional (default=0.05)
        The percentage of the population in the sample.
    sample_bias : float, optional (default=0.5)
        The percentage of cases of class 1 in the sample.
    prop_random : float, optional (default=0.2)
        The proportion of the sample composed of random sampling.
    verbose : bool, optional (default=False)
        Whether to print detailed information.

    Returns:
    X_lab : array-like, shape (n_samples_lab, n_features)
        The labeled sample.
    X_unl : array-like, shape (n_samples_unl, n_features)
        The unlabeled sample.
    y_lab : array-like, shape (n_samples_lab,)
        The target values for the labeled sample.
    y_unl : array-like, shape (n_samples_unl,)
        The target values for the unlabeled sample.
    """
    
    np.random.seed(random_state)
    
    if sample_bias == 0:
        # No bias, use stratified sampling for labeled and unlabeled split
        X_lab, X_unl, y_lab, y_unl = train_test_split(
            X, y,
            train_size=sample_size,
            random_state=random_state,
            stratify=y  # Ensure stratified sampling to preserve class ratios
        )
        return X_lab, X_unl, y_lab, y_unl

    else:
        # With bias, create labeled and unlabeled samples based on selector feature
        full_pop = pd.DataFrame(X.copy()).reset_index(drop=True)
        full_pop['target'] = y
        full_pop['label'] = -1

        n = int(len(X) * sample_size)  # Calculate the number of cases in the sample
        n_1 = int(n * sample_bias)    # Calculate the desired number of class 1 cases
        n_0 = n - n_1                # Calculate the desired number of class 0 cases
        if verbose:
            print(f"Desired samples per class:")
            print(f"     Class 1: {n_1}")
            print(f"     Class 0: {n_0}")

        if prop_random > 0:
            # Select a fully random sample from the population and assigns labels
            random_sample_indices = full_pop.sample(n=int(prop_random * n), random_state=random_state).index
            full_pop.loc[random_sample_indices, 'label'] = full_pop.loc[random_sample_indices, 'target']
            # Compute how many class 1 and class 0 cases we still need to label
            n_1 = max(n_1 - sum(full_pop['label'] == 1), 0)
            n_0 = max(n_0 - sum(full_pop['label'] == 0), 0)
            if verbose:
                total_random_cases = len(random_sample_indices)
                percent_random_cases = (total_random_cases / n) * 100
                print(f"Random cases included: {total_random_cases} ({percent_random_cases:.2f}%)")
                print(f"     Class 1: {sum(full_pop.loc[random_sample_indices, 'target'] == 1)}")
                print(f"     Class 0: {sum(full_pop.loc[random_sample_indices, 'target'] == 0)}")
                print(f"Cases still to add: {n_1 + n_0}")
                print(f"     Class 1: {n_1}")
                print(f"     Class 0: {n_0}")

        # Create 'selector', a feature that explains part of the selection bias
        # Calculate the correlation of each feature with the target
        correlations = full_pop.corr()['target'].drop('target')
        # Identify the two features with the highest absolute correlation with the target
        top_features = correlations.abs().nlargest(2).index
        # Center the top features by subtracting their mean
        centered_features = full_pop[top_features].apply(lambda x: x - x.mean())
        # Multiply each centered feature by its correlation with the target
        adjusted_features = centered_features * correlations[top_features].values
        # Compute the average of these two adjusted features to create the selector
        full_pop['selector'] = adjusted_features.mean(axis=1)
        full_pop = full_pop.sort_values(by='selector', ascending=False).reset_index(drop=True)

        # Completes the sample with necessary 1s
        potential_1_indices = full_pop[(full_pop['target'] == 1) & (full_pop['label'] == -1)].head(int(1.5 * n_1)).index
        selected_1_indices = np.random.choice(potential_1_indices, size=n_1, replace=False)
        full_pop.loc[selected_1_indices, 'label'] = 1
        # Completes the sample with necessary 0s
        potential_0_indices = full_pop[(full_pop['target'] == 0) & (full_pop['label'] == -1)].head(int(1.5 * n_0)).index
        selected_0_indices = np.random.choice(potential_0_indices, size=n_0, replace=False)
        full_pop.loc[selected_0_indices, 'label'] = 0

        # Extract the labeled instances
        labeled = full_pop[full_pop['label'] != -1].sample(frac=1, random_state=random_state).reset_index(drop=True)
        X_lab = labeled.drop(columns=['target', 'selector', 'label'])
        y_lab = labeled['target']

        # Extract the unlabeled instances
        unlabeled = full_pop[full_pop['label'] == -1].sample(frac=1, random_state=random_state).reset_index(drop=True)
        X_unl = unlabeled.drop(columns=['target', 'selector', 'label'])
        y_unl = unlabeled['target']

    if verbose:
        print("LABELED SAMPLE CREATED:")
        print("Labelling bias:", sample_bias)
        describe_pop(X_lab, y_lab, X)
        # PLOTS:
        full_X = np.vstack((X_lab, X_unl))
        full_y = np.concatenate([y_lab, np.full((X_unl.shape[0],), -1)])
        plot_population(full_X, full_y)

    return X_lab, X_unl, y_lab, y_unl

# 3. Methods

## 3.1. Veenman Method

In [None]:
def init_Veenman(X_lab, X_unl, y_lab, y_unl, seed=42):
    """
    Creates a unified DataFrame with labeled and unlabeled cases, adding columns to identify if it's labeled 
    and the pseudoclass based on the Veenman method.

    Parameters:
    - X_lab: numpy array of labeled features
    - X_unl: numpy array of unlabeled features
    - y_lab: numpy array of labeled targets
    - y_unl: numpy array of unlabeled targets
    - seed: random seed for reproducibility (default=42)

    Returns:
    - A single DataFrame with all cases, including 'label', 'labeled', and 'pseudo' columns.
    """

    np.random.seed(seed)

    # Create DataFrames from the numpy arrays
    df_unlabeled = pd.DataFrame(X_unl)
    df_unlabeled["label"] = y_unl
    df_labeled = pd.DataFrame(X_lab)
    df_labeled["label"] = y_lab

    # Add a 'labeled' column: 0 for unlabeled data, 1 for labeled data
    df_unlabeled['labeled'] = 0
    df_labeled['labeled'] = 1

    # Concatenate all data into one df
    df_combined = pd.concat([df_unlabeled, df_labeled], ignore_index=True)

    # Initialize the "pseudo" column with default value 999
    df_combined['pseudo'] = 999
    # Update "pseudo" column based on the conditions:
    df_combined.loc[(df_combined['labeled'] == 0), 'pseudo'] = 0
    df_combined.loc[(df_combined['labeled'] == 1) & (df_combined['label'] == 0), 'pseudo'] = -1
    df_combined.loc[(df_combined['labeled'] == 1) & (df_combined['label'] == 1), 'pseudo'] = 1
    
    # Shuffle rows
    df_combined = df_combined.sample(frac=1).reset_index(drop=True)

    return df_combined

In [None]:
def Veenman(X_lab, X_unl, y_lab, y_unl, verbose=False):
    """
    Creates a train set composed of all positive labeled instances, plus all unlabeled instances, which are considered as 0.

    Parameters:
    - X_lab: numpy array of labeled features
    - X_unl: numpy array of unlabeled features
    - y_lab: numpy array of labeled targets
    - y_unl: numpy array of unlabeled targets
    - verbose: bool, if True prints information and plots (default=False)

    Returns:
    - X_V_lab: DataFrame of pseudo-labeled features
    - y_V_lab: Series of pseudo-labeled targets
    """

    # Initialize the combined DataFrame with pseudo labels
    df_combined = init_Veenman(X_lab, X_unl, y_lab, y_unl)

    # Separate the labeled and unlabeled instances based on the pseudo label
    X_V_lab = df_combined.loc[df_combined['pseudo'] >= 0, df_combined.columns != 'pseudo'].drop(columns=['labeled', 'label']).reset_index(drop=True)
    y_V_lab = df_combined.loc[df_combined['pseudo'] >= 0, 'pseudo'].reset_index(drop=True)
    X_V_unl = df_combined.loc[df_combined['pseudo'] == -1, df_combined.columns != 'pseudo'].drop(columns=['labeled', 'label']).reset_index(drop=True)
    y_V_unl = df_combined.loc[df_combined['pseudo'] == -1, 'pseudo'].reset_index(drop=True)

    # Reset indices
    X_V_lab.reset_index(drop=True, inplace=True)
    y_V_lab.reset_index(drop=True, inplace=True)
    X_V_unl.reset_index(drop=True, inplace=True)
    y_V_unl.reset_index(drop=True, inplace=True)

    if verbose:
        full_X = np.vstack((X_V_lab, X_V_unl))
        full_y = np.concatenate([y_V_lab, y_V_unl])
        print("VEENMAN SAMPLE CREATED:")
        describe_pop(X_V_lab, y_V_lab, full_X)  # Ensure describe_pop function is defined
        # PLOTS:
        plot_population(full_X, full_y)  # Ensure plot_population function is defined

    return X_V_lab, y_V_lab

## 3.2. Semi-supervised method

First, a "Veenman" sample is selected. Then, the unlabeled sample is divided into folds. For each fold, a model is trained on the Veenman sample minus the instances of the fold. Predicted class 1 for the fold are assigned as pseudo-labels.

The final sample comprises the original "Veenman" sample, plus the predicted pseudo-labels 1 for unlabeled instances.

In [None]:
def assign_folder(population, folds, random_state):
    """
    Assigns folder numbers for each unlabeled instance using K-Fold cross-validation.

    Args:
    - population (pd.DataFrame): The full population/sample.
    - folds (int): The number of folds.

    Returns:
    - pd.DataFrame: The population/sample with a column "fold" with a fold number per instance.
    """
    # Initialize the "fold" column with -1
    population['fold'] = -1

    mask_unlabeled = population['labeled'] == 0
    unlabeled_data = population[mask_unlabeled]

    # Create a KFold object
    kf = KFold(n_splits=folds, shuffle=True, random_state=random_state)

    # Assign folds to unlabeled data using KFold
    for fold, (_, test_index) in enumerate(kf.split(unlabeled_data)):
        unlabeled_data.iloc[test_index, unlabeled_data.columns.get_loc('fold')] = fold

    # Update the population with the fold assignments for the unlabeled data
    population.loc[mask_unlabeled, 'fold'] = unlabeled_data['fold']

    return population

In [None]:
def semi_supervised(X_lab, X_unl, y_lab, y_unl, model_i, base_model,
                    random_state,
                    folds = 5, threshold = 0.5,
                    population_proportion = 9999,
                    sample_bias = 99999,
                    verbose = False):
    """
    Processes one round of semi-supervised learning to generate pseudo-classes.

    Parameters:
    - X_lab: numpy array of labeled features
    - X_unl: numpy array of unlabeled features
    - y_lab: numpy array of labeled targets
    - y_unl: numpy array of unlabeled targets
    - model_i: model instance for semi-supervised learning
    - base_model: base model instance for copying and setting hyperparameters
    - random_state: int, seed for random number generation
    - folds: int, number of folds for cross-validation
    - threshold: float, threshold for pseudo-labeling
    - optim_hp: bool, whether to optimize hyperparameters (default=False)
    - verbose: bool, if True prints information and plots (default=False)

    Returns:
    - X_train: DataFrame of features for semi-supervised training
    - y_train: Series of pseudo-labels for semi-supervised training
    """

    # Creates a set of initial pseudo-labels following the Veenman method
    df_combined = init_Veenman(X_lab, X_unl, y_lab, y_unl)
    n_pseudo = 0 # to count how many pseudo-labels were used

    # For the unlabeled classes, assign folders
    df_combined = assign_folder(population = df_combined, folds = folds, random_state=random_state)
    df_combined = df_combined.drop(columns=['labeled', 'label']).reset_index(drop=True) # removes real labels to avoid leakage to the unlabeled set.
    
    r_pseudo = 0 # to count how many pseudo-labels were used

    # Initialize lists to keep track of indices to update
    positive_indices_list = []

    for fold in range(folds):
        # Selects instances with labels (pseudo or true) to train a model. Cases in the current fold are not included.
        condition = (df_combined['pseudo'] >= 0) & (df_combined['fold'] != fold)

        # Removes identification columns
        ss_train = df_combined[condition].drop(columns=['fold']).reset_index(drop=True)
        X_train = ss_train.drop(columns=['pseudo']).reset_index(drop=True) # removes the labels from the set of features
        y_train = ss_train['pseudo'].reset_index(drop=True) # selects only the features

        # Instances in the test set will be checked. If they are predicted as 1, they will receive this pseudo-label.
        X_test = df_combined[df_combined['fold'] == fold].drop(columns=['fold', 'pseudo']).reset_index(drop=True)
        
        if optim_hp:
            if (fold == 0):
                best_hp_SS_i = load_hp('Semi-supervised_i', population_proportion,
                                    sample_bias, random_state)   
                if best_hp_SS_i != False:
                    model_i = copy.copy(base_model)
                    model_i.steps[-1][1].set_params(**best_hp_SS_i)
                    model_i.fit(X_train, y_train)
                else:
                    model_i.fit(X_train, y_train, callbacks=callback)
                    best_hp_SS_i = model_i.best_params_
                    best_hp_SS_i = {key.replace('randomforestclassifier__', ''): value for key, value in best_hp_SS_i.items()}
                    save_hp('Semi-supervised_i', population_proportion, sample_bias, random_state, best_hp_SS_i)
            else:
                model_i = copy.copy(base_model)
                model_i.steps[-1][1].set_params(**best_hp_SS_i)
                model_i.fit(X_train, y_train)
            print("Best Hyperparameters for the Random Forest:", best_hp_SS_i)
        else:
            model_i = copy.copy(base_model)
            model_i.fit(X_train, y_train)

        # Gets the prediction probabilities for class 1
        y_pred_prob = model_i.predict_proba(X_test)[:, 1]
        # Selects the indices of the test set in the original full set.
        test_indices = df_combined[df_combined['fold'] == fold].index

        positive_indices = test_indices[y_pred_prob >= threshold]
        if verbose:
            print("Max prediction probability:", y_pred_prob.max())
            print("High probability of being 1:", len(positive_indices))

        # Store indices for updating after the inner loop
        positive_indices_list.extend(positive_indices)

        r_pseudo += len(positive_indices)

    # Update the 'pseudo' and 'fold' columns in df_combined after the inner loop
    df_combined.loc[positive_indices_list, 'pseudo'] = 1
    df_combined.loc[positive_indices_list, 'fold'] = -1

    n_pseudo += r_pseudo
    if verbose:
        print(r_pseudo, "pseudo-labels inserted this round.")

    condition = (df_combined['pseudo'] >= 0) # only cases with true or pseudo-labels.
    ss_final_train = df_combined[condition].drop(columns=['fold']).reset_index(drop=True)
    X_train = ss_final_train.drop(columns=['pseudo']).reset_index(drop=True)
    y_train = ss_final_train['pseudo'].reset_index(drop=True)

    if verbose:
        full_X = df_combined.drop(columns=['pseudo', 'fold']).reset_index(drop=True)
        full_y = df_combined['pseudo'].reset_index(drop=True)
        full_X.columns = full_X.columns.astype(str)
        print("SAMPLE CREATED WITH SEMI-SUPERVISED METHOD")
        print(n_pseudo, "pseudo-labels inserted.")
        describe_pop(X_train, y_train, full_X)
        # PLOTS:
        plot_population(full_X, full_y)

    return(X_train, y_train)

## 3.3. Label Spreading

In [None]:
def LabSpred(X_lab, X_unl, y_lab, y_unl, X_test, y_test, opt_HP=0, verbose=False):
    """
    Applies SciKit LabelSpreading

    in:
    - X_lab, X_unl, y_lab, y_unl
    out:
    - Dictionary with evaluation metrics (Precision, AUPRC, Precision@50, Precision@100, Bias effect)
    """

    # Step 1: Concatenate labeled and unlabeled features (X_lab and X_unl) row-wise (rbind)
    X = np.vstack((X_lab, X_unl))

    # Step 2: Create a new array for unlabeled target values (y_unl) and set all values to -1
    y_unl = np.full((X_unl.shape[0],), -1)

    # Step 3: Concatenate labeled and unlabeled target values (y_lab and y_unl) row-wise (rbind)
    y = np.concatenate((y_lab, y_unl))

    if verbose:
        describe_pop(X, y, full_X=X, plot_pop=True)

    if opt_HP == 'no':
        best_parameters = {'alpha': 0.1, 'gamma': 0.1, 'kernel': 'rbf', 'max_iter': 50, 'n_jobs': -1, 'tol': 0.0001}
        print("Default hyperparameters will be used:", best_parameters)
    else:
        best_parameters = opt_HP
        print("Hyperparameters being used:", best_parameters)

    label_spr_model = LabelSpreading(**best_parameters)
    label_spr_model.fit(X, y)
    # Predict probabilities and labels on the test set
    y_pred_prob = label_spr_model.predict_proba(X_test)[:, 1]
    y_pred = label_spr_model.predict(X_test)

    prec_score = precision_score(y_test, y_pred, zero_division=0)
    y_pred_prob = np.nan_to_num(y_pred_prob, nan=0.0)
    auprc_score = average_precision_score(y_test, y_pred_prob)
    top_50 = top_k_precision_scorer(y_test, y_pred_prob, k = 50)
    top_100 = top_k_precision_scorer(y_test, y_pred_prob, k = 100)
    bias = measure_bias(X_test, y_test, y_pred_prob)[1]
    if verbose:
        print("Precision score:", prec_score)
        print("AUPRC score:", auprc_score)
        print("Precision on top 50:", top_50)
        print("Precision on top 100:", top_100)
        print("Prediction bias:", bias) 

    result = {'pr': prec_score, 'AUPRC': auprc_score, 'p.50':top_50, 'p.100':top_100, 'Bias effect': bias}

    return result

## 3.4. Label Propagation

In [None]:
def LabProp(X_lab, X_unl, y_lab, y_unl, X_test, y_test, opt_HP=0, verbose=False):
    """
    Applies SciKit LabelPropagation

    in:
    - X_lab, X_unl, y_lab, y_unl (numpy arrays or pandas dataframes)
    - X_test, y_test (numpy arrays or pandas dataframes for testing)
    - opt_HP: Boolean indicating whether to use optimized hyperparameters (default: False)
    - verbose: Boolean indicating whether to print verbose output (default: False)

    out:
    - Dictionary with evaluation metrics (Precision, AUPRC, Precision@50, Precision@100, Bias effect)
    """

    # Step 1: Concatenate labeled and unlabeled features (X_lab and X_unl) row-wise (rbind)
    X = np.vstack((X_lab, X_unl))

    # Step 2: Create a new array for unlabeled target values (y_unl) and set all values to -1
    y_unl = np.full((X_unl.shape[0],), -1)

    # Step 3: Concatenate labeled and unlabeled target values (y_lab and y_unl) row-wise (rbind)
    y = np.concatenate((y_lab, y_unl))

    if verbose:
        describe_pop(X, y, full_X=X, plot_pop=True)

    if opt_HP == 'no':
        best_parameters = {'n_jobs': -1}
    else:
        best_parameters = opt_HP

    label_prop_model = LabelPropagation(**best_parameters, n_jobs=-1)
    label_prop_model.fit(X, y)
    # Predict probabilities and labels on the test set
    y_pred_prob = label_prop_model.predict_proba(X_test)[:, 1]
    y_pred = label_prop_model.predict(X_test)

    prec_score = precision_score(y_test, y_pred, zero_division=0)
    y_pred_prob = np.nan_to_num(y_pred_prob, nan=0.0)
    auprc_score = average_precision_score(y_test, y_pred_prob)
    top_50 = top_k_precision_scorer(y_test, y_pred_prob, k = 50)
    top_100 = top_k_precision_scorer(y_test, y_pred_prob, k = 100)
    bias = measure_bias(X_test, y_test, y_pred_prob)[1]
    if verbose:
        print("Precision score:", prec_score)
        print("AUPRC score:", auprc_score)
        print("Precision on top 50:", top_50)
        print("Precision on top 100:", top_100)
        print("Prediction bias:", bias) 

    result = {'pr': prec_score, 'AUPRC': auprc_score, 'p.50':top_50, 'p.100':top_100, 'Bias effect': bias}

    return result

## 3.5 Self Learning

In [None]:
def SelfLearn(X_lab, X_unl, y_lab, y_unl, X_test, y_test, trad_model, opt_HP=0, verbose=False):
    """
    Applies SciKit SelfTrainingClassifier for semi-supervised learning

    in:
    - X_lab, X_unl, y_lab, y_unl (numpy arrays or pandas dataframes)
    - X_test, y_test (numpy arrays or pandas dataframes for testing)
    - trad_model: Base estimator for self-training
    - opt_HP: Boolean indicating whether to use optimized hyperparameters (default: False)
    - verbose: Boolean indicating whether to print verbose output (default: False)

    out:
    - Dictionary with evaluation metrics (Precision, AUPRC, Precision@50, Precision@100, Bias effect)
    """

    # Step 1: Concatenate labeled and unlabeled features (X_lab and X_unl) row-wise (rbind)
    X = np.vstack((X_lab, X_unl))

    # Step 2: Create a new array for unlabeled target values (y_unl) and set all values to -1
    y_unl = np.full((X_unl.shape[0],), -1)

    # Step 3: Concatenate labeled and unlabeled target values (y_lab and y_unl) row-wise (rbind)
    y = np.concatenate((y_lab, y_unl))

    if verbose:
        describe_pop(X, y, full_X=X, plot_pop=True)

    if opt_HP == 'no':
        best_parameters = {'criterion': 'k_best', 'k_best': 3}
    else:
        best_parameters = opt_HP

    label_prop_model = SelfTrainingClassifier(**best_parameters, base_estimator=trad_model)
    label_prop_model.fit(X, y)
    # Predict probabilities and labels on the test set
    y_pred_prob = label_prop_model.predict_proba(X_test)[:, 1]
    y_pred = label_prop_model.predict(X_test)

    prec_score = precision_score(y_test, y_pred, zero_division=0)
    y_pred_prob = np.nan_to_num(y_pred_prob, nan=0.0)
    auprc_score = average_precision_score(y_test, y_pred_prob)
    top_50 = top_k_precision_scorer(y_test, y_pred_prob, k = 50)
    top_100 = top_k_precision_scorer(y_test, y_pred_prob, k = 100)
    bias = measure_bias(X_test, y_test, y_pred_prob)[1]
    if verbose:
        print("Precision score:", prec_score)
        print("AUPRC score:", auprc_score)
        print("Precision on top 50:", top_50)
        print("Precision on top 100:", top_100)
        print("Prediction bias:", bias) 

    result = {'pr': prec_score, 'AUPRC': auprc_score, 'p.50':top_50, 'p.100':top_100, 'Bias effect': bias}

    return result

# 4. Results

## 4.1 Functions

In [None]:
def save_hp(method, population_proportion, sample_bias, seed, hyperparameters, folder='hp_cache'):
    """
    Save hyperparameters to a JSON file.

    Parameters:
    - method: Method identifier
    - population_proportion: Population proportion parameter
    - sample_bias: Sample bias parameter
    - seed: Random seed
    - hyperparameters: Dictionary of hyperparameters to save
    - folder: Folder to save the JSON file (default: 'hp_cache')
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    filename = f"{folder}/hp_{method}_{population_proportion}_{sample_bias}_{seed}.json"
    with open(filename, 'w') as f:
        json.dump(hyperparameters, f)

In [None]:
def load_hp(method, population_proportion, sample_bias, seed, folder='hp_cache'):
    """
    Load hyperparameters from a JSON file.

    Parameters:
    - method: Method identifier
    - population_proportion: Population proportion parameter
    - sample_bias: Sample bias parameter
    - seed: Random seed
    - folder: Folder containing the JSON file (default: 'hp_cache')

    Returns:
    - hyperparameters: Loaded hyperparameters as a dictionary, or False if file not found
    """
    filename = f"{folder}/hp_{method}_{population_proportion}_{sample_bias}_{seed}.json"
    if os.path.exists(filename):
        with open(filename, 'r') as f:
            return json.load(f)
    return False

In [None]:
def save_results(folder='scenario_results', output_excel='aggregated_results.xlsx', output_csv='aggregated_results.csv'):
    """
    Save aggregated scenario results to Excel and CSV files.

    Parameters:
    - folder: Folder containing scenario results (default: 'scenario_results')
    - output_excel: Output Excel file name (default: 'aggregated_results.xlsx')
    - output_csv: Output CSV file name (default: 'aggregated_results.csv')

    Returns:
    - df: Aggregated results DataFrame
    """
    # Load all scenario results from the folder
    scenario_results = load_all_scenario_results(folder)
    
    # Initialize an empty list to aggregate all results
    aggregated_results = []
    
    # Iterate through each scenario result
    for result in scenario_results:
        aggregated_results.extend(result)
    
    # Convert to DataFrame for easy manipulation (optional)
    df = pd.DataFrame(scenario_results)
    
    # Save aggregated results to Excel
    df.to_excel(output_excel, index=False)

    # Save aggregated results to CSV
    df.to_csv(output_csv, index=False)
    
    # Optionally, return the aggregated results DataFrame
    return df

In [None]:
def save_scenario_result(scenario_result, method, population_proportion, sample_bias, seed, folder='scenario_results'):
    """
    Save a specific scenario result to a JSON file.

    Parameters:
    - scenario_result: Dictionary containing the scenario result
    - method: Method identifier
    - population_proportion: Population proportion parameter
    - sample_bias: Sample bias parameter
    - seed: Random seed
    - folder: Folder to save the JSON file (default: 'scenario_results')
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    filename = f"{folder}/result_{method}_{population_proportion}_{sample_bias}_{seed}.json"
    with open(filename, 'w') as f:
        json.dump(scenario_result, f)

In [None]:
def load_all_scenario_results(folder='scenario_results'):
    """
    Load all scenario results from JSON files in a folder.

    Parameters:
    - folder: Folder containing scenario result JSON files (default: 'scenario_results')

    Returns:
    - results: List of loaded scenario results (each element is a dictionary)
    """
    results = []
    if os.path.exists(folder):
        for filename in os.listdir(folder):
            if filename.endswith('.json'):
                with open(os.path.join(folder, filename), 'r') as f:
                    results.append(json.load(f))
    return results

In [None]:
def run_methods(seeds,
        population_proportions=[0.05, 0.5],
        sample_biases=[0.95, 0.5, 0],
        methods=['traditional', 'Veenman', 'Semi-supervised', 'Label_spreading', 'Label_Propagation', 'Self_Learning'],
        verbose=True):
    
    results = []  # Initialize the results list

    for seed in seeds:
        np.random.seed(seed)
        trad_model = []

        base_model, model, cv_stratified = initialize_models(seed)
        
        # Iterate through population proportions
        for population_proportion in population_proportions:
            # Create population with the given proportion
            X_cur_pop, X_fut_pop, y_cur_pop, y_fut_pop = create_pop(random_state = seed, ratio=population_proportion)

            if verbose:
                print("===============================================================================")
                print(f"New population created with proportion={population_proportion}")
                print("Current Population:")
                describe_pop(X_cur_pop, y_cur_pop, full_X=X_cur_pop, plot_pop=True)
                print()
                print("Future Population:")
                describe_pop(X_fut_pop, y_fut_pop, full_X=X_cur_pop, plot_pop=True)
                print()
            
            # Iterate through sample biases
            for sample_bias in sample_biases:

                # Create labeled and unlabeled instances with the given bias and proportion
                X_lab, X_unl, y_lab, y_unl = label_cases(X=X_cur_pop, y=y_cur_pop,
                                                         random_state = seed,
                                                         sample_bias=sample_bias,
                                                         verbose=verbose
                                                        )

                # Saves information about how biased the sample is in relation to the population, for class 1
                original_bias = measure_bias_in_sample(X_cur_pop, X_lab, y_cur_pop, y_lab, verbose=verbose)
                if verbose:
                    print("Distance between the centroid of the sample and the population for class 1:", original_bias[1])
                result_temp = {
                            "Seed": seed,
                            "Prop. class 1": population_proportion,
                            "Sel. bias": sample_bias,
                            "Method": "sample",
                            "Test set": np.NaN,
                            "pr": np.NaN,
                            "AUPRC": np.NaN,
                            "p.50": np.NaN,
                            "p.100": np.NaN,
                            "Bias effect": original_bias[1]
                        }
                save_scenario_result(result_temp, "sample", population_proportion, sample_bias, seed)

                
                for method in methods:
                    if method == 'traditional':
                        if verbose:
                            print("===============================================================================")
                            print("TRADITIONAL METHOD APPLIED")
                            print("")
                            print("Performance on future population:")
                        
                        # Test performance on future population
                        result_temp = {
                            "Seed": seed,
                            "Prop. class 1": population_proportion,
                            "Sel. bias": sample_bias,
                            "Method": method,
                            "Test set": "future"
                        }
                        trad_model = copy.copy(model)
                        if optim_hp:
                            best_hp_t = load_hp(method, population_proportion, sample_bias, seed)     
                            if best_hp_t != False:
                                trad_model = copy.copy(base_model)
                                trad_model.steps[-1][1].set_params(**best_hp_t)
                            
                        Trad_temp = test_performance(X_train=X_lab, y_train=y_lab,
                                                    X_test=X_fut_pop, y_test=y_fut_pop,
                                                    model_i=trad_model,
                                                    std_model=model,
                                                    verbose=verbose)
                        if optim_hp and (best_hp_t is False):
                            best_hp_t = Trad_temp[1]
                            save_hp(method, population_proportion, sample_bias, seed, best_hp_t)
                            trad_model = copy.copy(base_model)
                            trad_model.steps[-1][1].set_params(**best_hp_t)

                        result_temp.update(Trad_temp[0])
                        results.append(result_temp)
                        save_scenario_result(result_temp, method, population_proportion, sample_bias, seed)
                        
                    elif method == 'Veenman':
                        if verbose:
                            print("===============================================================================")
                            print("VEENMAN METHOD APPLIED")
                            print()
                            print("Performance on future population:")

                        # Apply the Veenman method and test performance
                        X_V_lab, y_V_lab = Veenman(X_lab, X_unl, y_lab, y_unl, verbose=verbose)
                        # Performance on future population
                        result_temp = {
                            "Seed": seed,
                            "Prop. class 1": population_proportion,
                            "Sel. bias": sample_bias,
                            "Method": method,
                            "Test set": "future"
                        }
                        veenman_model = copy.copy(model)
                        if optim_hp:
                            best_hp_V = load_hp(method, population_proportion, sample_bias, seed)     
                            if best_hp_V != False:
                                veenman_model = copy.copy(base_model)
                                veenman_model.steps[-1][1].set_params(**best_hp_V)

                        Veenman_temp = test_performance(X_train=X_V_lab, y_train=y_V_lab,
                                                        X_test=X_fut_pop, y_test=y_fut_pop,
                                                        model_i=veenman_model,
                                                        std_model=model,
                                                        verbose=verbose)
                        if optim_hp and (best_hp_V is False):
                            best_hp_V = Veenman_temp[1]
                            save_hp(method, population_proportion, sample_bias, seed, best_hp_V)

                        result_temp.update(Veenman_temp[0])
                        results.append(result_temp)
                        save_scenario_result(result_temp, method, population_proportion, sample_bias, seed)
                        
                    elif method == 'Semi-supervised':
                        if verbose:
                            print("===============================================================================")
                            print("SEMI-SUPERVISED METHOD APPLIED")
                            print()
                            print("Performance on future population:")

                        # Performance on future population
                        result_temp = {
                            "Seed": seed,
                            "Prop. class 1": population_proportion,
                            "Sel. bias": sample_bias,
                            "Method": method,
                            "Test set": "future"
                        }

                        if optim_hp:
                            ss_model = copy.copy(model)
                        else:
                            ss_model = copy.copy(base_model)

                        X_ss_lab, y_ss_lab = semi_supervised(X_lab, X_unl, y_lab, y_unl,
                                                            model_i=ss_model,
                                                            random_state=seed,
                                                            base_model=base_model,
                                                            verbose=verbose,
                                                            population_proportion=population_proportion,
                                                            sample_bias=sample_bias)
                        
                        if optim_hp:
                            best_hp_SS = load_hp(method, population_proportion, sample_bias, seed)     
                            if best_hp_SS != False:
                                ss_model = copy.copy(base_model)
                                ss_model.steps[-1][1].set_params(**best_hp_SS)

                        ss_temp = test_performance(X_train=X_ss_lab, y_train=y_ss_lab,
                                                            X_test=X_fut_pop, y_test=y_fut_pop,
                                                            model_i=ss_model,
                                                            std_model=model,
                                                            verbose=verbose)
                        
                        if optim_hp and (best_hp_SS is False):
                            best_hp_SS = ss_temp[1]
                            save_hp(method, population_proportion, sample_bias, seed, best_hp_SS)

                        result_temp.update(ss_temp[0])
                        results.append(result_temp)
                        save_scenario_result(result_temp, method, population_proportion, sample_bias, seed)
                        
                    elif method == 'Label_spreading':
                        if verbose:
                            print("===============================================================================")
                            print("LABEL SPREADING")
                            print()
                            print("Performance on future population:")

                        # Performance on future population
                        result_temp = {
                            "Seed": seed,
                            "Prop. class 1": population_proportion,
                            "Sel. bias": sample_bias,
                            "Method": method,
                            "Test set": "future"
                        }

                        if optim_hp:
                            best_hp_LS = load_hp(method, population_proportion, sample_bias, seed)
                            if best_hp_LS is False:
                                best_hp_LS = optim_par(X = np.vstack((X_lab, X_unl)),
                                                       y = np.concatenate((y_lab, np.full((X_unl.shape[0],), -1))),
                                                       random_state=seed, cv_stratified=cv_stratified,
                                                       method='label_spreading')
                                save_hp(method, population_proportion, sample_bias, seed, best_hp_LS)
                                if verbose:
                                    print("Optimized hyperparameters: ", best_hp_LS)
                        else:
                            best_hp_LS = 'no'

                        result_temp.update(LabSpred(X_lab, X_unl, y_lab, y_unl,
                                                    X_test=X_fut_pop,
                                                    y_test=y_fut_pop,
                                                    opt_HP=best_hp_LS,
                                                    verbose=verbose)
                        )                    
                        results.append(result_temp)
                        save_scenario_result(result_temp, method, population_proportion, sample_bias, seed)

                    elif method == 'Label_Propagation':
                        if verbose:
                            print("===============================================================================")
                            print("LABEL PROPAGATION")
                            print()
                            print("Performance on future population:")

                        # Performance on future population
                        result_temp = {
                            "Seed": seed,
                            "Prop. class 1": population_proportion,
                            "Sel. bias": sample_bias,
                            "Method": method,
                            "Test set": "future"
                        }

                        if optim_hp:
                            best_hp_LP = load_hp(method, population_proportion, sample_bias, seed)
                            if best_hp_LP is False:
                                best_hp_LP = optim_par(X = np.vstack((X_lab, X_unl)),
                                                       y = np.concatenate((y_lab, np.full((X_unl.shape[0],), -1))),
                                                       random_state=seed, cv_stratified=cv_stratified,
                                                       method='lab_propag')
                                save_hp(method, population_proportion, sample_bias, seed, best_hp_LP)
                                if verbose:
                                    print("Optimized hyperparameters: ", best_hp_LP)
                        else:
                            best_hp_LP = 'no'

                        result_temp.update(LabProp(X_lab, X_unl, y_lab, y_unl,
                                                X_test=X_fut_pop,
                                                y_test=y_fut_pop,
                                                opt_HP=best_hp_LP,
                                                verbose=verbose)
                        )                    
                        results.append(result_temp)
                        save_scenario_result(result_temp, method, population_proportion, sample_bias, seed)

                    elif method == 'Self_Learning':
                        if verbose:
                            print("===============================================================================")
                            print("SELF LEARNING")
                            print()
                            print("Performance on future population:")

                        # Performance on future population
                        result_temp = {
                            "Seed": seed,
                            "Prop. class 1": population_proportion,
                            "Sel. bias": sample_bias,
                            "Method": method,
                            "Test set": "future"
                        }

                        if optim_hp:
                            best_hp_SL = load_hp(method, population_proportion, sample_bias, seed)
                            if best_hp_SL is False:
                                best_hp_SL = optim_par(X = np.vstack((X_lab, X_unl)),
                                                       y = np.concatenate((y_lab, np.full((X_unl.shape[0],), -1))),
                                                       random_state=seed, cv_stratified=cv_stratified,
                                                       method='self-learning', base_model = trad_model)
                                save_hp(method, population_proportion, sample_bias, seed, best_hp_SL)
                                if verbose:
                                    print("Optimized hyperparameters: ", best_hp_SL)
                        else:
                            best_hp_SL = 'no'
                        
                        result_temp.update(SelfLearn(X_lab, X_unl, y_lab, y_unl,
                                                    X_test=X_fut_pop,
                                                    y_test=y_fut_pop,
                                                    trad_model=trad_model,
                                                    opt_HP=best_hp_SL,
                                                    verbose=verbose)
                        )                    
                        results.append(result_temp)
                        save_scenario_result(result_temp, method, population_proportion, sample_bias, seed)
                        
    return results

## 4.2. Run Results

In [None]:
# Run!
seeds = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 29, 30, 31, 32, 33, 34, 35, 36, 38, 39, 40, 41, 42, 43, 44] # Hp optimized.
#skipped: seeds 11, 28, 37. Don't generate consistent samples with at least one observation of class 1.
results = run_methods(seeds=seeds, verbose=True)

## 4.3. Save Aggregated Results

In [None]:
save_results(folder='scenario_results', output_excel='aggregated_results.xlsx', output_csv='aggregated_results.csv')

## 4.4. Visualise Results

In [None]:
results_df = pd.DataFrame(results)

results_df.to_excel("results.xlsx", index=False)

# Define the styling function
def style_rows(row):
    # Set font color to black and alternate background colors every four rows
    font_color = 'color: black;'
    row_color = 'background-color: '
    if row.name % 4 == 0:
        return [font_color + row_color + 'lightgray'] * len(row)
    elif row.name % 4 == 1:
        return [font_color + row_color + 'white'] * len(row)
    elif row.name % 4 == 2:
        return [font_color + row_color + 'lightblue'] * len(row)
    else:
        return [font_color + row_color + 'lightgreen'] * len(row)


# Apply the styling function
styled_df = results_df.style.apply(style_rows, axis=1)

# Display the styled DataFrame as an HTML table
styled_df

# 5. Hyperparameter Used During the Simulation

In [None]:
def read_hyperparameters(folder='hp_cache'):
    # Dictionary to store hyperparameters by method
    hyperparams_by_method = {
        'traditional': [],
        'Veenman': [],
        'Semi-supervised': [],
        'Semi-supervised_i': [],
        'Label_spreading': [],
        'Label_Propagation': [],
        'Self_Learning': []
    }

    # Read all JSON files in the folder
    for filename in os.listdir(folder):
        if filename.endswith(".json"):
            filepath = os.path.join(folder, filename)
            with open(filepath, 'r') as f:
                hyperparameters = json.load(f)
                
            # Extract the method from the filename
            method_parts = filename.split('_')[1:-3]  # Extract parts between 'hp' and the last three elements
            method = '_'.join(method_parts)
            
            # Group into the appropriate category
            if method in hyperparams_by_method:
                hyperparams_by_method[method].append(hyperparameters)
            else:
                print(f"Unknown method {method} in file {filename}")

    return hyperparams_by_method

def create_hyperparameter_tables(hyperparams_by_method):
    # Create a dictionary to store the DataFrames for each table
    tables = {}

    # Combine Traditional, Veenman, Semi-supervised, and Semi-supervised_i into one table
    combined_methods = ['traditional', 'Veenman', 'Semi-supervised', 'Semi-supervised_i']
    combined_hyperparams = []
    for method in combined_methods:
        combined_hyperparams.extend(hyperparams_by_method[method])
    
    if combined_hyperparams:
        combined_df = pd.DataFrame(combined_hyperparams)
        tables['Combined'] = combined_df

    # Create separate tables for other methods
    other_methods = ['Label_spreading', 'Label_Propagation', 'Self_Learning']
    for method in other_methods:
        if hyperparams_by_method[method]:
            df = pd.DataFrame(hyperparams_by_method[method])
            tables[method] = df
    
    return tables

def display_tables(tables):
    for method, table in tables.items():
        print(f"\nHyperparameters for {method} method:")
        display(table)

# Read hyperparameters from folder
hyperparams_by_method = read_hyperparameters()

# Create hyperparameter tables
tables = create_hyperparameter_tables(hyperparams_by_method)

# Display the tables
display_tables(tables)