In [None]:
# PyTorch and neural network imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as utils
import numpy as np
from matplotlib import pyplot as plt
from tqdm.notebook import tqdm as tqdm
from torch.autograd import Variable

In [None]:
"""
baselines.py: contains all network structure definition
including layers definition and forward pass function definition
"""

# set the randomness to keep reproducible results
torch.manual_seed(0)
np.random.seed(0)

# input size to mlp network
mlp_input_size = 784 # img_size = (28,28) --> 28*28=784 in total
# final output size of mlp network (output layer)
mlp_output_size = 10 # number of output classes range [0,9]
# width of hidden layer
mlp_hidden_size = 10

class BaselineMLP(nn.Module):
    def __init__(self, input_size=None, hidden_size=None, output_size=None):
        """
        A multilayer perceptron model
        Consists of two hidden layers and 1 output layer (all fully connected):
        fc1 (hidden_size units) -> fc1_5 (hidden_size // 2 units) -> fc2 (outputs)

        Parameters: input_size --- number of input features per sample;
                                   defaults to the module-level mlp_input_size
                    hidden_size --- width of the first hidden layer;
                                    defaults to the module-level mlp_hidden_size
                    output_size --- number of output units;
                                    defaults to the module-level mlp_output_size
        """
        super(BaselineMLP, self).__init__()
        # Fall back to the module-level sizes so BaselineMLP() keeps building
        # exactly the same network as before.
        if input_size is None:
            input_size = mlp_input_size
        if hidden_size is None:
            hidden_size = mlp_hidden_size
        if output_size is None:
            output_size = mlp_output_size

        # the second hidden layer is half as wide as the first
        half_hidden = hidden_size // 2

        # input layer -> first hidden layer
        self.fc1 = nn.Linear(input_size, hidden_size)
        # first hidden layer -> second (narrower) hidden layer
        self.fc1_5 = nn.Linear(hidden_size, half_hidden)
        # second hidden layer -> output layer
        self.fc2 = nn.Linear(half_hidden, output_size)
        self.relu = nn.ReLU()

    def forward(self, X):
        """
        Pass the batch of images through each layer of the network, applying
        a ReLU activation function after each hidden layer.

        Parameters: X --- an input batch whose last dimension has input_size entries
        Returns:    out --- the raw (un-normalized) outputs of the final layer
        """
        # input layer -> first hidden layer, then activation
        out = self.relu(self.fc1(X))
        # first hidden layer -> second hidden layer, then activation
        out = self.relu(self.fc1_5(out))
        # second hidden layer -> output layer (no activation applied here)
        out = self.fc2(out)
        return out


class BaselineCNN(nn.Module):
    def __init__(self, num_classes=None, image_size=28):
        """
        A basic convolutional neural network model for baseline comparison.
        Consists of one Conv2d layer (followed by Tanh), then 1 fully-connected
        (FC) layer:
        conv1 -> fc1 (outputs)

        Parameters: num_classes --- number of output units; defaults to the
                                    module-level mlp_output_size
                    image_size --- width/height of the square single-channel
                                   input images (28 by default)
        """
        super(BaselineCNN, self).__init__()
        if num_classes is None:
            num_classes = mlp_output_size

        out_channels, kernel, pad = 10, 5, 2
        # With stride 1 the conv output side is image_size + 2*pad - kernel + 1;
        # for kernel 5 and padding 2 this equals image_size (28 -> 28).
        conv_size = image_size + 2 * pad - kernel + 1
        # number of features seen by the FC layer once the maps are flattened
        self.flat_features = out_channels * conv_size ** 2

        # define different layers
        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, out_channels, kernel_size=kernel, stride=1, padding=pad),
            nn.Tanh(),
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(self.flat_features, num_classes),
        )

    def forward(self, X):
        """
        Pass the batch of images through each layer of the network, applying
        the Tanh non-linearity after the convolution.

        Parameters: X --- an input batch of images
        Returns:    out --- the output of the network
                    activations --- the conv feature maps (after Tanh), before flattening
        """
        out = self.conv_layers(X)
        activations = out
        # flatten the feature maps into one row per image for the FC layer
        out = out.view(-1, self.flat_features)
        out = self.fc_layers(out)
        return out, activations

    def num_fc_features(self, inputs):
        """
        Count the number of flattened features to be passed to fully connected layers
        Parameters: inputs --- 4-dimensional [batch x num_channels x conv width x conv height]
                                output from the last conv layer
        Return: num_features --- total number of flattened features for the last layer
        """
        # multiply together every dimension except the leading batch dimension
        num_features = 1
        for dim in inputs.size()[1:]:
            num_features *= dim
        return num_features

