<a href="https://colab.research.google.com/github/mamonalsalihy/Emotion_Detection/blob/main/Models/MLP_github_copy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# `google.colab` is imported here, so this cell is written for a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# loading in the dependencies

from tqdm.auto import tqdm
import os
import pandas as pd
import csv
import pickle as pkl
import nltk
import matplotlib.pyplot as plt
import seaborn as sea
import re
import os
import spacy

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np

from tqdm.auto import tqdm
from torch.utils.data import DataLoader, Dataset

from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

import math
from itertools import chain

import gensim

# Pick the CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device  # bare expression so the notebook displays the selected device

device(type='cuda')

The dataset splits (`train.csv`, `valid.csv`, `test.csv`) must be uploaded to the session's working directory before the cells below can read them.

In [None]:
# Locations of the three dataset splits, relative to the current working directory.
test_path = './test.csv'
train_path = './train.csv'
valid_path = './valid.csv'

In [None]:
def _read_split(csv_path):
    """Open ``csv_path`` in text mode and parse its contents into a DataFrame."""
    with open(csv_path, "r") as handle:
        return pd.read_csv(handle)


# Load the splits in the same order as before: train, then valid, then test.
train = _read_split(train_path)
valid = _read_split(valid_path)
test = _read_split(test_path)
print(train)

FileNotFoundError: ignored

In [None]:
def _rows_for_role(frame, role):
    """Return the rows of ``frame`` whose ``speaker_label`` column equals ``role``."""
    return frame.loc[frame["speaker_label"] == role]


# Partition every split into the rows labelled "speaker" and the rows labelled "listener".
train_speaker = _rows_for_role(train, "speaker")
train_listener = _rows_for_role(train, "listener")

valid_speaker = _rows_for_role(valid, "speaker")
valid_listener = _rows_for_role(valid, "listener")

test_speaker = _rows_for_role(test, "speaker")
test_listener = _rows_for_role(test, "listener")

## Feature Builder Class

In [None]:
# We'll be using scikit-learn's TfidfVectorizer to construct our n-gram feature vectors

# We'll create our custom sequencer class for converting a text into a sequence of integers corresponding to our tokens
class Sequencer(object):
    """Convert raw text into sequences of integer token ids.

    The vocabulary always starts with the four special tokens, in this order:
    unknown, padding, beginning-of-sequence, end-of-sequence. Every token
    found in ``corpus`` is then added, so that ``encode`` can map known words
    to their own ids instead of the unknown id.

    Args:
        corpus: iterable of text strings used to build the vocabulary.
            ``None`` or an empty iterable leaves only the special tokens.
        bos_token: string used for the beginning-of-sequence marker.
        eos_token: string used for the end-of-sequence marker.
        unk_token: string used for out-of-vocabulary tokens.
        pad_token: string used for padding.

    Raises:
        OSError: if neither the ``'en'`` nor the ``'en_core_web_sm'`` spaCy
            pipeline can be loaded.
    """

    def __init__(self, corpus, bos_token='<s>', eos_token='</s>', unk_token='<unk>', pad_token='<pad>'):
        self.word2idx = {}
        self.idx2word = {}

        self.unk_index = self.add_token(unk_token)
        self.pad_index = self.add_token(pad_token)
        self.bos_index = self.add_token(bos_token)
        self.eos_index = self.add_token(eos_token)
        try:
            self.nlp = spacy.load('en')
        except OSError:
            # spaCy v3 removed shortcut names such as 'en'; fall back to the
            # full package name of the small English pipeline.
            self.nlp = spacy.load('en_core_web_sm')
        self.tokenizer = lambda text: [t.text for t in self.nlp(text)]

        # Build the vocabulary. Without this step every token would be
        # encoded as the unknown id.
        for text in (corpus if corpus is not None else ()):
            for token in self.tokenizer(text):
                self.add_token(token)

    def add_token(self, token):
        """Register ``token`` and return its index.

        Adding a token that is already known returns its existing index and
        leaves both mappings untouched, so indices stay unique and dense.
        """
        if token in self.word2idx:
            return self.word2idx[token]

        new_index = len(self.word2idx)
        self.word2idx[token] = new_index
        self.idx2word[new_index] = token

        return new_index

    def encode(self, text):
        """Tokenize ``text`` and return ``[bos, id_1, ..., id_n, eos]``.

        Tokens missing from the vocabulary are mapped to ``self.unk_index``.
        """
        tokens = self.tokenizer(text)

        sequence = [self.bos_index]
        sequence.extend(self.word2idx.get(token, self.unk_index) for token in tokens)
        sequence.append(self.eos_index)

        return sequence

    def create_padded_tensor(self, sequences):
        """Stack ``sequences`` into one ``torch.long`` tensor.

        Shorter sequences are right-padded with ``self.pad_index`` up to the
        length of the longest one. An empty list yields a tensor of shape
        ``(0, 0)`` instead of raising.
        """
        max_seq_len = max((len(sequence) for sequence in sequences), default=0)
        tensor = torch.full((len(sequences), max_seq_len), self.pad_index, dtype=torch.long)

        for i, sequence in enumerate(sequences):
            if sequence:
                # Copy a whole row at once rather than element by element.
                tensor[i, :len(sequence)] = torch.tensor(sequence, dtype=torch.long)

        return tensor

