<a href="https://colab.research.google.com/github/mmdedavoodi/emotion-cnn-from-scratch/blob/main/Emotion_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Download the dataset archive from Google Drive by file id.
# NOTE(review): presumably this is saved as archive.zip, which the next cell unpacks - confirm.
!gdown 1oTQE8pGkq9rEvCLs89lYUjoIpjPgTzke

In [None]:
!unzip archive.zip

In [None]:
import random
import matplotlib.pyplot as plt
import torch
import cv2
import torchvision.transforms as t
import numpy as np
from glob import glob
from torchvision.datasets import DatasetFolder
from torch import nn
from tqdm import tqdm_notebook as tqdm
import torchvision
from google.colab.patches import cv2_imshow

In [None]:
# Collect every image path laid out as <split>/<emotion>/<file>.
# glob returns matches in arbitrary order, so sort for a deterministic listing.
file_test_path = sorted(glob('/content/test/*/*'))
file_train_path = sorted(glob('/content/train/*/*'))

In [None]:
# Sanity check: number of test and train image files found by the globs above.
len(file_test_path), len(file_train_path)

In [None]:
# Emotion folder name -> integer class id; the position in the tuple is the id.
map_emotion = {
    emotion: class_id
    for class_id, emotion in enumerate(
        ("angry", "disgust", "fear", "happy", "neutral", "sad", "surprise")
    )
}

## Weights for Imbalanced Data

In [None]:
# Count the training images in each emotion folder. One generator replaces
# seven copy-pasted glob calls; the unpacking order matches the class ids.
(n_angry, n_disgust, n_fear, n_happy,
 n_neutral, n_sad, n_surprise) = (
    len(glob(f'/content/train/{emotion}/*'))
    for emotion in ('angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise')
)

In [None]:
# Per-class training counts, listed in class-id order (angry first, surprise last).
n_samples_list = np.array(
    [n_angry, n_disgust, n_fear, n_happy, n_neutral, n_sad, n_surprise]
)
# Total number of training images across all classes.
n_samples = np.sum(n_samples_list)
n_samples

In [None]:
# Balanced class weights: w_c = n_samples / (n_classes * n_c), so classes with
# few images get a proportionally larger weight in the loss.
if (n_samples_list == 0).any():
  # A zero count would give an infinite/NaN weight and silently poison the loss.
  raise ValueError(f"Every class needs at least one training image, got counts {n_samples_list.tolist()}")
# The class count comes from the data instead of a hard-coded 7.
weights = [n_samples / (len(n_samples_list) * n) for n in n_samples_list]
weights

## Dataset

In [None]:
class EmotionDataSet(torch.utils.data.Dataset):
  """Image dataset whose label is taken from each file's parent folder name.

  Args:
    images_file_path: Iterable of image paths laid out as
      ``.../<emotion>/<file>``, where ``<emotion>`` is a key of ``map_emotion``.
    transform: Optional callable applied to the RGB image array before it is
      returned.
  """

  def __init__(self, images_file_path, transform=None):
    # Shuffle a private copy so the caller's list keeps its original order.
    self.images_file_path = list(images_file_path)
    random.shuffle(self.images_file_path)
    self.transform = transform

  def __len__(self):
    """Return the number of image paths held by the dataset."""
    return len(self.images_file_path)

  def __getitem__(self, idx):
    """Load the sample at position ``idx``.

    Returns:
      ``(image, label)``: the RGB image (after ``transform`` when one was
      given) and the integer class id looked up in ``map_emotion``.

    Raises:
      FileNotFoundError: If OpenCV cannot read the image file.
      ValueError: If the parent folder name is not a key of ``map_emotion``.
    """
    image_path = self.images_file_path[idx]
    image = cv2.imread(image_path)
    if image is None:
      # cv2.imread reports failure by returning None instead of raising.
      raise FileNotFoundError(f"Could not read image: {image_path}")
    # OpenCV loads images as BGR; convert to RGB for the rest of the pipeline.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The emotion name is the directory that directly contains the file.
    emotion = image_path.split("/")[-2]
    if emotion not in map_emotion:
      raise ValueError(f"Unknown emotion folder {emotion!r} for image: {image_path}")
    label = map_emotion[emotion]

    if self.transform:
      image = self.transform(image)

    return image , label

