## 1. Implement CNN with Early Stopping on CIFAR-10

**Import PyTorch Modules and Functions**

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, Subset
from torchvision import datasets, transforms
import numpy as np
import copy
import matplotlib.pyplot as plt

**Check GPU Availability**

In [None]:
# Select the compute device: CUDA when PyTorch reports it as available, CPU otherwise
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

**Training Configuration and Hyperparameters**


In [None]:
# Notebook-wide training hyperparameters; later cells read these as globals.
batch_size = 128        # samples per batch for both the train and validation DataLoader
initial_lr = 0.0005     # Adam learning rate for the first (no batch-norm) run; rebound to 0.001 before the batch-norm run
lr_decay_factor = 0.1   # `factor` passed to ReduceLROnPlateau
lr_patience = 7         # `patience` passed to ReduceLROnPlateau (stepped once per epoch with the validation loss)
max_epochs = 75         # upper bound on epochs in run_training
patience = 15           # early stopping: consecutive epochs without a new best validation loss before training stops
weight_decay = 1e-4     # `weight_decay` passed to Adam

**Dataset Loading and Preprocessing**


In [None]:
# Load the raw training set once, without transforms, to compute channel statistics
full_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=None)

# Per-channel mean / std over all images and pixels, rescaled from [0, 255] to [0, 1]
means = full_dataset.data.mean(axis=(0,1,2)) / 255
stds = full_dataset.data.std(axis=(0,1,2)) / 255

# Define transformations
# Training: random flip + padded random crop for augmentation, then normalize
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize(mean=means, std=stds)
])

# Validation: deterministic, normalization only
val_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=means, std=stds)
])


# Calculate split sizes (90% train / 10% validation)
val_size = int(0.1 * len(full_dataset))
train_size = len(full_dataset) - val_size

# Perform random split over the index range
train_indices, val_indices = random_split(range(len(full_dataset)), [train_size, val_size])

# Give each split its OWN dataset object. A Subset only stores a reference to the
# wrapped dataset, so setting `.transform` through two Subsets of the same
# `full_dataset` makes the second assignment overwrite the first (training would
# silently run with the validation transform, i.e. without augmentation).
# Shallow copies share the underlying image array, so no extra image memory is used.
train_base = copy.copy(full_dataset)
train_base.transform = train_transform

val_base = copy.copy(full_dataset)
val_base.transform = val_transform

# Create Subsets with proper transforms
train_dataset = Subset(train_base, train_indices)
val_dataset = Subset(val_base, val_indices)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)


**VGG-16 Design**


