# Imports

In [1]:
# Data manipulation and analysis
import pandas as pd
import numpy as np

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Machine learning
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Deep learning with TensorFlow
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Flatten
from tensorflow.keras.utils import to_categorical

# Deep learning with PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torchvision import models
from torchvision.models import resnet50, ResNet50_Weights
from torchvision.models import resnet18, ResNet18_Weights

# Natural Language Processing (NLP)
import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

# Miscellaneous
import os
import re
import time
import pickle
from PIL import Image

from google.colab import drive
# Make the Google Drive files visible under /content/drive (Colab only).
drive.mount('/content/drive')

# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

Mounted at /content/drive


# Models

3D CNN

In [2]:
class CNN3d(nn.Module):
  """3D-convolutional classifier over a short clip of video frames.

  Two strided ``Conv3d`` layers feed a three-layer linear head that ends in a
  softmax over 6 classes. The head's input width (16 * 3 * 4 * 4) fixes the
  clip geometry: it matches inputs shaped (batch, 3, 29, 32, 32), which is
  what 29 RGB frames of 32x32 pixels give once the channel axis is moved in
  front of the frame axis.

  NOTE(review): there is no activation between ``c1`` and ``c2`` or between
  the ``Linear`` layers, so each of those stacks is a composition of affine
  maps. The architecture is left as authored.
  """

  def __init__(self):
    super(CNN3d, self).__init__()
    self.c1 = nn.Conv3d(3, 6, kernel_size = 4, padding = 1, stride = 2)
    self.c2 = nn.Conv3d(6, 16, kernel_size = 10, padding = 0, stride = 2)
    # For a (3, 29, 32, 32) clip: c1 -> (6, 14, 16, 16), c2 -> (16, 3, 4, 4),
    # so the flattened vector has 16 x 3 x 4 x 4 = 768 entries.
    self.flatten = nn.Flatten()
    self.fc = nn.Sequential(
      nn.Linear(16 * 3 * 4 * 4, 128),
      nn.Linear(128, 64),
      nn.Linear(64, 6),
      # The input here is always (batch, 6); naming the class axis explicitly
      # avoids the deprecated implicit-dimension softmax and its warning.
      nn.Softmax(dim = 1)
    )

  def forward(self, x):
    """Return per-class probabilities of shape (batch, 6) for clip batch ``x``."""
    x = self.c1(x)
    x = self.c2(x)
    x = nn.functional.relu(x)
    x = self.flatten(x)
    x = self.fc(x)
    return x

LSTM

In [3]:
class LSTM_Classifier(nn.Module):
  """Embedding + (optionally bidirectional) LSTM + linear classifier for token-id sequences.

  Args:
    vocab_size: number of rows in the embedding table (largest token id + 1).
    embedding_dim: size of each token embedding.
    hidden_dim: LSTM hidden size per direction.
    output_dim: number of output scores per sequence.
    n_layers: number of stacked LSTM layers.
    bidirectional: whether the LSTM also reads the sequence backwards.
    dropout: dropout probability applied between stacked LSTM layers.
  """

  def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout):
    super(LSTM_Classifier, self).__init__()
    # Remembered so forward() can pick the matching final hidden state(s).
    self.bidirectional = bool(bidirectional)
    # Embedding layer converts integer sequences to vector sequences
    self.embedding = nn.Embedding(vocab_size, embedding_dim)
    # LSTM layer processes the vector sequences. Inter-layer dropout has no
    # effect on a single-layer LSTM (PyTorch only warns), so it is switched
    # off in that case.
    self.lstm = nn.LSTM(embedding_dim, hidden_dim,
                        num_layers = n_layers,
                        bidirectional = self.bidirectional,
                        dropout = dropout if n_layers > 1 else 0.0,
                        batch_first = True)
    # Dense layer to predict; its input is twice as wide when both directions
    # contribute a final hidden state.
    self.fc = nn.Linear(hidden_dim * (2 if self.bidirectional else 1), output_dim)

  def forward(self, text, text_lengths):
    """Return raw scores of shape (batch, output_dim).

    ``text`` is a (batch, seq_len) tensor of token ids and ``text_lengths``
    holds the unpadded length of each row (list or tensor, every entry >= 1).
    """
    embedded = self.embedding(text) # embedded version
    # pack_padded_sequence needs the lengths on the CPU.
    if torch.is_tensor(text_lengths):
      text_lengths = text_lengths.cpu()
    # Thanks to packing, the LSTM never sees the padding tokens.
    packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths, batch_first=True, enforce_sorted = False)
    _, (hidden_state, _) = self.lstm(packed_embedded)
    # hidden_state is (num_layers * num_directions, batch, hidden_dim); the
    # last layer's states sit at the end.
    if self.bidirectional:
      # Concatenate the final forward and backward hidden states.
      hidden = torch.cat((hidden_state[-2,:,:], hidden_state[-1,:,:]), dim = 1)
    else:
      # Only one direction: the last layer's state is the whole summary.
      hidden = hidden_state[-1,:,:]
    # No activation here: the scores are returned unnormalised.
    return self.fc(hidden)

