In [1]:
import torch
import torch.nn
import torchvision
from torchvision import transforms
import numpy as np
from torch.utils.data.sampler import SubsetRandomSampler
from matplotlib import pyplot as plt
from torchvision.utils import make_grid
import pickle
import os
from matplotlib.lines import Line2D

In [None]:
# Select the compute device: the GPU when CUDA is available, the CPU otherwise
use_cuda = torch.cuda.is_available()
device = torch.device('cuda') if use_cuda else torch.device('cpu')

# Show which device was selected ("cuda" or "cpu")
print(device)

In [None]:
# Preprocessing applied to every image, in this order: convert to a tensor,
# normalize the three RGB channels with the per-channel mean/std below,
# then collapse the result to a single grayscale channel
preprocessing_steps = [
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.247, 0.243, 0.261)),
    transforms.Grayscale(),
]
transform = transforms.Compose(preprocessing_steps)

# Training set (downloaded into ./data when it is not already there)
train_data = torchvision.datasets.CIFAR10(root="./data", train=True, download=True,
                                          transform=transform)
# Test set (read from the same ./data directory, no download requested)
test_data = torchvision.datasets.CIFAR10(root="./data", train=False,
                                         transform=transform)

In [4]:
# Stratified validation split: hold out 10% of the training set, drawing the
# same number of samples from every class. The remaining samples stay in the
# training subset.

# Dedicated, seeded random generator so that the split is identical after every
# kernel restart (the global NumPy random state is left untouched)
split_seed = 42
split_rng = np.random.default_rng(split_seed)

# Number of samples in the training set before the split
train_length = len(train_data)

# Class label of every training sample, in dataset order
class_labels = np.array(train_data.targets)

# Take the 10% of the train set and store it as validation set length
validation_length = int(train_length*0.1)

# Number of validation samples drawn from each class
class_sample_number = validation_length // len(train_data.classes)

# One array of dataset indices per class
class_indices = [np.where(class_labels == i)[0] for i in range(len(train_data.classes))]

# Randomly choose the same number of indices from every class, without replacement
validation_indices = []
for index in class_indices:
  chosen = split_rng.choice(index, class_sample_number, replace = False)
  validation_indices.extend(chosen.tolist())

# Every index that was not chosen for validation stays in the training subset.
# Sorting gives a deterministic order; the sampler shuffles it anyway.
train_indices = sorted(set(range(train_length)) - set(validation_indices))

# The two subsets must partition the original training set
assert len(train_indices) + len(validation_indices) == train_length

# Create a train sampler by excluding indices that are separated for the validation set
train_sampler = SubsetRandomSampler(train_indices)

# Create a validation sampler by using the validation indices calculated
validation_sampler = SubsetRandomSampler(validation_indices)

In [5]:
# Build one DataLoader per subset. The train and validation loaders both read
# from train_data and are restricted to their own indices by a sampler;
# the test loader iterates over test_data without a sampler.
batch_size = 50
train_generator = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    sampler=train_sampler,
)
test_generator = torch.utils.data.DataLoader(
    test_data,
    batch_size=batch_size,
)
validation_generator = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    sampler=validation_sampler,
)

In [6]:
# Sanity check: every DataLoader must yield the expected number of batches.
#
# Dataset sizes before the split:
#   Train Set: 50000, Test Set: 10000, Validation Set: 0
# Dataset sizes after moving 10% of the training set to the validation set:
#   Train Set: 45000, Test Set: 10000, Validation Set: 5000
#
# With a batch size of 50 (number of images / batch size):
#   train_generator:      45000 / 50 = 900
#   test_generator:       10000 / 50 = 200
#   validation_generator:  5000 / 50 = 100

# Fails with an AssertionError if any loader has an unexpected length
assert (len(train_generator), len(test_generator), len(validation_generator)) == (900, 200, 100)

In [7]:
# General definitions about the architectures:
# FC-N: Fully Connected layer of size N
# Conv-WxHxN: Convolutional layer with N filters (output channels) of kernel size WxH
# MaxPool-2x2: Max-pooling operation of pool size 2x2
# PredictionLayer: FC-10

# Parameter Definitions:
# Stride: 1 for Convolutions
# Stride: 2 for Max-pooling operations

# Padding: 'valid' (i.e. no padding) for both Convolution and Max-pooling operations

