## COMP5328 - Advanced Machine Learning
## Assignment 2: Title
----------------------------------------------------------------------------------------

In [21]:
# Common imports
import os
import sys
import pandas as pd
import numpy as np
import json
import time
import datetime as dt
from PIL import Image
from collections import Counter, defaultdict
from datetime import datetime

# Ploting
import matplotlib.pyplot as plt

# Evaluation 
from sklearn.cluster import KMeans
from sklearn.metrics import accuracy_score
from sklearn.metrics import normalized_mutual_info_score

# Experiment
from joblib import Parallel, delayed
import traceback


from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import torch.optim as optim

# Notebook variables
seed = 0
# np.random.seed(seed)
# rng = np.random.default_rng(seed)

# Log file
log_file = 'experiment_results.txt'

if os.path.exists(log_file):
    os.remove(log_file)
    print("Previous Log File deleted.")
else:
    print("Previous Log File does not exist.")

Previous Log File does not exist.


In [22]:

# Experiment variables
# Common
datasets = ["cifar", "fashion03", "fashion06"]



dataset_folder = 'data/'
cifar_dataset = dataset_folder+'CIFAR.npz'
MNISTO3_dataset = dataset_folder+'FashionMNIST0.3.npz'
MNISTO6_dataset = dataset_folder+'FashionMNIST0.6.npz'

known_T_fashion_03 = np.array(  [[0.7,0.3,0.0],
                                [0.0,0.7,0.3],
                                [0.0,0.0,0.7]], dtype=np.float32)

known_T_fashion_06 = np.array(  [[0.3,0.4,0.3],
                                [0.4,0.3,0.3],
                                [0.3,0.3,0.4]], dtype=np.float32)

def set_seed(seed):
    import random
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [23]:
def log_experiment(df, experiment_name, filename=log_file):
  """
  Record the experimental result
  """
  mode = "a" if os.path.exists(filename) else "w"
  with open(filename, mode) as f:
      if mode == "w":  # first record
          f.write("===== Experiment Log Start ======================================\n\n")
      f.write(f"===== {dt.datetime.now()} == New Experiment: {experiment_name} =====\n")
      f.write(df.to_string(index=False))
      f.write("\n\n")

## 1. Load Dataset

### 1.0 Data Folder

In [24]:
# Path to your dataset zip stored in Drive
zip_path = "datasets.zip"

# Unzip file
!unzip -o -q "$zip_path" 

unzip:  cannot find or open datasets.zip, datasets.zip.zip or datasets.zip.ZIP.


In [25]:
# The structure of data folder.
!ls -l data



total 172688
-rw-r--r--  1 jamie.saunders  staff  55440974 Oct 28 22:10 CIFAR.npz
-rw-r--r--  1 jamie.saunders  staff  16485974 Oct 28 22:10 FashionMNIST0.3.npz
-rw-r--r--  1 jamie.saunders  staff  16485974 Oct 28 22:10 FashionMNIST0.6.npz


In [26]:
def load_npz(path):
    d = np.load(path)
    Xtr, Str = d['Xtr'], d['Str']
    Xts, Yts = d['Xts'], d['Yts']
    return Xtr, Str, Xts, Yts

# A helper class, it is used as an input of the DataLoader object.
class DatasetArray(Dataset):
    r"""This is a child class of the pytorch Dataset object."""
    def __init__(self, data, labels=None, transform=None):
        if labels != None:
            self.data_arr = np.asarray(data).astype(np.float32)
            self.label_arr = np.asarray(labels).astype(np.long)
        else:
            tmp_arr = np.asarray(data)
            self.data_arr = tmp_arr[:,:-1].astype(np.float32)
            self.label_arr = tmp_arr[:,-1].astype(np.long)
        self.transform = transform
        
    def __len__(self):
        return len(self.data_arr)
    
    def __getitem__(self, index):
     
        data = self.data_arr[index]
        label = self.label_arr[index]
        
        if self.transform is not None:
            data = self.transform(data)
            
        return (data, label)
    
    
# Splitting the data into three parts.
def train_val_test_random_split(data, fracs=[0.7,0.1,0.2]):
    r"""Split the data into training, validation and test set.
    Args:
        fracs: a list of length three
    """
    assert len(fracs) == 3
    assert sum(fracs) == 1
    assert all(frac > 0 for frac in fracs)
    n = len(data)
    subset_lens = [int(n*frac) for frac in fracs]
    idxs = list(range(n))
    random.shuffle(idxs)
    data = np.array(data)
    new_data = []
    start_idx = 0
    for subset_len in subset_lens:
        end_idx = start_idx + subset_len
        cur_idxs = idxs[start_idx:end_idx]
        new_data.append(data[cur_idxs,:].tolist())
        start_idx = end_idx
    return new_data