In [None]:
class VGG16(nn.Module):
    """VGG-16 style classifier without batch normalization.

    Five convolutional stages (2, 2, 3, 3, 3 conv layers with 64, 128, 256,
    512, 512 output channels), each ending in a 2x2 max-pool, followed by a
    three-layer fully connected head with 10 outputs. The head's first
    ``Linear`` takes 512 features, so the five poolings must reduce the
    spatial size to 1x1 (e.g. a 32x32 input).
    """

    def __init__(self):
        super().__init__()
        # Convolutional stages: (input channels, output channels, conv count)
        self.conv1 = self._make_block(3, 64, 2)
        self.conv2 = self._make_block(64, 128, 2)
        self.conv3 = self._make_block(128, 256, 3)
        self.conv4 = self._make_block(256, 512, 3)
        self.conv5 = self._make_block(512, 512, 3)

        # Classifier head
        head_layers = [
            nn.Flatten(),
            nn.Linear(512 * 1 * 1, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 10),  # 10 classes for CIFAR-10
        ]
        self.fc = nn.Sequential(*head_layers)

        # Overwrite PyTorch's default init with N(0, 0.01) weights and zero biases
        self._initialize_weights()

    @staticmethod
    def _make_block(in_channels, out_channels, num_convs):
        """Build ``num_convs`` x (3x3 conv -> ReLU) followed by a 2x2 max-pool."""
        layers = []
        channels = in_channels
        for _ in range(num_convs):
            layers.append(nn.Conv2d(channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.ReLU(inplace=True))
            channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the five conv stages in order, then the classifier head."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5):
            x = stage(x)
        return self.fc(x)

    def _initialize_weights(self):
        """Set every Conv2d/Linear weight to N(0, 0.01) and every bias to zero."""
        for layer in self.modules():
            if not isinstance(layer, (nn.Conv2d, nn.Linear)):
                continue
            nn.init.normal_(layer.weight, mean=0, std=0.01)
            if layer.bias is not None:
                nn.init.constant_(layer.bias, 0)

# Instantiate the VGG-16 without batch normalization and move it to the selected device
model = VGG16().to(device)


**Training Functions**


In [None]:
def correct(output, target):
    """Return, as a float, how many rows of ``output`` have their arg-max equal to ``target``.

    ``output`` is indexed as (sample, class score); ``target`` holds one class
    index per sample.
    """
    hits = output.argmax(dim=1).eq(target)
    return hits.float().sum().item()

In [None]:
def train(data_loader, model, lossfun, optimizer):
    """Run one optimization pass over ``data_loader``.

    Returns:
        (avg_loss, accuracy): the mean of the per-batch losses, and the
        percentage of ``data_loader.dataset`` samples predicted correctly
        (predictions are taken in training mode, before each weight update).
    """
    model.train()

    batch_count = len(data_loader)
    loss_sum = 0
    hit_sum = 0

    for inputs, labels in data_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Clear gradients left over from the previous batch
        optimizer.zero_grad()

        # Forward pass and bookkeeping
        logits = model(inputs)
        batch_loss = lossfun(logits, labels)
        loss_sum += batch_loss.item()
        hit_sum += correct(logits, labels)

        # Backward pass and parameter update
        batch_loss.backward()
        optimizer.step()

    sample_count = len(data_loader.dataset)
    return loss_sum / batch_count, 100. * hit_sum / sample_count



In [None]:
def validate(data_loader, model, lossfun):
    """Evaluate ``model`` on ``data_loader`` in eval mode without tracking gradients.

    Returns:
        (avg_loss, accuracy): the mean of the per-batch losses, and the
        percentage of ``data_loader.dataset`` samples predicted correctly.
    """
    model.eval()
    loss_sum = 0
    hit_sum = 0

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            logits = model(inputs)
            loss_sum += lossfun(logits, labels).item()
            hit_sum += correct(logits, labels)

    batch_count = len(data_loader)
    sample_count = len(data_loader.dataset)
    return loss_sum / batch_count, 100. * hit_sum / sample_count

In [None]:
def run_training(model, train_loader, val_loader, lossfun, optimizer, scheduler, best_model_filename):
    """Train with LR scheduling, best-checkpoint saving and early stopping.

    Runs at most ``max_epochs`` epochs (module-level setting). After every
    epoch the scheduler is stepped with the validation loss; whenever the
    validation loss reaches a new minimum the model's ``state_dict`` is saved
    to ``best_model_filename``. Training stops early once ``patience``
    (module-level setting) consecutive epochs fail to improve.

    Returns:
        A list with one ``[train_loss, val_loss, train_acc, val_acc]`` entry
        per completed epoch.
    """
    history = []
    lowest_val_loss = float('inf')
    stale_epochs = 0

    for epoch in range(max_epochs):
        # One pass of training followed by one pass of validation
        train_loss, train_acc = train(train_loader, model, lossfun, optimizer)
        val_loss, val_acc = validate(val_loader, model, lossfun)

        history.append([train_loss, val_loss, train_acc, val_acc])

        print(f'Epoch: {epoch+1}')
        print(f'Training Loss: {train_loss:.6f}, Training Accuracy: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.6f}, Val Accuracy: {val_acc:.2f}%\n')

        # The plateau scheduler decides on LR decay from the validation loss
        scheduler.step(val_loss)

        # New best: checkpoint and reset the early-stopping counter
        if val_loss < lowest_val_loss:
            lowest_val_loss = val_loss
            torch.save(model.state_dict(), best_model_filename)
            stale_epochs = 0
            continue

        # No improvement this epoch
        stale_epochs += 1
        if stale_epochs >= patience:
            print('Early stopping triggered!')
            break

    return history

**Training Setup**

In [None]:
# Loss shared by this run and the later batch-norm run
lossfun = nn.CrossEntropyLoss()
# Adam over the no-batch-norm model, using the notebook-wide learning rate and weight decay
optimizer = torch.optim.Adam(model.parameters(), lr=initial_lr, weight_decay=weight_decay)
# Multiply the LR by lr_decay_factor after lr_patience epochs without a lower validation loss
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=lr_decay_factor, patience=lr_patience)
# File that run_training overwrites each time the validation loss reaches a new minimum
best_model_filename = 'best_model.pt'
# Launch training; `history` holds one [train_loss, val_loss, train_acc, val_acc] entry per epoch
history = run_training(model, train_loader, val_loader, lossfun, optimizer, scheduler, best_model_filename)