# Optimizer: Adam (Adaptive Moment Estimation) with default setting
# Adam Optimizer with default settings:
# torch.optim.Adam(params, lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0,
#                  amsgrad=False, *, foreach=None, maximize=False, capturable=False, 
#                  differentiable=False, fused=None)

# Batch Size: 50 samples

# Three datasets: Train Set, Test Set and Validation Set

In [8]:
# First Class: Multi Layer Perceptron 1
# mlp1: [FC-32, ReLU] + PredictionLayer
class mlp1(torch.nn.Module):
  """Multi-layer perceptron with a single hidden layer.

  Args:
    input_size: number of features per sample after flattening.
    hidden_size: width of the hidden fully connected layer.
    num_classes: number of outputs of the bias-free prediction layer.
  """

  def __init__(self, input_size, hidden_size, num_classes):
    super().__init__()
    # Kept so that forward() knows how to flatten its input
    self.input_size = input_size
    self.fc1 = torch.nn.Linear(in_features=input_size, out_features=hidden_size)
    # Prediction layer: no bias term
    self.fc2 = torch.nn.Linear(in_features=hidden_size, out_features=num_classes, bias=False)
    self.relu = torch.nn.ReLU()

  def forward(self, x):
    """Flatten x to rows of input_size features and return the raw class scores."""
    flat = x.view(-1, self.input_size)
    return self.fc2(self.relu(self.fc1(flat)))

In [9]:
# Second Class: Multi Layer Perceptron 2
class mlp2(torch.nn.Module):
  """Multi-layer perceptron with two hidden layers.

  Args:
    input_size: number of features per sample after flattening.
    hidden_size1: width of the first hidden layer (has a bias).
    hidden_size2: width of the second hidden layer (no bias).
    num_classes: number of outputs of the bias-free prediction layer.
  """

  def __init__(self, input_size, hidden_size1, hidden_size2, num_classes):
    super().__init__()
    # Kept so that forward() knows how to flatten its input
    self.input_size = input_size
    self.fc1 = torch.nn.Linear(in_features=input_size, out_features=hidden_size1)
    self.fc2 = torch.nn.Linear(in_features=hidden_size1, out_features=hidden_size2, bias=False)
    # Prediction layer: no bias term
    self.fc3 = torch.nn.Linear(in_features=hidden_size2, out_features=num_classes, bias=False)
    # A single ReLU module is shared by both hidden layers
    self.relu = torch.nn.ReLU()

  def forward(self, x):
    """Flatten x to rows of input_size features and return the raw class scores."""
    activation = x.view(-1, self.input_size)
    for hidden_layer in (self.fc1, self.fc2):
      activation = self.relu(hidden_layer(activation))
    return self.fc3(activation)

In [10]:
# Third Class: Convolutional Neural Network 3
class cnn_3(torch.nn.Module):
  """CNN with three unpadded convolutions and two 2x2 max-pools.

  Layer order: Conv-3x3x16, ReLU, Conv-5x5x8, ReLU, MaxPool-2x2,
  Conv-7x7x16, MaxPool-2x2, then a bias-free FC-10 prediction layer.
  The flatten size 16*3*3 corresponds to single-channel 32x32 inputs
  (32 -> 30 -> 26 -> 13 -> 7 -> 3).
  """

  def __init__(self):
    super().__init__()
    nn = torch.nn
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(3, 3), stride=1, padding='valid')
    self.relu1 = nn.ReLU()
    self.conv2 = nn.Conv2d(in_channels=16, out_channels=8, kernel_size=(5, 5), stride=1, padding='valid')
    self.relu2 = nn.ReLU()
    self.maxpool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=2, padding=0)
    # No ReLU follows the last convolution; it feeds the pooling layer directly
    self.conv3 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(7, 7), stride=1, padding='valid')
    self.maxpool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=2, padding=0)
    # Prediction layer: no bias term
    self.fc = nn.Linear(in_features=16*3*3, out_features=10, bias=False)

  def forward(self, x):
    """Run the feature extractor, flatten, and return the 10 raw class scores."""
    feature_stages = (
        self.conv1, self.relu1,
        self.conv2, self.relu2,
        self.maxpool1,
        self.conv3,
        self.maxpool2,
    )
    for stage in feature_stages:
      x = stage(x)
    flat = x.view(-1, 16*3*3)
    return self.fc(flat)

