In [None]:
%load_ext autoreload
%autoreload 2

import pickle
from pathlib import Path

import keras_tuner
import numpy as np
import pandas as pd
import tensorflow.keras as keras
from keras.src.layers import Dropout
from keras.src.layers import Dense
from matplotlib import pyplot as plt
from scipy.interpolate import interp1d
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from xgboost import XGBRegressor

from src.metrics.ece import ece

In [None]:
def interpolate_indexed_value_array(indexing_array: np.ndarray, value_array: np.ndarray, searched_for, strategy="linear"):
    f = interp1d(indexing_array, value_array, kind=strategy,
                 bounds_error=False, fill_value=(value_array[0], value_array[-1]))
    return f(searched_for)


def safe_unpickle_all(file):
    objects = []
    unpickler = pickle.Unpickler(file)
    while True:
        try:
            obj = unpickler.load()
            objects.append(obj)
        except EOFError:
            break  # End of file (incomplete last object)
        except Exception as e:
            print("Partial data recovered. Stopped at error:", e)
            break
    return objects

def build_nn(hp) -> keras.Model:
    model = keras.Sequential()
    model.add(keras.layers.Flatten())
    for i in range(hp.Int('num_layers', 5, 14)):
        model.add(
            Dense(
                units=hp.Int(f"units_{i}", min_value=256, max_value=1536, step=32),
                activation=hp.Choice(f"activation_{i}", ["relu", "linear"]),
            )
        )
    model.add(Dense(1, activation="relu"))
    model.add(Dropout(hp.Float("Dropout", 0.05, 0.25, sampling='log')))

    optimizer_choice = hp.Choice('optimizer', ['adam', 'rmsprop'])
    learning_rate = hp.Float('learning_rate', 1e-5, 5e-4, sampling='log')

    optimizer = None
    if optimizer_choice == 'adam':
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    elif optimizer_choice == 'sgd':
        optimizer = keras.optimizers.SGD(learning_rate=learning_rate)
    elif optimizer_choice == 'rmsprop':
        optimizer = keras.optimizers.RMSprop(learning_rate=learning_rate)

    model.compile(
        optimizer=optimizer,
        loss=keras.losses.LogCosh,
        metrics=["mae", "mse"],
    )
    return model

def get_rounded_clipped_sample_size(sample_size):
    return max(15, min(10000, round(sample_size)))

def calculate_ece_on_subsets(p_pred, y_true, sample_size, num=10):
    eces = []
    for i in range(num):
        indices = np.random.choice(p_pred.shape[0], size=get_rounded_clipped_sample_size(sample_size), replace=True)
        p_pred_subset = p_pred[indices]
        y_true_subset = y_true[indices]
        eces.append(ece(p_pred_subset, y_true_subset, 15))
    return np.array(eces)
        

In [None]:
# Gather Data from different files
print("Gathering data...")
data = []
dirs = ["./data/20250524_024937", "./data/20250524_140927"]

for dir in dirs:
    files = Path(dir).glob("*.pkl")
    for file_name in files:
        print(file_name, sep=", ")
        with (open(f'{file_name}', 'rb') as file):
            try:
                content = pickle.load(file)
            except EOFError:
                print("Data is corrupted. Trying to partially recover data...")
                content = safe_unpickle_all(file)
                print(content)

            data = data + content

In [None]:
model_results = []
y_tests = []
p_tests = []
eces = []
sample_sizes = []
accuracies = []
y = []
optimal_eces = []
for result in data:
    model_result = result["model_results"]
    model_results = model_results + model_result
    
    for r in model_result:
        p_tests.append(r["p_test"])
        eces.append(r["ECEs"])
        accuracies.append(r["Accuracy"])
        y.append(r["Optimal Sample Size"])
        optimal_eces.append(r["Optimal ECE"])

    for i in range(4):
        y_tests.append(result["y_test"])
        sample_sizes.append(result["Sample Sizes"])

# Ensure numpy
y_tests = np.array(y_tests)
eces = np.array(eces)
p_tests = np.array(p_tests, dtype=np.float32)
sample_sizes = np.array(sample_sizes)
accuracies = np.array(accuracies)
y = np.array(y)
optimal_eces = np.array(optimal_eces)

