<a href="https://colab.research.google.com/github/rfdavid/neural-network-playground/blob/master/CIFAR_10_CNN_with_PyTorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torchvision
from collections import OrderedDict
from torchvision.transforms import ToTensor, Lambda, Compose
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import time

'''
   PyTorch Neural Network Module
   Input: 
    - OrderedDict of sequence for the convolution layer
    - OrderedDict of sequence for the linear layer
'''
class Model(nn.Module):
    """Two-stage network: a convolutional stage followed by a linear stage.

    Args:
        seq_convolutional: OrderedDict of named modules, wrapped in an
            ``nn.Sequential`` and applied to the input first.
        seq_linear: OrderedDict of named modules, wrapped in an
            ``nn.Sequential`` and applied to the flattened result of the
            convolutional stage.
    """

    def __init__(self, seq_convolutional, seq_linear):
        super().__init__()
        self.convolutional_layers = nn.Sequential(seq_convolutional)
        self.linear_layers = nn.Sequential(seq_linear)

    def forward(self, x):
        """Apply the convolutional stage, flatten, then apply the linear stage."""
        features = self.convolutional_layers(x)
        # Collapse every dimension except the first before the linear stage.
        flattened = features.flatten(start_dim=1)
        return self.linear_layers(flattened)

'''
   The generic Neural Network class for the CIFAR-10 dataset
   methods:
    - load_data()
    - run()
    - train()
    - test()
'''
class NeuralNetwork():
    """Training and evaluation harness for a PyTorch classifier.

    The model, loss function and optimizer can be passed to the constructor
    or assigned to the attributes afterwards. The data loaders are created
    by ``load_data()`` (CIFAR-10) or can be assigned directly.

    Args:
        model: ``nn.Module`` mapping a batch of inputs to class scores.
        loss_fn: callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: ``torch.optim`` optimizer over the model's parameters.
        debug: when true, ``train()`` and ``test()`` print their accuracy.
    """

    def __init__(self, model = None, loss_fn = None, optimizer = None, debug = True):
        self.model = model
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.debug = debug
        self.train_dataloader = None
        self.test_dataloader = None

    def _device(self):
        """Return the device of the model's parameters (CPU if it has none)."""
        first_param = next(self.model.parameters(), None)
        if first_param is None:
            return torch.device("cpu")
        return first_param.device

    def train(self):
        """Run one optimisation pass over ``train_dataloader``.

        Returns:
            Fraction of training samples classified correctly during the
            pass (0.0 when the dataset is empty).
        """
        device = self._device()
        size = len(self.train_dataloader.dataset)
        correct = 0
        # Enable training behaviour of layers such as dropout / batch norm.
        self.model.train()
        for X, y in self.train_dataloader:
            # The model may live on a GPU; inputs must be on the same device.
            X, y = X.to(device), y.to(device)
            pred = self.model(X)
            loss = self.loss_fn(pred, y)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        correct = correct / size if size else 0.0
        if self.debug:
            print(f"Training Accuracy: {(100*correct):>0.1f}%")

        return correct

    def test(self):
        """Evaluate the model on ``test_dataloader`` without updating it.

        Returns:
            Fraction of test samples classified correctly (0.0 when the
            dataset is empty).
        """
        device = self._device()
        size = len(self.test_dataloader.dataset)
        correct = 0
        # Switch dropout / batch norm layers to inference behaviour.
        self.model.eval()
        with torch.no_grad():
            for X, y in self.test_dataloader:
                X, y = X.to(device), y.to(device)
                pred = self.model(X)
                correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        correct = correct / size if size else 0.0
        if self.debug:
            print(f"Test Accuracy: {(100*correct):>0.1f}%")

        return correct

    def load_data(self, batch_size = 4):
        """Download CIFAR-10 into ``data/`` and build both data loaders.

        Images are converted to tensors and each of the three channels is
        normalised with mean 0.5 and standard deviation 0.5. Only the
        training loader is shuffled.

        Args:
            batch_size: number of samples per batch for both loaders.
        """
        transform = transforms.Compose(
            [transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

        training_data = datasets.CIFAR10(
            root='data',
            train=True,
            download=True,
            transform=transform
        )

        test_data = datasets.CIFAR10(
            root='data',
            train=False,
            download=True,
            transform=transform
        )

        self.train_dataloader = DataLoader(training_data, batch_size = batch_size, shuffle = True)
        self.test_dataloader = DataLoader(test_data, batch_size = batch_size)

    def run(self, epochs):
        """Train and then test the model once per epoch, ``epochs`` times."""
        for t in range(epochs):
            print(f"\nEpoch {t+1}")
            print("-------")
            self.train()
            self.test()

<h3>Helper functions</h3>

In [None]:
'''
    Calculate the input size for the linear sequence after cnn sequence
    Input:
      - image_size int
      - CNN OrderedDict
'''
def calculate_output_size(image_size, cnn_sequence, debug = False):
    """Return the flattened feature count produced by a CNN layer sequence.

    Walks ``cnn_sequence`` in order and tracks the side length of a square
    feature map through every ``Conv2d`` and ``MaxPool2d`` layer, using the
    floor-based output-size formula from the PyTorch documentation. Layers
    of any other class leave the size unchanged. Only the first element of
    per-axis settings is used, so square kernels/strides are assumed, and
    ``MaxPool2d(ceil_mode=True)`` is not accounted for.

    Args:
        image_size: side length (int) of the square input image.
        cnn_sequence: OrderedDict mapping layer names to layer modules.
        debug: when true, print the running size after every layer.

    Returns:
        ``side * side * out_channels`` as an int, where ``out_channels``
        comes from the last ``Conv2d`` seen (0 if there is none).
    """
    def first(value):
        # Conv2d stores per-axis tuples; MaxPool2d may store plain ints.
        return value[0] if isinstance(value, (tuple, list)) else value

    def window_output(size, kernel, stride, padding, dilation):
        # Shared Conv2d / MaxPool2d formula; PyTorch floors the division.
        return (size + 2 * padding - dilation * (kernel - 1) - 1) // stride + 1

    output_size = image_size
    out_channels = 0
    for layer in cnn_sequence.values():
        class_name = layer.__class__.__name__
        if class_name == 'Conv2d':
            # padding='same' keeps the spatial size; 'valid' means no padding.
            if layer.padding != 'same':
                padding = 0 if layer.padding == 'valid' else first(layer.padding)
                output_size = window_output(
                    output_size,
                    first(layer.kernel_size),
                    first(layer.stride),
                    padding,
                    first(layer.dilation),
                )
            out_channels = layer.out_channels
        elif class_name == 'MaxPool2d':
            output_size = window_output(
                output_size,
                first(layer.kernel_size),
                first(layer.stride),
                first(layer.padding),
                first(layer.dilation),
            )
        if debug:
            print(class_name, ": ", output_size)

    return int(output_size * output_size * out_channels)

In [None]:
'''
   Create two sequences: convolutional followed by linear
'''

# NOTE(review): conv1_layer, conv2_layer and linear_layer are not referenced
# by the layers built in this cell, and their values (e.g. in_channels = 1)
# differ from the network defined below. They are left unchanged in case
# other cells read them -- confirm and remove if unused.
conv1_layer = {
    'in_channels': 1,
    'out_channels': 6,
    'kernel_size': 3,
    'padding': 1,
    'stride': 1,
    'max_pool_kernel_size': 2
}

conv2_layer = {
    'in_channels': conv1_layer['out_channels'],
    'out_channels': 12,
    'kernel_size': 3,
    'padding': 1,
    'stride': 1,
    'max_pool_kernel_size': 2
}

linear_layer = {
    'nodes': 64
}

# Side length of the square input images passed to calculate_output_size.
IMAGE_SIZE = 32

sequence_convolutional = OrderedDict([
            # 3 input channels (RGB) -> 6 output channels, 5x5 kernel,
            # padding 0, stride 1
            ('conv1', nn.Conv2d(in_channels = 3, out_channels = 6, kernel_size = 5, padding = 0, stride = 1)),
            ('relu1', nn.ReLU()),
            ('maxpool1', nn.MaxPool2d(kernel_size = 2, stride = 2)),
            # 6 input channels -> 16 output channels, 5x5 kernel,
            # padding 0, stride 1
            ('conv2', nn.Conv2d(in_channels = 6, out_channels = 16, kernel_size = 5, padding = 0, stride = 1)),
            ('relu2', nn.ReLU()),
            ('maxpool2', nn.MaxPool2d(kernel_size=2, stride = 2)),
])

# The input size for the linear sequence is the output channel count of the
# convolutional sequence times the n x n size of its final feature map.
# Computed once here and reused below.
linear_input_size = calculate_output_size(IMAGE_SIZE, sequence_convolutional)

sequence_linear = OrderedDict([
            # channels x n x n: for a 32x32 input the sequence above gives
            # 32 -> 28 -> 14 -> 10 -> 5, i.e. 16 * 5 * 5 = 400 features
            ('linear1', nn.Linear(linear_input_size, 120)),
            ('relu3', nn.ReLU()),
            ('linear2', nn.Linear(120, 84)),
            ('relu4', nn.ReLU()),
            ('linear3', nn.Linear(84, 10))
])

device = "cuda" if torch.cuda.is_available() else "cpu"
network = NeuralNetwork()
network.model = Model(sequence_convolutional, sequence_linear).to(device)
network.load_data()
network.optimizer = torch.optim.SGD(network.model.parameters(), lr = 0.001, momentum = 0.9)
network.loss_fn = nn.CrossEntropyLoss()
network.run(epochs = 100)

Files already downloaded and verified
Files already downloaded and verified

Epoch 1
-------
Training Accuracy: 37.2%
Test Accuracy: 48.0%

Epoch 2
-------
Training Accuracy: 51.9%
Test Accuracy: 55.7%

Epoch 3
-------
Training Accuracy: 57.1%
Test Accuracy: 56.8%

Epoch 4
-------
Training Accuracy: 60.5%
Test Accuracy: 60.0%

Epoch 5
-------
Training Accuracy: 63.2%
Test Accuracy: 60.9%

Epoch 6
-------
Training Accuracy: 65.2%
Test Accuracy: 61.7%

Epoch 7
-------
Training Accuracy: 67.0%
Test Accuracy: 62.3%

Epoch 8
-------
Training Accuracy: 68.3%
Test Accuracy: 63.2%

Epoch 9
-------
Training Accuracy: 69.8%
Test Accuracy: 62.8%

Epoch 10
-------
Training Accuracy: 70.6%
Test Accuracy: 60.8%

Epoch 11
-------
Training Accuracy: 71.7%
Test Accuracy: 63.6%

Epoch 12
-------
Training Accuracy: 72.3%
Test Accuracy: 62.0%

Epoch 13
-------
Training Accuracy: 73.1%
Test Accuracy: 62.6%

Epoch 14
-------
Training Accuracy: 73.7%
Test Accuracy: 62.2%

Epoch 15
-------
Training Accuracy: 