In [11]:
# Fourth Class: Convolutional Neural Network 4
class cnn_4(torch.nn.Module):
  """CNN with four unpadded convolutions and two 2x2 max-pools.

  Layer order: Conv-3x3x16, ReLU, Conv-3x3x8, ReLU, Conv-5x5x16, ReLU,
  MaxPool-2x2, Conv-5x5x16, ReLU, MaxPool-2x2, then a bias-free FC-10
  prediction layer. The flatten size 16*4*4 corresponds to single-channel
  32x32 inputs (32 -> 30 -> 28 -> 24 -> 12 -> 8 -> 4).
  """

  def __init__(self):
    super().__init__()
    nn = torch.nn
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(3, 3), stride=1, padding='valid')
    self.relu1 = nn.ReLU()
    self.conv2 = nn.Conv2d(in_channels=16, out_channels=8, kernel_size=(3, 3), stride=1, padding='valid')
    self.relu2 = nn.ReLU()
    self.conv3 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=(5, 5), stride=1, padding='valid')
    self.relu3 = nn.ReLU()
    self.maxpool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=2, padding=0)
    self.conv4 = nn.Conv2d(in_channels=16, out_channels=16, kernel_size=(5, 5), stride=1, padding='valid')
    self.relu4 = nn.ReLU()
    self.maxpool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=2, padding=0)
    # Prediction layer: no bias term
    self.fc = nn.Linear(in_features=16*4*4, out_features=10, bias=False)

  def forward(self, x):
    """Run the feature extractor, flatten, and return the 10 raw class scores."""
    feature_stages = (
        self.conv1, self.relu1,
        self.conv2, self.relu2,
        self.conv3, self.relu3,
        self.maxpool1,
        self.conv4, self.relu4,
        self.maxpool2,
    )
    for stage in feature_stages:
      x = stage(x)
    flat = x.view(-1, 16*4*4)
    return self.fc(flat)

In [12]:
# Fifth Class: Convolutional Neural Network 5
class cnn_5(torch.nn.Module):
  """CNN with six unpadded 3x3 convolutions and two 2x2 max-pools.

  Layer order (the order in which the layers are declared in __init__):
  Conv-3x3x8, ReLU, Conv-3x3x16, ReLU, Conv-3x3x8, ReLU, Conv-3x3x16, ReLU,
  MaxPool-2x2, Conv-3x3x16, ReLU, Conv-3x3x8, ReLU, MaxPool-2x2, then a
  bias-free FC-10 prediction layer.

  For single-channel 32x32 inputs the spatial size evolves as
  32 -> 30 -> 28 -> 26 -> 24 -> 12 -> 10 -> 8 -> 4, and the last convolution
  has 8 output channels, so the prediction layer receives 8*4*4 features.
  """

  def __init__(self):
    super(cnn_5, self).__init__()
    self.conv1 = torch.nn.Conv2d(1, 8, kernel_size = (3,3), stride = 1, padding = 'valid')
    self.relu1 = torch.nn.ReLU()
    self.conv2 = torch.nn.Conv2d(8, 16, kernel_size = (3,3), stride = 1, padding = 'valid')
    self.relu2 = torch.nn.ReLU()
    self.conv3 = torch.nn.Conv2d(16, 8, kernel_size = (3,3), stride = 1, padding = 'valid')
    self.relu3 = torch.nn.ReLU()
    self.conv4 = torch.nn.Conv2d(8, 16, kernel_size = (3,3), stride = 1, padding = 'valid')
    self.relu4 = torch.nn.ReLU()
    self.maxpool1 = torch.nn.MaxPool2d(kernel_size = (2,2), stride = 2, padding = 0)
    self.conv5 = torch.nn.Conv2d(16, 16, kernel_size = (3,3), stride = 1, padding = 'valid')
    self.relu5 = torch.nn.ReLU()
    self.conv6 = torch.nn.Conv2d(16, 8, kernel_size = (3,3), stride = 1, padding = 'valid')
    self.relu6 = torch.nn.ReLU()
    self.maxpool2 = torch.nn.MaxPool2d(kernel_size = (2,2), stride = 2, padding = 0)
    # conv6 outputs 8 channels on a 4x4 map, hence 8*4*4 input features.
    # Prediction layer: no bias term.
    self.fc = torch.nn.Linear(8*4*4, 10, bias = False)

  def forward(self, x):
    """Run the feature extractor, flatten, and return the 10 raw class scores."""
    x = self.conv1(x)
    x = self.relu1(x)
    x = self.conv2(x)
    x = self.relu2(x)
    x = self.conv3(x)
    x = self.relu3(x)
    x = self.conv4(x)
    x = self.relu4(x)
    # First pooling comes after the fourth convolution, as declared in __init__
    x = self.maxpool1(x)
    x = self.conv5(x)
    x = self.relu5(x)
    # conv6/relu6 are part of the network; skipping them would leave
    # untrained parameters in the model
    x = self.conv6(x)
    x = self.relu6(x)
    x = self.maxpool2(x)
    x = x.view(-1, 8*4*4)
    x = self.fc(x)
    return x