mean_accuracy = np.mean(accuracies)
std_dev_accuracy = np.std(accuracies)

dist_from_eces0 = np.array([np.linalg.norm(eces[0] - eces[i]) for i in range(len(eces)) if i != 0])

In [None]:
print("y_tests shape", y_tests.shape)
print("sample sizes shape", sample_sizes.shape)
print("p_tests shape", p_tests.shape)
print("eces shape", eces.shape)
print("Mean Accuracy", mean_accuracy)
print("Std. Dev. Accuracy", std_dev_accuracy)
print("Distances from eces[0]", dist_from_eces0)
print("Mean distance from eces[0]", np.mean(dist_from_eces0))
print("Std. Dev distance from eces[0]", np.std(dist_from_eces0))

print("Mean True ECE", np.mean(np.array([result["True ECE Dists (Binned - 15 Bins)"] for result in model_results])))
print("Std. Dev True ECE", np.std(np.array([result["True ECE Dists (Binned - 15 Bins)"] for result in model_results])))

print(sample_sizes)
print(eces)

In [None]:
# Prepare Data
X = np.hstack((sample_sizes, eces, accuracies.reshape(-1, 1)))
y = np.column_stack((y, optimal_eces))

print(X)

In [None]:
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

optimal_eces_train = y_train[:, 1]
y_train = y_train[:, 0]
optimal_eces_test = y_test[:, 1]
y_test = y_test[:, 0]

X_train = X_train[:, 100:]
X_test = X_test[:, 100:]

print("X_train Shape", X_train.shape)
print("X_test Shape", X_test.shape)

print("y_train Shape", y_train.shape)
print("y_test Shape", y_test.shape)


In [None]:
# Perform Bayes Optimization
tuner = keras_tuner.BayesianOptimization(
    hypermodel=build_nn,
    objective="val_loss",
    max_trials=24,
    executions_per_trial=1,
    directory="keras_tuner_logs_bayes_logcosh_dropout_only_ece_values_ext",
    project_name="ece_neural_network_bayes2_logscosh_dropout_only_ece_values_ext",
)
tuner.search(
    X_train, y_train,
    validation_split=0.2,
    epochs=150,
    batch_size=128,
    callbacks=[keras.callbacks.EarlyStopping(patience=10)],
    verbose=1
)

best_model = tuner.get_best_models(num_models=1)[0]
y_pred_scaled = np.array(best_model.predict(X_test)).flatten()
y_pred = y_pred_scaled
print("y_pred_scaled NN", y_pred_scaled)

# Print best Hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print("Best Hyperparameters:")
for key in best_hps.values.keys():
    print(f"{key}: {best_hps.get(key)}")

"""
model = keras.Sequential()
model.add(keras.layers.Flatten())
for i in range(4):
    model.add(
        Dense(
            units=480,
            activation="relu",
        )
    )
model.add(Dense(1, activation="linear"))

learning_rate = 0.0022035143136497045

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss=keras.losses.LogCosh(),
    metrics=["mae"],
)

model.fit(X_train_scaled, y_train_scaled)
y_pred = y_scaler.inverse_transform(np.array(model.predict(X_test_scaled)).flatten().reshape(-1, 1)).ravel()
"""

print("y_pred NN", y_pred)

In [None]:
# Train Regressor
model = XGBRegressor(
    n_estimators=1000,
    max_depth=16,
    learning_rate=0.002,
    subsample=0.78,
    colsample_bytree=0.75,
    gamma=1,
    reg_alpha=0.15,
    reg_lambda=1,
    random_state=42,
    n_jobs=-1,
    verbosity=1
)
model.fit(X_train, y_train)
y_pred_scaled_regressor = np.array(model.predict(X_test)).flatten()
y_pred_regressor = y_pred_scaled_regressor
print("y_pred_scaled regressor", y_pred_scaled_regressor)
print("y_pred_regressor", y_pred_regressor)

In [None]:
# Calculate Metrics and display Dataframe
print("X_test", X_test)
X_test_eces = X_test[:, :100]
print("X_test_eces", X_test_eces)
print("Sample Sizes", sample_sizes)