**Best Performing Model Statistics**

In [None]:
# Load the best performing model weights.
# Reuse the filename the training cell saved to (instead of repeating the literal)
# and map tensors onto the current device so the checkpoint also loads when the
# runtime device differs from the one it was saved on.
model.load_state_dict(torch.load(best_model_filename, map_location=device, weights_only=True))

# Evaluate the best model on the training set
train_loss, train_acc = validate(train_loader, model, lossfun)
print(f"Best Model Training Loss: {train_loss:.6f}, Training Accuracy: {train_acc:.2f}%")

# Evaluate the best model on the validation set
val_loss, val_acc = validate(val_loader, model, lossfun)
print(f"Best Model Validation Loss: {val_loss:.6f}, Validation Accuracy: {val_acc:.2f}%")

**Plot Training Results**


In [None]:
# Columns of `history`: 0 = train loss, 1 = val loss, 2 = train acc, 3 = val acc
history = np.array(history)

# One figure per metric: (title, y-axis label, [(history column, legend label), ...])
figure_specs = [
    ('Training and Validation Loss', 'Loss',
     [(0, 'Train Loss'), (1, 'Val Loss')]),
    ('Training and Validation Accuracy', 'Accuracy (%)',
     [(2, 'Train Acc'), (3, 'Val Acc')]),
]