In [None]:
# Instantiate the architecture to train (swap the class name to try another
# model) and move its parameters to the selected device
model = cnn_4().to(device)
# Last expression of the cell: display the model
model

In [14]:
# Snapshot of the first layer's weights before training, as a NumPy array.
# If CNNs are used, type model.conv1. Otherwise, type model.fc1
# The tensor is copied to the CPU first: calling .numpy() on a CUDA tensor raises.
# clone() keeps the snapshot independent of the live weights when the model
# already lives on the CPU (where .cpu() returns the same tensor).
params_cnn_4 = model.conv1.weight.detach().cpu().clone().numpy()

In [None]:
# Train the model
# The model is trained `total_run` times. Every `record_interval` steps the
# training loss, the accuracy on the current training batch and the accuracy on
# the whole validation set are recorded. After each run the test accuracy is
# measured, and the first-layer weights of the best run are kept.

# Define the epoch number
epoch = 15
# Number of training runs
total_run = 10
# Metrics are recorded once every `record_interval` training steps
record_interval = 10


def _count_correct(net, inputs, targets):
  """Return how many samples of the batch `net` classifies correctly."""
  predictions = net(inputs).argmax(dim=1)
  return (predictions == targets).sum().item()


def _loader_accuracy(net, loader):
  """Return the accuracy (in percent) of `net` over all batches of `loader`.

  The caller is responsible for switching `net` to eval mode beforehand.
  """
  correct = 0
  total = 0
  with torch.no_grad():
    for inputs, targets in loader:
      inputs, targets = inputs.to(device), targets.to(device)
      correct += _count_correct(net, inputs, targets)
      total += targets.shape[0]
  return 100 * correct / total


# Flat histories of every recorded value, concatenated over all runs
train_loss = []
train_accuracy = []
validation_accuracy = []
# One test accuracy per run
test_accuracy = []

# One curve (list of recorded values) per run; averaged element-wise below
run_loss_curves = []
run_train_acc_curves = []
run_val_acc_curves = []

# Curves averaged over the runs completed so far
avg_loss_curve = []
avg_train_acc_curve = []
avg_val_acc_curve = []

# Create variables that hold best model weights and best test accuracy
best_weights = None
best_test_acc = 0.0

# Number of batches per epoch of the (sampled) training subset
batch_number = len(train_generator)

# Layer whose weights are saved: conv1 for the CNNs, fc1 for the MLPs
first_layer = model.conv1 if hasattr(model, 'conv1') else model.fc1

