In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, SubsetRandomSampler
from torchvision.utils import make_grid
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import itertools

## Variables

In [None]:
# --- Reproducibility ---------------------------------------------------------
# One seed is shared by NumPy, PyTorch (CPU) and, further down, every CUDA device.
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)

# --- Data split and batching -------------------------------------------------
train_ratio = 0.8
validation_ratio = 0.2
batch_size = 64

# --- Hyper-parameter candidates ----------------------------------------------
lr = [0.01, 0.001, 0.0001]
momentum = [0.9, 0.001, 0.0001]
weight_decay = [0.1, 0.01, 0.001, 0.0001]
dropout_prob = [0.1, 0.5]

# --- Training schedule -------------------------------------------------------
training_epochs = 50
final_model_epochs = 10

# --- Input / output description ----------------------------------------------
number_of_features = 64 * 64 * 3  # height * width * channels of a cropped image
class_labels = ('Airplane', 'Bird', 'Car', 'Cat', 'Deer', 'Dog', 'Horse', 'Monkey', 'Ship', 'Truck')

# --- Device selection and cuDNN determinism ----------------------------------
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda")
    # Seed the current CUDA device and then all visible CUDA devices.
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
else:
    device = torch.device("cpu")

# cuDNN is only switched on when a GPU is present; auto-tuning is disabled and
# deterministic kernels are requested so repeated runs behave the same way.
torch.backends.cudnn.enabled = use_cuda
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

## Read Data

In [None]:
# Per-channel statistics handed to Normalize in every pipeline below.
mean, std = (0.43, 0.42, 0.39), (0.27, 0.26, 0.27)

# Visualisation pipeline: normalised, but not cropped.
plot_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

# Training pipeline: random 64x64 crop plus a random horizontal flip.
train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
    transforms.RandomCrop(64),
    transforms.RandomHorizontalFlip(p=0.5),
])

# Evaluation pipeline: centre 64x64 crop, no random augmentation.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
    transforms.CenterCrop(64),
])

# The 'train' split is wrapped three times so that plotting, training and
# validation each get their own transform; 'test' is kept separate.
plot_dataset = datasets.STL10(root='./', split='train', transform=plot_transform)
train_dataset = datasets.STL10(root='./', split='train', transform=train_transform)
val_dataset = datasets.STL10(root='./', split='train', transform=test_transform)
test_dataset = datasets.STL10(root='./', split='test', transform=test_transform)

# Stratified, seeded split of the 'train' indices into training / validation.
targets = train_dataset.labels
targets_idx = np.arange(len(targets))
train_idx, val_idx = train_test_split(
    targets_idx,
    test_size=validation_ratio,
    random_state=seed,
    shuffle=True,
    stratify=targets,
)

# Samplers restrict each loader to its own share of the indices.
train_sampler = SubsetRandomSampler(train_idx)
val_sampler = SubsetRandomSampler(val_idx)

plot_loader = DataLoader(plot_dataset, shuffle=False, num_workers=0)
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    num_workers=2,
    sampler=train_sampler,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    num_workers=0,
    sampler=val_sampler,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0,
)

# Part 1

In [None]:
# Show a few example images per class, one row per class.
samples_per_class = 4

# Per-channel statistics reshaped so they broadcast over (N, 3, H, W) batches.
channel_mean = torch.tensor(mean).view(1, 3, 1, 1)
channel_std = torch.tensor(std).view(1, 3, 1, 1)

# Collect the first `samples_per_class` images of every class in a single pass.
# A plain `for` loop replaces `data_iter.next()`, which current PyTorch loader
# iterators no longer provide, and cannot run past the end of the dataset.
class_samples = [[] for _ in class_labels]
for image, label in plot_loader:
    class_index = int(label.item())  # plot_loader yields one image per batch
    if 0 <= class_index < len(class_samples):
        bucket = class_samples[class_index]
        if len(bucket) < samples_per_class:
            bucket.append(image[0])
            if all(len(b) == samples_per_class for b in class_samples):
                break

fig, axes = plt.subplots(len(class_labels), 1, figsize=(15, 15), squeeze=False)
axes = axes.flatten()

