In [None]:
import os
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, random_split, TensorDataset
from torch.optim.lr_scheduler import StepLR, MultiStepLR
from PIL import Image
import numpy as np

In [None]:
# Pick the compute device once: CUDA when a GPU is usable, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Function to load CIFAR-10 batch files
def load_cifar_batch(file_path):
    """Unpickle one batch file and return the object stored in it.

    Args:
        file_path: Path of the pickled file to read (opened in binary mode).

    Returns:
        The unpickled object, decoded with ``encoding='bytes'``.
    """
    # NOTE(review): unpickling can execute arbitrary code -- only use on trusted files.
    with open(file_path, mode='rb') as handle:
        return pickle.load(handle, encoding='bytes')

# Specify the directory containing CIFAR-10 batches
# NOTE(review): this root ('/kaggle/input/dl-project-1/...') differs from the
# '/kaggle/input/deep-learning-spring-2025-project-1/...' paths that the loading
# cells hard-code, and the variable is not referenced in the cells visible here --
# confirm which dataset mount is the intended one.
cifar10_directory = '/kaggle/input/dl-project-1/deep-learning-spring-2025-project-1/cifar-10-python/cifar-10-batches-py'

In [None]:

# Read the class names from the metadata file
meta_data = load_cifar_batch('/kaggle/input/deep-learning-spring-2025-project-1/cifar-10-python/cifar-10-batches-py/batches.meta')
label_names = [raw_name.decode('utf-8') for raw_name in meta_data[b'label_names']]

# Gather the five training batches (data_batch_1 ... data_batch_5)
train_images = []
train_labels = []

for batch_num in range(1, 6):
    batch_file = os.path.join(
        '/kaggle/input/deep-learning-spring-2025-project-1/cifar-10-python/cifar-10-batches-py',
        f'data_batch_{batch_num}',
    )
    batch_data = load_cifar_batch(batch_file)

    train_images.append(batch_data[b'data'])
    train_labels.extend(batch_data[b'labels'])

# Stack all batches, view each image as (3, 32, 32), then move channels last (HWC)
train_images = np.vstack(train_images)
train_images = train_images.reshape(-1, 3, 32, 32)
train_images = train_images.transpose(0, 2, 3, 1)
train_labels = np.array(train_labels)

In [None]:
# Training-time preprocessing: random augmentations, then tensor conversion and normalization
train_transform_steps = [
    # Convert numpy array to PIL Image
    transforms.ToPILImage(),
    # Random rotation within a range of -10 to 10 degrees
    transforms.RandomRotation(degrees=10),
    # Adjust color properties
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    # Random horizontal flip with 50% probability
    transforms.RandomHorizontalFlip(p=0.5),
    # Random sharpness adjustment
    transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.2),
    # Random crop with padding
    transforms.RandomCrop(32, padding=4),
    # Convert the image to a tensor
    transforms.ToTensor(),
    # Normalize with the per-channel statistics used throughout this notebook
    transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261)),
    # Random erasing, applied on the already-normalized tensor
    transforms.RandomErasing(p=0.2, scale=(0.02, 0.1), value=1.0, inplace=False),
]
augmentation_pipeline = transforms.Compose(train_transform_steps)

