In [None]:
import numpy as np
import pandas as pd
import os
import math
import seaborn as sea
import matplotlib.pyplot as plot
import torch
from torch.utils.data import TensorDataset
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ExponentialLR 
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm
from datetime import datetime

In [None]:
# Mount data from drive
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
# Base folder of the project files: empty string = current working directory.
import_dir = ''
# francesco drive path
#import_dir = '/content/drive/MyDrive/Colab Notebooks/DL/progetto/'
# christian drive path
#import_dir = '/content/drive/MyDrive/deep_learning/progetto/'

# Folder that receives the exported CSV files.
# NOTE: import_dir and export_dir are assigned again by the Kaggle cell below.
export_dir = import_dir + 'output_csv/'

#dataset_filename = 'fer2013.csv'
dataset_filename = 'fer2021.csv'
# number of values stored per pixel: 3 for fer2021.csv, 1 for any other file
n_features = 3 if (dataset_filename == 'fer2021.csv') else 1

dataset_filepath = import_dir + dataset_filename

# import dataset colab
#data = pd.read_csv(dataset_filepath)

In [None]:
#kaggle
import_dir = '/kaggle/input/'
export_dir = '/kaggle/working/output_csv/'
# report whether the export folder is there before and after creating it
export_dir_found = os.path.exists(export_dir)
print(export_dir_found)
if not export_dir_found :
    os.makedirs(export_dir)
print(os.path.exists(export_dir))

In [None]:
#kaggle
# Search import_dir recursively and load the first file named dataset_filename.
# The search stops at the first match and fails loudly when nothing is found,
# instead of leaving `data` undefined for the cells below.
data = None
for dirname, _, filenames in os.walk(import_dir):
    if dataset_filename in filenames :
        data = pd.read_csv(os.path.join(dirname, dataset_filename))
        break
if data is None :
    raise FileNotFoundError(f"'{dataset_filename}' not found under '{import_dir}'")

In [None]:
# show the table that was just loaded
print(data)

In [None]:
# how many rows carry each value of the "Usage" column
print(data["Usage"].value_counts())

In [None]:
#emotions definition and plotting

# label index -> emotion name
# (Italian: rabbia, disgusto, paura, felicità, tristezza, sorpresa, neutrale)
emotions = ('rage', 'disgust', 'fear', 'happiness', 'sadness', 'surprise', 'neutral')
y = data['emotion']

# one bar per label, with the numeric ticks renamed to the emotion names
sx = sea.countplot(x=y)
sx.set_xticks(range(len(emotions)))
sx.set_xticklabels(emotions)
sx.set_xlabel("Emotions")
sx.set_ylabel("Count")

# number of samples of every label, ordered by label index
num_of_emotions = y.value_counts().sort_index()
print(num_of_emotions)

In [None]:
#setting images constants (used below to build tensors of im_d x im_h x im_l values)
im_h = 48 #height
im_l = 48 #length (second spatial dimension of the image)
im_d = n_features  #depth: number of values per pixel

In [None]:
# Convert every entry of the 'pixels' column into a torch Tensor whose values
# are divided by 255. The result is a plain Python list with one Tensor per image.

def arrToNp(x, shape):
    """Turn whitespace-separated pixel strings into Tensors.

    x     -- iterable of strings, each holding the pixel values of one image
    shape -- shape given to every image before it is wrapped in a Tensor

    Returns a list of torch Tensors (one per string), values divided by 255.
    """
    return [
      torch.Tensor(np.array(pixel_string.split()).reshape(shape).astype('double') / 255)
      for pixel_string in tqdm(x)
    ]

# 3-feature data is stored channel first, anything else channel last
if n_features == 3 :
    shape = (im_d,im_h,im_l)
else :
    shape = (im_h,im_l,im_d)
imgsList = arrToNp(data['pixels'], shape)


In [None]:
# imgsList is a Python list with one entry per image
print(type(imgsList))
print(len(imgsList))
# every entry is a torch Tensor built with `shape`; len() reports its first dimension
print(type(imgsList[0]))
print(len(imgsList[0]))

In [None]:
def print_image(ima, labelI, val=False, subpl="",title=""):
    """Draw one 48x48 grayscale image.

    ima    -- tensor holding 48*48 values
    labelI -- label index; used for the title only when no subplot is given
    val    -- when True the numeric label is appended to the title
    subpl  -- axes to draw on; the default "" draws on the current pyplot figure
    title  -- title used when drawing on `subpl`
    """
    pixels = ima.detach().numpy().reshape((48,48))
    if subpl == "":
      caption = emotions[labelI]
      if val:
        caption = caption + (" " + str(labelI))
      plot.imshow(pixels, cmap='gray')
      plot.title(caption)
      return
    subpl.imshow(pixels, cmap='gray')
    if title != "":
      subpl.set_title(" " + str(title))
    else :
      subpl.set_title("")

In [None]:
# seed the pseudorandom number generator
from random import seed
from random import random
# seed random number generator

