In [None]:
 # Imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, Dataset
from torchvision import transforms, models
import numpy as np
import random
import os
from pathlib import Path

# Device: use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# Reproducibility: seed every RNG this notebook relies on with one shared value
SEED = 42
for seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    seed_fn(SEED)
if device == "cuda":
    torch.cuda.manual_seed_all(SEED)

Using device: cuda


In [None]:
from datasets import load_dataset
from PIL import Image

# Fetch the train split of the cropped UTKFace dataset from the Hugging Face Hub
dataset = load_dataset(path="vtsouval/utkface-cropped", split="train")
num_samples = len(dataset)
print("Dataset loaded, number of samples:", num_samples)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/799 [00:00<?, ?B/s]

data/train-00000-of-00001.parquet:   0%|          | 0.00/116M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/23704 [00:00<?, ? examples/s]

Dataset loaded, number of samples: 23704


In [None]:
# 2) Split dataset
# -----------------------------
# 60 % train / 20 % validation / 20 % test. The test split absorbs the
# rounding remainder so the three sizes always sum to len(dataset).
total = len(dataset)
train_size = int(0.6 * total)
val_size   = int(0.2 * total)
test_size  = total - train_size - val_size

# Use a dedicated, explicitly seeded generator: without it random_split draws
# from the global RNG, so the partition would depend on how much random state
# was consumed before this cell and would change every time the cell is re-run.
split_generator = torch.Generator().manual_seed(42)
train_raw, val_raw, test_raw = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)

print("Train:", len(train_raw))
print("Val:",   len(val_raw))
print("Test:",  len(test_raw))

Train: 14222
Val: 4740
Test: 4742


In [None]:
from torchvision import transforms

# Shared preprocessing constants (the normalization statistics are the usual
# ImageNet channel means / standard deviations).
IMG_SIZE = (224, 224)
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training pipeline: identical to the evaluation one plus a random horizontal flip.
train_transforms = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Evaluation pipeline: deterministic resize + tensor conversion + normalization.
val_transforms = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])


In [None]:
class UTKFaceDataset(Dataset):
    """Map-style dataset yielding ``(image, age)`` pairs from a wrapped dataset.

    Items of the wrapped dataset may be dicts with ``'image'`` and ``'age'``
    keys, or ``(image, age)`` pairs.

    Args:
        dataset: Indexable collection supporting ``len()`` and ``[idx]``.
        transform: Optional callable applied to every image. When the image is
            already a ``torch.Tensor`` and ``transform`` exposes a
            ``.transforms`` list (Compose-style), every step except
            ``ToTensor`` is applied; a transform without that attribute is
            applied unchanged.

    Note:
        The tensor-only pipeline is derived from ``transform`` once, at
        construction time; reassigning ``self.transform`` afterwards does not
        rebuild it.
    """

    def __init__(self, dataset, transform=None):
        self.dataset = dataset
        self.transform = transform
        # Built once here instead of on every __getitem__ call.
        self._tensor_transform = self._build_tensor_transform(transform)

    @staticmethod
    def _build_tensor_transform(transform):
        """Return the pipeline to use for images that are already tensors."""
        if not transform:
            return None
        steps = getattr(transform, "transforms", None)
        if steps is None:
            # Not a Compose-style pipeline: there is nothing to strip, so use
            # the callable as given instead of failing on a missing attribute.
            return transform
        # Keep every step except ToTensor (resize / flip / normalize still run).
        return transforms.Compose(
            [t for t in steps if not isinstance(t, transforms.ToTensor)]
        )

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(image, age)`` where ``age`` is a 0-d float tensor."""
        item = self.dataset[idx]

        # Hugging Face rows are dicts; anything else is unpacked as a pair.
        if isinstance(item, dict):
            image, age = item['image'], item['age']
        else:
            image, age = item

        # Tensors skip ToTensor; everything else goes through the full pipeline.
        if isinstance(image, torch.Tensor):
            pipeline = self._tensor_transform
        else:
            pipeline = self.transform
        if pipeline:
            image = pipeline(image)

        return image, torch.tensor(age, dtype=torch.float)



In [None]:
# Pair each raw split with its transform pipeline and wrap it in UTKFaceDataset.
# Only the training split gets the augmenting pipeline; val and test share the
# deterministic one.
train_dataset, val_dataset, test_dataset = (
    UTKFaceDataset(raw_split, transform=pipeline)
    for raw_split, pipeline in (
        (train_raw, train_transforms),
        (val_raw, val_transforms),
        (test_raw, val_transforms),
    )
)


In [None]:
BATCH_SIZE = 64
NUM_WORKERS = 2

# Options common to all three loaders; only the training loader shuffles.
loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)

train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)


In [None]:
# Pull a single batch from the training loader and report its tensor shapes.
batch_iter = iter(train_loader)
imgs, ages = next(batch_iter)
print("Batch images shape:", imgs.size())
print("Batch ages shape:", ages.size())


Batch images shape: torch.Size([64, 3, 224, 224])
Batch ages shape: torch.Size([64])


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class ResNetQuantile(nn.Module):
    """ResNet-50 feature extractor followed by a 3-output linear quantile head.

    Args:
        pretrained: When true, the backbone is created with the
            ``IMAGENET1K_V1`` weights; otherwise with ``weights=None``.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        weights = models.ResNet50_Weights.IMAGENET1K_V1 if pretrained else None
        backbone = models.resnet50(weights=weights)
        # Read the feature width before the classifier is replaced.
        feature_dim = backbone.fc.in_features
        # Swap the original classification head for a pass-through.
        backbone.fc = nn.Identity()
        self.backbone = backbone
        # One output per predicted quantile: qL, qM, qU
        self.fc = nn.Linear(feature_dim, 3)

    def forward(self, x):
        """Return ``(qL, qM, qU)``, one tensor of shape ``[B]`` per quantile."""
        head_out = self.fc(self.backbone(x))
        # Split the head output along dim 1 into its three columns.
        qL, qM, qU = head_out.unbind(dim=1)
        return qL, qM, qU