# For converting labels into indices
class LabelIndexer(object):
    """Two-way lookup between label values and integer class ids.

    Ids are assigned by position in ``labels``; the original ``labels``
    object is kept on the instance unchanged.
    """

    def __init__(self, labels):
        self.labels = labels

        forward = {}
        for position, name in enumerate(labels):
            forward[name] = position
        self.label2idx = forward
        self.idx2label = dict((position, name) for name, position in forward.items())

    def encode(self, y):
        """Return the class id for label ``y`` (``KeyError`` when unknown)."""
        class_id = self.label2idx[y]
        return class_id

    def encode_batch(self, ys):
        """Encode every label in ``ys`` and pack the ids into a ``LongTensor``."""
        class_ids = list(map(self.encode, ys))
        return torch.LongTensor(class_ids)


## Dataset class

In [None]:
class EmpatheticDataset(Dataset):
    """Dataset yielding ``(transformed_text, transformed_label)`` pairs.

    Args:
        texts: positionally indexable collection of input texts.
        liwc: object exposing ``.values`` (e.g. a pandas DataFrame); the
            extracted array is kept on ``self.liwc``.
        labels: positionally indexable collection of labels, aligned with ``texts``.
        input_transformer: callable applied to a text to produce ``x``.
        output_transformer: callable applied to a label to produce ``y``.

    The LIWC rows are stored but are not part of the returned example.
    NOTE(review): the previous version built a tensor from the LIWC row on
    every access and then discarded it. If LIWC features are meant to be fed
    to the model, they need to be combined with ``x`` explicitly.
    """

    def __init__(self, texts, liwc, labels, input_transformer, output_transformer):
        self.texts = texts
        self.labels = labels
        self.liwc = liwc.values
        self.input_transformer = input_transformer
        self.output_transformer = output_transformer

    def __getitem__(self, index):
        """Return the transformed ``(x, y)`` pair for the example at ``index``."""
        x = self.input_transformer(self.texts[index])
        y = self.output_transformer(self.labels[index])
        return x, y

    def __len__(self):
        """Number of examples, taken from the length of ``texts``."""
        return len(self.texts)

### MultiLayer Perceptron


In [None]:
class MultiLayerPerceptron(nn.Module):
    """
    Two-layer feed-forward network: linear -> ReLU -> (optional dropout) -> linear.

    ``dropout`` switches the dropout layer on or off; ``dropout_p`` is the
    probability handed to ``nn.Dropout``. The forward pass returns raw logits.
    """

    def __init__(self, input_size, hidden_size, output_size, dropout=False, dropout_p=0.1):
        super().__init__()
        self.add_dropout = dropout

        # Sub-modules are registered in the order fc1, fc2, dropout.
        self.fc1 = nn.Linear(input_size, hidden_size, bias=True)
        self.fc2 = nn.Linear(hidden_size, output_size, bias=True)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, x):
        hidden = F.relu(self.fc1(x))
        if self.add_dropout:
            hidden = self.dropout(hidden)
        return self.fc2(hidden)

## Trainer Class

