<a href="https://colab.research.google.com/github/liranbd1/Fashion_MNIST_FFNN_HW/blob/main/Deep_Learning_HW1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**NOTES ON WHAT TO WORK ON NEXT**

1. Connect this notebook to my Google Drive
2. Change the save methods to direct data into my Google Drive


Importing external libraries

In [None]:
import numpy as np
import torch as torch
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data.sampler import SubsetRandomSampler
from google.colab import files
import inspect
import os
import math

**Generic Feed-Forward Network**

* n_features - number of input features for the network

* n_hidden_units_per_layer - A list of integers in which each index represents a hidden layer and each value is the number of neurons in that layer.

* n_outputs - Number of neurons in the output layer

* activation_fun - Name of the activation function used throughout the network: 'relu', 'tanh', 'sigmoid', or 'Non' for no activation

In [None]:
class GenericFeedForwardNetwork(torch.nn.Module):
  """Fully connected network whose depth and width are chosen at construction.

  The network is a stack of `torch.nn.Linear` layers. The selected activation
  is inserted after every hidden layer but not after the output layer, and
  `forward` finishes with a log-softmax over dimension 1.

  Args:
    n_features: size of each input vector.
    n_hidden_units_per_layer: iterable of ints, one entry per hidden layer.
    n_outputs: number of units in the output layer.
    activation_fun: one of 'relu', 'tanh', 'sigmoid' or 'Non' (no activation).

  Raises:
    KeyError: if `activation_fun` is not one of the supported names.
  """

  def __init__(self, n_features, n_hidden_units_per_layer, n_outputs, activation_fun):
    non_linear_act_fun = {'relu': torch.nn.ReLU, 'tanh': torch.nn.Tanh, 'sigmoid': torch.nn.Sigmoid, 'Non': None}
    super().__init__()
    activation_cls = non_linear_act_fun[activation_fun]
    dim_list = [n_features, *n_hidden_units_per_layer, n_outputs]
    # There is one Linear layer per consecutive pair of dimensions.
    last_layer_index = len(dim_list) - 2
    layers = []

    for index, (in_dim, out_dim) in enumerate(zip(dim_list[:-1], dim_list[1:])):
      layers.append(torch.nn.Linear(in_dim, out_dim, bias = True))
      # The output layer must stay linear so the log-softmax sees raw scores.
      if activation_cls is not None and index != last_layer_index:
        layers.append(activation_cls())

    self.fc_layers = torch.nn.Sequential(*layers)
    self.softmax = torch.nn.LogSoftmax(dim = 1)

  def forward(self, x):
    """Return log-probabilities (log-softmax over dim 1) for the batch `x`."""
    h = self.fc_layers(x)
    y_pred = self.softmax(h)
    return y_pred

**Utility Functions**

training - Refactoring the training loop for more modularity 
* epochs - number of times we want to go over the whole dataset

* learning_rate - The learning rate (it is set on the optimizer that is passed in; `training` has no separate learning-rate argument)

* optimizer - The optimizer function 

* loss_function - The loss function 

* model - A Feed-Forward neural network model



---
calculate_accuracy - Calculate the accuracy of a given model 
* data_loader - The DataLoader object we want to use as the test set for the model
* model - A Feed-Forward neural network model

---
plot_train_validation_by_epochs - Receives the epochs and the train and validation accuracies and plots two line charts. This code was refactored into a utility function because it is needed by every function.
* epochs - The number of epochs to create our x-axis
* train_acc - a list of all the training accuracy for each epochs
* validation_acc - a list of all the validation accuracy for each epochs

All the indices are synced, meaning that the first entries of train_acc and validation_acc are the accuracy values for the first epoch.





