<a href="https://colab.research.google.com/github/melmar-g1thub/NEURAL-NETWORK/blob/main/NN_for_EoS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

2D MODEL: HS(DD2) neutron matter (with electrons):
https://compose.obspm.fr/eos/2

Inputs:
1. n_B [fm-3]: (0.1E-11 0.1E+02)
2. T [MeV]: (0.10000000E+00, 0.15848932E+03)
3. Y_q → Constant (0)

## PREPOCESSING

### UPLOADING LIBRARIES AND DATA

In [None]:
!pip install skorch
from skorch import NeuralNetRegressor
from skorch.callbacks import EarlyStopping

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

import seaborn as sns
import random
import itertools
import json
import joblib
import time
import ast
import csv
import os

from sklearn.model_selection import train_test_split, RandomizedSearchCV, RepeatedKFold
from sklearn.preprocessing import MinMaxScaler, StandardScaler, LabelEncoder, QuantileTransformer
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, make_scorer
from sklearn.ensemble import RandomForestRegressor

from datetime import datetime
from tqdm import tqdm
from scipy.stats import randint, loguniform
from scipy.interpolate import griddata, SmoothBivariateSpline

try:
    from torch.amp import autocast
except ImportError:
    from torch.cuda.amp import autocast

try:
    from torch.amp import GradScaler
except ImportError:
    from torch.cuda.amp import GradScaler

In [None]:
# Load the data from drive
from google.colab import drive
drive.mount('/content/drive')
data_path = "/content/drive/My Drive/Colab Notebooks/data/"

# temperature and baryon number data
eos_t = np.loadtxt(data_path + 'eos.t', skiprows=2)  # 81 values
eos_nb = np.loadtxt(data_path + 'eos.nb', skiprows=2)  # 326 values
eos_yq = np.zeros(len(eos_t))  # Constant value of 0

# thermodynamic data
thermo_data = np.loadtxt(data_path + 'eos.thermo', skiprows=1)

In [None]:
# Extract indices and thermodynamic properties
indices = thermo_data[:, :3].astype(int)  # i_T, i_nb, i_Yq
q_values = thermo_data[:, 3:5]  # Q1 (pressure) and Q2 (entropy)

# Convert to zero-based index
i_t = indices[:, 0] - 1
i_nb = indices[:, 1] - 1

# Ensure indices are valid, np.where() to get valid integer and not boolean
valid_indices = np.where((i_t >= 0) & (i_t < len(eos_t)) & (i_nb >= 0) & (i_nb < len(eos_nb)))[0]
T_valid = eos_t[i_t[valid_indices]]
nb_valid = eos_nb[i_nb[valid_indices]]
q_valid = q_values[valid_indices]

logT = np.log10(T_valid)
log_nb = np.log10(nb_valid)

P_valid = q_valid[:, 0] * nb_valid
S_valid = q_valid[:, 1] * nb_valid

logP = np.log10(np.clip(P_valid, 1e-10, None))  # Protege contra log(0)
logS = np.log10(np.clip(S_valid, 1e-10, None))

In [None]:
print(min(q_valid[:,0]), max(q_valid[:,0]))
print(min(q_valid[:,1]), max(q_valid[:,1]))
print(min(nb_valid), max(nb_valid))

In [None]:
# Check original data to see its behaviour, skewdness, peaks....

fig, axs = plt.subplots(2, 2, figsize=(12, 8))

axs[0, 0].hist(T_valid, bins=50, alpha=0.7, color="orange")
axs[0, 0].set_title('Original T')
axs[0, 1].hist(nb_valid, bins=50, alpha=0.7, color="red")
axs[0, 1].set_title('Original nb')

axs[1, 0].hist(P_valid , bins=50, alpha=0.7, color="green")
axs[1, 0].set_title('Original P')
axs[1, 1].hist(S_valid, bins=50, alpha=0.7, color="blue")
axs[1, 1].set_title('Original S')

plt.tight_layout()
plt.suptitle('Original Data', y=1.02, fontsize=16)
plt.show()

In [None]:
plot_save_path = "/content/drive/My Drive/Colab Notebooks/TFG_NN_FINAL/PLOTS"
os.makedirs(plot_save_path, exist_ok=True)

plt.figure(figsize=(12, 5))

# Plot logQ1 vs nb
plt.subplot(1, 2, 1)
plt.scatter(log_nb, logS, alpha=0.6, s=10, color='blue')
plt.xlabel("log10(nb)")
plt.ylabel("log10(S)")
plt.title("log(S) vs log(nb)")
plt.grid(True)

# Plot logP vs nb
plt.subplot(1, 2, 2)
plt.scatter(log_nb, logP, alpha=0.6, s=10, color='green')
plt.xlabel("log10(nb)")
plt.ylabel("log10(P)")
plt.title("logP vs log(nb)")
plt.grid(True)
plt.savefig(os.path.join(plot_save_path, 'logP_logS_vs_lognb.png'), dpi=300, bbox_inches='tight')
plt.tight_layout()
plt.show()

In [None]:
# Filter central 90%
T_q05, T_q95 = np.quantile(logT, [0.05, 0.95])
nb_q05, nb_q95 = np.quantile(log_nb, [0.05, 0.95])
mask = (logT >= T_q05) & (logT <= T_q95) & \
       (log_nb >= nb_q05) & (log_nb <= nb_q95)

T_values = logT[mask]
nb_values = log_nb[mask]
P_values = logP[mask]
S_values = logS[mask]

inputs = np.vstack([T_values, nb_values]).T
outputs = np.vstack([P_values, S_values]).T

print("Initial inputs shape (before splitting/scaling):", inputs.shape)
print("Initial outputs shape (before splitting/scaling):", outputs.shape)

In [None]:
# 1. Histograms for individual features
plt.figure(figsize=(15, 10))

plt.subplot(2, 2, 1)
sns.histplot(inputs[:, 0], kde=True, bins=50, color='skyblue')
plt.title('Distribution of log10(Temperature)')
plt.xlabel('log10(T)')
plt.ylabel('Count')

plt.subplot(2, 2, 2)
sns.histplot(inputs[:, 1], kde=True, bins=50, color='lightcoral')
plt.title('Distribution of log10(Baryon Number)')
plt.xlabel('log10(nb)')
plt.ylabel('Count')

plt.subplot(2, 2, 3)
sns.histplot(outputs[:, 0], kde=True, bins=50, color='lightgreen')
plt.title('Distribution of log10(Pressure)')
plt.xlabel('log10(P)')
plt.ylabel('Count')

plt.subplot(2, 2, 4)
sns.histplot(outputs[:, 1], kde=True, bins=50, color='orchid')
plt.title('Distribution of log10(Entropy)')
plt.xlabel('log10(S)')
plt.ylabel('Count')

