In [None]:
import numpy as np
import struct
from array import array
from os.path  import join

class MnistDataloader(object):
    """Reader for a train/test pair of MNIST-style IDX image and label files.

    The four file paths are only stored at construction time; no file is
    opened until `load_data` (or `read_images_labels`) is called.
    """

    def __init__(
            self, 
            training_images_filepath,
            training_labels_filepath,
            test_images_filepath,
            test_labels_filepath
        ):
        self.training_images_filepath = training_images_filepath
        self.training_labels_filepath = training_labels_filepath
        self.test_images_filepath = test_images_filepath
        self.test_labels_filepath = test_labels_filepath
    
    def read_images_labels(self, images_filepath, labels_filepath):
        """Read one image file and its matching label file.

        Args:
            images_filepath: path to an IDX image file (magic number 2051).
            labels_filepath: path to an IDX label file (magic number 2049).

        Returns:
            A tuple `(images, labels)`: `images` is a list of 2-D numpy
            arrays, one per image, shaped `(rows, cols)` as declared in the
            image file header; `labels` is an `array('B')` holding every
            byte that follows the label file header.

        Raises:
            ValueError: if either file starts with an unexpected magic number.
        """
        with open(labels_filepath, 'rb') as file:
            # Header: big-endian magic number followed by the label count.
            magic, _label_count = struct.unpack(">II", file.read(8))
            if magic != 2049:
                raise ValueError('Magic number mismatch, expected 2049, got {}'.format(magic))
            labels = array("B", file.read())
        
        with open(images_filepath, 'rb') as file:
            # Header: big-endian magic number, image count, rows, cols.
            magic, size, rows, cols = struct.unpack(">IIII", file.read(16))
            if magic != 2051:
                raise ValueError('Magic number mismatch, expected 2051, got {}'.format(magic))
            image_data = array("B", file.read())

        # Use the dimensions from the header rather than assuming 28x28, so
        # files with any image size can be read.
        pixels_per_image = rows * cols
        images = [
            np.array(image_data[i * pixels_per_image:(i + 1) * pixels_per_image]).reshape(rows, cols)
            for i in range(size)
        ]
        
        return images, labels
            
    def load_data(self):
        """Load both splits and return `(x_train, y_train), (x_test, y_test)`."""
        x_train, y_train = self.read_images_labels(self.training_images_filepath, self.training_labels_filepath)
        x_test, y_test = self.read_images_labels(self.test_images_filepath, self.test_labels_filepath)
        return (x_train, y_train),(x_test, y_test)

In [None]:
%matplotlib inline
import random
import matplotlib.pyplot as plt

# Directory holding the four MNIST IDX files.
input_path = 'data/'

# Build every dataset path from one table of file names.
(
    training_images_filepath,
    training_labels_filepath,
    test_images_filepath,
    test_labels_filepath,
) = (
    join(input_path, idx_name)
    for idx_name in (
        'train-images.idx3-ubyte',
        'train-labels.idx1-ubyte',
        't10k-images.idx3-ubyte',
        't10k-labels.idx1-ubyte',
    )
)

