In [1]:
import tensorflow as tf
from tensorflow.keras.applications import InceptionV3
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.saving import register_keras_serializable
import tensorflow_datasets as tfds

import os
import pickle
import numpy as np
from tqdm import tqdm

from src.pmi_estimators import train_critic_model, neural_pmi
from src.psi_estimators import psi_gaussian_train, psi_gaussian_val_class
from src.pvi_estimators import train_pvi_null_model, neural_pvi_class, neural_pvi_ensemble_class
import src.utils as utils
import src.metrics as metrics
import src.methods as methods
import src.temp_scaling as temp_scaling

2025-04-09 08:39:36.896379: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9373] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-04-09 08:39:36.896463: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-04-09 08:39:36.897796: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1534] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-04-09 08:39:36.904902: I tensorflow/core/platform/cpu_feature_guard.cc:183] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE3 SSE4.1 SSE4.2 AVX, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
model_name = 'inceptionv3'
dataset_name = 'stanford-dogs'

(ds_train, ds_val, ds_test), ds_info = tfds.load(
    'stanford_dogs',
    split=['train[:85%]', 'train[85%:]', 'test'],
    data_dir = '../tensorflow_datasets/',
    shuffle_files=False,
    as_supervised=True,
    with_info=True
)

IMG_SIZE = 224
num_classes = 120
def preprocess(image, label):
    image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
    image = tf.cast(image, tf.float32) / 255.0
    label = tf.one_hot(label, depth=num_classes)
    return image, label

ds_train = ds_train.map(preprocess)
ds_val = ds_val.map(preprocess)
ds_test = ds_test.map(preprocess)

# batch_size = 128
# ds_train = ds_train.shuffle(1000).batch(batch_size).prefetch(tf.data.AUTOTUNE)
# ds_val = ds_val.batch(batch_size).prefetch(tf.data.AUTOTUNE)
# ds_test = ds_test.batch(batch_size).prefetch(tf.data.AUTOTUNE)

true_y_train = np.argmax([y for x,y in ds_train], axis=1)
true_y_val = np.argmax([y for x,y in ds_val], axis=1)
true_y_test = np.argmax([y for x,y in ds_test], axis=1)

2025-04-09 08:39:40.038238: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1926] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 78835 MB memory:  -> device: 0, name: NVIDIA A100-SXM4-80GB, pci bus id: 0000:0f:00.0, compute capability: 8.0


In [3]:
# class_counts = np.zeros(120)
# for _, label in tfds.as_numpy(ds_train.unbatch()):
#     class_index = np.argmax(label)
#     class_counts[class_index] += 1

# class_weights = 1.0 / class_counts
# class_weights = class_weights / np.sum(class_weights) * 10

# class_weights_tensor = tf.constant(class_weights, dtype=tf.float32)

# @register_keras_serializable()
# def weighted_categorical_crossentropy(y_true, y_pred):
#     weights = tf.reduce_sum(class_weights_tensor * y_true, axis=1)
#     unweighted_loss = tf.keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=True)
#     return weights * unweighted_loss

In [4]:
def create_model():
    base_model = InceptionV3(include_top=False, weights='imagenet', input_tensor=Input(shape=(IMG_SIZE, IMG_SIZE, 3)))
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(128, activation='relu')(x)
    outputs = Dense(120, activation='linear')(x)
    model = Model(inputs=base_model.input, outputs=outputs)
    for layer in base_model.layers:
        layer.trainable = True
    return model

### Train Model

