## Imports


In [None]:
import os

import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.models as models
from sklearn.metrics import confusion_matrix
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, datasets
from tqdm import tqdm

In [None]:
# Report whether PyTorch can see a CUDA device; the configuration cell uses
# the same check to choose between "cuda" and "cpu".
print(f"CUDA available: {torch.cuda.is_available()}")

CUDA available: True


# Hyperparameters


In [None]:
# Hyperparameters and Configuration
# Single dictionary read by the model/optimizer/scheduler factories and by the
# training entry point; it is also stored inside the saved checkpoint.
config = {
    # Dataset parameters
    "batch_size": 64,  # samples per DataLoader batch
    "num_workers": 2,  # DataLoader worker processes
    "num_classes": 100,  # output size of the replaced final linear layer
    # Model parameters
    "model_type": "resnet50",  # options: 'resnet50', 'resnet18'
    "pretrained": True,  # forwarded to the torchvision model constructor
    # Training parameters
    "epochs": 30,  # number of train/evaluate passes; also T_max for 'cosine'
    "learning_rate": 0.001,
    "optimizer": "adam",  # options: 'adam', 'sgd'
    # Learning rate scheduler
    "scheduler": "plateau",  # options: 'plateau', 'step', 'cosine'
    "scheduler_patience": 3,  # patience passed to ReduceLROnPlateau
    "scheduler_factor": 0.1,  # ReduceLROnPlateau factor / StepLR gamma
    # Regularization
    "weight_decay": 1e-4,  # passed to the optimizer as weight_decay
    # Data augmentation parameters
    "crop_padding": 4,  # padding used by RandomCrop in the training transform
    # Per-channel mean/std handed to transforms.Normalize
    # (presumably CIFAR-100 training-set statistics -- verify against source)
    "normalize_mean": (0.5071, 0.4867, 0.4408),
    "normalize_std": (0.2675, 0.2565, 0.2761),
    # Device
    "device": torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    # Random seed for reproducibility
    "seed": 42,
    # Save directory
    "save_dir": "./checkpoints",  # directory the best checkpoint is written to
}

## Downloading Data


In [None]:
# Transformation functions

# Training pipeline: augment with a padded random 32x32 crop and a random
# horizontal flip, then convert to a tensor and normalize each channel.
transform_train = transforms.Compose([
    transforms.RandomCrop(size=32, padding=config["crop_padding"]),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=config["normalize_mean"], std=config["normalize_std"]),
])

# Evaluation pipeline: no augmentation, only tensor conversion and the same
# per-channel normalization as training.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=config["normalize_mean"], std=config["normalize_std"]),
])

In [None]:
# Batch size and worker count come from the shared config so this cell cannot
# drift from the values the training entry point uses.
batch_size = config["batch_size"]

# Download data (no-op when ./data already holds a verified copy)
train_dataset = datasets.CIFAR100(
    root="./data", train=True, download=True, transform=transform_train
)
test_dataset = datasets.CIFAR100(
    root="./data", train=False, download=True, transform=transform_test
)

# Load data: shuffle only the training split so evaluation order is stable
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=config["num_workers"],
)
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=config["num_workers"],
)

# get class names for training
class_names: list[str] = train_dataset.classes

Files already downloaded and verified
Files already downloaded and verified


# ResNet Model


In [None]:
def get_resnet_model(config):
    """Build a torchvision ResNet adapted to small images.

    Args:
        config: Mapping with "model_type" ("resnet50" or "resnet18"),
            "pretrained" (forwarded to the torchvision constructor) and
            "num_classes" (output size of the new classifier head).

    Returns:
        The model with its stem convolution, max-pool and final linear
        layer replaced.

    Raises:
        ValueError: If "model_type" is not one of the supported names.
    """
    # Reject unknown architectures before touching torchvision.
    if config["model_type"] not in ("resnet50", "resnet18"):
        raise ValueError(f"Unsupported model type: {config['model_type']}")

    build = getattr(models, config["model_type"])
    net = build(pretrained=config["pretrained"])

    # Swap the stem for a 3x3 stride-1 convolution and drop the max-pool so
    # 32x32 inputs are not downsampled right at the start.
    net.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
    net.maxpool = nn.Identity()

    # New classifier head sized for the configured number of classes.
    head_inputs = net.fc.in_features
    net.fc = nn.Linear(head_inputs, config["num_classes"])

    return net


