In [None]:
%matplotlib notebook

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchsummary
import matplotlib.pyplot as plt
import numpy as np
import time
import sklearn.metrics

# Prefer the GPU when PyTorch can see one; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

# Disable cuDNN autotuning and request deterministic kernels so that seeded
# runs are repeatable.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [None]:
def calculate_distribution(path):
    """Compute the per-channel mean and standard deviation of an image folder.

    All images under ``path`` are loaded through ``ImageFolder`` with
    ``ToTensor`` and stacked into a single batch, so they must share one
    spatial size and fit in memory together.

    Args:
        path: Root directory in the layout expected by
            ``torchvision.datasets.ImageFolder``.

    Returns:
        Tuple ``(mean, std)`` of 1-D tensors with one entry per channel,
        suitable for ``torchvision.transforms.Normalize(*result)``.
    """
    dataset = torchvision.datasets.ImageFolder(path, transform=torchvision.transforms.ToTensor())
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=len(dataset))
    # Use the built-in next(); the iterator's ``.next()`` method was only a
    # Python 2 compatibility alias and is not available in newer PyTorch.
    images, _ = next(iter(dataloader))
    # (N, C, H, W) -> (C, N*H*W): gather every pixel of a channel on one row,
    # without assuming a particular image size.
    per_channel = images.transpose(0, 1).reshape(images.size(1), -1)
    return per_channel.mean(1), per_channel.std(1)

In [None]:
def display_sample_images(folder, num_images):
    """Show a grid of randomly drawn images from ``folder`` with their classes.

    Args:
        folder: Root directory in the layout expected by
            ``torchvision.datasets.ImageFolder``.
        num_images: Number of images to draw for the preview.
    """
    dataset = torchvision.datasets.ImageFolder(folder, transform=torchvision.transforms.ToTensor())
    dataloader = torch.utils.data.DataLoader(dataset, shuffle=True, batch_size=num_images)
    # Use the built-in next(); the iterator's ``.next()`` method was only a
    # Python 2 compatibility alias and is not available in newer PyTorch.
    images, labels = next(iter(dataloader))
    grid = torchvision.utils.make_grid(images)
    # Start a new figure so that, with an interactive backend, the preview is
    # not drawn into a figure left over from another cell.
    plt.figure()
    # make_grid returns (C, H, W); imshow expects (H, W, C).
    plt.imshow(np.transpose(grid.numpy(), (1, 2, 0)))
    plt.xlabel([dataset.classes[label] for label in labels])
    plt.show()

# Preview four randomly drawn images from the 'personal' folder with their class names.
display_sample_images('personal', 4)

Define a basic model for showing overfitting.

In [None]:
class BasicModel(nn.Module):
    """Small CNN: two conv + max-pool stages followed by two linear layers.

    The first linear layer takes 8 * 11 * 11 features, which is what the two
    conv/pool stages produce for a 3x56x56 input. The output is one raw score
    per class (10 classes).
    """

    def __init__(self):
        super().__init__()
        # Layer creation order fixes how a seeded RNG is consumed during
        # initialisation, so keep it stable for reproducible runs.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=4, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=4, out_channels=8, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(8 * 11 * 11, 100)
        self.fc2 = nn.Linear(100, 10)

    def forward(self, x):
        """Map a batch of images to raw class scores of shape (batch, 10)."""
        for conv in (self.conv1, self.conv2):
            x = F.relu(self.pool(conv(x)))
        # Flatten to exactly the width the first linear layer was built for.
        flat = x.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

Define a helper function to plot training curves.

