In [2]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import pickle
import random
import math
import os
from sklearn import metrics

"""Referenced PyTorch documentation"""

'Referenced PyTorch documentation'

# Main Body

In [8]:
class DatasetCIFAR(Dataset):
    def __init__(self, annotations_file : str, img_file : str) -> object: 
    # initialize dataset class
        annotations = np.load(annotations_file)
        self.img_labels = F.one_hot(torch.from_numpy(annotations), num_classes=10) # Convert to one-hot-encode
        self.img_data = torch.from_numpy(np.load(img_file))

        # Normalize the data
        self.img_data = torch.div(self.img_data, 255)
        
        # Transform the data dimensions to match PyTorch convention
        self.img_data = self.img_data.permute(0, 3, 1, 2) 

    def __len__(self):
    #return length of the total dataset 
        return len(self.img_labels)

    def __getitem__(self, idx : int): 
    #return data with index idx
        image = self.img_data[idx]
        label = self.img_labels[idx]
        return image, label
        

class CifarClassifier(torch.nn.Module):
    """
    Input images are 32 x 32 color images
    10 classes

    Layers:
    1. Convolutional layer1: 16 5 x 5 filters.  No zero padding with the stride as 1. 
    2. Max pooling layer1: The pooling area is 2 x 2 with the stride as 2. 
    3. Convolutional layer2: 32 5 x 5 filters.  No zero padding with the stride as 1. 
    4. Max pooling layer2: The pooling area is 2 x 2 with the stride as 2. 
    5. Convolutional layer3: 64 3 x 3 filters.  No zero padding with the stride as 1.
        - Note that ReLU activation is applied to all convolutional maps. 
    """
    def __init__(self):
        super(CifarClassifier, self).__init__()
        # Convolutional Layers
        self.conv_layer_1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=5)
        self.conv_layer_2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5)
        self.conv_layer_3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)

        # Fully connected layer (500 nodes)
        self.fc1 = nn.Linear(in_features=64 * 3 * 3, out_features=500)

        # Output layer (10 nodes)
        self.fc2 = nn.Linear(in_features = 500, out_features = 10)
    
    def forward(self, x):
        # Forward propagation

        # print("input shape:", x.shape) # TODO: Remove
        
        # print("Input:", x[0][0]) # TODO: Remove

        # Convolutional layer1: 16 5 x 5 filters.  No zero padding with the stride as 1.
        # Convolutional layer 1 dim is 28x28
        x = F.relu(self.conv_layer_1(x))
        # print("After conv 1:", x[0][0]) # TODO: Remove

        # Max pooling layer1: The pooling area is 2 x 2 with the stride as 2.
        # Max pooling layer 1 dim is 14x14
        x = F.max_pool2d(x, (2, 2), 2)
        # print("After pooling 1:", x[0][0]) # TODO: Remove

        # Convolutional layer2: 32 5 x 5 filters.  No zero padding with the stride as 1.
        # Convolutional layer 2 dim is 10x10
        x = F.relu(self.conv_layer_2(x))
        # print("After conv 2:", x[0][0]) # TODO: Remove

        # Max pooling layer2: The pooling area is 2 x 2 with the stride as 2.
        # Max pooling layer 2 dim is 5x5
        x = F.max_pool2d(x, (2, 2), 2)
        # print("After pooling 2:", x[0][0]) # TODO: Remove

        # Convolutional layer3: 64 3 x 3 filters.  No zero padding with the stride as 1.
        # Convolution layer 3 dim is 3x3
        x = F.relu(self.conv_layer_3(x))
        # print("After conv3:", x[0][0]) # TODO: Remove

        # Flatten x
        x = torch.flatten(x, start_dim = 1, end_dim = -1)
        # print("After flattening", x[0]) # TODO: Remove
        
        # print("before softmax x shape:", x.shape) # TODO: Remove
        # Fully connected layer with 500 nodes
        x = F.relu(self.fc1(x))
        # print("After connected layer 1", x[0]) # TODO: Remove

        # Output layer with 10 nodes
        x = F.softmax(self.fc2(x), dim=-1)
        # print("Output:", x[0]) # TODO: Remove 

        return x

# Helper Functions
# Reference source: https://pytorch.org/tutorials/beginner/introyt/trainingyt.html
def train_one_epoch(epoch_index : int, model, train_dataloader : DataLoader, loss_func, optimizer, device : str = 'cpu'):
    running_loss = 0.
    running_accuracy = 0.

    for i, data in enumerate(train_dataloader):
        # Get the batch num
        batch_num = len(train_dataloader)
        # print("Batch number:", batch_num)
        
        # Get the inputs and labels from the data
        inputs, labels = data

        # Change the device
        inputs = inputs.to(device)
        labels = labels.to(device)

        # print("Labels:", labels) # TODO: Remove
        
        # Zero the gradients for every batch
        optimizer.zero_grad()

        # Make the predictions from the forward pass
        outputs = model(inputs)

        # print("Outputs type:", outputs.dtype)

        # print("outputs:", outputs) # TODO: Remove

        # Compute the loss
        loss = loss_func(outputs, labels.float())
        # print(f"I need to see this loss: {loss}")
        
        # Compute loss gradients using backward
        loss.backward()

        # Make adjustments to the learning weights
        optimizer.step()

        # Sum the losses and accuracies
        running_loss += loss.item()
        running_accuracy += calc_acc(outputs, labels)

        # if i % batch_num == batch_num - 1:
        #     # TODO: CHECK THIS OUT! Do some stuff
        #     last_loss = running_loss / batch_num # l
        #     print(f"Batch {i + 1} loss: {last_loss}")
        #     running_loss = 0
        
        # break # TODO: Remove

    avg_loss = running_loss / len(train_dataloader)
    avg_accuracy = running_accuracy / len(train_dataloader)

    return avg_loss, avg_accuracy
    
    
