In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.init as init

import torchvision
from torchvision import transforms
from torchsummary import summary

from torch.nn.utils import (
  parameters_to_vector as Params2Vec,
  vector_to_parameters as Vec2Params
)

from tqdm import tqdm
import torch.nn.functional as F
import pickle
import torchvision.transforms.v2 as v2
import random
import torch.optim.lr_scheduler as lr_scheduler

In [3]:
# Dataset path
data_dir = "/kaggle/input/deep-learning-spring-2025-project-1/cifar-10-python/cifar-10-batches-py/"

In [4]:
# Function to unpickle a batch
def unpickle(file):
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

In [5]:
# Load Training Data (All 5 Batches)
X_train, y_train = [], []
for i in range(1, 6):  # Loop through data_batch_1 to data_batch_5
    batch_path = os.path.join(data_dir, f"data_batch_{i}")
    batch_data = unpickle(batch_path)
    
    images = batch_data[b'data']
    labels = batch_data[b'labels']

    # Reshape images to (num_samples, 32, 32, 3)
    images = images.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)

    X_train.append(images)
    y_train.extend(labels)

# Convert lists to NumPy arrays
X_train = np.concatenate(X_train, axis=0)
y_train = np.array(y_train)

In [6]:
# Load Test Data
test_path = os.path.join(data_dir, "test_batch")
test_data = unpickle(test_path)

X_test = test_data[b'data']
y_test = np.array(test_data[b'labels'])

# Reshape images to (num_samples, 32, 32, 3)
X_test = X_test.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)

In [7]:
# PyTorch Dataset Class with Transformations for train dataset
class CIFAR10Dataset(Dataset):
    def __init__(self, images, labels, transform_structured):
        self.images = images
        self.labels = labels
        self.transform_structured = transform_structured

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        image = self.transform_structured(image)

        return image, label

In [8]:
# PyTorch Dataset Class with Transformations for validation dataset
class CIFAR10ValDataset(Dataset):
    def __init__(self, images, labels, transform):
        self.images = images
        self.labels = labels
        self.transform = transform  

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        # Apply the same transformation to all validation images (no augmentation)
        image = self.transform(image)

        return image, label

