In [None]:
# written in python3

In [None]:
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import matplotlib.pyplot as plt
import random
import math
import time

In [None]:
# Report the installed PyTorch version so the results can be tied to a specific release
torch_version = torch.__version__
print("pytorch version:", torch_version)

### Task0

Load data

In [None]:
# Preprocessing pipeline: convert each image to a tensor, then normalise the single
# channel with mean 0.5 and std 0.5, i.e. (x - 0.5) / 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])

# Training split (downloaded into F_MNIST_data/ when it is not already there)
trainset = torchvision.datasets.FashionMNIST(
    root='F_MNIST_data/', train=True, download=True, transform=transform
)

# Held-out test split, preprocessed with the same transform
testset = torchvision.datasets.FashionMNIST(
    root='F_MNIST_data/', train=False, download=True, transform=transform
)

# Human-readable class names, indexed by integer label
classes = (
    'T-Shirt', 'Trouser', 'Pullover', 'Dress', 'Coat',
    'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle Boot',
)

Visualize some images

In [None]:
# Integer label -> class name, used for the subplot titles
labels_map = {
    0: 'T-Shirt',
    1: 'Trouser',
    2: 'Pullover',
    3: 'Dress',
    4: 'Coat',
    5: 'Sandal',
    6: 'Shirt',
    7: 'Sneaker',
    8: 'Bag',
    9: 'Ankle Boot',
}

fig = plt.figure(figsize=(8, 8))
columns = 4
rows = 5

# Fill a rows x columns grid with randomly drawn training images
for i in range(1, columns * rows + 1):
    img_xy = np.random.randint(len(trainset))
    sample, label = trainset[img_xy]
    img = sample[0, :, :]  # drop the channel axis for imshow
    ax = fig.add_subplot(rows, columns, i)
    ax.set_title(labels_map[label])
    ax.axis('off')
    ax.imshow(img, cmap='gray')
plt.show()

### Task1

Construct the following two networks:

1. Fully Connected Network