In [None]:
def plot_history(train_loss, train_acc, val_loss=None, val_acc=None):
    """Plot loss and accuracy curves side by side on a new figure.

    Args:
        train_loss: Sequence of training losses, one value per epoch.
        train_acc: Sequence of training accuracies, one value per epoch.
        val_loss: Optional sequence of validation losses.
        val_acc: Optional sequence of validation accuracies.

    Returns:
        The ``matplotlib.pyplot`` module, with the accuracy axes left as the
        current axes, so callers can keep issuing ``plt`` calls.
    """
    # Always open a new figure: with an interactive backend the previous
    # figure stays current, and plt.subplot() would draw on top of its curves.
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2)

    loss_ax.set_title('Loss')
    loss_ax.plot(train_loss, label='train')
    if val_loss is not None:
        loss_ax.plot(val_loss, label='validation')
    loss_ax.set_xlabel('epoch')
    loss_ax.set_ylabel('loss')
    loss_ax.legend()

    acc_ax.set_title('Accuracy')
    acc_ax.plot(train_acc, label='train')
    if val_acc is not None:
        acc_ax.plot(val_acc, label='validation')
    acc_ax.set_xlabel('epoch')
    acc_ax.set_ylabel('accuracy')
    acc_ax.legend()

    # Keep the two panels' labels from overlapping.
    fig.tight_layout()
    # Leave the accuracy panel as the current axes for any follow-up plt calls.
    plt.sca(acc_ax)
    return plt

Define MSELoss training functions.

In [None]:
def validate_mse(model, val_loader):
    """Evaluate ``model`` on ``val_loader`` with MSE against one-hot targets.

    The model is switched to eval mode and gradients are disabled for the
    whole pass. Loss and accuracy are averaged over samples, so a shorter
    final batch does not carry extra weight.

    Args:
        model: Network returning one score per class for every image.
        val_loader: Iterable of ``(images, labels)`` batches where ``labels``
            holds integer class indices.

    Returns:
        Tuple ``(mean_loss, accuracy)`` over every sample in ``val_loader``.

    Raises:
        ValueError: If ``val_loader`` yields no samples.
    """
    criterion = nn.MSELoss()
    seen = 0
    correct = 0
    loss_sum = 0.0

    model.eval()
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            pred = model(images)
            # Build the target on the prediction's device, sized from the
            # model output rather than a hard-coded class count.
            onehot = F.one_hot(labels, num_classes=pred.size(1)).to(pred.dtype)
            loss = criterion(pred, onehot)

            batch = images.size(0)
            # The criterion returns a batch mean; re-weight by batch size.
            loss_sum += float(loss) * batch
            correct += int((pred.argmax(dim=1) == labels).sum())
            seen += batch

    if seen == 0:
        raise ValueError("val_loader produced no samples")
    return loss_sum / seen, correct / seen

def train_mse(Model, train_set, batch_size, lr, epochs, val_set=None):
    """Train a freshly built ``Model`` with SGD and MSE loss on one-hot targets.

    Args:
        Model: Zero-argument callable (typically an ``nn.Module`` subclass)
            that builds the network.
        train_set: Dataset of ``(image, label)`` pairs used for training.
        batch_size: Mini-batch size for both training and validation.
        lr: Learning rate passed to ``torch.optim.SGD``.
        epochs: Number of passes over ``train_set``.
        val_set: Optional dataset evaluated after every epoch.

    Returns:
        Tuple ``(model, train_acc, train_loss, val_acc, val_loss)``. Each list
        has one entry per epoch; the validation lists stay empty when
        ``val_set`` is ``None``.
    """
    model = Model().to(device)
    train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, batch_size=batch_size)
    val_loader = None
    if val_set is not None:
        # Evaluation does not depend on sample order, so do not shuffle.
        val_loader = torch.utils.data.DataLoader(val_set, shuffle=False, batch_size=batch_size)
    optim = torch.optim.SGD(model.parameters(), lr)
    criterion = nn.MSELoss()

    train_acc = []
    train_loss = []
    val_acc = []
    val_loss = []

    for i in range(epochs):
        seen = 0
        correct = 0
        loss_sum = 0.0

        # Validation leaves the model in eval mode, so re-enable training
        # behaviour at the start of every epoch.
        model.train()
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            optim.zero_grad()
            pred = model(images)
            # One-hot target sized from the model output, created on the
            # same device as the predictions.
            onehot = F.one_hot(labels, num_classes=pred.size(1)).to(pred.dtype)
            loss = criterion(pred, onehot)
            loss.backward()
            optim.step()

            batch = images.size(0)
            # Weight by batch size so a short final batch is not over-counted.
            loss_sum += float(loss) * batch
            correct += int((pred.argmax(dim=1) == labels).sum())
            seen += batch

        avg_loss = loss_sum / seen
        avg_acc = correct / seen

        train_loss.append(avg_loss)
        train_acc.append(avg_acc)

        if val_loader is not None:
            with torch.no_grad():
                validate = validate_mse(model, val_loader)
            val_loss.append(validate[0])
            val_acc.append(validate[1])
            print("{}\ttrain loss {:.4f}\tacc {:.4f}\tval loss {:.4f}\t val acc {:.4f}".format(
                i+1, avg_loss, avg_acc, validate[0], validate[1]
            ))
        else:
            print("{}, loss {:.4f}, acc {:.4f}".format(i+1, avg_loss, avg_acc))

        torch.cuda.empty_cache()

    return model, train_acc, train_loss, val_acc, val_loss