simple_strategy_preds = [
    (f"{(i + 1) * 100} Samples", (sample_sizes[len(X_train):, i], X_test_eces[:, i])) for i in [0, 4, 9, 19, 49, 79, 99]]

preds = {}
for pred in simple_strategy_preds:
    preds[pred[0]] = pred[1]
    

num_ece_subsets = 100

nn_ece_on_subsets = []
regressor_ece_on_subsets = []
for i, sample_size_nn in enumerate(y_pred):
    print(f"Prediction number {i} of {len(y_pred)}")
    nn_ece_on_subsets.append(np.mean(calculate_ece_on_subsets(p_tests[i], y_tests[i], sample_size_nn, num_ece_subsets)))
    regressor_ece_on_subsets.append(np.mean(calculate_ece_on_subsets(p_tests[i], y_tests[i], y_pred_regressor[i], num_ece_subsets)))

nn_ece_on_subsets = np.array(nn_ece_on_subsets)
regressor_ece_on_subsets = np.array(regressor_ece_on_subsets)

ece_nn_interpolated = np.array([interpolate_indexed_value_array(sample_sizes[i], X_test_eces[i], sample_size) for
                                (i, sample_size) in enumerate(y_pred)])


ece_regressor_interpolated = np.array([interpolate_indexed_value_array(sample_sizes[i], X_test_eces[i], sample_size) for
                      (i, sample_size) in enumerate(y_pred_regressor)])

ece_nn_interpolated_quad = np.array([interpolate_indexed_value_array(sample_sizes[i], X_test_eces[i], sample_size, strategy="quadratic") for
                      (i, sample_size) in enumerate(y_pred)])

ece_regressor_interpolated_quad = np.array([interpolate_indexed_value_array(sample_sizes[i], X_test_eces[i], sample_size, strategy="quadratic") for
                                      (i, sample_size) in enumerate(y_pred_regressor)])
preds.update({
    "Neural Network": (
        y_pred, np.array([ece(p_tests[i][:get_rounded_clipped_sample_size(sample_size)], y_tests[i][:get_rounded_clipped_sample_size(sample_size)], n_bins=15) for
                          i, sample_size in enumerate(y_pred)])),
    "XGBRegressor": (y_pred_regressor,
                     np.array([ece(p_tests[i][:get_rounded_clipped_sample_size(sample_size)], y_tests[i][:get_rounded_clipped_sample_size(sample_size)], n_bins=15) for
                               i, sample_size in enumerate(y_pred_regressor)])),
    f"Neural Network (ECE on subsets {num_ece_subsets})": (
        y_pred, nn_ece_on_subsets),
    f"XGBRegressor (ECE on subsets {num_ece_subsets})": (
        y_pred_regressor, regressor_ece_on_subsets),
    "Neural Network (Interpolated)": (y_pred, ece_nn_interpolated),
    "XGBRegressor (Interpolated)": (y_pred_regressor, ece_regressor_interpolated),
    "Interpolated Average": ((y_pred + y_pred_regressor)/ 2, (ece_nn_interpolated + ece_regressor_interpolated) / 2),
    "Neural Network (Interpolated - Quadratic)": (y_pred, ece_nn_interpolated_quad),
    "XGBRegressor (Interpolated - Quadratic)": (y_pred_regressor, ece_regressor_interpolated_quad),
    "Interpolated (Quadratic) Average": ((y_pred + y_pred_regressor) / 2, (ece_nn_interpolated_quad + ece_regressor_interpolated_quad) / 2),
    "Y_Test": (y_test, optimal_eces_test)
})

print("Optimal ECEs", optimal_eces_test)
print("y_test", y_test)
# Compute stats
results = []
for name, (sample_sizes_, eces) in preds.items():
    print(name, sample_sizes_, eces)
    results.append({
        "Name": name,
        "Mean (Samples)": np.mean(sample_sizes_),
        "Std Dev (Samples)": np.std(sample_sizes_),
        "MSE (Samples)": mean_squared_error(y_test, sample_sizes_),
        "MAE (Samples)": mean_absolute_error(y_test, sample_sizes_),
        "R2-Score (Samples)": r2_score(y_test, sample_sizes_),
        "Mean (ECE)": np.mean(eces),
        "Std Dev (ECE)": np.std(eces),
        "MSE (ECE)": mean_squared_error(optimal_eces_test, eces),
        "MAE (ECE)": mean_absolute_error(optimal_eces_test, eces),
        "R2-Score (ECE)": r2_score(optimal_eces_test, eces),
    })