# Build the pretrained quantile regressor, then move it to the selected device
model = ResNetQuantile(pretrained=True)
model = model.to(device)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


100%|██████████| 97.8M/97.8M [00:00<00:00, 185MB/s]


In [None]:
# Smoke-test the forward pass on a single training batch.
imgs, ages = next(iter(train_loader))
imgs = imgs.to(device)

# Run the check in eval mode and without autograd so that a mere shape check
# neither updates running statistics of normalization layers nor builds a
# computation graph. The previous train/eval mode is restored afterwards.
was_training = model.training
model.eval()
with torch.no_grad():
    qL, qM, qU = model(imgs)
model.train(was_training)

print("qL shape:", qL.shape)  # [B]
print("qM shape:", qM.shape)  # [B]
print("qU shape:", qU.shape)  # [B]

qL shape: torch.Size([64])
qM shape: torch.Size([64])
qU shape: torch.Size([64])


In [None]:
def quantile_loss(pred, target, q):
    """
    Mean pinball (quantile) loss.

    pred: [B] predicted quantile values
    target: [B] observed values
    q: quantile level (0 < q < 1)
    """
    residual = target - pred
    # Element-wise larger of the two linear pieces of the pinball loss.
    under_penalty = q * residual
    over_penalty = (q - 1) * residual
    return torch.maximum(under_penalty, over_penalty).mean()

def combined_quantile_loss(qL, qM, qU, y):
    """Sum of the pinball losses of the three heads against target ``y``.

    qL is scored at level 0.025, qM at 0.5 and qU at 0.975.
    """
    head_levels = ((qL, 0.025), (qM, 0.5), (qU, 0.975))
    lower, median, upper = (quantile_loss(head, y, level) for head, level in head_levels)
    return lower + median + upper


In [None]:
import torch.optim as optim

LR = 1e-4
WEIGHT_DECAY = 1e-4
optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

# Halve the learning rate after 3 scheduler steps without a lower monitored value
# (the training loop passes the validation loss to scheduler.step).
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=3,
)


