In [1]:
# Mount Google Drive into the Colab runtime at /content/drive/ so the
# notebook can read from and write to files stored on Drive.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [6]:
# Make the exercise folder on the mounted Drive the working directory; the
# relative paths used later ('./data' and the saved model_*.pt files) resolve against it.
%cd '/content/drive/MyDrive/Deep Learning/ex3__208144477_206556318'

/content/drive/MyDrive/Deep Learning/ex3__208144477_206556318


# Libraries & packages

In [7]:
import numpy as np
import matplotlib.pyplot as plt
import random
import torch
import torch.nn as nn
import torch.nn.functional as func
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from sklearn import svm
import sklearn.metrics as metrics
from prettytable import PrettyTable
import pandas as pd
import seaborn as sns
import warnings


# Choose the compute device once: CUDA when a GPU is visible, otherwise the CPU.
gpu_available = torch.cuda.is_available()
my_device = torch.device('cuda' if gpu_available else 'cpu')
if gpu_available:
    print('CUDA is available. Training on GPU')
else:
    print('CUDA is unavailable. Training on CPU')

CUDA is available. Training on GPU


# Setting default parameter values

In [8]:
# ---- Hyper-parameters --------------------------------------------------------
N_EPOCHS = 30        # upper bound on training epochs (fit() may stop earlier)
in_size = 784        # 28 X 28
shape_latent = 10    # size of the latent code produced by the encoder heads
shape_hidden = 256   # width of the layer that feeds the latent heads
lr = 1e-3            # learning rate handed to Adam

# ---- Reproducibility ---------------------------------------------------------
# Seeding only `random` is not enough: the labelled subset (random_split),
# weight initialisation, DataLoader shuffling and the VAE's randn_like sampling
# all draw from torch's generator. Seed every generator the notebook imports.
SEED = 10
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# ---- Loss, classifier and run mode -------------------------------------------
# Reconstruction criterion (binary cross-entropy between output and input pixels).
criterion = func.binary_cross_entropy
# Linear SVM that is fitted on the latent representation.
svm_model = svm.LinearSVC(random_state=0, tol=1e-5)
# 'train' fits and saves the models; 'load' reuses the saved SVM.
mode = 'train'
# mode = 'load'

# Database creation:

In [9]:
def data_init(n_labels, dataset_name):
  """Download a dataset and build loaders for a small labelled training subset.

  Args:
    n_labels: number of training samples to keep (drawn at random from the
      training split).
    dataset_name: 'FashionMNIST' selects FashionMNIST; any other value selects
      MNIST (same fallback as before).

  Returns:
    (train_dataset_labeled, train_iter, test_dataset, test_iter) where the
    loaders use a batch size of one tenth of ``n_labels`` (at least 1).
  """
  # int(n_labels/10) is 0 for n_labels < 10 and DataLoader rejects a
  # non-positive batch size, so never go below one sample per batch.
  batch_size = max(1, int(n_labels/10))

  # Both datasets share the same constructor signature; pick the class once
  # instead of duplicating the train/test construction per dataset.
  dataset_cls = datasets.FashionMNIST if dataset_name == 'FashionMNIST' else datasets.MNIST
  train_dataset = dataset_cls('./data', train=True, download=True, transform=transforms.ToTensor())
  test_dataset = dataset_cls('./data', train=False, download=True, transform=transforms.ToTensor())

  # Keep n_labels random samples; the remainder of the split is discarded.
  n_unlabeled = len(train_dataset) - n_labels
  train_dataset_labeled, _ = torch.utils.data.random_split(train_dataset, [n_labels, n_unlabeled])

  # Training batches are reshuffled every epoch; test order is left fixed.
  train_iter = DataLoader(train_dataset_labeled, batch_size=batch_size, shuffle=True)
  test_iter = DataLoader(test_dataset, batch_size=batch_size)

  return train_dataset_labeled, train_iter, test_dataset, test_iter


# Model: encoder