In [None]:
# Per-channel statistics shared by both pipelines.
# NOTE(review): these values match the widely used ImageNet mean/std - confirm
# they are the intended statistics for this dataset.
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: tensor conversion, random augmentation, then normalization.
transform_train = t.Compose(
    [
        t.ToTensor(),
        t.RandomHorizontalFlip(p=0.5),
        t.RandomRotation(degrees=20),
        t.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5),
        t.Normalize(mean=NORM_MEAN, std=NORM_STD),
    ]
)

# Evaluation pipeline: no augmentation, only tensor conversion and normalization.
transform_test = t.Compose(
    [
        t.ToTensor(),
        t.Normalize(mean=NORM_MEAN, std=NORM_STD),
    ]
)

In [None]:
# Wrap the globbed file lists: the training set uses the augmenting pipeline,
# the test set uses the plain conversion + normalization pipeline.
DataSetTrain = EmotionDataSet(images_file_path=file_train_path, transform=transform_train)
DataSetTest = EmotionDataSet(images_file_path=file_test_path, transform=transform_test)

In [None]:
# Preview the first training sample and print its integer label.
# ToTensor yields a channels-first tensor, so permute to channels-last for matplotlib.
# NOTE(review): the tensor has already been through Normalize in transform_train,
# so imshow will clip out-of-range values and the colours will look distorted.
image , label = DataSetTrain[0]
plt.imshow(image.permute(1,2,0))
print(label)

## Data Loader

In [None]:
# Training batches are reshuffled every epoch; test batches keep a fixed order.
# Both loaders use two worker processes, each prefetching two batches.
DataLoaderTrain = torch.utils.data.DataLoader(
    DataSetTrain,
    batch_size=64,
    shuffle=True,
    num_workers=2,
    prefetch_factor=2,
)
DataLoaderTest = torch.utils.data.DataLoader(
    DataSetTest,
    batch_size=64,
    shuffle=False,
    num_workers=2,
    prefetch_factor=2,
)

## Training and Evaluation Functions

In [None]:
def train_Phase(model , dataloader , optimizer , loss_function , device):
  """Run one training epoch.

  Args:
    model: Network to optimise; it is switched to train mode.
    dataloader: Iterable yielding ``(image, label)`` batches.
    optimizer: Optimizer that updates ``model``'s parameters.
    loss_function: Callable ``(logits, labels) -> scalar loss tensor``.
    device: Device the batches are moved to.

  Returns:
    ``(mean_loss, accuracy)`` averaged over the samples actually processed.
    Both are NaN when the dataloader yields no batches.
  """
  model.train()
  total_loss = 0.0
  total_correct = 0
  total_seen = 0
  pbar = tqdm(dataloader , desc = "Training")
  for image , label in pbar:
    image = image.to(device).to(torch.float32)
    label = label.to(device).to(torch.int64)

    # Clear gradients BEFORE the backward pass so nothing left over from an
    # earlier (possibly interrupted) step leaks into this update.
    optimizer.zero_grad()
    output = model(image).to(torch.float32)
    loss_value = loss_function(output , label)
    loss_value.backward()
    optimizer.step()

    # Read the scalar loss once; every .item() call forces a device sync.
    batch_loss = loss_value.item()
    batch_size = len(label)
    pbar.set_postfix_str(f"Loss = {batch_loss:.3f}")

    total_loss += batch_loss * batch_size
    # Softmax is monotonic, so the argmax of the logits is the predicted class.
    total_correct += (output.argmax(dim = 1) == label).sum().item()
    total_seen += batch_size

  if total_seen == 0:
    # Nothing was processed; there is no meaningful average to report.
    return float("nan") , float("nan")

  # Average over the samples seen, which stays correct with drop_last or samplers.
  return total_loss / total_seen , total_correct / total_seen

In [None]:
def test_Phase(model , dataloader , loss_function , device):
  model.eval()
  test_phase_loss = 0
  test_phase_acc = 0
  pbar = tqdm(dataloader , desc = "Testing")
  sample = len(dataloader.dataset)

  for image, label in pbar:
    image , label = image.to(device).to(torch.float32) , label.to(device).to(torch.int64)
    # label = label.view(-1, 1).float()
    with torch.no_grad():
      output = model(image).to(torch.float32)
      output_softmax = torch.softmax(output , dim = 1)
      loss_value = loss_function(output , label)
      pbar.set_postfix_str(f"Loss = {loss_value:.3f}")

      test_phase_loss += loss_value.item() * len(label)
      test_phase_acc += (output_softmax.argmax(dim = 1) == label).sum().item()

  test_phase_loss /= sample
  test_phase_acc /= sample

  return test_phase_loss , test_phase_acc