In [None]:
def training(epochs, optimizer, loss_function, model, scheduler = None, train_DL = None, calc_loss=False, patience = -1):
  """Train `model` and record its train/validation accuracy after every epoch.

  Relies on the notebook-level globals `train_data`, `validation_data`,
  `input_size` and `device`, and on `calculate_accuracy`.

  Args:
    epochs: maximum number of passes over the training loader.
    optimizer: optimizer already bound to the parameters of `model`.
    loss_function: callable `(predictions, labels)` returning a scalar loss.
    model: network to train; it is updated in place.
    scheduler: optional learning-rate scheduler, stepped once per epoch.
    train_DL: optional training loader; the global `train_data` is used when
      it is None.
    calc_loss: when True, also record per-epoch losses, save the weights with
      the lowest validation loss to "./data/best_model.pth" and return the
      extended tuple described below.
    patience: when positive (and `calc_loss` is True), stop after this many
      epochs in a row without a new lowest validation loss.

  Returns:
    `(train_acc, val_acc)` when `calc_loss` is False, otherwise
    `(train_acc, val_acc, loss_list_train, loss_list_validation,
    stopping_epoch, min_epoch)`. The loss lists hold plain floats (the mean
    of the per-batch losses of each epoch), `stopping_epoch` is the number of
    epochs actually run and `min_epoch` is the 0-based epoch with the lowest
    validation loss.
  """
  train_dataloader = train_data if train_DL is None else train_DL
  # Early stopping needs the validation loss, which only exists with calc_loss.
  early_stopping = calc_loss and patience > 0

  train_acc = []
  val_acc = []
  loss_list_train = []
  loss_list_validation = []
  stopping_epoch = epochs
  min_epoch = 0
  minimal_loss = math.inf
  epochs_without_improvement = 0

  for epoch in range(epochs):
    train_loss_sum = 0.0
    train_batches = 0
    for data, label in train_dataloader:
      optimizer.zero_grad()
      data = data.view(-1, input_size).to(device)
      y_hat = model(data)
      loss = loss_function(y_hat, label.to(device))

      loss.backward()
      optimizer.step()

      if calc_loss:
        # .item() yields a host float, so the history can be plotted even
        # when the model lives on the GPU.
        train_loss_sum += loss.item()
        train_batches += 1

    # Exactly one scheduler step per epoch.
    if scheduler is not None:
      scheduler.step()

    if calc_loss:
      loss_list_train.append(train_loss_sum / train_batches if train_batches else math.nan)

      # Pure evaluation: no optimizer or scheduler steps are taken here, so
      # measuring the validation loss cannot change weights or learning rate.
      model.eval()
      validation_loss_sum = 0.0
      validation_batches = 0
      with torch.no_grad():
        for data, label in validation_data:
          data = data.view(-1, input_size).to(device)
          validation_loss_sum += loss_function(model(data), label.to(device)).item()
          validation_batches += 1
      model.train()

      loss_value = validation_loss_sum / validation_batches if validation_batches else math.nan
      loss_list_validation.append(loss_value)

      if loss_value < minimal_loss:
        minimal_loss = loss_value
        torch.save(model.state_dict(), "./data/best_model.pth")
        min_epoch = epoch
        epochs_without_improvement = 0
      else:
        epochs_without_improvement += 1

    train_acc.append(calculate_accuracy(train_dataloader, model))
    val_acc.append(calculate_accuracy(validation_data, model))

    if early_stopping and epochs_without_improvement >= patience:
      stopping_epoch = epoch + 1  # epochs are counted from 0
      break

  if calc_loss:
    return train_acc, val_acc, loss_list_train, loss_list_validation, stopping_epoch, min_epoch
  else:
    return train_acc, val_acc