# Train and evaluate the model `total_run` times
for iteration in range(total_run):

  # Every run after the first starts from freshly initialized parameters, so
  # that the runs are separate trainings rather than one long training.
  # NOTE: the first run keeps the initialization the model already has (the
  # one captured in params_cnn_4 by the earlier cell).
  if iteration > 0:
    for module in model.modules():
      if hasattr(module, 'reset_parameters'):
        module.reset_parameters()

  # Create loss: use cross entropy loss
  loss_func = torch.nn.CrossEntropyLoss()
  # Create the Adam optimizer with default parameters
  optimizer = torch.optim.Adam(model.parameters(), lr = 0.001, betas = (0.9, 0.999), eps = 1e-08, weight_decay = 0)

  # Values recorded during this run only
  run_loss = []
  run_train_acc = []
  run_val_acc = []

  # For loop that iterates for each epoch
  for cur_epoch in range(epoch):
    for batch_idx, (x_train, y_train) in enumerate(train_generator):
      # (Re-)enter train mode: the evaluation below switches to eval mode
      model.train()

      # Transfer the input and the output to the used device (cpu or cuda)
      x_train, y_train = x_train.to(device), y_train.to(device)

      # At every iteration reset the gradient to zero so that start from scratch
      optimizer.zero_grad()

      # Make prediction by using the model
      y_prediction = model(x_train)

      # Calculate the loss
      loss = loss_func(y_prediction, y_train)

      # Backward pass and optimization step
      loss.backward()
      optimizer.step()

      if (batch_idx + 1) % record_interval == 0:
        # Save training loss for every `record_interval` steps
        run_loss.append(loss.item())

        # Transfer the model to eval mode for the accuracy measurements
        model.eval()

        # Training accuracy on the current batch, after the optimizer step
        with torch.no_grad():
          train_acc = 100 * _count_correct(model, x_train, y_train) / y_train.shape[0]
        run_train_acc.append(train_acc)

        # Validation accuracy over the whole validation set
        validation_acc = _loader_accuracy(model, validation_generator)
        run_val_acc.append(validation_acc)

        print('Run [{}/{}], Epoch [{}/{}], Step [{}/{}], Training Acc: {:.2f}%, Validation Acc: {:.2f}%'
                      .format(iteration+1, total_run, cur_epoch+1, epoch, batch_idx + 1, len(train_generator), train_acc, validation_acc))

  # Calculate the test accuracy of this run
  model.eval()
  run_test_acc = _loader_accuracy(model, test_generator)
  test_accuracy.append(run_test_acc)

  # Save the best test accuracy and the first-layer weights of the best run.
  # The weights are copied to the CPU because .numpy() raises on CUDA tensors.
  if run_test_acc > best_test_acc:
    best_test_acc = run_test_acc
    best_weights = first_layer.weight.detach().cpu().clone().numpy()

  # Store this run's curves and extend the flat histories
  run_loss_curves.append(run_loss)
  run_train_acc_curves.append(run_train_acc)
  run_val_acc_curves.append(run_val_acc)
  train_loss.extend(run_loss)
  train_accuracy.extend(run_train_acc)
  validation_accuracy.extend(run_val_acc)

  # Element-wise average over the completed runs: one value per recorded step.
  # Updated after every run so partial results survive an interrupted cell.
  avg_loss_curve = np.mean(run_loss_curves, axis = 0).tolist()
  avg_train_acc_curve = np.mean(run_train_acc_curves, axis = 0).tolist()
  avg_val_acc_curve = np.mean(run_val_acc_curves, axis = 0).tolist()

In [None]:
# Sum of the element-wise differences between the best weights and the
# weights captured before training
np.subtract(best_weights, params_cnn_4).sum()

In [None]:
# Collect the training results of this architecture in one dictionary
train_result_dict = dict(
    name='cnn_4',
    loss_curve=avg_loss_curve,
    train_acc_curve=avg_train_acc_curve,
    val_acc_curve=avg_val_acc_curve,
    test_acc=best_test_acc,
    weights=best_weights,
)

# Serialize the dictionary to a pickle file in the working directory
filename = 'part3_cnn_4.pkl'
with open(filename, mode='wb') as f:
    pickle.dump(train_result_dict, f)

# NOTE: the part3Plots and visualizeWeights helper functions must be defined or imported before running the cells below

In [None]:
# part3Plots is called with a list of result dictionaries
results = [train_result_dict]

# Save the figure to the current working directory (the same place the pickle
# file is written) instead of a user-specific absolute path, so the notebook
# runs on any machine
part3Plots(results, save_dir=os.getcwd(), filename='part3Plots')

In [None]:
# Visualize the first-layer weights stored in the results dictionary.
# The figure goes to the current working directory rather than a user-specific
# absolute path, so the notebook runs on any machine.
weights = train_result_dict['weights']
visualizeWeights(weights, save_dir=os.getcwd(), filename='input_weights')

In [None]:
# Visualize the first-layer weights captured before training.
# The figure goes to the current working directory rather than a user-specific
# absolute path, so the notebook runs on any machine.
weights = params_cnn_4
visualizeWeights(weights, save_dir=os.getcwd(), filename='before_train_weights')