## Objective
* Instead of building a simple model, try creating a complex CNN model that can be easily tuned
* Create models with varying depth (convolutional layers) to observe performance vs time consumption
* Apply various hyperparameter tuning techniques to CNN models

In [1]:
import torch
import torchvision
from torchvision import transforms, datasets
from torchvision.transforms import Normalize, ToTensor
import torch.nn as nn  # neural network
import torch.optim as optim  # optimization layer
import torch.nn.functional as F  # activation functions
import matplotlib.pyplot as plt
import argparse
import time
from collections import OrderedDict

In [2]:
# Prefer the GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# EMNIST "balanced" split; ToTensor turns each sample image into a tensor.
train_set = datasets.EMNIST(root="data", split="balanced",
                        train=True, download=True,
                        transform=transforms.Compose([ToTensor()])
                           )

test_set = datasets.EMNIST(root="data", split="balanced", 
                       train=False, download=True, 
                       transform=transforms.Compose([ToTensor()])
                          )

# Retained only in case a later cell still references this loader; the split
# sizes below no longer depend on it.
entire_trainset = torch.utils.data.DataLoader(train_set, shuffle=True)

# Size the 80/20 split from the dataset itself. len() of a DataLoader counts
# batches, which equals the number of samples only when batch_size is 1.
split_train_size = int(0.8 * len(train_set))
split_valid_size = len(train_set) - split_train_size

# NOTE: `train_set` is rebound here from the full training data to its 80% subset.
train_set, val_set = torch.utils.data.random_split(train_set, [split_train_size, split_valid_size])

print(f'train set size: {split_train_size}, validation set size: {split_valid_size}')

train set size: 90240, validation set size: 22560


In [3]:
# Layer specifications keyed by model name. Each entry is read left to right:
# an integer is the out_channels of a convolution, 'M' is a max-pooling step.
model_codes = dict(
    model_1=[64, 64, 'M',
             128, 128, 'M',
             256, 512, 'M'],
    model_2=[64, 64, 'M',
             128, 128, 128, 'M',
             256, 256, 512, 'M'],
    model_3=[64, 64, 'M',
             128, 128, 128, 128, 'M',
             256, 256, 512, 512, 'M'],
)