In [None]:
# Custom Dataset for CIFAR-10 with transformations
class CustomCIFAR10Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing an indexable image store with its labels.

    Every lookup returns ``(image, label)``. When a transform was supplied it
    is applied to the image on each access; the label is returned untouched.
    """

    def __init__(self, images, labels, transform=None):
        """
        Store the data and the optional transform.

        Args:
            images: Indexable collection of images (its length defines the dataset size).
            labels: Indexable collection of labels, aligned with ``images``.
            transform (callable, optional): Applied to each image when it is fetched.
        """
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        """Number of samples, taken from the image collection."""
        return len(self.images)

    def __getitem__(self, index):
        """
        Fetch one sample.

        Args:
            index: Position of the sample in ``images`` / ``labels``.

        Returns:
            tuple: (image, label), with the image transformed if a transform is set.
        """
        sample = self.images[index]
        target = self.labels[index]

        if not self.transform:
            return sample, target
        return self.transform(sample), target

# Training set: the stacked images/labels wrapped with the augmentation pipeline
train_dataset = CustomCIFAR10Dataset(
    images=train_images,
    labels=train_labels,
    transform=augmentation_pipeline,
)


In [None]:
# Split dataset into training and validation sets (commented out)
# train_size = int(0.9 * len(train_dataset))
# val_size = len(train_dataset) - train_size
# train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Evaluation-time preprocessing: no augmentation, only tensor conversion + normalization
test_transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert numpy array to PIL Image
    transforms.ToTensor(),  # Convert image to Tensor
    transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261))  # Same statistics as the training pipeline
])

# The labeled CIFAR-10 test batch is used as the validation set
test_batch_data = load_cifar_batch('/kaggle/input/deep-learning-spring-2025-project-1/cifar-10-python/cifar-10-batches-py/test_batch')
val_images = test_batch_data[b'data'].reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
val_labels = np.array(test_batch_data[b'labels'])

# Create validation dataset
val_dataset = CustomCIFAR10Dataset(val_images, val_labels, transform=test_transform)

# Create DataLoaders for the training and validation datasets
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False, num_workers=4)

# Load the unlabeled competition images
test_dataset_path = '/kaggle/input/deep-learning-spring-2025-project-1/cifar_test_nolabel.pkl'
test_batch_data = load_cifar_batch(test_dataset_path)
# Keep the pixels as uint8, exactly like the validation images above. The
# previous float32 / 255 conversion fed float arrays into ToPILImage, which
# older torchvision releases reject for 3-channel input and newer ones scale
# back to uint8 by truncation (which may shift some pixel values by one).
# ToTensor below already performs the [0, 1] scaling.
# Assumes b'data' holds HWC images with values in 0..255 (the same assumption
# the original scaling made) -- TODO confirm against the dataset.
test_images = np.asarray(test_batch_data[b'data'], dtype=np.uint8)

# Transform every test image up front; each item is a 1-tuple so a batch is
# accessed as batch[0] by the inference loop
test_dataset = [(test_transform(img),) for img in test_images]

# Create DataLoader for test dataset
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=4)

# Per-epoch validation accuracies; train_model appends to this module-level list
validation_accuracies = []


In [None]:
def train_model(model, train_loader, val_loader, epochs=75, early_stopping_patience=10):
    """
    Train ``model`` with SGD and evaluate it on ``val_loader`` after every epoch.

    Batches are moved to the module-level ``device``; the model itself is not
    moved by this function. After each epoch the validation accuracy is also
    appended to the module-level ``validation_accuracies`` list and a summary
    line is printed.

    Args:
        model (torch.nn.Module): The neural network model.
        train_loader: Iterable of (images, labels) training batches; must support ``len()``.
        val_loader: Iterable of (images, labels) validation batches; must support ``len()``.
        epochs (int, optional): Number of epochs to train the model. Default is 75.
        early_stopping_patience (int, optional): Accepted for interface
            compatibility but not used -- no early stopping is performed.

    Returns:
        None
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
    # LR is multiplied by 0.1 at each milestone epoch that is actually reached
    scheduler = MultiStepLR(optimizer, milestones=[30, 40, 60, 80], gamma=0.1)

    # Per-epoch history (kept locally; only the validation accuracy is shared globally)
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        loss_sum, hits, seen = 0.0, 0, 0
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            logits = model(images)
            batch_loss = criterion(logits, labels)
            batch_loss.backward()
            optimizer.step()

            loss_sum += batch_loss.item()

            # Predicted class = index of the largest logit
            predicted = torch.max(logits, 1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()

        # Loss is averaged over batches, accuracy over samples
        avg_train_loss = loss_sum / len(train_loader)
        train_acc = 100 * hits / seen
        train_losses.append(avg_train_loss)
        train_accuracies.append(train_acc)

        # ---- validation pass (no gradients) ----
        model.eval()
        loss_sum, hits, seen = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in val_loader:
                images = images.to(device)
                labels = labels.to(device)
                logits = model(images)
                loss_sum += criterion(logits, labels).item()

                predicted = torch.max(logits, 1)[1]
                seen += labels.size(0)
                hits += (predicted == labels).sum().item()

        avg_val_loss = loss_sum / len(val_loader)
        val_acc = 100 * hits / seen
        val_losses.append(avg_val_loss)
        val_accuracies.append(val_acc)
        validation_accuracies.append(val_acc)

        # Advance the LR schedule once per epoch
        scheduler.step()

        print(f'Epoch {epoch+1}/{epochs}, '
              f'Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.2f}%, '
              f'Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%')


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a residual (skip) connection.

    The skip path is the identity when input and output shapes match, and a
    strided 1x1 conv + batch-norm projection otherwise.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        """
        Args:
            in_channels (int): Channels of the incoming feature map.
            out_channels (int): Channels produced by the block.
            stride (int, optional): Stride of the first conv (and of the projection).
        """
        super().__init__()

        # Main path, stage 1: (possibly strided) 3x3 conv -> BN
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        # Single in-place ReLU module, reused after stage 1 and after the residual sum
        self.relu = nn.ReLU(inplace=True)

        # Main path, stage 2: 3x3 conv -> BN at unit stride
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: project only when spatial size or channel count changes
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.skip = nn.Identity()

    def forward(self, x):
        """Return ``relu(main_path(x) + skip(x))``."""
        shortcut = self.skip(x)

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Residual sum (in place), then the final activation
        out += shortcut
        return self.relu(out)


In [None]:
class CustomResNet(nn.Module):
    """ResNet-style classifier: a 3x3 stem, three residual stages, global pooling, linear head.

    Stage widths are 64 / 128 / 256 with 4 / 4 / 3 residual blocks; the second
    and third stages start with a stride-2 block.
    """

    def __init__(self, num_classes=10):
        """
        Args:
            num_classes (int, optional): Size of the output layer.
        """
        super().__init__()
        # Stem
        self.init_conv = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.init_bn = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        # Residual stages
        self.layer1 = self._make_layer(64, 64, 4, stride=1)
        self.layer2 = self._make_layer(64, 128, 4, stride=2)
        self.layer3 = self._make_layer(128, 256, 3, stride=2)
        # A fourth stage (256 -> 512, 2 blocks, stride 2) is intentionally left out:
        # self.layer4 = self._make_layer(256, 512, 2, stride=2)
        # Head: pool to 1x1, then classify; fc input width matches the last stage
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(256, num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride):
        """Build one stage: a block that changes channels/stride, then ``blocks - 1`` plain blocks."""
        first = ResidualBlock(in_channels, out_channels, stride)
        rest = (ResidualBlock(out_channels, out_channels) for _ in range(blocks - 1))
        return nn.Sequential(first, *rest)

    def forward(self, x):
        """Map a batch of images to class logits of shape (batch, num_classes)."""
        out = self.relu(self.init_bn(self.init_conv(x)))
        # Add self.layer4 to this tuple if the fourth stage is enabled
        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)
        out = self.avg_pool(out)
        return self.fc(torch.flatten(out, 1))


