# In [None]:
import numpy as np
import pandas as pd
import os, pickle, torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import MultiStepLR
import matplotlib.pyplot as plt
from PIL import Image

# Select the compute device: CUDA when a GPU is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Running on:", device)


def unpickle_cifar(file_path):
    """Open ``file_path`` in binary mode and return the unpickled object.

    The payload is loaded with ``encoding='bytes'``; the callers in this file
    index the result with bytes keys such as ``b'data'`` and ``b'labels'``.

    NOTE: unpickling can execute arbitrary code, so only pass trusted files.
    """
    with open(file_path, mode='rb') as handle:
        return pickle.load(handle, encoding='bytes')

# Directory holding the extracted CIFAR-10 python batch files
DATA_PATH = '/home/yw9023/deeplearning/project1/cifar-10-python/cifar-10-batches-py'

# Class names are stored as bytes in the metadata file; decode them to str
meta = unpickle_cifar(os.path.join(DATA_PATH, 'batches.meta'))
classes = [name.decode('utf-8') for name in meta[b'label_names']]

# Collect the five training batches (data_batch_1 ... data_batch_5)
train_imgs, train_targets = [], []
for i in range(1, 6):
    batch = unpickle_cifar(os.path.join(DATA_PATH, f'data_batch_{i}'))
    train_imgs.append(batch[b'data'])
    train_targets.extend(batch[b'labels'])

# Merge the batches, view every image as (3, 32, 32) and then move the
# channel axis last so each sample is laid out as (32, 32, 3)
train_imgs = np.concatenate(train_imgs, axis=0)
train_imgs = train_imgs.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
train_targets = np.asarray(train_targets)

# Training-time preprocessing: augment the PIL image, convert it to a tensor,
# normalize each channel, then randomly erase a small patch of the tensor
train_transform = T.Compose([
    T.ToPILImage(),
    T.RandomRotation(degrees=10),
    T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    T.RandomHorizontalFlip(p=0.5),
    T.RandomAdjustSharpness(sharpness_factor=2, p=0.2),
    T.RandomCrop(size=32, padding=4),
    T.ToTensor(),
    T.Normalize(
        mean=(0.4914, 0.4822, 0.4465),
        std=(0.247, 0.243, 0.261),
    ),
    T.RandomErasing(p=0.2, scale=(0.02, 0.1), value=1.0),
])

# Dataset wrapper that pairs images with labels and applies an optional transform
class CIFAR10Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing an indexable image container with its labels.

    ``images`` and ``labels`` are indexed with the same position. When a
    ``transform`` is supplied it is applied to the image (never the label)
    on every access.
    """

    def __init__(self, images, labels, transform=None):
        self.data, self.labels, self.transform = images, labels, transform

    def __len__(self):
        # The number of samples is defined by the image container
        return len(self.data)

    def __getitem__(self, idx):
        sample, target = self.data[idx], self.labels[idx]
        if self.transform is None:
            return sample, target
        return self.transform(sample), target

# Wrap the training arrays so the augmentation pipeline runs per sample
train_dataset = CIFAR10Dataset(train_imgs, train_targets, transform=train_transform)

# The 'test_batch' file is used as the validation split
test_batch = unpickle_cifar(os.path.join(DATA_PATH, 'test_batch'))
val_imgs = test_batch[b'data'].reshape(-1, 3, 32, 32)
val_imgs = val_imgs.transpose(0, 2, 3, 1)  # channels last, one (32, 32, 3) array per image
val_targets = np.array(test_batch[b'labels'])

# Validation preprocessing: tensor conversion and normalization only, no augmentation
val_transform = T.Compose([
    T.ToPILImage(),
    T.ToTensor(),
    T.Normalize(
        mean=(0.4914, 0.4822, 0.4465),
        std=(0.247, 0.243, 0.261),
    ),
])
val_dataset = CIFAR10Dataset(val_imgs, val_targets, transform=val_transform)

# DataLoaders: shuffle only the training split
train_loader = DataLoader(train_dataset, batch_size=256, num_workers=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=256, num_workers=4, shuffle=False)

# Load the unlabeled test images used for the submission
TEST_PATH = '/home/yw9023/deeplearning/project1/cifar_test_nolabel.pkl'
test_batch_custom = unpickle_cifar(TEST_PATH)
# Keep the pixels as uint8. val_transform starts with ToPILImage and is fed
# uint8 arrays for the validation split; pre-scaling to float32 in [0, 1]
# before ToPILImage makes the result depend on the torchvision version and is
# at best a lossy float -> uint8 -> float round-trip.
# Assumes b'data' holds raw 0-255 pixel values, one HxWxC array per image
# -- TODO confirm against the pickle's contents.
test_imgs = np.asarray(test_batch_custom[b'data'], dtype=np.uint8)
# Each item is a 1-tuple (image only, no label), so the prediction loop reads batch[0]
test_dataset = [(val_transform(img),) for img in test_imgs]
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=4)

# Define training function (and the per-epoch helper it uses)
def _run_epoch(model, loader, criterion, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    The caller is responsible for putting ``model`` in train/eval mode.

    Returns:
        Tuple ``(mean per-sample loss, accuracy in percent)`` for the pass.
    """
    is_training = optimizer is not None
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    # Gradients are only tracked while training
    with torch.set_grad_enabled(is_training):
        for imgs, labels in loader:
            imgs, labels = imgs.to(device), labels.to(device)
            if is_training:
                optimizer.zero_grad()
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            if is_training:
                loss.backward()
                optimizer.step()
            batch_size = labels.size(0)
            # The criterion yields a batch mean; weight it by the batch size so
            # a smaller final batch does not count as much as a full one
            loss_sum += loss.item() * batch_size
            n_seen += batch_size
            _, preds = outputs.max(1)
            n_correct += (preds == labels).sum().item()
    return loss_sum / n_seen, 100.0 * n_correct / n_seen