# Preparation of the data for training, validation and testing a pytorch network. 
# Note that the test data is not in use for this lab.
def get_loader(batch_size =128, num_workers = 0, train_val_test_split = [0.7,0.1,0.2], data=None):
    r"""This function is used to read the data file and split the data into three subsets, i.e, 
    train data, validation data and test data. Their corresponding DataLoader objects are returned."""
    
    [train_data, val_data, test_data] = train_val_test_random_split(data, fracs = train_val_test_split)

    train_data = DatasetArray(data = train_data)
    val_data = DatasetArray(data = val_data)
    test_data = DatasetArray(data = test_data)

    #The pytorch built-in class DataLoader can help us to shuffle the data, draw mini-batch,
    #do transformations, etc. 
    train_loader = DataLoader(
        train_data,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
    )

    val_loader = DataLoader(
        val_data,
        batch_size=100,
        shuffle=False,
        num_workers=num_workers,
    )

    test_loader = DataLoader(
        test_data,
        batch_size=100,
        num_workers=num_workers,
        shuffle=False,
    )
    return train_loader, val_loader, test_loader

In [27]:
import numpy as np
# Remember t o r e p l a c e t h e $FILE PATH
Xtr, Str, Xts, Yts = load_npz(MNISTO6_dataset)

print (Xtr.shape)
print (Str.shape)
print (Xts.shape)
print (Yts.shape)

(18000, 784)
(18000,)
(3000, 784)
(3000,)


In [28]:
class NpzImageDataset(Dataset):
    def __init__(self, X, y, is_cifar=False):
        self.X = X.astype(np.float32)
        self.y = y.astype(np.int64)
        self.is_cifar = is_cifar

        # Normalize to [0,1]
        self.X = self.X / 255.0 if self.X.max() > 1.0 else self.X

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        x = self.X[idx]
        if x.ndim == 1:
            # flat; try to infer shape 28x28 or 32x32x3
            if x.size == 28*28:
                x = x.reshape(1, 28, 28)
            elif x.size == 32*32*3:
                x = x.reshape(3, 32, 32)
            else:
                raise ValueError("Unknown flat image shape: {}".format(x.shape))
        else:
            # (H,W) or (H,W,C)
            if x.ndim == 2:
                x = x[None, ...]  # to (1,H,W)
            elif x.ndim == 3:
                # assume HWC -> CHW
                x = np.transpose(x, (2, 0, 1))
            else:
                raise ValueError(f"Unexpected image dims: {x.shape}")
        return torch.from_numpy(x), torch.tensor(self.y[idx])


def load_npz(path):
    d = np.load(path)
    Xtr, Str = d['Xtr'], d['Str']
    Xts, Yts = d['Xts'], d['Yts']
    return Xtr, Str, Xts, Yts


def make_loaders(Xtr, Str, batch_size=128, seed=0, test_size=0.2):
    # 80/20 split each repetition
    X_tr, X_val, y_tr, y_val = train_test_split(
        Xtr, Str, test_size=test_size, random_state=seed, stratify=Str
    )

    is_cifar = (X_tr.shape[-1] == 3) if X_tr.ndim == 4 else (X_tr.shape[-1] == 32*32*3)

    train_ds = NpzImageDataset(X_tr, y_tr, is_cifar)
    val_ds   = NpzImageDataset(X_val, y_val, is_cifar)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=0)

    return train_loader, val_loader, is_cifar


def make_test_loader(Xts, Yts, batch_size=256):
    is_cifar = (Xts.shape[-1] == 3) if Xts.ndim == 4 else (Xts.shape[-1] == 32*32*3)
    test_ds = NpzImageDataset(Xts, Yts, is_cifar)
    return DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=0)

In [29]:
# loaders for this split
train_loader, val_loader, is_cifar = make_loaders(Xtr, Str, batch_size=512, seed=seed)
test_loader = make_test_loader(Xts, Yts, batch_size=512)

In [30]:
for batch_idx, (x, y) in enumerate(train_loader):
    print(f"Batch {batch_idx}:")
    print("x shape:", x.shape)
    print("y shape:", y.shape)
    print("x[0]:", x[0])
    print("y[0]:", y[0])
    if batch_idx == 1:  # stop after 2 batches
        break


