In [None]:
import pickle
import numpy as np 
import csv
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import ConfusionMatrixDisplay

import matplotlib.pyplot as plt


with open('data/train_data.pkl', 'rb') as f:
    data = pickle.load(f)

images = data['images']
labels = data['labels']

images = images.astype(np.float32) / 255.0   
images = images[:, None, :, :]               # add channel dimension: (N,1,H,W)

X = torch.tensor(images, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.long)

dataset = TensorDataset(X, y)

def confusion_matrix(y_true,y_pred):
    classes = np.unique(np.concatenate((y_true, y_pred)))
    num_classes = len(classes)
    
    matrix = np.zeros((num_classes, num_classes), dtype=int)

    for t, p in zip(y_true, y_pred):
        true_idx = np.where(classes == t)[0][0]
        pred_idx = np.where(classes == p)[0][0]
        matrix[true_idx, pred_idx] += 1
    
    return matrix
    
    


In [2]:
# Creating a CNN class
class ConvNeuralNet(nn.Module):
#  Determine what layers and their order in CNN object 
    def __init__(self, num_classes):
        super(ConvNeuralNet, self).__init__()
        self.conv_layer1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3)
        self.conv_layer2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3)
        self.max_pool1 = nn.MaxPool2d(kernel_size = 2, stride = 2)
        
        self.conv_layer3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
        self.conv_layer4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3)
        self.max_pool2 = nn.MaxPool2d(kernel_size = 2, stride = 2)
        
        self.fc1 = nn.Linear(1024, 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)
    
    # Progresses data across layers    
    def forward(self, x):
        out = self.conv_layer1(x)
        out = self.conv_layer2(out)
        out = self.max_pool1(out)
        
        out = self.conv_layer3(out)
        out = self.conv_layer4(out)
        out = self.max_pool2(out)
                
        out = out.reshape(out.size(0), -1)
        
        out = self.fc1(out)
        out = self.relu1(out)
        out = self.fc2(out)
        return out
    
    def predict(self, X):
        self.eval()
        with torch.no_grad():
            return torch.argmax(self.forward(X), dim=1)

    def accuracy(self, X, y):
        preds = self.predict(X)
        return (preds == y).float().mean().item()
    

In [3]:
# -----------------------
# Prepare images and labels
# -----------------------
np.random.seed(0)

X = np.array(images, dtype=np.float32) / 255.0  # normalize

# handle extra singleton dimensions
if X.ndim == 5 and X.shape[1] == 1:  
    X = np.squeeze(X, axis=1)  # (N, H, W, C)

# convert to channel-first
if X.ndim == 3:  # grayscale (N, H, W)
    X = X[:, None, :, :]  # (N, 1, H, W)
elif X.ndim == 4:  # RGB (N, H, W, C)
    X = np.transpose(X, (0, 3, 1, 2))  # (N, C, H, W)
else:
    raise ValueError(f"Unexpected image shape: {X.shape}")
y = np.array(labels, dtype=int).flatten()

# -----------------------
# Train/validation split
# -----------------------
idx = np.random.permutation(len(X))
split = int(0.8 * len(X))

X_train, X_val = X[idx[:split]], X[idx[split:]]
y_train, y_val = y[idx[:split]], y[idx[split:]]

# -----------------------
# Oversample & augment
# -----------------------
def oversample_and_augment(X, y, noise_std=0.02, scale_range=0.05, shift_std=0.02):
    classes, counts = np.unique(y, return_counts=True)
    max_count = counts.max()

    new_X, new_y = [], []

    for cls in classes:
        mask = (y == cls)
        X_cls = X[mask]
        count = X_cls.shape[0]

        # keep originals
        new_X.append(X_cls)
        new_y.append(np.full(count, cls))

        # oversample if needed
        needed = max_count - count
        if needed > 0:
            idx = np.random.choice(count, needed, replace=True)
            X_over = X_cls[idx]

            # augmentation
            noise = np.random.randn(*X_over.shape) * noise_std
            scale = 1 + np.random.randn(*X_over.shape) * scale_range
            shift = np.random.randn(*X_over.shape) * shift_std

            X_aug = np.clip(X_over * scale + shift + noise, 0, 1)
            new_X.append(X_aug)
            new_y.append(np.full(needed, cls))

    X_new = np.vstack(new_X)
    y_new = np.concatenate(new_y)
    idx = np.random.permutation(len(y_new))
    return X_new[idx], y_new[idx]