class my_nn(nn.Module):
    def __init__(self, num_classes=None, image_size=28):
        """
        A two-stage convolutional network:
        (conv 5x5 -> max-pool 2 -> ReLU) x 2 -> fc (16 units) -> fc (outputs)

        Parameters: num_classes --- number of output units; defaults to the
                                    module-level mlp_output_size
                    image_size --- width/height of the square single-channel
                                   input images (28 by default)
        """
        super(my_nn, self).__init__()
        if num_classes is None:
            num_classes = mlp_output_size

        channels, kernel, pool = 10, 5, 2
        # Each stage shrinks the side by (kernel - 1) in the unpadded conv and
        # then divides it by the pool size: 28 -> 24 -> 12 -> 8 -> 4.
        side = image_size
        for _ in range(2):
            side = (side - (kernel - 1)) // pool
        # number of features seen by the first FC layer (10 * 4 * 4 = 160 by default)
        self.flat_features = channels * side ** 2

        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, channels, kernel_size=kernel, stride=1, padding=0),
            nn.MaxPool2d(pool),
            nn.ReLU(),
            # 10 input filters match the 10 output filters of the previous conv layer
            nn.Conv2d(channels, channels, kernel_size=kernel, stride=1, padding=0),
            nn.MaxPool2d(pool),
            nn.ReLU(),
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(self.flat_features, 16),
            nn.Linear(16, num_classes)
        )

    def forward(self, X):
        """
        Pass the batch of images through the conv stages and the FC layers.

        Parameters: X --- an input batch of images
        Returns:    out --- the output of the network
                    activations --- the feature maps produced by the conv stages
        """
        out = self.conv_layers(X)
        activations = out
        # flatten the feature maps into one row per image for the FC layers
        out = out.view(-1, self.flat_features)
        out = self.fc_layers(out)
        return out, activations


In [None]:
# Hyperparameters read as globals by read_data, trainMLP and trainCNN.
num_iter = 50          # number of passes of the outer training loop
learning_rate = 1e-3   # learning rate handed to the Adam optimizer
batch_size = 16        # samples per batch in every DataLoader

"""
Read data from the specified training, validation and test data files.
We are using the whole image, not creating other features now
"""
def read_data(trainFile, valFile, testFile):
    # train, validation, and test data loader
    data_loaders = []

    # read training, test, and validation data
    for file in [trainFile, valFile, testFile]:
        # read data
        data = np.loadtxt(file)
        # digit images
        imgs = torch.tensor(data[:,:-1]).float()
        # divide each image by its maximum pixel value for numerical stability
        imgs = imgs / torch.max(imgs,dim=1).values[:,None]

        # labels for each image
        labels = torch.tensor(data[:,-1]).long()

        # if using CNN model, reshape each image:
        # [batch x num_channel x image width x image height]
        if not use_mlp:
            imgs = imgs.view(-1,1,28,28)

        # create dataset and dataloader, a container to efficiently load data in batches
        dataset = utils.TensorDataset(imgs,labels)
        dataloader = utils.DataLoader(dataset, batch_size=batch_size, shuffle=True)
        data_loaders.append(dataloader)
    
    return data_loaders[0], data_loaders[1], data_loaders[2]

"""
Train Multilayer Perceptron (MLP)
Initialize MLP model --> define loss function --> define optimizer
--> train model with num_iter epochs --> pick the best model and return
    - Parameters:   train_loader --- the train dataloader
                    val_loader --- the validation dataloader
    - Return:       net --- the best trained MLP network with the lowest validation loss
                    avg_train_loss --- a list of averaged training loss of length num_iter
                    avg_val_loss --- a list of averaged validation loss of length num_iter
"""
def trainMLP(train_loader,val_loader):
    # average training loss, one value per iteration (averaged over all batches in one iteration)
    avg_train_loss = []
    # average validation loss, one value per iteration (averaged over all batches in one iteration)
    avg_val_loss = []
    # record the lowest validation loss, used to determine early stopping (best model)
    best_val_score = float('inf')
    net = BaselineMLP()
    if torch.cuda.is_available():
        net.cuda()
    lamb = 1e-5
    
    min_val_loss = float("inf")
        
    # define loss function
    #       define optimizer
    #       for each iteration, iteratively train all batches
    
    loss_function = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)
    
    for epoch in tqdm(range(num_iter)):
        
        epoch_train_loss = []
        epoch_val_loss = []
        
        for i, (images, labels) in enumerate(train_loader):
            images = Variable(images.view(-1,28*28))
            labels = Variable(labels)
            
            optimizer.zero_grad()
            outputs = net(images)
            loss = loss_function(outputs, labels)
         
            if True:
                l2_reg = torch.tensor(0.)
                for param in net.parameters():
                    if len(param.shape) >1:
                        l2_reg += torch.norm(param,p='fro')
                    else:
                        l2_reg += torch.norm(param, p=2)
                loss += lamb * l2_reg
            
            loss.backward()
            optimizer.step()
            
            if (i+1) % 5 == 0:
                tqdm.write('Epoch [%d/%d], Step [%d/%d], Loss: %.4f'
                             %(epoch+1, num_iter, i+1, len(train_loader), loss.item()))
                
        with torch.no_grad():
            for i in range(batch_size):
                tr_images, tr_labels = next(iter(train_loader))
                val_images, val_labels = next(iter(test_loader))

                tr_output = net(Variable(tr_images.view(-1,28*28)))
                val_output = net(Variable(val_images.view(-1,28*28)))

                tr_loss = loss_function(tr_output, Variable(tr_labels))
                val_loss = loss_function(val_output, Variable(val_labels))

                epoch_train_loss.append(tr_loss.item())
                epoch_val_loss.append(val_loss.item())
                
        
                
        avg_train_loss.append(np.mean(epoch_train_loss))
        avg_val_loss.append(np.mean(epoch_val_loss))
        
        if np.mean(epoch_val_loss) < min_val_loss:
            min_val_loss = np.mean(epoch_val_loss)
            torch.save(net.state_dict(), './model.pt')
            
    net.load_state_dict(torch.load('./model.pt'))
        
    return net, avg_train_loss, avg_val_loss