In [None]:
class MultiClassTrainer(object):
    """
    Trainer for training a multi-class classification model.

    Args:
        model: module to train; it is moved to ``device`` on construction.
        optimizer: optimizer already bound to ``model``'s parameters.
        loss_fn: callable ``loss_fn(logits, targets)`` returning a scalar loss.
        device: device that the model and every batch are moved to.
        log_every_n: print the running loss every ``n`` training batches;
            ``None`` or ``0`` disables the printout.

    Batches are indexed as ``batch[0]`` (inputs) and ``batch[1]`` (targets);
    targets are flattened with ``view(-1)`` before being handed to ``loss_fn``.
    """

    def __init__(self, model, optimizer, loss_fn, device="cpu", log_every_n=None):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.device = device
        self.loss_fn = loss_fn

        # Normalise None to 0 so that the truthiness check in train() disables logging.
        self.log_every_n = log_every_n if log_every_n else 0

    def _print_summary(self):
        """Print the model, optimizer and loss function of this experiment."""
        print(self.model)
        print(self.optimizer)
        print(self.loss_fn)

    def train(self, loader):
        """
        Run a single epoch of training.

        Returns:
            ``(loss_history, running_loss_history)``: the per-batch losses and
            the running mean of the loss after each batch.
        """

        self.model.train()  # Run model in training mode

        loss_history = []
        running_loss = 0.
        running_loss_history = []

        # Wrap the loader itself so that tqdm can see its length.
        for i, batch in enumerate(tqdm(loader)):
            self.optimizer.zero_grad()  # Always reset gradients before computing new ones

            logits = self.model(batch[0].to(self.device))  # Forward pass
            loss = self.loss_fn(logits, batch[1].view(-1).to(self.device))

            loss_history.append(loss.item())

            # Incremental mean: mean_i = mean_{i-1} + (x_i - mean_{i-1}) / i
            running_loss += (loss_history[-1] - running_loss) / (i + 1)

            if self.log_every_n and i % self.log_every_n == 0:
                print("Running loss: ", running_loss)

            running_loss_history.append(running_loss)

            loss.backward()  # Backprop: compute dL/dw

            # Clip the global gradient norm to 3.0 before the update step.
            nn.utils.clip_grad_norm_(self.model.parameters(), 3.0)
            self.optimizer.step()

        print("Epoch completed!")
        print("Epoch Loss: ", running_loss)
        print("Epoch Perplexity: ", math.exp(running_loss))

        # The history information can allow us to draw a loss plot
        return loss_history, running_loss_history

    def evaluate(self, loader, labels):
        """
        Evaluate the model on ``loader`` and print a classification report.

        Args:
            loader: iterable of ``(inputs, targets)`` batches.
            labels: accepted for interface compatibility; currently unused.

        Returns:
            ``(loss_history, running_loss_history)`` for the evaluation pass.
        """

        self.model.eval()  # Run model in eval mode (disables dropout layer)

        batch_wise_true_labels = []
        batch_wise_predictions = []

        loss_history = []
        running_loss = 0.
        running_loss_history = []

        with torch.no_grad():  # Gradients are only needed during training
            for i, batch in enumerate(tqdm(loader)):
                targets = batch[1].view(-1)

                logits = self.model(batch[0].to(self.device))

                loss = self.loss_fn(logits, targets.to(self.device))
                loss_history.append(loss.item())

                running_loss += (loss_history[-1] - running_loss) / (i + 1)
                running_loss_history.append(running_loss)

                # Convert the raw outputs into per-class probabilities,
                # then pick the most likely class for each example.
                probs = F.softmax(logits, dim=-1)
                predictions = torch.argmax(probs, dim=-1)

                # Use the flattened targets so the collected labels form a flat
                # list of ints, the same shape as the predictions.
                batch_wise_true_labels.append(targets.tolist())
                batch_wise_predictions.append(predictions.tolist())

        # Flatten the per-batch lists into one list per side.
        all_true_labels = list(chain.from_iterable(batch_wise_true_labels))
        all_predictions = list(chain.from_iterable(batch_wise_predictions))

        print("Classification report after epoch:")
        print(classification_report(all_true_labels, all_predictions))
        return loss_history, running_loss_history

    def get_model_dict(self):
        """Return the model's ``state_dict``."""
        return self.model.state_dict()

    def run_training(self, train_loader, valid_loader, labels, n_epochs=10):
        """
        Train for ``n_epochs`` epochs, evaluating after each one, then plot
        the running training and validation losses.
        """
        # Useful for us to review what experiment we're running
        self._print_summary()

        train_losses = []
        train_running_losses = []

        valid_losses = []
        valid_running_losses = []

        for _ in range(n_epochs):
            loss_history, running_loss_history = self.train(train_loader)
            valid_loss_history, valid_running_loss_history = self.evaluate(valid_loader, labels)

            train_losses.append(loss_history)
            train_running_losses.append(running_loss_history)

            valid_losses.append(valid_loss_history)
            valid_running_losses.append(valid_running_loss_history)

        # Training done, let's look at the loss curves
        all_train_running_losses = list(chain.from_iterable(train_running_losses))
        all_valid_running_losses = list(chain.from_iterable(valid_running_losses))

        train_epoch_idx = list(range(len(all_train_running_losses)))
        valid_epoch_idx = list(range(len(all_valid_running_losses)))

        # x and y are passed by keyword: recent seaborn releases reject positional x/y.
        sns.lineplot(x=train_epoch_idx, y=all_train_running_losses)
        sns.lineplot(x=valid_epoch_idx, y=all_valid_running_losses)
        plt.show()