In [None]:
def calculate_accuracy(data_loader, model, line_title = None):
  """Return the accuracy of `model` on `data_loader` as a percentage.

  Relies on the notebook-level globals `input_size` and `device`.

  Args:
    data_loader: iterable of `(data, label)` batches.
    model: network whose arg-max over dimension 1 is taken as the prediction.
    line_title: optional name; when given, the accuracy is also written to
      "./data/accuracy_files/<line_title>.txt".

  Returns:
    The percentage of correctly classified samples as a Python float.

  Raises:
    ZeroDivisionError: if `data_loader` yields no samples.
  """
  correct_count = 0
  total_count = 0
  # Evaluation needs no gradients; skipping them avoids building an autograd
  # graph for every batch.
  with torch.no_grad():
    for data, label in data_loader:
      data = data.view(-1, input_size).to(device)
      predictions = torch.argmax(model(data), dim=1)
      # Count with exact Python integers instead of a float32 tensor.
      correct_count += (predictions == label.to(device)).sum().item()
      total_count += data.shape[0]
  accuracy = correct_count / total_count * 100

  # Real world scenarion I would save the accuracies to a dictionary
  # and then write the dict into a Json file.
  if line_title is not None:
    with open(f"./data/accuracy_files/{line_title}.txt", 'w') as file:
      file.write(f"accuracy : {accuracy} %")

  return accuracy

In [None]:
def plot_report(x_axis, title, y_axis, number_of_neurons):
  """Plot `y_axis` against `x_axis` as a red line and save the figure.

  The figure is saved under "./data/plots/" with a file name built from the
  name of the calling function, `number_of_neurons` and `title`.

  Args:
    x_axis: either an int n, meaning x values 0..n-1, or a sequence of x values.
    title: title of the plot; it is also part of the file name.
    y_axis: sequence of numbers or single-element tensors to plot.
    number_of_neurons: used only to build the file name.
  """
  if isinstance(x_axis, int):
    x_axis_list = list(range(x_axis))
  else:
    x_axis_list = x_axis
  # matplotlib cannot read tensors that live on the GPU; float() copies each
  # value to a plain Python number wherever it is stored.
  y_values = [float(value) for value in y_axis]
  # stack()[1] is the direct caller, so this lookup has to stay in this function.
  file_name = f"{inspect.stack()[1].function}_{number_of_neurons}_{title}"
  file_path = f"./data/plots/{file_name}"
  fig = plt.figure()
  plt.plot(x_axis_list, y_values, 'r')
  plt.title(f"{title}")

  fig.savefig(file_path)


**Question 1**

Loading the datasets with the transform shown in the Tirgol.

After loading the train dataset we split it 80/20 into train and validation sets at random, using `torch.utils.data.random_split`.

All three DataLoaders are returned from the function to global variables

In [None]:
def load_dataset():
  """Download Fashion-MNIST and return train, validation and test loaders.

  The official training split is divided at random into 80% train and 20%
  validation. All three loaders use a batch size of 64.

  Returns:
    `(trainDataLoader, validationDataLoader, testDataLoader)`.
  """
  # Transform data
  # NOTE(review): 0.1307 / 0.3081 look like the statistics usually quoted for
  # the MNIST digits rather than Fashion-MNIST — confirm they are intended.
  normalize = transforms.Normalize((0.1307,), (0.3081,))
  totensor = transforms.ToTensor()
  fashion_mnist_transform = transforms.Compose([totensor, normalize])
  # Loading the train and test datasets
  init_dataset = datasets.FashionMNIST(root='./data', train=True, download=True, transform= fashion_mnist_transform)
  test_set = datasets.FashionMNIST(root='./data', train=False, download=True, transform= fashion_mnist_transform)
  train_size = int(len(init_dataset)*0.8)
  # Take the remainder so the two sizes always add up to the dataset length,
  # which random_split requires.
  validation_size = len(init_dataset) - train_size
  # Splitting the train dataset to train and validation sets
  train_set, validation_set = torch.utils.data.random_split(init_dataset, [train_size, validation_size])

  # Creating DataLoaders
  # NOTE(review): the training loader is not shuffled between epochs — confirm
  # that this is intended.
  trainDataLoader = torch.utils.data.DataLoader(train_set, batch_size = 64)
  validationDataLoader = torch.utils.data.DataLoader(validation_set, batch_size = 64)
  testDataLoader = torch.utils.data.DataLoader(test_set,64, shuffle=False)

  return trainDataLoader, validationDataLoader, testDataLoader


**Global Variables**

The global variables are set after all the basic and utility functions are defined.

In [None]:
# Hyper-parameters and shared state used by the functions in this notebook