# Collected Dataset

In [None]:
# Convert images to tensors, then standardise each channel with the mean/std
# measured on the same 'personal' folder.
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(*calculate_distribution('personal')),
])

# The small collected dataset; it is trained on without a validation split below.
personal = torchvision.datasets.ImageFolder('personal', transform)

In [None]:
# Seed immediately before training so weight initialisation and batch
# shuffling start from the same RNG state on every run.
torch.manual_seed(100)
tic = time.monotonic()
# No val_set is passed, so the two validation histories come back empty and are discarded.
model, train_acc, train_loss, _, _ = train_mse(BasicModel, personal, 10, 0.05, 100)
toc = time.monotonic()
print(f"{toc - tic} seconds elapsed")
# Layer-by-layer summary of the trained model for a 3x56x56 input.
torchsummary.summary(model, (3, 56, 56))
plot_history(train_loss, train_acc)

# Full Dataset

In [None]:
# Measure the normalisation statistics once, on the training images only.
train_mean, train_std = calculate_distribution('train')

# Both splits share the training statistics: the validation images must be
# scaled exactly like the data the network was fitted on, and validation data
# should not influence preprocessing.
full_transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(train_mean, train_std),
])
train_set = torchvision.datasets.ImageFolder('train', transform=full_transform)
val_set = torchvision.datasets.ImageFolder('val', transform=full_transform)

# Display the training statistics as the cell output.
train_mean, train_std

# Define Models
Here we define 6 different models to train on for section 4.3.