def get_optimizer(model, config):
    """Create the optimizer named by ``config["optimizer"]``.

    Args:
        model: Module whose ``parameters()`` will be optimized.
        config: Mapping with "optimizer" ("adam" or "sgd"),
            "learning_rate" and "weight_decay".

    Returns:
        An ``optim.Adam`` instance, or an ``optim.SGD`` instance with
        momentum 0.9.

    Raises:
        ValueError: If the optimizer name is not supported.
    """
    choice = config["optimizer"]
    if choice == "adam":
        optimizer_cls, extra_kwargs = optim.Adam, {}
    elif choice == "sgd":
        optimizer_cls, extra_kwargs = optim.SGD, {"momentum": 0.9}
    else:
        raise ValueError(f"Unsupported optimizer: {config['optimizer']}")

    # Learning rate and weight decay are shared by both optimizers.
    return optimizer_cls(
        model.parameters(),
        lr=config["learning_rate"],
        weight_decay=config["weight_decay"],
        **extra_kwargs,
    )


def get_scheduler(optimizer, config):
    """Create the learning-rate scheduler named by ``config["scheduler"]``.

    Args:
        optimizer: Optimizer whose learning rate will be scheduled.
        config: Mapping with "scheduler" ("plateau", "step" or "cosine").
            "plateau" reads "scheduler_patience" and "scheduler_factor";
            "step" reads "scheduler_factor" and, optionally,
            "scheduler_step_size" (defaults to 10); "cosine" reads "epochs".

    Returns:
        A ``ReduceLROnPlateau`` (which must be stepped with the monitored
        value), ``StepLR`` or ``CosineAnnealingLR`` instance.

    Raises:
        ValueError: If the scheduler name is not supported.
    """
    kind = config["scheduler"]

    if kind == "plateau":
        # ``verbose`` is intentionally not passed: it is deprecated in recent
        # PyTorch releases. Read the current rate from
        # ``optimizer.param_groups[0]["lr"]`` if it needs to be logged.
        return optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode="min",
            patience=config["scheduler_patience"],
            factor=config["scheduler_factor"],
        )

    if kind == "step":
        return optim.lr_scheduler.StepLR(
            optimizer,
            # Optional key; 10 keeps the previous hard-coded behaviour.
            step_size=config.get("scheduler_step_size", 10),
            gamma=config["scheduler_factor"],
        )

    if kind == "cosine":
        return optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=config["epochs"])

    raise ValueError(f"Unsupported scheduler: {config['scheduler']}")

# Training


In [None]:
def train_epoch(model, train_loader, criterion, optimizer, config):
    """Train the model for one pass over ``train_loader``.

    Args:
        model: Network to train; switched to training mode here.
        train_loader: Sized iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss callable applied to ``(outputs, targets)``.
        optimizer: Optimizer stepped once per batch.
        config: Mapping providing "device", the device batches are moved to.

    Returns:
        Tuple ``(mean_batch_loss, accuracy_percent)`` for the epoch, where the
        loss is the sum of per-batch losses divided by ``len(train_loader)``.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    pbar = tqdm(train_loader, desc="Training")
    # Count batches from 1 so the live loss can be averaged over the batches
    # processed so far.
    for batches_seen, (inputs, targets) in enumerate(pbar, start=1):
        inputs, targets = inputs.to(config["device"]), targets.to(config["device"])

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)  # index of the largest logit per sample
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # Dividing by the total number of batches here would under-report the
        # loss until the very last batch; use the running batch count instead.
        pbar.set_postfix(
            {"Loss": running_loss / batches_seen, "Acc": 100.0 * correct / total}
        )

    return running_loss / len(train_loader), 100.0 * correct / total


def evaluate(model, test_loader, criterion, config):
    """Run the model over ``test_loader`` without gradient tracking.

    Args:
        model: Network to evaluate; switched to eval mode here.
        test_loader: Sized iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss callable applied to ``(outputs, targets)``.
        config: Mapping providing "device", the device batches are moved to.

    Returns:
        Tuple ``(mean_batch_loss, accuracy_percent, all_preds, all_labels)``.
        The two lists hold one entry per sample, in loader order.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch_inputs, batch_targets in test_loader:
            device = config["device"]
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_targets).item()

            # Second element of max(1) is the index of the largest logit.
            predicted = logits.max(1)[1]
            n_seen += batch_targets.size(0)
            n_correct += (predicted == batch_targets).sum().item()

            # Keep per-sample predictions and labels for later analysis.
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(batch_targets.cpu().numpy())

    mean_loss = loss_sum / len(test_loader)
    accuracy = 100.0 * n_correct / n_seen
    return mean_loss, accuracy, all_preds, all_labels