In [4]:
class Simple_CNN(nn.Module):
    """Fixed three-convolution baseline for 1-channel 28x28 images and 47 classes.

    The first dense layer expects 128*3*3 features, which is what three 5x5
    convolutions plus two 2x2 max-pools produce from a 28x28 input.
    """

    def __init__(self):
        # Was `super(Net, self)`: `Net` is not defined, so construction raised NameError.
        super(Simple_CNN, self).__init__()
        # 3 convolutional layers
        # Was `in_channel=1`: the Conv2d keyword is `in_channels`.
        self.cv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, stride=1)  # input: 1 if grayscale, 3 if RGB
        self.cv2 = nn.Conv2d(16, 64, 5)
        self.cv3 = nn.Conv2d(64, 128, 5)
        self.dropout1 = nn.Dropout(0.2)

        # Dense layer - (fully connected)
        self.fc1 = nn.Linear(in_features=128*3*3, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=128)
        self.out = nn.Linear(in_features=128, out_features=47)

    def forward(self, x):
        '''
        Map an input batch to per-class probabilities.

        Returns a tensor with one row per sample; each row is a softmax over
        the 47 outputs, so it sums to 1. Because softmax is already applied
        here, do not feed this output to a loss that applies softmax itself
        (e.g. nn.CrossEntropyLoss).
        '''
        # hidden convolutional layers
        x = F.relu(self.cv1(x))
        x = F.relu(self.cv2(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = F.relu(self.cv3(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2)

        x = self.dropout1(x)

        # hidden linear layers: flatten everything except the batch dimension
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))

        # output layer
        x = self.out(x)
        x = F.softmax(x, dim=1)

        return x

How to find the input size of the first dense layer
(spatial size of the last feature map)^2 x output channels of the last convolutional layer, i.e. 3 x 3 x 128 = 1152

To find the spatial size of the output of each convolutional layer
(input size - kernel size + 2*padding)/stride + 1
* first layer: (28-5+0)/1 + 1 = 24
* second layer: (24-5+0)/1 + 1 = 20 -> after maxpooling -> 10
* third layer: (10-5+0)/1 + 1 = 6 -> after maxpooling -> 3

In [5]:
# easily tunable model
class CNN(nn.Module):
    """Configurable stack of 3x3 convolutions and 2x2 max-pools with a two-layer classifier.

    Args:
        model_code: key into the module-level ``model_codes`` dict, or an
            explicit layer specification (sequence of ints and ``'M'``).
            An int adds a convolution with that many output channels;
            ``'M'`` adds a max-pooling layer.
        in_channels: number of channels of the input images.
        out_dim: number of output classes (size of the returned logits).
        act: ``'relu'`` or ``'leakyrelu'``.
        use_bn: when true, a BatchNorm2d follows every convolution.
        dropout: dropout probability applied to the final feature map.
        in_size: height/width of the (square) input images. Defaults to 28,
            which reproduces the previously hard-coded classifier input of
            4608 for the specs in ``model_codes``.

    Raises:
        ValueError: if ``act`` is not recognised, or if the input is too small
            to survive the pooling layers in the specification.
        KeyError: if ``model_code`` is a string that is not in ``model_codes``.
    """

    def __init__(self, model_code, in_channels, out_dim, act, use_bn, dropout, in_size=28):
        super(CNN, self).__init__()

        if act == 'relu':
            self.act = nn.ReLU()
        elif act == 'leakyrelu':
            self.act = nn.LeakyReLU()
        else:
            raise ValueError("Not a valid activation function")

        layer_spec = self._resolve_spec(model_code)
        # Derive the classifier input from the spec instead of hard-coding 4608,
        # so new specs or input sizes do not fail with a shape mismatch in forward().
        flat_features = self._flattened_features(layer_spec, in_channels, in_size)

        self.layers = self.make_layers(model_code, in_channels, use_bn)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Sequential(nn.Linear(flat_features, 256),
                                        self.act,
                                        nn.Linear(256, out_dim)
                                       )

    def forward(self, x):
        """Return raw class scores (logits) for a batch of images."""
        x = self.layers(x)
        x = self.dropout(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = self.classifier(x)
        # skipped softmax to use cross entropy loss
        return x

    def make_layers(self, model_code, in_channels, use_bn):
        """Build the convolutional feature extractor described by ``model_code``."""
        layers = []
        for x in self._resolve_spec(model_code):
            if x == "M":
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
            else:
                # kernel 3 / stride 1 / padding 1 keeps the spatial size unchanged
                layers += [nn.Conv2d(in_channels=in_channels,
                                    out_channels=x,
                                    kernel_size=3,
                                    stride=1,
                                    padding=1)]
                if use_bn:
                    layers += [nn.BatchNorm2d(x)]
                layers += [self.act]
                in_channels = x
        return nn.Sequential(*layers)

    @staticmethod
    def _resolve_spec(model_code):
        """Return the layer list for a ``model_codes`` key, or the given sequence as a list."""
        if isinstance(model_code, str):
            return model_codes[model_code]
        return list(model_code)

    @staticmethod
    def _flattened_features(layer_spec, in_channels, in_size):
        """Compute channels * height * width of the feature map produced by ``layer_spec``."""
        channels, size = in_channels, in_size
        for x in layer_spec:
            if x == "M":
                size //= 2  # 2x2 pooling with stride 2 halves the size, rounding down
            else:
                channels = x  # the convolutions here preserve the spatial size
        if size < 1:
            raise ValueError("Input size is too small for the number of pooling layers")
        return channels * size * size

# Train, Validate, Test

In [6]:
def train(net, optimizer, criterion, args):
    """Train ``net`` for one epoch on the module-level ``train_set``.

    Args:
        net: model to train; batches are moved to the device of its parameters.
        optimizer: optimizer that updates ``net``'s parameters.
        criterion: loss function called as ``criterion(outputs, labels)``.
        args: settings object; ``args.train_batch`` is the batch size.

    Returns:
        Tuple ``(net, train_loss, train_acc)`` where ``train_loss`` is the mean
        of the per-batch losses and ``train_acc`` is the accuracy in percent.
    """
    # load train set as some other object that can help on iterating over data
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=args.train_batch, shuffle=True)

    # Follow the model's device rather than calling .cuda(), so the same code
    # runs on machines without a GPU.
    net_device = next(net.parameters()).device

    net.train()

    correct = 0
    total = 0
    train_loss = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(net_device)
        labels = labels.to(net_device)

        # clear gradients left over from the previous step before computing new ones
        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

        # the class with the highest value is the prediction
        prediction = outputs.detach().argmax(dim=1)
        total += labels.size(0)
        correct += (prediction == labels).sum().item()

    train_loss = train_loss / len(train_loader)
    train_acc = 100 * correct / total

    return net, train_loss, train_acc

In [7]:
def validate(net, criterion, args):
    """Evaluate ``net`` on the module-level ``val_set`` without updating it.

    Args:
        net: model to evaluate; batches are moved to the device of its parameters.
        criterion: loss function called as ``criterion(outputs, labels)``.
        args: settings object; ``args.test_batch`` is the batch size.

    Returns:
        Tuple ``(val_loss, val_acc)``: mean of the per-batch losses and
        accuracy in percent.
    """
    # Evaluation does not need shuffling; a fixed order keeps runs comparable.
    val_loader = torch.utils.data.DataLoader(val_set, batch_size=args.test_batch, shuffle=False)

    # Follow the model's device rather than calling .cuda(), so the same code
    # runs on machines without a GPU.
    net_device = next(net.parameters()).device

    net.eval()

    correct = 0
    total = 0
    val_loss = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(net_device)
            labels = labels.to(net_device)
            outputs = net(inputs)

            loss = criterion(outputs, labels)

            val_loss += loss.item()
            # the class with the highest value is the prediction
            prediction = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (prediction == labels).sum().item()

        val_loss = val_loss / len(val_loader)
        val_acc = 100 * correct / total

    return val_loss, val_acc

In [8]:
def test(net, args):
    """Measure the accuracy of ``net`` on the module-level ``test_set``.

    Args:
        net: model to evaluate; batches are moved to the device of its parameters.
        args: settings object; ``args.test_batch`` is the batch size.

    Returns:
        Accuracy on the test set, in percent.
    """
    # Evaluation does not need shuffling; a fixed order keeps runs comparable.
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=args.test_batch, shuffle=False)

    # Follow the model's device rather than calling .cuda(), so the same code
    # runs on machines without a GPU.
    net_device = next(net.parameters()).device

    net.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(net_device)
            labels = labels.to(net_device)
            outputs = net(inputs)

            # the class with the highest value is the prediction
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        test_acc = 100 * correct / total

    return test_acc