In [None]:
class Conv2Layers30Kernels(nn.Module):
    """Two 3x3 conv layers with 30 kernels each, then a 32-unit hidden layer.

    Every convolution is followed by 2x2 max-pooling and ReLU. The first
    linear layer takes 30 * 12 * 12 features, matching a 3x56x56 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=30, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=30, out_channels=30, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(30 * 12 * 12, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        stage1 = F.relu(self.pool(self.conv1(x)))
        stage2 = F.relu(self.pool(self.conv2(stage1)))
        # Flatten to the width the first linear layer was built for.
        flat = stage2.view(-1, self.fc1.in_features)
        return self.fc2(F.relu(self.fc1(flat)))

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(Conv2Layers30Kernels(), (3, 56, 56), device='cpu')

In [None]:
class Conv2Layers10Kernels(nn.Module):
    """Two 3x3 conv layers with 10 kernels each, then a 32-unit hidden layer.

    Every convolution is followed by 2x2 max-pooling and ReLU. The first
    linear layer takes 10 * 12 * 12 features, matching a 3x56x56 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=10, out_channels=10, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(10 * 12 * 12, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        for conv in (self.conv1, self.conv2):
            x = F.relu(self.pool(conv(x)))
        # Flatten to the width the first linear layer was built for.
        flat = x.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(Conv2Layers10Kernels(), (3, 56, 56), device='cpu')

In [None]:
class Conv1Layers10Kernels(nn.Module):
    """One 3x3 conv layer with 10 kernels, then a 32-unit hidden layer.

    The convolution is followed by 2x2 max-pooling and ReLU. The first linear
    layer takes 10 * 27 * 27 features, matching a 3x56x56 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(10 * 27 * 27, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        features = F.relu(self.pool(self.conv1(x)))
        # Flatten to the width the first linear layer was built for.
        flat = features.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(Conv1Layers10Kernels(), (3, 56, 56), device='cpu');

In [None]:
class Conv1Layers30Kernels(nn.Module):
    """One 3x3 conv layer with 30 kernels, then a 32-unit hidden layer.

    The convolution is followed by 2x2 max-pooling and ReLU. The first linear
    layer takes 30 * 27 * 27 features, matching a 3x56x56 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=30, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(30 * 27 * 27, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        pooled = self.pool(self.conv1(x))
        # Flatten to the width the first linear layer was built for.
        flat = F.relu(pooled).view(-1, self.fc1.in_features)
        return self.fc2(F.relu(self.fc1(flat)))

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(Conv1Layers30Kernels(), (3, 56, 56), device='cpu');

In [None]:
class Conv3Layers30Kernels(nn.Module):
    """Three 3x3 conv layers with 30 kernels each, then a 32-unit hidden layer.

    Every convolution is followed by 2x2 max-pooling and ReLU. The first
    linear layer takes 30 * 5 * 5 features, matching a 3x56x56 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=30, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=30, out_channels=30, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=30, out_channels=30, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(30 * 5 * 5, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(self.pool(conv(x)))
        # Flatten to the width the first linear layer was built for.
        flat = x.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(Conv3Layers30Kernels(), (3, 56, 56), device='cpu')

In [None]:
class Conv3Layers10Kernels(nn.Module):
    """Three 3x3 conv layers with 10 kernels each, then a 32-unit hidden layer.

    Every convolution is followed by 2x2 max-pooling and ReLU. The first
    linear layer takes 10 * 5 * 5 features, matching a 3x56x56 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=10, out_channels=10, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=10, out_channels=10, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(10 * 5 * 5, 32)
        self.fc2 = nn.Linear(32, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        stage1 = F.relu(self.pool(self.conv1(x)))
        stage2 = F.relu(self.pool(self.conv2(stage1)))
        stage3 = F.relu(self.pool(self.conv3(stage2)))
        # Flatten to the width the first linear layer was built for.
        flat = stage3.view(-1, self.fc1.in_features)
        return self.fc2(F.relu(self.fc1(flat)))

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(Conv3Layers10Kernels(), (3, 56, 56), device='cpu')

# Train with MSELoss
This is the training code. We modify the model and learning rate and track the accuracy and loss throughout the training process. The execution time and max accuracy obtained are printed.

In [None]:
# Configuration for one MSE experiment; the keys match train_mse's parameter
# names because the dict is unpacked into the call below.
hyperparameters = {
    'Model': Conv3Layers10Kernels,
    'lr': 0.1,
    'batch_size': 32, # we fix this
    'epochs': 1000 # keep this fixed so we can get comparable execution times
}

# Seed immediately before training so weight initialisation and batch
# shuffling start from the same RNG state on every run.
torch.manual_seed(100)
tic = time.monotonic()
model, train_acc, train_loss, val_acc, val_loss = train_mse(**hyperparameters, train_set=train_set, val_set=val_set)
toc = time.monotonic()
print(f"seconds elapsed: {toc - tic}")
# Best validation accuracy over all epochs, not necessarily the final epoch's.
print(f"max accuracy obatined: {np.max(val_acc)}")

In [None]:
# Plot the training and validation curves recorded by the run above.
plot_history(train_loss, train_acc, val_loss, val_acc)

# Cross Entropy Loss
Define new training and validation functions utilizing cross entropy loss rather than MSELoss.

In [None]:
def validate_ce(model, val_loader):
    """Evaluate ``model`` on ``val_loader`` with cross-entropy loss.

    The model is switched to eval mode and gradients are disabled for the
    whole pass. Loss and accuracy are averaged over samples, so a shorter
    final batch does not carry extra weight.

    Args:
        model: Network returning one raw score per class for every image.
        val_loader: Iterable of ``(images, labels)`` batches where ``labels``
            holds integer class indices.

    Returns:
        Tuple ``(mean_loss, accuracy)`` over every sample in ``val_loader``.

    Raises:
        ValueError: If ``val_loader`` yields no samples.
    """
    criterion = nn.CrossEntropyLoss()
    seen = 0
    correct = 0
    loss_sum = 0.0

    model.eval()
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            pred = model(images)
            loss = criterion(pred, labels)

            batch = images.size(0)
            # The criterion returns a batch mean; re-weight by batch size.
            loss_sum += float(loss) * batch
            correct += int((pred.argmax(dim=1) == labels).sum())
            seen += batch

    if seen == 0:
        raise ValueError("val_loader produced no samples")
    return loss_sum / seen, correct / seen

def train_ce(Model, train_set, batch_size, lr, epochs, val_set=None):
    """Train a freshly built ``Model`` with SGD and cross-entropy loss.

    Args:
        Model: Zero-argument callable (typically an ``nn.Module`` subclass)
            that builds the network.
        train_set: Dataset of ``(image, label)`` pairs used for training.
        batch_size: Mini-batch size for both training and validation.
        lr: Learning rate passed to ``torch.optim.SGD``.
        epochs: Number of passes over ``train_set``.
        val_set: Optional dataset evaluated after every epoch.

    Returns:
        Tuple ``(model, train_acc, train_loss, val_acc, val_loss)``. Each list
        has one entry per epoch; the validation lists stay empty when
        ``val_set`` is ``None``.
    """
    model = Model().to(device)
    train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, batch_size=batch_size)
    val_loader = None
    if val_set is not None:
        # Evaluation does not depend on sample order, so do not shuffle.
        val_loader = torch.utils.data.DataLoader(val_set, shuffle=False, batch_size=batch_size)
    optim = torch.optim.SGD(model.parameters(), lr)
    criterion = nn.CrossEntropyLoss()

    train_acc = []
    train_loss = []
    val_acc = []
    val_loss = []

    for i in range(epochs):
        seen = 0
        correct = 0
        loss_sum = 0.0

        # Validation leaves the model in eval mode, so re-enable training
        # behaviour at the start of every epoch.
        model.train()
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            optim.zero_grad()
            pred = model(images)
            loss = criterion(pred, labels)
            loss.backward()
            optim.step()

            batch = images.size(0)
            # Weight by batch size so a short final batch is not over-counted.
            loss_sum += float(loss) * batch
            correct += int((pred.argmax(dim=1) == labels).sum())
            seen += batch

        avg_loss = loss_sum / seen
        avg_acc = correct / seen

        train_loss.append(avg_loss)
        train_acc.append(avg_acc)

        if val_loader is not None:
            with torch.no_grad():
                # Validate with cross entropy so the validation loss is on
                # the same scale as the training loss it is compared against.
                validate = validate_ce(model, val_loader)
            val_loss.append(validate[0])
            val_acc.append(validate[1])
            print("{}\ttrain loss {:.4f}\tacc {:.4f}\tval loss {:.4f}\t val acc {:.4f}".format(
                i+1, avg_loss, avg_acc, validate[0], validate[1]
            ))
        else:
            print("{}, loss {:.4f}, acc {:.4f}".format(i+1, avg_loss, avg_acc))

        torch.cuda.empty_cache()

    return model, train_acc, train_loss, val_acc, val_loss

# Batch Normalization
We now implement Conv3Layers10Kernels with Batch Normalization.

In [None]:
class Conv3Layers10KernelsBatch(nn.Module):
    """Conv3Layers10Kernels with batch normalisation added.

    Each conv stage is conv -> 2x2 max-pool -> BatchNorm2d -> ReLU, and the
    hidden linear layer is followed by BatchNorm1d before its ReLU. The first
    linear layer takes 10 * 5 * 5 features, matching a 3x56x56 input.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=10, out_channels=10, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=10, out_channels=10, kernel_size=3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.bn1 = nn.BatchNorm2d(10)
        self.bn2 = nn.BatchNorm2d(10)
        self.bn3 = nn.BatchNorm2d(10)
        self.fc1 = nn.Linear(10 * 5 * 5, 32)
        self.fc2 = nn.Linear(32, 10)
        self.bn4 = nn.BatchNorm1d(32)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = F.relu(norm(self.pool(conv(x))))
        # Flatten to the width the first linear layer was built for.
        flat = x.view(-1, self.fc1.in_features)
        hidden = F.relu(self.bn4(self.fc1(flat)))
        return self.fc2(hidden)

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(Conv3Layers10KernelsBatch(), (3, 56, 56), device='cpu')

Train using MSELoss.

In [None]:
# Same settings as the earlier MSE run, but with the batch-normalised model,
# so the two runs differ only in the architecture.
hyperparameters = {
    'Model': Conv3Layers10KernelsBatch,
    'lr': 0.1,
    'batch_size': 32, # we fix this
    'epochs': 1000 # keep this fixed so we can get comparable execution times
}

# Seed immediately before training so weight initialisation and batch
# shuffling start from the same RNG state on every run.
torch.manual_seed(100)
tic = time.monotonic()
model, train_acc, train_loss, val_acc, val_loss = train_mse(**hyperparameters, train_set=train_set, val_set=val_set)
toc = time.monotonic()
print(f"seconds elapsed: {toc - tic}")
# Best validation accuracy over all epochs, not necessarily the final epoch's.
print(f"max accuracy obatined: {np.max(val_acc)}")

In [None]:
# Plot the training and validation curves recorded by the run above.
plot_history(train_loss, train_acc, val_loss, val_acc)

Train using cross entropy loss.

In [None]:
# Configuration for the cross-entropy run; the keys match train_ce's parameter names.
# NOTE(review): this cell sits in the Batch Normalization section but trains
# the plain Conv3Layers10Kernels, not Conv3Layers10KernelsBatch - confirm
# which model was intended.
hyperparameters = {
    'Model': Conv3Layers10Kernels, 
    'lr': 0.09,
    'batch_size': 32, # we fix this
    'epochs': 1000 # keep this fixed so we can get comparable execution times
}

# Seed immediately before training so weight initialisation and batch
# shuffling start from the same RNG state on every run.
torch.manual_seed(100)
tic = time.monotonic()
model, train_acc, train_loss, val_acc, val_loss = train_ce(**hyperparameters, train_set=train_set, val_set=val_set)
toc = time.monotonic()
print(f"seconds elapsed: {toc - tic}")
# Best validation accuracy over all epochs, not necessarily the final epoch's.
print(f"max accuracy obatined: {np.max(val_acc)}")

In [None]:
# Plot the training and validation curves recorded by the run above.
plot_history(train_loss, train_acc, val_loss, val_acc)

# Confusion Matrix Functions

Define a couple of functions to assist with plotting the confusion matrix.

In [None]:
def evaluate_all_labels(model, val_set):
    """Predict a class for every sample in ``val_set``.

    The whole dataset is pushed through the model as one batch, in eval mode
    and without gradient tracking.

    Args:
        model: Trained network returning one score per class for every image.
        val_set: Dataset of ``(image, label)`` pairs.

    Returns:
        Tuple ``(predictions, labels)`` of NumPy arrays holding the predicted
        and the true class index of each sample, in dataset order.
    """
    val_loader = torch.utils.data.DataLoader(val_set, batch_size=len(val_set))
    # Use the built-in next(); the iterator's ``.next()`` method was only a
    # Python 2 compatibility alias and is not available in newer PyTorch.
    data, labels = next(iter(val_loader))

    data = data.to(device)

    model.eval()
    with torch.no_grad():
        preds = model(data)
    return preds.argmax(dim=1).cpu().numpy(), labels.cpu().numpy()

def plot_confusion_matrix(model, val_set):
    """Plot the confusion matrix of ``model`` on ``val_set``.

    Args:
        model: Trained network returning one score per class for every image.
        val_set: Dataset exposing a ``classes`` list (for example an
            ``ImageFolder``) whose labels index into that list.

    Returns:
        The ``ConfusionMatrixDisplay`` returned by ``disp.plot()``.
    """
    preds, labels = evaluate_all_labels(model, val_set)
    # Pin the label set to every known class. Otherwise a class that never
    # appears in the labels or predictions is dropped from the matrix, which
    # then no longer lines up with ``display_labels``.
    class_ids = list(range(len(val_set.classes)))
    cm = sklearn.metrics.confusion_matrix(labels, preds, labels=class_ids)
    disp = sklearn.metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=val_set.classes)
    return disp.plot()

# Best Model

In [None]:
class LargeModel(nn.Module):
    """Four-stage CNN (32/64/128/256 kernels) with a three-layer classifier.

    Each stage is conv 3x3 -> 2x2 max-pool -> BatchNorm2d -> ReLU. For a
    3x56x56 input the last stage yields 256 features per image, which feed
    linear layers of 64, 32 and 10 units.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3)

        self.bn1 = nn.BatchNorm2d(32)
        self.bn2 = nn.BatchNorm2d(64)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(256)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(256, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, norm in stages:
            x = F.relu(norm(self.pool(conv(x))))
        # Flatten to the width the first linear layer was built for.
        x = x.view(-1, self.fc1.in_features)
        for hidden_layer in (self.fc1, self.fc2):
            x = F.relu(hidden_layer(x))
        return self.fc3(x)

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(LargeModel(), (3, 56, 56), device='cpu')

In [None]:
# Configuration for the large model, trained with cross entropy; the keys
# match train_ce's parameter names.
hyperparameters = {
    'Model': LargeModel, 
    'lr': 0.005,
    'batch_size': 24,
    'epochs': 100
}

# Seed immediately before training so weight initialisation and batch
# shuffling start from the same RNG state on every run.
torch.manual_seed(100)
tic = time.monotonic()
model, train_acc, train_loss, val_acc, val_loss = train_ce(**hyperparameters, train_set=train_set, val_set=val_set)
toc = time.monotonic()
print(f"seconds elapsed: {toc - tic}")
# Best validation accuracy over all epochs, not necessarily the final epoch's.
print(f"max accuracy obatined: {np.max(val_acc)}")

In [None]:
# Plot the training and validation curves recorded by the run above.
plot_history(train_loss, train_acc, val_loss, val_acc)

In [None]:
# Confusion matrix on the validation set; assumes `model` is the LargeModel
# trained above (cells run in order).
plot_confusion_matrix(model, val_set)

In [None]:
# Save only the state_dict, not the module object; assumes `model` is the
# LargeModel trained above (cells run in order).
torch.save(model.state_dict(), 'MyBest.pt')

# Best Small Model

Define and train a model with fewer than 5,000 trainable parameters.

In [None]:
class SmallModel(nn.Module):
    """Compact CNN: three conv stages (7/14/10 kernels) and one linear layer.

    Every convolution is followed by 2x2 max-pooling and ReLU. The linear
    layer maps 10 * 5 * 5 features, matching a 3x56x56 input, directly to the
    10 class scores.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=7, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=7, out_channels=14, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=14, out_channels=10, kernel_size=3)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(10 * 5 * 5, 10)

    def forward(self, x):
        """Return raw class scores of shape (batch, 10)."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(self.pool(conv(x)))
        # Flatten to the width the linear layer was built for.
        flat = x.view(-1, self.fc1.in_features)
        return self.fc1(flat)

# Print a layer-by-layer summary of an untrained instance for a 3x56x56 input, on the CPU.
torchsummary.summary(SmallModel(), (3, 56, 56), device='cpu')

In [None]:
# Configuration for the small model, trained with cross entropy; the keys
# match train_ce's parameter names.
hyperparameters = {
    'Model': SmallModel, 
    'lr': 0.01,
    'batch_size': 24,
    'epochs': 100 # keep this fixed so we can get comparable execution times
}

# Seed immediately before training so weight initialisation and batch
# shuffling start from the same RNG state on every run.
torch.manual_seed(100)
tic = time.monotonic()
model, train_acc, train_loss, val_acc, val_loss = train_ce(**hyperparameters, train_set=train_set, val_set=val_set)
toc = time.monotonic()
print(f"seconds elapsed: {toc - tic}")
# Best validation accuracy over all epochs, not necessarily the final epoch's.
print(f"max accuracy obatined: {np.max(val_acc)}")

In [None]:
# Plot the training and validation curves recorded by the run above.
plot_history(train_loss, train_acc, val_loss, val_acc)

In [None]:
# Save only the state_dict, not the module object; assumes `model` is the
# SmallModel trained above (cells run in order).
torch.save(model.state_dict(), 'MyBestSmall.pt')