# Build the network and place it on the selected device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomResNet()
model = model.to(device)

# Report the model size (all parameters, trainable or not)
total_params = sum(weights.numel() for weights in model.parameters())
print(f"Total number of parameters: {total_params}")

# Run training and per-epoch validation
train_model(model, train_loader, val_loader, epochs=75)  # Adjust epochs as needed


In [None]:
# Predict a class index for every unlabeled test image
model.eval()
predictions = []
with torch.no_grad():
    for test_batch in test_loader:
        # Each batch is a one-element sequence holding the image tensor
        logits = model(test_batch[0].to(device))
        predicted_classes = torch.max(logits, 1)[1]
        predictions.extend(predicted_classes.cpu().numpy())



In [None]:
import pandas as pd
# Write the predictions as a two-column CSV: running ID and predicted label
sample_ids = np.arange(len(predictions))
submission = pd.DataFrame({'ID': sample_ids, 'Labels': predictions})
submission.to_csv('/kaggle/working/submission.csv', index=False)
print("Submission1 file saved.")

In [None]:
import matplotlib.pyplot as plt

def plot_test_accuracy(validation_accuracies):
    """
    Draw a line chart of accuracy against epoch number and show it.

    Args:
        validation_accuracies (list): One accuracy value (in percent) per epoch;
            the first entry is plotted at epoch 1.
    """
    epoch_numbers = range(1, len(validation_accuracies) + 1)

    _, ax = plt.subplots(figsize=(8, 5))
    ax.plot(epoch_numbers, validation_accuracies, marker='o', linestyle='-', color='b', label='Validation Accuracy')
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Accuracy (%)')
    ax.set_title('Validation Accuracy Over Epochs')
    ax.legend()
    ax.grid(True)
    plt.show()

# After training, plot the per-epoch accuracies that train_model appended to
# the module-level validation_accuracies list
test_accuracies = validation_accuracies  # Alias of the same list object (not a copy)
plot_test_accuracy(test_accuracies)
