In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm

import torch
import optuna
from sklearn.metrics import mean_absolute_error, r2_score

from regression_net import RegressionNet
from data import denormalize_columns, normalize_columns, FEATURES
from optimization import *

pd.set_option('display.max_rows', 100)

%load_ext autoreload
%autoreload 2

# black magic for reproducibility 
random_state = 42
np.random.seed(random_state)

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

# Setup

In [4]:
# Load the model and parameters from the file
saved_model_params = torch.load('trained_models/model_large_rom.pth')

# Reconstruct the model
loaded_model = RegressionNet(saved_model_params['hparams'])
loaded_model.load_state_dict(saved_model_params['state_dict'])
loaded_model.to(device)
loaded_model.eval()

feature_names = saved_model_params['feature_names']
feature_bounds = saved_model_params['feature_bounds']

target_features = ['y_ROM']
load_cases = ['Flexion', 'AxialRotation', 'Extension', 'LateralBending']
params_columns = ['C10Nucleus', 'C01Nucleus', 'C10Annulus', 'K1Annulus', 'K2Annulus', 'Kappa', 'K1Circ', 'K2Circ', 'K1Rad', 'K2Rad', 'FiberAngle', 'FiberAngleCirc', 'FiberAngleRad']

print("Model loaded with saved parameters.")

Model loaded with saved parameters.


In [5]:
df_train, df_test = pd.read_csv("datasets/Large/trainval_df_large_rom_norm.csv"), pd.read_csv("datasets/Test/test_df_large_rom_norm.csv")

Or try with a real sample from Heuer

In [None]:
# Reading the CSV data
df_real_instance = pd.read_csv("samples/ROMData_ExperimentsHeuer.csv", sep=";")

# Parse the CSV
df_real_instance = df_real_instance.melt(id_vars='Moment', var_name='LoadCase', value_name='y_ROM')
df_real_instance.rename({'Moment': 'LoadCase', 'LoadCase': 'Moment'}, axis=1, inplace=True)
df_real_instance['LoadCase'] = df_real_instance['LoadCase'].apply(lambda x: load_cases.index(x))
df_real_instance['Moment'] = df_real_instance['Moment'].astype('float')
df_real_instance = df_real_instance[df_real_instance['Moment'] != 0]
df_real_instance['y_IDP'] = 0

df_real_instance = normalize_columns(df_real_instance, feature_bounds, train=False, keep_target=True)
df_real_instance

Or real sample from experiments

In [None]:
# Reading the CSV data
df_exp = pd.read_csv("datasets/ValidationCalibration/Nicolini_L1L2/ExperimentalResultsIVDNic3.txt", sep="\t")
df_exp.drop([0,1], axis=0, inplace=True)
df_exp.rename({'Unnamed: 0': 'Moment'}, axis=1, inplace=True)
df_exp.columns = [col.replace("(degrees)", "").replace(" ", "") if col.endswith(" (degrees)") else col for col in df_exp.columns]
df_exp = df_exp.melt(id_vars='Moment', var_name='LoadCase', value_name='y_ROM')
df_exp['LoadCase'] = df_exp['LoadCase'].apply(lambda x: load_cases.index(x))
df_exp = df_exp.astype(float)

df_exp = normalize_columns(df_exp, feature_bounds, train=False, keep_target=True)
df_exp

## Guided Search

In [17]:
# Define target and init sample
closest_config = get_rand_train_sample(df_exp, df_train, verbose=True)

df_exp = df_exp.sort_values(['LoadCase', 'Moment'], ignore_index=True)

target = df_exp

# Use all rows from the target, set with start config
init_sample = target.copy()
for col in params_columns:
    init_sample[col] = closest_config.loc[col]
init_sample = init_sample[FEATURES + ['y_ROM']]

# sort closest_config by the values of the columns LoadCase and Moment
init_sample = init_sample.sort_values(['LoadCase', 'Moment'], ignore_index=True)
diff_mask = [df_exp['Moment'] == moment for moment in df_exp['Moment'].unique()]

init_sample.drop(['y_ROM'], axis=1, inplace=True)
target = target[['y_ROM']]

Random configuration and distance: 700 0.12709513105247444


ADAM

In [20]:
# Convert relevant columns to tensor
search_tensor = torch.tensor(init_sample.values, dtype=torch.float, requires_grad=True, device=device)
target_tensor = torch.tensor(target.values, dtype=torch.float, device=device)