## Data Preparation

In [None]:
def convert_column(dataset, column):
    """Return the values of ``dataset[column]`` as a tuple, in iteration order."""
    return tuple(dataset[column])


In [None]:
# Keep a single row per distinct `clean_prompt` in the training and validation splits.
train, valid = (
    split.drop_duplicates(subset=['clean_prompt']) for split in (train, valid)
)
# The test split is not de-duplicated (the line below is left disabled):
#test = test.drop_duplicates(subset=['clean_prompt'])


In [None]:
# NOTE(review): this redefines `convert_column` from an earlier cell with the same behaviour.
def convert_column(dataset, column):
    """Collect every value of ``dataset[column]`` into a tuple, preserving order."""
    values = dataset[column]
    return tuple(values)


In [None]:
# The LIWC feature columns, defined once and shared by all three splits.
# Names are copied verbatim from the CSV header, including the leading
# space in ' Impersonal Pronouns'.
LIWC_COLUMNS = [
    'Total Function Words', 'Total Pronouns', 'Personal Pronouns',
    'First Person Singular', 'First Person Plural', 'Second Person',
    'Third Person Singular', 'Third Person Plural', ' Impersonal Pronouns',
    'Articles', 'Common Verbs', 'Auxiliary Verbs', 'Past Tense',
    'Present Tense', 'Future Tense', 'Adverbs', 'Prepositions',
    'Conjunctions', 'Negations', 'Quantifiers', 'Number', 'Swear Words',
    'Social Processes', 'Family', 'Friends', 'Humans', 'Affective Processes',
    'Positive Emotion', 'Negative Emotion', 'Anxiety', 'Anger', 'Sadness',
    'Cognitive Processes', 'Insight', 'Causation', 'Discrepancy', 'Tentative',
    'Certainty', 'Inhibition', 'Inclusive', 'Exclusive',
    'Perceptual Processes', 'See', 'Hear', 'Feel', 'Biological Processes',
    'Body', 'Health', 'Sexual', 'Ingestion', 'Relativity', 'Motion', 'Space',
    'Time', 'Work', 'Achievement', 'Leisure', 'Home', 'Money', 'Religion',
    'Death', 'Assent', 'Nonfluencies', 'Fillers', 'Total first person',
    'Total third person', 'Positive feelings', 'Optimism and energy',
    'Communication', 'Other references to people', 'Up', 'Down', 'Occupation',
    'School', 'Sports', 'TV', 'Music', 'Metaphysical issues',
    'Physical states and functions', 'Sleeping', 'Grooming',
]

train_liwc = train[LIWC_COLUMNS]
valid_liwc = valid[LIWC_COLUMNS]
test_liwc = test[LIWC_COLUMNS]

# Prompts (model inputs) as tuples, one per split.
train_prompt = convert_column(train, 'clean_prompt')
valid_prompt = convert_column(valid, 'clean_prompt')
test_prompt = convert_column(test, 'clean_prompt')

# Context labels (classification targets) as tuples, one per split.
train_context_labels = convert_column(train, 'context')
valid_context_labels = convert_column(valid, 'context')
test_context_labels = convert_column(test, 'context')

