In [None]:
import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn
import tensorflow as tf

from datetime import datetime
from imblearn.over_sampling import SMOTE

### Load XP coefficients and VOSA training labels

In [None]:
coeffs = pd.read_parquet("data/coeffs_20061_L2-norm.par")
# Columns in coeffs table that are not GaiaEDR3
xp_columns = coeffs.columns[coeffs.columns != "GaiaDR3"]

coeffs

In [None]:
labels = pd.read_csv("data/VOSA_labels_training.csv")
labels

### Cross-match coefficients and VOSA labels

In [None]:
data = pd.merge(coeffs, labels, on="GaiaDR3")
data

### Make test, training, and validation sets

In [None]:
def select_test_set(data, n_test):
    """
    Randomly selects a test set from the dataset.

    Parameters:
    - data (pd.DataFrame): The full dataset.
    - n_test (int): Number of samples to include in the test set.

    Returns:
    - data_test (pd.DataFrame): Randomly selected test set.
    - data_train_val (pd.DataFrame): Remaining data after test set is removed.
    """

    # Randomly sample test set
    data_test = data.sample(n=n_test, ignore_index=True)

    # Exclude test set rows from the original dataset
    data_train_val = data[~data["GaiaDR3"].isin(data_test["GaiaDR3"])].reset_index(drop=True)

    return data_test, data_train_val


def split_train_val(data_train_val, xp_columns, n_val):
    """
    Splits the dataset into a validation set and a training set.
    The training set is balanced using SMOTE to address class imbalance.

    Parameters:
    - data_train_val (pd.DataFrame): The full dataset (minus the test samples) with features and labels.
    - xp_columns (list): List of columns with XP coefficients to use for SMOTE.
    - n_val (int): Number of samples to include in the validation set.

    Returns:
    - data_train (pd.DataFrame): Balanced training set.
    - data_val (pd.DataFrame): Unbalanced validation set.
    """

    # Sample validation set randomly
    data_val = data_train_val.sample(n=n_val)

    # Create training set by excluding validation samples
    data_train = data_train_val[~data_train_val["GaiaDR3"].isin(data_val["GaiaDR3"])]

    # Apply SMOTE to balance the training set
    smote = SMOTE(sampling_strategy=0.5, k_neighbors=2)
    X_resampled, y_resampled = smote.fit_resample(data_train[xp_columns],
                                                  data_train["VOSA"])

    # Reconstruct the training DataFrame with resampled data
    data_train = pd.DataFrame(X_resampled, columns=xp_columns)
    data_train["VOSA"] = y_resampled

    return data_train, data_val

In [None]:
data_test, data_train_val = select_test_set(data, n_test=1000)
data_train, data_val = split_train_val(data_train_val, xp_columns, n_val=500)

print(f"Training set: {len(data_train)}")
n_single_training = len(data_train[data_train["VOSA"] == 0])
n_binary_training = len(data_train[data_train["VOSA"] == 1])
print(f"{n_single_training} singles")
print(f"{n_binary_training} binaries ({n_binary_training/len(data_train)*100:0.0f} %, \
upsampled with SMOTE)" + "\n")

print(f"Validation set: {len(data_val)} samples")
n_single = len(data_val[data_val["VOSA"] == 0])
n_binary = len(data_val[data_val["VOSA"] == 1])
print(f"{n_single} singles")
print(f"{n_binary} binaries ({n_binary/len(data_val)*100:0.0f} %)" + "\n")

print(f"Test set: {len(data_test)} samples")
n_single = len(data_test[data_test["VOSA"] == 0])
n_binary = len(data_test[data_test["VOSA"] == 1])
print(f"{n_single} singles")
print(f"{n_binary} binaries ({n_binary/len(data_test)*100:0.0f} %)" + "\n")

### Build CNN architecture

In [None]:
def get_model_CNN():

    model = tf.keras.Sequential([
    
        tf.keras.layers.Input(shape=(110, 1)),

        keras.layers.Conv1D(filters=16, kernel_size=3),
        keras.layers.LeakyReLU(),
    
        keras.layers.Conv1D(filters=16, kernel_size=3),
        keras.layers.LeakyReLU(),
        
        keras.layers.Conv1D(filters=16, kernel_size=3),
        keras.layers.LeakyReLU(),

        keras.layers.Flatten(),

        tf.keras.layers.Dense(128),
        tf.keras.layers.LeakyReLU(),

        tf.keras.layers.Dense(512),
        tf.keras.layers.LeakyReLU(),
        tf.keras.layers.Dropout(0.33),

        tf.keras.layers.Dense(265),
        tf.keras.layers.LeakyReLU(),
        
        tf.keras.layers.Dense(64),
        tf.keras.layers.LeakyReLU(),

        tf.keras.layers.Dense(16),
        tf.keras.layers.LeakyReLU(),

        tf.keras.layers.Dense(1, activation="sigmoid")
    
    ])

    return model