for index, c in enumerate(class_labels):
    if class_samples[index]:
        # Invert Normalize(mean, std): x = x_norm * std + mean, then clamp to the
        # displayable [0, 1] range before building the image grid.
        class_images = torch.stack(class_samples[index]) * channel_std + channel_mean
        class_images = class_images.clamp(0, 1)
        # make_grid returns (C, H, W); imshow expects (H, W, C).
        axes[index].imshow(make_grid(class_images).permute(1, 2, 0).numpy())
    axes[index].set_ylabel(str(c), rotation='horizontal', fontsize=20,
                           verticalalignment='center', horizontalalignment='right')
for ax in axes:
    ax.set_yticklabels([])
    ax.set_xticklabels([])
plt.show(block=False)

# Part 2

### General functions 

In [None]:
def test(test_loader, model):
    """Evaluate `model` on every batch produced by `test_loader`.

    The model is switched to eval mode and gradients are disabled.

    Args:
        test_loader: iterable yielding `(images, labels)` batches.
        model: network mapping an image batch to per-class scores.

    Returns:
        tuple: `(accuracy, mean_loss)` where accuracy is correct / total over
        all samples and mean_loss is the average of the per-batch
        cross-entropy losses.
    """
    model.eval()
    loss_fn = nn.CrossEntropyLoss()
    n_correct = 0
    n_seen = 0
    batch_count = 0
    loss_sum = 0
    with torch.no_grad():
        for images, labels in test_loader:
            batch_count += 1
            images, labels = images.to(device), labels.to(device)
            logits = model(images)
            # Index of the highest score along the class dimension.
            predicted = torch.max(logits.data, 1)[1]
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()
            loss_sum += loss_fn(logits, labels).item()
    return n_correct / n_seen, loss_sum / batch_count

def train_model(model, train_loader, training_epochs, lr, momentum, weight_decay):
    """Train `model` with SGD and cross-entropy, validating after every epoch.

    Validation is run through `test` on the module-level `val_loader`.

    Args:
        model: network to optimise (updated in place).
        train_loader: iterable yielding `(inputs, labels)` training batches.
        training_epochs: number of passes over `train_loader`.
        lr: SGD learning rate.
        momentum: SGD momentum.
        weight_decay: SGD weight decay.

    Returns:
        tuple: `(model, results, last_val_accuracy)` where `results` is
        `(train_accuracy, train_losses, val_accuracy, val_losses)`, each a list
        with one entry per epoch.
    """
    loss_fn = nn.CrossEntropyLoss()
    sgd = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)
    train_losses = []
    train_accuracy = []
    val_losses = []
    val_accuracy = []

    for _ in range(training_epochs):
        model.train()
        epoch_loss = 0.0
        n_correct = 0
        n_seen = 0
        batch_count = 0
        for inputs, labels in train_loader:
            batch_count += 1
            inputs, labels = inputs.to(device), labels.to(device)

            # Clear old gradients, then forward, backward and a weight update.
            sgd.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            sgd.step()

            # Running statistics for this epoch.
            epoch_loss += loss.item()
            predicted = torch.max(outputs.data, 1)[1]
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

        train_losses.append(epoch_loss / batch_count)
        train_accuracy.append(n_correct / n_seen)

        epoch_val_accuracy, epoch_val_loss = test(val_loader, model)
        val_accuracy.append(epoch_val_accuracy)
        val_losses.append(epoch_val_loss)

    results = train_accuracy, train_losses, val_accuracy, val_losses
    return model, results, val_accuracy[-1]

### Hyper-parameters tuning