plt.tight_layout()
plt.suptitle('Distributions of Log-Transformed EoS Features (After 90% Filter)', y=1.02, fontsize=16)
plt.savefig(os.path.join(plot_save_path, 'distributions_log_features.png'), dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# 2D Histogram / Density Plot for Input Parameters (T vs nb)
plt.figure(figsize=(10, 8))
plt.hist2d(inputs[:, 0], inputs[:, 1], bins=50, cmap='viridis')
plt.colorbar(label='Count')
plt.xlabel('log10(T)')
plt.ylabel('log10(nb)')
plt.title('2D Distribution of Input Parameters (T vs nb)')
plt.grid(True, linestyle='--', alpha=0.6)
plt.savefig(os.path.join(plot_save_path, '2d_input_distribution.png'), dpi=300, bbox_inches='tight')
plt.show()

plt.figure(figsize=(10, 8))
sns.kdeplot(x=inputs[:, 0], y=inputs[:, 1], fill=True, cmap='viridis', levels=20)
plt.xlabel('log10(T)')
plt.ylabel('log10(nb)')
plt.title('2D Density Plot of Input Parameters (T vs nb)')
plt.savefig(os.path.join(plot_save_path, '2d_density_plot.png'), dpi=300, bbox_inches='tight')
plt.grid(True, linestyle='--', alpha=0.6)
plt.show()

From the histograms we check:
1. Uniform distributions with approximatly the same number of counts across their range
2. Gaussian or close behaviour
3. Skedness

Stratification of inputs will not be required if they show 1 or 2 and after splitting and normalization their distributions are still representative of this behaviour on the EoS.


### PREPARATION OF DATA

Normalization of the files needs to be performed AFTER it's been divided in the train-validation-test sets!
Because if we perform it before it causes:
- Global Statistics: normalization process calculates these statistics (mean, std, min, max) using all the data points, including those in what will eventually become your test set.
- Unfair Advantage: training set's scaled values are influenced by the statistics of the test set. This means your model indirectly "sees" information about the test data during training.

In [None]:
# 1. Split the data
# First split: 80% train, 20% temp (for validation/test)
in_train, in_temp, out_train, out_temp = train_test_split(
    inputs, outputs, test_size=0.20, random_state=42, shuffle=True)

# Second split: From the 20% temp, split it into 15% validation and 5% test
in_val, in_test, out_val, out_test = train_test_split(
    in_temp, out_temp, test_size=0.25, random_state=42, shuffle=True)

print(f"Train samples: {len(in_train)}")
print(f"Validation samples: {len(in_val)}")
print(f"Test samples: {len(in_test)}")

Fit the Scaler ONLY on the Training Data, this calculates the necessary scaling parameters (mean, std, min, max) exclusively from the training distribution.

Then, we apply transform() to the test and validation data, using the same fitted scaler.

In [None]:
# 2. Normalization
scaler_T = StandardScaler()
scaler_nb = StandardScaler()
scaler_P =  QuantileTransformer(output_distribution='normal', n_quantiles=1000, subsample=1_000_000, random_state=42)
scaler_S = QuantileTransformer(output_distribution='normal', n_quantiles=1000, subsample=1_000_000, random_state=42)

# StandardScaler:	Gaussian-like distribution, improves convergence with activation functions
# MinMaxScaler: Best when we want normalized plots or comparisons, may want to clip/clamp outputs easily and care about scaled MAE/MSE
# QuantileTransformer: Optimal for high skedness, transforming into Gaussian

# We fit the scaler ONLY ON TRAINING SET
def fit_train_scaler(tensor, scaler):
  return scaler.fit_transform(tensor.reshape(-1, 1))

# Then we simply apply the transformation to validation and test set
def apply_scaler(tensor, scaler):
  return scaler.transform(tensor.reshape(-1, 1))

### INPUTS ###
in_T_train_scaled = fit_train_scaler(in_train[:, 0], scaler_T)
in_nb_train_scaled = fit_train_scaler(in_train[:, 1], scaler_nb)

in_T_val_scaled = apply_scaler(in_val[:, 0], scaler_T)
in_nb_val_scaled = apply_scaler(in_val[:, 1], scaler_nb)
in_T_test_scaled = apply_scaler(in_test[:, 0], scaler_T)
in_nb_test_scaled = apply_scaler(in_test[:, 1], scaler_nb)

# Recombine inputs for each set
in_train_processed = np.hstack((in_T_train_scaled, in_nb_train_scaled))
in_val_processed = np.hstack((in_T_val_scaled, in_nb_val_scaled))
in_test_processed = np.hstack((in_T_test_scaled, in_nb_test_scaled))


### OUTPUTS ###
out_P_train_scaled = fit_train_scaler(out_train[:, 0], scaler_P)
out_S_train_scaled = fit_train_scaler(out_train[:, 1], scaler_S)

out_P_val_scaled = apply_scaler(out_val[:, 0], scaler_P)
out_S_val_scaled = apply_scaler(out_val[:, 1], scaler_S)
out_P_test_scaled = apply_scaler(out_test[:, 0], scaler_P)
out_S_test_scaled = apply_scaler(out_test[:, 1], scaler_S)

out_train_processed = np.hstack((out_P_train_scaled, out_S_train_scaled))
out_val_processed = np.hstack((out_P_val_scaled, out_S_val_scaled))
out_test_processed = np.hstack((out_P_test_scaled, out_S_test_scaled))


# Check the shapes of the input and output arrays
print("Training: inputs shape:", in_train_processed.shape)  # (number_of_samples, 2)
print("Training: Outputs shape:", out_train_processed.shape)

In [None]:
# Visualizing the normalization for TRAINING
# Visualize normalization for Input Features (T and nb)
fig, axs = plt.subplots(2, 2, figsize=(12, 8))

axs[0, 0].hist(in_train[:, 0], bins=50, alpha=0.7, color="orange")
axs[0, 0].set_title('Original log10(T)')
axs[0, 1].hist(in_train_processed[:, 0], bins=50, alpha=0.7, color="darkorange")
axs[0, 1].set_title('Normalized log10(T)')

axs[1, 0].hist(in_train[:,1], bins=50, alpha=0.7)
axs[1, 0].set_title('Original log10(nb)')
axs[1, 1].hist(in_train_processed[:, 1], bins=50, alpha=0.7, color='darkblue')
axs[1, 1].set_title('Normalized log10(nb)')

plt.tight_layout()
plt.suptitle('Input Data: Original vs. Normalized (Training Set)', y=1.02, fontsize=16)
plt.savefig(os.path.join(plot_save_path, 'normalization_training_input.png'), dpi=300, bbox_inches='tight')
plt.show()


# Visualize normalization for Output Features (Q1/Pressure and Q2/Entropy)
fig, axs = plt.subplots(2, 2, figsize=(12, 8))

axs[0, 0].hist(out_train[:,0], bins=50, alpha=0.7, color="green")
axs[0, 0].set_title('Original log10(P)')
axs[0, 1].hist(out_train_processed[:, 0], bins=50, alpha=0.7, color="darkgreen")
axs[0, 1].set_title('Normalized log10(P)')

axs[1, 0].hist(out_train[:,1], bins=50, alpha=0.7, color="red")
axs[1, 0].set_title('Original log10(S)')
axs[1, 1].hist(out_train_processed[:, 1], bins=50, alpha=0.7, color="darkred")
axs[1, 1].set_title('Normalized log10(S)')

plt.tight_layout()
plt.suptitle('Output Data: Original vs. Normalized (Training Set)', y=1.02, fontsize=16)
plt.savefig(os.path.join(plot_save_path, 'normalization_training_output.png'), dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Visualizing the normalization for VALIDATION
# Visualize normalization for Input Features (T and nb)
fig, axs = plt.subplots(2, 2, figsize=(12, 8))

axs[0, 0].hist(in_val[:, 0], bins=50, alpha=0.7, color="orange")
axs[0, 0].set_title('Original log10(T)')
axs[0, 1].hist(in_val_processed[:, 0], bins=50, alpha=0.7, color="darkorange")
axs[0, 1].set_title('Normalized log10(T)')

axs[1, 0].hist(in_val[:,1], bins=50, alpha=0.7)
axs[1, 0].set_title('Original log10(nb)')
axs[1, 1].hist(in_val_processed[:, 1], bins=50, alpha=0.7, color='darkblue')
axs[1, 1].set_title('Normalized log10(nb)')

plt.tight_layout()
plt.suptitle('Input Data: Original vs. Normalized (Validation Set)', y=1.02, fontsize=16)
plt.savefig(os.path.join(plot_save_path, 'normalization_val_input.png'), dpi=300, bbox_inches='tight')
plt.show()


# Visualize normalization for Output Features (Q1/Pressure and Q2/Entropy)
fig, axs = plt.subplots(2, 2, figsize=(12, 8))

axs[0, 0].hist(out_val[:,0], bins=50, alpha=0.7, color="green")
axs[0, 0].set_title('Original log10(P)')
axs[0, 1].hist(out_val_processed[:, 0], bins=50, alpha=0.7, color="darkgreen")
axs[0, 1].set_title('Normalized log10(P)')

axs[1, 0].hist(out_val[:,1], bins=50, alpha=0.7, color="red")
axs[1, 0].set_title('Original log10(S)')
axs[1, 1].hist(out_val_processed[:, 1], bins=50, alpha=0.7, color="darkred")
axs[1, 1].set_title('Normalized log10(S)')

plt.tight_layout()
plt.suptitle('Output Data: Original vs. Normalized (Validation Set)', y=1.02, fontsize=16)
plt.savefig(os.path.join(plot_save_path, 'normalization_val_output.png'), dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Visualizing the normalization for TESTING
# Visualize normalization for Input Features (T and nb)
fig, axs = plt.subplots(2, 2, figsize=(12, 8))

axs[0, 0].hist(in_test[:, 0], bins=50, alpha=0.7, color="orange")
axs[0, 0].set_title('Original log10(T)')
axs[0, 1].hist(in_test_processed[:, 0], bins=50, alpha=0.7, color="darkorange")
axs[0, 1].set_title('Normalized log10(T)')

axs[1, 0].hist(in_test[:,1], bins=50, alpha=0.7)
axs[1, 0].set_title('Original log10(nb)')
axs[1, 1].hist(in_test_processed[:, 1], bins=50, alpha=0.7, color='darkblue')
axs[1, 1].set_title('Normalized log10(nb)')

plt.tight_layout()
plt.suptitle('Input Data: Original vs. Normalized (Testing Set)', y=1.02, fontsize=16)
plt.savefig(os.path.join(plot_save_path, 'normalization_testing_input.png'), dpi=300, bbox_inches='tight')
plt.show()


# Visualize normalization for Output Features (Q1/Pressure and Q2/Entropy)
fig, axs = plt.subplots(2, 2, figsize=(12, 8))

axs[0, 0].hist(out_test[:,0], bins=50, alpha=0.7, color="green")
axs[0, 0].set_title('Original log10(P)')
axs[0, 1].hist(out_test_processed[:, 0], bins=50, alpha=0.7, color="darkgreen")
axs[0, 1].set_title('Normalized log10(P)')

axs[1, 0].hist(out_test[:,1], bins=50, alpha=0.7, color="red")
axs[1, 0].set_title('Original log10(S)')
axs[1, 1].hist(out_test_processed[:, 1], bins=50, alpha=0.7, color="darkred")
axs[1, 1].set_title('Normalized log10(S)')

plt.tight_layout()
plt.suptitle('Output Data: Original vs. Normalized (Testing Set)', y=1.02, fontsize=16)
plt.savefig(os.path.join(plot_save_path, 'normalization_testing_output.png'), dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# 3. Convert data to PyTorch tensors
in_train_tensor = torch.FloatTensor(in_train_processed)
out_train_tensor = torch.FloatTensor(out_train_processed)
in_test_tensor = torch.FloatTensor(in_test_processed)
out_test_tensor = torch.FloatTensor(out_test_processed)
in_val_tensor = torch.FloatTensor(in_val_processed)
out_val_tensor = torch.FloatTensor(out_val_processed)

## DEFINE NN

In [None]:
class Interpolation(nn.Module):
    def __init__(self, in_size, out_size, hidden_layer_sizes, dropout, activation):
        super(Interpolation, self).__init__()
        self.dropout_rate = dropout
        self.activation = activation() # Skorch passes the class, so we instantiate it

        layers = []
        current_in_size = in_size

        # Dynamically build hidden layers
        for hidden_size in hidden_layer_sizes:
            layers.append(nn.Linear(current_in_size, hidden_size))
            layers.append(self.activation)
            if self.dropout_rate > 0:
                layers.append(nn.Dropout(self.dropout_rate))
            current_in_size = hidden_size

        # Output layer (linear for regression)
        layers.append(nn.Linear(current_in_size, out_size))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

## RANDOM SEARCH

#### Random Search - Scikit Library

The main challenge lies in bridging scikit-learn's RandomizedSearchCV with PyTorch nn.Module.
- RandomizedSearchCV expects a scikit-learn estimator, which is a class that implements fit(), predict(), and potentially score().
- Interpolation(nn.Module) is a PyTorch model, not directly a scikit-learn estimator.

We'll do this with ` skorch ` where `NeuralNetRegressor`provides its own trainig loop. It also implements early stopping.



We'll also be logging the parameters and models in order to:
- Compare all runs
- Track randomness/reproducibility

In [None]:
# Define the Search Space for the Randomly-searched parameters
param_distributions = {
    'module__hidden_layer_sizes': [(256, 128), (128, 64), (128, 64, 32), (64, 128, 64), (64, 64, 64)],
    'module__activation': [nn.ReLU, nn.LeakyReLU, nn.ELU, nn.SiLU],
    'module__dropout': [0.1, 0.2, 0.4, 0.5],
    'lr': np.logspace(-4, -2, 20),
    'batch_size': [32, 64, 128, 256],
}

# Defined hyperparameters
input_size = 2  # baryon number and temperature
output_size = 2 # Q1 and Q2 in eos.thermo

n_iterations = 40
epochs = 200
patience = 25

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Aditional metrics
def prediction_variance(y_true, y_pred):
    return np.var(y_pred)

r2_scorer = make_scorer(r2_score, greater_is_better=True)
var_scorer = make_scorer(prediction_variance, greater_is_better=True)

# Define the model
# Skorch wrapper
model_estimator = NeuralNetRegressor(
    module=Interpolation,
    module__in_size=input_size,
    module__out_size=output_size,
    max_epochs=epochs,
    lr=0.01,  # Will be overridden during random search
    batch_size=64, #Likewise
    optimizer=torch.optim.Adam,
    criterion=nn.MSELoss,
    callbacks=[EarlyStopping(monitor='valid_loss', patience=patience)],
)

# Define the CrossValidation
cv = RepeatedKFold(n_splits=3, n_repeats=2, random_state=42)

RandomSearchCV implements Cross Validations KFold, RepeatedKFold is recommended for regression tasks

It also includes a scoring, the metric must be maximizing: better models result in larger scores
For regression, a negative error measure (‘neg_mean_absolute_error‘) makes values closer to zero to represent less prediction error by the model.

Once defined, the search is performed by calling the fit() function and providing a dataset used to train and evaluate model hyperparameter combinations using cross-validation.


In [None]:
def save_random_search_results(search_obj, name_prefix="logP_logS_search"):
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    base_dir = "/content/drive/My Drive/Colab Notebooks/Random"
    exp_dir = os.path.join(base_dir, f"{name_prefix}_{timestamp}")
    os.makedirs(exp_dir, exist_ok=True)

    # CSV + JSON
    results_df = pd.DataFrame(search_obj.cv_results_)
    results_df.to_csv(os.path.join(exp_dir, "results_full.csv"), index=False)

    def safe_serialize(obj):
        if isinstance(obj, dict):
            return {k: safe_serialize(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [safe_serialize(v) for v in obj]
        elif isinstance(obj, tuple):
            return tuple(safe_serialize(v) for v in obj)
        elif isinstance(obj, (int, float, str, bool)) or obj is None:
            return obj
        else:
            return str(obj)  # Convertir cualquier otro tipo a string

    with open(os.path.join(exp_dir, "results_full.json"), "w") as f:
        json.dump(safe_serialize(search_obj.cv_results_), f, indent=2)

    joblib.dump(search_obj.best_estimator_, os.path.join(exp_dir, "best_model.pkl"))
    with open(os.path.join(exp_dir, "best_params.json"), "w") as f:
        json.dump({k: str(v) for k, v in search_obj.best_params_.items()}, f, indent=2)

    return exp_dir

In [None]:
random_search = RandomizedSearchCV(
    estimator=model_estimator,
    param_distributions=param_distributions,
    n_iter=n_iterations,
    cv=cv,
    scoring={
        'mean_mse': 'neg_mean_squared_error',
        'r2': r2_scorer,
        'var_pred': var_scorer
    },
    refit='r2',  # interpolar y capturar relaciones reales con R² es más exigente.
    random_state=42,
    n_jobs=-1,
    verbose=2
)

# Convert combined train+val data to PyTorch tensors for the search (NumPy for RandomizedSearchCV)
x_train_val = np.concatenate((in_train_processed, in_val_processed), axis=0).astype(np.float32)
y_train_val = np.concatenate((out_train_processed, out_val_processed), axis=0).astype(np.float32)

print("Starting Randomized Search with Skorch...")
start_time = time.time()
random_search.fit(x_train_val, y_train_val)
end_time = time.time()
elapsed = end_time - start_time
minutes, seconds = divmod(elapsed, 60)
print(f"\n Randomized Search complete.")
print(f"Total time: {int(minutes)} min {int(seconds)} sec")

save_random_search_results(random_search)

In [None]:
# Log results
logfile = '/content/drive/My Drive/Colab Notebooks/Random/results_full.csv'
log_dir = os.path.dirname(logfile)
os.makedirs(log_dir, exist_ok=True) # Ensure directory exists

results_df = pd.read_csv(logfile)

# Clean up and rename columns for clarity
columns_to_log = [
    'param_module__hidden_layer_sizes',
    'param_lr',
    'param_module__dropout',
    'param_module__activation',
    'param_batch_size',
    'rank_test_mean_mse',
    'rank_test_r2',
    'rank_test_var_pred',
    'mean_fit_time',
    'std_fit_time',
    'mean_test_mean_mse',
    'mean_test_r2',
    'mean_test_var_pred'
]

df_filtered = results_df[columns_to_log].copy()

df_filtered.rename(columns={
    'param_module__hidden_layer_sizes': 'hidden_layer_sizes',
    'param_lr': 'learning_rate',
    'param_module__dropout': 'dropout',
    'param_module__activation': 'activation_fn',
    'param_batch_size': 'batch_size',
    'mean_test_mean_mse': 'mean_neg_mse',
    'mean_test_r2': 'mean_r2',
    'mean_test_var_pred': 'mean_var'
}, inplace=True)

df_filtered['mean_mse'] = -df_filtered['mean_neg_mse'] # Convert to positive MSE

df_filtered.to_csv(logfile, index=False)
print(f"\nAll search results saved to: {logfile}")

# Summarize best result
print('\n--- Best Result ---')
print('Best cross-validation score (R2): %s' % random_search.best_score_) # Use random_search here, not result_df
print('Best Hyperparameters: %s' % random_search.best_params_)

#### EVALUATION OF RANDOM SEARCH

PAIRPLOT is a multi-plot grid that shows the relationship between each pair of variables in a dataset. It includes:
- Scatter plots for every variable vs every other variable.
- Distributions for each variable on the diagonal.

And it shows:
1. Linear or nonlinear relationships (e.g., higher learning rate → higher MSE?)
2. Clusters (e.g., some batch sizes perform consistently better?)
3. Outliers (points that break patterns)
4. Correlations (strong slopes in the scatter plots)



In [None]:
plot_save_path = "/content/drive/My Drive/Colab Notebooks/Random/Hyperparameters Analysis"
os.makedirs(plot_save_path, exist_ok=True)

# Pairplot for comparing the different combinations of hyperparameters
le_act = LabelEncoder()
le_layer = LabelEncoder()

df_filtered['activation_fn_str'] = df_filtered['activation_fn'].apply(lambda fn: fn if isinstance(fn, str) else fn.__name__)
df_filtered['hidden_layer_str'] = df_filtered['hidden_layer_sizes'].astype(str)
df_filtered['activation_fn_encoded'] = le_act.fit_transform(df_filtered['activation_fn_str'])
df_filtered['hidden_layer_encoded'] = le_layer.fit_transform(df_filtered['hidden_layer_str'])

# Set columns for pairplot
pairplot_vars = ['learning_rate', 'dropout', 'batch_size', 'mean_mse']

# Plot with activation_fn colored
g = sns.pairplot(df_filtered, vars=pairplot_vars, hue='activation_fn_str', diag_kind='kde', corner='True')

# Adjust legend title and position
new_labels = ['ReLU', 'LeakyReLU']
for t, l in zip(g._legend.texts, new_labels):
    t.set_text(l)
g._legend.set_title("Activation Function")
g._legend.set_bbox_to_anchor((0.6, 0.8))  # move it outside the plot if needed

plt.suptitle("Pairplot of Hyperparameters vs MSE", fontsize=16, y=1.02, x=0.4)
plt.savefig(os.path.join(plot_save_path, 'MSE_pairplot_act_fn.png'), dpi=300, bbox_inches='tight')
plt.show()

# Plot with hidden_layers colored
h = sns.pairplot(df_filtered, vars=pairplot_vars, hue='hidden_layer_str', diag_kind='kde', corner='True')
h._legend.set_title("Hidden Layers Architecture")
h._legend.set_bbox_to_anchor((0.7, 0.8))
plt.suptitle("Pairplot of Hyperparameters vs MSE", fontsize=16, y=1.02)
plt.savefig(os.path.join(plot_save_path, 'MSE_pairplot_hidden.png'), dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Grouped boxplot
sns.set_theme(style="whitegrid", context="notebook")
sns.set_style("darkgrid")
palette = sns.color_palette("pastel")

plt.figure(figsize=(12, 6))
sns.boxplot(
    data=df_filtered,
    x='activation_fn_str',
    y='mean_mse',
    hue='hidden_layer_str',
    palette=palette,
    width=0.7,
    linewidth=1.5,
    fliersize=5,
    dodge=True
)

plt.xlabel("Activation Function", fontsize=13)
plt.ylabel("Mean MSE (CV)", fontsize=13)
plt.xticks(fontsize=11)
plt.yticks(fontsize=11)
plt.legend(title="Hidden Layers", bbox_to_anchor=(1.02, 1), loc='upper left',
           borderaxespad=0, prop={'size': 11}, title_fontsize=10)

plt.title("Validation MSE by Activation and Hidden Layer Configuration", fontsize=14)
plt.savefig(os.path.join(plot_save_path, 'MSE_boxlot_act_hidden.png'), dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Heatmat of correlations
corr_df = df_filtered[['learning_rate', 'dropout', 'batch_size',
                         'activation_fn_encoded', 'hidden_layer_encoded', 'mean_mse']]

corr_matrix = corr_df.corr()

plt.figure(figsize=(8, 6))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f", square=True, cbar=True)
plt.title("Heatmap of correlations between hyperparameters and MSE")
plt.tight_layout()
#plt.savefig(os.path.join(plot_save_path, 'MSE_heatmap.png'), dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Plot top5 optimal combinations ranked by metrics

logfile = '/content/drive/My Drive/Colab Notebooks/Random/results_full.csv'
log_dir = os.path.dirname(logfile)
os.makedirs(log_dir, exist_ok=True) # Ensure directory exists

df = pd.read_csv(logfile)

top5_r2 = df.sort_values(by='rank_test_r2').head(5).copy()
top5_mse = df.sort_values(by='rank_test_mean_mse').head(5).copy()
top5_var = df.sort_values(by='rank_test_var_pred').head(5).copy()

for df_sub in [top5_r2, top5_mse, top5_var]:
    df_sub['config_label'] = df_sub.apply(
        lambda row: f"{row['hidden_layer_sizes']} | dr={row['dropout']} | batch={row['batch_size']} | lr={row['learning_rate']:.1e}",
        axis=1
    )

# Plot Side-by-side
fig, axs = plt.subplots(1, 3, figsize=(22, 6), sharey=False)

# R²
sns.barplot(x='mean_r2', y='config_label', data=top5_r2, palette='crest', ax=axs[0])
axs[0].set_title("Top 5 - R²")
axs[0].set_xlabel("Mean R² (Validation)")
axs[0].grid(axis='x', linestyle='--', alpha=0.6)

# MSE
sns.barplot(x='mean_mse', y='config_label', data=top5_mse, palette='crest', ax=axs[1])
axs[1].set_title("Top 5 - MSE")
axs[1].set_xlabel("Mean MSE (Validation)")
axs[1].grid(axis='x', linestyle='--', alpha=0.6)

# Variace
sns.barplot(x='mean_var', y='config_label', data=top5_var, palette='crest', ax=axs[2])
axs[2].set_title("Top 5 - Predicted Variance")
axs[2].set_xlabel("Mean Predicted Variance")
axs[2].grid(axis='x', linestyle='--', alpha=0.6)

plt.tight_layout()
output_file = os.path.join(plot_save_path, "top5_hyperparameter_comparison.png")
plt.savefig(output_file, dpi=300)
plt.show()

RANDOM FOREST measures how much each input variable (hyperparameter) contributes to reducing the error in the forest:
- For each tree in the forest, it checks how much each feature reduces the MSE when it’s used to split a node.
- Then it averages these reductions over all trees, giving you one score per hyperparameter.

The more it reduces the error → the more “important” it is deemed to be.

> We used a Random Forest to estimate the relative importance of each hyperparameter in determining the model's validation MSE. Although not a formal SHAP analysis, this approach provides a fast and intuitive proxy to guide hyperparameter tuning.



In [None]:
# SHAP graph
# RandomizedSearchCV no entrena un modelo con interpretabilidad directa como un árbol,
# usaremos RandomForestRegressor para estimar la importancia relativa de cada hiperparámetro sobre el MSE

X_forest = df_filtered[['learning_rate', 'dropout', 'batch_size', 'activation_fn_encoded', 'hidden_layer_encoded']]
Y_forest = df_filtered['mean_r2']

# Train Forest model to stimate relevance of each parameter
rf = RandomForestRegressor(n_estimators=200, random_state=42)
rf.fit(X_forest, Y_forest)

importances = rf.feature_importances_
feature_names = X_forest.columns

# Plot histogram
plt.figure(figsize=(8, 5))
sns.barplot(x=importances, y=feature_names, palette='viridis')
plt.title("Impact of hyperparameters on R2 (RandomForest estimation)")
plt.xlabel("Relative importance")
plt.tight_layout()
plt.savefig(os.path.join(plot_save_path, 'R2_randomforest.png'), dpi=300, bbox_inches='tight')
plt.show()

## FINAL EVALUATION OF MODEL

### RETRAIN TOP MODELS

We're gonna evaluate top 3 best combinations from Random Search for longer number of runs and evaluate its results and metrics.

We'll make use of mixed precision with `autocast()` and `GradScaler`  for speeding up training on GPUs and reducing memory usage, especially for larger models.

In [None]:
# Get 3 top configurations from csv
logfile = '/content/drive/My Drive/Colab Notebooks/Random/results_full.csv'
log_dir = os.path.dirname(logfile)
os.makedirs(log_dir, exist_ok=True) # Ensure directory exists

df = pd.read_csv(logfile)
top3_configs = df.sort_values(by='rank_test_r2').head(3)
print(top3_configs)

We're gonna save the results for each configuration:Colab Notebooks/07.06/final_top3_models/
- best trained model as `model_top{i}.pth`
- final metrics and hyperparamerters as `config.json`
- losses as `train_losses.npy` and `val_losses.np` for later plotting

In [None]:
# Create path to save results from training
results = []
base_path = "/content/drive/My Drive/Colab Notebooks/Random"
os.makedirs(base_path, exist_ok=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

input_size = 2
output_size = 2

# Loop for going onto the 3 configurations
for i, (_, row) in enumerate(top3_configs.iterrows(), start=1):
    print(f"\n Training configuration #{i}...")

    # Hyperparameters from the csv
    dropout = row['dropout']
    lr = row['learning_rate']
    batch = int(row['batch_size'])
    hidden = ast.literal_eval(row['hidden_layer_sizes']) # Safer parsing
    if isinstance(row['activation_fn'], str) and row['activation_fn'].startswith("<class "):
        act_name = row['activation_fn'].split("'")[1].split('.')[-1]  # e.g. 'ReLU'
    else:
        act_name = row['activation_fn']
    act_fn = getattr(nn, act_name)

    print(dropout, lr, batch, hidden, act_fn)

    # Set dataloaders
    train_dataset = TensorDataset(in_train_tensor, out_train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch, shuffle=True)

    # Inicialize the model, optimizer and loss
    model = Interpolation(input_size, output_size, hidden, dropout, act_fn).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    scaler = GradScaler() # Mixed precision

    # Early stopping
    epochs = 600
    patience = 60
    best_val_loss = float('inf')
    patience_counter = 0

    ##############################################################
    #TRAINING AND VALIDATION LOOP
    ##############################################################
    train_losses, val_losses = [], []
    start_time = time.time()

    for epoch in range(epochs):
        model.train()
        epoch_train_loss = 0.0
        all_train_preds = []

        # Batch training
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()

            # Forward pass
            with autocast(device_type=device.type):
                train_pred = model(xb)
                train_loss = loss_fn(train_pred, yb)
            # Backward pass and optimization
            scaler.scale(train_loss).backward()
            scaler.step(optimizer)
            scaler.update()

            epoch_train_loss += train_loss.item() * xb.size(0)
            all_train_preds.append(train_pred.detach().cpu())

        train_losses.append(epoch_train_loss  / len(train_loader.dataset))
        train_pred = torch.cat(all_train_preds, dim=0).numpy()

        # Validation (no batching)
        model.eval()
        with torch.no_grad():
            with autocast(device_type=device.type):
                val_pred = model(in_val_tensor.to(device))
                val_loss = loss_fn(val_pred, out_val_tensor.to(device))
        val_losses.append(val_loss.item())

        if val_loss < best_val_loss:
            best_val_loss = val_loss.item()
            torch.save(model.state_dict(), os.path.join(base_path, f"model_top{i}.pth"))
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

        if epoch % 100 == 0:
            print(f"Epoch {epoch:03d} | Train Loss: {train_losses[-1]:.9f} | Val Loss: {val_losses[-1]:.9f}")

    end_time = time.time()
    duration = (end_time - start_time) / 60
    print(f"Finished config #{i} in {duration:.1f} minutes with best val loss: {best_val_loss:.9f}")

    # Save configuration and loss values
    run_path = os.path.join(base_path, f"top{i}")
    os.makedirs(run_path, exist_ok=True)

    with open(os.path.join(run_path, "config.json"), "w") as f:
        json.dump({
            'hidden_layer_sizes': hidden,
            'activation_fn': row['activation_fn'],
            'dropout': dropout,
            'learning_rate': lr,
            'batch_size': batch,
            'best_val_loss': best_val_loss,
            'duration_minutes': duration
        }, f, indent=2)

    np.save(os.path.join(run_path, "train_losses.npy"), np.array(train_losses))
    np.save(os.path.join(run_path, "val_losses.npy"), np.array(val_losses))

    np.save(os.path.join(run_path, "train_pred.npy"), np.array(train_pred))
    np.save(os.path.join(run_path, "val_pred.npy"), np.array(val_pred.cpu().detach().numpy()))

    # PLOTTING LOSS CURVES
    plt.figure(figsize=(8, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel("Epoch")
    plt.ylabel("Loss (MSE)")
    plt.yscale("linear")  # puedes usar "log" si prefieres logarítmico
    plt.gca().yaxis.set_major_formatter(ticker.ScalarFormatter(useMathText=True))
    plt.ticklabel_format(axis='y', style='sci', scilimits=(0, 0))  # fuerza notación científica
    plt.title(f"Loss Curves - Top {i}")
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.tight_layout()
    plt.savefig(os.path.join(run_path, "loss_curve.png"), dpi=300)
    plt.close()

    ##############################################################
    # EVALUATION MODE
    ##############################################################
    # Load the best model state dictionary
    model.load_state_dict(torch.load(os.path.join(base_path, f"model_top{i}.pth")))

    model.eval()
    with torch.no_grad():
        with autocast(device_type=device.type):
            test_pred = model(in_test_tensor.to(device))
            test_loss = loss_fn(test_pred, out_test_tensor.to(device))

    with open(os.path.join(run_path, "test_metrics.json"), "w") as f:
        json.dump({'test_loss': test_loss.item()}, f, indent=2)
    print(f" Final Test Loss for config #{i}: {test_loss.item():.9f}")
    np.save(os.path.join(run_path, "test_pred.npy"), np.array(test_pred.cpu().detach().numpy()))

### PLOT LOSSES

In [None]:
############################################
# FOLDER final_top3_models
#######################3####################
base_path = "/content/drive/My Drive/Colab Notebooks/Random"
top_models = ['top1', 'top2', 'top3']

# Gráfico completo (desde epoch 0)
fig, axes = plt.subplots(1, 3, figsize=(18, 5), sharey=True)
for idx, model in enumerate(top_models):
    run_path = os.path.join(base_path, model)
    train_losses = np.load(os.path.join(run_path, "train_losses.npy"))
    val_losses = np.load(os.path.join(run_path, "val_losses.npy"))

    epochs = np.arange(len(train_losses))

    ax = axes[idx]
    ax.plot(epochs, train_losses, label='Train Loss', color='green')
    ax.plot(epochs, val_losses, label='Validation Loss', color='pink')
    ax.set_title(f"Model {model.upper()}")
    ax.set_xlabel("Epoch")
    ax.set_yscale("log")
    ax.yaxis.set_major_formatter(ticker.ScalarFormatter(useMathText=True))
    ax.ticklabel_format(axis='y', style='sci', scilimits=(0, 0))
    ax.grid(True, linestyle='--', alpha=0.6)
    if idx == 0:
        ax.set_ylabel("Loss (MSE)")
    ax.legend()

plt.suptitle("Loss Curves for Top 3 Configurations (from epoch 0)", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.savefig(os.path.join(base_path, "loss_curves_total_top3.png"), dpi=300)
plt.show()

# Gráfico zoom (desde epoch 50)
fig, axes = plt.subplots(1, 3, figsize=(18, 5), sharey=True)
start_epoch = 50
for idx, model in enumerate(top_models):
    run_path = os.path.join(base_path, model)
    train_losses = np.load(os.path.join(run_path, "train_losses.npy"))
    val_losses = np.load(os.path.join(run_path, "val_losses.npy"))


    epochs = np.arange(start_epoch, len(train_losses))
    train_trimmed = train_losses[start_epoch:]
    val_trimmed = val_losses[start_epoch:]

    ax = axes[idx]
    ax.plot(epochs, train_trimmed, label='Train Loss', color='green')
    ax.plot(epochs, val_trimmed, label='Validation Loss', color='pink')
    ax.set_title(f"Model {model.upper()}")
    ax.set_xlabel("Epoch")
    ax.set_yscale("log")
    ax.yaxis.set_major_formatter(ticker.ScalarFormatter(useMathText=True))
    ax.ticklabel_format(axis='y', style='sci', scilimits=(0, 0))
    ax.grid(True, linestyle='--', alpha=0.6)
    if idx == 0:
        ax.set_ylabel("Loss (MSE)")
    ax.legend()

plt.suptitle(f"Loss Curves for Top 3 Configurations (from epoch {start_epoch})", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.savefig(os.path.join(base_path, "loss_curves_zoom_top3.png"), dpi=300)
plt.show()

In [None]:
# Solo para el modelo top1
model = 'top1'
base_path = "/content/drive/My Drive/Colab Notebooks/Random"
run_path = os.path.join(base_path, model)

train_losses = np.load(os.path.join(run_path, "train_losses.npy"))
val_losses = np.load(os.path.join(run_path, "val_losses.npy"))

epochs = np.arange(len(train_losses))
start_epoch = 50
epochs_zoom = np.arange(start_epoch, len(train_losses))

# Crear figura con dos subgráficos
fig, axes = plt.subplots(1, 2, figsize=(14, 5), sharey=True)

# Gráfico completo
axes[0].plot(epochs, train_losses, label='Train Loss', color='green')
axes[0].plot(epochs, val_losses, label='Validation Loss', color='pink')
axes[0].set_title("TOP1 - From Epoch 0")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss (MSE)")
axes[0].set_yscale("log")
axes[0].yaxis.set_major_formatter(ticker.ScalarFormatter(useMathText=True))
axes[0].ticklabel_format(axis='y', style='sci', scilimits=(0, 0))
axes[0].grid(True, linestyle='--', alpha=0.6)
axes[0].legend()

# Gráfico desde época 50
axes[1].plot(epochs_zoom, train_losses[start_epoch:], label='Train Loss', color='green')
axes[1].plot(epochs_zoom, val_losses[start_epoch:], label='Validation Loss', color='pink')
axes[1].set_title("TOP1 - From Epoch 50")
axes[1].set_xlabel("Epoch")
axes[1].set_yscale("log")
axes[1].yaxis.set_major_formatter(ticker.ScalarFormatter(useMathText=True))
axes[1].ticklabel_format(axis='y', style='sci', scilimits=(0, 0))
axes[1].grid(True, linestyle='--', alpha=0.6)
axes[1].legend()

plt.suptitle("Loss Curves for Model TOP1", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.savefig(os.path.join(base_path, "loss_top1_dual.png"), dpi=300)
plt.show()

### MODEL'S STATISTICS

To evaluate the accuracy of the neural network interpolation, we computed mean squared error (MSE), mean absolute error (MAE), and coefficient of determination (R²) for both pressure (P) and entropy (S) across the training, validation, and test datasets.

In [None]:
top_path = "/content/drive/My Drive/Colab Notebooks/Random/top1"
os.makedirs(top_path, exist_ok=True)

train_predictions = np.load(os.path.join(top_path, "train_pred.npy"))
val_predictions   = np.load(os.path.join(top_path, "val_pred.npy"))
test_predictions  = np.load(os.path.join(top_path, "test_pred.npy"))

In [None]:
def evaluate_actual_predictions(predictions, real_output, scaler_q1, scaler_q2):
    if isinstance(predictions, torch.Tensor):
        pred_np = predictions.cpu().detach().numpy()
    else:
        pred_np = predictions

    # We splitted before normalizing, however the predictions need to be inverse transformed
    P_true = 10 ** real_output[:, 0]
    S_true = 10 ** real_output[:, 1]
    P_pred = 10 ** scaler_q1.inverse_transform(pred_np[:, 0].reshape(-1, 1)).flatten()
    S_pred = 10 **  scaler_q2.inverse_transform(pred_np[:, 1].reshape(-1, 1)).flatten()

    results = {
    'P_MSE': mean_squared_error(P_true, P_pred),
    'P_MAE': mean_absolute_error(P_true, P_pred),
    'P_R2': r2_score(P_true, P_pred),
    'S_MSE': mean_squared_error(S_true, S_pred),
    'S_MAE': mean_absolute_error(S_true, S_pred),
    'S_R2': r2_score(S_true, S_pred),
    }

    print(f"Pressure P [MeV/fm³]")
    print(f"  MSE : {results['P_MSE']:.4e}")
    print(f"  MAE : {results['P_MAE']:.4e}")
    print(f"  R²  : {np.abs(results['P_R2']):.4f}")

    print(f"Entropy S [1/fm³]")
    print(f"  MSE : {results['S_MSE']:.4e}")
    print(f"  MAE : {results['S_MAE']:.4e}")
    print(f"  R²  : {np.abs(results['S_R2']):.4f}")

    return results, P_true, P_pred, S_true, S_pred

In [None]:
print(f" --- Metrics for training data set ---")
train_metrics, P_train_true, P_train_pred, S_train_true, S_train_pred = evaluate_actual_predictions(
     predictions = train_predictions,
     real_output = out_train,
     scaler_q1 = scaler_P,
     scaler_q2 = scaler_S)

print(f"\n --- Metrics for validation data set ---")
val_metrics, P_val_true, P_val_pred, S_val_true, S_val_pred = evaluate_actual_predictions(
    predictions = val_predictions,
    real_output = out_val,
     scaler_q1 = scaler_P,
     scaler_q2 = scaler_S)

print(f"\n --- Metrics for evaluation data set ---")
test_metrics, P_test_true, P_test_pred, S_test_true, S_test_pred = evaluate_actual_predictions(
    predictions = test_predictions,
    real_output = out_test,
     scaler_q1 = scaler_P,
     scaler_q2 = scaler_S)

### VISUALIZE RESULTS

We study the models interpolation power by plotting Predicted vs Tabulated logaritmic values for P and S. We hope data to be on top of x == y line, meaning the model has interpolated accordingly.

In [None]:
# ONLY PRESSURE COMPARISION
min_P_train = min(P_train_true.min(), P_train_pred.min())
max_P_train = max(P_train_true.max(), P_train_pred.max())

min_P_val = min(P_val_true.min(), P_val_pred.min())
max_P_val = max(P_val_true.max(), P_val_pred.max())

min_P_test = min(P_test_true.min(), P_test_pred.min())
max_P_test = max(P_test_true.max(), P_test_pred.max())

# Scatter Plot for actual vs predicted
plt.figure(figsize=(18, 7))
plt.subplot(1, 3, 1)
plt.scatter(P_train_true, P_train_pred, alpha=0.6, label='Training Data', color='limegreen')
plt.plot([min_P_train, max_P_train], [min_P_train, max_P_train], 'r--', label='Ideal line')
plt.xlabel('Tabulated Values')
plt.ylabel('Predictions')
plt.title('Training Data for Pressure: True vs Predicted')
plt.legend()

plt.subplot(1, 3, 2)
plt.scatter(P_val_true, P_val_pred, alpha=0.6, label='Validation Data', color='green')
plt.plot([min_P_val, max_P_val], [min_P_val, max_P_val], 'r--', label='Ideal line')
plt.xlabel('Tabulated Values')
plt.ylabel('Predictions')
plt.title('Validation Data for Pressure: True vs Predicted')
plt.legend()

plt.subplot(1, 3, 3)
plt.scatter(P_test_true, P_test_pred, alpha=0.6, label='Test Data', color='darkslategray')
plt.plot([min_P_test, max_P_test], [min_P_test, max_P_test], 'r--', label='Ideal line')
plt.xlabel('Tabulated Values')
plt.ylabel('Predictions')
plt.title('Test Data for Pressure: True vs Predicted')
plt.legend()

plt.tight_layout()
plt.savefig(os.path.join(base_path, "pred_real_P.png"), dpi=300)
plt.show()

In [None]:
# ONLY ENTROPY COMPARISINO
# Scatter Plot for actual vs predicted
plt.figure(figsize=(16, 7))

plt.subplot(1, 3, 1)
plt.scatter(S_train_true, S_train_pred, alpha=0.6, label='Training Data', color='lightblue')
plt.plot([min(S_train_true), max(S_train_true)], [min(S_train_true), max(S_train_true)], 'r--', label='Ideal line')
plt.xlabel('Tabulated Values')
plt.ylabel('Predictions')
plt.title('Training Data for Entropy: True vs Predicted')
plt.legend()

plt.subplot(1, 3, 2)
plt.scatter(S_val_true, S_val_pred, alpha=0.6, label='Validation Data', color='blue')
plt.plot([min(S_val_true), max(S_val_true)], [min(S_val_true), max(S_val_true)], 'r--', label='Ideal line')
plt.xlabel('Tabulated Values')
plt.ylabel('Predictions')
plt.title('Validation Data for Entropy: True vs Predicted')
plt.legend()

plt.subplot(1, 3, 3)
plt.scatter(S_test_true, S_test_pred, alpha=0.5, label='Test Data', color='darkblue')
plt.plot([min(S_test_true), max(S_test_true)], [min(S_test_true), max(S_test_true)], 'r--', label='Ideal line')
plt.xlabel('Tabulated Values')
plt.ylabel('Predictions')
plt.title('Test Data for Entropy: True vs Predicted')
plt.legend()

plt.tight_layout()
plt.savefig(os.path.join(base_path, "pred_real_S.png"), dpi=300)
plt.show()

3D surface plot to visualize how good interpolation is on a global scale of all output data

In [None]:
# 3D Surface Plot for q1 (Pressure)
T_test = scaler_T.inverse_transform(in_test[:, 0].reshape(-1, 1)).flatten()
nb_test = scaler_nb.inverse_transform(in_test[:, 1].reshape(-1, 1)).flatten()

print(nb_test.shape), print(T_test.shape)
print(P_test_true.shape), print(P_test_pred.shape)

print("Plotting predictions and original test data")
fig = plt.figure(figsize=(12, 5))
ax1 = fig.add_subplot(121, projection='3d')
ax1.scatter(T_test, nb_test, P_test_true, label='Tabulated P values', alpha=0.5)
ax1.scatter(T_test, nb_test, P_test_pred, label='Interpolated P values', alpha=0.5)
ax1.set_xlabel('Temperature')
ax1.set_ylabel('Baryon Number')
ax1.set_zlabel('Pressure')
ax1.legend()

# 3D Surface Plot for q2 (Entropy)
ax2 = fig.add_subplot(122, projection='3d')
ax2.scatter(T_test, nb_test, S_test_true, label='Tabulated S values', alpha=0.5)
ax2.scatter(T_test, nb_test, S_test_pred, label='Interpolated S values', alpha=0.5)
ax2.set_xlabel('Temperature')
ax2.set_ylabel('Baryon Number')
ax2.set_zlabel('Entropy')
ax2.legend()

plt.savefig(os.path.join(base_path, "pred_real_3d_plot.png"), dpi=300)
plt.show()

Residual(True−Predicted) plots are incredibly insightful for regression models. Plotting residuals against the true values (or predicted values) can reveal patterns of error:

- Homoscedasticity: residuals should be randomly scattered around zero.
If they form a cone shape, it suggests that the model's error varies with the magnitude of the prediction.
- Bias: If residuals are consistently positive or negative in a certain range, it indicates bias (systematic underprediction or overprediction).


In [None]:
# Residual plot for P
fig, axes = plt.subplots(1, 3, figsize=(20, 7), sharey=True)
fig.suptitle('Residuals Plot for Pressure Prediction', fontsize=16)

# Training set
residuals_train_P = P_train_true - P_train_pred
axes[0].scatter(P_train_pred, residuals_train_P, alpha=0.6, color='limegreen')
axes[0].axhline(y=0, color='r', linestyle='--')
axes[0].set_xlabel('Predicted Training Values')
axes[0].set_ylabel('Residuals (True - Predicted)')
axes[0].set_title('Training Set')

# Validation set
residuals_val_P = P_val_true - P_val_pred
axes[1].scatter(P_val_pred, residuals_val_P, alpha=0.6, color='green')
axes[1].axhline(y=0, color='r', linestyle='--')
axes[1].set_xlabel('Predicted Validation Values')
axes[1].set_title('Validation Set')

# Testing set
residuals_test_P = P_test_true - P_test_pred
axes[2].scatter(P_test_pred, residuals_test_P, alpha=0.6, color='darkgreen')
axes[2].axhline(y=0, color='r', linestyle='--')
axes[2].set_xlabel('Predicted Testing Values')
axes[2].set_title('Testing Set')

plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.savefig(os.path.join(base_path, "residuals_plot_P.png"), dpi=300)
plt.show()

In [None]:
# Residual plot for S
fig, axes = plt.subplots(1, 3, figsize=(20, 7), sharey=True)
fig.suptitle('Residuals Plot for Entropy Prediction', fontsize=16)

# Training set
residuals_train_S = S_train_true - S_train_pred
axes[0].scatter(S_train_pred, residuals_train_S, alpha=0.6, color='royalblue')
axes[0].axhline(y=0, color='r', linestyle='--')
axes[0].set_xlabel('Predicted Training Values')
axes[0].set_ylabel('Residuals (True - Predicted)')
axes[0].set_title('Training Set')

# Validation set
residuals_val_S = S_val_true - S_val_pred
axes[1].scatter(S_val_pred, residuals_val_S, alpha=0.6, color='cornflowerblue')
axes[1].axhline(y=0, color='r', linestyle='--')
axes[1].set_xlabel('Predicted Validation Values')
axes[1].set_title('Validation Set')

# Testing set
residuals_test_S = S_test_pred - S_test_pred
axes[2].scatter(S_test_true, residuals_test_S, alpha=0.6, color='midnightblue')
axes[2].axhline(y=0, color='r', linestyle='--')
axes[2].set_xlabel('Predicted Testing Values')
axes[2].set_title('Testing Set')

plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.savefig(os.path.join(base_path, "residuals_plot_S.png"), dpi=300)
plt.show()


In [None]:
fig, axes = plt.subplots(2, 3, figsize=(24, 10), sharey='row')
fig.suptitle('Residuals for Pressure and Entropy Predictions (Colored by nb)', fontsize=18)

nb_train = 10 ** in_train[:, 1]
nb_val = 10 ** in_val[:, 1]
nb_test = 10 ** in_test[:, 1]

# --- Pressure Residuals ---
sc0 = axes[0, 0].scatter(P_train_true, P_train_true - P_train_pred, c=nb_train, cmap='viridis', alpha=0.6)
axes[0, 0].axhline(0, color='red', linestyle='--')
axes[0, 0].set_title('Training Set (P)')
axes[0, 0].set_ylabel('Residuals P (True - Predicted)')

sc1 = axes[0, 1].scatter(P_val_true, P_val_true - P_val_pred, c=nb_val, cmap='viridis', alpha=0.6)
axes[0, 1].axhline(0, color='red', linestyle='--')
axes[0, 1].set_title('Validation Set (P)')

sc2 = axes[0, 2].scatter(P_test_true, P_test_true - P_test_pred, c=nb_test, cmap='viridis', alpha=0.6)
axes[0, 2].axhline(0, color='red', linestyle='--')
axes[0, 2].set_title('Testing Set (P)')

# --- Entropy Residuals ---
sc3 = axes[1, 0].scatter(S_train_true, S_train_true - S_train_pred, c=nb_train, cmap='viridis', alpha=0.6)
axes[1, 0].axhline(0, color='red', linestyle='--')
axes[1, 0].set_title('Training Set (S)')
axes[1, 0].set_xlabel('Tabulated Training Values')
axes[1, 0].set_ylabel('Residuals S (True - Predicted)')

sc4 = axes[1, 1].scatter(S_val_true, S_val_true - S_val_pred, c=nb_val, cmap='viridis', alpha=0.6)
axes[1, 1].axhline(0, color='red', linestyle='--')
axes[1, 1].set_title('Validation Set (S)')
axes[1, 1].set_xlabel('Tabulated Validation Values')

sc5 = axes[1, 2].scatter(S_test_true, S_test_true - S_test_pred, c=nb_test, cmap='viridis', alpha=0.6)
axes[1, 2].axhline(0, color='red', linestyle='--')
axes[1, 2].set_title('Testing Set (S)')
axes[1, 2].set_xlabel('Tabulated Testing Values')

# Colorbar for nb
cbar = fig.colorbar(sc5, ax=axes.ravel().tolist(), label='Baryon Density (nb)', shrink=0.95, pad=0.01)
cbar.ax.set_ylabel('Baryon Density (nb)', rotation=270, labelpad=15)

plt.tight_layout(rect=[0, 1.1, 1, 0.95])
plt.savefig(os.path.join(base_path, "residuals_combined_colored_by_nb.png"), dpi=300)
plt.show()


## INTERPOLATION

Now that i've found a model that performs as hoped, the whole goal of my interpolation nn is interpolating from a tabulated EoS of Neutron Stars, where the known x variables are temperature (T) and baryon number (n_b) and the y variables are pressure (P) and Entropy (S).

I do know the values of both the x and y variables because i take them from given tabulated EoS in compose archive. However, i need to prove that my interpolating algorithm is good, this means i should stick to a small region of my range and do an interpolation, taking lets say x_1 and x_3 that i do know and interpolating y_2 for x_2 that even if i also know both values i write a code as if i didn't, in order to prove if it interpolates well.

1. Select Known Points:
Let’s assume we have points (x_1) (T1, n1) and (x_3) (T3, n3) that we know, where (x_2) will be the intermediate point.
Define (x_2) (T2, n2) within the range of the known points.

2. Interpolation Procedure
Treat the prediction for (x_2) (T2, n2) as an unknown during your actual interpolation process.
During this phase, we use the model to predict (P_2) and (S_2).

3. Evaluation
Calculate the difference between predicted (P_2) and (S_2) from the model and the exact known values from the tabulated EoS.
Metrics like Mean Absolute Error (MAE) or Mean Squared Error (MSE) to quantify how close the interpolated values are to the actual known values.

#### NN AND BSPLINE COMPARISION

In [None]:
#######################################################
# NN INTERPOLATION
#######################################################
def interpolation_test_nn(model, inputs_set, outputs_set, scaler_P, scaler_S, device, indices, save_dir):
    model.eval()
    os.makedirs(save_dir, exist_ok=True)
    metrics = []
    start_time = time.time()

    def inv_log_scaled(values, scaler):
      values = np.array(values).reshape(-1, 1)
      return scaler.inverse_transform(values).flatten()

    for i, center_index in enumerate(indices):
        if center_index < 1 or center_index >= len(inputs_set) - 1:
            continue

        x1 = in_test[center_index - 1]
        x2 = in_test[center_index]
        x3 = in_test[center_index + 1]

        y1 = out_test[center_index - 1]
        y2_true = out_test[center_index]
        y3 = out_test[center_index + 1]

        x_tensor = torch.FloatTensor(x2).unsqueeze(0).to(device)
        model_eval
        with torch.no_grad():
            y2_pred = model(x_tensor).cpu().numpy()[0]

        # Denormalize
        P_true = 10 ** np.array([y1[0], y2_true[0], y3[0]])
        S_true = 10 ** np.array([y1[1], y2_true[1], y3[1]])
        P_pred = 10 ** inv_log_scaled([y2_pred[0]], scaler_P)[0]
        S_pred = 10 ** inv_log_scaled([y2_pred[1]], scaler_S)[0]

        # Metrics
        mse_P = mean_squared_error([P_true[1]], [P_pred])
        mae_P = mean_absolute_error([P_true[1]], [P_pred])
        mse_S = mean_squared_error([S_true[1]], [S_pred])
        mae_S = mean_absolute_error([S_true[1]], [S_pred])

        metrics.append({
            "method": "NN",
            "index": int(center_index),
            "mse_P": mse_P,
            "mae_P": mae_P,
            "mse_S": mse_S,
            "mae_S": mae_S
        })

        # Plot
        fig, axs = plt.subplots(1, 2, figsize=(12, 5))
        x_labels = ['x1', 'x2', 'x3']
        x_ticks = np.arange(3)

        axs[0].plot(x_ticks, P_true, label='True P', marker='o', color='green')
        axs[0].scatter(x_ticks[1], P_pred, label='Predicted P', marker='x', s=100, color='red')
        axs[0].set_title('Pressure (P)')
        axs[0].set_xticks(x_ticks)
        axs[0].set_xticklabels(x_labels)
        axs[0].grid(True)
        axs[0].legend()

        axs[1].plot(x_ticks, S_true, label='True S', marker='o', color='blue')
        axs[1].scatter(x_ticks[1], S_pred, label='Predicted S', marker='x', s=100, color='orange')
        axs[1].set_title('Entropy (S)')
        axs[1].set_xticks(x_ticks)
        axs[1].set_xticklabels(x_labels)
        axs[1].grid(True)
        axs[1].legend()

        plt.suptitle(f"NN Interpolation Triplet Test #{i+1} (Index {center_index})", fontsize=14)
        plt.tight_layout()
        filename = f"NN_triplet_{i+1}_index_{center_index}.png"
        plt.savefig(os.path.join(save_dir, filename), dpi=300)
        plt.close()

    elapsed = time.time() - start_time
    for m in metrics:
        m["elapsed_sec"] = elapsed
    return metrics

In [None]:
#######################################################
# b-SPLINES INTERPOLATION
#######################################################
def interpolation_test_bspline(in_train, out_train, in_test, out_test, indices, save_dir):
    os.makedirs(save_dir, exist_ok=True)

    x = in_train[:, 0]
    y = in_train[:, 1]
    z_P = out_train[:, 0]
    z_S = out_train[:, 1]

    bspline_P = SmoothBivariateSpline(x, y, z_P, kx=3, ky=3)
    bspline_S = SmoothBivariateSpline(x, y, z_S, kx=3, ky=3)

    metrics = []
    start_time = time.time()

    for i, center_index in enumerate(indices):
        if center_index < 1 or center_index >= len(in_test) - 1:
            continue

        gap=10

        x1 = in_test[center_index - gap]
        x2 = in_test[center_index]
        x3 = in_test[center_index + gap]

        y1 = out_test[center_index - gap]
        y2_true = out_test[center_index]
        y3 = out_test[center_index + gap]

        # Predicions for middle point
        P_pred_log = bspline_P.ev(x2[0], x2[1])
        S_pred_log = bspline_S.ev(x2[0], x2[1])

        # Inverse log transformation to get real phyisical values
        P_true = 10 ** np.array([y1[0], y2_true[0], y3[0]])
        S_true = 10 ** np.array([y1[1], y2_true[1], y3[1]])
        P_pred = 10 ** P_pred_log
        S_pred = 10 ** S_pred_log

        # Metrics
        mse_P = mean_squared_error([P_true[1]], [P_pred])
        mae_P = mean_absolute_error([P_true[1]], [P_pred])
        mse_S = mean_squared_error([S_true[1]], [S_pred])
        mae_S = mean_absolute_error([S_true[1]], [S_pred])

        metrics.append({
            "method": "B-Spline",
            "index": int(center_index),
            "mse_P": mse_P,
            "mae_P": mae_P,
            "mse_S": mse_S,
            "mae_S": mae_S
        })

        # Plot
        fig, axs = plt.subplots(1, 2, figsize=(12, 5))
        x_labels = ['x1', 'x2', 'x3']
        x_ticks = np.arange(3)

        axs[0].plot(x_ticks, P_true, label='True P', marker='o', color='green')
        axs[0].scatter(x_ticks[1], P_pred, label='Predicted P', marker='x', s=100, color='red')
        axs[0].set_title('Pressure (P)')
        axs[0].set_xticks(x_ticks)
        axs[0].set_xticklabels(x_labels)
        axs[0].grid(True)
        axs[0].legend()
        axs[0].ticklabel_format(axis='y', style='sci', scilimits=(-3, 3)) # Scientific notation for y-axis

        axs[1].plot(x_ticks, S_true, label='True S', marker='o', color='blue')
        axs[1].scatter(x_ticks[1], S_pred, label='Predicted S', marker='x', s=100, color='orange')
        axs[1].set_title('Entropy (S)')
        axs[1].set_xticks(x_ticks)
        axs[1].set_xticklabels(x_labels)
        axs[1].grid(True)
        axs[1].legend()
        axs[0].ticklabel_format(axis='y', style='sci', scilimits=(-3, 3)) # Scientific notation for y-axis

        plt.suptitle(f"B-Spline Interpolation Triplet Test #{i+1} (Index {center_index})", fontsize=14)
        plt.tight_layout()
        filename = f"SPLINE_triplet_{i+1}_index_{center_index}.png"
        plt.savefig(os.path.join(save_dir, filename), dpi=300)
        plt.close()

    elapsed = time.time() - start_time
    for m in metrics:
        m["elapsed_sec"] = elapsed

    return metrics

In [None]:
def compare_results(metrics_nn, metrics_spline):
    print("\n--- COMPARE INTERPOLATION METHODS ---")
    time_nn = np.mean([m["elapsed_sec"] for m in metrics_nn])
    time_sp = np.mean([m["elapsed_sec"] for m in metrics_spline])
    print(f"⏱️  Average timer per execution - NN: {time_nn:.2f}s | B-Spline: {time_sp:.2f}s\n")

    print("Triplet | MSE_P (NN / BSpl) | MAE_P (NN / BSpl) | MSE_S (NN / BSpl) | MAE_S (NN / BSpl)")
    print("-" * 75)

    for i, (m_nn, m_sp) in enumerate(zip(metrics_nn, metrics_spline)):
        print(f"   {i+1}    | {m_nn['mse_P']:.2e} / {m_sp['mse_P']:.2e} |"
              f" {m_nn['mae_P']:.2e} / {m_sp['mae_P']:.2e} |"
              f" {m_nn['mse_S']:.2e} / {m_sp['mse_S']:.2e} |"
              f" {m_nn['mae_S']:.2e} / {m_sp['mae_S']:.2e}")

In [None]:
# Upload the model and instantiate
top_path = "/content/drive/My Drive/Colab Notebooks/Random/top1"
config_path = os.path.join(top_path, "config.json")
weights_path = os.path.join(top_path, "model_top1.pth")

def load_activation_fn(activation_str):
    activation_map = {
        "ReLU": nn.ReLU,
        "LeakyReLU": nn.LeakyReLU,
        "SiLU": nn.SiLU
    }

    if "LeakyReLU" in activation_str:
        return activation_map["LeakyReLU"]
    elif "ReLU" in activation_str:
        return activation_map["ReLU"]
    elif "SiLU" in activation_str:
        return activation_map["SiLU"]
    else:
        raise ValueError(f"Activation function '{activation_str}' not recognized.")

def load_model_from_config(config_path, in_size, out_size):
    with open(config_path, "r") as f:
        config = json.load(f)
    hidden_layer_sizes = config.get("hidden_layer_sizes")
    dropout = config.get("dropout")
    activation_str = config.get("activation_fn")
    activation_cls = load_activation_fn(activation_str)

    model = Interpolation(in_size, out_size, hidden_layer_sizes, dropout, activation_cls)
    return model

in_size = 2
out_size = 2

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = load_model_from_config(config_path, in_size, out_size)
model.load_state_dict(torch.load(weights_path, map_location=device))
model.to(device)

# Same index for both
N = len(in_test)
np.random.seed(42)
indices = np.sort(np.random.randint(50, N - 50, size=5))
print(indices)

dir_nn = "/content/drive/My Drive/Colab Notebooks/Random/top1"
os.makedirs(dir_nn, exist_ok=True)
dir_spl = "/content/drive/My Drive/Colab Notebooks/Random/top1"
os.makedirs(dir_spl, exist_ok=True)

In [None]:
# 1. Interpolación con red neuronal
metrics_nn = interpolation_test_nn(
    model=model,
    inputs_set=in_test,
    outputs_set=out_test,
    scaler_P=scaler_P,
    scaler_S=scaler_S,
    device=device,
    indices=indices,
    save_dir=dir_nn
)
for metric in metrics_nn:
    print(metric)


# 2. Interpolación con B-Spline
metrics_spline = interpolation_test_bspline(
    in_train=in_train,
    out_train=out_train,
    in_test=in_test,
    out_test=out_test,
    indices=indices,
    save_dir=dir_spl
)
for metric in metrics_spline:
    print(metric)

# Guardar metrics
save_dir = "/content/drive/My Drive/Colab Notebooks/Random/top1"
os.makedirs(save_dir, exist_ok=True)

df_all = pd.DataFrame(metrics_nn + metrics_spline)
csv_path = os.path.join(save_dir, "interpolation_metrics_comparison.csv")
df_all.to_csv(csv_path, index=False)

# 3. Comparar resultados
compare_results(metrics_nn, metrics_spline)

## NN LOCAL INTERPOLATION

### Prepare triplets and train

In [None]:
gap = 1
# Training triplets
in_train_triplets = torch.stack([
    torch.cat((in_train_tensor[i - gap], in_train_tensor[i + gap]))
    for i in range(gap, len(in_train_tensor) - gap)
])
out_train_targets = out_train_tensor[gap : len(out_train_tensor) - gap]

# Validation triplets
in_val_triplets = torch.stack([
    torch.cat((in_val_tensor[i - gap], in_val_tensor[i + gap]))
    for i in range(gap, len(in_val_tensor) - gap)
])
out_val_targets = out_val_tensor[gap : len(out_val_tensor) - gap]

In [None]:
# Define some necessary functions for triplet interpolation
# Function for performing inverse transform to predictions to get physical values
def inv_log_scaled(values, scaler):
    real_val = scaler.inverse_transform(np.array(values).reshape(-1, 1)).flatten()
    physical_val = 10**real_val
    return physical_val

# Get triplet indices
def get_interpolation_indices(center_index, data_length, gap):
    idx1 = center_index - gap
    idx3 = center_index + gap
    if idx1 < 0 or idx3 >= data_length:
        return None
    return idx1, center_index, idx3

# Function to load activation from string
def load_activation_fn(activation_str):
    activation_map = {
        "ReLU": nn.ReLU,
        "LeakyReLU": nn.LeakyReLU,
        "SiLU": nn.SiLU
    }

    if "LeakyReLU" in activation_str:
        return activation_map["LeakyReLU"]
    elif "ReLU" in activation_str:
        return activation_map["ReLU"]
    elif "SiLU" in activation_str:
        return activation_map["SiLU"]
    else:
        raise ValueError(f"Activation function '{activation_str}' not recognized.")

In [None]:
top_path = "/content/drive/My Drive/Colab Notebooks/Random/top1"
output_dir  = os.path.join(top_path, "InterpolationTriplet_Training")
config_path = os.path.join(top_path, "config.json")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

with open(config_path, "r") as f:
    top1_config = json.load(f)
hidden_sizes  = top1_config.get("hidden_layer_sizes")
dropout  = top1_config.get("dropout")
activation_str  = top1_config.get("activation_fn")
activation_fn  = load_activation_fn(activation_str)

lr = 0.00012742749857031334


input_triplet = 4 # concate x1, x3 = [T1, nb1, T3, nb3]
output_triplet = 2 # P, S

# Instantiate the model, without  previous weights just hiperparameters !!!
triplet_model = Interpolation(in_size = input_triplet,
                              out_size = output_triplet,
                              hidden_layer_sizes=hidden_sizes,
                              dropout=dropout,
                              activation=activation_fn).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(triplet_model.parameters(), lr)

In [None]:
def train_triplet_model(model, in_train, out_train, in_val, out_val,
                        criterion, optimizer, device, epochs, save_path):
    best_val_loss = float("inf")
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        preds_train = model(in_train.to(device))
        loss_train = criterion(preds_train, out_train.to(device))

        loss_train.backward()
        optimizer.step()

        model.eval()
        with torch.no_grad():
            preds_val = model(in_val.to(device))
            loss_val = criterion(preds_val, out_val.to(device))

        train_losses.append(loss_train.item())
        val_losses.append(loss_val.item())

        print(f"Epoch {epoch+1:03d} | Train Loss: {loss_train.item():.5e} | Val Loss: {loss_val.item():.5e}")

        if loss_val.item() < best_val_loss:
            best_val_loss = loss_val.item()
            torch.save(model.state_dict(), save_path)
            print(f"  ✅ Best model saved (val_loss = {best_val_loss:.5e})")

    return train_losses, val_losses

In [None]:
save_path  = os.path.join(output_dir, "best_triplet_model.pth")

train_losses, val_losses = train_triplet_model(
    model=triplet_model,
    in_train=in_train_triplets,
    out_train=out_train_targets,
    in_val=in_val_triplets,
    out_val=out_val_targets,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    epochs=150,
    save_path=save_path
)

losses_df = pd.DataFrame({
    "epoch": list(range(1, len(train_losses) + 1)),
    "train_loss": train_losses,
    "val_loss": val_losses
})

losses_csv_path = os.path.join(output_dir, "triplet_model_losses.csv")
losses_df.to_csv(losses_csv_path, index=False)


plt.figure(figsize=(10, 6))
plt.plot(losses_df["epoch"], losses_df["train_loss"], label="Train Loss", linewidth=2)
plt.plot(losses_df["epoch"], losses_df["val_loss"], label="Validation Loss", linewidth=2)
plt.xlabel("Epoch")
plt.ylabel("Loss (MSE)")
plt.title("TripletNet Training and Validation Loss")
plt.yscale("log")  # log scale is useful if loss varies a lot
plt.grid(True, which='both', linestyle='--', alpha=0.5)
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "triplet_model_loss_curve.png"), dpi=300)
plt.show()

### NN and B-SPlines Interpolation

In [None]:
# #######################################################
# INTERPOLATION FOR THE TRIPLET
# #######################################################
def Interpolation_Triplet_test(model, inputs_set, outputs_set, scaler_P, scaler_S, device, indices, save_dir, gap):
    model.eval()
    os.makedirs(save_dir, exist_ok=True)
    metrics = []

    model = model.to(device)

    for i, center_index in enumerate(indices):
        idx1, idx2, idx3 = get_interpolation_indices(center_index, len(inputs_set), gap)

        if idx1 is None:
            print(f"Saltando índice {center_index} (gap={gap}) debido a límites.")
            continue

        x1 = inputs_set[idx1]
        x3 = inputs_set[idx3]

        y1 = outputs_set[idx1]
        y2_true = outputs_set[idx2]
        y3 = outputs_set[idx3]

        # Concatenate x1 y x3 for input in the Interpolation NN
        triplet_input_tensor = torch.cat([
            torch.FloatTensor(x1).unsqueeze(0),
            torch.FloatTensor(x3).unsqueeze(0)
        ], dim=1).to(device)

        # Real outputs
        P_true = 10 ** np.array([y1[0], y2_true[0], y3[0]])
        S_true = 10 ** np.array([y1[1], y2_true[1], y3[1]])

        # Predictions
        with torch.no_grad():
            y2_pred_scaled = model(triplet_input_tensor).cpu().numpy()[0]

        P2_pred = inv_log_scaled(y2_pred_scaled[0], scaler_P)
        S2_pred = inv_log_scaled(y2_pred_scaled[1], scaler_S)

        # Metrics
        mse_P = mean_squared_error([P_true[1]], [P2_pred])
        mae_P = mean_absolute_error([P_true[1]], [P2_pred])
        mse_S = mean_squared_error([S_true[1]], [S2_pred])
        mae_S = mean_absolute_error([S_true[1]], [S2_pred])

        metrics.append({
            "method": "TripletNet", "index": int(center_index), "gap": gap,
            "mse_P": mse_P, "mae_P": mae_P,
            "mse_S": mse_S, "mae_S": mae_S
        })

        # Interpolation graph
        fig, axs = plt.subplots(1, 2, figsize=(12, 5))
        x_labels_plot = [f'Idx {idx1}', f'Idx {idx2}', f'Idx {idx3}']
        x_ticks = np.arange(3)

        axs[0].plot(x_ticks, P_true, label='True P', marker='o', color='green')
        axs[0].scatter(x_ticks[1], P2_pred, label='Predicted P', marker='x', s=100, color='red')
        axs[0].set_title('Pressure (P) [MeV/fm³]')
        axs[0].set_xticks(x_ticks)
        axs[0].set_xticklabels(x_labels_plot)
        axs[0].grid(True)
        axs[0].legend()
        axs[0].ticklabel_format(axis='y', style='sci', scilimits=(-3, 3))

        axs[1].plot(x_ticks, S_true, label='True S', marker='o', color='blue')
        axs[1].scatter(x_ticks[1], S2_pred, label='Predicted S', marker='x', s=100, color='orange')
        axs[1].set_title('Entropy (S) [1/fm³]')
        axs[1].set_xticks(x_ticks)
        axs[1].set_xticklabels(x_labels_plot)
        axs[1].grid(True)
        axs[1].legend()
        axs[1].ticklabel_format(axis='y', style='sci', scilimits=(-3, 3))

        plt.suptitle(f"NN Interpolation Triplet Test #{i+1} (Index {center_index})", fontsize=14)
        plt.tight_layout()
        filename = f"TripletNet_triplet_{i+1}_index_{center_index}_gap_{gap}.png"
        plt.savefig(os.path.join(save_dir, filename), dpi=300)
        plt.close()

    return metrics

In [None]:
#######################################################
# b-SPLINES INTERPOLATION
#######################################################
def interpolation_test_bspline(in_train, out_train, in_test, out_test, indices, save_dir, gap):
    os.makedirs(save_dir, exist_ok=True)

    x = in_train[:, 0]
    y = in_train[:, 1]
    z_P = out_train[:, 0]
    z_S = out_train[:, 1]

    bspline_P = SmoothBivariateSpline(x, y, z_P, kx=3, ky=3)
    bspline_S = SmoothBivariateSpline(x, y, z_S, kx=3, ky=3)

    metrics = []
    start_time = time.time()

    for i, center_index in enumerate(indices):
        idx1, idx2, idx3 = get_interpolation_indices(center_index, len(in_test), gap)

        if idx1 is None:
            print(f"Saltando índice {center_index} (gap={gap}) debido a límites.")
            continue

        x1 = in_test[idx1]
        x2 = in_test[idx2]
        x3 = in_test[idx3]

        y1 = in_test[idx1]
        y2_true = in_test[idx2]
        y3 = in_test[idx3]

        # Predicción en el centro
        P_pred_log = bspline_P.ev(x2[0], x2[1])
        S_pred_log = bspline_S.ev(x2[0], x2[1])

        # Valores verdaderos en el triplete (ya están en log10, los destransformamos)
        P_true = 10 ** np.array([y1[0], y2_true[0], y3[0]])
        S_true = 10 ** np.array([y1[1], y2_true[1], y3[1]])
        P_pred = 10 ** P_pred_log
        S_pred = 10 ** S_pred_log

        # Métricas
        mse_P = mean_squared_error([P_true[1]], [P_pred])
        mae_P = mean_absolute_error([P_true[1]], [P_pred])
        mse_S = mean_squared_error([S_true[1]], [S_pred])
        mae_S = mean_absolute_error([S_true[1]], [S_pred])

        metrics.append({
            "method": "B-Spline",
            "index": int(center_index),
            "mse_P": mse_P,
            "mae_P": mae_P,
            "mse_S": mse_S,
            "mae_S": mae_S
        })

        # === Plot (idéntico al de NN) ===
        fig, axs = plt.subplots(1, 2, figsize=(12, 5))
        x_labels_plot = [f'Idx {idx1}', f'Idx {idx2}', f'Idx {idx3}']
        x_ticks = np.arange(3)

        axs[0].plot(x_ticks, P_true, label='True P', marker='o', color='green')
        axs[0].scatter(x_ticks[1], P_pred, label='Predicted P', marker='x', s=100, color='red')
        axs[0].set_title('Pressure (P) [MeV/fm³]')
        axs[0].set_xticks(x_ticks)
        axs[0].set_xticklabels(x_labels_plot)
        axs[0].grid(True)
        axs[0].legend()
        axs[0].ticklabel_format(axis='y', style='sci', scilimits=(-3, 3)) # Scientific notation for y-axis

        axs[1].plot(x_ticks, S_true, label='True S', marker='o', color='blue')
        axs[1].scatter(x_ticks[1], S_pred, label='Predicted S', marker='x', s=100, color='orange')
        axs[1].set_title('Entropy (S) [1/fm³]')
        axs[1].set_xticks(x_ticks)
        axs[1].set_xticklabels(x_labels_plot)
        axs[1].grid(True)
        axs[1].legend()
        axs[0].ticklabel_format(axis='y', style='sci', scilimits=(-3, 3)) # Scientific notation for y-axis

        plt.suptitle(f"B-Spline Interpolation Triplet Test #{i+1} (Index {center_index})", fontsize=14)
        plt.tight_layout()
        filename = f"SPLINE_triplet_{i+1}_index_{center_index}.png"
        plt.savefig(os.path.join(save_dir, filename), dpi=300)
        plt.close()

    elapsed = time.time() - start_time
    for m in metrics:
        m["elapsed_sec"] = elapsed

    return metrics

In [None]:
# Upload the top1 model hyperparameteres
output_dir  = "/content/drive/My Drive/Colab Notebooks/Random/top1/InterpolationTriplet_Training"
best_model_path = os.path.join(output_dir , "best_triplet_model.pth")

triplet_model.load_state_dict(torch.load(best_model_path))
triplet_model.eval()

# Define indices
N_test_samples = len(in_test_processed)
np.random.seed(42)
max_gap_for_test_selection = 10
indices = np.sort(np.random.randint(max_gap_for_test_selection, N_test_samples - max_gap_for_test_selection, size=5))
print(f"Indices for InterpolationTripletNe from test data set: {indices}")

# Define saving directory
triplet_nn_save_base_dir = os.path.join(output_dir, "TripletNet_Interpolation_Tests")
os.makedirs(triplet_nn_save_base_dir, exist_ok=True)

for current_gap in [1, 5, 10]:
    print(f"\n▶️ TripletNet and B-Spline with gap={current_gap}...")

    save_dir_gap = os.path.join(triplet_nn_save_base_dir, f"gap_{current_gap}")
    os.makedirs(save_dir_gap, exist_ok=True)

    # === Interpolación con TripletNet ===
    t0 = time.time()
    metrics_triplet_nn_gap = Interpolation_Triplet_test(
        model=triplet_model,
        inputs_set=in_test,
        outputs_set=out_test,
        scaler_P=scaler_P,
        scaler_S=scaler_S,
        device=device,
        indices=indices,
        save_dir=save_dir_gap,
        gap=current_gap
    )
    elapsed_nn = time.time() - t0

    for metric in metrics_triplet_nn_gap:
        metric["time_sec"] = elapsed_nn
        print(f"NN: {metric}")
    df_nn = pd.DataFrame(metrics_triplet_nn_gap)

    # === Interpolación con B-Spline ===
    t1 = time.time()
    metrics_spline_gap = interpolation_test_bspline(
        in_train=in_train,
        out_train=out_train,
        in_test=in_test,
        out_test=out_test,
        indices=indices,
        save_dir=save_dir_gap,
        gap=current_gap
    )
    elapsed_spline = time.time() - t1

    for metric in metrics_spline_gap:
        metric["time_sec"] = elapsed_spline
        print(f"SPLINE: {metric}")
    df_spline = pd.DataFrame(metrics_spline_gap)

    df_all = pd.concat([df_nn, df_spline], ignore_index=True)
    csv_path = os.path.join(save_dir_gap, f"metrics_triplet_gap_{current_gap}.csv")
    df_all.to_csv(csv_path, index=False)