# Each image is flattened to 28*28 values; the network has 10 outputs
input_size = 28*28
output_size = 10

# The three data loaders shared by the training and evaluation helpers
train_data, validation_data, test_data = load_dataset()

# Use the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
  device = torch.device('cuda')
else:
  device = torch.device('cpu')

# Output folders for the accuracy text files and the saved plots
accuracy_folder_path = "./data/accuracy_files"
plots_folder_path = "./data/plots"
for output_folder in (accuracy_folder_path, plots_folder_path):
  if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# Report how many samples back each loader
print(f"train_data: {len(train_data.dataset)}")
print(f"val_data: {len(validation_data.dataset)}")
print(f"test_data: {len(test_data.dataset)}")

**Function 2**

One hidden layer no activation

In [None]:
# Training the model Question 1

def one_hidden_layer_no_activation(number_of_neurons):
  """Train a network with one hidden layer and no activation function.

  Uses SGD with a learning rate of 0.01 for 50 epochs, saves the train and
  validation accuracy plots, stores the test accuracy in a text file named
  after this function and prints it.

  Args:
    number_of_neurons: number of units in the hidden layer.
  """
  n_epochs = 50
  step = 0.01
  net = GenericFeedForwardNetwork(input_size, [number_of_neurons], output_size, "Non").to(device)
  sgd = torch.optim.SGD(net.parameters(), lr = step)
  criterion = torch.nn.CrossEntropyLoss()

  train_curve, validation_curve = training(n_epochs, sgd, criterion, net)

  # plot_report derives the file name from its direct caller, so it is called
  # from this function's own frame.
  curves = (
    ("train accuracy by epochs", train_curve),
    ("validation accuracy by epochs", validation_curve),
  )
  for plot_title, curve in curves:
    plot_report(n_epochs, plot_title, curve, number_of_neurons)

  report_name = f"{inspect.stack()[0].function}_{number_of_neurons}"
  test_acc = calculate_accuracy(test_data, net, report_name)

  print(f"Test data accuracy is {test_acc}")

**Function 3**

Two hidden layers sigmoid

In [None]:
def two_hidden_layers_sigmoid(number_of_neurons):
  """Train a network with two hidden layers and sigmoid activations.

  Uses SGD with a learning rate of 0.1 for 20 epochs, saves the train and
  validation accuracy plots, stores the test accuracy in a text file named
  after this function and prints it.

  Args:
    number_of_neurons: number of units in each of the two hidden layers.
  """
  n_epochs = 20
  step = 0.1
  hidden_layout = [number_of_neurons, number_of_neurons]
  net = GenericFeedForwardNetwork(input_size, hidden_layout, output_size, 'sigmoid').to(device)
  sgd = torch.optim.SGD(net.parameters(), lr = step)
  criterion = torch.nn.CrossEntropyLoss()

  train_curve, validation_curve = training(n_epochs, sgd, criterion, net)

  # plot_report derives the file name from its direct caller, so it is called
  # from this function's own frame.
  curves = (
    ("train accuracy by epochs", train_curve),
    ("validation accuracy by epochs", validation_curve),
  )
  for plot_title, curve in curves:
    plot_report(n_epochs, plot_title, curve, number_of_neurons)

  report_name = f"{inspect.stack()[0].function}_{number_of_neurons}"
  test_acc = calculate_accuracy(test_data, net, report_name)

  print(f"Test data accuracy is {test_acc}")

**Function 4**

Two hidden layers ReLU

The idea behind this function is as follows: we create n models (n = the number of different learning rates) and train them in a loop; in each iteration we save the model together with its train and validation accuracy data.

After training each model we record its validation accuracy for the given learning rate.

Since all the data is appended to the different lists in the same iteration, the indices match: the index of the best validation accuracy is also the index of the best model.