X_train, y_train = oversample_and_augment(X_train, y_train)

# -----------------------
# Device
# -----------------------
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# convert to torch
X_train_t = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_t = torch.tensor(y_train, dtype=torch.long).to(device)
X_val_t = torch.tensor(X_val, dtype=torch.float32).to(device)
y_val_t = torch.tensor(y_val, dtype=torch.long).to(device)

train_dataset = TensorDataset(X_train_t, y_train_t)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

num_classes = len(np.unique(y))

model = ConvNeuralNet(num_classes=num_classes) .to(device)

print("Val label counts:", np.unique(y_val, return_counts=True))
print("X_train shape:", X_train.shape)


Val label counts: (array([0, 1, 2, 3, 4]), array([99, 23, 39, 44, 11]))
X_train shape: (1935, 3, 28, 28)


In [4]:
# -----------------------
# Training setup
# -----------------------
classes, counts = torch.unique(y_train_t, return_counts=True)
print(classes, counts)
class_weights = (len(y_train_t) / (num_classes * counts)).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0)

# -----------------------
# Training loop
# -----------------------
num_epochs =100
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * xb.size(0)

    avg_loss = total_loss / len(train_dataset)

    # print every 10 epochs
    if (epoch+1) % 10 == 0 or epoch == 0:
        val_acc = model.accuracy(X_val_t, y_val_t)
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {avg_loss:.4f} - Val Acc: {val_acc:.4f}")

# -----------------------
# Confusion matrix
# -----------------------
y_val_pred = model.predict(X_val_t).cpu().numpy()

print("Validation accuracy:", model.accuracy(X_val_t, y_val_t))
print("Training accuracy:", model.accuracy(X_train_t, y_train_t))


cm = confusion_matrix(y_val, y_val_pred)
print(cm)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap="Blues", values_format="d")
plt.show()

tensor([0, 1, 2, 3, 4], device='cuda:0') tensor([387, 387, 387, 387, 387], device='cuda:0')
Epoch 1/100 - Loss: 1.4937 - Val Acc: 0.4583


KeyboardInterrupt: 

In [None]:
# ----------------------------
# Load test data
# ----------------------------
with open("data/test_data.pkl", "rb") as f:
    test_data = pickle.load(f)

X_test = test_data["images"].astype(np.float32) / 255.0  # Normalize

# ----------------------------
# Fix shape for CNN
# ----------------------------
if X_test.ndim == 3:
    # (N, H, W) → (N, 1, H, W)
    X_test = X_test[:, None, :, :]

elif X_test.ndim == 4:
    # (N, H, W, C) → (N, C, H, W)
    if X_test.shape[-1] in [1, 3]:
        X_test = np.transpose(X_test, (0, 3, 1, 2))
    else:
        raise ValueError(f"Channel dimension unexpected: {X_test.shape}")

else:
    raise ValueError(f"Unexpected image shape: {X_test.shape}")

# ----------------------------
# Convert to tensor
# ----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
X_test_t = torch.tensor(X_test, dtype=torch.float32).to(device)

model.to(device)
model.eval()

# ----------------------------
# Predict
# ----------------------------
with torch.no_grad():
    logits = model(X_test_t)
    y_pred = torch.argmax(logits, dim=1).cpu().numpy()

# ----------------------------
# Save predictions to CSV
# ----------------------------
with open("submission.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["ID", "Label"])
    for i, label in enumerate(y_pred, start=1):
        writer.writerow([i, int(label)])