"""
Train Baseline Convolutional Neural Network (CNN)
Initialize CNN model --> define loss function --> define optimizer
--> train model with num_iter epochs --> pick the best model and return
    - parameters:   train_loader --- the train dataloader
                    val_loader --- the validation dataloader
    - return:       net --- the best trained CNN network with the lowest validation loss
                    train_loss --- a list of training loss
"""
def trainCNN(train_loader,val_loader):
    # average training loss, one value per iteration (averaged over all batches in one iteration)
    avg_train_loss = []
    # average validation loss, one value per iteration (averaged over all batches in one iteration)
    avg_val_loss = []
    # record the lowest validation loss, used to determine early stopping (best model)
    best_val_score = float('inf')
    net = my_nn()
    #       define loss function
    #       define optimizer
    #       for each epoch, iteratively train all batches
    if torch.cuda.is_available():
        net.cuda()
    
    loss_function = nn.CrossEntropyLoss()
    #optimizer = torch.optim.SGD(net.parameters(), lr=learning_rate)
    optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)
    
    min_val_loss = float("inf")
    lamb = 1e-5
    
    for epoch in tqdm(range(num_iter)):
        
        epoch_train_loss = []
        epoch_val_loss = []
        
        for i, (images, labels) in enumerate(train_loader):
            optimizer.zero_grad()
            outputs,_ = net(images)
            #print(images.shape,outputs.shape, labels.shape)
            loss = loss_function(outputs, labels)
         
            if True:
                l2_reg = torch.tensor(0.)
                for param in net.parameters():
                    if len(param.shape) >1:
                        l2_reg += torch.norm(param,p='fro')
                    else:
                        l2_reg += torch.norm(param, p=2)
                loss += lamb * l2_reg
            
            loss.backward()
            optimizer.step()
            
            if (i+1) % 5 == 0:
                tqdm.write('Epoch [%d/%d], Step [%d/%d], Loss: %.4f'
                             %(epoch+1, num_iter, i+1, len(train_loader), loss.item()))
                
        with torch.no_grad():
            for i in range(batch_size):
                tr_images, tr_labels = next(iter(train_loader))
                val_images, val_labels = next(iter(test_loader))

                tr_output,_ = net(tr_images)
                val_output,_ = net(val_images)

                tr_loss = loss_function(tr_output, tr_labels)
                val_loss = loss_function(val_output, val_labels)

                epoch_train_loss.append(tr_loss.item())
                epoch_val_loss.append(val_loss.item())
                
        
                
        avg_train_loss.append(np.mean(epoch_train_loss))
        avg_val_loss.append(np.mean(epoch_val_loss))
        
        if np.mean(epoch_val_loss) < min_val_loss:
            min_val_loss = np.mean(epoch_val_loss)
            torch.save(net.state_dict(), './model.pt')
            
    net.load_state_dict(torch.load('./model.pt'))
    
    return net, avg_train_loss, avg_val_loss


"""
Evaluate the model, using unseen data features "X" and
corresponding labels "y".
Parameters: loader --- the test loader
            net --- the best trained network
Return: the accuracy on test set
"""
def evaluate(loader, net):
    total = 0
    correct = 0
    # use model to get predictions
    for X, y in loader:
        outputs,_ = net(X)
        predictions = torch.argmax(outputs.data, 1)
        
        # total number of items in dataset
        total += y.shape[0]

        # number of correctly labeled items in dataset
        correct += torch.sum(predictions == y)

    # return fraction of correctly labeled items in dataset
    return float(correct) / float(total)

if __name__ == "__main__":

    # test CNN model
    use_mlp = False

    # load data from file
    train_loader, val_loader, test_loader = \
    read_data('hw0train.txt','hw0validate.txt', 'hw0test.txt')

    if use_mlp:
        net, t_losses, v_losses = trainMLP(train_loader,val_loader)
    else:
        net, t_losses, v_losses = trainCNN(train_loader,val_loader)

    # evaluate model on validation data
    accuracy = evaluate(test_loader, net)

    print("Test accuracy: {}".format(accuracy))

    # plot losses
    plt.plot(t_losses)
    plt.plot(v_losses)
    plt.legend(["training_loss","validation_loss"])
    plt.xlabel("Iteration")
    plt.ylabel("Loss")
    plt.title("Loss plot")
    plt.show()