def show_images(images, title_texts):
    """Draw images in a 5-column grid, each with its title above it.

    Args:
        images: sequence of 2-D arrays accepted by `plt.imshow`.
        title_texts: titles paired positionally with `images`; pairing stops
            at the shorter of the two sequences.
    """
    cols = 5
    # Ceiling division: exactly enough rows for all images, with no empty
    # extra row when the count is a multiple of `cols`.
    rows = -(-len(images) // cols)
    plt.figure(figsize=(30,20))
    # Subplot positions are 1-based.
    for index, (image, title_text) in enumerate(zip(images, title_texts), start=1):
        plt.subplot(rows, cols, index)
        plt.imshow(image, cmap=plt.cm.gray)
        plt.title(title_text, fontsize = 15)

# Load both splits from disk.
mnist_dataloader = MnistDataloader(training_images_filepath, training_labels_filepath, test_images_filepath, test_labels_filepath)
(x_train, y_train), (x_test, y_test) = mnist_dataloader.load_data()

# Pick 10 random training images and 5 random test images to preview.
images_to_show = []
titles_to_show = []
for i in range(0, 10):
    # randrange(n) yields 0..n-1, so every valid index (including 0) can be
    # drawn and the upper bound can never run one past the end of the list.
    r = random.randrange(len(x_train))
    images_to_show.append(x_train[r])
    titles_to_show.append(f"training image [{str(r)}] = {str(y_train[r])}")

for i in range(0, 5):
    r = random.randrange(len(x_test))
    images_to_show.append(x_test[r])
    # Test images must be titled with their own labels, not the training labels.
    titles_to_show.append(f"test image [{str(r)}] = {str(y_test[r])}")

show_images(images_to_show, titles_to_show)

In [None]:
# Dump the raw pixel matrix of one training image.
# NOTE: `r` is not assigned in this cell; it is whatever index the last
# sampling loop of the previous cell left behind.
print(x_train[r])

In [None]:
# Number of iterations of the outer training loop (one train + test pass each).
EPOCHS = 13
# Mini-batch size passed to both the training and the test DataLoader.
BATCH_SIZE = 32

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset

# Load data
mnist_loader = MnistDataloader(training_images_filepath, training_labels_filepath, test_images_filepath, test_labels_filepath)
(x_train, y_train), (x_test, y_test) = mnist_loader.load_data()

# Format the image data into float tensors of shape (N, 1, rows, cols) scaled
# to [0, 1]. The list of per-image arrays is first stacked into one contiguous
# numpy array: building a tensor straight from a list of ndarrays is very slow.
x_train, x_test = (
    torch.tensor(np.asarray(images), dtype=torch.float32).unsqueeze(1) / 255
    for images in (x_train, x_test)
)

# Format the label data as int64 class indices, the target type that
# CrossEntropyLoss expects.
y_train, y_test = (
    torch.tensor(np.asarray(labels), dtype=torch.long)
    for labels in (y_train, y_test)
)

# Create datasets and dataloaders; only the training data is shuffled.
train_dataset = TensorDataset(x_train, y_train)
test_dataset = TensorDataset(x_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)


In [None]:
from collections import defaultdict

# Define the model
# model = torch.nn.Sequential(
#     torch.nn.Conv2d(1, 32, 3),
#     torch.nn.MaxPool2d(2, 2),
#     torch.nn.Conv2d(32, 64, 3),
#     torch.nn.MaxPool2d(2, 2),
#     torch.nn.Flatten(),
#     torch.nn.Linear(64 * 5 * 5, 64),
#     torch.nn.ReLU(),
#     torch.nn.Linear(64, 10),
#     torch.nn.Softmax(dim=1)
# )


# model = torch.nn.Sequential(
#     torch.nn.Conv2d(1, 8, 5),
#     torch.nn.MaxPool2d(2, 2),
#     torch.nn.Conv2d(8, 16, 3),
#     torch.nn.MaxPool2d(2, 2),
#     torch.nn.Flatten(),
#     torch.nn.Linear(16 * 5 * 5, 32),
#     torch.nn.ReLU(),
#     torch.nn.Linear(32, 10),
#     torch.nn.Softmax(dim=1)
# )


# Small CNN classifier. For 1x28x28 inputs the spatial size goes
# 28 -> 24 (5x5 conv) -> 12 (pool) -> 10 (3x3 conv) -> 5 (pool), which is why
# the first linear layer takes 6 * 5 * 5 features.
#
# The network ends with the raw 10-way logits and deliberately has NO Softmax
# layer: CrossEntropyLoss applies log-softmax itself, so a Softmax here would
# be applied twice and flatten the gradients. argmax over logits gives the
# same predicted class as argmax over probabilities; call
# torch.softmax(output, dim=1) explicitly if probabilities are needed.
model = torch.nn.Sequential(
    torch.nn.Conv2d(1, 4, 5),
    torch.nn.MaxPool2d(2, 2),
    torch.nn.Conv2d(4, 6, 3),
    torch.nn.MaxPool2d(2, 2),
    torch.nn.Flatten(),
    torch.nn.Linear(6 * 5 * 5, 32),
    torch.nn.ReLU(),
    torch.nn.Linear(32, 10),
)

# Loss and optimizer
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

# Dict for accumulating performance stats, keyed by 1-based epoch number.
# Each entry holds epoch-level train/test loss and accuracy, the per-batch
# training loss/accuracy lists, and per-class loss/accuracy lists.
stats = {}

# Training loop
for epoch in range(EPOCHS):

    model.train()

    stats[epoch + 1] = {}
    train_loss = 0.0
    train_correct = 0
    train_seen = 0
    batch_train_loss = []
    batch_train_accuracy = []
    by_class = defaultdict(lambda: defaultdict(list))

    for batch_idx, (data, target) in enumerate(train_loader):
        # Forward pass
        output = model(data)
        loss = loss_function(output, target)

        # Accumulate training performance stats. This is pure bookkeeping, so
        # it runs without autograd to avoid building extra graphs.
        with torch.no_grad():
            samples_in_batch = len(target)
            batch_loss = loss.item()
            predictions = output.argmax(1)
            hits = predictions == target
            batch_correct = hits.sum().item()

            # Batch-level stats
            batch_train_loss.append(batch_loss)
            batch_train_accuracy.append(batch_correct / samples_in_batch)

            # Epoch-level stats, weighted by batch size so that a short final
            # batch does not count as much as a full one.
            train_loss += batch_loss * samples_in_batch
            train_correct += batch_correct
            train_seen += samples_in_batch

            # Per-class stats. A class with no samples in this batch is
            # skipped: its loss would be NaN (mean over zero elements) and
            # recording an accuracy of 0 would drag the class average down.
            for class_idx in range(output.shape[1]):
                mask = target == class_idx
                class_count = mask.sum().item()
                if class_count == 0:
                    continue
                class_loss = loss_function(output[mask], target[mask])
                by_class[class_idx]["loss"].append(class_loss.item())
                by_class[class_idx]["accuracy"].append(hits[mask].sum().item() / class_count)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    train_loss /= max(train_seen, 1)
    train_accuracy = train_correct / max(train_seen, 1)

    # Test for each epoch
    model.eval()
    test_loss = 0.0
    test_correct = 0
    test_seen = 0
    with torch.no_grad():
        for data, target in test_loader:
            output = model(data)
            loss = loss_function(output, target)

            # Accumulate test performance stats, again weighted by batch size.
            samples_in_batch = len(target)
            test_loss += loss.item() * samples_in_batch
            test_correct += (output.argmax(1) == target).sum().item()
            test_seen += samples_in_batch

    test_loss /= max(test_seen, 1)
    test_accuracy = test_correct / max(test_seen, 1)

    # Populate stats
    stats[epoch + 1]["train_loss"] = train_loss
    stats[epoch + 1]["test_loss"] = test_loss
    stats[epoch + 1]["train_accuracy"] = train_accuracy
    stats[epoch + 1]["test_accuracy"] = test_accuracy
    stats[epoch + 1]["batch_train_loss"] = batch_train_loss
    stats[epoch + 1]["batch_train_accuracy"] = batch_train_accuracy
    stats[epoch + 1]["by_class"] = dict(by_class)


    print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")
    print(f"            Train Acc:  {train_accuracy:.4f}, Test Acc:  {test_accuracy:.4f}")


In [None]:
import pandas as pd

# Flatten the per-batch accuracy history of class "0" across all epochs,
# in epoch order, into a single list.
class_0_accuracy = [
    batch_accuracy
    for per_epoch in stats.values()
    for batch_accuracy in per_epoch["by_class"][0]["accuracy"]
]

# Wrap in a pandas Series so the rolling-window helpers are available
class_0_accuracy_series = pd.Series(class_0_accuracy)

# Smooth the noisy per-batch values with a fixed-size moving average
window_size = 200
rolling_window = class_0_accuracy_series.rolling(window=window_size)
moving_average = rolling_window.mean()

# Plot only the smoothed curve (the raw curve is left disabled)
curve_label = f"Moving Average (window size = {window_size}) for Class 0"
plt.figure(figsize=[10, 6])
# plt.plot(class_0_accuracy, label="Original Accuracy for Class 0")
plt.plot(moving_average, label=curve_label)
plt.title("Per-class Accuracy for Class 0")
plt.xlabel("Batch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()