In [None]:
def two_hidden_layers_relu(number_of_neurons):
  """Sweep SGD learning rates for a two-hidden-layer ReLU network.

  One fresh model is trained for 20 epochs for every learning rate in
  `np.arange(0.01, 1, 0.1)`. The final validation accuracy of each model is
  plotted against the learning rate. For the model with the highest final
  validation accuracy, the per-epoch accuracy curves are plotted and the test
  accuracy is stored in a text file named after this function and printed.

  Args:
    number_of_neurons: number of units in each of the two hidden layers.
  """
  # Initial parameters
  epochs = 20
  learning_rate_list = np.arange(0.01, 1, 0.1)
  los_fun = torch.nn.CrossEntropyLoss()
  model_data_list = [] # One [model, train_acc, val_acc] entry per learning rate
  val_acc_list = []

  # Training models over different learning rates
  for lr in learning_rate_list:
    model = GenericFeedForwardNetwork(input_size, [number_of_neurons, number_of_neurons], output_size, 'relu').to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr = lr) # New optimizer for each learning rate
    train_acc, val_acc = training(epochs, optimizer, los_fun, model)
    model_data_list.append([model, train_acc, val_acc])

    # training() measures the validation accuracy after the last epoch, so
    # the last entry already is the accuracy of the trained model; no extra
    # pass over the validation set is needed.
    val_acc_list.append(val_acc[-1])

  # Plotting the validation accuracy by learning rate
  plot_report(learning_rate_list, "validation accuracy by LR", val_acc_list, number_of_neurons)

  # The lists are index-aligned, so the position of the best validation
  # accuracy is also the position of the best model.
  max_val_acc_index = val_acc_list.index(max(val_acc_list))
  best_model, best_train_acc, best_val_acc = model_data_list[max_val_acc_index]

  # Plotting the training and validation accuracy by epochs
  plot_report(epochs, "train accuracy by epochs", best_train_acc, number_of_neurons)
  plot_report(epochs, "validation accuracy by epochs", best_val_acc, number_of_neurons)

  # stack()[0] is this function; index 1 would name the accuracy file after
  # whoever called it.
  test_acc = calculate_accuracy(test_data, best_model, f"{inspect.stack()[0].function}_{number_of_neurons}")

  print(f"Test data accuracy is {test_acc}")

**Function 5**

Two hidden layers ReLU SGD learning rate

In [None]:
def two_hidden_layers_relu_SGD_decreasing_lr(number_of_neurons):
  """Train a two-hidden-layer ReLU network with SGD and a decaying learning rate.

  The learning rate starts at 0.01 and a `StepLR` scheduler multiplies it by
  0.1 every `epochs // 5` epochs, over 20 epochs in total. The train and
  validation accuracy plots are saved, and the test accuracy is stored in a
  text file named after this function and printed.

  Args:
    number_of_neurons: number of units in each of the two hidden layers.
  """
  learning_rate = 0.01
  epochs = 20
  # StepLR counts whole epochs, so the period is kept as an integer.
  step_size = epochs // 5
  model = GenericFeedForwardNetwork(input_size,[number_of_neurons, number_of_neurons], output_size,'relu').to(device)
  optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate)
  loss_fun = torch.nn.CrossEntropyLoss()
  scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size, gamma=0.1)

  train_acc, validation_acc = training(epochs, optimizer, loss_fun, model,scheduler)

  # plot_report is the plotting helper defined in this notebook.
  plot_report(epochs, "train accuracy by epochs", train_acc, number_of_neurons)
  plot_report(epochs, "validation accuracy by epochs", validation_acc, number_of_neurons)
  test_acc = calculate_accuracy(test_data, model, f"{inspect.stack()[0].function}_{number_of_neurons}")

  print(f"Test data accuracy is {test_acc}")

**Function 6**

Two hidden layers ReLU Adam