def main():
    """Train the configured ResNet on CIFAR-100 and report the results.

    Builds the data loaders, logs a sample image grid and the model graph to
    TensorBoard, then runs ``config["epochs"]`` rounds of training and
    evaluation. The checkpoint with the best test accuracy is written to
    ``config["save_dir"]/best_model.pth``. Finally, loss/accuracy curves and
    a confusion matrix for the best epoch are plotted.
    """
    # Apply the configured seed to PyTorch's random number generators.
    torch.manual_seed(config["seed"])

    # torch.save fails with "Parent directory ... does not exist" if the
    # target directory is missing, so create it up front.
    os.makedirs(config["save_dir"], exist_ok=True)
    checkpoint_path = os.path.join(config["save_dir"], "best_model.pth")

    writer = SummaryWriter(log_dir="./runs")
    try:
        # Load data
        train_loader = DataLoader(
            datasets.CIFAR100(
                root="./data", train=True, download=True, transform=transform_train
            ),
            batch_size=config["batch_size"],
            shuffle=True,
            num_workers=config["num_workers"],
        )

        test_loader = DataLoader(
            datasets.CIFAR100(
                root="./data", train=False, download=True, transform=transform_test
            ),
            batch_size=config["batch_size"],
            shuffle=False,
            num_workers=config["num_workers"],
        )

        # Log one training batch as an image grid
        dataiter = iter(train_loader)
        images, labels = next(dataiter)
        img_grid = torchvision.utils.make_grid(images)
        writer.add_image("Sixty Four CIFAR100 Images", img_grid)

        # Initialize model
        model = get_resnet_model(config)
        model = model.to(config["device"])

        # Add model graph to TensorBoard (traced with the sample batch)
        writer.add_graph(model, images.to(config["device"]))

        # Loss and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = get_optimizer(model, config)
        scheduler = get_scheduler(optimizer, config)

        # Training loop
        best_acc = 0
        # Start empty so the confusion-matrix step is well-defined even when
        # no epoch ever beats the initial best accuracy.
        best_preds, best_labels = [], []
        train_losses, train_accs = [], []
        test_losses, test_accs = [], []

        for epoch in range(config["epochs"]):
            print(f"\nEpoch {epoch+1}/{config['epochs']}")

            # Train
            train_loss, train_acc = train_epoch(
                model, train_loader, criterion, optimizer, config
            )
            train_losses.append(train_loss)
            train_accs.append(train_acc)

            # Evaluate
            test_loss, test_acc, all_preds, all_labels = evaluate(
                model, test_loader, criterion, config
            )
            test_losses.append(test_loss)
            test_accs.append(test_acc)

            # Tensorboard
            writer.add_scalar("Loss/train", train_loss, epoch)
            writer.add_scalar("Accuracy/train", train_acc, epoch)
            writer.add_scalar("Loss/test", test_loss, epoch)
            writer.add_scalar("Accuracy/test", test_acc, epoch)

            print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")

            # Save best model, plus its predictions for the confusion matrix
            if test_acc > best_acc:
                best_acc = test_acc
                best_preds, best_labels = all_preds, all_labels
                torch.save(
                    {
                        "epoch": epoch,
                        "model_state_dict": model.state_dict(),
                        "optimizer_state_dict": optimizer.state_dict(),
                        "best_acc": best_acc,
                        "config": config,
                    },
                    checkpoint_path,
                )

            # Adjust learning rate; the plateau scheduler needs the monitored
            # metric, the others step unconditionally.
            if config["scheduler"] == "plateau":
                scheduler.step(test_loss)
            else:
                scheduler.step()

        # Plot training history
        plt.figure(figsize=(12, 4))

        plt.subplot(1, 2, 1)
        plt.plot(train_losses, label="Train")
        plt.plot(test_losses, label="Test")
        plt.title("Loss vs Epoch")
        plt.legend()

        plt.subplot(1, 2, 2)
        plt.plot(train_accs, label="Train")
        plt.plot(test_accs, label="Test")
        plt.title("Accuracy vs Epoch")
        plt.legend()

        plt.tight_layout()
        plt.show()

        # confusion matrix
        if best_labels:
            print("Plotting confusion matrix for the best model")
            cm = confusion_matrix(best_labels, best_preds)
            plt.figure(figsize=(10, 8))
            sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
            plt.xlabel("Predicted labels")
            plt.ylabel("True labels")
            plt.title("Confusion Matrix")
            plt.show()
        else:
            print("No best model was recorded; skipping confusion matrix")
    finally:
        # Close the TensorBoard SummaryWriter even if training raised, so
        # already-logged events are flushed to disk.
        writer.flush()
        writer.close()


# Run the full training pipeline when this cell/script is executed directly.
if __name__ == "__main__":
    main()

Files already downloaded and verified
Files already downloaded and verified





Epoch 1/30


Training: 100%|██████████| 782/782 [00:44<00:00, 17.68it/s, Loss=3.24, Acc=20.1] 


Test Loss: 2.5027, Test Acc: 33.26%


RuntimeError: Parent directory ./checkpoints does not exist.