In [1]:
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch
from torch.autograd import Variable

import pandas as pd
import matplotlib.pyplot as plt
import copy
import time
import torch.nn.functional as F

In [7]:
class ResidualBlock(nn.Module):
    """
        A residual block consists of two dense layers,
        where the input is added back before the second
        activation function is applied.
        If the input is x, then the block computes
        relu(W_2 * relu(W_1 * x + b_1) + b_2 + x).
    """
    def __init__(self, input_size):
        super(ResidualBlock, self).__init__()
        self.relu = nn.ReLU()
        
        self.layer1 = nn.Linear(input_size, input_size) # defines first layer
        self.layer2 = nn.Linear(input_size, input_size) # defines second layer

    def forward(self, x):
        output1 = self.relu(self.layer1(x)) # computes relu(W_1 * x + b_1)
        output2 = self.relu(self.layer2(output1)+x) # computes relu(W_2 * relu(W_1 * x + b_1) + b_2 + x).
        return output2
    
class ResidualNet(nn.Module):
    """
        A residual net is a neural net where the values at prior layers
        are added to the value computed in later layers. Thus the layers
        learn what change is needed to the data which can help the model
        converge. Features is the dimension of the input, number of blocks
        is how many residual blocks will appear in the neural net,
        while classes is the number of classification categories.
    """
    def __init__(self, features, number_of_blocks, hidden_size, classes):
        super(ResidualNet, self).__init__()
        self.resblocks = nn.ModuleList()
        self.inputgate = nn.Linear(features, hidden_size) # create input gate for res net
        self.outputgate = nn.Linear(hidden_size, classes)

        for _ in range(number_of_blocks):
            self.resblocks.append(ResidualBlock(hidden_size))
            
    def forward(self, x):
        output1 = self.inputgate(x)
        
        for block in self.resblocks:
            output1 = block(output1)
            
        return F.log_softmax(self.outputgate(output1))
        