In [10]:
class Encoder(nn.Module):
    """Convolutional encoder that maps an image batch to latent Gaussian parameters.

    Two unpadded 3x3 convolutions are followed by two fully connected layers
    and two parallel linear heads: one produces the latent mean, the other the
    latent log-variance.
    """

    def __init__(self, hidden_size, latent_size):
        super().__init__()
        # Convolutional feature extractor (1 -> 16 -> 32 channels).
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3)
        # linear1 expects 32 feature maps of 24x24, i.e. 28x28 input images.
        self.linear1 = nn.Linear(32 * 24 * 24, 512)
        self.linear2 = nn.Linear(512, hidden_size)
        # Latent heads.
        self.mean = nn.Linear(hidden_size, latent_size)
        self.var = nn.Linear(hidden_size, latent_size)

    def forward(self, x):
        """Return ``(z_mean, z_var_log)``, each of shape [batch_size, latent_size].

        ``x`` is a 4-D image batch [batch_size, 1, height, width].
        """
        features = func.relu(self.conv2(func.relu(self.conv1(x))))
        # Flatten every sample's feature maps into a single vector.
        batch, channels, height, width = features.shape
        flat = features.view(batch, channels * height * width)
        # Fully connected trunk; result has shape [batch_size, hidden_size].
        hidden_layer = func.relu(self.linear2(func.relu(self.linear1(flat))))
        return self.mean(hidden_layer), self.var(hidden_layer)

# Model: decoder

In [11]:
class Decoder(nn.Module):
    """Decoder that maps a latent vector to an image with values in (0, 1).

    Three fully connected layers expand the latent code to 32 feature maps of
    24x24, which two unpadded 3x3 transposed convolutions grow to one 28x28
    channel.
    """

    def __init__(self, latent_size, hidden_size):
        super().__init__()
        # Fully connected expansion: latent -> hidden -> 512 -> 32*24*24.
        self.linear = nn.Linear(latent_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, 512)
        self.linear3 = nn.Linear(512, 32 * 24 * 24)
        # Transposed convolutions (32 -> 16 -> 1 channels).
        self.deconv1 = nn.ConvTranspose2d(32, 16, kernel_size=3)
        self.deconv2 = nn.ConvTranspose2d(16, 1, kernel_size=3)

    def forward(self, x):
        """Decode ``x`` of shape [batch_size, latent_size] into [batch_size, 1, 28, 28]."""
        for fully_connected in (self.linear, self.linear2, self.linear3):
            x = func.relu(fully_connected(x))
        # Reinterpret each flat vector as 32 feature maps of 24x24.
        feature_maps = x.view(-1, 32, 24, 24)
        upsampled = func.relu(self.deconv1(feature_maps))
        # Sigmoid keeps every pixel in (0, 1).
        return torch.sigmoid(self.deconv2(upsampled))

# Model: VAE