In [None]:
def hyper_parameters_tuning(best_params_path, model):
    """Grid-search SGD hyper-parameters and persist the best combination.

    Every combination from the module-level `lr`, `momentum` and `weight_decay`
    lists is trained for `training_epochs` epochs with `train_model`. Whenever
    a combination beats the best validation accuracy seen so far, the file at
    `best_params_path` is rewritten with four lines: accuracy, learning rate,
    momentum and weight decay.

    Args:
        best_params_path: text file whose first line holds the best accuracy
            from earlier runs. A missing or empty file means "no previous
            result", so the first trained combination is always recorded.
        model: network to tune. Its weights are restored to their initial
            values before each combination; after the call it holds the
            weights of the last combination trained.

    Raises:
        ValueError: if the first line of an existing file is not a number.
    """
    import copy
    import os

    # Best accuracy recorded by a previous run, if there is one.
    best_acc = float("-inf")
    if os.path.exists(best_params_path):
        with open(best_params_path, "r") as file:
            first_line = file.readline().strip()
        if first_line:
            best_acc = float(first_line)

    # Snapshot of the untrained weights (and buffers). Without restoring it,
    # each combination would continue from the previous combination's weights
    # and the accuracies would not be comparable.
    initial_state = copy.deepcopy(model.state_dict())

    for lr_value, momentum_value, weight_decay_value in itertools.product(lr, momentum, weight_decay):
        model.load_state_dict(initial_state)
        _, _, acc_iterate = train_model(model, train_loader, training_epochs,
                                        lr_value, momentum_value, weight_decay_value)

        print("lr: " + str(lr_value) + ", momentum: " + str(momentum_value)
              + ", weight_decay: " + str(weight_decay_value) + ", accuracy: " + str(acc_iterate))

        if acc_iterate > best_acc:
            best_acc = acc_iterate
            print("new best accuracy: " + str(best_acc))
            # The context manager closes the file even if a write fails.
            with open(best_params_path, "w") as f_w:
                f_w.write(str(best_acc) + '\n')
                f_w.write(str(lr_value) + '\n')
                f_w.write(str(momentum_value) + '\n')
                f_w.write(str(weight_decay_value) + '\n')

### Train best model

In [None]:
def train_final_model(best_params_path, model, figure_name):
    """Train `model` with the stored best hyper-parameters and plot the curves.

    The file at `best_params_path` is read line by line; lines 2, 3 and 4 are
    parsed as learning rate, momentum and weight decay. The model is trained
    for `final_model_epochs` epochs with `train_model`, and the per-epoch
    train/validation loss and accuracy are plotted and saved.

    Args:
        best_params_path: text file with at least four lines (accuracy,
            learning rate, momentum, weight decay).
        model: network to train (updated in place).
        figure_name: path the figure is saved to.

    Returns:
        tuple: `(trained_model, results)` where `results` is
        `(train_accuracy, train_losses, val_accuracy, val_losses)`.

    Raises:
        ValueError: if the file has fewer than four lines or a
            hyper-parameter line is not a number.
    """
    # The context manager closes the handle even if reading fails.
    with open(best_params_path, "r") as f_r:
        best_params = f_r.read().splitlines()
    if len(best_params) < 4:
        raise ValueError(
            "expected at least 4 lines (accuracy, lr, momentum, weight decay) in "
            + str(best_params_path) + ", found " + str(len(best_params)))

    best_model, results, _ = train_model(model, train_loader, final_model_epochs,
                                         float(best_params[1]), float(best_params[2]),
                                         float(best_params[3]))

    train_accuracy, train_losses, val_accuracy, val_losses = results
    steps = np.arange(final_model_epochs)

    # Losses on the left axis.
    fig, ax1 = plt.subplots()
    ax1.set_xlabel('epochs')
    ax1.set_ylabel('loss')
    ax1.plot(steps, train_losses, label="train loss", color='red')
    ax1.plot(steps, val_losses, label="val loss", color='green')

    # Accuracies on a second y-axis that shares the same x-axis.
    ax2 = ax1.twinx()
    ax2.set_ylabel('accuracy')
    ax2.plot(steps, train_accuracy, label="train acc", color='black')
    ax2.plot(steps, val_accuracy, label="val acc", color='blue')

    fig.legend(loc='center right', bbox_to_anchor=(0.8, 0.6))
    fig.suptitle('Epochs={}, LR={}, momentum={}, reg={}'.format(
        final_model_epochs, best_params[1], best_params[2], best_params[3]))
    fig.tight_layout()
    fig.savefig(figure_name)
    plt.show(block=False)

    # Hand the trained model back so it can be evaluated or saved afterwards.
    return best_model, results

## Logistic Regression

### Definition of the model

In [None]:
class LogisticRegression(nn.Module):
    """Single linear layer over flattened inputs, producing 10 class scores."""

    def __init__(self, number_of_features):
        """Create the flatten and linear layers.

        Args:
            number_of_features: size of each flattened input sample.
        """
        super().__init__()
        self.flat = nn.Flatten(start_dim=1)
        self.linear = nn.Linear(number_of_features, 10)

    def forward(self, x):
        """Flatten everything after the batch dimension and apply the linear layer."""
        return self.linear(self.flat(x))

### Hyper-parameter tuning

In [None]:
# Grid-search the SGD hyper-parameters for the logistic-regression model.
hyper_parameters_tuning(
    best_params_path="./lr_best_params.txt",
    model=LogisticRegression(number_of_features).to(device),
)