In [5]:
def train_mnist_model(model, num_epochs=20):
    training_data = pd.read_csv("mnist_train.csv", header = None).values
    training_labels = torch.LongTensor(training_data[:, 0])
    training_values = torch.FloatTensor(training_data[:, 1:].astype(float))
    
    training_dataset = data.TensorDataset(training_values, training_labels)    
    loader_dset_train = data.DataLoader(training_dataset, batch_size=128, 
                                        num_workers=4, shuffle=True)
    
    since = time.time()

    best_model = model
    best_loss = float('inf')
    model.train(True)

    curr_loss = 0.0
    total_batch_number = 0
    all_losses = []
    
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    criterion = nn.NLLLoss()
    lr_scheduler = exp_lr_scheduler

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        optimizer = lr_scheduler(optimizer, epoch)

        epoch_running_loss = 0.0
        current_batch = 0
        # Iterate over data.
        for inputs, labels in loader_dset_train:
            current_batch += 1
            total_batch_number += 1

            # wrap them in Variable
            inputs, labels = Variable(inputs), \
                             Variable(labels)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # backward
            loss.backward()
            optimizer.step()

            # statistics
            epoch_running_loss += loss.data[0]
            curr_loss += loss.data[0]

            if total_batch_number % 100 == 0:
                all_losses.append(curr_loss / 100)
                time_elapsed = time.time() - since

                print('Epoch Number: {}, Batch Number: {}, Loss: {:.4f}'.format(
                    epoch, current_batch, curr_loss))
                print('Time so far is {:.0f}m {:.0f}s'.format(
                    time_elapsed // 60, time_elapsed % 60))
                curr_loss = 0.0

        # deep copy the model
        if epoch_running_loss < best_loss:
            best_loss = epoch_running_loss
            best_model = copy.deepcopy(model)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best loss: {:4f}'.format(best_loss))

    model.train(False)

    return best_model, all_losses
    
    
def accuracy_for_a_category(model, dataset, category):
    positive_results = 0
    total = 0
    for datapoint, label in dataset: # creates loop to compute accuracy for all datapoints in dataset
        if label == category:
            total += 1 
            _, guess = torch.max(model(Variable(datapoint)), 0)
            positive_results += torch.sum(guess.data == torch.LongTensor([label]))
    return positive_results/total
            

"""
    Given a list of losses, this creates a plot of the loss curve.
"""
def plot_loss_curve(losses):
    plt.plot(losses) # creates the matplot for losses over time

def exp_lr_scheduler(optimizer, epoch, init_lr=0.001, lr_decay_epoch=7):
    """Decay learning rate by a factor of 0.1 every lr_decay_epoch epochs."""
    lr = init_lr * (0.1**(epoch // lr_decay_epoch))

    if epoch % lr_decay_epoch == 0:
        print('LR is set to {}'.format(lr))

    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    return optimizer
    
"""
    Given a model that classifies data points and a labeled data set this
    determines the overall accuracy of the model.
"""
def test_model(model, dataset):
    positive_results = 0
    total = len(dataset)
    for datapoint, label in dataset: 
        _, guess = torch.max(model(Variable(datapoint)), 0)
        positive_results += torch.sum(guess.data == torch.LongTensor([label]))   
    return positive_results/total    
    
    
    since = time.time()

    best_model = model
    best_loss = float('inf')
    model.train(True)

    curr_loss = 0.0
    total_batch_number = 0
    all_losses = []
    
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    criterion = nn.NLLLoss()
    lr_scheduler = exp_lr_scheduler

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        optimizer = lr_scheduler(optimizer, epoch)

        epoch_running_loss = 0.0
        current_batch = 0
        # Iterate over data.
        for inputs, labels in loader_dset_train:
            current_batch += 1
            total_batch_number += 1

            # wrap them in Variable
            inputs, labels = Variable(inputs), \
                             Variable(labels)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # backward
            loss.backward()
            optimizer.step()

            # statistics
            epoch_running_loss += loss.data[0]
            curr_loss += loss.data[0]

            if total_batch_number % 100 == 0:
                all_losses.append(curr_loss / 100)
                time_elapsed = time.time() - since

                print('Epoch Number: {}, Batch Number: {}, Loss: {:.4f}'.format(
                    epoch, current_batch, curr_loss))
                print('Time so far is {:.0f}m {:.0f}s'.format(
                    time_elapsed // 60, time_elapsed % 60))
                curr_loss = 0.0

        # deep copy the model
        if epoch_running_loss < best_loss:
            best_loss = epoch_running_loss
            best_model = copy.deepcopy(model)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best loss: {:4f}'.format(best_loss))

    model.train(False)

    return best_model, all_losses

In [8]:
# This should work once you finish coding the residual net. After training it,
# you can optionally use some of your code from part c of the prior problem
# to see how well the model did.
mnist_model = ResidualNet(784, 2, 200, 10)
train_mnist_model(mnist_model)

Epoch 0/19
----------
LR is set to 0.001
Epoch Number: 0, Batch Number: 100, Loss: 380.5866
Time so far is 0m 1s
Epoch Number: 0, Batch Number: 200, Loss: 79.1261
Time so far is 0m 2s
Epoch Number: 0, Batch Number: 300, Loss: 43.8147
Time so far is 0m 2s
Epoch Number: 0, Batch Number: 400, Loss: 37.1338
Time so far is 0m 3s

Epoch 1/19
----------
Epoch Number: 1, Batch Number: 31, Loss: 33.1883
Time so far is 0m 4s
Epoch Number: 1, Batch Number: 131, Loss: 28.9238
Time so far is 0m 4s
Epoch Number: 1, Batch Number: 231, Loss: 29.8364
Time so far is 0m 5s
Epoch Number: 1, Batch Number: 331, Loss: 25.8494
Time so far is 0m 6s
Epoch Number: 1, Batch Number: 431, Loss: 27.1824
Time so far is 0m 6s

Epoch 2/19
----------
Epoch Number: 2, Batch Number: 62, Loss: 24.7139
Time so far is 0m 7s
Epoch Number: 2, Batch Number: 162, Loss: 24.1034
Time so far is 0m 8s
Epoch Number: 2, Batch Number: 262, Loss: 23.6150
Time so far is 0m 9s
Epoch Number: 2, Batch Number: 362, Loss: 22.7547
Time so far 

(ResidualNet (
   (resblocks): ModuleList (
     (0): ResidualBlock (
       (relu): ReLU ()
       (layer1): Linear (200 -> 200)
       (layer2): Linear (200 -> 200)
     )
     (1): ResidualBlock (
       (relu): ReLU ()
       (layer1): Linear (200 -> 200)
       (layer2): Linear (200 -> 200)
     )
   )
   (inputgate): Linear (784 -> 200)
   (outputgate): Linear (200 -> 10)
 ),
 [3.8058663994073867,
  0.7912612327933312,
  0.4381474824249744,
  0.3713379468023777,
  0.33188278675079347,
  0.28923782631754874,
  0.2983643037080765,
  0.25849361181259156,
  0.2718240737915039,
  0.24713867701590062,
  0.2410336735472083,
  0.23614953093230726,
  0.22754704304039478,
  0.2234434736520052,
  0.19903506878763438,
  0.2061052107065916,
  0.19330846335738897,
  0.19253072738647461,
  0.20056950073689223,
  0.16542035318911075,
  0.17971450101584197,
  0.18828803643584252,
  0.17955579835921526,
  0.15172808557748796,
  0.1711172527819872,
  0.18016193397343158,
  0.15531510528177023,
  0.

In [9]:
test_data = pd.read_csv("mnist_test.csv", header = None).values
test_labels, test_values = torch.LongTensor(test_data[:, 0]), torch.FloatTensor(test_data[:, 1:].astype(float))

test_dataset = data.TensorDataset(test_values, test_labels)    
loader_dset_test = data.DataLoader(test_dataset, batch_size=128, 
                                   num_workers=4, shuffle=True)

In [10]:
for i in range(10):
    accuracy_i = accuracy_for_a_category(mnist_model, test_dataset, i)
    print("The accuracy on the digit {} is {:.2f}".format(i, accuracy_i))

# plot_loss_curve(losses)
# plt.show()

accuracy = test_model(mnist_model, test_dataset)
print("The overall accuracy of the 2 block neural net on MNIST is {:.2f}".format(accuracy))

The accuracy on the digit 0 is 0.99
The accuracy on the digit 1 is 0.99
The accuracy on the digit 2 is 0.95
The accuracy on the digit 3 is 0.96
The accuracy on the digit 4 is 0.96
The accuracy on the digit 5 is 0.95
The accuracy on the digit 6 is 0.97
The accuracy on the digit 7 is 0.96
The accuracy on the digit 8 is 0.96
The accuracy on the digit 9 is 0.94
The overall accuracy of the 2 layer neural net on MNIST is 0.96