In [9]:
def experiment(args):
    """Build, train, validate and test one CNN configured by ``args``.

    Args:
        args: settings object providing ``model_code``, ``in_channels``,
            ``out_dim``, ``act``, ``use_bn``, ``dropout``, ``optim``
            (``'adam'`` or ``'sgd'``), ``lr``, ``epoch`` and the batch sizes
            read by ``train``/``validate``/``test``.

    Returns:
        Tuple ``(settings, results)``. ``settings`` is a snapshot copy of
        ``vars(args)``; ``results`` is an OrderedDict with per-epoch loss and
        accuracy lists, the final-epoch train/validation accuracy (``None``
        when no epoch was run) and the test accuracy.

    Raises:
        ValueError: if ``args.optim`` is neither ``'adam'`` nor ``'sgd'``.
    """
    net = CNN(model_code = args.model_code,
              in_channels = args.in_channels, 
              out_dim = args.out_dim, 
              act = args.act, 
              use_bn = args.use_bn, 
              dropout = args.dropout
             )
    # Use the notebook-level `device` (GPU when available, else CPU) instead of
    # an unconditional .cuda(), which fails on machines without a GPU.
    net = net.to(device)
    criterion = nn.CrossEntropyLoss()

    # optimizer
    if args.optim == 'adam':
        optimizer = optim.Adam(net.parameters(), lr=args.lr)  # learning rate
    elif args.optim == 'sgd':
        optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9)
    else:
        raise ValueError('Invalid optimizer')

    # containers to keep track of statistics
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    for epoch in range(args.epoch):  # number of training to be completed
        time_start = time.time()
        net, train_loss, train_acc = train(net, optimizer, criterion, args)
        val_loss, val_acc = validate(net, criterion, args)
        time_end = time.time()

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)
        # print results of each iteration
        print(f'Epoch {epoch+1}, Accuracy(train, validation):{round(train_acc, 2), round(val_acc, 2)}, '
              f'Loss(train, validation):{round(train_loss, 2), round(val_loss, 2)}, Time: {round(time_end - time_start, 2)}s')

    test_acc = test(net, args)

    results = OrderedDict()
    results['train_losses'] = [round(x, 2) for x in train_losses]
    results['val_losses'] = [round(x, 2) for x in val_losses]
    results['train_accs'] = [round(x, 2) for x in train_accs]
    results['val_accs'] = [round(x, 2) for x in val_accs]
    # Read the final-epoch values from the lists so that args.epoch == 0 does
    # not hit unbound loop variables.
    results['train_acc'] = round(train_accs[-1], 2) if train_accs else None
    results['val_acc'] = round(val_accs[-1], 2) if val_accs else None
    results['test_acc'] = round(test_acc, 2)

    # Copy the settings: vars(args) is the namespace's live dict, so it would
    # change whenever the caller mutates `args` for the next run.
    return dict(vars(args)), results

