In [17]:
# ==============================================
# 1. Import Required Libraries
# ==============================================
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchsummary import summary
import matplotlib.pyplot as plt
import numpy as np
from torch.optim.lr_scheduler import LambdaLR
import math
import csv
import pickle
import matplotlib.pyplot as plt

In [2]:
# ==============================================
# 2. Device Configuration
# ==============================================
# Pick the compute backend: the GPU when PyTorch can see one, the CPU otherwise.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Execution environment switch. Set this to 'kaggle' to make the later cells use
# the Kaggle input/output paths instead of the local ones.
running_on = 'colab'

print(f"Using device: {device}")
print(f"Running on: {running_on}")


Using device: cuda
Running on: colab


In [3]:
# ==============================================
# 3. Data Preparation
# ==============================================
# Data transformations with augmentation
# Per-channel mean / std handed to Normalize by both pipelines below.
cifar_mean = (0.4914, 0.4822, 0.4465)
cifar_std = (0.2023, 0.1994, 0.2010)

# Training pipeline: random augmentations first, then tensor conversion and normalization.
train_steps = [
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(cifar_mean, cifar_std),
]
transform_train = transforms.Compose(train_steps)

# Evaluation pipeline: no augmentation, same normalization.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar_mean, cifar_std),
])

# Mini-batch size shared by the train and test loaders.
batch_size = 32

# Dataset location depends on where the notebook is running.
data_root = (
    './data'
    if running_on == 'colab'
    else '/kaggle/input/deep-learning-spring-2025-project-1/cifar-10-python'
)

# Training split: augmented and reshuffled every epoch.
trainset = torchvision.datasets.CIFAR10(
    root=data_root,
    train=True,
    download=True,
    transform=transform_train,
)
trainloader = DataLoader(
    dataset=trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
)

# Held-out split: deterministic order, no augmentation.
testset = torchvision.datasets.CIFAR10(
    root=data_root,
    train=False,
    download=True,
    transform=transform_test,
)
testloader = DataLoader(
    dataset=testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
)


100%|██████████| 170M/170M [00:12<00:00, 13.1MB/s]


In [4]:

# ==============================================
# 4. Model Definition (Balanced Parameter Version)
# ==============================================
class ResidualBlock(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers plus a skip connection.

    The skip path is an empty ``nn.Sequential`` (it hands ``x`` back unchanged)
    unless the block changes the channel count or uses a stride other than 1;
    in that case it is a 1x1 convolution followed by batch norm so that both
    branches produce tensors of the same shape.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection, if any).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main branch. Only the first convolution applies the stride.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip branch: project only when the shapes of the two branches would differ.
        needs_projection = (stride != 1) or (in_channels != out_channels)
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        residual = self.conv1(x)
        residual = torch.relu(self.bn1(residual))
        residual = self.bn2(self.conv2(residual))
        residual += self.shortcut(x)
        return torch.relu(residual)

class ModifiedResNet(nn.Module):
    """ResNet-style classifier: a 3x3 convolutional stem and five residual stages.

    The stages are 36, 72, 144, 256 and 64 channels wide and contain 3, 3, 3, 3
    and 1 residual blocks respectively; every stage after the first opens with a
    stride-2 block. The final feature map is globally average-pooled and fed to
    a linear layer that emits ``num_classes`` logits.

    Args:
        num_classes: Size of the output logit vector.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        stem_width = 36
        # Running channel count; _make_layer reads it and advances it per block.
        self.in_channels = stem_width

        # Stem
        self.conv1 = nn.Conv2d(3, stem_width, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(stem_width)

        # Residual stages
        self.layer1 = self._make_layer(36, 3, stride=1)
        self.layer2 = self._make_layer(72, 3, stride=2)
        self.layer3 = self._make_layer(144, 3, stride=2)
        self.layer4 = self._make_layer(256, 3, stride=2)
        self.layer5 = self._make_layer(64, 1, stride=2)

        # Classification head
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, num_classes)

    def _make_layer(self, out_channels, num_blocks, stride):
        """Build one stage of residual blocks as an ``nn.Sequential``.

        Only the first block receives ``stride``; the remaining blocks use
        stride 1. ``self.in_channels`` is left equal to ``out_channels``.
        """
        block_strides = [stride, *([1] * (num_blocks - 1))]
        stage = []
        for block_stride in block_strides:
            stage.append(ResidualBlock(self.in_channels, out_channels, block_stride))
            self.in_channels = out_channels
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map a batch of images to a ``(batch, num_classes)`` tensor of logits."""
        features = torch.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5):
            features = stage(features)
        pooled = self.avg_pool(features)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