# Create DataFrame
pd.set_option('display.max_columns', None)
df = pd.DataFrame(results)
print(df) 

In [None]:
print(list(preds.keys()))

In [None]:
num_preds = len(preds)
cols = 6 
rows = int(np.ceil(num_preds / cols))

fig, axs = plt.subplots(rows, cols, figsize=(30, 4 * rows), constrained_layout=True)

axs = axs.flatten()

for i, (key, value) in enumerate(preds.items()):
    ax = axs[i]
    pred_sample_sizes = value[0]
    bin_range = (min(y_test.min(), pred_sample_sizes.min()), max(y_test.max(), pred_sample_sizes.max()))
    ax.hist(y_test, bins=50, range=bin_range, alpha=0.5, label="Optimal Sample Sizes")
    ax.hist(pred_sample_sizes, bins=50, range=bin_range, alpha=0.5, label=key)
    ax.set_title(f"{key} vs Optimal")
    ax.set_xlabel("Sample Sizes")
    ax.set_ylabel("Frequency")
    ax.grid(True, linestyle='--', alpha=0.6)
    ax.legend()

# Hide unused subplots if any
for j in range(i + 1, len(axs)):
    fig.delaxes(axs[j])

plt.suptitle("Sample Sizes - Target vs Prediction Distributions", fontsize=16)
plt.savefig("./optimal_ece_estimator_sample_sizes_distribution_comparison")
plt.show()

In [None]:
num_preds = len(preds)
cols = 6
rows = int(np.ceil(num_preds / cols))

fig, axs = plt.subplots(rows, cols, figsize=(30, 4 * rows), constrained_layout=True)

axs = axs.flatten()

for i, (key, value) in enumerate(preds.items()):
    ax = axs[i]
    pred_eces = value[1]
    bin_range = (min(optimal_eces_test.min(), pred_eces.min()), max(optimal_eces_test.max(), pred_eces.max()))
    ax.hist(optimal_eces_test, bins=50, range=bin_range, alpha=0.5, label="Optimal ECEs")
    ax.hist(pred_eces, bins=50, range=bin_range, alpha=0.5, label=key)
    ax.set_title(f"{key} vs Optimal")
    ax.set_xlabel("ECE")
    ax.set_ylabel("Frequency")
    ax.legend()

for j in range(i + 1, len(axs)):
    fig.delaxes(axs[j])

plt.suptitle("ECE - Target vs Prediction Distributions", fontsize=16)
plt.savefig("./optimal_ece_estimator_ece_distribution_comparison")
plt.show()

In [None]:
num_preds = len(preds)
cols = 6
rows = int(np.ceil(num_preds / cols))

fig, axs = plt.subplots(rows, cols, figsize=(30, 4 * rows), constrained_layout=True)
axs = axs.flatten()

for i, (key, value) in enumerate(preds.items()):
    ax = axs[i]
    if len(value) > 1:
        ax.scatter(value[1], optimal_eces_test)
        ax.plot(optimal_eces_test, optimal_eces_test, c='red')
        ax.set_title(f"{key} vs Optimal")
        ax.set_xlabel("Prediction")
        ax.set_ylabel("Optimal ECE")

for j in range(i + 1, len(axs)):
    fig.delaxes(axs[j])

plt.suptitle("Target vs Prediction Plot", fontsize=16)
plt.savefig("./optimal_ece_estimator_reliability_diagrams")
plt.show()

In [None]:
print(y_test/ 15)

In [None]:
plt.title("Calculated vs. Interpolated ECE")
plt.bar(sample_sizes[0], preds[f"Neural Network (ECE on subsets {num_ece_subsets})"], bins=50, label="Neural Network (ECE on subsets {num_ece_subsets})")
plt.bar(sample_sizes[0], preds["Neural Network (Interpolated)"], bins=50,
         label="Neural Network (Interpolated)")
plt.xlabel("sample sizes")
plt.ylabel("ECE")

