In [1]:
import numpy as np
import pandas as pd
import helper

from cnn_model import cnn_model
from loss_functions import symmetric_cross_entropy
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split


In [None]:
# anchor_estimator and flc (still awaiting verification)

In [4]:
# anchor_estimator
import numpy as np
import tensorflow as tf
from tensorflow import keras

from helper import load_dataset, split_data
from cnn_model import cnn_model
from anchor_estimator import temperature_scale_probs, estimate_T_anchor_from_probs
from loss_functions import symmetric_cross_entropy
from flc_loss import forward_correction_loss


def ensure_column_stochastic(T: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Turn a matrix into a column-stochastic one.

    Negative entries are clipped to zero and each column is divided by its
    sum, so every column of the result sums to one.

    A column whose clipped entries sum to at most ``eps`` carries no usable
    information. Dividing it by ``eps`` would return a (near-)zero column that
    is not a probability distribution, so such a column is replaced by the
    uniform distribution instead.

    Args:
        T: Array whose first axis indexes the entries of each column.
        eps: Columns with a sum less than or equal to this are treated as
            degenerate and replaced by a uniform column.

    Returns:
        Array of the same shape as ``T``. Floating inputs keep their dtype;
        other dtypes are promoted to float64.
    """
    T = np.asarray(T)
    if not np.issubdtype(T.dtype, np.floating):
        T = T.astype(np.float64)
    T = np.clip(T, 0, None)

    colsum = T.sum(axis=0, keepdims=True)
    degenerate = colsum <= eps
    # Divide degenerate columns by 1 to avoid 0/0; they are overwritten below.
    safe_sum = np.where(degenerate, 1.0, colsum)
    uniform = 1.0 / max(T.shape[0], 1)

    normalized = np.where(degenerate, uniform, T / safe_sum)
    return normalized.astype(T.dtype, copy=False)


# 1. Load CIFAR and prepare the arrays.
# NOTE(review): the dataset name is passed here as "CIFAR.npz", whereas
# run_all_experiments passes the bare name ("CIFAR") — confirm which form
# load_dataset expects.
Xtr, Str, Xts, Yts, T_true = load_dataset("./datasets/CIFAR.npz", "CIFAR.npz")

# Cast labels to integers to avoid a one-hot/float mismatch in the loss.
Str = Str.astype("int64")
Yts = Yts.astype("int64")

# Convert the images to float32 and divide by 255.
Xtr = Xtr.astype("float32") / 255.0
Xts = Xts.astype("float32") / 255.0

X_tr, y_tr, X_val, y_val = split_data(Xtr, Str, train_ratio=0.8, random_seed=7)

num_classes = int(np.max(Str)) + 1
input_shape = Xtr.shape[1:]

# 2. Warm-up training (after estimation, CIFAR.npz appears to have roughly the
#    same noise level as the 0.6 setting).
# alpha/beta/A are SCE hyper-parameters; they are currently unused because the
# warm-up below is compiled with plain sparse categorical cross-entropy.
alpha, beta, A = 0.05, 4.0, -4.0
# sce_loss = symmetric_cross_entropy(alpha=alpha, beta=beta, A=A, num_classes=num_classes)

m = cnn_model(input_shape=input_shape, num_classes=num_classes)
m.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3),
          loss="sparse_categorical_crossentropy", metrics=["accuracy"])

m.fit(X_tr, y_tr,
      validation_data=(X_val, y_val),
      epochs=5, batch_size=128, verbose=1)

# 3. Estimate the transition matrix from the warm-up model's predictions on
#    the validation split.
p_val = m.predict(X_val, batch_size=128, verbose=0)
p_val_cal, bestT = temperature_scale_probs(p_val, y_val)
T_hat = estimate_T_anchor_from_probs(p_val_cal, top_quantile=0.99)
T_hat = ensure_column_stochastic(T_hat).astype(np.float32)

# 4. Report the estimated transition matrix.
print("Estimated Transition Matrix (T_hat):")

print(T_hat)
print("------------------------------")
print("Matrix shape:", T_hat.shape)

# Each column should sum to 1 after ensure_column_stochastic.
print("Column sums:", np.sum(T_hat, axis=0))

# Mean of the diagonal entries versus mean of the off-diagonal entries.
diag_mean = np.mean(np.diag(T_hat))
# Average over the off-diagonal entries only. Zeroing the diagonal and then
# taking the mean of the whole matrix would divide by n*n instead of n*(n-1)
# and understate the value.
off_diag_mask = ~np.eye(*T_hat.shape, dtype=bool)
off_mean = np.mean(T_hat[off_diag_mask])
print(f"Mean diagonal value: {diag_mean:.4f}")
print(f"Mean off-diagonal value: {off_mean:.4f}")


Epoch 1/5
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - accuracy: 0.3422 - loss: 1.1023 - val_accuracy: 0.3483 - val_loss: 1.0981
Epoch 2/5
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 25ms/step - accuracy: 0.3410 - loss: 1.0982 - val_accuracy: 0.3487 - val_loss: 1.0970
Epoch 3/5
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - accuracy: 0.3491 - loss: 1.0975 - val_accuracy: 0.3373 - val_loss: 1.0976
Epoch 4/5
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - accuracy: 0.3502 - loss: 1.0967 - val_accuracy: 0.3630 - val_loss: 1.0971
Epoch 5/5
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - accuracy: 0.3599 - loss: 1.0945 - val_accuracy: 0.3657 - val_loss: 1.0951
Estimated Transition Matrix (T_hat):
[[0.44752827 0.37157804 0.28800556]
 [0.34708044 0.36276808 0.29623663]
 [0.20539126 0.26565382 0.4157578 ]]
------------------------------
Matrix shape: (3, 3)
Co

In [5]:
# Forward-correction fine-tuning: build a fresh model, copy the warm-up
# weights into it, then train it with the loss built from T_hat.
flc_loss = forward_correction_loss(T_hat, num_classes=num_classes)

m_flc = cnn_model(input_shape=input_shape, num_classes=num_classes)
m_flc.set_weights(m.get_weights())
m_flc.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss=flc_loss,
    metrics=["accuracy"],
)

history_flc = m_flc.fit(X_tr, y_tr, validation_data=(X_val, y_val),
                        epochs=10, batch_size=128, verbose=1)

# Evaluate on the test split; only the accuracy is reported.
test_loss, test_acc = m_flc.evaluate(Xts, Yts, verbose=0)
print(f"[FLC] Test Accuracy: {test_acc:.4f}")

Epoch 1/10
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 28ms/step - accuracy: 0.3543 - loss: 1.1077 - val_accuracy: 0.3613 - val_loss: 1.0961
Epoch 2/10
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - accuracy: 0.3600 - loss: 1.0954 - val_accuracy: 0.3607 - val_loss: 1.0965
Epoch 3/10
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - accuracy: 0.3649 - loss: 1.0954 - val_accuracy: 0.3567 - val_loss: 1.0973
Epoch 4/10
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 27ms/step - accuracy: 0.3745 - loss: 1.0944 - val_accuracy: 0.3703 - val_loss: 1.0951
Epoch 5/10
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - accuracy: 0.3665 - loss: 1.0952 - val_accuracy: 0.3657 - val_loss: 1.0968
Epoch 6/10
[1m94/94[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 28ms/step - accuracy: 0.3687 - loss: 1.0946 - val_accuracy: 0.3660 - val_loss: 1.0961
Epoch 7/10
[1m94/94[0m [32m━━━━

In [23]:
# Base seed: run number `run` of an experiment uses RANDOM_SEED + run as the
# seed for its train/validation split.
RANDOM_SEED = 42

def train_model(X_train, y_train, X_val, y_val, dataset, method="fc", transition_matrix=None, epochs=50, input_shape=(28, 28, 1), num_classes=3):
    """Build, compile and fit a model with the loss selected by ``method``.

    Only ``method="sce"`` is implemented. The method and dataset are validated
    before the model is built, so an unsupported combination fails immediately
    with a clear message rather than with an unbound-variable error.

    Args:
        X_train, y_train: Training inputs and labels passed to ``model.fit``.
        X_val, y_val: Validation data passed to ``model.fit``; ``val_loss`` on
            it drives early stopping.
        dataset: Dataset name used to pick the SCE ``alpha``/``beta`` values.
        method: Name of the training method. Only ``"sce"`` is supported.
        transition_matrix: Currently unused by this function.
        epochs: Maximum number of epochs.
        input_shape: Input shape forwarded to ``cnn_model``.
        num_classes: Number of classes forwarded to ``cnn_model`` and to the
            loss constructor.

    Returns:
        The fitted model. Early stopping restores the weights of the epoch
        with the best ``val_loss``.

    Raises:
        ValueError: If ``method`` is not supported, or if ``dataset`` has no
            configured SCE hyper-parameters.
    """
    # (alpha, beta) for the symmetric cross-entropy loss, per dataset.
    sce_hyperparameters = {
        "FashionMNIST0.3": (0.01, 1),
        "FashionMNIST0.6": (0.01, 1),
        "CIFAR": (0.1, 1),
    }

    if method != "sce":
        # elif method == "forward":     # forward function (not implemented)
        # elif method == "coteaching":  # coteaching function (not implemented)
        raise ValueError(
            f"Unsupported method {method!r}; only 'sce' is implemented."
        )
    if dataset not in sce_hyperparameters:
        raise ValueError(
            f"No SCE hyper-parameters configured for dataset {dataset!r}; "
            f"expected one of {sorted(sce_hyperparameters)}."
        )

    alpha, beta = sce_hyperparameters[dataset]
    A = -4.0
    loss_function = symmetric_cross_entropy(alpha=alpha, beta=beta, A=A, num_classes=num_classes)

    model = cnn_model(input_shape=input_shape, num_classes=num_classes)

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss=loss_function,
        metrics=['accuracy']
    )

    # Stop after 10 epochs without val_loss improvement and restore the best weights.
    early_stopping = keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    )

    model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=128,
        callbacks=[early_stopping],
        verbose=0
    )

    return model

def evaluate_model(model, X_test, y_test):
    """Return the top-1 accuracy of ``model`` on a test set, in percent.

    Args:
        model: Object with a ``predict(X, verbose=0)`` method returning one
            row of class scores per sample.
        X_test: Test inputs passed to ``model.predict``.
        y_test: Integer class labels, one per sample. Shapes ``(N,)`` and
            ``(N, 1)`` are both accepted.

    Returns:
        Percentage (0-100) of samples whose arg-max prediction equals the label.

    Raises:
        ValueError: If the number of labels differs from the number of
            predictions.
    """
    predictions = model.predict(X_test, verbose=0)
    predicted_classes = np.argmax(predictions, axis=1)

    # Flatten the labels: comparing (N,) predictions with (N, 1) labels would
    # broadcast to an (N, N) matrix and give a silently wrong accuracy.
    true_classes = np.asarray(y_test).reshape(-1)
    if true_classes.shape != predicted_classes.shape:
        raise ValueError(
            f"Got {true_classes.shape[0]} labels for "
            f"{predicted_classes.shape[0]} predictions."
        )

    accuracy = np.mean(predicted_classes == true_classes) * 100
    return accuracy

def run_single_experiment(Xtr, Str, Xts, Yts, T, dataset, method, num_runs=10, epochs=50):
    """Train and evaluate one method on one dataset over several seeded runs.

    Each run uses ``RANDOM_SEED + run`` both for the train/validation split
    and for the global Python/NumPy/TensorFlow seeds, so that weight
    initialisation and batch shuffling are reproducible too.

    Args:
        Xtr: Training inputs; cast to float32 and divided by 255.
        Str: Training labels (integer class ids).
        Xts: Test inputs; cast to float32 and divided by 255.
        Yts: Test labels (integer class ids).
        T: Transition matrix for ``method == 'fc'``, or ``None``.
        dataset: Dataset name forwarded to ``train_model``.
        method: Training method name forwarded to ``train_model``.
        num_runs: Number of independent runs.
        epochs: Maximum number of epochs per run.

    Returns:
        List with one test accuracy per run, as returned by ``evaluate_model``.

    Raises:
        NotImplementedError: If ``method == 'fc'`` and ``T`` is ``None``,
            because estimating the transition matrix is not implemented here.
    """
    if method == 'fc':
        if T is None:
            # Previously this fell through and left transition_matrix unbound.
            raise NotImplementedError(
                "method 'fc' needs a transition matrix, but T is None and "
                "transition-matrix estimation is not implemented here."
            )
        transition_matrix = T
    else:
        transition_matrix = None

    Xtr = Xtr.astype('float32') / 255.0
    Xts = Xts.astype('float32') / 255.0
    input_shape = Xtr.shape[1:]
    # Derive the class count from the labels instead of hard-coding 3;
    # assumes labels are integer ids starting at 0.
    num_classes = int(max(np.max(Str), np.max(Yts))) + 1

    accuracies = []

    for run in range(num_runs):
        seed = RANDOM_SEED + run
        # Seed Python, NumPy and TensorFlow so the whole run is reproducible.
        keras.utils.set_random_seed(seed)

        X_train, y_train, X_val, y_val = helper.split_data(
            Xtr, Str, train_ratio=0.8, random_seed=seed
        )

        model = train_model(X_train, y_train, X_val, y_val, dataset=dataset, method=method, transition_matrix=transition_matrix, epochs=epochs, input_shape=input_shape, num_classes=num_classes)

        accuracy = evaluate_model(model, Xts, Yts)
        accuracies.append(accuracy)

        print(f"Run {run+1}/{num_runs}: Test Accuracy = {accuracy:.2f}%")

        # Release the model and Keras' global state before the next run.
        del model
        tf.keras.backend.clear_session()

    return accuracies
    
def run_all_experiments(datasets, methods, num_runs=10, epochs=50, data_dir="datasets"):
    """Run every method on every dataset and collect summary statistics.

    Args:
        datasets: Iterable of dataset names; each is loaded from
            ``{data_dir}/{name}.npz``.
        methods: Iterable of method names forwarded to
            ``run_single_experiment``.
        num_runs: Number of runs per (dataset, method) pair.
        epochs: Maximum number of epochs per run.
        data_dir: Directory containing the ``.npz`` files.

    Returns:
        DataFrame with one row per (dataset, method) pair and the columns
        ``Dataset``, ``Method``, ``Mean``, ``Std`` and ``Result``. The columns
        are present even when no experiment was run.
    """
    columns = ['Dataset', 'Method', 'Mean', 'Std', 'Result']
    results = []

    for dataset in datasets:
        data_path = f'{data_dir}/{dataset}.npz'
        # Load once per dataset: the arrays do not depend on the method.
        Xtr, Str, Xts, Yts, T = helper.load_dataset(data_path, dataset)

        for method in methods:
            print(f"Running {method.upper()} on {dataset}...")

            accuracies = run_single_experiment(
                Xtr, Str, Xts, Yts, T, dataset, method, num_runs, epochs
            )
            mean_acc = np.mean(accuracies)
            std_acc = np.std(accuracies)

            results.append({
                'Dataset': dataset,
                'Method': method.upper(),
                'Mean': mean_acc,
                'Std': std_acc,
                'Result': f"{mean_acc:.2f} ± {std_acc:.2f}"
            })

            print(f"Result: {mean_acc:.2f} ± {std_acc:.2f}%")

    # Fix the schema so an empty run list still yields the expected columns.
    results_df = pd.DataFrame(results, columns=columns)

    return results_df

In [24]:
# Experiment grid: every listed method is run on every listed dataset.
datasets = ["FashionMNIST0.3", "FashionMNIST0.6", "CIFAR"]
methods = ["sce"]  # add more methods here

result = run_all_experiments(datasets, methods, num_runs=10, epochs=50)

Running SCE on FashionMNIST0.3...
Run 1/10: Test Accuracy = 98.53%
Run 2/10: Test Accuracy = 98.67%
Run 3/10: Test Accuracy = 98.37%
Run 4/10: Test Accuracy = 98.77%
Run 5/10: Test Accuracy = 98.80%
Run 6/10: Test Accuracy = 98.53%
Run 7/10: Test Accuracy = 98.37%
Run 8/10: Test Accuracy = 98.63%
Run 9/10: Test Accuracy = 98.73%
Run 10/10: Test Accuracy = 98.40%
Result: 98.58 ± 0.16%
Running SCE on FashionMNIST0.6...
Run 1/10: Test Accuracy = 96.13%
Run 2/10: Test Accuracy = 96.27%
Run 3/10: Test Accuracy = 96.03%
Run 4/10: Test Accuracy = 96.13%
Run 5/10: Test Accuracy = 96.43%
Run 6/10: Test Accuracy = 94.97%
Run 7/10: Test Accuracy = 96.83%
Run 8/10: Test Accuracy = 95.50%
Run 9/10: Test Accuracy = 95.17%
Run 10/10: Test Accuracy = 94.87%
Result: 95.83 ± 0.63%
Running SCE on CIFAR...
Run 1/10: Test Accuracy = 67.87%
Run 2/10: Test Accuracy = 67.60%
Run 3/10: Test Accuracy = 64.07%
Run 4/10: Test Accuracy = 62.67%
Run 5/10: Test Accuracy = 64.67%
Run 6/10: Test Accuracy = 68.50%
Run 

In [26]:
# One row per dataset, one column per method; cells hold the "mean ± std" string.
pivot_df = result.pivot(
    index='Dataset',
    columns='Method',
    values='Result',
)

print(pivot_df)

Method                    SCE
Dataset                      
CIFAR            65.57 ± 3.22
FashionMNIST0.3  98.58 ± 0.16
FashionMNIST0.6  95.83 ± 0.63