In [None]:
for run in range(10):
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}'
    if not os.path.exists(exp_name):
        print("Making directory", exp_name)
        os.makedirs(exp_name)

    model = create_model()
        
    def weighted_categorical_crossentropy(y_true, y_pred):
        weights = tf.reduce_sum(class_weights_tensor * y_true, axis=1)
        unweighted_loss = tf.keras.losses.categorical_crossentropy(y_true, y_pred, from_logits=True)
        return weights * unweighted_loss
    
    model.compile(optimizer=AdamW(learning_rate=1e-4, weight_decay=1e-4), loss=weighted_categorical_crossentropy, metrics=['accuracy'])

    lr_scheduler = ReduceLROnPlateau(monitor='val_accuracy', factor=0.5, patience=5, verbose=1)
    early_stop = EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True, verbose=1)
    history = model.fit(ds_train, validation_data=ds_val, epochs=100, callbacks=[lr_scheduler, early_stop])
    
    if not os.path.exists(exp_name+'/saved_models'):
        print("Making directory", exp_name+'/saved_models')
        os.makedirs(exp_name+'/saved_models')

    model.save_weights(f'{exp_name}/saved_models/trained_weights.h5')
    with open(f'{exp_name}/history.pickle', 'wb') as f:
        pickle.dump(history, f, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
train_acc = []
val_acc = []
test_acc = []
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    model = create_model()
    model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')
    model.compile(optimizer=AdamW(learning_rate=1e-4, weight_decay=1e-4), loss=weighted_categorical_crossentropy, metrics=['accuracy'])
    train_acc.append(model.evaluate(ds_train, verbose=1)[1])
    val_acc.append(model.evaluate(ds_val, verbose=1)[1])
    test_acc.append(model.evaluate(ds_test, verbose=1)[1])
print(f'Average train error: {(100-np.mean(train_acc)*100):.2f} ({(np.std(train_acc)*100):.2f})')
print(f'Average validation error: {(100-np.mean(val_acc)*100):.2f} ({(np.std(val_acc)*100):.2f})')
print(f'Average test error: {(100-np.mean(test_acc)*100):.2f} ({(np.std(test_acc)*100):.2f})')

### PSI

In [None]:
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/psi/gaussian'
    if not os.path.exists(exp_name):
        print("Making directory", exp_name)
        os.makedirs(exp_name)

    model = create_model()
    model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')
    int_model = tf.keras.Model(inputs=model.inputs, outputs=model.layers[-1].output)
    
    ##############################################################
    #
    # Train PSI Model
    #
    # #############################################################
    
    x_logits_list = []
    y_labels_list = []

    for x_batch, y_batch in ds_train.batch(256):
        logits = int_model(x_batch)
        labels = tf.argmax(y_batch, axis=1)
        x_logits_list.append(logits)
        y_labels_list.append(labels)

    x = tf.concat(x_logits_list, axis=0).numpy()
    y = tf.concat(y_labels_list, axis=0).numpy()
    
    print(f'Training PSI model (gaussian)...')
    psi_data = psi_gaussian_train(x, y, n_projs=500)
    np.save(f'{exp_name}/gaussian_output_model_500_projs.npy', psi_data)

    ##############################################################
    #
    # Compute PSI for all validation and test samples
    #
    # #############################################################

    psi_data = np.load(f'{exp_name}/gaussian_output_model_500_projs.npy', allow_pickle=True).item()

    print(f'Computing PSI for all validation samples...')
    x_logits_list = []

    for x_batch, y_batch in ds_val.batch(256):
        logits = int_model(x_batch)
        x_logits_list.append(logits)
    
    x = tf.concat(x_logits_list, axis=0).numpy()
    psi_class, pmi_arr = psi_gaussian_val_class(x, psi_data)
    np.save(f'{exp_name}/psi_output_class_500_projs_val.npy', np.array(psi_class))

    print(f'Computing PSI for all test samples...')
    x_logits_list = []

    for x_batch, y_batch in ds_test.batch(256):
        logits = int_model(x_batch)
        x_logits_list.append(logits)
    
    x = tf.concat(x_logits_list, axis=0).numpy()
    psi_class, pmi_arr = psi_gaussian_val_class(x, psi_data)
    np.save(f'{exp_name}/psi_output_class_500_projs_test.npy', np.array(psi_class))

### PVI

In [None]:
random_runs = list(range(10))
while any(random_runs[i] == i for i in range(10)):
    np.random.shuffle(random_runs)
    
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/training_from_scratch'
    if not os.path.exists(exp_name):
        print("Making directory", exp_name)
        os.makedirs(exp_name)
        
    ##############################################################
    #
    # Train PVI Model
    #
    # #############################################################
    
    pvi_model = create_model()
    pvi_model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{random_runs[run]+1}/saved_models/trained_weights.h5')
    pvi_model.save_weights(f'{exp_name}/pvi_model_weights.h5')
    
    untrained_model = create_model()
    train_pvi_null_model(ds_train, untrained_model, epochs=10, save_path=f'{exp_name}/pvi_null_model_weights.h5')
    
    ##############################################################
    #
    # Compute PVI for all training and test samples
    #
    # #############################################################
    
    pvi_model = create_model()
    pvi_model.load_weights(f'{exp_name}/pvi_model_weights.h5')
    null_model = create_model()
    null_model.load_weights(f'{exp_name}/pvi_null_model_weights.h5')
    
    true_y_val = np.argmax([y for x,y in ds_val], axis=1)
    opt_temp_pvi = utils.temp_scaling_nll(pvi_model.predict(ds_val.batch(128), verbose=0), true_y_val)
    ds_null = ds_val.map(lambda x, y: (tf.zeros_like(x), y))
    opt_temp_null = utils.temp_scaling_nll(null_model.predict(ds_null.batch(128), verbose=0), true_y_val)

    print(f'Computing PVI for all validation samples and for all classes...')
    pvi_class = neural_pvi_class(ds_val.batch(128), pvi_model, null_model, opt_temp_pvi, opt_temp_null)
    np.save(f'{exp_name}/pvi_class_val.npy', np.array(pvi_class))

    print(f'Computing PVI for all test samples and for all classes...')
    pvi_class = neural_pvi_class(ds_test.batch(128), pvi_model, null_model, opt_temp_pvi, opt_temp_null)
    np.save(f'{exp_name}/pvi_class_test.npy', np.array(pvi_class))

In [None]:
pvi_runs = [3 if i == 0 else 0 for i in range(10)]
    
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/training_from_scratch'
    if not os.path.exists(exp_name):
        print("Making directory", exp_name)
        os.makedirs(exp_name)
        
    ##############################################################
    #
    # Train PVI Model
    #
    # #############################################################
    
    pvi_model = create_model()
    pvi_model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{pvi_runs[run]+1}/saved_models/trained_weights.h5')
    pvi_model.save_weights(f'{exp_name}/pvi_model_best_weights.h5')
    
#     untrained_model = create_model()
#     train_pvi_null_model(ds_train, untrained_model, epochs=10, save_path=f'{exp_name}/pvi_null_model_weights.h5')
    
    ##############################################################
    #
    # Compute PVI for all training and test samples
    #
    # #############################################################
    
    pvi_model = create_model()
    pvi_model.load_weights(f'{exp_name}/pvi_model_best_weights.h5')
    null_model = create_model()
    null_model.load_weights(f'{exp_name}/pvi_null_model_weights.h5')
    
    true_y_val = np.argmax([y for x,y in ds_val], axis=1)
    opt_temp_pvi = temp_scaling.temp_scaling_nll(pvi_model.predict(ds_val.batch(128), verbose=0), true_y_val)
    ds_null = ds_val.map(lambda x, y: (tf.zeros_like(x), y))
    opt_temp_null = temp_scaling.temp_scaling_nll(null_model.predict(ds_null.batch(128), verbose=0), true_y_val)

    print(f'Computing PVI for all validation samples and for all classes...')
    pvi_class = neural_pvi_class(ds_val.batch(128), pvi_model, null_model, opt_temp_pvi, opt_temp_null)
    np.save(f'{exp_name}/pvi_class_best_val.npy', np.array(pvi_class))

    print(f'Computing PVI for all test samples and for all classes...')
    pvi_class = neural_pvi_class(ds_test.batch(128), pvi_model, null_model, opt_temp_pvi, opt_temp_null)
    np.save(f'{exp_name}/pvi_class_best_test.npy', np.array(pvi_class))

### Ensemble PVI

In [None]:
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/ensemble_no_training_training_from_scratch'
    if not os.path.exists(exp_name):
        print("Making directory", exp_name)
        os.makedirs(exp_name)
        
    ##############################################################
    #
    # Train PVI Model
    #
    # #############################################################
    
    pvi_model_1 = create_model()
    pvi_model_1.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')
    null_model_1 = create_model()
    null_model_1.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/training_from_scratch/pvi_null_model_weights.h5')
    pvi_model_2 = create_model()
    pvi_model_2.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/training_from_scratch/pvi_model_weights.h5')
    null_model_2 = create_model()
    null_model_2.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/training_from_scratch/pvi_null_model_weights.h5')
    
#     true_y_val = np.argmax([y for x,y in ds_val], axis=1)
#     opt_temp_pvi_1 = utils.temp_scaling_nll(pvi_model_1.predict(ds_val.batch(128), verbose=0), true_y_val)
#     opt_temp_pvi_2 = utils.temp_scaling_nll(pvi_model_2.predict(ds_val.batch(128), verbose=0), true_y_val)
#     ds_null = ds_val.map(lambda x, y: (tf.zeros_like(x), y))
#     opt_temp_null = utils.temp_scaling_nll(null_model_1.predict(ds_null.batch(128), verbose=0), true_y_val)
    
    ##############################################################
    #
    # Compute PVI for all training and test samples
    #
    # #############################################################
    
    print(f'Computing PVI for all validation samples and for all classes...')
    pvi_class = []
    for (x_batch, y_batch) in ds_val.batch(256):
        pvi = neural_pvi_ensemble_class([x_batch, x_batch], [pvi_model_1, pvi_model_2], [null_model_1, null_model_2])
        pvi_class += np.array(pvi).tolist()
    np.save(f'{exp_name}/pvi_class_val.npy', np.array(pvi_class))

    print(f'Computing PVI for all test samples and for all classes...')
    pvi_class = []
    for (x_batch, y_batch) in ds_test.batch(256):
        pvi = neural_pvi_ensemble_class([x_batch, x_batch], [pvi_model_1, pvi_model_2], [null_model_1, null_model_2])
        pvi_class += np.array(pvi).tolist()
    np.save(f'{exp_name}/pvi_class_test.npy', np.array(pvi_class))

### Temp Scaling

In [None]:
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    model = create_model()
    model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')

#     if not os.path.exists(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration'):
#         print("Making directory", f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration')
#         os.makedirs(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration')                                  
  
    
    pred_y_val = np.argmax(model.predict(ds_val.batch(512), verbose=1), axis=1)
    scores = model.predict(ds_val.batch(512), verbose=0)
    
#     opt_temp = temp_scaling.temp_scaling_aurc(scores, pred_y_val, true_y_val)
#     np.save(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/softmax_opt_temp_aurc.npy', opt_temp)
    
    opt_temp = temp_scaling.temp_scaling_nll(scores, true_y_val)
    np.save(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/softmax_opt_temp_nll.npy', opt_temp)

#     opt_temp = temp_scaling.temp_scaling_ece(scores, pred_y_val, true_y_val, 15)
#     np.save(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/softmax_opt_temp_ece.npy', opt_temp)

In [None]:
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/psi/gaussian'
    model = create_model()
    model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')

    pred_y_val = np.argmax(model.predict(ds_val.batch(512), verbose=1), axis=1)
    scores = np.load(f'{exp_name}/psi_output_class_500_projs_val.npy')
    
    opt_temp = temp_scaling.temp_scaling_aurc(scores, pred_y_val, true_y_val)                                 
    np.save(f'{exp_name}/psi_opt_temp_aurc.npy', opt_temp)
    
    opt_temp = temp_scaling.temp_scaling_nll(scores, true_y_val)                            
    np.save(f'{exp_name}/psi_opt_temp_nll.npy', opt_temp)
    
#     opt_temp = temp_scaling.temp_scaling_ece(scores, pred_y_val, true_y_val, 15)                            
#     np.save(f'{exp_name}/psi_opt_temp_ece.npy', opt_temp)

In [None]:
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/training_from_scratch'
    model = create_model()
    model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')

    pred_y_val = np.argmax(model.predict(ds_val.batch(512), verbose=1), axis=1)
    scores = np.load(f'{exp_name}/pvi_class_val.npy')
    
#     opt_temp = temp_scaling.temp_scaling_aurc(scores, pred_y_val, true_y_val)                                 
#     np.save(f'{exp_name}/pvi_opt_temp_aurc.npy', opt_temp)
    
    opt_temp = temp_scaling.temp_scaling_nll(scores, true_y_val)                                          
    np.save(f'{exp_name}/pvi_opt_temp_nll.npy', opt_temp)

#     opt_temp = temp_scaling.temp_scaling_ece(scores, pred_y_val, true_y_val, 15)                                          
#     np.save(f'{exp_name}/pvi_opt_temp_ece.npy', opt_temp)

In [None]:
for run in range(10):
    print(f'Run: {run+1}')
    tf.keras.utils.set_random_seed(run+10) # set random seed for Python, NumPy, and TensorFlow
    exp_name = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration/pvi/training_from_scratch'
    model = create_model()
    model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')

    pred_y_val = np.argmax(model.predict(ds_val.batch(512), verbose=1), axis=1)
    scores = np.load(f'{exp_name}/pvi_class_best_val.npy')
    
    opt_temp = temp_scaling.temp_scaling_aurc(scores, pred_y_val, true_y_val)                                 
    np.save(f'{exp_name}/pvi_best_opt_temp_aurc.npy', opt_temp)
    
    opt_temp = temp_scaling.temp_scaling_nll(scores, true_y_val)                                          
    np.save(f'{exp_name}/pvi_best_opt_temp_nll.npy', opt_temp)

#     opt_temp = temp_scaling.temp_scaling_ece(scores, pred_y_val, true_y_val, 15)                                          
#     np.save(f'{exp_name}/pvi_opt_temp_ece.npy', opt_temp)

### Failure Detection

In [None]:
def get_confidence_scores(conf_method, model, ds_test, pred_y_test, run, model_name, dataset_name):
    base_path = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration'
    metric = conf_method.split('_')[-1] if 'temp_scaling' in conf_method else None
    method_key = conf_method.replace(f'_temp_scaling_{metric}', '') if metric else conf_method

    if method_key == 'softmax':
        if metric:
            opt_temp = np.load(f'{base_path}/softmax_opt_temp_{metric}.npy')
            return methods.max_softmax_prob(model, ds_test, opt_temp)
        else:
            return methods.max_softmax_prob(model, ds_test)

    elif method_key in ['pmi', 'psi', 'pvi', 'pvi_best']:
        if method_key == 'pmi':
            exp_path = f'{base_path}/pmi/separable_variational_f_js'
            class_file = 'pmi_output_class_test.npy'
        elif method_key == 'psi':
            exp_path = f'{base_path}/psi/gaussian'
            class_file = 'psi_output_class_500_projs_test.npy'
        elif method_key == 'pvi':
            exp_path = f'{base_path}/pvi/training_from_scratch'
            class_file = 'pvi_class_test.npy'
        elif method_key == 'pvi_best':
            exp_path = f'{base_path}/pvi/training_from_scratch'
            class_file = 'pvi_class_best_test.npy'

        opt_temp = np.load(f'{exp_path}/{method_key}_opt_temp_{metric}.npy')
        scores_class = np.load(f'{exp_path}/{class_file}')
        scores_class = np.array([utils.softmax(x / opt_temp) for x in scores_class])
        return np.array([score[pred] for score, pred in zip(scores_class, pred_y_test)])

    elif method_key == 'softmax_margin':
        opt_temp = np.load(f'{base_path}/softmax_opt_temp_{metric}.npy')
        return methods.softmax_margin(model, ds_test, opt_temp)

    elif method_key == 'max_logits':
        return methods.max_logits(model, ds_test)

    elif method_key == 'logits_margin':
        return methods.logits_margin(model, ds_test)

    elif method_key == 'negative_entropy':
        opt_temp = np.load(f'{base_path}/softmax_opt_temp_{metric}.npy')
        return methods.negative_entropy(model, ds_test, opt_temp)

    elif method_key == 'negative_gini':
        opt_temp = np.load(f'{base_path}/softmax_opt_temp_{metric}.npy')
        return methods.negative_gini(model, ds_test, opt_temp)

    elif method_key == 'isotonic_regression':
        return methods.isotonic_reg(model, ds_val, ds_test, true_y_val)

    else:
        raise ValueError(f"Unknown confidence method: {conf_method}")


def evaluate_failure_pred(ds_test, true_y_test, conf_method, n_runs=10):
    results = {
        "auroc": [],
        "fpr_at_95tpr": [],
        "auprc_success": [],
        "auprc_error": [],
        "aurc": [],
        "eaurc": [],
        "naurc": []
    }

    for run in range(n_runs):
        tf.keras.utils.set_random_seed(run + 10)
        model = create_model()
        model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')

        pred_y_test = np.argmax(model.predict(ds_test.batch(256), verbose=0), axis=1)
        scores_test = get_confidence_scores(conf_method, model, ds_test, pred_y_test, run, model_name, dataset_name)

#         results["auroc"].append(metrics.compute_auroc(scores_test, pred_y_test, true_y_test))
#         results["auprc_success"].append(metrics.compute_auprc_success(scores_test, pred_y_test, true_y_test))
#         results["auprc_error"].append(metrics.compute_auprc_error(scores_test, pred_y_test, true_y_test))
#         results["fpr_at_95tpr"].append(metrics.compute_fpr_at_95tpr(scores_test, pred_y_test, true_y_test))
#         results["aurc"].append(metrics.compute_aurc(scores_test, pred_y_test, true_y_test))
#         results["eaurc"].append(metrics.compute_eaurc(scores_test, pred_y_test, true_y_test))
        results["naurc"].append(metrics.compute_eaurc(scores_test, pred_y_test, true_y_test))

    return results

In [None]:
methods_list = ['softmax_temp_scaling_aurc','psi_temp_scaling_aurc','pvi_temp_scaling_aurc',
                'softmax_margin_temp_scaling_aurc', 'max_logits', 'logits_margin', 'negative_entropy_temp_scaling_aurc',
                'negative_gini_temp_scaling_aurc']
for method in methods_list:
    print(f'Method: {method}')
    results = evaluate_failure_pred(ds_test, true_y_test, conf_method=f'{method}', n_runs=10)
#     print(f"AUROC           : {utils.format_ci(results['auroc'], scale=100)}")
#     print(f"AUPRC (success) : {utils.format_ci(results['auprc_success'], scale=100)}")
#     print(f"AUPRC (error)   : {utils.format_ci(results['auprc_error'], scale=100)}")
#     print(f"FPR at 95% TPR  : {utils.format_ci(results['fpr_at_95tpr'], scale=100)}")
#     print(f"AURC            : {utils.format_ci(results['aurc'], scale=1000)}")
#     print(f"EAURC           : {utils.format_ci(results['eaurc'], scale=1000)}")
    print(f"NAURC           : {utils.format_ci(results['naurc'], scale=1000)}")

### Calibration

In [8]:
def get_scores_for_calibration(conf_method, model, ds_test, pred_y_test, run, model_name, dataset_name):
    base_path = f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/calibration'

    def softmax_scaled(scores, temp=1.0):
        return np.array([utils.softmax(x / temp) for x in scores])

    if conf_method == 'softmax':
        scores_class = methods.softmax_prob(model, ds_test)
        scores_test = methods.max_softmax_prob(model, ds_test)
        return scores_class, scores_test

    if conf_method.startswith('softmax_temp_scaling'):
        metric = conf_method.split('_')[-1]
        opt_temp = np.load(f'{base_path}/softmax_opt_temp_{metric}.npy')
        scores_class = methods.softmax_prob(model, ds_test, opt_temp)
        scores_test = methods.max_softmax_prob(model, ds_test, opt_temp)
        return scores_class, scores_test

    if conf_method in ['pmi', 'psi', 'pvi', 'pvi_best']:
        method = conf_method
        metric = None
        temp = 1.0
    elif conf_method.startswith(('pmi_temp_scaling', 'psi_temp_scaling', 'pvi_temp_scaling', 'pvi_best_temp_scaling')):
        parts = conf_method.split('_')
        method = '_'.join(parts[:2]) if 'best' in parts else parts[0]
        metric = parts[-1]
        method_dir = {
            'pmi': 'pmi/separable_variational_f_js',
            'psi': 'psi/gaussian',
            'pvi': 'pvi/training_from_scratch',
            'pvi_best': 'pvi/training_from_scratch'
        }[method]
        temp = float(np.load(f'{base_path}/{method_dir}/{method}_opt_temp_{metric}.npy'))
    else:
        raise ValueError(f"Unknown confidence method: {conf_method}")

    method_paths = {
        'pmi': (f'{base_path}/pmi/separable_variational_f_js', 'pmi_output_class_test.npy'),
        'psi': (f'{base_path}/psi/gaussian', 'psi_output_class_500_projs_test.npy'),
        'pvi': (f'{base_path}/pvi/training_from_scratch', 'pvi_class_test.npy'),
        'pvi_best': (f'{base_path}/pvi/training_from_scratch', 'pvi_class_best_test.npy'),
    }

    method_path, class_file = method_paths[method]
    scores_class = np.load(f'{method_path}/{class_file}')
    scores_class = softmax_scaled(scores_class, temp)
    scores_test = np.array([score[pred] for score, pred in zip(scores_class, pred_y_test)])
    return scores_class, scores_test

def evaluate_calibration(ds_test, true_y_test, conf_method, n_runs=10):
    results = {
        "ece": [],
        "cc_ece": [],
        "mce": [],
        "ace": [],
        "sce": [],
        "ada_ece": [],
        "ada_sce": [],
        "cc_ada_ece": [],
        "cc_ada_sce": [],
        "cc_ada_sce_rms": [],
        "cw_ece": [],
        "cw_sce": [],
        "cw_ada_ece": [],
        "cw_ada_sce": [],
        "cw_ada_ece_rms": [],
        "cw_ada_sce_rms": [],
        "nll": [],
        "bs": [],
        "sharpness": [],
    }

    for run in range(n_runs):
        tf.keras.utils.set_random_seed(run + 10)
        model = create_model()
        model.load_weights(f'../results/PI_Explainability/{model_name}_{dataset_name}/run_{run+1}/saved_models/trained_weights.h5')

        pred_y_test = np.argmax(model.predict(ds_test.batch(256), verbose=0), axis=1)
        scores_class, scores_test = get_scores_for_calibration(
            conf_method, model, ds_test, pred_y_test, run, model_name, dataset_name
        )

#         results["ece"].append(metrics.compute_ece(scores_test, pred_y_test, true_y_test, 15))
#         results["cc_ece"].append(metrics.compute_cc_ece(scores_test, pred_y_test, true_y_test, 15))
#         results["mce"].append(metrics.compute_mce(scores_test, pred_y_test, true_y_test, 15))
#         results["ace"].append(metrics.compute_ace(scores_test, pred_y_test, true_y_test, 15))
#         results["sce"].append(metrics.compute_sce(scores_class, true_y_test, num_classes, 15))
#         results["ada_ece"].append(metrics.compute_adaece(scores_test, pred_y_test, true_y_test, 15))
#         results["ada_sce"].append(metrics.compute_adasce(scores_class, true_y_test, num_classes, 15))
#         results["cc_ada_ece"].append(metrics.compute_cc_adaece(scores_test, pred_y_test, true_y_test, 15))
#         results["cc_ada_sce"].append(metrics.compute_cc_adasce(scores_class, true_y_test, num_classes, 15))
        results["cc_ada_sce_rms"].append(metrics.compute_cc_adasce_rms(scores_class, true_y_test, num_classes, 15))
#         results["cw_ece"].append(metrics.compute_cw_ece(scores_class, true_y_test, num_classes, 15))
#         results["cw_sce"].append(metrics.compute_cw_sce(scores_class, true_y_test, num_classes, 15))
#         results["cw_ada_ece"].append(metrics.compute_cw_adaece(scores_class, true_y_test, num_classes, 15))
#         results["cw_ada_sce"].append(metrics.compute_cw_adasce(scores_class, true_y_test, num_classes, 15))
#         results["cw_ada_ece_rms"].append(metrics.compute_cw_adaece_rms(scores_class, true_y_test, num_classes, 15))
#         results["cw_ada_sce_rms"].append(metrics.compute_cw_adaece_rms(scores_class, true_y_test, num_classes, 15))
#         results["nll"].append(metrics.compute_nll(scores_class, true_y_test, num_classes))
#         results["bs"].append(metrics.compute_brier_score(scores_class, true_y_test, num_classes))
#         results["sharpness"].append(metrics.compute_sharpness(scores_class))

    return results

In [9]:
methods_list = ['softmax','psi','pvi','pvi_best','softmax_temp_scaling_nll','psi_temp_scaling_nll','pvi_temp_scaling_nll','pvi_best_temp_scaling_nll']
for method in methods_list:
    print(f'Method: {method}')
    results = evaluate_calibration(ds_test, true_y_test, conf_method=f'{method}', n_runs=10)
#     print(f"ECE:            {utils.format_ci(results['ece'], scale=100)}")
#     print(f"CC-ECE:         {utils.format_ci(results['cc_ece'], scale=100)}")
#     print(f"MCE:            {utils.format_ci(results['mce'], scale=100)}")
#     print(f"ACE:            {utils.format_ci(results['ace'], scale=100)}")
#     print(f"SCE:            {utils.format_ci(results['sce'], scale=100)}")
#     print(f"Ada-ECE:        {utils.format_ci(results['ada_ece'], scale=100)}")
#     print(f"Ada-SCE:        {utils.format_ci(results['ada_sce'], scale=100)}")
#     print(f"CC-Ada-ECE:     {utils.format_ci(results['cc_ada_ece'], scale=100)}")
#     print(f"CC-Ada-SCE:     {utils.format_ci(results['cc_ada_sce'], scale=100)}")
    print(f"CC-Ada-SCE-RMS: {utils.format_ci(results['cc_ada_sce_rms'], scale=100)}")
#     print(f"CW-ECE:         {utils.format_ci(results['cw_ece'], scale=100)}")
#     print(f"CW-SCE:         {utils.format_ci(results['cw_sce'], scale=100)}")
#     print(f"CW-Ada-ECE:     {utils.format_ci(results['cw_ada_ece'], scale=100)}")
#     print(f"CW-Ada-SCE:     {utils.format_ci(results['cw_ada_sce'], scale=100)}")
#     print(f"CW-Ada-ECE-RMS: {utils.format_ci(results['cw_ada_ece_rms'], scale=100)}")
#     print(f"CW-Ada-SCE-RMS: {utils.format_ci(results['cw_ada_sce_rms'], scale=100)}")
#     print(f"NLL:            {utils.format_ci(results['nll'], scale=100)}")
#     print(f"Brier Score:    {utils.format_ci(results['bs'], scale=100)}")
#     print(f"Sharpness:      {utils.format_ci(results['sharpness'], scale=100)}")

Method: softmax
CC-Ada-SCE-RMS: 2.98 (0.09)
Method: psi
CC-Ada-SCE-RMS: 3.97 (0.14)
Method: pvi
CC-Ada-SCE-RMS: 3.75 (0.05)
Method: pvi_best
CC-Ada-SCE-RMS: 3.66 (0.01)
Method: softmax_temp_scaling_nll
CC-Ada-SCE-RMS: 3.75 (0.05)
Method: psi_temp_scaling_nll
CC-Ada-SCE-RMS: 3.88 (0.04)
Method: pvi_temp_scaling_nll
CC-Ada-SCE-RMS: 3.75 (0.05)
Method: pvi_best_temp_scaling_nll
CC-Ada-SCE-RMS: 3.66 (0.01)