# Hyperparams
lr = 0.09715010540129915
num_steps = 2250
lambda_penalty = 1.0142772931593078

pos_bound = [FEATURES.index(col) for col in ['K2Annulus', 'K1Annulus', 'Kappa', 'K1Circ', 'K2Circ', 'K1Rad', 'K2Rad']]
neg_bound = [FEATURES.index(col) for col in ['C10Nucleus', 'C01Nucleus', 'C10Annulus', 'K2Annulus', 'K1Annulus', 'Kappa', 'K1Circ', 'K2Circ', 'K1Rad', 'K2Rad', 'FiberAngle', 'FiberAngleCirc', 'FiberAngleRad']]

original_features = search_tensor.detach().clone()

# Define an optimizer for the input sample
optimizer = torch.optim.Adam([search_tensor], lr=lr)

for step in range(num_steps):
    optimizer.zero_grad()
    
    # Forward pass: compute the predicted outputs
    predictions = loaded_model(search_tensor)
    
    # Compute the loss between the predicted and target values, consider only ROM
    main_loss = torch.nn.functional.l1_loss(predictions, target_tensor)

    # Compute the penalty for out-of-range values (only for updated features)
    penalty = out_of_range_loss(search_tensor[:, neg_bound], bound='negative',) + \
        out_of_range_loss(search_tensor[:, pos_bound], bound='positive',)


    # Combine the losses
    total_loss = main_loss #+ lambda_penalty * penalty

    # Backward pass: compute gradient of the loss with respect to the inputs
    total_loss.backward()
    
    # Zero out the gradients for LoadCase and Moment
    search_tensor.grad[:, 0:2] = 0

    # Update the inputs based on the gradients. Use mean acorss all Cases and Moments
    search_tensor.grad = search_tensor.grad.mean(dim=0, keepdim=True).expand_as(search_tensor)
    
    optimizer.step()

    with torch.no_grad():
        # projection step
        search_tensor[:, neg_bound] = torch.clamp(search_tensor[:, neg_bound], min=0)
        search_tensor[:, pos_bound] = torch.clamp(search_tensor[:, pos_bound], max=1)
        
        predictions = loaded_model(search_tensor)

    # Check for improvement
    mae = mean_absolute_error(target_tensor.detach().cpu().numpy(), predictions.detach().cpu().numpy())


Evaluate Reults

In [None]:
best_tensor = search_tensor.detach().clone()

lc_masks = [lc == init_sample.values[:,0] for lc in np.unique(init_sample.values[:,0])]

# get the final predictions
denom_found = denormalize_columns(pd.DataFrame(best_tensor.cpu(), columns=FEATURES), feature_bounds, also_target=True)

found_config = denom_found[params_columns].iloc[0]
print("Found configuration:")
print(found_config)

# Evaluate the results per LoadCase
with torch.no_grad():
    y_true = target.values
    y_pred = loaded_model(best_tensor).detach().cpu().numpy()

y_true = y_true * (feature_bounds['y_ROM']['max'] - feature_bounds['y_ROM']['min']) + feature_bounds['y_ROM']['min']
y_pred = y_pred * (feature_bounds['y_ROM']['max'] - feature_bounds['y_ROM']['min']) + feature_bounds['y_ROM']['min']

print("Scores: [R2, MAE]")

r2_scores = [r2_score(y_true[mask], y_pred[mask]) for mask in lc_masks]
mae_scores = [mean_absolute_error(y_true[mask], y_pred[mask]) for mask in lc_masks]
for i in range(len(lc_masks)):
    print(f"    {load_cases[i]}: {r2_scores[i]:.4f}, {mae_scores[i]:.4f}")
print(f"Mean: {np.mean(r2_scores):.4f}, {np.mean(mae_scores):.4f}")

neg_bounds = out_of_range_loss(search_tensor[:, neg_bound], bound='negative')
pos_bounds = out_of_range_loss(search_tensor[:, pos_bound], bound='positive')
print(f"Negative bounds: {neg_bounds.item():.4f}, Positive bounds: {pos_bounds.item():.4f}")

# learned weights