Batch 0:
x shape: torch.Size([512, 1, 28, 28])
y shape: torch.Size([512])
x[0]: tensor([[[0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
          0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0

In [31]:
class ForwardCorrectedCE(nn.Module):
    """
    Forward loss correction: minimizes CE between T^T p and noisy labels.
    T: class-transition matrix where T[i,j] = P(S=j | Y=i). Shape [C,C].
    """
    def __init__(self, T):
        super().__init__()
        self.register_buffer('T', T)  # [C,C]

    def forward(self, logits, y_noisy):
        # logits -> p(y|x)
        p = F.softmax(logits, dim=1)  # [B,C]
        # mix via T^T
        mixed = torch.clamp(p @ self.T.t(), 1e-6, 1.0)
        log_mixed = torch.log(mixed)
        return F.nll_loss(log_mixed, y_noisy)


class GeneralizedCrossEntropy(nn.Module):
    """
    GCE loss: L_q(p, y) = (1 - p_y^q) / q, with q in (0,1].
    q→1 recovers CE; smaller q is more robust to label noise.
    """
    def __init__(self, q=0.7):
        super().__init__()
        assert 0 < q <= 1
        self.q = q

    def forward(self, logits, y):
        p = F.softmax(logits, dim=1)
        p_y = p.gather(1, y.view(-1,1)).clamp(min=1e-6, max=1.0)
        if self.q == 1.0:
            return -torch.log(p_y).mean()
        return ((1 - p_y.pow(self.q)) / self.q).mean()
    
def rre_fse(T, T_prime, eps=1e-12): 
  """
  Return relative reconstruction frobenious error

  Args:
    V_hat: Clean data matrix (no-noise)
    W: Basis matrix
    H: Encoding matrix
    eps: epsilon value to prevent division by zeo / stabilize the computation
  """
  FE = np.linalg.norm( T - T_prime, ord='fro') # Calculating numerator value from fse error
  denom = np.linalg.norm(T, ord='fro')  # Calculating denominator value
  RRE = FE / (denom + eps)  # RRE calculation
  return RRE

In [32]:
def conv_block(cin, cout):
    return nn.Sequential(
        nn.Conv2d(cin, cout, 3, padding=1),
        nn.BatchNorm2d(cout),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(2)
    )


class SmallCNN28(nn.Module):
    """For 1×28×28 images."""
    def __init__(self, num_classes=3):
        super().__init__()
        self.net = nn.Sequential(
            conv_block(1, 32),  # 14x14
            conv_block(32, 64), # 7x7
            nn.Flatten(),
            nn.Linear(64*7*7, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, num_classes)
        )
    def forward(self, x):
        return self.net(x)


class SmallCNNCifar(nn.Module):
    """For 3×32×32 images."""
    def __init__(self, num_classes=3):
        super().__init__()
        self.net = nn.Sequential(
            conv_block(3, 32),   # 16x16
            conv_block(32, 64),  # 8x8
            conv_block(64, 128), # 4x4
            nn.Flatten(),
            nn.Linear(128*4*4, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, num_classes)
        )
    def forward(self, x):
        return self.net(x)


def make_model(is_cifar, num_classes=3):
    return SmallCNNCifar(num_classes) if is_cifar else SmallCNN28(num_classes)

In [33]:
@torch.no_grad()
def predict_proba(model, loader, device):
    model.eval()
    probs = []
    ys = []
    for xb, yb in loader:
        xb = xb.to(device)
        logits = model(xb)
        p = F.softmax(logits, dim=1).cpu().numpy()
        probs.append(p)
        ys.append(yb.numpy())
    return np.concatenate(probs), np.concatenate(ys)


def estimate_transition_anchor(t, train_loader, is_cifar, num_classes=3, device='cpu', epochs=5):
    """
    Simple anchor/confident-example estimator (Patrini et al., 2017 style):
    1) Train a base classifier on noisy data.
    2) Get p(y|x) on training set.
    3) For each clean class i, find indices whose predicted argmax == noisy label == i and with high confidence.
    4) For those indices, estimate column i of T as average of empirical noisy label distribution given model predicts i.
    Here: since we only have noisy labels S, we approximate T[:, i] ≈ E[ onehot(S) | argmax p = i, p_i >= τ ].
    Normalize columns to sum to 1.
    """
    import torch.optim as optim


    device = torch.device(device)
    model = make_model(is_cifar, num_classes=num_classes).to(device)

    optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

    # quick warmup training on noisy labels
    for _ in range(epochs):
        train_one_epoch(model, train_loader, optimizer, criterion=None, device=device)  # default CE inside

    # collect probs & noisy labels
    probs, y_noisy = predict_proba(model, train_loader, device)
    preds = probs.argmax(axis=1)
    maxp = probs.max(axis=1)

    C = num_classes
    T = t
    # choose class-wise thresholds based on quantiles for stability
    for i in range(C):
        idx = np.where(preds == i)[0]
        if idx.size == 0:
            T[:, i] = np.ones(C) / C
            continue
        # high-confidence subset (top 30% by p_i)
        conf = maxp[idx]
        if conf.size > 50:
            tau = np.quantile(conf, 0.7)
        else:
            tau = np.min(conf)  # keep all if tiny
        keep = idx[conf >= tau]
        if keep.size == 0:
            keep = idx
        # empirical distribution of noisy labels in this confident set
        hist = np.bincount(y_noisy[keep], minlength=C).astype(np.float64)
        if hist.sum() == 0:
            T[:, i] = np.ones(C) / C
        else:
            T[:, i] = hist / hist.sum()

    # column-normalize
    colsum = T.sum(axis=0, keepdims=True)
    T = np.divide(T, np.maximum(colsum, 1e-8))
    return T.astype(np.float32)


def estimate_transition_trevision(T_init,train_loader, is_cifar, num_classes=3, device="cpu", epochs=5, lambda_reg=0.01, lr_t=1e-4):
    """
    T-Revision version of transition-matrix estimation.

    Args:
        T_init (np.ndarray or torch.Tensor): initial transition matrix [C, C]
        train_loader: noisy training dataloader
        is_cifar (bool): model type selector
        num_classes (int): number of classes
        device (str): 'cpu', 'cuda', or 'mps'
        epochs (int): warm-up epochs
        lambda_reg (float): regularization to keep T close to T_init
        lr_t (float): learning rate for T refinement

    Returns:
        torch.Tensor: refined transition matrix T' (shape [C,C])
    """

    device = torch.device(device)
    C = num_classes

    # ---------------------------
    # 1. Base model setup
    # ---------------------------
    model = make_model(is_cifar, num_classes=C).to(device)
    opt_model = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
    criterion = nn.CrossEntropyLoss()
    # quick warmup to get p(y|x)
    for _ in range(epochs):
        train_one_epoch(model, train_loader, opt_model, criterion=criterion, device=device)

    # ---------------------------
    # 2. Get predicted probabilities & noisy labels
    # ---------------------------
    probs, y_noisy = predict_proba(model, train_loader, device)
    preds = probs.argmax(axis=1)
    maxp = probs.max(axis=1)

    # ---------------------------
    # 3. Initialize learnable ΔT (T-Revision)
    # ---------------------------

    T_init_torch = torch.tensor(T_init, dtype=torch.float32, device=device)
    
    delta_T = nn.Parameter(torch.zeros_like(T_init_torch))
    optimizer_T = optim.Adam([delta_T], lr=lr_t)

    # ---------------------------
    # 4. Optimize ΔT using forward-corrected loss
    # ---------------------------
    for _ in range(epochs):
        optimizer_T.zero_grad()
        T_prime = T_init_torch + delta_T

        # enforce nonnegativity & normalization
        #T_prime = torch.clamp(T_prime, min=1e-6)
        #T_prime = T_prime / T_prime.sum(dim=0, keepdim=True)

        # predicted noisy-label probs
        p = torch.tensor(probs, dtype=torch.float32, device=device)
        noisy_pred = torch.clamp(p @ T_prime.t(), 1e-6, 1.0)
        log_noisy = torch.log(noisy_pred)

        y_t = torch.tensor(y_noisy, dtype=torch.long, device=device)
        loss_ce = nn.NLLLoss()(log_noisy, y_t)
        reg = lambda_reg * torch.norm(T_prime - T_init_torch, p="fro")
        loss = loss_ce + reg

        loss.backward()
        optimizer_T.step()
        print(delta_T.grad.abs().mean().item())

    # normalize final T
    T_final = T_init_torch + delta_T.data
    T_final = torch.clamp(T_final, min=1e-6)
    T_final = T_final / T_final.sum(dim=0, keepdim=True)

    return T_final.detach().cpu().numpy().astype(np.float32)

In [34]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

def estimate_transition_trevision(
    T_init,
    train_loader,
    is_cifar,
    num_classes=3,
    device="cpu",
    epochs=5,
    lambda_reg=1e-4,
    lr_t=5e-3,
    lr_model=1e-3,
    warmup_epochs=3,
    log_every=1
):
    """
    Refine transition matrix using T-Revision method (Patrini et al. style).

    Args:
        T_init (np.ndarray or torch.Tensor): initial transition matrix [C, C]
        train_loader: noisy dataloader (x, y_noisy)
        is_cifar (bool): dataset selector for model architecture
        num_classes (int): number of classes
        device (str): 'cpu', 'cuda', or 'mps'
        epochs (int): number of refinement epochs for ΔT
        lambda_reg (float): regularization to keep T close to T_init
        lr_t (float): learning rate for ΔT
        lr_model (float): learning rate for model warm-up
        warmup_epochs (int): number of model warm-up epochs
        log_every (int): print interval

    Returns:
        np.ndarray: refined transition matrix [C, C]
    """

    device = torch.device(device)
    C = num_classes

    # ---------------------------
    # 1. Base model setup
    # ---------------------------
    model = make_model(is_cifar, num_classes=C).to(device)
    opt_model = optim.Adam(model.parameters(), lr=lr_model, weight_decay=1e-4)
    criterion = nn.CrossEntropyLoss()

    # Warm-up (train classifier on noisy labels)
    print(f"[Warm-up] training base classifier for {warmup_epochs} epochs...")
    for e in range(warmup_epochs):
        train_one_epoch(model, train_loader, opt_model, criterion=criterion, device=device)
        print(f"  done epoch {e+1}/{warmup_epochs}")

    # ---------------------------
    # 2. Get predicted probabilities
    # ---------------------------
    probs, y_noisy = predict_proba(model, train_loader, device)
    p = torch.tensor(probs, dtype=torch.float32, device=device)
    y_t = torch.tensor(y_noisy, dtype=torch.long, device=device)

    # ---------------------------
    # 3. Initialize learnable ΔT (T-Revision)
    # ---------------------------
    T_init_torch = torch.tensor(T_init, dtype=torch.float32, device=device)
    delta_T = nn.Parameter(torch.zeros_like(T_init_torch))
    optimizer_T = optim.Adam([delta_T], lr=lr_t)

    print(f"[Optimization] refining transition matrix for {epochs} epochs...")

    # ---------------------------
    # 4. Optimize ΔT
    # ---------------------------
    for ep in range(epochs):
        optimizer_T.zero_grad()

        # Proposed transition
        T_prime = T_init_torch + delta_T
        T_prime = torch.clamp(T_prime, min=1e-6)

        # Forward correction: p(y_noisy | x) = p(y|x) * T'
        noisy_pred = torch.clamp(p @ T_prime.t(), 1e-6, 1.0)
        log_noisy = torch.log(noisy_pred)

        # Loss = NLL + regularization
        loss_ce = nn.NLLLoss()(log_noisy, y_t)
        reg = lambda_reg * torch.norm(T_prime - T_init_torch, p="fro")
        loss = loss_ce + reg

        loss.backward()
        optimizer_T.step()

        if (ep + 1) % log_every == 0:
            grad_norm = delta_T.grad.abs().mean().item() if delta_T.grad is not None else 0
            print(f"Epoch {ep+1}/{epochs} | loss={loss.item():.5f} | grad={grad_norm:.5e}")

    # ---------------------------
    # 5. Normalize once at the end
    # ---------------------------
    T_final = T_init_torch + delta_T.data
    T_final = torch.clamp(T_final, min=1e-6)
    T_final = T_final / T_final.sum(dim=0, keepdim=True)

    print("\n[Done] Refined Transition Matrix:")
    print(T_final.detach().cpu().numpy())

    return T_final.detach().cpu().numpy().astype(np.float32)


In [35]:
def estimate_transition_trevision(
    T_init,
    train_loader,
    is_cifar,
    num_classes=3,
    device="cpu",
    warmup_epochs=15,
    refine_epochs=60,
    lr_model=1e-3,
    lr_t=5e-3,
    lambda_reg=1e-4,
    q=0.7,
):
    """
    Improved T-Revision: row-stochastic, stable, no T_init overwrite.
    """

    device = torch.device(device)
    C = num_classes

    # ----- 1. Warm-up base model -----
    model = make_model(is_cifar, num_classes=C).to(device)
    opt_model = optim.Adam(model.parameters(), lr=lr_model, weight_decay=1e-4)
    criterion = GeneralizedCrossEntropy(q=q)
    print(f"[Warm-up] {warmup_epochs} epochs...")
    for e in range(warmup_epochs):
        train_one_epoch(model, train_loader, opt_model, criterion=criterion, device=device)

    # ----- 2. Predict posteriors -----
    probs, y_noisy = predict_proba(model, train_loader, device)
    p = torch.tensor(probs, dtype=torch.float32, device=device)
    y_t = torch.tensor(y_noisy, dtype=torch.long, device=device)

    # ----- 3. Learnable ΔT in logit-space -----
    logits_T = nn.Parameter(torch.log(torch.tensor(T_init + 1e-6, dtype=torch.float32, device=device)))
    optT = optim.Adam([logits_T], lr=lr_t)

    print(f"[Refinement] {refine_epochs} epochs...")
    for ep in range(refine_epochs):
        optT.zero_grad()
        T_prime = torch.softmax(logits_T, dim=1)          # rows sum to 1
        noisy_pred = torch.clamp(p @ T_prime, 1e-8, 1.0)  # no transpose
        loss_ce = nn.NLLLoss()(torch.log(noisy_pred), y_t)
        reg = lambda_reg * torch.norm(T_prime - torch.tensor(T_init, device=device), p='fro')
        loss = loss_ce + reg
        loss.backward(); optT.step()
        if (ep + 1) % 10 == 0:
            print(f"Epoch {ep+1}: loss={loss.item():.5f}")

    T_final = torch.softmax(logits_T, dim=1).detach().cpu().numpy()
    return T_final.astype(np.float32)


In [36]:
def accuracy(model, loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            pred = logits.argmax(dim=1)
            correct += (pred == yb).sum().item()
            total += yb.numel()
    return correct / max(total, 1)


def train_one_epoch(model, loader, optimizer, criterion=None, device='cpu'):
    model.train()
    if criterion is None:
        criterion = nn.CrossEntropyLoss()
    total_loss = 0.0
    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad(set_to_none=True)
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * yb.size(0)
    return total_loss / len(loader.dataset)


def fit_model(model, train_loader, val_loader, device, loss_name='ce', T=None, q=0.7, beta=0.2, epochs=10, lr=1e-3):
    import torch.optim as optim

    if loss_name == 'forward':
        assert T is not None, "Forward correction requires known/estimated T"
        criterion = ForwardCorrectedCE(torch.tensor(T, dtype=torch.float32, device=device))
    elif loss_name == 'gce':
        criterion = GeneralizedCrossEntropy(q=q)
    else:
        criterion = nn.CrossEntropyLoss()

    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)

    best_val = -np.inf
    best_state = None

    for _ in range(epochs):
        train_one_epoch(model, train_loader, optimizer, criterion, device)
        # early stopping on val accuracy (cheap)
        val_acc = accuracy(model, val_loader, device)
        if val_acc > best_val:
            best_val = val_acc
            best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}

    if best_state is not None:
        model.load_state_dict(best_state)
    return model

In [37]:
def set_seed(seed):
    import random
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

DATA_PATHS = {
    'fashion03': os.path.join('data', 'FashionMNIST0.3.npz'),
    'fashion06': os.path.join('data', 'FashionMNIST0.6.npz'),
    'cifar':     os.path.join('data', 'CIFAR.npz'),
}
def known_T_fashion_03():
    # T = [[0.7,0.3,0.0],[0.0,0.7,0.3],[0.0,0.0,0.7]]
    return np.array([[0.7,0.3,0.0],
                     [0.0,0.7,0.3],
                     [0.0,0.0,0.7]], dtype=np.float32)

def known_T_fashion_06():
    # [[0.3,0.4,0.3],[0.4,0.3,0.3],[0.3,0.3,0.4]]
    return np.array([[0.3,0.4,0.3],
                     [0.4,0.3,0.3],
                     [0.3,0.3,0.4]], dtype=np.float32)


def pick_known_T(tag):
    if tag == 'fashion03':
        return known_T_fashion_03()
    elif tag == 'fashion06':
        return known_T_fashion_06()
    else:
        return None
C = 3

def run_once(args, seed):
    set_seed(seed)
    device = torch.device('mps')

    # load data
    Xtr, Str, Xts, Yts = load_npz(DATA_PATHS[args['dataset']])

    # loaders for this split
    train_loader, val_loader, is_cifar = make_loaders(Xtr, Str, batch_size=args['batch_size'], seed=seed)
    test_loader = make_test_loader(Xts, Yts, batch_size=512)
  
    # choose model
    model = make_model(is_cifar).to(device)

    # Transition matrix
    T = None
    if args['loss'] == 'forward' or args['loss'] == 'gce':
        if args['estimate_T'] or args['dataset']=='cifar':
            print('estimating matrix')
            T = np.zeros((C, C), dtype=np.float64)
            T = estimate_transition_anchor(T, train_loader, is_cifar, device=device, epochs=args['est_epochs'])
            T = estimate_transition_trevision(T, train_loader, is_cifar, device=device)
            print(T)
        else:
            T = pick_known_T(args['dataset'])
            if T is None:
                raise ValueError("Forward loss selected but no known T for this dataset; use --estimate_T.")
            


    # fit
    model = fit_model(
        model,
        train_loader,
        val_loader,
        device,
        loss_name=args['loss'],
        T=T,
        q=args['q'],
        beta=args['beta'],
        epochs=args['epochs'],
        lr=args['lr'],
    )

    # evaluate on clean test set
    test_acc = accuracy(model, test_loader, device)
    return float(test_acc), (T.tolist() if T is not None else None)

In [38]:
losses = ['forward','gce']
datasets = ['cifar', "fashion03", "fashion06"]
base = {
    "runs":10,
    "epochs": 10,
    "estimate_T":True,
    "loss":'forward',
    "batch_size":4096,
    "q":0.6,
    "est_epochs":5,
    "beta":0.2,
    "lr":1e-3,
    "num_classes":3,
    "device":'mps'
}



In [39]:
now = datetime.now()
now = now.strftime("%Y-%m-%d %H:%M:%S")
# create each cfg
configs = []
for i, ds in enumerate(datasets):
    for loss in losses:
        cfg = {**base, "dataset": ds, "out": 'results_'+loss+'_'+now+'_'+str(i)+'.json', "loss":loss}
        configs.append(cfg)

print(configs)

[{'runs': 10, 'epochs': 10, 'estimate_T': True, 'loss': 'forward', 'batch_size': 4096, 'q': 0.6, 'est_epochs': 5, 'beta': 0.2, 'lr': 0.001, 'num_classes': 3, 'device': 'mps', 'dataset': 'cifar', 'out': 'results_forward_2025-10-28 22:11:44_0.json'}, {'runs': 10, 'epochs': 10, 'estimate_T': True, 'loss': 'gce', 'batch_size': 4096, 'q': 0.6, 'est_epochs': 5, 'beta': 0.2, 'lr': 0.001, 'num_classes': 3, 'device': 'mps', 'dataset': 'cifar', 'out': 'results_gce_2025-10-28 22:11:44_0.json'}, {'runs': 10, 'epochs': 10, 'estimate_T': True, 'loss': 'forward', 'batch_size': 4096, 'q': 0.6, 'est_epochs': 5, 'beta': 0.2, 'lr': 0.001, 'num_classes': 3, 'device': 'mps', 'dataset': 'fashion03', 'out': 'results_forward_2025-10-28 22:11:44_1.json'}, {'runs': 10, 'epochs': 10, 'estimate_T': True, 'loss': 'gce', 'batch_size': 4096, 'q': 0.6, 'est_epochs': 5, 'beta': 0.2, 'lr': 0.001, 'num_classes': 3, 'device': 'mps', 'dataset': 'fashion03', 'out': 'results_gce_2025-10-28 22:11:44_1.json'}, {'runs': 10, 'e

In [40]:
for cfg in configs:
    all_acc = []
    last_T = None
    t_arr = []
    for r in range(cfg['runs']):

        start = time.perf_counter()
        acc, T = run_once(cfg, seed=1000+r)
        all_acc.append(acc)
        if cfg['estimate_T']:
            t_arr.append(T)
        last_T = T if T is not None else last_T
        print(f"Run {r+1:02d}/{cfg['runs']}: test acc = {acc*100:.2f}%")
        end = time.perf_counter()
        print(f"{cfg['device']}: {r+1} steps -> {end - start:.2f} sec | avg {1000*(end - start)/(r+1):.1f} ms/step")
    mean = float(np.mean(all_acc))
    std  = float(np.std(all_acc))

    summary = {
        'dataset': cfg['dataset'],
        'loss': cfg['loss'],
        'estimate_T': bool(cfg['estimate_T']),
        'epochs': cfg['epochs'],
        'runs': cfg['runs'],
        'mean_test_acc': mean,
        'std_test_acc': std,
        'last_estimated_T': last_T,
        't_arr':t_arr,
        'per_run_acc': all_acc,
    }
    print("="*72)
    print(f"{cfg['dataset']} | {cfg['loss']} | mean±std over {cfg['runs']} runs: {mean*100:.2f}±{std*100:.2f}%")

    with open(cfg['out'], 'w') as f:
        json.dump(summary, f, indent=2)
    print(f"Saved summary to {cfg['out']}")

estimating matrix
[Warm-up] 15 epochs...
[Refinement] 60 epochs...
Epoch 10: loss=1.09633
Epoch 20: loss=1.09464
Epoch 30: loss=1.09303
Epoch 40: loss=1.09153
Epoch 50: loss=1.09012
Epoch 60: loss=1.08883
[[0.3889811  0.354772   0.25624692]
 [0.22579259 0.5197434  0.254464  ]
 [0.2901467  0.2333899  0.47646347]]
Run 01/10: test acc = 33.33%
mps: 1 steps -> 39.54 sec | avg 39541.2 ms/step
estimating matrix
[Warm-up] 15 epochs...
[Refinement] 60 epochs...
Epoch 10: loss=1.09429
Epoch 20: loss=1.09272
Epoch 30: loss=1.09132
Epoch 40: loss=1.09011
Epoch 50: loss=1.08910
Epoch 60: loss=1.08826
[[0.45679858 0.2831707  0.26003075]
 [0.24708967 0.52224725 0.23066308]
 [0.2615049  0.21988928 0.51860577]]
Run 02/10: test acc = 33.33%
mps: 2 steps -> 35.94 sec | avg 17970.5 ms/step
estimating matrix
[Warm-up] 15 epochs...
[Refinement] 60 epochs...
Epoch 10: loss=1.09538
Epoch 20: loss=1.09414
Epoch 30: loss=1.09298
Epoch 40: loss=1.09190
Epoch 50: loss=1.09093
Epoch 60: loss=1.09006
[[0.43567258 

In [41]:
criterion = GeneralizedCrossEntropy(q=0.7)

# Dummy data
logits = torch.randn(4, 3, requires_grad=True)  # model outputs
y = torch.tensor([0, 2, 1, 0])                  # true labels

# Compute loss
loss = criterion(logits, y)
print("Loss tensor:", loss)
print("Loss value:", loss.item())

Loss tensor: tensor(0.9543, grad_fn=<MeanBackward0>)
Loss value: 0.9543001651763916


In [44]:
with open("results_forward_2025-10-28 22:11:44_1", "r") as f:
    data = json.load(f)

print()
T_prime = np.array(data['last_estimated_T'])
T_true = np.array([[0.7,0.3,0.0],
                     [0.0,0.7,0.3],
                     [0.3,0.0,0.7]], dtype=np.float32)

#checking recreation performance
print(T_prime)
print(T_true)
print(f"Fro error: {np.linalg.norm(T_prime - T_true, 'fro')}")
print(f"rre error: {np.linalg.norm(T_prime - T_true, 'fro') / np.linalg.norm(T_true, 'fro')}")
print(f"mae error: {np.mean(np.abs(T_prime - T_true))}")
import scipy.stats as st

corrs = [st.pearsonr(T_true[i], T_prime[i])[0] for i in range(C)]
print("Per-row correlations:", corrs)
print("Mean:", np.mean(corrs))


with open("results_forward_2025-10-28 22:11:44_2.json", "r") as f:
    data = json.load(f)

print()
T_prime = np.array(data['last_estimated_T'])
T_true = np.array([[0.3,0.4,0.3],
                     [0.4,0.3,0.3],
                     [0.3,0.3,0.4]], dtype=np.float32)
#checking recreation performance
print(T_prime)
print(T_true)
print(f"Fro error: {np.linalg.norm(T_prime - T_true, 'fro')}")
print(f"rre error: {np.linalg.norm(T_prime - T_true, 'fro') / np.linalg.norm(T_true, 'fro')}")
print(f"mae error: {np.mean(np.abs(T_prime - T_true))}")
import scipy.stats as st

corrs = [st.pearsonr(T_true[i], T_prime[i])[0] for i in range(C)]
print("Per-row correlations:", corrs)
print("Mean:", np.mean(corrs))

FileNotFoundError: [Errno 2] No such file or directory: 'results_forward_2025-10-28 22:11:44_1'

In [None]:
#checking recreation performance
print(est_t)
print(known_t)
print(rre_fse(known_t, est_t))

[[0.74965894 0.         0.19647887]
 [0.24897681 0.80209059 0.        ]
 [0.00136426 0.19790941 0.80352116]]
[[0.7 0.3 0. ]
 [0.  0.7 0.3]
 [0.  0.  0.7]]
0.4561821482443322