### Train model

In [None]:
# Train a fresh logistic-regression model with the stored best hyper-parameters.
train_final_model(
    best_params_path="./lr_best_params.txt",
    model=LogisticRegression(number_of_features).to(device),
    figure_name="lr_plot.png",
)
print("~~~~~~~~~~~~~~~Logistic Regression Done~~~~~~~~~~~~~~~")

## Fully Connected NN

### Definition of the model

In [None]:
class fc3_nn(nn.Module):
    """Fully connected network: three hidden layers and a 10-way output layer.

    Each hidden layer is Linear -> BatchNorm1d -> ReLU -> Dropout.
    """

    def __init__(self, number_of_features, first_hidden_size, second_hidden_size, dropout_prob):
        """Build the layers.

        The original signature used the literals `200` and `100` as parameter
        names, which is a syntax error; they are now real parameters and the
        existing call `fc3_nn(number_of_features, 200, 100, p)` builds the same
        layer sizes the literals described.

        Args:
            number_of_features: size of each flattened input sample.
            first_hidden_size: width of the first and second hidden layers.
            second_hidden_size: width of the third hidden layer.
            dropout_prob: dropout probability used after every hidden layer.
        """
        super(fc3_nn, self).__init__()
        self.flat = nn.Flatten(start_dim=1)

        # first layer
        self.first_layer = nn.Linear(number_of_features, first_hidden_size)
        self.first_layer_norm = nn.BatchNorm1d(first_hidden_size)
        self.first_layer_dropout = nn.Dropout(dropout_prob)

        # second layer
        self.second_layer = nn.Linear(first_hidden_size, first_hidden_size)
        self.second_layer_norm = nn.BatchNorm1d(first_hidden_size)
        self.second_layer_dropout = nn.Dropout(dropout_prob)

        # third layer
        self.third_layer = nn.Linear(first_hidden_size, second_hidden_size)
        self.third_layer_norm = nn.BatchNorm1d(second_hidden_size)
        self.third_layer_dropout = nn.Dropout(dropout_prob)

        self.last_layer = nn.Linear(second_hidden_size, 10)

    def forward(self, x):
        """Return the 10 class scores for a batch of inputs."""
        x = self.flat(x)

        x = self.first_layer(x)
        x = self.first_layer_norm(x)
        x = F.relu(x)
        x = self.first_layer_dropout(x)

        x = self.second_layer(x)
        x = self.second_layer_norm(x)
        x = F.relu(x)
        x = self.second_layer_dropout(x)

        x = self.third_layer(x)
        x = self.third_layer_norm(x)
        x = F.relu(x)
        x = self.third_layer_dropout(x)

        x = self.last_layer(x)
        return x

### Hyper-parameter tuning

In [None]:
# Tune the fully connected network once per candidate dropout probability.
# The best dropout value is stored as a fifth line of the fc3 parameter file,
# after accuracy, learning rate, momentum and weight decay.
fc3_params_path = "./fc3_nn_best_params.txt"

# Best accuracy recorded so far for this model. The original cell read the CNN
# file here, which belongs to a different model.
try:
    with open(fc3_params_path, "r") as file:
        max_acc = float(file.readline().strip() or "-inf")
except FileNotFoundError:
    max_acc = float("-inf")

for prob in dropout_prob:
    print("Dropout probability: " + str(prob))
    hyper_parameters_tuning(fc3_params_path, fc3_nn(number_of_features, 200, 100, prob).to(device))
    with open(fc3_params_path, "r") as file:
        lines = file.read().splitlines()
    next_max_acc = float(lines[0])

    # Compare as numbers, not as strings. When this dropout value improved the
    # recorded accuracy, append it as line five; the first four lines are kept,
    # so the weight-decay entry is no longer overwritten.
    if next_max_acc > max_acc:
        max_acc = next_max_acc
        lines = lines[:4] + [str(prob)]
        with open(fc3_params_path, "w") as file:
            file.write("\n".join(lines) + "\n")

### Train model

In [None]:
# The last line of the parameter file is used as the dropout probability.
# Reading inside `with` closes the file handle, which the original left open.
with open("./fc3_nn_best_params.txt", 'r') as file:
    lines = file.readlines()