for fig_title, y_label, curves in figure_specs:
    plt.figure(figsize=(6, 4))
    for column, curve_label in curves:
        plt.plot(history[:, column], label=curve_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(fig_title)
    plt.legend()
    plt.show()

## 2. Compare CNN With and Without Batch Normalization

**VGG-16 With Batch Normalization Design**

In [None]:
class VGG16WithBatchNorm(nn.Module):
    """VGG-16 style classifier with BatchNorm2d after every convolution.

    Five convolutional stages (2, 2, 3, 3, 3 conv layers with 64, 128, 256,
    512, 512 output channels), each conv followed by batch norm and ReLU and
    each stage ending in a 2x2 max-pool, then a three-layer fully connected
    head with 10 outputs. The head's first ``Linear`` takes 512 features, so
    the five poolings must reduce the spatial size to 1x1 (e.g. a 32x32 input).
    """

    def __init__(self):
        super().__init__()
        # Convolutional stages: (input channels, output channels, conv count)
        self.conv1 = self._make_block(3, 64, 2)
        self.conv2 = self._make_block(64, 128, 2)
        self.conv3 = self._make_block(128, 256, 3)
        self.conv4 = self._make_block(256, 512, 3)
        self.conv5 = self._make_block(512, 512, 3)

        # Classifier head
        head_layers = [
            nn.Flatten(),
            nn.Linear(512 * 1 * 1, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 10),  # 10 classes for CIFAR-10
        ]
        self.fc = nn.Sequential(*head_layers)

        # Overwrite PyTorch's default init for conv/linear layers
        # (batch-norm layers keep their default parameters)
        self._initialize_weights()

    @staticmethod
    def _make_block(in_channels, out_channels, num_convs):
        """Build ``num_convs`` x (3x3 conv -> batch norm -> ReLU) followed by a 2x2 max-pool."""
        layers = []
        channels = in_channels
        for _ in range(num_convs):
            layers.append(nn.Conv2d(channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))
            channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the five conv stages in order, then the classifier head."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5):
            x = stage(x)
        return self.fc(x)

    def _initialize_weights(self):
        """Set every Conv2d/Linear weight to N(0, 0.01) and every bias to zero."""
        for layer in self.modules():
            if not isinstance(layer, (nn.Conv2d, nn.Linear)):
                continue
            # Small random weights, zero biases
            nn.init.normal_(layer.weight, mean=0, std=0.01)
            if layer.bias is not None:
                nn.init.constant_(layer.bias, 0)

# Instantiate the batch-norm variant and move it to the selected device
model_bn = VGG16WithBatchNorm().to(device)


**Training Setup**

In [None]:
# NOTE(review): this rebinds the notebook-wide initial_lr. The run without batch norm
# used 0.0005, so the two runs differ in learning rate as well as in batch norm.
initial_lr = 0.001

# Adam + plateau scheduler for the batch-norm model, mirroring the earlier setup
optimizer_bn = optim.Adam(model_bn.parameters(), lr=initial_lr, weight_decay=weight_decay)
scheduler_bn = optim.lr_scheduler.ReduceLROnPlateau(optimizer_bn, mode='min',
                                                  factor=lr_decay_factor,
                                                  patience=lr_patience)
# Separate checkpoint file so the no-batch-norm checkpoint is not overwritten
model_filename = 'best_model_bn.pt'

# Launch training, reusing the loss function and data loaders from the first run
history_bn = run_training(model_bn, train_loader, val_loader, lossfun, optimizer_bn, scheduler_bn, model_filename)

**Best Performing Model Statistics**

In [None]:
# Load the best performing model weights.
# Reuse the filename the batch-norm training cell saved to (instead of repeating the
# literal) and map tensors onto the current device so the checkpoint also loads when
# the runtime device differs from the one it was saved on.
model_bn.load_state_dict(torch.load(model_filename, map_location=device, weights_only=True))

# Validate the best model on the training set
train_loss, train_acc = validate(train_loader, model_bn, lossfun)
print(f"Best Model Training Loss: {train_loss:.6f}, Training Accuracy: {train_acc:.2f}%")

# Validate the best model on the validation set
val_loss, val_acc = validate(val_loader, model_bn, lossfun)
print(f"Best Model Validation Loss: {val_loss:.6f}, Validation Accuracy: {val_acc:.2f}%")

**Plot Training Results With and Without Batch Normalization**

In [None]:
# Plot comparison between models with and without batch normalization.
# asarray makes this cell work whether or not `history` was already converted
# to an array by the earlier plotting cell (a plain list cannot be sliced as [:, 0]).
history = np.asarray(history)
history_bn = np.asarray(history_bn)

# History columns: 0 = train loss, 1 = val loss, 2 = train acc, 3 = val acc.
# One panel per metric: (subplot position, train column, val column, short name, y-label)
comparison_panels = [
    (1, 0, 1, 'Loss', 'Loss'),
    (2, 2, 3, 'Acc', 'Accuracy (%)'),
]

plt.figure(figsize=(12, 5))
for position, train_col, val_col, short_name, y_label in comparison_panels:
    plt.subplot(1, 2, position)
    plt.plot(history[:, train_col], label=f'Train {short_name} (without BN)')
    plt.plot(history[:, val_col], label=f'Val {short_name} (without BN)')
    plt.plot(history_bn[:, train_col], label=f'Train {short_name} (with BN)')
    plt.plot(history_bn[:, val_col], label=f'Val {short_name} (with BN)')
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    # The accuracy panel fills the second slot of the 1x2 grid, which was previously left empty
    metric_title = 'Loss' if short_name == 'Loss' else 'Accuracy'
    plt.title(f'Training and Validation {metric_title} Comparison')
    plt.legend()
    plt.grid(True)

plt.tight_layout()
plt.show()

## 3. Visualize Convolutional Features

**Visualization Functions**

In [None]:
def show_filters(model, layer_name):
    """Plot kernels of the first layer in ``model.<layer_name>``.

    For ``'conv1'`` the first 16 kernels are drawn as RGB images on a 2x8
    grid; for any other block the first input channel of the first 8 kernels
    is drawn on a 1x8 grid. Weights are min-max rescaled to [0, 1] over the
    whole layer before plotting.
    """
    is_first_block = layer_name == 'conv1'
    rows, cols = (2, 8) if is_first_block else (1, 8)
    n_filters = rows * cols  # 16 for conv1, 8 for deeper blocks

    # Weights of the first module in the Sequential block
    kernels = getattr(model, layer_name)[0].weight.cpu().detach()
    lowest, highest = kernels.min(), kernels.max()
    kernels = (kernels - lowest) / (highest - lowest)

    plt.figure(figsize=(20, 10 if is_first_block else 5))
    plt.suptitle(f'{layer_name} Convolutional Filters', fontsize=16)

    for idx in range(n_filters):
        plt.subplot(rows, cols, idx + 1)
        if is_first_block:
            # Move the 3 input channels last so the kernel renders as an RGB image
            plt.imshow(kernels[idx].permute(1, 2, 0))
        else:
            plt.imshow(kernels[idx, 0], cmap='viridis')
        plt.axis('off')
    plt.show()

In [None]:
def show_image_filters(filters, layer_name, image_idx):
    """Plot up to eight channels of one image's activations for a layer.

    Args:
        filters: Activation tensor indexed as ``filters[image_idx, channel]``,
            where each such slice is a 2-D map that can be shown as an image.
        layer_name: Name used in the figure title.
        image_idx: Zero-based index of the image within ``filters``.
    """
    # Never ask for more channels than the tensor has; the grid stays 1x8
    n_filters = min(8, filters.shape[1])
    plt.figure(figsize=(20, 5))
    plt.suptitle(f'{layer_name} Convolutional Filters for Test Image {image_idx+1}', fontsize=16)

    for i in range(n_filters):
        plt.subplot(1, 8, i+1)
        # Read from the `filters` argument (previously an undefined global
        # `features` was referenced, which raised NameError on every call)
        image_filter = filters[image_idx, i].detach().cpu().numpy()
        plt.imshow(image_filter, cmap='viridis')
        plt.axis('off')
    plt.show()

In [None]:
def show_test_image(image, idx):
    """Display one normalized channel-first image tensor after undoing normalization.

    The module-level ``means`` / ``stds`` are used to invert the normalization;
    the result is clipped to [0, 1] before plotting. ``idx`` is the zero-based
    image index shown (one-based) in the title.
    """
    plt.figure(figsize=(5, 5))
    # Channel-first tensor -> channel-last array for imshow
    pixels = image.cpu().numpy().transpose(1, 2, 0)
    # Undo Normalize: x * std + mean, then clamp to the displayable range
    channel_std = np.array(stds)
    channel_mean = np.array(means)
    pixels = np.clip(pixels * channel_std + channel_mean, 0, 1)
    plt.imshow(pixels)
    plt.title(f'Test Image {idx+1}', fontsize=14)
    plt.axis('off')
    plt.show()

**Visualization Setup**

In [None]:
# Names of the five convolution blocks: conv1 .. conv5
convolutional_layers = [f'conv{block_no}' for block_no in range(1, 6)]

# Take the first batch from the validation loader and keep the first few images
num_images = 3
test_images, test_labels = next(iter(val_loader))
test_batch = test_images[:num_images].to(device)

# Captured layer outputs, keyed by the name given to get_activation
activations = {}

# Hook factory
def get_activation(name):
    """Return a forward hook that stores the module's detached output in ``activations[name]``."""
    def hook(module, inputs, output):
        activations.update({name: output.detach()})
    return hook

# Attach one forward hook to the first layer of every convolution block,
# keeping the handles so the hooks can be removed later
hooks = [
    getattr(model_bn, block_name)[0].register_forward_hook(get_activation(block_name))
    for block_name in ('conv1', 'conv2', 'conv3', 'conv4', 'conv5')
]

# Single gradient-free forward pass in eval mode; the hooks fill `activations`
model_bn.eval()
with torch.no_grad():
    model_bn(test_batch)

**Convolutional Filters Visualization**

In [None]:
# The activations were already captured by the forward pass, so the hooks are
# only cleaned up here. try/finally guarantees they are removed even if one of
# the plotting calls raises; otherwise they would stay attached to model_bn and
# pile up each time the registration cell is re-run.
try:
    # Visualize raw convolutional filters
    print("\nConvolutional Filters")
    for layer_name in convolutional_layers:
        print(f"\n{layer_name} Filters:")
        show_filters(model_bn, layer_name)

    # Show convolutional features for test images
    print("\nConvolutional Filters After Using Test Images")
    for i in range(num_images):
        print(f"\nAnalyzing Test Image {i+1}")
        show_test_image(test_images[i], i)

        for layer_name in convolutional_layers:
            print(f"\n{layer_name} convolutional filters:")
            show_image_filters(activations[layer_name], layer_name, i)
finally:
    # Remove hooks (safe to repeat: removing an already-removed handle is a no-op)
    for hook in hooks:
        hook.remove()