def CIFAR_train(model, train_dataloader : DataLoader, test_dataloader : DataLoader, loss_func, optimizer, device = 'cpu'):
    EPOCHS = 200

    for epoch in range(EPOCHS):
        print(f"---Epoch {epoch + 1}---")
        # Ensure gradient tracking is on
        model.train(True)
        avg_train_loss, avg_train_accuracy = train_one_epoch(epoch, model, train_dataloader, loss_func, optimizer, device)

        # Set model to evaluation mode
        # TODO: Review what this does
        model.eval()

        # Run testing loss
        running_test_loss = 0.0
        running_test_accuracy = 0.0
        with torch.no_grad(): # Disable gradient computation and reduce memory consumption.
            for i, test_data in enumerate(test_dataloader):
                test_inputs, test_labels = test_data

                # Change the device
                test_inputs = test_inputs.to(device)
                test_labels = test_labels.to(device)
                
                # Calculate the test output
                test_outputs = model(test_inputs)

                # Calculate the test loss
                test_loss = loss_func(test_outputs, test_labels.float())
                running_test_loss += test_loss
                
                # Calculate the test accuracy
                running_test_accuracy += calc_acc(test_outputs, test_labels)
        
        avg_test_loss = running_test_loss / (i + 1)
        avg_test_accuracy = running_test_accuracy / (i + 1)
        print(f"Loss:     | Train: {avg_train_loss:.5f} | Test: {avg_test_loss:.5f}")
        print(f"Accuracy: | Train: {avg_train_accuracy:.5f} | Test: {avg_test_accuracy:.5f}")

def calc_acc(prediction, actual):
    # Calculate accuracy
    accuracy = torch.sum(torch.argmax(prediction, 1) == torch.argmax(actual, 1))/actual.shape[0]
    return accuracy.item()

In [4]:
# Define Constants
data_dir = './data'
training_data_path = os.path.join(data_dir, 'training_data.npy')
training_label_path = os.path.join(data_dir, 'training_label.npy')
test_data_path = os.path.join(data_dir, 'test_data.npy')
test_label_path = os.path.join(data_dir,'test_label.npy')


In [15]:
# Load the datasets
training_dataset = DatasetCIFAR(annotations_file=training_label_path, img_file=training_data_path)
test_dataset = DatasetCIFAR(annotations_file=test_label_path, img_file=test_data_path)

# Define the parameters
dataloader_params = {
    'batch_size': 32,
    'shuffle': True,
    'num_workers': 1
}

# Create the dataloaders
train_dataloader = DataLoader(training_dataset, **dataloader_params)
test_dataloader = DataLoader(test_dataset, **dataloader_params)

# Define the device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Define the model
model = CifarClassifier()
model = model.to(device)

# Define the loss function
optimizer = torch.optim.SGD(model.parameters(), lr=0.003, momentum = 0.9, weight_decay=1.5e-3)
loss_func = nn.CrossEntropyLoss()

CIFAR_train(model, train_dataloader, test_dataloader, loss_func, optimizer, device)

---Epoch 1---
Loss:     | Train: 2.30234 | Test: 2.30193
Accuracy: | Train: 0.11172 | Test: 0.11127
---Epoch 2---
Loss:     | Train: 2.29847 | Test: 2.29106
Accuracy: | Train: 0.10141 | Test: 0.11047
---Epoch 3---
Loss:     | Train: 2.24973 | Test: 2.19954
Accuracy: | Train: 0.17712 | Test: 0.24881
---Epoch 4---
Loss:     | Train: 2.19051 | Test: 2.16007
Accuracy: | Train: 0.25392 | Test: 0.29080
---Epoch 5---
Loss:     | Train: 2.16186 | Test: 2.14447
Accuracy: | Train: 0.28859 | Test: 0.30414
---Epoch 6---
Loss:     | Train: 2.14520 | Test: 2.13557
Accuracy: | Train: 0.30724 | Test: 0.31449
---Epoch 7---
Loss:     | Train: 2.12368 | Test: 2.10496
Accuracy: | Train: 0.33309 | Test: 0.35251
---Epoch 8---
Loss:     | Train: 2.09640 | Test: 2.09215
Accuracy: | Train: 0.36094 | Test: 0.36525
---Epoch 9---
Loss:     | Train: 2.07942 | Test: 2.05617
Accuracy: | Train: 0.37826 | Test: 0.40207
---Epoch 10---
Loss:     | Train: 2.06605 | Test: 2.05669
Accuracy: | Train: 0.39028 | Test: 0.39849