train_final_model("./fc3_nn_best_params.txt", fc3_nn(number_of_features, 200, 100, float(lines[-1])).to(device), "fc3_nn_plot.png")
print("~~~~~~~~~~~~~~~Fully-Connected NN Done~~~~~~~~~~~~~~~")

## CNN

### Definition of the model

In [None]:
class cnn(nn.Module):
    """Three Conv -> BatchNorm -> ReLU -> MaxPool stages and a three-layer classifier.

    The classifier input size (30 * 7 * 7) matches 64x64 inputs: the spatial
    size goes 64 -> 62 -> 31 -> 30 -> 15 -> 14 -> 7 through the unpadded
    convolutions and the 2x2 poolings.
    """

    def __init__(self, number_of_features, dropout_prob):
        """Build the layers.

        Args:
            number_of_features: unused by this model; kept so the constructor
                signature stays the same for existing callers.
            dropout_prob: dropout probability used in the classifier.
        """
        super(cnn, self).__init__()
        self.first_conv = nn.Conv2d(3, 6, 3)
        self.first_conv_norm = nn.BatchNorm2d(6)
        self.first_maxpool = nn.MaxPool2d(2, 2)

        self.second_conv = nn.Conv2d(6, 15, 2)
        self.second_conv_norm = nn.BatchNorm2d(15)
        self.second_maxpool = nn.MaxPool2d(2, 2)

        self.third_conv = nn.Conv2d(15, 30, 2)
        self.third_conv_norm = nn.BatchNorm2d(30)
        self.third_maxpool = nn.MaxPool2d(2, 2)

        self.first_linear = nn.Linear(30 * 7 * 7, 500)
        self.first_dropout = nn.Dropout(dropout_prob)

        self.second_linear = nn.Linear(500, 100)
        self.second_dropout = nn.Dropout(dropout_prob)

        self.last_layer = nn.Linear(100, 10)

    def forward(self, x):
        """Return the 10 class scores for a batch of images."""
        x = self.first_conv(x)
        x = self.first_conv_norm(x)
        x = F.relu(x)
        x = self.first_maxpool(x)

        x = self.second_conv(x)
        x = self.second_conv_norm(x)
        x = F.relu(x)
        x = self.second_maxpool(x)

        x = self.third_conv(x)
        x = self.third_conv_norm(x)
        x = F.relu(x)
        x = self.third_maxpool(x)

        # Flatten per sample. Unlike view(-1, 30 * 7 * 7), this never folds a
        # wrong-sized input into a different batch size; the linear layer
        # raises a shape error instead.
        x = torch.flatten(x, 1)

        x = self.first_linear(x)
        x = F.relu(x)
        x = self.first_dropout(x)

        x = self.second_linear(x)
        # This activation was missing: without it second_linear and last_layer
        # compose into a single linear map.
        x = F.relu(x)
        x = self.second_dropout(x)

        x = self.last_layer(x)
        return x

### Hyper-parameter tuning

In [None]:
# Tune the CNN once per candidate dropout probability. The best dropout value
# is stored as a fifth line of the CNN parameter file, after accuracy,
# learning rate, momentum and weight decay.
cnn_params_path = "./cnn_best_params.txt"

# Best accuracy recorded so far; a missing or empty file means no result yet.
try:
    with open(cnn_params_path, "r") as file:
        max_acc = float(file.readline().strip() or "-inf")
except FileNotFoundError:
    max_acc = float("-inf")

for prob in dropout_prob:
    print("Dropout probability: " + str(prob))
    hyper_parameters_tuning(cnn_params_path, cnn(number_of_features, prob).to(device))
    with open(cnn_params_path, "r") as file:
        lines = file.read().splitlines()
    next_max_acc = float(lines[0])

    # Compare as numbers, not as strings. When this dropout value improved the
    # recorded accuracy, append it as line five; the first four lines are kept,
    # so the weight-decay entry is no longer overwritten.
    if next_max_acc > max_acc:
        max_acc = next_max_acc
        lines = lines[:4] + [str(prob)]
        with open(cnn_params_path, "w") as file:
            file.write("\n".join(lines) + "\n")

### Train model

In [None]:
# The last line of the parameter file is used as the dropout probability.
# Reading inside `with` closes the file handle, which the original left open.
with open("./cnn_best_params.txt", 'r') as file:
    lines = file.readlines()
train_final_model("./cnn_best_params.txt", cnn(number_of_features, float(lines[-1])).to(device), "cnn_plot.png")
print("~~~~~~~~~~~~~~~CNN Done~~~~~~~~~~~~~~~")