class SmoothCrossEntropyLoss(nn.Module):
    """Cross-entropy against label-smoothed targets.

    The true class gets weight ``1 - smoothing``; the remaining ``smoothing``
    is split evenly across the other classes.

    Args:
        smoothing: Total probability mass moved away from the true class.
    """

    def __init__(self, smoothing=0.1):
        super().__init__()
        self.smoothing = smoothing

    def forward(self, input, target):
        """Return the batch-mean smoothed cross-entropy.

        Args:
            input: Logits with the class scores along the last dimension.
            target: Integer class indices, one per row of ``input``.
        """
        num_classes = input.size(-1)
        log_prob = F.log_softmax(input, dim=-1)

        # Start with the off-target weight everywhere ...
        soft_target = input.new_ones(input.size()) * self.smoothing / (num_classes - 1.)
        # ... then overwrite the true-class slot of every row.
        true_class_index = target.unsqueeze(-1)
        soft_target.scatter_(-1, true_class_index, (1. - self.smoothing))

        per_sample = (-soft_target * log_prob).sum(dim=-1)
        return per_sample.mean()

# Initialize model: build the network with its default 10-class head and move
# its parameters and buffers to the selected device.
model = ModifiedResNet().to(device)

In [6]:
# ==============================================
# 5. Training Configuration
# ==============================================
num_epochs = 80

# Label-smoothed cross-entropy: 10% of the target mass is spread over the other classes.
criterion = SmoothCrossEntropyLoss(smoothing=0.1).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

def warmup_with_linear_decay(epoch, total_epochs=num_epochs, warmup_epochs=5):
    """Return the learning-rate multiplier for a 0-based ``epoch``.

    The multiplier ramps linearly up to 1.0 over the first ``warmup_epochs``
    epochs and then decays linearly towards 0 at ``total_epochs``.

    LambdaLR evaluates this function at ``epoch == 0`` when it is constructed,
    so the warmup uses ``epoch + 1``: returning 0 there would run the whole
    first epoch with a learning rate of 0 and learn nothing.

    Args:
        epoch: 0-based epoch index supplied by the scheduler.
        total_epochs: Epoch index at which the multiplier reaches 0.
        warmup_epochs: Number of initial epochs used for the linear ramp.

    Returns:
        A float in ``[0, 1]`` that the scheduler multiplies with the base LR.
    """
    if epoch < warmup_epochs:
        return (epoch + 1) / warmup_epochs

    decay_epochs = total_epochs - warmup_epochs
    if decay_epochs <= 0:
        # No room for a decay phase: hold the base learning rate.
        return 1.0
    # Clamp so that stepping past total_epochs can never yield a negative rate.
    return max(0.0, 1 - (epoch - warmup_epochs) / decay_epochs)

scheduler = LambdaLR(optimizer, lr_lambda=warmup_with_linear_decay)

In [None]:
# ==============================================
# 6. Training Loop
# ==============================================
best_acc = 0
# Per-epoch history consumed by the plotting cells.
train_losses = []
test_losses = []
train_accuracies = []
test_accuracies = []
lr_changes = []  # lr_changes[i] is the learning rate that was in effect during epoch i