In [None]:
# Reading the CSV data
df_exp = pd.read_csv("datasets/ValidationCalibration/Nicolini_L1L2/ExperimentalResultsIVDNic1.txt", sep="\t")
df_exp.drop([0,1], axis=0, inplace=True)
df_exp.rename({'Unnamed: 0': 'Moment'}, axis=1, inplace=True)
df_exp.columns = [col.replace("(degrees)", "").replace(" ", "") if col.endswith(" (degrees)") else col for col in df_exp.columns]
df_exp = df_exp.melt(id_vars='Moment', var_name='LoadCase', value_name='y_ROM')
df_exp['LoadCase'] = df_exp['LoadCase'].apply(lambda x: load_cases.index(x))
df_exp = df_exp.astype(float)

# decide if to take all moments or only trained ones
#df_exp = df_exp[df_exp['Moment'] <= 5]

df_exp = normalize_columns(df_exp, feature_bounds, train=False, keep_target=True)
df_exp

# Define target and init sample
closest_config = get_closest_train_sample(df_exp, df_train, verbose=True)
#closest_config = get_rand_train_sample(df_exp, df_train, verbose=True)
#closest_config = get_rand_features(params_columns)

df_exp = df_exp.sort_values(['LoadCase', 'Moment'], ignore_index=True)

target = df_exp

# Use all rows from the target, set with start config
init_sample = target.copy()
for col in params_columns:
    init_sample[col] = closest_config.loc[col]
init_sample = init_sample[FEATURES + ['y_ROM']]

# sort closest_config by the values of the columns LoadCase and Moment
init_sample = init_sample.sort_values(['LoadCase', 'Moment'], ignore_index=True)
diff_mask = [df_exp['Moment'] == moment for moment in df_exp['Moment'].unique()]

init_sample.drop(['y_ROM'], axis=1, inplace=True)
target = target[['y_ROM']]

LBGFS

In [55]:
# Convert relevant columns to tensor, LBFGS need it to be contiguous
search_tensor = torch.from_numpy(init_sample.values).contiguous().float()
search_tensor.requires_grad = True

original_features = search_tensor.detach().clone()

target_tensor = torch.tensor(target.values, dtype=torch.float)

# Hyperparams
num_steps =  190
max_iter =  40
history_size =  30
lambda_penalty =  0.001516139863958438

# Define an optimizer for the input sample using LBFGS
optimizer = torch.optim.LBFGS([search_tensor], lr=lr, max_iter=max_iter, history_size=history_size)

def closure():
    optimizer.zero_grad()
    predictions = loaded_model(search_tensor)
    main_loss = torch.nn.functional.l1_loss(predictions[:, 0], target_tensor[:, 0])
    penalty = out_of_range_loss(search_tensor[:, 2:])
    loss = main_loss + lambda_penalty * penalty
    loss.backward()
    return loss

for step in range(num_steps):
    # Store the original state of search_tensor before the update
    prev_search_tensor = search_tensor.clone().detach()

    # Perform the optimization step
    optimizer.step(closure)

    # Calculate the change induced by the optimization step
    change = search_tensor - prev_search_tensor
    
    # Calculate the mean change across all samples (for all features except the first two)
    mean_change = change.mean(axis=0, keepdim=True)
    mean_change[:, 0:2] = 0
    
    # Apply the mean change to the entire tensor data
    with torch.no_grad():
        search_tensor.data = prev_search_tensor + mean_change

# Compute metrics
with torch.no_grad():
    predictions = loaded_model(search_tensor)


In [None]:
# get the final predictions
search_result_df = pd.DataFrame(search_tensor.detach().numpy(), columns=FEATURES)
denom_found = denormalize_columns(search_result_df, feature_bounds, also_target=True)

found_config = denom_found[params_columns].iloc[0]
print("Found configuration:")
print(found_config)

# evaluate the results, also per LoadCase
mae = mean_absolute_error(target_tensor.numpy()[:, 0], predictions.detach().numpy()[:, 0])
r2 = r2_score(target_tensor.numpy()[:, 0], predictions.detach().numpy()[:, 0])
print(f"Mean MAE: {mae:.4f}, R2: {r2:.4f}")


for i in range(len(load_cases)):
    mask = (denom_found.LoadCase.values == i)
    mae = mean_absolute_error(target_tensor.numpy()[mask], predictions.detach().numpy()[mask])
    r2 = r2_score(target_tensor.numpy()[mask], predictions.detach().numpy()[mask])
    print(f"{load_cases[i]}, MAE: {mae:.4f}, R2: {r2:.4f}")