In [None]:
def run_epoch(model , train_dataloader , test_dataloader , optimizer , loss_function , device):
  """Train for one epoch, then evaluate.

  Returns:
    ``(train_loss, train_acc, test_loss, test_acc)``.
  """
  train_metrics = train_Phase(model , train_dataloader , optimizer , loss_function , device)
  test_metrics = test_Phase(model , test_dataloader , loss_function , device)
  return (*train_metrics , *test_metrics)

In [None]:
def train_model(model , train_dataloader , test_dataloader , optimizer , loss_function , device , epochs = 10):
  """Train and evaluate ``model`` for up to ``epochs`` epochs.

  Every completed epoch prints one summary line and appends its metrics to the
  history. A ``KeyboardInterrupt`` stops training early and returns whatever
  has been recorded so far.

  Returns:
    ``(train_loss, train_acc, test_loss, test_acc)``: four lists holding one
    value per completed epoch.
  """
  train_loss = []
  train_acc = []
  test_loss = []
  test_acc = []
  for epoch in range(epochs):
    try:
      train_phase_loss , train_phase_acc , test_phase_loss , test_phase_acc = run_epoch(model , train_dataloader , test_dataloader , optimizer , loss_function , device)
      # ".4f" / ".2f" set the precision; the former "4f" / "2f" only set a
      # minimum field width and always printed six decimals.
      print(f"Epoch {epoch + 1}: train loss {train_phase_loss:.4f}, train acc {train_phase_acc * 100:.2f} | test loss {test_phase_loss:.4f}, test acc {test_phase_acc * 100:.2f}")
      train_loss.append(train_phase_loss)
      train_acc.append(train_phase_acc)
      test_loss.append(test_phase_loss)
      test_acc.append(test_phase_acc)
    except KeyboardInterrupt:
      # Manual stop from the notebook: keep the history collected so far.
      break
  return train_loss , train_acc , test_loss , test_acc

In [None]:
def draw_plot(train_loss , train_acc , test_loss , test_acc):
  """Draw loss and accuracy curves side by side on a new figure.

  The left panel shows loss and the right panel accuracy, each with a "Train"
  and a "Test" line plotted against the epoch index.

  Returns:
    The ``plt`` module, so the caller can keep working with the figure.
  """
  # (subplot position, train series, test series, y-axis label)
  panels = (
      (1 , train_loss , test_loss , 'Loss'),
      (2 , train_acc , test_acc , 'Accuracy'),
  )

  plt.figure(figsize=(10, 4))
  for position , train_curve , test_curve , y_label in panels:
    plt.subplot(1, 2, position)
    plt.plot(train_curve, label="Train")
    plt.plot(test_curve, label="Test")
    plt.legend()
    plt.grid()
    plt.ylabel(y_label)
    plt.xlabel('Epoch')

  return plt

## Model Architecture

In [None]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")
device

In [None]:
from torch.nn.modules.conv import Conv2d


def _conv_stage(in_channels , mid_channels , out_channels , pool):
  """Return one stage's layers: two 3x3 conv + ReLU pairs, max-pool, batch-norm."""
  return [
      nn.Conv2d(in_channels , mid_channels , 3 , 1 , 1),
      nn.ReLU(),
      nn.Conv2d(mid_channels , out_channels , 3 , 1 , 1),
      nn.ReLU(),
      nn.MaxPool2d(pool , pool),
      nn.BatchNorm2d(out_channels),
  ]


# The stages are unpacked into ONE flat Sequential so layer indices (and
# therefore state_dict keys) are plain 0..20. No dropout layer is active.
model = nn.Sequential(
    # Batch-norm applied directly to the 3-channel input.
    nn.BatchNorm2d(3),

    *_conv_stage(3 , 32 , 64 , 3),
    *_conv_stage(64 , 64 , 128 , 2),
    *_conv_stage(128 , 128 , 256 , 3),

    # Classifier head: 256 channels x 2 x 2 spatial positions -> 7 classes.
    # NOTE(review): 2 x 2 is what the three pools (3, 2, 3) leave from a 48 x 48
    # input - presumably the dataset's image size; confirm.
    nn.Flatten(),
    nn.Linear(256 * 2 * 2 , 7)
)