get_model_CNN().summary()

### Training of CNN models

In [None]:
# Number of CNNs to train in ensamble training
n_models = 10

# Callback to stop trainig when performance on validation set does not increase
early_stop = keras.callbacks.EarlyStopping(monitor="val_loss", mode="min",
                                           patience=100,
                                           start_from_epoch=50,
                                           restore_best_weights=False)

for n in range(n_models):
    # Generate a unique model path for each run
    date_stamp = datetime.today().strftime('%Y-%m-%d')
    model_path = f"CNN_models/binarity_{date_stamp}_model_{n}.keras"

    # Checkpoint callback to save the best model during training
    checkpoint = tf.keras.callbacks.ModelCheckpoint(model_path, 
                                                    monitor="val_loss", mode="min", 
                                                    save_best_only=True, verbose=0)

    # Create new training and validation sets for the current model
    data_train, data_val = split_train_val(data_train_val, xp_columns, n_val=500)

    # Compute class weights to address imbalance
    n_single = len(data_train[data_train["VOSA"] == 0])
    n_binary = len(data_train[data_train["VOSA"] == 1])
    total = len(data_train)

    weight_single = 1 - n_single / total
    weight_binary = 1 - n_binary / total
    class_weight = {0: weight_single, 1: weight_binary}

        # For training set...
    class_weight_training = {0: weight_single, 1: weight_binary}

        # For validation set...
    data_val["class_weight"] = np.zeros(shape=len(data_val))
    data_val.loc[data_val["VOSA"] == 0, "class_weight"] = weight_single
    data_val.loc[data_val["VOSA"] == 1, "class_weight"] = weight_binary

    # Build and compile the CNN model
    model = get_model_CNN()
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
        loss="binary_crossentropy")

    # Train the model
    model.fit(x=data_train[xp_columns], y=data_train["VOSA"], class_weight=class_weight,
              validation_data=(data_val[xp_columns], data_val["VOSA"], data_val["class_weight"]),
              epochs=10000,
              batch_size=16,
              callbacks=[early_stop, checkpoint],
              verbose=2)

    print(f"Best model saved to {model_path}\n")

### Evaluate performance of CNN models and pick best 5, based on test set losses

In [None]:
def get_model_losses(date_stamp, data_test, n_models):
    """
    Evaluate multiple Keras models and return their test losses.

    Parameters:
    - date_stamp (str): Date identifier used in model filenames.
    - data_test (DataFrame): Test dataset containing input features and target labels.
    - n_runs (int): Number of model runs to evaluate.

    Returns:
    - np.ndarray: Array of test losses for each model.
    """
    model_losses = []

    for n in range(n_models):
        # Construct the model file path
        model_path = f"CNN_models/binarity_{date_stamp}_model_{n}.keras"
        
        # Load the trained model
        model = keras.models.load_model(model_path)
        
        # Evaluate the model on the test set
        eval_loss = model.evaluate(
            data_test[xp_columns],               # Input features
            data_test["VOSA"],                   # Target labels
            batch_size=16,
            verbose=0
        )

        print(f"Test loss of model {model_path}: {eval_loss:.4f}")
        model_losses.append(eval_loss)

    print("\nDisplayed losses do not have class weights.\n")

    return np.array(model_losses)

#### Pick best models

In [None]:
# Parameters
date_stamp = "2025-10-23"
n_models = 10
n_keep = 5

# Evaluate models and get their losses for the test set
model_losses = get_model_losses(date_stamp, data_test, n_models)

# Identify indices of the best-performing models (lowest losses)
i_best_models = np.argsort(model_losses)[:n_keep]

print(f"Indices of the {n_keep} best models: {i_best_models}")

#### Evaluate best models on full, training, validation, and test sets. Use several metrics.