FC NN

In [4]:
class Smash(nn.Module):
  """Fusion head that maps the concatenated per-modality outputs to 6 class scores.

  Args:
    in_features: width of the fused input vector. The default, 1012, is the
      size of the vector the training cell builds: 6 values from ``CNN3d``,
      1000 from ``resnet18`` and 6 from the LSTM (``output_dim = 6``).

  The forward pass returns raw, unnormalised scores (logits).
  ``nn.CrossEntropyLoss`` applies log-softmax itself, so a softmax layer here
  would be applied twice; ``argmax`` over the scores is unaffected.
  """

  def __init__(self, in_features = 1012):
    super(Smash, self).__init__()
    self.fcnn = nn.Sequential(
      nn.Linear(in_features, 12),
      nn.Linear(12, 6)
    )

  def forward(self, x):
    """Return class logits of shape (batch, 6) for fused features ``x``."""
    return self.fcnn(x)

# Dataset

In [5]:
# Tokenizer model and stopword list needed by nltk.word_tokenize / stopwords.words.
nltk.download('punkt')
nltk.download('stopwords')

# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code; only load files you trust.
data = torch.load("/content/drive/My Drive/Machine Learning/COSMOS/FINAL_PROJECT/DER/ekman.pt")

# 90/10 train/validation split. The generator is seeded so that Restart & Run
# All reproduces the same partition instead of a new random one each time.
train_length = int(0.9 * len(data))
split_generator = torch.Generator().manual_seed(0)
train_set, val_set = torch.utils.data.random_split(
    data,
    [train_length, len(data) - train_length],
    generator = split_generator,
)

# batch_size is left at its default of 1: the training cell processes one
# sample per step.
train_loader = DataLoader(train_set, shuffle = True, pin_memory = True, num_workers = 2)
test_loader = DataLoader(val_set, shuffle = False, pin_memory = True, num_workers = 2)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [6]:
import os.path

# Per-frame pipeline for the video branch: shrink each frame to 32x32 pixels,
# then convert it to a tensor.
frame_process = transforms.Compose(
    [transforms.Resize((32, 32)), transforms.ToTensor()]
)

def crop(image):
  """Return ``image.crop(box)`` for the fixed box (80, 58, 577, 428).

  ``image`` only needs a ``crop`` method taking a 4-tuple; with a PIL image
  the tuple is read as (left, upper, right, lower).
  """
  left, upper, right, lower = 80, 58, 577, 428
  return image.crop((left, upper, right, lower))

# Emotion sub-folders, in the order used to map a sample index to its folder
# (index // 50 in the loaders below).
folders = ["anger/", "disgust/", "fear/", "joy/", "sadness/", "surprise/"]

# Locations of the extracted video frames and of the audio spectrogram images.
frame_directory = "/content/drive/My Drive/Machine Learning/COSMOS/FINAL_PROJECT/DER/ekman6_split/"
spect_directory = "/content/drive/My Drive/Machine Learning/COSMOS/FINAL_PROJECT/DER/ekman6_spectro/"