def plot_examples(rows, cols, deterministic=False) :
  """Draw one sample image per emotion on a rows x cols grid of subplots.

  rows, cols    -- size of the grid; it must offer at least len(emotions) cells
  deterministic -- when True the first sample of every emotion is shown,
                   otherwise a random sample of that emotion is picked

  Raises ValueError when the grid is too small or an emotion has no sample.
  Only the first channel of each image (imgsList[idx][0]) is drawn.
  """
  n_emotions = len(emotions)
  if rows * cols < n_emotions :
    raise ValueError(f"a {rows}x{cols} grid cannot hold {n_emotions} emotions")

  seed(None)
  labels = np.asarray(y)
  # squeeze=False keeps axs two-dimensional even for a single row or column
  fig, axs = plot.subplots(rows, cols, figsize=(15,15), squeeze=False)

  # remove the cells of the grid that will not host an emotion
  for spare in range(n_emotions, rows * cols) :
    fig.delaxes(axs[spare // cols][spare % cols])

  for cur_emot in range(n_emotions) :
    # choose among the samples of this emotion directly: no unbounded retry loop
    candidates = np.flatnonzero(labels == cur_emot)
    if len(candidates) == 0 :
      raise ValueError(f"no sample available for emotion '{emotions[cur_emot]}'")
    pick = 0 if deterministic else int(random() * len(candidates))
    idx = int(candidates[pick])
    print_image(imgsList[idx][0], cur_emot, title=emotions[cur_emot], subpl=axs[cur_emot // cols][cur_emot % cols])

plot_examples(2, 4)

In [None]:
def filterApply(image, filtro, l):
  """Slide an l x l window over a 2-D image and average filtro over it.

  image  -- 2-D tensor
  filtro -- callable (pixel, row_in_window, col_in_window) -> value
  l      -- side of the square window

  The window starts one pixel above and to the left of the image; positions
  with a negative index are handed to filtro as 0. Output cells that are
  never reached by the sweep stay 0. Returns a new tensor shaped like image.
  """
  n_rows, n_cols = image.shape[0], image.shape[1]
  newImage = torch.zeros(image.shape, dtype=image.dtype)
  for z, top in enumerate(range(-1, n_rows - l + 1)):
    for w, left in enumerate(range(-1, n_cols - l + 1)):
      for dx in range(l):
        for dy in range(l):
          r = top + dx
          c = left + dy
          inside = r >= 0 and c >= 0 and top <= n_rows - l and left <= n_cols - l
          pixel = image[r][c] if inside else 0
          newImage[z][w] += filtro(pixel, dx, dy)
      # mean over the l*l window positions
      newImage[z][w] /= l*l
  return newImage

In [None]:
# Per-pixel filters for filterApply: i is the pixel value, (x, y) its position
# inside the window. On a 3x3 window the horizontal weights are
# [1,2,1 / 0,0,0 / -1,-2,-1]; the vertical ones are their transpose.

def sobol_filter(i, x, y):
    """Mean of the horizontal and the vertical weighting of pixel i."""
    horizontal = (1-(x%2))*(1+(y%2))*(1-(x%3))*i
    vertical = (1-(y%2))*(1+(x%2))*(1-(y%3))*i
    return (horizontal + vertical)/2

def vertical_filter(i, x, y):
    """Pixel i weighted by its column position y (doubled on the odd row)."""
    return (1-(y%2))*(1+(x%2))*(1-(y%3))*i

def horizontal_filter(i, x, y):
    """Pixel i weighted by its row position x (doubled on the odd column)."""
    return (1-(x%2))*(1+(y%2))*(1-(x%3))*i

def contrast(i, x, y):
    """Square of the pixel value."""
    return i*i

def high_contrast(i, x, y):
    """Cube of the pixel value."""
    return i*i*i

def low_contrast(i, x, y):
    """Square root of the pixel value."""
    return math.sqrt(i)

In [None]:
# Apply every filter to the first channel of one randomly chosen image.
seed(None)
img_idx = int(random() * len(y))
demo_image = imgsList[img_idx][0]

# (title, filter, window side) in the order the panels are filled, row by row
filter_demos = (
  ("horizontal",    horizontal_filter, 3),
  ("vertical",      vertical_filter,   3),
  ("sobol",         sobol_filter,      3),
  ("contrast",      contrast,          2),
  ("high_contrast", high_contrast,     2),
  ("low_contrast",  low_contrast,      2),
)

# plot.subplots creates its own figure: no separate plot.figure() call is
# needed (it only produced an extra, empty figure)
fig, axs = plot.subplots(2, 3)
for subpl, (title, filtro, size) in zip(axs.flat, filter_demos) :
  print_image(filterApply(demo_image, filtro, size), 0, subpl=subpl, title=title)

In [None]:
# use the GPU when PyTorch reports one, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#device = torch.device('cpu')
print(device)

In [None]:
batch_size  = 32

# join the per-image tensors and view them as (samples, im_d, im_h, im_l)
n_samples = len(imgsList)
x = torch.cat((imgsList), 0).view(n_samples, im_d, im_h, im_l)
# labels as a float Tensor (cast to long later, when moved to the device)
y = torch.Tensor(y)

In [None]:
# Stratified split (stratify=...) into 70% train, 15% test and 15% validation.
# The seed is fixed so the split is reproducible; the random seed that was
# drawn here before was always overwritten by this value.
# For a different split per run use: split_seed = int(random() * 100)
split_seed = 99
print('split seed:', split_seed)

# 70% train, 30% held out
X_train, X_valid_test, y_train, y_valid_test = train_test_split(
      x, y, shuffle=True, stratify=y, test_size=0.3, random_state=split_seed)

# the held-out part is halved into test and validation
X_test, X_valid, y_test, y_valid = train_test_split(
      X_valid_test, y_valid_test, shuffle=True, stratify=y_valid_test, test_size=0.5, random_state=split_seed)

In [None]:
# label counts of the held-out (validation + test) part, ordered by label
y_valid_test_count_df = pd.DataFrame(y_valid_test, columns=['emotion']).value_counts().sort_index()

# .items() replaces .iteritems(), which no longer exists in pandas >= 2.0.
# DataFrame.value_counts() is indexed by tuples, hence key[0] for the label.
for key, count in y_valid_test_count_df.items() :
  emot = int(key[0])
  print(emot, count)

In [None]:
# move every split to the selected device; labels become int64 (torch.long)
X_train, y_train = X_train.to(device), y_train.to(device, dtype=torch.long)
X_valid, y_valid = X_valid.to(device), y_valid.to(device, dtype=torch.long)
X_test, y_test = X_test.to(device), y_test.to(device, dtype=torch.long)

# pair images and labels of each split
train_dataset = TensorDataset(X_train, y_train)
valid_dataset = TensorDataset(X_valid, y_valid)
test_dataset = TensorDataset(X_test, y_test)

# shuffled mini-batch iterators, one per split
trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validloader = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
testloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [None]:
# shapes of the full image and label tensors
print(x.size())
print(y.size())

# number of samples in the train / validation / test datasets
print(len(trainloader.dataset))
print(len(validloader.dataset))
print(len(testloader.dataset))

In [None]:
class DynamicNetInceptions(nn.Module):
    """CNN with configurable conv stages, a stack of inception modules and a
    fully connected head.

    Layout: [conv+ReLU repeated, max-pool] per stage -> dropout ->
    inception modules (all but the first with a residual sum) -> dropout ->
    flatten -> linear layers.

    forward() returns raw class scores (logits): torch.nn.CrossEntropyLoss
    applies log-softmax itself, so neither a ReLU nor a softmax is applied to
    the last linear layer. Use `self.softmax(logits)` to get probabilities.

    Parameters
    ----------
    dropout_prob            -- probability of the two Dropout2d applications
    conv__layer_repetitions -- number of conv+ReLU pairs in every stage
    conv__in_channels       -- channels of the input images
    conv__out_channels      -- output channels of every stage (same length as
                               conv__layer_repetitions, else ValueError)
    lin__out_dimension      -- output size of every linear layer
    incep__num_layers       -- number of inception modules (0 disables them)
    incep__multiplier       -- multiplier of the branch widths of a module
    im_h, im_l              -- spatial size of the input images
    """

    def __init__(self, dropout_prob, conv__layer_repetitions=(1,1,1), conv__in_channels=1, conv__out_channels=(6,12,24), lin__out_dimension=(16*7*5,16,len(emotions)), incep__num_layers=4, incep__multiplier=2, im_h=im_h, im_l=im_l) :
        super(DynamicNetInceptions, self).__init__()

        if len(conv__layer_repetitions) != len(conv__out_channels) :
            raise ValueError("conv__layer_repetitions and conv__out_channels must have the same length")

        # PARAMETERS SECTION
        self.pool_size = (2,2)
        self.kernel_size = (3,3)
        self.padding = 1
        self.num_of_conv_layers = len(conv__layer_repetitions) # default = 3
        self.num_of_lin_layers = len(lin__out_dimension) # default = 3
        self.dropout_prob = dropout_prob
        self.incep__multiplier = incep__multiplier
        self.incep__num_layers = incep__num_layers

        self.dropout = nn.Dropout2d(p=self.dropout_prob)
        self.skip = nn.Identity()

        # CONVOLUTIONAL SECTION
        # in_chan always holds the number of channels produced so far
        in_chan = conv__in_channels
        self.convs = []
        for out_chan, repetitions in zip(conv__out_channels, conv__layer_repetitions) :
            for _ in range(repetitions) :
                self.convs += [ nn.Conv2d(in_chan, out_chan, self.kernel_size, padding=self.padding) ]
                self.convs += [ nn.ReLU() ]
                in_chan = out_chan
            self.convs += [ nn.MaxPool2d(self.pool_size) ]

        # INCEPTION SECTION
        # Modules are kept in nn.ModuleList containers: plain Python lists are
        # invisible to parameters(), state_dict() and .to(), so the optimizer
        # would never update them.
        # 64 + 128 + 32 + 32 are the default branch widths of inception_module.
        incep_out_chan = incep__multiplier*(64 + 128 + 32 + 32) #256 * out_multiplier
        if incep__num_layers > 0 :
            self.incep_first = self.inception_module(in_chan)
            self.incep_after = nn.ModuleList(
                [ self.inception_module(incep_out_chan) for _ in range(incep__num_layers - 1) ]
            )
            total_out_chan = incep_out_chan
        else :
            # no inception stage: the linear head reads the conv output directly
            self.incep_first = None
            self.incep_after = nn.ModuleList()
            total_out_chan = in_chan

        #final output dimension (every stage halves both spatial sizes)
        self.final_conv_dim = total_out_chan * (im_h // (2 ** self.num_of_conv_layers)) * (im_l // (2 ** self.num_of_conv_layers))

        # LINEAR FC SECTION
        in_dim = self.final_conv_dim
        self.linear_fc = []
        for lin_layer, out_dim in enumerate(lin__out_dimension) :
            self.linear_fc += [ nn.Linear(in_dim, out_dim) ]
            # no activation after the last layer: its output are the logits
            if lin_layer < self.num_of_lin_layers - 1 :
                self.linear_fc += [ nn.ReLU() ]
            in_dim = out_dim

        # SEQUENTIAL SECTION
        self.convs_seq = nn.Sequential(*self.convs)
        self.linear_fc_seq = nn.Sequential(*self.linear_fc)
        # kept for callers that need probabilities; not used by forward()
        self.softmax = nn.Softmax(1)

        # every registered submodule (inception modules included) follows
        self.to(device)

    def inception_module(self, in_chan, out_1x1=64, out_3x3=(96,128), out_5x5=(16,32), out_pool=32) :
        """Build the four parallel branches of one inception module.

        Every branch keeps the spatial size of its input; the widths are
        multiplied by self.incep__multiplier. Returns an nn.ModuleList
        [branch_1x1, branch_3x3, branch_5x5, branch_pool] on `device`.
        """
        mul = self.incep__multiplier

        branch_1x1 = nn.Sequential(
            nn.Conv2d(in_chan, out_1x1*mul, kernel_size=1) #conv 1x1
        )

        branch_3x3 = nn.Sequential(
            nn.Conv2d(in_chan,          (out_3x3[0])*mul, kernel_size=1),           # conv 1x1
            nn.Conv2d((out_3x3[0])*mul, (out_3x3[1])*mul, kernel_size=3, padding=1) # conv 3x3
        )

        branch_5x5 = nn.Sequential(
            nn.Conv2d(in_chan,          (out_5x5[0])*mul, kernel_size=1),           # conv 1x1
            nn.Conv2d((out_5x5[0])*mul, (out_5x5[1])*mul, kernel_size=5, padding=2) # conv 5x5
        )

        # The padding belongs to the pooling: padding the 1x1 convolution
        # instead fills the border of the output with the bare bias and makes
        # the pooling fail on feature maps smaller than 3x3.
        branch_pool = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1), # max_pool 3x3
            nn.Conv2d(in_chan, out_pool*mul, kernel_size=1)   # conv 1x1
        )

        return nn.ModuleList([ branch_1x1, branch_3x3, branch_5x5, branch_pool ]).to(device)

    def print_net(self) :
        """Print the layers in the order forward() applies them."""
        print("__Convolutionals Start__")
        print(self.convs_seq)
        print("__Convolutionals End__")
        print(self.dropout)
        if self.incep__num_layers > 0 :
            print("__Inception Start (with skip)__")
            print("Inception__1 with dim: N -> (256 * mul)")
            print(self.incep_first)
            # the following modules exist only with two or more inception layers
            if len(self.incep_after) > 0 :
                print(f"Inception__2,...,{self.incep__num_layers} with dim: (256 * mul) -> (256 * mul)")
                print(self.incep_after[0])
            print("__Inception End__")
        print(self.dropout)
        print(f"Reshape(-1, {self.final_conv_dim})")
        print("__Linear Start__")
        print(self.linear_fc_seq)
        print("__Linear End__")
        print("(output: raw logits, softmax is left to the loss)")

    def run_inception(self, x, inception) :
        """Run the four branches on x and concatenate them along the channels."""
        x_1x1  = (inception[0])(x)
        x_3x3  = (inception[1])(x)
        x_5x5  = (inception[2])(x)
        x_pool = (inception[3])(x)
        concat = torch.cat((x_1x1, x_3x3, x_5x5, x_pool), 1)
        return concat

    def forward(self, x):
        """Return the logits, shape (batch, lin__out_dimension[-1])."""
        x = self.convs_seq(x)
        x = self.dropout(x)

        if (self.incep__num_layers > 0) :
            # the first module changes the number of channels: no residual sum
            x = self.run_inception(x, self.incep_first)
            for inception in self.incep_after :
                x = self.run_inception(x, inception) + self.skip(x)

        x = self.dropout(x)

        # keep the batch dimension fixed: a wrong feature size then raises in
        # the first linear layer instead of silently changing the batch size
        x = torch.flatten(x, 1)

        return self.linear_fc_seq(x)

net = DynamicNetInceptions(0.1,conv__layer_repetitions=(2,2,1), conv__in_channels=3)
net.print_net()

In [None]:
class DynamicNetBasic(nn.Module):
    """CNN with configurable conv stages and a fully connected head.

    Layout: [conv+ReLU repeated, max-pool] per stage -> dropout -> flatten ->
    linear layers.

    forward() returns raw class scores (logits): torch.nn.CrossEntropyLoss
    applies log-softmax itself, so neither a ReLU nor a softmax is applied to
    the last linear layer. Use `self.softmax(logits)` to get probabilities.

    Parameters
    ----------
    dropout_prob            -- probability of the Dropout2d after the convs
    conv__layer_repetitions -- number of conv+ReLU pairs in every stage
    conv__in_channels       -- channels of the input images
    conv__out_channels      -- output channels of every stage (same length as
                               conv__layer_repetitions, else ValueError)
    lin__out_dimension      -- output size of every linear layer
    im_h, im_l              -- spatial size of the input images
    """

    def __init__(self, dropout_prob, conv__layer_repetitions=(1,1,1), conv__in_channels=1, conv__out_channels=(6,12,24), lin__out_dimension=(16*7*5,16,len(emotions)), im_h=im_h, im_l=im_l) :
        super(DynamicNetBasic, self).__init__()

        if len(conv__layer_repetitions) != len(conv__out_channels) :
            raise ValueError("conv__layer_repetitions and conv__out_channels must have the same length")

        #params
        self.pool_size = (2,2)
        self.kernel_size = (3,3)
        self.padding = 1
        self.num_of_conv_layers = len(conv__layer_repetitions) # default = 3
        self.num_of_lin_layers = len(lin__out_dimension) # default = 3
        self.dropout_prob = dropout_prob

        # convolutional stages; in_chan always holds the channels produced so far
        in_chan = conv__in_channels
        self.convs = []
        for out_chan, repetitions in zip(conv__out_channels, conv__layer_repetitions) :
            for _ in range(repetitions) :
                self.convs += [ nn.Conv2d(in_chan, out_chan, self.kernel_size, padding=self.padding) ]
                self.convs += [ nn.ReLU() ]
                in_chan = out_chan
            self.convs += [ nn.MaxPool2d(self.pool_size) ]

        self.convs += [ nn.Dropout2d(p=self.dropout_prob) ]

        #final output dimension (every stage halves both spatial sizes)
        self.final_conv_dim = in_chan * (im_h // (2 ** self.num_of_conv_layers)) * (im_l // (2 ** self.num_of_conv_layers))

        # linears
        in_dim = self.final_conv_dim
        linear_layers = []
        for lin_layer, out_dim in enumerate(lin__out_dimension) :
            linear_layers += [ nn.Linear(in_dim, out_dim) ]
            # no activation after the last layer: its output are the logits
            if lin_layer < self.num_of_lin_layers - 1 :
                linear_layers += [ nn.ReLU() ]
            in_dim = out_dim

        # kept for callers that need probabilities; not used by forward()
        self.softmax = nn.Softmax(1)

        self.features = nn.Sequential(*self.convs)
        self.linear_fc = nn.Sequential(*linear_layers)
        self.to(device)

    def print_net(self) :
        """Print the layers in the order forward() applies them."""
        print(self.features)
        print(f"Reshape(-1, {self.final_conv_dim})")
        print(self.linear_fc)
        print("(output: raw logits, softmax is left to the loss)")

    def forward(self, x):
        """Return the logits, shape (batch, lin__out_dimension[-1])."""
        x = self.features(x)
        # keep the batch dimension fixed: a wrong feature size then raises in
        # the first linear layer instead of silently changing the batch size
        x = torch.flatten(x, 1)
        return self.linear_fc(x)

net = DynamicNetBasic(0.1,conv__layer_repetitions=(2,2,1), conv__in_channels=3)
net.print_net()

In [None]:
class DynamicNet(nn.Module):
    """Three-stage CNN (6, 12 and 24 channels) with a three-layer linear head.

    a, b, c are the numbers of conv+ReLU pairs of the three stages (at least
    one pair is always built); every stage ends with a 2x2 max-pool.
    forward() returns the raw output of the last linear layer (logits).
    dropout_prob is stored but no dropout layer is part of this network.
    The model is left on the default device; move it with .to() if needed.
    """

    def __init__(self, dropout_prob, a=1, b=1, c=1, in_channels=1, im_h=im_h, im_l=im_l) :
        super(DynamicNet, self).__init__()

        #params
        self.pool_size = (2,2)
        self.kernel_size = (3,3)
        self.padding = 1
        self.num_of_conv_layers = 3
        self.dropout_prob = dropout_prob

        #convolutionals
        # The layers are appended one by one: the previous list comprehension
        # stored (conv, relu) tuples, which nn.Sequential rejects as soon as
        # a, b or c is greater than 1.
        self.conv2D = []
        in_chan = in_channels
        out_chan = 6
        for repetitions in (a, b, c) :
            for _ in range(max(repetitions, 1)) :
                self.conv2D += [ nn.Conv2d(in_chan, out_chan, self.kernel_size, padding=self.padding), nn.ReLU() ]
                in_chan = out_chan
            self.conv2D += [ nn.MaxPool2d(self.pool_size) ]
            out_chan = out_chan * 2   # the next stage doubles the channels

        #final output dimension (in_chan = channels of the last stage)
        self.final_conv_dim = in_chan * (im_h // (2 ** self.num_of_conv_layers)) * (im_l // (2 ** self.num_of_conv_layers))

        # linears
        linear_layers = []
        in_dim = self.final_conv_dim
        lin_dims = (16*7*5, 16, len(emotions))
        for lin_layer, out_dim in enumerate(lin_dims) :
            linear_layers += [ nn.Linear(in_dim, out_dim) ]
            # no ReLU after the last layer: it would clamp the logits at zero
            if lin_layer < len(lin_dims) - 1 :
                linear_layers += [ nn.ReLU() ]
            in_dim = out_dim

        self.features = nn.Sequential(*self.conv2D)
        self.linear_fc = nn.Sequential(*linear_layers)

    def forward(self, x):
        """Return the logits, shape (batch, len(emotions))."""
        x = self.features(x)
        # keep the batch dimension fixed: a wrong feature size then raises in
        # the first linear layer instead of silently changing the batch size
        x = torch.flatten(x, 1)
        return self.linear_fc(x)

net = DynamicNet(0.1)

In [None]:
class Optimization:
    """Training / evaluation helper around a model, a loss, an optimizer and
    an optional learning-rate scheduler.

    After train() the per-epoch history is available in `train_losses`,
    `val_losses`, `train_accuracies` and `val_accuracies` (one entry per epoch).
    Batches are moved to the device of the model parameters.
    """

    def __init__(self, model, loss_fn, optimizer, scheduler=None):
        self.model = model
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.scheduler = scheduler   # stepped once per epoch when not None
        self.train_losses = []
        self.val_losses = []
        self.train_accuracies = []
        self.val_accuracies = []

    def _model_device(self):
        """Device of the model parameters (CPU for a parameter-free model)."""
        first_param = next(self.model.parameters(), None)
        return first_param.device if first_param is not None else torch.device('cpu')

    @staticmethod
    def _ratio(numerator, denominator):
        """numerator / denominator, NaN when the denominator is 0."""
        return numerator / denominator if denominator else float('nan')

    def count_preds_correct(self, y_pred, y) :
        """Return, as a 0-d CPU tensor, how many rows of y_pred have their
        highest score at the index given by y."""
        preds = torch.argmax(y_pred, dim=1)
        return preds.eq(y.view_as(preds)).cpu().sum()

    def train_step(self, x, y):
        """Run one optimisation step on the batch (x, y).

        Returns (loss value as float, number of correct predictions as tensor).
        """
        # Set training mode
        self.model.train()
        # start from clean gradients, whatever happened before this call
        self.optimizer.zero_grad()
        # Predict
        y_pred = self.model(x)
        # Computes loss, gradients
        loss = self.loss_fn(y_pred, y)
        loss.backward()
        # Updates parameters
        self.optimizer.step()
        return loss.item(), self.count_preds_correct(y_pred, y)

    def train(self, train_loader, val_loader, batch_size=64, n_epochs=50, n_features=1, im_h=48, im_l=48):
        """Train for exactly n_epochs epochs, validating after each of them.

        train_loader, val_loader -- iterables of (inputs, labels) batches
        batch_size               -- unused, kept for backward compatibility
        n_features, im_h, im_l   -- every input batch is viewed as
                                    (batch, n_features, im_h, im_l)

        The recorded loss of an epoch is the mean of the per-batch loss values;
        the accuracy is correct predictions / samples seen.
        Returns the state_dict of the model.
        """
        device = self._model_device()
        for epoch in range(1, n_epochs + 1):
            # ---- training ----
            batch_train_losses = []
            train_correct = 0
            train_seen = 0
            for x_batch, y_batch in train_loader:
                cur_batch_size = len(x_batch)
                x_batch = x_batch.view([cur_batch_size, n_features, im_h, im_l]).to(device)
                y_batch = y_batch.to(device)
                loss, batch_correct = self.train_step(x_batch, y_batch)
                batch_train_losses.append(loss)
                train_correct += int(batch_correct)
                train_seen += cur_batch_size
            train_loss = float(np.mean(batch_train_losses)) if batch_train_losses else float('nan')
            # counting samples (not batches) keeps a short last batch from
            # weighing more than the others
            train_acc = self._ratio(train_correct, train_seen)
            self.train_losses.append(train_loss)
            self.train_accuracies.append(train_acc)
            if self.scheduler is not None:
                self.scheduler.step()

            # ---- validation ----
            self.model.eval()
            batch_val_losses = []
            val_correct = 0
            val_seen = 0
            with torch.no_grad():
                for x_val, y_val in val_loader:
                    cur_batch_size = len(x_val)
                    x_val = x_val.view([cur_batch_size, n_features, im_h, im_l]).to(device)
                    y_val = y_val.to(device)
                    y_pred = self.model(x_val)
                    batch_val_losses.append(self.loss_fn(y_pred, y_val).item())
                    val_correct += int(self.count_preds_correct(y_pred, y_val))
                    val_seen += cur_batch_size
            val_loss = float(np.mean(batch_val_losses)) if batch_val_losses else float('nan')
            val_acc = self._ratio(val_correct, val_seen)
            self.val_losses.append(val_loss)
            self.val_accuracies.append(val_acc)

            now = datetime.now().strftime("%H:%M:%S")
            print(f"{now} [{epoch:2d}/{n_epochs}] Train: [ loss: {train_loss:.8f}, acc: {train_acc:.8f} ] \t Validation: [ loss: {val_loss:.8f}, acc: {val_acc:.8f} ]")

        return self.model.state_dict()

    def evaluate(self, test_loader, batch_size=1, n_features=1, im_h=48, im_l=48):
        """Run the model in eval mode on every batch of test_loader.

        batch_size is unused and kept for backward compatibility.
        Returns (predictions, real_values): two lists with one numpy array per
        batch, holding the model outputs and the labels respectively.
        """
        device = self._model_device()
        predictions = []
        real_values = []
        self.model.eval()
        with torch.no_grad():
            for x_test, y_test in test_loader:
                cur_batch_size = len(x_test)
                x_test = x_test.view([cur_batch_size, n_features, im_h, im_l]).to(device)
                y_test = y_test.to(device)
                y_pred = self.model(x_test)
                predictions.append(y_pred.to('cpu').detach().numpy())
                real_values.append(y_test.to('cpu').detach().numpy())

        return predictions, real_values

    def plot_losses(self):
        """Plot training and validation loss per epoch in a new figure."""
        plot.plot(self.train_losses, label="Training loss")
        plot.plot(self.val_losses, label="Validation loss")
        plot.legend()
        plot.title("Evolution of losses over time")
        plot.xlabel("Epochs")
        plot.ylabel("Losses")
        plot.show()
        plot.close()

    def plot_accuracies(self):
        """Plot training and validation accuracy per epoch in a new figure."""
        plot.plot(self.train_accuracies, label="Training accuracy")
        plot.plot(self.val_accuracies, label="Validation accuracy")
        plot.legend()
        plot.title("Evolution of accuracies over time")
        plot.xlabel("Epochs")
        plot.ylabel("Accuracies")
        plot.show()
        plot.close()

    def plot_losses_accuracies(self, figsize=(12,4)):
        """Plot losses (left) and accuracies (right) side by side."""
        fig, axs = plot.subplots(1, 2, figsize=figsize)
        #losses
        plot_1 = axs[0]
        plot_1.plot(self.train_losses, label="Training loss")
        plot_1.plot(self.val_losses, label="Validation loss")
        plot_1.legend()
        plot_1.set_title("Evolution of losses over time")
        plot_1.set_xlabel("Epochs")
        plot_1.set_ylabel("Losses")
        #accuracies
        plot_2 = axs[1]
        plot_2.plot(self.train_accuracies, label="Training accuracy")
        plot_2.plot(self.val_accuracies, label="Validation accuracy")
        plot_2.legend()
        plot_2.set_title("Evolution of accuracies over time")
        plot_2.set_xlabel("Epochs")
        plot_2.set_ylabel("Accuracies")
        

In [None]:
#R1 NO INCEPTION:
#n_epochs = 40
#learning_rate = 5e-5
#weight_decay = 1e-9
#dropout_prob = 0.35
#n_features=3

#dynamic net parameters
#conv part:
#conv__in_channels=3
#conv__out_channels=(288,566,1122,2244)
#conv__layer_repetitions=(4,3,2,1) #a=3, b=2, c=1
#linear part:
#lin__out_dimension=(1024,356,158,64,len(emotions))

In [None]:
#R2 YES INCEPTION:
n_epochs = 30
learning_rate = 1e-4
weight_decay = 1e-8
dropout_prob = 0.62

#dynamic net parameters
#conv part:
conv__in_channels=n_features     #channels of the input images
conv__out_channels=     (200,400,600,800) #out channels of each conv stage
conv__layer_repetitions=(  2,  2,  2,  1) #convs per stage: the first of a stage changes the channels, the others keep them
#linear part:
lin__out_dimension=(432, 108, 27, len(emotions)) #out dimension of every fc layer, last one = number of emotions
#inception part:
incep__num_layers=30 #number of inception modules, NB the first has shape (N -> 256*mul), others (256*mul -> 256*mul)
incep__multiplier=3  #multiplier of the default branch widths of an inception module (64 for 1x1, 128 for 3x3, 32 for 5x5, 32 for maxpool)
#NB: reshape = 256 * incep__multiplier * ((48 // (2 ** num_of_conv_layers)) ** 2) = 6912 (if mul=3, layers=4)

model = DynamicNetInceptions(dropout_prob, conv__layer_repetitions, conv__in_channels, conv__out_channels, lin__out_dimension, incep__num_layers, incep__multiplier)
#model = DynamicNetBasic(dropout_prob, conv__layer_repetitions, conv__in_channels, conv__out_channels, lin__out_dimension)
print('model device:', next(model.parameters()).device)
model.print_net()

loss_fn = torch.nn.CrossEntropyLoss(reduction='sum')
#loss_fn = torch.nn.KLDivLoss(reduction='sum')
#loss_fn = torch.nn.NLLLoss(reduction='sum')
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
#optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=weight_decay)
scheduler = ExponentialLR(optimizer, gamma=0.9)
#scheduler = MultiStepLR(optimizer, milestones=[30,80], gamma=0.1)

#train
# the scheduler has to be handed to Optimization, otherwise the learning rate never decays
opt = Optimization(model=model, loss_fn=loss_fn, optimizer=optimizer, scheduler=scheduler)
# n_features comes from the dataset configuration, as in the evaluate() call below
model_state_dict = opt.train(trainloader, validloader, batch_size=batch_size, n_epochs=n_epochs, n_features=n_features)
opt.plot_losses_accuracies()
#test
y_pred_numpy, y_test_numpy = opt.evaluate(testloader, batch_size=1, n_features=n_features)

In [None]:
def flat_list(main_list):
    """Concatenate the items of every sub-sequence of main_list into one list."""
    flattened = []
    for sub_list in main_list:
        for item in sub_list:
            flattened.append(item)
    return flattened

#remove batch divisions, keep only the predicted label of every sample
y_pred_list = [ np.argmax(item) for item in flat_list(y_pred_numpy)]
y_test_list = flat_list(y_test_numpy)

num_of_emotions = len(emotions)
# per real label: samples predicted correctly / wrongly
true_preds  = [ 0 for _ in range(num_of_emotions) ]
false_preds = [ 0 for _ in range(num_of_emotions) ]

for emot_pred, emot_real in zip(y_pred_list, y_test_list) :
  if emot_pred == emot_real :
    true_preds[emot_real] += 1
  else :
    false_preds[emot_real] += 1

# share of correctly classified samples of every label;
# NaN (instead of a ZeroDivisionError) for labels absent from the test set
accuracy = [
    (hits / (hits + misses)) if (hits + misses) > 0 else float('nan')
    for hits, misses in zip(true_preds, false_preds)
]
# unweighted mean over the labels that do occur in the test set
total_approx_acc = np.nanmean(accuracy)
total_true = np.sum(true_preds)
total_false = np.sum(false_preds)
# share of correctly classified samples over the whole test set
total_real_acc = total_true / (total_true + total_false)

analysis_df = pd.DataFrame(
    list(zip(list(emotions), true_preds, false_preds, accuracy)),
    columns=['emotions', 'true_preds', 'false_preds', 'accuracy']
)
print(f"{total_approx_acc:.5f} mean of accuracy of each label")
print(f"{total_real_acc:.5f} accuracy of all test set")
print(analysis_df)

In [None]:
# table with one row per test sample: predicted label next to the real one
y_pred_real_columns = ['y_pred_list', 'y_test_list']
y_pred_real_rows = list(zip(y_pred_list, y_test_list))
y_pred_real_df = pd.DataFrame(y_pred_real_rows, columns=y_pred_real_columns)
print(y_pred_real_df)

In [None]:
# one-row table with the hyper-parameters of this run
# (every value is wrapped in a list so that tuples stay in a single cell)
params_dict = dict(
    n_epochs=[n_epochs],
    learning_rate=[learning_rate],
    weight_decay=[weight_decay],
    dropout_prob=[dropout_prob],
    conv__in_channels=[conv__in_channels],
    conv__out_channels=[conv__out_channels],
    conv__layer_repetitions=[conv__layer_repetitions],
    lin__out_dimension=[lin__out_dimension],
    loss_fn=[loss_fn],
    optimizer=[optimizer.__class__.__name__],
    split_seed=[split_seed],
    incep__num_layers=[incep__num_layers],
    incep__multiplier=[incep__multiplier],
)

params_df = pd.DataFrame(params_dict)
print(params_df)

In [None]:
# table with one row per epoch: losses and accuracies recorded by `opt`
history_columns = ['train_losses', 'val_losses', 'train_accuracies', 'val_accuracies']
history_rows = list(zip(opt.train_losses, opt.val_losses, opt.train_accuracies, opt.val_accuracies))
losses_accuracies_df = pd.DataFrame(history_rows, columns=history_columns)
print(losses_accuracies_df)

In [None]:
import scikitplot
# plot_confusion_matrix takes the ground truth first and the predictions second;
# with the arguments swapped the "true" and "predicted" axes are exchanged
scikitplot.metrics.plot_confusion_matrix(y_test_list, y_pred_list, figsize=(7,7))
plot.savefig("confusion_matrix.png")

In [None]:
#export parameters, analysis to csv file
# every file is named <export_dir><timestamp><tag>.csv, so one run shares one timestamp
now = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
extension = '.csv'

to_export = {
    "_y_pred_real_":       y_pred_real_df,
    "_params_" :           params_df,
    "_analysis_":          analysis_df,
    "_losses_accuracies_": losses_accuracies_df
}

# to_csv does not create folders: make sure the target exists
# (an empty export_dir means the current working directory)
if export_dir :
    os.makedirs(export_dir, exist_ok=True)

for name_cur, df_cur in to_export.items() :
    filename = (export_dir + now + name_cur + extension)
    df_cur.to_csv(filename, sep=';')