In [None]:
def two_hidden_layers_relu_adam(number_of_neurons):
  """Train a two-hidden-layer ReLU network with the Adam optimizer.

  Uses Adam with a learning rate of 0.001 for 30 epochs, saves the train and
  validation accuracy plots, stores the test accuracy in a text file named
  after this function and prints it.

  Args:
    number_of_neurons: number of units in each of the two hidden layers.
  """
  epochs = 30
  learning_rate = 0.001
  model = GenericFeedForwardNetwork(input_size, [number_of_neurons, number_of_neurons], output_size, 'relu').to(device)

  loss_fun = torch.nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

  train_acc, validation_acc = training(epochs, optimizer, loss_fun, model)

  # plot_report is the plotting helper defined in this notebook; it draws one
  # curve per call, hence two calls.
  plot_report(epochs, "train accuracy by epochs", train_acc, number_of_neurons)
  plot_report(epochs, "validation accuracy by epochs", validation_acc, number_of_neurons)

  test_acc = calculate_accuracy(test_data, model, f"{inspect.stack()[0].function}_{number_of_neurons}")

  print(f"Test data accuracy is {test_acc}")

**Function 7**

Four hidden layers adam

In [None]:
def four_hidden_layers_adam(number_of_neurons):
  """Train a four-hidden-layer ReLU network with Adam on 10% of the train set.

  A random tenth of the dataset behind `train_data` is used for training
  (batch size 64) for 250 epochs with a learning rate of 0.001. Loss and
  accuracy curves for train and validation are saved as plots, and the test
  accuracy is stored in a text file named after this function and printed.

  Args:
    number_of_neurons: number of units in each of the four hidden layers.
  """
  n_epochs = 250
  step = 0.001
  criterion = torch.nn.CrossEntropyLoss()

  # Carve a random tenth out of the training data
  full_train_set = train_data.dataset
  n_samples = len(full_train_set)
  split_sizes = [int(0.1*n_samples), int(0.9*n_samples)]
  small_train_set, _unused_rest = torch.utils.data.random_split(full_train_set, split_sizes)
  small_train_loader = torch.utils.data.DataLoader(small_train_set, batch_size=64)

  hidden_layout = [number_of_neurons] * 4
  net = GenericFeedForwardNetwork(input_size, hidden_layout, output_size, 'relu').to(device)
  adam = torch.optim.Adam(net.parameters(), lr = step)

  train_acc, validation_acc, train_loss, validation_loss, _, _ = training(
    n_epochs, adam, criterion, net, train_DL= small_train_loader, calc_loss=True)

  # plot_report derives the file name from its direct caller, so it is called
  # from this function's own frame.
  curves = (
    ("training loss by epochs", train_loss),
    ("validation loss by epochs", validation_loss),
    ("training accuracy by epochs", train_acc),
    ("validation accuracy by epochs", validation_acc),
  )
  for plot_title, curve in curves:
    plot_report(n_epochs, plot_title, curve, number_of_neurons)

  report_name = f"{inspect.stack()[0].function}_{number_of_neurons}"
  test_acc = calculate_accuracy(test_data, net, report_name)

  print(f"Test data accuracy is {test_acc}")

**Function 8**

Four hidden layers adam weight decay

In [None]:
def four_hidden_layers_adam_weight_decay(number_of_neurons):
  """Train a four-hidden-layer ReLU network with Adam and weight decay.

  A random tenth of the dataset behind `train_data` is used for training
  (batch size 64) for 250 epochs with a learning rate of 0.001 and a weight
  decay of 0.001. Loss and accuracy curves for train and validation are saved
  as plots, and the test accuracy is stored in a text file named after this
  function and printed.

  Args:
    number_of_neurons: number of units in each of the four hidden layers.
  """
  n_epochs = 250
  step = 0.001
  criterion = torch.nn.CrossEntropyLoss()

  # Carve a random tenth out of the training data
  full_train_set = train_data.dataset
  n_samples = len(full_train_set)
  split_sizes = [int(0.1*n_samples), int(0.9*n_samples)]
  small_train_set, _unused_rest = torch.utils.data.random_split(full_train_set, split_sizes)
  small_train_loader = torch.utils.data.DataLoader(small_train_set, batch_size=64)

  hidden_layout = [number_of_neurons] * 4
  net = GenericFeedForwardNetwork(input_size, hidden_layout, output_size, 'relu').to(device)
  adam = torch.optim.Adam(net.parameters(), lr = step, weight_decay= 0.001)

  train_acc, validation_acc, train_loss, validation_loss, _, _ = training(
    n_epochs, adam, criterion, net, train_DL= small_train_loader, calc_loss=True)

  # plot_report derives the file name from its direct caller, so it is called
  # from this function's own frame.
  curves = (
    ("training loss by epochs", train_loss),
    ("validation loss by epochs", validation_loss),
    ("training accuracy by epochs", train_acc),
    ("validation accuracy by epochs", validation_acc),
  )
  for plot_title, curve in curves:
    plot_report(n_epochs, plot_title, curve, number_of_neurons)

  report_name = f"{inspect.stack()[0].function}_{number_of_neurons}"
  test_acc = calculate_accuracy(test_data, net, report_name)

  print(f"Test data accuracy is {test_acc}")