In [None]:
def eval_set(data_set, i_best_models, date_stamp):
    """
    Evaluate an ensemble of the best models on a given dataset.

    Parameters:
    - data_set (DataFrame): Dataset containing features and true labels.
    - i_best_models (list or array): Indices of the best-performing models.
    - date_stamp (str): Date identifier used in model filenames.

    Returns:
    - None (prints evaluation metrics)
    """
    
    # Initialize array to store predictions from each model
    predictions_runs = np.zeros((len(i_best_models), len(data_set)))

    for a, i in enumerate(i_best_models):
        # Construct model path using the index of the best model
        model_path = f"CNN_models/binarity_{date_stamp}_model_{n}.keras"
        
        # Load the model
        model = keras.models.load_model(model_path)
        
        # Predict and store the results
        predictions = model.predict(data_set[xp_columns], verbose=0).flatten()
        predictions_runs[a, :] = predictions

    # Compute mean and standard deviation of predictions across models
    predictions_mean = np.mean(predictions_runs, axis=0)
    predictions_std = np.std(predictions_runs, axis=0)

    # Create a DataFrame to store predictions and true labels
    p = pd.DataFrame({"VOSA": data_set["VOSA"], "prediction": predictions_mean, "std": predictions_std})

    # Threshold that maximises F1 score
    precision, recall, thresholds = sklearn.metrics.precision_recall_curve(p["VOSA"], p["prediction"])
    numerator = 2 * recall * precision
    denom = recall + precision
    f1_scores = np.divide(numerator, denom, out=np.zeros_like(denom), where=(denom!=0))
    max_f1_thresh = thresholds[np.argmax(f1_scores)]

    print(f"Threshold that maximizes F1: {max_f1_thresh:.3f}")
    print()
    print("With that threshold:")

    f1 = np.max(f1_scores)

    # Sensitivity, specificity, and precision
    p["predicted_label"] = 0
    p.loc[p["prediction"] > max_f1_thresh, "predicted_label"] = 1

    true_positive = ((p["predicted_label"] == 1) & (p["VOSA"] == 1)).sum()
    false_positive = ((p["predicted_label"] == 1) & (p["VOSA"] == 0)).sum()
    true_negative = ((p["predicted_label"] == 0) & (p["VOSA"] == 0)).sum()
    false_negative = ((p["predicted_label"] == 0) & (p["VOSA"] == 1)).sum()

    tpr = true_positive / (true_positive + false_negative) if (true_positive + false_negative) > 0 else 0
    tnr = true_negative / (true_negative + false_positive) if (true_negative + false_positive) > 0 else 0
    ppv = true_positive / (true_positive + false_positive) if (true_positive + false_positive) > 0 else 0

    # Print results
    print()
    print(f"F1: {f1:.3f}")
    print(f"True Positive Rate (Sensitivity): {tpr:.3f}")
    print(f"True Negative Rate (Specificity): {tnr:.3f}")
    print(f"Positive Predictive Value (Precision): {ppv:.3f}")
    print()

In [None]:
date_stamp = "2025-10-23"

print("Full set")
print()
eval_set(data, i_best_models, date_stamp)

### Predict labels for 20061 sds coefficients.

Results are the averages of the predictions from the 5 best CNN models.

In [None]:
def predict(data_set, i_best_models, date_stamp):
    """
    Generate ensemble predictions and uncertainty estimates using the best models.

    Parameters:
    - data_set (DataFrame): Input data for prediction.
    - i_best_models (list or array): Indices of the best-performing models.
    - date_stamp (str): Date identifier used in model filenames.

    Returns:
    - Tuple of np.ndarray: (mean predictions, standard deviation of predictions)
    """
    # Initialize array to store predictions from each model
    predictions_runs = np.zeros((len(i_best_models), len(data_set)))

    for a, i in enumerate(i_best_models):
        model_path = f"CNN_models/binarity_{date_stamp}_model_{n}.keras"
        print(f"Loading model: {model_path}")
        
        model = keras.models.load_model(model_path)
        predictions = model.predict(data_set[xp_columns], verbose=0).flatten()
        predictions_runs[a, :] = predictions

    # Return mean and standard deviation across model predictions
    return np.mean(predictions_runs, axis=0), np.std(predictions_runs, axis=0)


In [None]:
date_stamp = "2025-09-16"
predictions_mean, predictions_std = predict(coeffs, i_best_models, date_stamp)

coeffs["P_binary_CNN"] = predictions_mean
coeffs["std_binary_CNN"] = predictions_std

# Plot histrogram of predicted probabilities
plt.hist(coeffs["P_binary_CNN"], bins=50)
plt.xlabel("P_binary_CNN")
plt.ylabel("N")
plt.show()

# Plot scatter between 5 models relative to predicted probabilities
plt.scatter(coeffs["P_binary_CNN"], coeffs["std_binary_CNN"], s=1)
plt.xlabel("P_binary_CNN")
plt.ylabel("std_binary_CNN")
plt.show()

#### Save results to file

In [None]:
filepath = f"data/CNN_predictions_binarity_{date_stamp}.csv"

coeffs[["GaiaDR3", "P_binary_CNN", "std_binary_CNN"]].to_csv(path_or_buf=filepath, index=False)

print(f"Label predictions saved to {filepath}")