In [1]:
# ===============================================
# Block [1]: Install necessary libraries (Colab)
# ===============================================
# If already installed, comment these out
!pip install cvxpy tqdm



In [2]:
# ===============================================
# Block [2]: Import libraries
# ===============================================
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader, Subset

import cvxpy as cp
import numpy as np
import random
import os
import requests
import zipfile
from tqdm import tqdm
from collections import defaultdict, Counter

# Check device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [3]:
# ===============================================
# Block [3]: Build the ResNet18 model for 200 classes
# ===============================================
def create_resnet18_for_200_classes():
    """
    Create a ResNet18 model with pretrained weights on ImageNet.
    Then replace the final fully connected layer to output 200 classes.
    Freeze all layers except layer4 and fc, so only these two will be trained.
    """
    # Load pretrained ResNet18
    model = models.resnet18(pretrained=True)

    # Adjust the final layer to output 200 classes
    num_features = model.fc.in_features
    model.fc = nn.Linear(num_features, 200)

    # Freeze everything except layer4 and fc
    for name, param in model.named_parameters():
        if not (name.startswith("layer4") or name.startswith("fc")):
            param.requires_grad = False

    return model

In [4]:
# ===============================================
# Block [4]: Training and validation functions
# ===============================================
def train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    num_epochs=5,
    use_entropy_regularizer=False,
    entropy_weight=0.1
):
    """
    Train the model on train_loader and evaluate on val_loader.
    Optionally, apply Shannon entropy regularization to maximize
    the entropy of softmax outputs.
    """
    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        print(f"\nEpoch {epoch+1}/{num_epochs} ----------------------------")
        for inputs, labels in tqdm(train_loader, desc="Training", leave=False):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)

            # Base cross entropy
            ce_loss = criterion(outputs, labels)

            # Optional: add negative Shannon entropy for regularization
            if use_entropy_regularizer:
                softmax_outputs = F.softmax(outputs, dim=1)
                # Shannon entropy = -sum(p * log(p))
                # We want to maximize it => add negative to the loss
                shannon_entropy = -torch.sum(
                    softmax_outputs * torch.log(softmax_outputs + 1e-8),
                    dim=1
                )
                entropy_loss = -entropy_weight * shannon_entropy.mean()
                loss = ce_loss + entropy_loss
            else:
                loss = ce_loss

            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

        train_loss = running_loss / total
        train_acc = correct / total

        # Validation phase
        val_loss, val_acc = validate_model(model, val_loader, criterion)

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

def validate_model(model, val_loader, criterion):
    """
    Evaluate the model on the validation set.
    Returns (val_loss, val_accuracy).
    """
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in tqdm(val_loader, desc="Validating", leave=False):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            val_total += labels.size(0)
            val_correct += predicted.eq(labels).sum().item()

    val_loss /= val_total
    val_acc = val_correct / val_total
    return val_loss, val_acc

In [5]:
# ===============================================
# Block [5]: ModelWithTemperature for temperature scaling
# ===============================================
class ModelWithTemperature(nn.Module):
    """
    A wrapper that applies temperature scaling to a given model's logits.
    Temperature is optimized using the validation set to minimize NLL.
    """
    def __init__(self, model):
        super(ModelWithTemperature, self).__init__()
        self.model = model
        self.temperature = nn.Parameter(torch.ones(1) * 1.0)

    def forward(self, x):
        logits = self.model(x)
        return logits / self.temperature

    def set_temperature(self, valid_loader):
        """
        Optimize temperature on the validation set using LBFGS to minimize NLL.
        """
        self.model.eval()
        nll_criterion = nn.CrossEntropyLoss()

        # Collect logits and labels from validation set
        logits_list, labels_list = [], []
        with torch.no_grad():
            for inputs, labels in valid_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                logits = self.model(inputs)
                logits_list.append(logits)
                labels_list.append(labels)

        logits = torch.cat(logits_list).to(device)
        labels = torch.cat(labels_list).to(device)

        # Optimize temperature with LBFGS
        optimizer = optim.LBFGS([self.temperature], lr=0.01, max_iter=50)

        def eval_step():
            optimizer.zero_grad()
            loss = nll_criterion(logits / self.temperature, labels)
            loss.backward()
            return loss

        optimizer.step(eval_step)
        print(f"Optimal Temperature: {self.temperature.item():.3f}")

        return self

In [6]:
# ===============================================
# Block [6]: Convex optimization & MSE calculation
# ===============================================
def get_test_probabilities(model_with_temp, test_loader):
    """
    Obtain softmax probabilities from the (temperature-scaled) model on the test set.
    """
    model_with_temp.eval()
    all_probs = []

    with torch.no_grad():
        for inputs, _ in test_loader:
            inputs = inputs.to(device)
            logits = model_with_temp(inputs)
            probs = F.softmax(logits, dim=1)
            all_probs.append(probs.cpu())

    return torch.cat(all_probs, dim=0)