In [None]:
# Move the model's parameters and buffers to the selected device. The call
# returns the module, so the notebook shows the architecture as the cell output.
model.to(device)

## Training the Model

In [None]:
# Adam over all model parameters.
optimizer = torch.optim.Adam(params = model.parameters() , lr = 0.001)
# Cross-entropy weighted per class with the balanced weights computed earlier,
# stored as a float32 tensor on the training device.
loss_function = nn.CrossEntropyLoss(
    weight = torch.tensor(weights , dtype = torch.float32 , device = device)
)

In [None]:
# Run the full 60-epoch training and keep the per-epoch metric histories.
train_loss , train_acc , test_loss , test_acc = train_model(
    model = model ,
    train_dataloader = DataLoaderTrain ,
    test_dataloader = DataLoaderTest ,
    optimizer = optimizer ,
    loss_function = loss_function ,
    device = device ,
    epochs = 60 ,
)

In [None]:
# Plot the loss and accuracy curves recorded by train_model (one point per completed epoch).
draw_plot(train_loss , train_acc , test_loss , test_acc)

In [None]:
# Export for inference. Switch to eval mode first: if training was interrupted
# mid-epoch the model is still in train mode, and the saved TorchScript module
# would keep normalising with batch statistics.
model.eval()
script = torch.jit.script(model)
script.save("model_emotion.pt")

In [None]:
# Reload the exported TorchScript model. 'cpu##' is not a valid device string
# and makes torch.jit.load fail; load onto the CPU, then move to the device.
loaded_model = torch.jit.load("model_emotion.pt" , map_location = 'cpu')
loaded_model.to(device)
# Make inference mode explicit regardless of the mode the model was saved in.
loaded_model.eval()

In [None]:
# Pick one random test sample. random.randint includes both end points, so the
# former upper bound of len(DataSetTest) + 1 could index past the end;
# randrange(n) yields 0 .. n - 1.
idx = random.randrange(len(DataSetTest))
image , label = DataSetTest[idx]
plt.imshow(image.permute(1,2,0))
print("Real:",label)

# Reuse the sample already loaded instead of reading the file a second time,
# and run the forward pass without autograd bookkeeping.
loaded_model.eval()
with torch.no_grad():
  logits = loaded_model(image.unsqueeze(0).to(device))
# Softmax is monotonic, so the argmax of the logits is the predicted class.
print("Predict:",logits.argmax(dim=1).item())

## Evaluating the Model

In [None]:
from sklearn.metrics import confusion_matrix , ConfusionMatrixDisplay , classification_report

In [None]:
def eval_model(model , dataloader , device):
  """Collect ground-truth and predicted class ids over ``dataloader``.

  The model is moved to ``device`` and switched to eval mode; predictions are
  the argmax of the softmax over dimension 1 of the model output.

  Returns:
    ``(y_true, y_pred)``: two flat Python lists of integer class ids, in the
    order the dataloader produced the samples.
  """
  model.to(device)
  model.eval()
  targets , predictions = [] , []
  progress = tqdm(dataloader)

  # Inference only, so autograd is disabled for the whole pass.
  with torch.no_grad():
    for batch , target in progress:
      batch = batch.to(device).to(torch.float32)
      target = target.to(device).to(torch.int64)

      logits = model(batch).to(torch.float32)
      probabilities = torch.softmax(logits , dim = 1)

      targets += target.tolist()
      predictions += probabilities.argmax(dim = 1).tolist()

  return targets , predictions


In [None]:
# Collect true and predicted class ids for the whole test loader, using the
# model reloaded from the TorchScript file.
y_true , y_pred = eval_model(loaded_model , DataLoaderTest , device)

In [None]:
# Per-class precision / recall / F1 report. No target names are passed, so rows
# are labelled by integer class id (see map_emotion for the corresponding names).
print(classification_report(y_true , y_pred))

In [None]:
# Pin the label set so the matrix is always 7 x 7. Without `labels`,
# confusion_matrix only covers classes that occur in y_true / y_pred, and the
# seven display labels would no longer match the matrix size.
c = confusion_matrix(y_true , y_pred , labels = list(map_emotion.values()))
# Pass a real list rather than a dict_keys view for the tick labels.
dis = ConfusionMatrixDisplay(c , display_labels = list(map_emotion.keys()))
dis.plot()