# Spectrogram pipeline: cut out the fixed crop box, resize to 224x224, convert
# to a tensor and normalise each channel with the mean/std commonly used with
# torchvision's ImageNet-pretrained models.
spect_process = transforms.Compose([
    transforms.Lambda(crop),
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Transcripts table; get_text_tensor reads sample i's text from df.columns[i].
df = pd.read_csv("/content/drive/My Drive/Machine Learning/COSMOS/FINAL_PROJECT/DER/ekman6_texts/en_transcripts.csv")

def preprocess_text(text: str) -> list:
    """Turn a raw transcript into a list of lowercase, non-stopword tokens.

    Steps: strip ``http...`` links, replace every run of non-letters with a
    space, tokenize with NLTK, lowercase, and drop English stopwords.

    Tokens are lowercased *before* the stopword check. NLTK's English stopword
    list is lowercase, so checking the original casing would let capitalised
    stopwords such as "The" or "I" through.

    Returns:
        The remaining tokens, in their original order (possibly empty).
    """
    # remove links
    text = re.sub(r"http\S+", "", text)
    # remove special chars and numbers
    text = re.sub("[^A-Za-z]+", " ", text)
    # Load the stopword list once per call instead of once per token, and use
    # a set for constant-time membership tests.
    stop_words = set(stopwords.words("english"))
    lowered = (token.lower() for token in nltk.word_tokenize(text))
    return [token for token in lowered if token not in stop_words]

def get_frame_tensor(i):
  """Load the frames of sample ``i`` as one stacked tensor.

  Frames are read from ``<frame_directory><folder>/<i>_<j>.jpg`` for
  j = 1..29, where the folder is ``folders[i // 50]``. A missing frame is
  filled with the most recent frame that did exist, so the result always
  stacks 29 processed frames.

  NOTE(review): ``range(1, 30)`` reads 29 frames; confirm that a 30th frame
  is intentionally not used.

  Raises:
    FileNotFoundError: if the first frame is missing, since there is no
      earlier frame to fall back on.
  """
  folder = frame_directory + folders[i // 50]
  frames = []
  last_frame = None
  for j in range(1, 30):
    path = folder + str(i) + "_" + str(j) + ".jpg"
    if os.path.isfile(path):
      # Process inside the context manager so the file handle is released.
      with Image.open(path) as image:
        last_frame = frame_process(image)
    elif last_frame is None:
      raise FileNotFoundError(
        "no frame to fall back on for sample " + str(i) + ": missing " + path)
    # When the file is missing, last_frame still holds the previous frame's
    # tensor; reusing it avoids re-opening and re-processing the same image.
    frames.append(last_frame)
  return torch.stack(frames)

def get_spect_tensor(i):
  """Open the spectrogram image of sample ``i`` and run it through ``spect_process``.

  The image is read from ``<spect_directory><folder><i>.jpg``, where the
  folder is picked by ``int(i / 50)``.
  """
  emotion_folder = folders[int(i / 50)]
  path = spect_directory + emotion_folder + str(i) + ".jpg"
  return spect_process(Image.open(path))

# Token -> integer id vocabulary. It starts with the placeholder 'EMPTY' (id 1)
# and is grown in place by get_text_tensor; id 0 is never assigned here.
dictionary = {'EMPTY': 1}

def get_text_tensor(i):
  """Encode transcript ``i`` as a tensor of vocabulary ids.

  The text is taken from ``df.columns[i]``, tokenized with
  ``preprocess_text``, and each token is looked up in ``dictionary``; unseen
  tokens are added with the next free id (``len(dictionary) + 1``).

  Returns:
    ``(ids, n)`` where ``ids`` is an int32 tensor on ``device`` and ``n`` is
    the number of tokens (0 for an empty transcript).
  """
  tokens = preprocess_text(df.columns[i])
  token_ids = []
  for token in tokens:
    if token not in dictionary:
      # Side effect: the shared vocabulary grows as new words are seen.
      dictionary[token] = len(dictionary) + 1
    token_ids.append(int(dictionary[token]))
  return torch.IntTensor(token_ids).to(device), len(tokens)


In [7]:
# Encode all 300 transcripts, remembering each unpadded length and the longest
# sequence seen, then pad them into one (300, max_len) tensor (pad value 0).
text_tensors = []
text_lengths = []
MXLEN = 0

for i in range(300):
  encoded, n_tokens = get_text_tensor(i)
  text_tensors.append(encoded)
  text_lengths.append(n_tokens)
  if int(encoded.size(0)) > MXLEN:
    MXLEN = int(encoded.size(0))

text_tensors = torch.nn.utils.rnn.pad_sequence(text_tensors, batch_first = True)

In [8]:
# Pre-load the frame stack and the spectrogram of every sample (indices 0..299)
# so the training loop can look them up by sample index. This reads every image
# from Drive and is slow.
frame_tensors, spect_tensors = [], []

for i in range(300):
  frame_tensors.append(get_frame_tensor(i))
  spect_tensors.append(get_spect_tensor(i))

KeyboardInterrupt: 

# Training Loop

In [None]:
# One network per modality, plus a head that fuses their outputs.
model1 = CNN3d().to(device)                                      # video frames
# `pretrained = True` is deprecated since torchvision 0.13; the explicit
# weights enum selects the same ImageNet checkpoint without the warning.
model2 = resnet18(weights = ResNet18_Weights.IMAGENET1K_V1).to(device) # output == 1000 neurons
model3 = LSTM_Classifier(vocab_size = len(dictionary) + 1,      # ids run 0..len(dictionary)
                         embedding_dim = 100,
                         hidden_dim = 64,
                         output_dim = 6,
                         n_layers = 5,
                         bidirectional = True,
                         dropout = 0.5).to(device)
model4 = Smash().to(device)                                      # fusion head

epochs = 10

# Each model gets its own Adam optimizer; zerograd()/step() drive all four.
optimizer1 = optim.Adam(model1.parameters(), lr = 0.001)
optimizer2 = optim.Adam(model2.parameters(), lr = 0.01)
optimizer3 = optim.Adam(model3.parameters(), lr = 0.01)
optimizer4 = optim.Adam(model4.parameters(), lr = 0.01)

loss_fn = nn.CrossEntropyLoss()

# Per-epoch training loss history.
tl = []

def zerograd():
  """Reset the gradients held by all four optimizers, in order 1 to 4."""
  for optimizer in (optimizer1, optimizer2, optimizer3, optimizer4):
    optimizer.zero_grad()

def step():
  """Apply one update with each of the four optimizers, in order 1 to 4."""
  for optimizer in (optimizer1, optimizer2, optimizer3, optimizer4):
    optimizer.step()

# Joint training: every step pushes one sample through the three modality
# models, concatenates their outputs and classifies the result with model4.
for epoch in range(epochs):
  for model in (model1, model2, model3, model4):
    model.train()
  train_loss = 0.00
  correct = 0.00
  total = 0.00
  for index, label in train_loader:
    # The loaders yield one sample per batch; the per-sample lookups below
    # (Python lists indexed by sample id) rely on that.
    assert index.numel() == 1, "training loop expects batch_size == 1"
    sample = int(index.item())
    # Use the notebook-wide `device` instead of hard-coded .cuda() so the
    # cell also runs on a CPU-only runtime.
    label = label.to(device)

    length = text_lengths[sample]
    if length == 0:
      # Empty transcript: feed the single placeholder token instead. The
      # stored length is NOT overwritten, so the same substitution happens
      # in every epoch (overwriting it would make later epochs read the
      # all-padding row instead).
      text = torch.IntTensor([[dictionary['EMPTY']]]).to(device)   # (1, 1)
      length = 1
    else:
      text = text_tensors[sample].unsqueeze(0).to(device)          # (1, max_len)

    spect = spect_tensors[sample].unsqueeze(0).to(device)          # add batch axis
    # Stored as (frames, channels, H, W); Conv3d wants (batch, channels,
    # frames, H, W). unsqueeze returns a new tensor, and transpose takes the
    # two axes as separate integers.
    frames = frame_tensors[sample].unsqueeze(0).transpose(1, 2).to(device)

    zerograd()
    y_frame = model1(frames)
    y_spect = model2(spect)
    y_text = model3(text, [length])
    # Named `features` rather than `input` to avoid shadowing the builtin.
    features = torch.cat((y_frame, y_spect, y_text), dim=-1)
    y_pred = model4(features)

    # One-hot target row for the single sample in this batch.
    labels = torch.zeros((1, 6), device = device)
    labels[0, label] = 1.00
    loss = loss_fn(y_pred, labels)
    loss.backward()
    step()

    train_loss += loss.item()/len(train_loader)
    prediction = y_pred.argmax(dim=1)
    correct += (prediction.eq(label).sum()).item()
    total += label.size(0)
  tl.append(train_loss)
  print(f"Epoch: {epoch+1}/{epochs}, Training Loss: {train_loss:.4f}, Training Accuracy: {correct/total:.4f}")

In [None]:
# Sanity check: show the gradient currently stored on the LSTM's first-layer
# input-to-hidden weights (None if no backward pass has populated it).
lstm_input_weight = model3.lstm._parameters['weight_ih_l0']
print(lstm_input_weight.grad)

# Validation Loop