In [None]:
# Fit the TF-IDF vocabulary on the training prompts only.
tfidf_vec = TfidfVectorizer()
tfidf_vec.fit(train_prompt)
# Vectorise one text into a 1-D float tensor. `.toarray()` yields a plain
# ndarray of shape (1, vocab), and `squeeze(0)` drops the leading axis.
input_transformer = lambda text: torch.FloatTensor(tfidf_vec.transform([text]).toarray()).squeeze(0)

# Build the label space from every split so that encoding a test label cannot
# raise KeyError, and sort it so label indices are identical across kernel
# restarts (bare set iteration order is not stable between runs).
# NOTE(review): this assumes the test evaluation failed on a label absent from
# train/valid; such a label gets an output unit that is never trained.
all_context_labels = set(train_context_labels) | set(valid_context_labels) | set(test_context_labels)
label_indexer = LabelIndexer(sorted(all_context_labels, key=str))
output_transformer = lambda label: torch.LongTensor([label_indexer.encode(label)])

train_tfidf_dataset = EmpatheticDataset(train_prompt, train_liwc, train_context_labels, input_transformer, output_transformer)
valid_tfidf_dataset = EmpatheticDataset(valid_prompt, valid_liwc, valid_context_labels, input_transformer, output_transformer)
test_tfidf_dataset = EmpatheticDataset(test_prompt, test_liwc, test_context_labels, input_transformer, output_transformer)

# Only the training data is shuffled; evaluation order is kept fixed so that
# validation and test loss curves are reproducible.
train_tfidf_loader = torch.utils.data.DataLoader(train_tfidf_dataset, batch_size=16, shuffle=True)
valid_tfidf_loader = torch.utils.data.DataLoader(valid_tfidf_dataset, batch_size=16, shuffle=False)
test_tfidf_loader = torch.utils.data.DataLoader(test_tfidf_dataset, batch_size=16, shuffle=False)
# train_sequence_loader = torch.utils.data.DataLoader(train_seq_dataset, batch_size=16)
# valid_sequence_loader = torch.utils.data.DataLoader(valid_seq_dataset, batch_size=16)


## Running Training

In [None]:
# Experimental setup shared by the MLP runs below.
LEARNING_RATE = 1e-2
hidden_size = 200  # width of the hidden layer; an arbitrary choice

# One input feature per TF-IDF vocabulary entry, one output logit per label.
input_size = len(tfidf_vec.vocabulary_)
output_size = len(label_indexer.label2idx)

loss_fn = nn.CrossEntropyLoss()

In [None]:
# To print entire confusion matrix: raise NumPy's summarisation threshold to
# sys.maxsize so that arrays are printed in full instead of being abbreviated.
import sys
import numpy
numpy.set_printoptions(threshold=sys.maxsize)

In [None]:
# Model 1: MLP
# Move the model to the selected device BEFORE building the optimizer, so
# that any state the optimizer allocates lives on the same device as the
# parameters.
mlp = MultiLayerPerceptron(input_size, hidden_size, output_size).to(device)
#optimizer = optim.SGD(mlp.parameters(), lr=LEARNING_RATE, momentum=0.9)
#optimizer = optim.Adam(mlp.parameters(), lr=LEARNING_RATE)
optimizer = optim.Adagrad(mlp.parameters(), lr=LEARNING_RATE)
# Pass the device explicitly; the trainer's default is "cpu", which would
# ignore an available GPU.
mlp_trainer = MultiClassTrainer(mlp, optimizer, loss_fn, device=device)
mlp_trainer.run_training(train_tfidf_loader, valid_tfidf_loader, label_indexer.labels, n_epochs=5)

In [None]:
# Evaluate the trained model on the test loader; signature is evaluate(loader, labels).
mlp_trainer.evaluate(test_tfidf_loader, label_indexer.labels)

In [None]:
# Wrap the already-trained `mlp` in a second trainer and evaluate it on the test loader.
# The device is passed explicitly: the trainer's default ("cpu") would move the
# shared model to the CPU regardless of the device selected for this session.
mlp_trainer2 = MultiClassTrainer(mlp, optimizer, loss_fn, device=device)
mlp_trainer2.evaluate(test_tfidf_loader, label_indexer.labels)

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

KeyError: ignored