**Function 9**

Four hidden layers adam early stopping


In [None]:
def four_hidden_layers_adam_early_stopping(number_of_neurons):
  """Train a four-hidden-layer ReLU network with Adam and early stopping.

  A random tenth of the dataset behind `train_data` is used for training
  (batch size 64) for up to 250 epochs with a learning rate of 0.001, a
  weight decay of 0.001 and a patience of 20. Loss and accuracy curves are
  plotted up to the epoch at which training stopped. The weights saved in
  "./data/best_model.pth" are then loaded, and the test accuracy is stored in
  a text file named after this function and printed.

  Args:
    number_of_neurons: number of units in each of the four hidden layers.
  """
  max_epochs = 250
  step = 0.001
  criterion = torch.nn.CrossEntropyLoss()

  # Carve a random tenth out of the training data
  full_train_set = train_data.dataset
  n_samples = len(full_train_set)
  split_sizes = [int(0.1*n_samples), int(0.9*n_samples)]
  small_train_set, _unused_rest = torch.utils.data.random_split(full_train_set, split_sizes)
  small_train_loader = torch.utils.data.DataLoader(small_train_set, batch_size=64)

  hidden_layout = [number_of_neurons] * 4
  net = GenericFeedForwardNetwork(input_size, hidden_layout, output_size, 'relu').to(device)
  adam = torch.optim.Adam(net.parameters(), lr = step, weight_decay= 0.001)

  train_acc, validation_acc, train_loss, validation_loss, stop_point, _ = training(
    max_epochs, adam, criterion, net, train_DL= small_train_loader, calc_loss=True, patience=20)

  # The x axis only covers the epochs that actually ran. plot_report derives
  # the file name from its direct caller, so it is called from this frame.
  curves = (
    ("training loss by epochs", train_loss),
    ("validation loss by epochs", validation_loss),
    ("training accuracy by epochs", train_acc),
    ("validation accuracy by epochs", validation_acc),
  )
  for plot_title, curve in curves:
    plot_report(stop_point, plot_title, curve, number_of_neurons)

  # Evaluate the checkpointed weights rather than the last-epoch weights
  best_weights = torch.load("./data/best_model.pth")
  net.load_state_dict(best_weights)
  net.eval()
  report_name = f"{inspect.stack()[0].function}_{number_of_neurons}"
  test_acc = calculate_accuracy(test_data, net , report_name)

  print(f"Test data accuracy is {test_acc}")

In [None]:
# Test Block

# Experiments that are run for both network widths, in execution order
two_layer_experiments = (
  one_hidden_layer_no_activation,
  two_hidden_layers_sigmoid,
  two_hidden_layers_relu,
  two_hidden_layers_relu_SGD_decreasing_lr,
  two_hidden_layers_relu_adam,
)

# Experiments that are run only for the wider network
four_layer_experiments = (
  four_hidden_layers_adam,
  four_hidden_layers_adam_weight_decay,
  four_hidden_layers_adam_early_stopping,
)

number_of_neurons = 4
for experiment in two_layer_experiments:
  experiment(number_of_neurons)

number_of_neurons = 32
for experiment in two_layer_experiments + four_layer_experiments:
  experiment(number_of_neurons)