## Fixed pre-trained MobileNetV2

### Definition of the model

In [None]:
class Fixed_MobileNetV2(nn.Module):
    """Pretrained MobileNetV2 with frozen weights and a new trainable head.

    The backbone's final classifier layer is replaced by a 500-unit linear
    layer, followed by 500 -> 100 -> 10 fully connected layers.
    """

    def __init__(self):
        super(Fixed_MobileNetV2, self).__init__()
        # NOTE(review): newer torchvision releases deprecate `pretrained=True`
        # in favour of the `weights=` argument; confirm against the installed version.
        self.feature_extractor = models.mobilenet.mobilenet_v2(pretrained=True)
        # Freeze every pretrained parameter ...
        for param in self.feature_extractor.parameters():
            param.requires_grad = False
        # ... then swap in a fresh layer, which is created after the freeze and
        # therefore stays trainable.
        self.feature_extractor.classifier[1] = nn.Linear(self.feature_extractor.classifier[1].in_features, out_features=500)

        self.d1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(500, 100)
        self.d2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(100, 10)

    def forward(self, x):
        """Return the 10 class scores for a batch of images."""
        x = self.feature_extractor(x)
        x = F.relu(x)
        x = self.d1(x)

        x = self.fc2(x)
        x = F.relu(x)
        x = self.d2(x)

        # No dropout after the output layer: the original dropped random class
        # scores (and rescaled the rest) right before the loss during training.
        x = self.fc3(x)
        return x

### Hyper-parameter tuning

In [None]:
# Grid-search the SGD hyper-parameters for the frozen-backbone MobileNetV2.
hyper_parameters_tuning(
    best_params_path="./f_mobilenet_best_params.txt",
    model=Fixed_MobileNetV2().to(device),
)

### Train model

In [None]:
# Train a fresh frozen-backbone MobileNetV2 with the stored best hyper-parameters.
train_final_model(
    best_params_path="./f_mobilenet_best_params.txt",
    model=Fixed_MobileNetV2().to(device),
    figure_name="f_mobilenet_plot.png",
)
print("~~~~~~~~~~~~~~~Fixed MobileNetV2 Done~~~~~~~~~~~~~~~")

## Learned pre-trained MobileNetV2

### Definition of the model

In [None]:
class Learned_MobileNetV2(nn.Module):
    """Pretrained MobileNetV2 fine-tuned end to end with a new head.

    No parameter is frozen. The backbone's final classifier layer is replaced
    by a 500-unit linear layer, followed by 500 -> 100 -> 10 fully connected
    layers.
    """

    def __init__(self):
        super(Learned_MobileNetV2, self).__init__()
        # NOTE(review): newer torchvision releases deprecate `pretrained=True`
        # in favour of the `weights=` argument; confirm against the installed version.
        self.feature_extractor = models.mobilenet.mobilenet_v2(pretrained=True)
        self.feature_extractor.classifier[1] = nn.Linear(self.feature_extractor.classifier[1].in_features, out_features=500)

        self.d1 = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(500, 100)
        self.d2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(100, 10)

    def forward(self, x):
        """Return the 10 class scores for a batch of images."""
        x = self.feature_extractor(x)
        x = F.relu(x)
        x = self.d1(x)

        x = self.fc2(x)
        x = F.relu(x)
        x = self.d2(x)

        # No dropout after the output layer: the original dropped random class
        # scores (and rescaled the rest) right before the loss during training.
        x = self.fc3(x)
        return x

### Hyper-parameter tuning

In [None]:
# Tune the fine-tuned (non-frozen) MobileNetV2 and record results in its own
# file. The original line re-tuned Fixed_MobileNetV2 and wrote to the fixed
# model's file, so the file read by the next cell was never produced here.
hyper_parameters_tuning("./l_mobilenet_best_params.txt", Learned_MobileNetV2().to(device))

### Train model

In [None]:
# Train a fresh fine-tuned MobileNetV2 with the stored best hyper-parameters.
train_final_model(
    best_params_path="./l_mobilenet_best_params.txt",
    model=Learned_MobileNetV2().to(device),
    figure_name="l_mobilenet_plot.png",
)
print("~~~~~~~~~~~~~~~Learned MobileNetV2 Done~~~~~~~~~~~~~~~")