def estimate_r_with_cvxpy(f_x, num_classes=200):
    """
    Use cvxpy to estimate r that maximizes the sum of log(f_x @ r).
    Constraints: r >= 0, sum(r) = 1.
    """
    f_x_np = f_x.numpy()
    r = cp.Variable(num_classes, nonneg=True)
    log_likelihood = cp.sum(cp.log(f_x_np @ r + 1e-8))
    constraints = [cp.sum(r) == 1]
    problem = cp.Problem(cp.Maximize(log_likelihood), constraints)
    problem.solve(solver=cp.SCS, max_iters=500)
    return torch.tensor(r.value, device=device)

def calculate_mse(predicted, actual):
    """
    Calculate the RMSE between two vectors.
    """
    return torch.sqrt(torch.mean((predicted - actual) ** 2)).item()

def evaluate_on_test_subsets(model_with_temp, test_loaders, test_distributions, num_classes=200):
    """
    For each test subset, compute predicted distribution r via convex optimization,
    compare with true distribution to get MSE, and finally compute the mean/std of MSE.
    """
    all_mse = []

    for i, (test_loader, true_dist) in enumerate(zip(test_loaders, test_distributions)):
        print(f"\nEvaluating on Test Subset {i+1}...")
        # 1) get probabilities from model
        f_x = get_test_probabilities(model_with_temp, test_loader)
        # 2) estimate r using cvxpy
        estimated_r = estimate_r_with_cvxpy(f_x, num_classes=num_classes)
        # 3) compute MSE against true_dist
        if isinstance(true_dist, np.ndarray):
            true_dist = torch.tensor(true_dist, device=device, dtype=torch.float32)
        mse = calculate_mse(estimated_r.cpu(), true_dist.cpu())
        all_mse.append(mse)
        print(f"Subset {i+1} MSE: {mse:.6e}")

    mean_mse = np.mean(all_mse)
    std_mse = np.std(all_mse)
    print(f"\nMean MSE: {mean_mse:.6e}, Std MSE: {std_mse:.6e}")
    return all_mse, mean_mse, std_mse

In [7]:
# ===============================================
# Block [7]: Example usage with data loading
# Simplified random sampling for test subsets
# ===============================================

# Below is a sample code that downloads and loads Tiny-ImageNet-200 data,
# trains the model, applies temperature scaling, and evaluates on randomly
# sampled test subsets. This block should be placed after Blocks [1]-[6].

############################
# 1) Data loading
############################