In [None]:
# Training budget and early-stopping patience (in epochs)
EPOCHS, PATIENCE = 20, 4

# Directory that receives the best checkpoint; created if it is missing
MODEL_DIR = Path("./checkpoints")
MODEL_DIR.mkdir(exist_ok=True)

# Mutable training state consumed and updated by the training loop
best_val_loss, epochs_no_improve = float("inf"), 0
history = {metric: [] for metric in ("train_loss", "val_loss")}


In [None]:
from tqdm import tqdm

def evaluate_loss(loader):
    """Return the per-sample average of ``combined_quantile_loss`` over ``loader``.

    Switches the global ``model`` to eval mode (it is not switched back) and
    runs without gradient tracking. An empty loader yields 0.0.
    """
    model.eval()
    loss_sum, sample_count = 0.0, 0
    with torch.no_grad():
        for batch_imgs, batch_ages in loader:
            batch_imgs, batch_ages = batch_imgs.to(device), batch_ages.to(device)
            batch_loss = combined_quantile_loss(*model(batch_imgs), batch_ages)
            # Weight the batch mean by the batch size so that a smaller final
            # batch does not skew the overall average.
            batch_size = batch_imgs.shape[0]
            loss_sum += batch_loss.item() * batch_size
            sample_count += batch_size
    return loss_sum / max(sample_count, 1)

# The checkpoint path is loop-invariant and is also needed after the loop, so
# define it up front; assigning it only inside the loop body left it undefined
# whenever the loop did not run at all (e.g. EPOCHS = 0).
ckpt_path = MODEL_DIR / "best_val.pt"

for epoch in range(1, EPOCHS+1):
    # ---- train one epoch
    model.train()
    running = 0.0
    n = 0
    pbar = tqdm(train_loader, desc=f"Epoch {epoch}/{EPOCHS}")
    for imgs, ages in pbar:
        imgs = imgs.to(device)
        ages = ages.to(device)
        optimizer.zero_grad()
        qL, qM, qU = model(imgs)
        loss = combined_quantile_loss(qL, qM, qU, ages)
        loss.backward()
        optimizer.step()
        # Accumulate a sample-weighted sum so the epoch average is per sample.
        running += float(loss.item()) * imgs.size(0)
        n += imgs.size(0)
        pbar.set_postfix({"train_loss": f"{running/n:0.4f}"})

    train_loss = running / max(n,1)
    val_loss = evaluate_loss(val_loader)
    # ReduceLROnPlateau is driven by the validation loss.
    scheduler.step(val_loss)
    history["train_loss"].append(train_loss)
    history["val_loss"].append(val_loss)

    print(f"Epoch {epoch} train_loss={train_loss:.4f} val_loss={val_loss:.4f}")

    # --- checkpoint (only when the validation loss improves by more than 1e-6)
    if val_loss < best_val_loss - 1e-6:
        print(f"Validation loss improved: {best_val_loss:.4f} -> {val_loss:.4f}. Saving checkpoint.")
        best_val_loss = val_loss
        epochs_no_improve = 0
        torch.save({
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optim_state_dict": optimizer.state_dict(),
            "val_loss": val_loss,
        }, ckpt_path)
    else:
        epochs_no_improve += 1
        print(f"No improvement for {epochs_no_improve} epochs.")

    # Early stopping
    if epochs_no_improve >= PATIENCE:
        print(f"Early stopping triggered (no improvement for {PATIENCE} epochs).")
        break

# Load best checkpoint so later cells use the best weights, not the last ones
if ckpt_path.exists():
    ckpt = torch.load(ckpt_path, map_location=device)
    model.load_state_dict(ckpt["model_state_dict"])
    print("Loaded best checkpoint from epoch", ckpt["epoch"], "val_loss", ckpt["val_loss"])


Epoch 1/20: 100%|██████████| 223/223 [02:13<00:00,  1.67it/s, train_loss=25.0141]


Epoch 1 train_loss=25.0141 val_loss=9.0484
Validation loss improved: inf -> 9.0484. Saving checkpoint.