In [None]:
class FullyConnectedNetwork(nn.Module):
    """Multi-layer perceptron with 0, 1 or 2 hidden layers and a log-softmax output.

    Args:
        input_size: number of features per (already flattened) sample.
        hidden_size: width of every hidden layer.
        hidden_layer: number of hidden layers; must be 0, 1 or 2.
        output_size: number of output classes.

    Raises:
        ValueError: if ``hidden_layer`` is not 0, 1 or 2.
    """

    # hidden_layer should be between 0 ~ 2
    def __init__(self, input_size= 28*28, hidden_size=1024, hidden_layer=2, output_size=10):
        super(FullyConnectedNetwork, self).__init__()

        if hidden_layer not in (0, 1, 2):
            raise ValueError(
                "hidden_layer must be 0, 1 or 2, got {!r}".format(hidden_layer)
            )

        self.hidden_layer = hidden_layer
        if hidden_layer == 0:
            self.fc1 = nn.Linear(input_size, output_size)
        elif hidden_layer == 1:
            self.fc1 = nn.Linear(input_size, hidden_size)
            self.fc2 = nn.Linear(hidden_size, output_size)
        else:
            self.fc1 = nn.Linear(input_size, hidden_size)
            self.fc2 = nn.Linear(hidden_size, hidden_size)
            self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return per-class log-probabilities for a batch ``x`` of shape (batch, input_size)."""
        # The branches mirror the layer layout built in __init__: every layer except
        # the last is followed by ReLU, and the last one feeds log_softmax.
        if self.hidden_layer == 0:
            return F.log_softmax(self.fc1(x), dim=1)

        x = F.relu(self.fc1(x))
        if self.hidden_layer == 1:
            return F.log_softmax(self.fc2(x), dim=1)

        x = F.relu(self.fc2(x))
        return F.log_softmax(self.fc3(x), dim=1)

2. LeNet (CNN)

In [None]:
class LeNet5(nn.Module):
    """LeNet-style CNN with 5x5 kernels, batch normalisation and max pooling.

    Two conv -> batch-norm -> ReLU -> 2x2 max-pool stages are followed by three
    linear layers. The 5x5 convolutions use padding 2 and each pool halves the
    spatial size, so the 7*7*16 input width of ``fc1`` matches a 1x28x28 input.
    ``forward`` returns log-probabilities over 10 classes.
    """

    def __init__(self):
        super().__init__()

        first_stage = [
            nn.Conv2d(1, 6, kernel_size=5, padding=2),
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        second_stage = [
            nn.Conv2d(6, 16, kernel_size=5, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.layer1 = nn.Sequential(*first_stage)
        self.layer2 = nn.Sequential(*second_stage)

        # Classifier head: 784 -> 120 -> 84 -> 10
        self.fc1 = nn.Sequential(nn.Linear(7 * 7 * 16, 120), nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(120, 84), nn.ReLU())
        self.fc3 = nn.Sequential(nn.Linear(84, 10))

    def forward(self, x):
        """Map a (batch, 1, 28, 28) image batch to (batch, 10) log-probabilities."""
        feature_maps = self.layer2(self.layer1(x))

        # Flatten everything except the batch dimension
        batch_size = feature_maps.size(0)
        flat = feature_maps.view(batch_size, -1)

        logits = self.fc3(self.fc2(self.fc1(flat)))
        return F.log_softmax(logits, dim=1)

Also construct train & test functions

In [None]:
# model: either FCN (Fully Connected Network), CNN (LeNet) or one of the Task2 variants
# we assume that `trainset` and the network classes are defined globally
def train(MODEL, EPOCH=30, LEARNING_RATE=0.01, LOSS_FUNCTION="CrossEntropy", OPTIMIZER="SGD",
          MOMENTUM=0.9, WEIGHT_DECAY=0.0, BATCH_SIZE=128, SHOW_PROCESS=True):
    """Train one of the notebook's networks on ``trainset`` and return it.

    10% of ``trainset`` (fixed seed) is held out as a validation set. After every
    epoch the validation accuracy is measured; the weights of the best epoch
    (ties go to the later epoch) are restored before the network is returned.

    Args:
        MODEL: "FCN", "CNN", "AVGPOOL", "MAXPOOL4", "KERNEL3" or "KERNEL7".
        EPOCH: number of passes over the training split.
        LEARNING_RATE: optimizer learning rate.
        LOSS_FUNCTION: "CrossEntropy", "NLL" or "KLD".
        OPTIMIZER: "SGD" or "Adam".
        MOMENTUM: SGD momentum (ignored by Adam).
        WEIGHT_DECAY: L2 penalty passed to the optimizer.
        BATCH_SIZE: mini-batch size for both training and validation loaders.
        SHOW_PROCESS: when true, print the network and one log line per epoch.

    Returns:
        The trained network, carrying the best-validation weights, in eval mode
        (when EPOCH is 0 the freshly initialised network is returned as is).

    Raises:
        ValueError: for an unknown MODEL, OPTIMIZER or LOSS_FUNCTION.
    """
    # spare a validation set from the training set
    validation_split = .1
    shuffle_dataset = True
    random_seed = 2015171019

    # Creating data indices for training and validation splits:
    dataset_size = len(trainset)
    indices = list(range(dataset_size))
    split = int(np.floor(validation_split * dataset_size))
    if shuffle_dataset:
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]

    # Creating PT data samplers and loaders:
    train_sampler = torch.utils.data.sampler.SubsetRandomSampler(train_indices)
    valid_sampler = torch.utils.data.sampler.SubsetRandomSampler(val_indices)

    train_loader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, sampler=train_sampler)
    validation_loader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, sampler=valid_sampler)

    # An if/elif chain (rather than a name -> class dict) keeps the class lookup lazy:
    # the Task2 classes live in later cells and need not exist when Task1 runs.
    if MODEL == "FCN":
        net = FullyConnectedNetwork()
    elif MODEL == "CNN":
        net = LeNet5()
    # for task2
    elif MODEL == "AVGPOOL":  # try average pooling
        net = LeNet5_AvgPool()
    elif MODEL == "MAXPOOL4":
        # NOTE(review): LeNet5_MaxPool4 is not defined in the cells visible here;
        # presumably a max-pooling variant with pool size 4 - confirm it exists.
        net = LeNet5_MaxPool4()
    elif MODEL == "KERNEL3":  # try kernel size = 3
        net = LeNet3()
    elif MODEL == "KERNEL7":  # try kernel size = 7
        net = LeNet7()
    else:
        raise ValueError("unknown MODEL: {!r}".format(MODEL))

    # Use the GPU when there is one, otherwise fall back to the CPU. All tensors are
    # moved with .to(device) so the function also runs on CPU-only machines.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("device:", device)
    net.to(device)

    if SHOW_PROCESS: print(net)

    # initialize the optimizer
    if OPTIMIZER == "SGD":
        optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE,
                              momentum=MOMENTUM, weight_decay=WEIGHT_DECAY)
    elif OPTIMIZER == "Adam":
        optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE,
                               weight_decay=WEIGHT_DECAY)
    else:
        raise ValueError("unknown OPTIMIZER: {!r}".format(OPTIMIZER))

    # create a loss function
    # The networks visible in this notebook end in log_softmax. CrossEntropyLoss
    # applies log_softmax again, which leaves log-probabilities unchanged, so it
    # gives the same value as NLLLoss on these outputs.
    if LOSS_FUNCTION == "CrossEntropy":
        criterion = nn.CrossEntropyLoss()
    elif LOSS_FUNCTION == "NLL":
        criterion = nn.NLLLoss()
    elif LOSS_FUNCTION == "KLD":
        # KLDivLoss needs a target distribution with the same shape as the input,
        # not class indices, so the integer labels are one-hot encoded first.
        kld = nn.KLDivLoss(reduction="batchmean")

        def criterion(log_probs, targets):
            one_hot = F.one_hot(targets, num_classes=log_probs.size(1)).to(log_probs.dtype)
            return kld(log_probs, one_hot)
    else:
        raise ValueError("unknown LOSS_FUNCTION: {!r}".format(LOSS_FUNCTION))

    flatten_input = MODEL == "FCN"  # the FCN consumes (batch, 28*28) instead of images

    losses = []       # last mini-batch loss of every epoch, for plotting
    running_time = 0  # accumulated training time (validation excluded)

    best_acc = 0.0
    best_state = None  # snapshot of the weights at the best validation accuracy

    for e in range(EPOCH):
        start_time = time.time()

        net.train()  # BatchNorm uses batch statistics and updates its running stats
        for images, labels in train_loader:

            # resize data from (batch_size, 1, 28, 28) to (batch_size, 28*28)
            if flatten_input: images = images.view(-1, 28*28)

            images = images.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # every end of the epoch: record timing and the last mini-batch loss
        elapsed_time = time.time() - start_time
        losses.append(loss.item())
        running_time += elapsed_time

        # validate
        net.eval()  # BatchNorm switches to its running statistics
        correct = 0; total = 0
        with torch.no_grad():
            for images, labels in validation_loader:

                if flatten_input: images = images.view(-1, 28*28)

                images = images.to(device)
                labels = labels.to(device)

                outputs = net(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = 100 * correct / total if total else 0.0

        # Track the best epoch regardless of SHOW_PROCESS. The weights are cloned
        # because `net` keeps being updated in place by later epochs.
        is_best = best_acc <= round(accuracy, 8)
        if is_best:
            best_acc = round(accuracy, 8)
            best_state = {name: tensor.detach().clone()
                          for name, tensor in net.state_dict().items()}

        # print statistics
        if SHOW_PROCESS:
            # The trailing newline plus padding reproduces the original log layout.
            print(
                "Epoch:{:>3} | Loss: {:>.6f} | Elapsed: {:<5} sec | Validation Accuracy: {:<.2f}{}\n{}".format(
                    e, loss.item(), round(elapsed_time, 2), round(accuracy, 2),
                    "(best!)" if is_best else "", " " * 20)
            )

    # Roll back to the best-validation weights before handing the network out.
    if best_state is not None:
        net.load_state_dict(best_state)

    # loss curve; the context manager keeps the xkcd style from leaking into
    # figures drawn by other cells
    with plt.xkcd():
        plt.xlabel('Epoch #')
        plt.ylabel('Loss')
        plt.plot(losses)
        plt.show()

    # running time summary
    print("Total training time:", round(running_time, 2), "seconds")
    average_time = running_time / EPOCH if EPOCH else 0.0
    print("Average running time per epoch:", round(average_time, 2), "seconds")

    return net
    
def test(NET, MODEL):
    """Measure the accuracy of a trained network on the global ``testset``.

    Args:
        NET: the network to evaluate.
        MODEL: model name; "FCN" makes the images get flattened to (batch, 28*28).

    Returns:
        Test accuracy in percent, rounded to 4 decimals.
    """
    testloader = torch.utils.data.DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

    net = NET

    # Evaluate on whatever device the network already lives on, so this works
    # with and without CUDA.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Inference must use eval mode (BatchNorm running statistics); the previous
    # mode is restored afterwards so the caller's network is left as it was.
    was_training = net.training
    net.eval()

    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in testloader:

                # resize data from (batch_size, 1, 28, 28) to (batch_size, 28*28)
                if MODEL == "FCN": images = images.view(-1, 28*28)

                images = images.to(device)
                labels = labels.to(device)

                outputs = net(images)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        net.train(was_training)

    accuracy = 100 * correct / total
    print('Accuracy of the network on the {} test images: {:.6f}'.format(len(testset), round(accuracy, 8)))

    return round(accuracy, 4)

##### Task1 Main Body

In [None]:
# Train the fully connected baseline with default hyperparameters, then score it on the test set
FCN = train(MODEL="FCN")
_ = test(NET=FCN, MODEL="FCN")

In [None]:
# Train the LeNet baseline with default hyperparameters, then score it on the test set
CNN = train(MODEL="CNN")
_ = test(NET=CNN, MODEL="CNN")

### Task2

Adjust hyperparameters and model structure

1. Adjust Optimizer Parameters

    A. Learning Rate

In [None]:
# Sweep the learning rate while every other hyperparameter keeps its default
learning_rate_list = [1e-04, 1e-03, 0.01, 0.1, 0.2]
for l in learning_rate_list:
    print("----------------------- Learning Rate:", l, "-----------------------")
    CNN = train("CNN", LEARNING_RATE=l, SHOW_PROCESS=False)
    _ = test(CNN, "CNN")

    B. Momentum

In [None]:
# Sweep the SGD momentum while every other hyperparameter keeps its default
momentum_list = [0, 0.5, 0.9, 0.99]
for m in momentum_list:
    print("----------------------- Momentum:", m, "-----------------------")
    CNN = train(MODEL="CNN", MOMENTUM=m, SHOW_PROCESS=False)
    _ = test(NET=CNN, MODEL="CNN")

    C. L2 Regularization Penalty

In [None]:
# Sweep the L2 penalty (weight decay) while every other hyperparameter keeps its default
weight_decay_list = [0, 1e-03, 0.1]
for w in weight_decay_list:
    print("----------------------- L2 Regularization:", w, "-----------------------")
    CNN = train(MODEL="CNN", WEIGHT_DECAY=w, SHOW_PROCESS=False)
    _ = test(NET=CNN, MODEL="CNN")

2. Try Different Loss Function

In [None]:
# NLL for smaller, quicker training (simple tasks)
# KLD can be achieved with CrossEntropy

# TODO: suggest and compare more loss functions

# loss_function_list = ["CrossEntropy", "NLL"]

# for l in loss_function_list:
#     print("----------------------- Loss Function:", l, "-----------------------")
#     CNN = train("CNN", LOSS_FUNCTION=l, SHOW_PROCESS=False)
#     _ = test(CNN, "CNN")

3. Use Different Batch Size

In [None]:
# Sweep the mini-batch size while every other hyperparameter keeps its default
batch_size_list = [8, 32, 128, 512]
for b in batch_size_list:
    print("----------------------- Batch Size:", b, "-----------------------")
    CNN = train(MODEL="CNN", BATCH_SIZE=b, SHOW_PROCESS=False)
    _ = test(NET=CNN, MODEL="CNN")

4. Change other hyperparameters and model elements

    A. try different pooling method (Average pooling)

In [None]:
class LeNet5_AvgPool(nn.Module):
    """LeNet-style CNN with 5x5 kernels that pools with 2x2 average pooling.

    Two conv -> batch-norm -> ReLU -> average-pool stages are followed by three
    linear layers; ``forward`` returns log-probabilities over 10 classes. The
    7*7*16 input width of ``fc1`` matches a 1x28x28 input.
    """

    def __init__(self):
        super().__init__()

        first_stage = [
            nn.Conv2d(1, 6, kernel_size=5, padding=2),
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.AvgPool2d(2),
        ]
        second_stage = [
            nn.Conv2d(6, 16, kernel_size=5, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.AvgPool2d(2),
        ]
        self.layer1 = nn.Sequential(*first_stage)
        self.layer2 = nn.Sequential(*second_stage)

        # Classifier head: 784 -> 120 -> 84 -> 10
        self.fc1 = nn.Sequential(nn.Linear(7 * 7 * 16, 120), nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(120, 84), nn.ReLU())
        self.fc3 = nn.Sequential(nn.Linear(84, 10))

    def forward(self, x):
        """Map a (batch, 1, 28, 28) image batch to (batch, 10) log-probabilities."""
        feature_maps = self.layer2(self.layer1(x))

        # Flatten everything except the batch dimension
        batch_size = feature_maps.size(0)
        flat = feature_maps.view(batch_size, -1)

        logits = self.fc3(self.fc2(self.fc1(flat)))
        return F.log_softmax(logits, dim=1)

In [None]:
# Train and score the average-pooling variant; test() only special-cases "FCN",
# so "CNN" is passed as the model name for every convolutional network.
print("----------------------- Average Pooling -----------------------")
CNN = train(MODEL="AVGPOOL")
_ = test(NET=CNN, MODEL="CNN")

    B. try different kernel sizes (3 and 7)

In [None]:
class LeNet3(nn.Module):
    """LeNet-style CNN whose two convolutions use 3x3 kernels (padding 1).

    Two conv -> batch-norm -> ReLU -> 2x2 max-pool stages are followed by three
    linear layers; ``forward`` returns log-probabilities over 10 classes. The
    7*7*16 input width of ``fc1`` matches a 1x28x28 input.
    """

    def __init__(self):
        super().__init__()

        first_stage = [
            nn.Conv2d(1, 6, kernel_size=3, padding=1),
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        second_stage = [
            nn.Conv2d(6, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.layer1 = nn.Sequential(*first_stage)
        self.layer2 = nn.Sequential(*second_stage)

        # Classifier head: 784 -> 120 -> 84 -> 10
        self.fc1 = nn.Sequential(nn.Linear(7 * 7 * 16, 120), nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(120, 84), nn.ReLU())
        self.fc3 = nn.Sequential(nn.Linear(84, 10))

    def forward(self, x):
        """Map a (batch, 1, 28, 28) image batch to (batch, 10) log-probabilities."""
        feature_maps = self.layer2(self.layer1(x))

        # Flatten everything except the batch dimension
        batch_size = feature_maps.size(0)
        flat = feature_maps.view(batch_size, -1)

        logits = self.fc3(self.fc2(self.fc1(flat)))
        return F.log_softmax(logits, dim=1)
    
class LeNet7(nn.Module):
    """LeNet-style CNN whose two convolutions use 7x7 kernels (padding 3).

    Two conv -> batch-norm -> ReLU -> 2x2 max-pool stages are followed by three
    linear layers; ``forward`` returns log-probabilities over 10 classes. The
    7*7*16 input width of ``fc1`` matches a 1x28x28 input.
    """

    def __init__(self):
        super().__init__()

        first_stage = [
            nn.Conv2d(1, 6, kernel_size=7, padding=3),
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        second_stage = [
            nn.Conv2d(6, 16, kernel_size=7, padding=3),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2),
        ]
        self.layer1 = nn.Sequential(*first_stage)
        self.layer2 = nn.Sequential(*second_stage)

        # Classifier head: 784 -> 120 -> 84 -> 10
        self.fc1 = nn.Sequential(nn.Linear(7 * 7 * 16, 120), nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(120, 84), nn.ReLU())
        self.fc3 = nn.Sequential(nn.Linear(84, 10))

    def forward(self, x):
        """Map a (batch, 1, 28, 28) image batch to (batch, 10) log-probabilities."""
        feature_maps = self.layer2(self.layer1(x))

        # Flatten everything except the batch dimension
        batch_size = feature_maps.size(0)
        flat = feature_maps.view(batch_size, -1)

        logits = self.fc3(self.fc2(self.fc1(flat)))
        return F.log_softmax(logits, dim=1)

In [None]:
# Train and score the 3x3-kernel variant (test() only special-cases "FCN")
print("----------------------- Kernel Size 3 -----------------------")
CNN = train(MODEL="KERNEL3")
_ = test(NET=CNN, MODEL="CNN")

In [None]:
# Train and score the 7x7-kernel variant (test() only special-cases "FCN")
print("----------------------- Kernel Size 7 -----------------------")
CNN = train(MODEL="KERNEL7")
_ = test(NET=CNN, MODEL="CNN")