for epoch in range(num_epochs):
    # Record the learning rate before this epoch's updates. Reading it after
    # scheduler.step() would log the *next* epoch's rate and shift the curve by one.
    lr_changes.append(optimizer.param_groups[0]['lr'])

    # ---- Training pass ----
    model.train()
    running_loss = 0.0
    train_correct = 0
    train_total = 0

    for inputs, labels in trainloader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # The criterion returns a batch mean; weight it by the batch size so the
        # epoch average stays exact when the last batch is smaller.
        running_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        train_total += labels.size(0)
        train_correct += (predicted == labels).sum().item()

    # Calculate training loss and accuracy
    epoch_loss = running_loss / len(trainset)
    train_accuracy = 100 * train_correct / train_total
    train_losses.append(epoch_loss)
    train_accuracies.append(train_accuracy)

    # ---- Evaluation pass (no gradients, batch norm in inference mode) ----
    model.eval()
    test_loss = 0.0
    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            test_total += labels.size(0)
            test_correct += (predicted == labels).sum().item()

    # Calculate test loss and accuracy
    test_loss = test_loss / len(testset)
    test_acc = 100 * test_correct / test_total
    test_losses.append(test_loss)
    test_accuracies.append(test_acc)

    # Save best model
    # NOTE(review): the checkpoint is selected on the test split, so the accuracy
    # reported for it is optimistic; consider a separate validation split.
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model.state_dict(), 'best_model.pth')

    # Print epoch statistics
    print(f'Epoch [{epoch+1}/{num_epochs}], '
          f'Train Loss: {epoch_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
          f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')

    # Advance the schedule once per epoch, after the optimizer updates.
    scheduler.step()

In [8]:
# ==============================================
# 7. Evaluation
# ==============================================
# Restore the best checkpoint written by the training loop. map_location keeps
# the load working when the file was saved from a different device than the
# current one (for example a GPU checkpoint opened on a CPU-only runtime).
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.eval()

# Top-1 accuracy over the whole test loader.
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

final_accuracy = 100 * correct / total
print(f'Final Test Accuracy: {final_accuracy:.2f}%')




Final Test Accuracy: 84.61%


In [None]:
# ==============================================
# 8. Visualization
# ==============================================
plt.figure(figsize=(15, 5))

# One entry per panel, left to right:
# (train series, train label, test series, test label, y-axis label, title)
panel_specs = [
    (train_losses, 'Training Loss', test_losses, 'Test Loss',
     'Loss', 'Training and Test Loss'),
    (train_accuracies, 'Training Accuracy', test_accuracies, 'Test Accuracy',
     'Accuracy', 'Training and Test Accuracy'),
]

for position, spec in enumerate(panel_specs, start=1):
    train_series, train_label, test_series, test_label, y_label, panel_title = spec
    plt.subplot(1, 2, position)
    plt.plot(train_series, label=train_label)
    plt.plot(test_series, label=test_label)
    plt.xlabel('Epoch')
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# ==============================================
# 9. Learning Rate Schedule
# ==============================================
plt.figure(figsize=(10, 4))
# Index the x-axis by the number of recorded values rather than num_epochs, so
# the plot still renders when training was interrupted or num_epochs was edited
# after the run (mismatched x/y lengths make plt.plot raise a ValueError).
plt.plot(range(len(lr_changes)), lr_changes, marker='o')
plt.title('Learning Rate Schedule')
plt.xlabel('Epoch')
plt.ylabel('Learning Rate')
plt.grid(True)
plt.show()

In [14]:
# ==============================================
# 10. Summarizing the Model
# ==============================================
# Print a per-layer table of output shapes and parameter counts for a single
# 3x32x32 input.
summary(model, (3, 32, 32))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 36, 32, 32]             972
       BatchNorm2d-2           [-1, 36, 32, 32]              72
            Conv2d-3           [-1, 36, 32, 32]          11,664
       BatchNorm2d-4           [-1, 36, 32, 32]              72
            Conv2d-5           [-1, 36, 32, 32]          11,664
       BatchNorm2d-6           [-1, 36, 32, 32]              72
     ResidualBlock-7           [-1, 36, 32, 32]               0
            Conv2d-8           [-1, 36, 32, 32]          11,664
       BatchNorm2d-9           [-1, 36, 32, 32]              72
           Conv2d-10           [-1, 36, 32, 32]          11,664
      BatchNorm2d-11           [-1, 36, 32, 32]              72
    ResidualBlock-12           [-1, 36, 32, 32]               0
           Conv2d-13           [-1, 36, 32, 32]          11,664
      BatchNorm2d-14           [-1, 36,

In [19]:
# ==============================================
# 11. Generate Predictions on Pickle Test Data
# ==============================================
def unpickle(file):
    """Load and return the object stored in the pickle file at ``file``.

    ``encoding='bytes'`` makes 8-bit strings from Python 2 pickles come back as
    ``bytes``, which is why callers index the result with keys such as ``b'data'``.
    """
    # pickle can execute arbitrary code while loading; only open trusted files.
    with open(file, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

# Load the unlabeled competition images from the environment-specific location.
if running_on == 'colab':
    test_data = unpickle('cifar_test_nolabel.pkl')
else:
    test_data = unpickle('/kaggle/input/deep-learning-spring-2025-project-1/cifar_test_nolabel.pkl')

# Scale pixel values to [0, 1] floats; the preview cell shows these with imshow.
# assumes each entry of test_data[b'data'] is an HxWx3 image with 0-255 values — TODO confirm
test_images = test_data[b'data'].astype(np.float32) / 255.0

# Reuse the evaluation pipeline (ToTensor + Normalize) defined in the data
# preparation cell, so inference is normalized with exactly the statistics the
# model was trained and validated with. A separately typed std here previously
# differed from the training one and fed the model shifted inputs.
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transform_test,
])

# Each item is a 1-tuple, so batches arrive as a one-element sequence.
custom_testset = [(test_transform(img),) for img in test_images]
custom_testloader = DataLoader(custom_testset, batch_size=128, shuffle=False, num_workers=4)

# Inference mode: batch norm uses running statistics, no gradients are tracked.
model.eval()

predictions = []
with torch.no_grad():
    for batch in custom_testloader:
        images = batch[0].to(device)
        logits = model(images)
        # torch.max over the class dimension returns (values, indices); keep the indices.
        predicted = torch.max(logits, 1)[1]
        predictions += predicted.cpu().tolist()


# Preview at most the first 100 images. Bounding the count by what is actually
# available avoids an IndexError when fewer than 100 images were predicted.
num_preview = min(100, len(predictions), len(test_images))
for i in range(num_preview):
    # Get original (un-normalized) image
    img = test_images[i]

    # One small figure per image, titled with the predicted class index
    plt.figure(figsize=(2, 2))
    plt.imshow(img)
    plt.title(f'Prediction: {predictions[i]}')
    plt.axis('off')
    plt.show()

# Row IDs are the positions of the images in the test set: 0 .. N-1.
image_ids = np.arange(len(predictions))

# Destination depends on where the notebook is running.
output_csv_path = (
    'predictions.csv'
    if running_on == 'colab'
    else '/kaggle/working/predictions.csv'
)

# Header row followed by one (ID, label) row per prediction.
with open(output_csv_path, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['ID', 'Labels'])
    writer.writerows(zip(image_ids, predictions))

print(f'Predictions have been saved to {output_csv_path}')

Predictions have been saved to predictions.csv