Epoch 2/20: 100%|██████████| 223/223 [02:16<00:00,  1.63it/s, train_loss=4.9115]


Epoch 2 train_loss=4.9115 val_loss=4.4440
Validation loss improved: 9.0484 -> 4.4440. Saving checkpoint.


Epoch 3/20: 100%|██████████| 223/223 [02:18<00:00,  1.61it/s, train_loss=3.6126]


KeyboardInterrupt: 

In [None]:
# Collect one conformity score per validation sample (calibration step of
# Conformalized Quantile Regression, CQR).
model.eval()
residuals = []

with torch.no_grad():
    for imgs, ages in val_loader:  # <-- use validation set
        imgs, ages = imgs.to(device), ages.to(device)
        qL, qM, qU = model(imgs)

        # Score = how far the true age falls outside [qL, qU];
        # it is negative when the age lies strictly inside the interval.
        r = torch.maximum(qL - ages, ages - qU)
        residuals.extend(r.cpu().numpy())

print("Number of residuals:", len(residuals))


In [None]:
alpha = 0.05

# Split-conformal calibration uses the finite-sample corrected level
# ceil((n + 1) * (1 - alpha)) / n rather than the plain (1 - alpha) empirical
# quantile, and takes an actual order statistic (method="higher") instead of
# an interpolated value. The level is clipped to 1.0 for very small n.
n_cal = len(residuals)
q_level = min(1.0, np.ceil((n_cal + 1) * (1 - alpha)) / n_cal)
q_hat = np.quantile(residuals, q_level, method="higher")
print("q_hat (CQR adjustment):", q_hat)

In [None]:
all_true = []
all_preds = []
all_intervals = []

# Do not rely on an earlier cell having left the model in eval mode.
model.eval()
with torch.no_grad():
    # Evaluate on the held-out test split. The validation split was used to
    # calibrate q_hat, so measuring coverage on it again would report a
    # coverage that is optimistic by construction.
    for imgs, ages in test_loader:
        imgs = imgs.to(device)
        qL, qM, qU = model(imgs)

        # Apply CQR: widen the predicted interval by q_hat on both sides
        qL_adj = (qL - q_hat).cpu().numpy()
        qU_adj = (qU + q_hat).cpu().numpy()
        qM = qM.cpu().numpy()
        ages = ages.cpu().numpy()

        # One entry per sample: true age, median prediction, (lower, upper)
        all_true.extend(ages)
        all_preds.extend(qM)
        all_intervals.extend(zip(qL_adj, qU_adj))

In [None]:
print("\nExamples of predicted age + 95% interval:")
# Slicing (instead of indexing with range(5)) shows up to five examples and
# does not fail when fewer than five samples were collected.
for true_age, pred_age, (lower, upper) in zip(all_true[:5], all_preds[:5], all_intervals[:5]):
    print(f"True age: {true_age:.1f} | Pred: {pred_age:.1f} | 95% interval: ({lower:.1f}, {upper:.1f})")


In [None]:
# Vectorized coverage / width computation over the collected intervals.
interval_bounds = np.asarray(all_intervals).reshape(-1, 2)
lower_bounds, upper_bounds = interval_bounds[:, 0], interval_bounds[:, 1]
true_ages = np.asarray(all_true)

# Fraction of samples whose true age lies inside its (closed) interval
coverage = np.mean((lower_bounds <= true_ages) & (true_ages <= upper_bounds))
# Mean distance between the upper and lower bounds
avg_width = np.mean(upper_bounds - lower_bounds)

print(f"Empirical Coverage: {coverage*100:.2f}%")
print(f"Average Interval Width: {avg_width:.2f}")


In [None]:
import matplotlib.pyplot as plt

# Scatter of the median prediction against the true age, drawn through the
# explicit figure/axes interface.
fig, ax = plt.subplots(figsize=(6, 6))
ax.scatter(all_true, all_preds, alpha=0.5)
ax.plot([0, 100], [0, 100], 'r--')  # perfect line
ax.set_xlabel("True Age")
ax.set_ylabel("Predicted Median Age")
ax.set_title("True vs Predicted Median Age")
plt.show()