In [9]:
class Cutout(torch.nn.Module):
    def __init__(self, mask_size, p=0.5):
        """
        Args:
            mask_size (int): The size of the square cutout mask.
            p (float): Probability of applying cutout.
        """
        super().__init__()
        self.mask_size = mask_size
        self.p = p

    def forward(self, img):
        # img is a torch.Tensor with shape (C, H, W)
        if random.random() > self.p:
            return img  # No cutout applied
        
        c, h, w = img.shape
        # Choose random center coordinates
        y = random.randint(0, h - 1)
        x = random.randint(0, w - 1)
        
        y1 = max(0, y - self.mask_size // 2)
        y2 = min(h, y + self.mask_size // 2)
        x1 = max(0, x - self.mask_size // 2)
        x2 = min(w, x + self.mask_size // 2)
        
        # Zero out the selected region
        img[:, y1:y2, x1:x2] = 0.0
        return img

In [10]:
def detect_weak_edges(image, threshold=50):
    """
    Detect weak edges using Sobel filters in PyTorch.
    
    Args:
        image (torch.Tensor): Input image tensor (C, H, W).
        threshold (int): Edge strength threshold to determine weak edges.

    Returns:
        bool: True if edges are weak, False otherwise.
    """
    # Convert RGB to grayscale using ITU-R BT.601 standard
    grayscale = 0.299 * image[0] + 0.587 * image[1] + 0.114 * image[2]  # Shape: (H, W)

    # Apply Sobel filters for edge detection
    sobel_x = torch.tensor([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(image.device)
    sobel_y = torch.tensor([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(image.device)

    # Reshape grayscale to (1, 1, H, W) for convolution
    grayscale = grayscale.unsqueeze(0).unsqueeze(0)  # Shape: (1, 1, H, W)

    # Apply Sobel convolution
    edge_x = F.conv2d(grayscale, sobel_x, padding=1)
    edge_y = F.conv2d(grayscale, sobel_y, padding=1)
    
    # Compute edge strength
    edges = torch.sqrt(edge_x ** 2 + edge_y ** 2)  # Edge magnitude
    edge_sum = edges.sum().item()  # Sum of edge intensities

    return edge_sum < threshold  # Return True if edges are weak

def enhance_contrast(image):
    """
    Enhance image contrast using PyTorch's histogram equalization.

    Args:
        image (torch.Tensor): Input image tensor (C, H, W).

    Returns:
        torch.Tensor: Contrast-enhanced image tensor (C, H, W).
    """
    # Convert to grayscale for histogram equalization
    grayscale = 0.299 * image[0] + 0.587 * image[1] + 0.114 * image[2]  # Shape: (H, W)

    # Compute histogram and cumulative distribution function (CDF)
    hist = torch.histc(grayscale, bins=256, min=0.0, max=1.0)
    cdf = hist.cumsum(0) / hist.sum()  # Normalize CDF

    # Apply histogram equalization
    equalized = torch.index_select(cdf, 0, (grayscale * 255).long().clamp(0, 255))  # Apply equalization
    equalized = equalized / equalized.max()  # Normalize

    # Scale RGB channels based on equalized grayscale
    enhanced_image = image * (equalized / (grayscale + 1e-6))  # Avoid division by zero
    enhanced_image = torch.clamp(enhanced_image, 0, 1)  # Ensure valid range

    return enhanced_image

class ContrastEnhancementTransform(torch.nn.Module):
    """
    Custom PyTorch transform to selectively enhance contrast for weak edge images.
    """
    def forward(self, img):
        """
        Apply contrast enhancement selectively based on weak edge detection.

        Args:
            img (torch.Tensor): Input image tensor (C, H, W).

        Returns:
            torch.Tensor: Transformed image tensor (C, H, W).
        """
        if detect_weak_edges(img):  # Apply only to weak edge images
            img = enhance_contrast(img)

        return img  # No conversion needed (still a PyTorch tensor)

In [11]:
# Transformation for CIFAR Training
transform_structured = transforms.Compose([
    transforms.ToTensor(),
    ContrastEnhancementTransform(),  # Selectively enhance weak-edge images

    # Position-based augmentations (important for generalization)
    transforms.RandomCrop(32, padding=4),  # Simulates different framing
    transforms.RandomHorizontalFlip(p=0.5),  # Prevents left-right bias
    transforms.RandomVerticalFlip(p=0.2),  # Rare but useful for certain classes
    transforms.RandomRotation(5),  # Small rotation to keep features intact

    # Mild Color Augmentation (Avoid Over-Augmenting)
    transforms.ColorJitter(brightness=0.1, contrast=0.1),  # Less aggressive

    # Avoid over-relying on background contrast
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)), 
    transforms.RandomErasing(p=0.2),  # Mild occlusion
    Cutout(mask_size=12, p=0.5),  # Forces the model to learn object features

    # Normalization based on CIFAR statistics
    transforms.Normalize(mean=[0.49139968, 0.48215827, 0.44653124], 
                         std=[0.24703233, 0.24348505, 0.26158768])
])

In [13]:
# Transformation for CIFAR Validation set
val_transform = transforms.Compose([
    transforms.ToTensor(),  # No randomness, just conversion
    transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], 
                         std=[0.247, 0.243, 0.261])  # Normalization
])

In [14]:
batch_size = 64  
num_workers = 4  # Kaggle: use 2 if needed

# Create dataset with different transformations for different classes
train_dataset = CIFAR10Dataset(X_train, y_train, transform_structured)
test_dataset = CIFAR10ValDataset(X_test, y_test, val_transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

In [15]:
# Define Squeeze-and-Excitation Block
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        
        self.fc1 = nn.Linear(channels, channels // reduction, bias=False)
        self.fc2 = nn.Linear(channels // reduction, channels, bias=False)
        self.relu = nn.ReLU(inplace=True)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = F.adaptive_avg_pool2d(x, 1).view(x.size(0), -1)
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out).view(x.size(0), x.size(1), 1, 1)
        return x * out

In [16]:
# Custom ResNet Model

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64 

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        self.seblock = SEBlock(channels=self.in_planes)
        
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)  
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.avg_pool = nn.AvgPool2d(8)  
        self.fc = nn.Linear(256 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.seblock(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [None]:
epochs = 125

def LiteResNet():
    return ResNet(BasicBlock, [7, 4, 3])  

# Define Training and Evaluation
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LiteResNet().to(device)

loss = nn.CrossEntropyLoss()

# Training Phase
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=0.01)
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=1e-6)

def init_weights(m):
    if isinstance(m, nn.Linear) or isinstance(m, nn.Conv2d):
        init.kaiming_uniform_(m.weight, nonlinearity='relu')
        if m.bias is not None:
            init.zeros_(m.bias)

model.apply(init_weights)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (seblock): SEBlock(
    (fc1): Linear(in_features=64, out_features=4, bias=False)
    (fc2): Linear(in_features=4, out_features=64, bias=False)
    (relu): ReLU(inplace=True)
    (sigmoid): Sigmoid()
  )
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, m

In [18]:
summary(model,(3,32,32))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 32, 32]           1,728
       BatchNorm2d-2           [-1, 64, 32, 32]             128
            Linear-3                    [-1, 4]             256
              ReLU-4                    [-1, 4]               0
            Linear-5                   [-1, 64]             256
           Sigmoid-6                   [-1, 64]               0
           SEBlock-7           [-1, 64, 32, 32]               0
            Conv2d-8           [-1, 64, 32, 32]          36,864
       BatchNorm2d-9           [-1, 64, 32, 32]             128
           Conv2d-10           [-1, 64, 32, 32]          36,864
      BatchNorm2d-11           [-1, 64, 32, 32]             128
       BasicBlock-12           [-1, 64, 32, 32]               0
           Conv2d-13           [-1, 64, 32, 32]          36,864
      BatchNorm2d-14           [-1, 64,

In [21]:
# Store training history
train_loss_history = []
test_loss_history = []
train_accuracy_history = []
test_accuracy_history = []
epoch_list = []

# Initialize best loss and accuracy tracking
best_loss = float("inf")
best_accuracy = 0.0
scaler = torch.cuda.amp.GradScaler()

  scaler = torch.cuda.amp.GradScaler()


In [22]:
# Training Loop
for epoch in range(epochs):
    train_loss = 0.0
    test_loss = 0.0
    total = 0
    correct = 0
    train_correct = 0
    train_total = 0
    
    model.train()
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            predicted_output = model(images)
            fit = loss(predicted_output, labels)  # Compute loss
        
        scaler.scale(fit).backward()
        scaler.step(optimizer)
        scaler.update()
        train_loss += fit.item()
        
        # Compute training accuracy
        if labels.dim() == 2:  # If labels are one-hot encoded (MixUp or CutMix)
            labels = labels.argmax(dim=1)  # Convert to class indices

        # Compute training accuracy
        _, predicted = predicted_output.max(1)
        train_total += labels.size(0)
        train_correct += predicted.eq(labels).sum().item()
    
    model.eval()
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            predicted_output = model(images)

            total += labels.size(0)
            predicted = F.softmax(predicted_output, dim=1)
            _, predicted = predicted.max(1)
            correct += predicted.eq(labels).sum().item()
            
            fit = loss(predicted_output, labels)
            test_loss += fit.item()

    # Compute accuracy
    train_accuracy = 100. * train_correct / train_total
    test_accuracy = 100. * correct / total
    train_loss /= len(train_loader)
    test_loss /= len(test_loader)

    # Step the scheduler after validation
    scheduler.step()

    # Save best model based on loss & accuracy
    if best_loss > test_loss or best_accuracy < test_accuracy:
        best_loss = test_loss
        best_accuracy = test_accuracy
        torch.save(model.state_dict(), "best_model.pth")
        print(f"Saved best model at epoch {epoch} with val loss {best_loss:.4f} and accuracy {best_accuracy:.2f}%")
        
    # Store loss and accuracy history
    train_loss_history.append(train_loss)
    test_loss_history.append(test_loss)
    train_accuracy_history.append(train_accuracy)
    test_accuracy_history.append(test_accuracy)
    epoch_list.append(epoch)
    
    print(f'Epoch {epoch}, Train loss {train_loss:.4f}, Train Accuracy {train_accuracy:.2f}%, Test loss {test_loss:.4f}, Test Accuracy {test_accuracy:.2f}%')

  with torch.cuda.amp.autocast():


Saved best model at epoch 0 with val loss 1.3467 and accuracy 51.34%
Epoch 0, Train loss 1.8080, Train Accuracy 35.25%, Test loss 1.3467, Test Accuracy 51.34%
Saved best model at epoch 1 with val loss 1.1176 and accuracy 60.55%
Epoch 1, Train loss 1.5074, Train Accuracy 48.40%, Test loss 1.1176, Test Accuracy 60.55%
Saved best model at epoch 2 with val loss 0.9534 and accuracy 65.79%
Epoch 2, Train loss 1.3512, Train Accuracy 55.30%, Test loss 0.9534, Test Accuracy 65.79%
Saved best model at epoch 3 with val loss 0.8120 and accuracy 72.28%
Epoch 3, Train loss 1.2930, Train Accuracy 57.97%, Test loss 0.8120, Test Accuracy 72.28%
Epoch 4, Train loss 1.1931, Train Accuracy 61.76%, Test loss 0.9321, Test Accuracy 67.02%
Saved best model at epoch 5 with val loss 0.7437 and accuracy 74.38%
Epoch 5, Train loss 1.1789, Train Accuracy 62.82%, Test loss 0.7437, Test Accuracy 74.38%
Epoch 6, Train loss 1.0710, Train Accuracy 66.60%, Test loss 0.8411, Test Accuracy 71.90%
Saved best model at epoch

In [23]:
# Save training history to Excel
history_df = pd.DataFrame({
    'Epoch': epoch,
    'Train Loss': train_loss_history,
    'Test Loss': test_loss_history,
    'Train Accuracy': train_accuracy_history,
    'Test Accuracy': test_accuracy_history
})

In [24]:
history_df.to_excel("training_history.xlsx", index=False)
print("Training history saved to training_history.xlsx")

Training history saved to training_history.xlsx