In [12]:
class VAE(nn.Module):
    """Variational auto-encoder built from an encoder and a decoder module."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x):
        """Encode ``x``, draw a latent sample and decode it.

        Returns:
            (output, z_mean, z_var_log): the decoder output plus the latent
            mean and log-variance returned by the encoder.
        """
        z_mean, z_var_log = self.encoder(x)

        # Reparameterisation: z = mean + std * eps with eps ~ N(0, I), where
        # std = exp(log_var / 2).
        std = torch.exp(z_var_log / 2)
        eps = torch.randn_like(std)
        z = eps * std + z_mean

        return self.decoder(z), z_mean, z_var_log

In [13]:
#weights initialization
#applies Xavier uniform initialization to the weights of linear layers in a neural network
def init_xavier(m):
    """Apply Xavier-uniform initialisation to the weight of a linear layer.

    Intended for ``model.apply(init_xavier)``, which calls it on every
    sub-module. Modules that are not linear layers are left untouched, and the
    bias of a linear layer is not modified.

    Args:
        m: any ``nn.Module``.
    """
    # isinstance (rather than an exact type comparison) also covers subclasses
    # of nn.Linear, which would otherwise be skipped silently.
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)

# Train & test processes:

In [14]:
def train_model(model, train_iterator, optimizer=None):
    """Run one training pass over ``train_iterator`` and return the summed loss.

    Args:
        model: VAE whose forward returns ``(reconstruction, z_mean, z_var_log)``.
        train_iterator: iterable of ``(data, target)`` batches; targets are ignored.
        optimizer: optional optimizer to reuse across calls. When omitted a
            fresh Adam optimizer is created, as before; note that this resets
            Adam's running moment estimates on every call.

    Returns:
        Sum over all batches of (summed BCE reconstruction loss + KL divergence).
        The caller divides by the dataset size to obtain a per-sample loss.
    """
    model.train()
    if optimizer is None:
        optimizer = optim.Adam(model.parameters(), lr=lr)

    train_loss = 0.0
    for data, _ in train_iterator:
        data = data.to(my_device)
        optimizer.zero_grad()
        # forward pass
        reconstruction, z_mean, z_var_log = model(data)
        # Reconstruction loss, summed over pixels and samples.
        # `reduction='sum'` replaces the deprecated `size_average=False`.
        BCE_loss = criterion(reconstruction, data, reduction='sum')
        # KL divergence between N(z_mean, exp(z_var_log)) and N(0, I):
        # 0.5 * sum(-log_var + var + mean^2 - 1)
        kl_loss = 0.5 * torch.sum(-z_var_log + torch.exp(z_var_log) + z_mean**2 - 1.0)
        loss = kl_loss + BCE_loss
        # backward pass and weight update
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    return train_loss

In [15]:
def test_model(model, test_iterator):
    """Evaluate the VAE on ``test_iterator`` and return the summed loss.

    Args:
        model: VAE whose forward returns ``(reconstruction, z_mean, z_var_log)``.
        test_iterator: iterable of ``(data, target)`` batches; targets are ignored.

    Returns:
        Sum over all batches of (summed BCE reconstruction loss + KL divergence).
        The caller divides by the dataset size to obtain a per-sample loss.
    """
    model.eval()
    test_loss = 0.0
    # No gradients are needed for evaluation, so the whole loop (forward pass
    # and loss computation) runs with autograd disabled.
    with torch.no_grad():
        for data, _ in test_iterator:
            data = data.to(my_device)
            reconstruction, z_mean, z_var_log = model(data)
            # Reconstruction loss, summed over pixels and samples.
            # `reduction='sum'` replaces the deprecated `size_average=False`.
            BCE_loss = criterion(reconstruction, data, reduction='sum')
            # KL divergence between N(z_mean, exp(z_var_log)) and N(0, I):
            # 0.5 * sum(-log_var + var + mean^2 - 1)
            kl_loss = 0.5 * torch.sum(-z_var_log + torch.exp(z_var_log) + z_mean**2 - 1.0)
            loss = kl_loss + BCE_loss
            test_loss += loss.item()

    return test_loss

In [16]:
def fit(model, batch_size, n_labels, train_dataset, train_iterator, test_dataset, test_iterator):
    """Train the VAE with early stopping and save its weights.

    Args:
        model: the VAE to train (updated in place).
        batch_size: not used by this function; kept so existing callers work.
        n_labels: number of labelled samples; only used in the checkpoint name.
        train_dataset, test_dataset: only their lengths are used, to turn the
            summed losses into per-sample averages.
        train_iterator, test_iterator: batch loaders handed to
            ``train_model`` / ``test_model``.

    Side effects:
        Prints one line per epoch and writes the weights of the model as it
        stands after the last executed epoch to
        ``model_VAE_{n_labels}_labels.pt``.
    """
    # Training stops once the counter exceeds this value, i.e. after this many
    # consecutive epochs without a new best test loss.
    max_stale_epochs = 4

    min_test_loss = float('inf')
    # Initialised up front: if the first test loss is NaN the comparison below
    # is False and the counter would otherwise be incremented before being set.
    patience = 1
    for epoch in range(N_EPOCHS):
        train_loss = train_model(model, train_iterator)
        test_loss = test_model(model, test_iterator)
        train_loss /= len(train_dataset)
        test_loss /= len(test_dataset)
        print(f'Epoch: {epoch}, Training Loss: {train_loss:.2f}, Testing Loss: {test_loss:.2f}')
        if min_test_loss > test_loss:
            # New best test loss: remember it and restart the counter.
            min_test_loss = test_loss
            patience = 1
        else:
            patience += 1
        if patience > max_stale_epochs:
            break
    print("\n")
    # save NN weights
    torch.save(model.state_dict(), f'model_VAE_{n_labels}_labels.pt')

# Generating latent representation:

In [17]:
def latent_data(model, db_iterator):
    """Encode every batch of ``db_iterator`` and collect the latent parameters.

    Args:
        model: object exposing ``encoder``; the encoder returns two tensors per
            batch (latent mean and latent log-variance).
        db_iterator: iterable of ``(batch, labels)`` pairs.

    Returns:
        (db_means, db_variances, db_labels): two numpy arrays with one row per
        sample, in iteration order, and the matching labels as a Python list.
        ``db_variances`` holds the encoder's second output unchanged (its
        log-variance). For an empty iterator both arrays have shape (0, 0).
    """
    mean_chunks = []
    var_chunks = []
    db_labels = []

    with torch.no_grad():
        for batch, labels in db_iterator:
            db_labels.extend(labels.tolist())
            # Pass the batch through the encoder to get its latent representation.
            latent_mean, latent_var = model.encoder(batch.to(my_device))
            # '.numpy()' does not support CUDA tensors, so move to the CPU first.
            mean_chunks.append(latent_mean.to('cpu').numpy())
            var_chunks.append(latent_var.to('cpu').numpy())

    if not mean_chunks:
        # Nothing was iterated: return empty arrays instead of failing on
        # never-assigned accumulators.
        return np.empty((0, 0), dtype=np.float32), np.empty((0, 0), dtype=np.float32), db_labels

    # Concatenate once at the end; stacking inside the loop re-copies all
    # previously collected rows on every batch.
    db_means = np.concatenate(mean_chunks, axis=0)
    db_variances = np.concatenate(var_chunks, axis=0)

    return db_means, db_variances, db_labels

# Classification model: SVM



In [18]:
def SVM_classification(model, n_labels, train_means, train_vars, train_labels, test_iter):
    """Classify the test set with a linear SVM on the VAE latent representation.

    In 'train' mode the module-level ``svm_model`` is fitted on the training
    latents, saved to ``model_SVM_{n_labels}_labels.pt`` and used directly for
    prediction. In any other mode the previously saved classifier is loaded
    from that file.

    Args:
        model: trained VAE; its encoder produces the test-set latents.
        n_labels: number of labelled samples; only used in the file name.
        train_means, train_vars: per-sample latent parameters of the training
            set (one row per sample); stacked column-wise as SVM features.
        train_labels: training labels aligned with the rows above.
        test_iter: iterable of ``(batch, labels)`` test batches.

    Returns:
        (prediction, test_labels): predicted and true labels of the test set.
    """
    svm_path = f'model_SVM_{n_labels}_labels.pt'

    if mode == 'train':
        # Features are the latent means followed by the latent variances.
        train_latent_data = np.column_stack((train_means, train_vars))
        train_latent_df = pd.DataFrame(train_latent_data)
        svm_model.fit(train_latent_df, np.array(train_labels))
        torch.save(svm_model, svm_path)
        # The fitted estimator is already in memory; no need to read it back.
        svm_model_trained = svm_model
    else:
        # The file holds a pickled scikit-learn estimator, not a tensor
        # state-dict, so the restricted `weights_only` loader cannot read it.
        # NOTE: this unpickles arbitrary objects - only load files produced by
        # this notebook.
        svm_model_trained = torch.load(svm_path, weights_only=False)

    # Build the test features the same way as the training features.
    test_means, test_vars, test_labels = latent_data(model, test_iter)
    test_latent_data = np.column_stack((test_means, test_vars))
    test_latent_df = pd.DataFrame(test_latent_data)
    test_labels = np.array(test_labels)

    prediction = svm_model_trained.predict(test_latent_df)

    return prediction, test_labels

# Main function:

In [19]:
def run_main(n_labels, dataset_name):
  """Run the full pipeline for one dataset / label budget and report metrics.

  Builds the data loaders and a fresh VAE, trains it (in 'train' mode), reloads
  the saved VAE weights, encodes the data, classifies the test set with the SVM
  and prints accuracy, macro F1, macro recall and macro precision.

  Args:
    n_labels: number of labelled training samples to use.
    dataset_name: 'FashionMNIST' or 'MNIST' (see ``data_init``).

  Returns:
    Test accuracy as a percentage (0-100).

  NOTE(review): the checkpoint file names depend only on ``n_labels``, so runs
  on different datasets with the same ``n_labels`` overwrite each other's files.
  """
  batch_size = int(n_labels/10)
  train_dataset_labeled, train_iter, test_dataset, test_iter = data_init(n_labels,dataset_name)

  # Fresh model on the selected device, with Xavier-initialised linear layers.
  encoder = Encoder(shape_hidden, shape_latent)
  decoder = Decoder(shape_latent, shape_hidden)
  model = VAE(encoder, decoder)
  model = model.to(my_device)
  model.apply(init_xavier)

  print(f'{dataset_name} dataset with {n_labels} labels:\n')

  # Only train when asked to. In 'load' mode the saved SVM was fitted on the
  # latent space of the saved VAE; retraining the VAE here would leave the
  # loaded SVM and the encoder out of sync.
  if mode == 'train':
    fit(model, batch_size, n_labels, train_dataset_labeled, train_iter, test_dataset, test_iter)

  # Load the saved weights; map_location lets a checkpoint written on a GPU be
  # read on a CPU-only runtime as well.
  model.load_state_dict(torch.load(f'model_VAE_{n_labels}_labels.pt', map_location=my_device))

  train_means, train_vars, train_labels = latent_data(model, train_iter)
  ypreds, test_labels = SVM_classification(model, n_labels, train_means, train_vars, train_labels, test_iter)

  # Assessment metrics for the classifier (macro-averaged over the classes):
  # accuracy  = correctly classified / all classified examples
  # F1        = 2*(Recall * Precision)/(Recall + Precision)
  # precision = TP/(TP+FP)
  # recall    = TP/(TP+FN)
  test_accuracy = metrics.accuracy_score(test_labels, ypreds)
  f_score = metrics.f1_score(test_labels, ypreds, average='macro')
  precision = metrics.precision_score(test_labels, ypreds, average='macro')
  recall = metrics.recall_score(test_labels, ypreds, average='macro')
  print('Test Accuracy : ', test_accuracy)
  print('Test Data f-Score : ', f_score)
  print('Test Recall : ', recall)
  print('Test prec : ', precision)
  print("*************************************************\n\n")

  return test_accuracy*100

# Results:

In [20]:
# Suppress all Python warnings for the runs below.
warnings.filterwarnings('ignore')

# FashionMNIST runs, in increasing order of labelled samples.
(test_acc100_fashion,
 test_acc600_fashion,
 test_acc1000_fashion,
 test_acc3000_fashion) = [
    run_main(n_labels=count, dataset_name ='FashionMNIST')
    for count in (100, 600, 1000, 3000)
]

# MNIST runs, same label budgets, executed after all FashionMNIST runs.
(test_acc100,
 test_acc600,
 test_acc1000,
 test_acc3000) = [
    run_main(n_labels=count, dataset_name ='MNIST')
    for count in (100, 600, 1000, 3000)
]

FashionMNIST dataset with 100 labels:

Epoch: 0, Training Loss: 480.39, Testing Loss: 422.12
Epoch: 1, Training Loss: 396.23, Testing Loss: 380.13
Epoch: 2, Training Loss: 358.40, Testing Loss: 344.11
Epoch: 3, Training Loss: 327.07, Testing Loss: 332.95
Epoch: 4, Training Loss: 316.43, Testing Loss: 329.14
Epoch: 5, Training Loss: 313.57, Testing Loss: 327.49
Epoch: 6, Training Loss: 304.80, Testing Loss: 316.76
Epoch: 7, Training Loss: 304.00, Testing Loss: 322.43
Epoch: 8, Training Loss: 306.72, Testing Loss: 321.76
Epoch: 9, Training Loss: 302.91, Testing Loss: 312.97
Epoch: 10, Training Loss: 300.95, Testing Loss: 309.84
Epoch: 11, Training Loss: 301.52, Testing Loss: 313.13
Epoch: 12, Training Loss: 298.32, Testing Loss: 310.33
Epoch: 13, Training Loss: 294.72, Testing Loss: 308.01
Epoch: 14, Training Loss: 296.47, Testing Loss: 308.19
Epoch: 15, Training Loss: 293.61, Testing Loss: 308.65
Epoch: 16, Training Loss: 296.07, Testing Loss: 308.59
Epoch: 17, Training Loss: 289.72, Te

# Results table:


In [21]:
# Summarise the test accuracy (in percent) of every run in one table.
myTable = PrettyTable(["Dataset", "# of labels in train set", "Test Accuracy"])

# (dataset, number of labels, accuracy) in the order the runs were executed.
summary_rows = (
    ("FashionMNIST", "100", test_acc100_fashion),
    ("FashionMNIST", "600", test_acc600_fashion),
    ("FashionMNIST", "1000", test_acc1000_fashion),
    ("FashionMNIST", "3000", test_acc3000_fashion),
    ("MNIST", "100", test_acc100),
    ("MNIST", "600", test_acc600),
    ("MNIST", "1000", test_acc1000),
    ("MNIST", "3000", test_acc3000),
)
for dataset_label, label_count, accuracy in summary_rows:
    # Accuracy is rounded to two decimals before being placed in the table.
    myTable.add_row([dataset_label, label_count, f"{round(accuracy,2)}"])

print("Results Summary - different datasets and number of labels in training")
print(myTable)

Results Summary - different datasets and number of labels in training
+--------------+--------------------------+---------------+
|   Dataset    | # of labels in train set | Test Accuracy |
+--------------+--------------------------+---------------+
| FashionMNIST |           100            |     59.76     |
| FashionMNIST |           600            |     72.55     |
| FashionMNIST |           1000           |     72.36     |
| FashionMNIST |           3000           |     74.31     |
|    MNIST     |           100            |     64.29     |
|    MNIST     |           600            |     82.37     |
|    MNIST     |           1000           |     83.89     |
|    MNIST     |           3000           |     82.43     |
+--------------+--------------------------+---------------+