In [10]:
# An empty argparse Namespace is used as a plain container for the settings.
parser = argparse.ArgumentParser()
args = parser.parse_args("")

vars(args).update(
    # ---- Model capacity ----
    model_code='model_2',
    in_channels=1,
    in_dim=1,        # set here but not passed to CNN by experiment()
    out_dim=47,
    act='relu',
    # ---- Regularization ----
    dropout=0.2,
    use_bn=True,
    # ---- Optimization ----
    optim='sgd',
    lr=0.001,        # learning rate
    epoch=10,
    train_batch=256,
    test_batch=256,
)

# ---- Experimental buckets: values swept by the experiment cells ----
models = ['model_1', 'model_2', 'model_3']
optims = ['adam', 'sgd']

print(args)

Namespace(act='relu', dropout=0.2, epoch=10, in_channels=1, in_dim=1, lr=0.001, model_code='model_2', optim='sgd', out_dim=47, test_batch=256, train_batch=256, use_bn=True)


In [11]:
# Run one full experiment per architecture, reusing the shared settings in
# `args` and changing only the model code between runs.
for model in models:
    print(f'model code: {model}')
    setattr(args, 'model_code', model)
    setting, result = experiment(args)
    # settings, results, then a blank separator line
    print(setting, result, '', sep='\n')

model code: model_1
Epoch 1, Accuracy(train, validation):(66.33, 84.01), Loss(train, validation):(1.4, 0.54), Time: 26.19s
Epoch 2, Accuracy(train, validation):(84.54, 86.38), Loss(train, validation):(0.48, 0.42), Time: 25.66s
Epoch 3, Accuracy(train, validation):(86.75, 87.56), Loss(train, validation):(0.4, 0.38), Time: 25.15s
Epoch 4, Accuracy(train, validation):(87.95, 87.52), Loss(train, validation):(0.35, 0.36), Time: 24.86s
Epoch 5, Accuracy(train, validation):(88.41, 87.97), Loss(train, validation):(0.33, 0.35), Time: 24.86s
Epoch 6, Accuracy(train, validation):(89.14, 88.31), Loss(train, validation):(0.31, 0.34), Time: 25.5s
Epoch 7, Accuracy(train, validation):(89.6, 88.68), Loss(train, validation):(0.29, 0.32), Time: 25.62s
Epoch 8, Accuracy(train, validation):(90.06, 88.91), Loss(train, validation):(0.28, 0.32), Time: 25.35s
Epoch 9, Accuracy(train, validation):(90.48, 89.17), Loss(train, validation):(0.27, 0.31), Time: 25.29s
Epoch 10, Accuracy(train, validation):(90.68, 89