# Define image transforms (for training and testing)
train_transforms = transforms.Compose([
    transforms.Resize(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_transforms = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def download_and_extract_zip(url, dest_folder):
    """
    Download and extract a zip file from the given url to the destination folder.
    """
    zip_path = os.path.join(dest_folder, "tiny-imagenet.zip")
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)

    response = requests.get(url, stream=True)
    with open(zip_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            file.write(chunk)

    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(dest_folder)
    os.remove(zip_path)

# Download Tiny-ImageNet-200 (pre-processed) from GitHub
url = "https://github.com/tjmoon0104/pytorch-tiny-imagenet/releases/download/tiny-imagenet-dataset/processed-tiny-imagenet-200.zip"
data_folder = "./tiny-imagenet-200"

download_and_extract_zip(url, data_folder)

train_dir = os.path.join(data_folder, "tiny-imagenet-200/train")
val_dir   = os.path.join(data_folder, "tiny-imagenet-200/val")
test_dir  = os.path.join(data_folder, "tiny-imagenet-200/test")

train_dataset = datasets.ImageFolder(train_dir, transform=train_transforms)
val_dataset   = datasets.ImageFolder(val_dir,   transform=test_transforms)
test_dataset  = datasets.ImageFolder(test_dir,  transform=test_transforms)

def sample_test_set(test_dataset, num_samples):
    """
    Randomly sample 'num_samples' images from the test_dataset.
    This is a simpler approach that does not enforce any class distribution.
    """
    indices = list(range(len(test_dataset)))
    random.shuffle(indices)
    selected_indices = indices[:num_samples]
    return Subset(test_dataset, selected_indices)

# Example: create multiple test subsets of different sizes
test_subset_sizes = [1000, 1500, 2000, 2500, 3000]
test_subsets = [sample_test_set(test_dataset, size) for size in test_subset_sizes]

# Build DataLoaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True,  num_workers=4)
val_loader   = DataLoader(val_dataset,   batch_size=64, shuffle=False, num_workers=4)
test_loaders = [DataLoader(subset, batch_size=64, shuffle=False, num_workers=4)
                for subset in test_subsets]

print(f"Train Loader: {len(train_loader)} batches, Samples: {len(train_dataset)}")
print(f"Validation Loader: {len(val_loader)} batches, Samples: {len(val_dataset)}")
for i, tloader in enumerate(test_loaders):
    print(f"Test Loader {i + 1}: {len(tloader)} batches, Samples: {len(test_subsets[i])}")

############################
# 2) (Optional) Helper to calculate class distribution
############################
def calculate_class_distribution(dataset, num_classes=200):
    """
    Count how many samples belong to each class in the dataset,
    then return the normalized distribution (length=num_classes).
    """
    labels = [label for _, label in dataset]
    class_counts = Counter(labels)
    class_distribution = np.zeros(num_classes)
    for label, count in class_counts.items():
        class_distribution[label] = count
    return class_distribution / class_distribution.sum()

# Compute the (approximate) true distribution of each test subset
test_distributions = []
for i, subset in enumerate(test_subsets):
    dist = calculate_class_distribution(subset)
    test_distributions.append(dist)
    print(f"Test Subset {i + 1} Class Distribution (first 10 classes): {dist[:10]} ...")

############################
# 3) Create the model
############################
model = create_resnet18_for_200_classes()
print("\nTrainable parameters:")
for n, p in model.named_parameters():
    if p.requires_grad:
        print("  ", n)

############################
# 4) Define loss and optimizer
############################
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()),
                       lr=0.001, weight_decay=1e-4)

############################
# 5) Train the model
############################
# Set 'use_entropy_regularizer=True' to add entropy regularization
num_epochs = 0  # Example epochs
train_model(model,
            train_loader,
            val_loader,
            criterion,
            optimizer,
            num_epochs=num_epochs,
            use_entropy_regularizer=True,
            entropy_weight=0.1)

############################
# 6) Temperature scaling
############################
model_with_temp = ModelWithTemperature(model).to(device)
model_with_temp.set_temperature(val_loader)

############################
# 7) Evaluate on test subsets
############################
mse_list, mean_mse, std_mse = evaluate_on_test_subsets(
    model_with_temp,
    test_loaders,
    test_distributions,
    num_classes=200
)

print("\nFinal results:")
print(f"Mean MSE across subsets: {mean_mse:.6e}")
print(f"Std MSE across subsets:  {std_mse:.6e}")



Train Loader: 1563 batches, Samples: 100000
Validation Loader: 79 batches, Samples: 5000
Test Loader 1: 16 batches, Samples: 1000
Test Loader 2: 24 batches, Samples: 1500
Test Loader 3: 32 batches, Samples: 2000
Test Loader 4: 40 batches, Samples: 2500
Test Loader 5: 47 batches, Samples: 3000
Test Subset 1 Class Distribution (first 10 classes): [0.004 0.005 0.006 0.004 0.002 0.002 0.002 0.005 0.007 0.007] ...
Test Subset 2 Class Distribution (first 10 classes): [0.00666667 0.00466667 0.00333333 0.00466667 0.008      0.00333333
 0.00666667 0.00466667 0.00666667 0.00666667] ...
Test Subset 3 Class Distribution (first 10 classes): [0.006  0.0055 0.0045 0.0045 0.0035 0.006  0.0045 0.0055 0.006  0.0065] ...
Test Subset 4 Class Distribution (first 10 classes): [0.0064 0.0032 0.0052 0.006  0.0048 0.0052 0.0056 0.0036 0.0048 0.004 ] ...
Test Subset 5 Class Distribution (first 10 classes): [0.006      0.00533333 0.00566667 0.00366667 0.006      0.00433333
 0.00466667 0.00533333 0.003      0.003




Trainable parameters:
   layer4.0.conv1.weight
   layer4.0.bn1.weight
   layer4.0.bn1.bias
   layer4.0.conv2.weight
   layer4.0.bn2.weight
   layer4.0.bn2.bias
   layer4.0.downsample.0.weight
   layer4.0.downsample.1.weight
   layer4.0.downsample.1.bias
   layer4.1.conv1.weight
   layer4.1.bn1.weight
   layer4.1.bn1.bias
   layer4.1.conv2.weight
   layer4.1.bn2.weight
   layer4.1.bn2.bias
   fc.weight
   fc.bias
Optimal Temperature: 1.187

Evaluating on Test Subset 1...
Subset 1 MSE: 1.504253e-02

Evaluating on Test Subset 2...




Subset 2 MSE: 1.494659e-02

Evaluating on Test Subset 3...
Subset 3 MSE: 1.612223e-02

Evaluating on Test Subset 4...
Subset 4 MSE: 1.609594e-02

Evaluating on Test Subset 5...
Subset 5 MSE: 1.657827e-02

Mean MSE: 1.575711e-02, Std MSE: 6.465273e-04

Final results:
Mean MSE across subsets: 1.575711e-02
Std MSE across subsets:  6.465273e-04