def fit_model(model, train_dl, valid_dl, n_epochs=50):
    """Train ``model`` with SGD, validate after every epoch and plot the curves.

    Uses cross-entropy loss, SGD (lr=0.1, momentum=0.9, weight_decay=1e-4) and
    a MultiStepLR schedule that multiplies the learning rate by 0.1 at epochs
    30, 60, 80 and 90. Per-epoch metrics are printed, then loss and accuracy
    curves are shown with matplotlib.

    Args:
        model: network to optimize; batches are moved to the module-level ``device``.
        train_dl: iterable of ``(images, labels)`` training batches.
        valid_dl: iterable of ``(images, labels)`` validation batches.
        n_epochs: number of epochs to run.

    Returns:
        dict with per-epoch lists under 'train_loss', 'train_acc', 'val_loss'
        and 'val_acc' (losses are per-sample means, accuracies are percentages).
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
    # Milestones beyond n_epochs are simply never reached (e.g. with the default 50)
    scheduler = MultiStepLR(optimizer, milestones=[30, 60, 80, 90], gamma=0.1)
    stats = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(n_epochs):
        model.train()
        avg_train_loss, train_accuracy = _run_epoch(model, train_dl, criterion, optimizer)
        stats['train_loss'].append(avg_train_loss)
        stats['train_acc'].append(train_accuracy)

        model.eval()
        avg_val_loss, val_accuracy = _run_epoch(model, valid_dl, criterion)
        stats['val_loss'].append(avg_val_loss)
        stats['val_acc'].append(val_accuracy)

        # Advance the LR schedule once per epoch, after the optimizer updates
        scheduler.step()
        print(f"Epoch {epoch+1}: Train Loss {avg_train_loss:.4f}, Train Acc {train_accuracy:.2f}%, "
              f"Val Loss {avg_val_loss:.4f}, Val Acc {val_accuracy:.2f}%")

    # Plot losses and accuracies side by side
    epochs = range(1, n_epochs + 1)
    plt.figure(figsize=(10,4))
    plt.subplot(1,2,1)
    plt.plot(epochs, stats['train_loss'], 'r-', label='Train Loss')
    plt.plot(epochs, stats['val_loss'], 'b-', label='Val Loss')
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)

    plt.subplot(1,2,2)
    plt.plot(epochs, stats['train_acc'], 'g-', label='Train Accuracy')
    plt.plot(epochs, stats['val_acc'], 'm-', label='Val Accuracy')
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy (%)")
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    # Hand the history back so callers can inspect or save it
    return stats

# Define a custom ResNet model from scratch
class ResBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a shortcut connection.

    The first convolution applies ``stride``. The shortcut is an empty
    ``nn.Sequential`` (identity) unless the stride or the channel count
    changes, in which case it is a 1x1 convolution followed by batch norm so
    its output matches the main branch.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.act_fn = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        # Compute the skip path first, then the two-conv main branch
        residual = self.shortcut(x)
        y = self.act_fn(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        y += residual
        return self.act_fn(y)

class MyResNet(nn.Module):
    """ResNet-style classifier: conv stem, three residual stages, pooling, linear head.

    The stages contain 4, 4 and 3 ``ResBlock``s with 64, 128 and 256 output
    channels; the second and third stage open with a stride-2 block. Features
    are average-pooled to 1x1 and mapped to ``num_classes`` logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Stem: 3x3 convolution with stride 1 and padding 1, from 3 to 64 channels
        self.stem_conv = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.stem_bn = nn.BatchNorm2d(64)
        self.stem_act = nn.ReLU(inplace=True)
        # Residual stages
        self.layer1 = self._build_layer(64, 64, num_blocks=4, stride=1)
        self.layer2 = self._build_layer(64, 128, num_blocks=4, stride=2)
        self.layer3 = self._build_layer(128, 256, num_blocks=3, stride=2)
        # Classification head
        self.pool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(256, num_classes)

    def _build_layer(self, in_ch, out_ch, num_blocks, stride):
        """Return a stage: one block applying ``stride``/channel change, then ``num_blocks - 1`` plain blocks."""
        blocks = [ResBlock(in_ch, out_ch, stride)]
        blocks.extend(ResBlock(out_ch, out_ch) for _ in range(num_blocks - 1))
        return nn.Sequential(*blocks)

    def forward(self, x):
        feats = self.stem_act(self.stem_bn(self.stem_conv(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            feats = stage(feats)
        pooled = self.pool(feats)
        # Flatten everything except the batch dimension before the linear head
        return self.fc(torch.flatten(pooled, 1))

# Build the network on the selected device and print its layer/parameter summary
model_instance = MyResNet(num_classes=10)
model_instance = model_instance.to(device)
from torchsummary import summary
summary(model_instance, (3, 32, 32))

# Train for 100 epochs, validating after each one
fit_model(model_instance, train_loader, val_loader, n_epochs=100)

# Predict a class index for every unlabeled test image
model_instance.eval()
preds_list = []
with torch.no_grad():
    for batch in test_loader:
        # Each batch is a one-element sequence holding only the image tensor
        imgs = batch[0].to(device)
        outputs = model_instance(imgs)
        pred = outputs.max(1)[1]
        preds_list.extend(pred.cpu().numpy())

# Write the submission file: one row per test image, IDs numbered from 0
submission = pd.DataFrame({
    'ID': np.arange(len(preds_list)),
    'Labels': preds_list,
})
submission.to_csv('/kaggle/working/submission1.csv', index=False)
print("Submission file saved.")
