# Emotion Classification
A transformer-based neural network that classifies the emotion expressed in a text, trained and evaluated on a Kaggle dataset.

In [None]:
%%capture
!pip install -U sentence-transformers datasets evaluate

In [None]:
from sentence_transformers import SentenceTransformer, models
from sentence_transformers.evaluation import SentenceEvaluator
from torch import nn, Tensor
from torch.utils.data import Dataset, Subset
import torch
from datasets import load_dataset
from google.colab import drive
from transformers import BertTokenizer
from functools import reduce
import matplotlib.pyplot as plt
from typing import Union, Tuple, List, Iterable, Dict, Callable
import numpy as np
import evaluate
import json
import pandas as pd
import seaborn as sns
from collections import Counter
from wordcloud import WordCloud
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

In [None]:
# Mount Google Drive at /content/drive so files stored there are reachable from this Colab session.
drive.mount('/content/drive')

In [None]:
%cd /content/drive/MyDrive/Colab Notebooks/Intelligent System/Assignment_1/pba1-emotion-classification
!pwd

## Data Preparation & Exploration

In [None]:
# Each split of the dataset is stored in its own CSV file under ./dataset
data_files = dict(
    train='training.csv',
    validation='validation.csv',
    test='test.csv',
)

# Load all three splits in one call
ds = load_dataset('./dataset', data_files=data_files)

In [None]:
ds.shape ## Shape of every loaded split (shown as the cell output)

In [None]:
# Longest training sentence, measured in BERT word-piece tokens
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
train_token_counts = (len(tokenizer.tokenize(sentence)) for sentence in ds['train']['text'])
max_seq_len = max(train_token_counts)

# Count the distinct emotion labels that occur in the training split
distinct_train_labels = set(ds['train']['label'])
num_class = len(distinct_train_labels)

print("Maximum Sequence Length: ", max_seq_len)
print("Number of Emotion Classes: ", num_class)

In [None]:
# Integer class id -> emotion name.
# The position in the tuple is the class id: sadness (0), joy (1), love (2),
# anger (3), fear (4), surprise (5).
idx_to_label = dict(enumerate((
    "sadness",
    "joy",
    "love",
    "anger",
    "fear",
    "surprise",
)))

# Short handles for the three dataset splits
train_data, eval_data, test_data = (
    ds[split_name] for split_name in ("train", "validation", "test")
)

In [None]:
def show_emotion_wordcloud(dataset, label_id, title):
    """Draw a word cloud built from every text in `dataset` labelled `label_id`.

    Args:
        dataset: split exposing `.filter(...)` and a 'text' / 'label' column
            (here: the training split).
        label_id: integer class id whose examples are visualised.
        title: text shown above the figure.
    """
    # Concatenate all sentences of this class into one document for WordCloud.
    emotion_text = " ".join(dataset.filter(lambda example: example['label'] == label_id)['text'])

    plt.figure(figsize=(15, 10))
    wordcloud = WordCloud(
        max_words=200,
        height=300,
        width=500,
        background_color="black",
        colormap='viridis',
    ).generate(emotion_text)
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.axis('off')
    plt.title(title, fontweight='bold')
    plt.show()


## Create one wordcloud per emotion class, in class-id order
## (sadness, joy, love, anger, fear, surprise).
for label_id, label_name in idx_to_label.items():
    show_emotion_wordcloud(train_data, label_id, label_name.capitalize())

## Model Architecture

In [None]:
# Define model architecture:
# - Base uncased BERT (max_seq_length=128)
# - Pooling layer
# - Fully connected layer (dim=256, tanh activation)
# - Fully connected layer (dim=num_class, softmax activation)

word_embedding_model = models.Transformer('bert-base-uncased', max_seq_length=128)
pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension())
fc1_model = models.Dense(in_features=pooling_model.get_sentence_embedding_dimension(), out_features=256, activation_function=nn.Tanh())
# Normalise over the class axis explicitly: calling nn.Softmax() without `dim`
# relies on PyTorch's deprecated implicit-dimension choice and emits a warning.
# NOTE(review): nn.CrossEntropyLoss (the default loss of SoftmaxLoss in this
# notebook) expects unnormalised logits, so training on these softmax outputs
# applies softmax twice. Left unchanged because the later analysis cells read
# the model outputs as probabilities -- confirm before changing the head.
fc2_model = models.Dense(in_features=256, out_features=num_class, activation_function=nn.Softmax(dim=1))

classification_model = SentenceTransformer(modules=[word_embedding_model, pooling_model, fc1_model, fc2_model])

In [None]:
# Classification loss computed on the outputs of the model's classifier head
class SoftmaxLoss(nn.Module):
    """Wrap a sentence model so it can be trained / evaluated as a classifier.

    The wrapped ``model`` is called on each feature dict and its
    ``'sentence_embedding'`` output is used as the per-class score vector.

    Args:
        model: callable (a ``SentenceTransformer`` in this notebook) that maps a
            feature dict to a dict containing ``'sentence_embedding'``.
        loss_fct: callable ``(scores, labels) -> loss``; cross-entropy by default.
    """

    def __init__(self,
                 model: "SentenceTransformer",
                 loss_fct: Callable = nn.CrossEntropyLoss()):
        super().__init__()
        self.model = model
        self.loss_fct = loss_fct

    def forward(self,
                sentence_features: Iterable[Dict[str, Tensor]],
                labels: Tensor) -> Union[Tensor, Tuple[Tensor, Tensor]]:
        """Return the loss, or the raw scores when no labels are given.

        Args:
            sentence_features: one feature dict per text column of the batch.
            labels: class ids of the batch, or ``None`` for prediction.

        Returns:
            ``loss_fct(scores, labels)`` when ``labels`` is provided, otherwise
            the tuple ``(scores, scores)``.
        """
        output = [self.model(sentence_feature)['sentence_embedding'] for sentence_feature in sentence_features]
        # Drop only the leading "text column" axis. A bare torch.squeeze() would
        # also remove the batch axis for a batch holding a single example, which
        # breaks both the loss and the prediction path.
        output = torch.stack(output).squeeze(0)

        if labels is None:
            return output, output
        return self.loss_fct(output, labels.view(-1))

## Model Training

In [None]:
## Load the already trained model from ./model_pretrained/ to skip the training step.
## NOTE: this rebinds `classification_model`, replacing the freshly built
## (untrained) model assembled in the Model Architecture section.
classification_model = SentenceTransformer("./model_pretrained/")

In [None]:
### ATTENTION !!!!
### Only run this block of code if the model has not yet been trained.
### The training code is disabled by being wrapped in a triple-quoted string;
### remove the opening and closing ''' lines to enable it. When enabled, the
### best model is written to ./model_pretrained.
### ========================================

### ***Uncomment below to train
'''
## Train the model
from sentence_transformers import InputExample, losses, SentencesDataset, evaluation
from torch.utils.data import DataLoader
from evaluate import evaluator

batch_size = 64
num_epoch = 32

# Define the train examples.
train_examples = []
for elem in train_data:
  train_examples.append(InputExample(texts=[elem['text']], label=elem['label']))

# Define your train dataset, the dataloader and the train loss
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=batch_size)
train_loss = SoftmaxLoss(model=classification_model)

# Define the validation examples.
eval_examples = []
for elem in eval_data:
  eval_examples.append(InputExample(texts=[elem['text']], label=elem['label']))

# Evaluate model
validation_dataloader = DataLoader(eval_examples, shuffle=True, batch_size=batch_size)
label_evaluator = evaluation.LabelAccuracyEvaluator(validation_dataloader, softmax_model=train_loss)


# Start to the model
classification_model.fit(train_objectives=[(train_dataloader, train_loss)],
                         epochs=num_epoch,
                         warmup_steps=100,
                         evaluator=label_evaluator,
                         evaluation_steps=500,
                         output_path="./model_pretrained",
                         save_best_model=True)
'''

## Performance Evaluation

Show occurrences of each class in the training data. This can be used to explain the confusion matrix of test data prediction.

More training data generally lets a model learn a more diverse and comprehensive representation of its inputs. This explains why the true positive rate (recall) is lowest for the `surprise` class (35/66 = 0.53): the model was trained on only 572 `surprise` examples, the fewest of any class.

In [None]:
### Run the trained model on the test split of the original dataset.
### Each row of `predictions` holds the per-class outputs for one sentence;
### the predicted class is the column with the largest value.
test_sentences = test_data['text']
predictions = classification_model.encode(test_sentences)
prediction_labels = predictions.argmax(axis=1)

In [None]:
# Two panels side by side: class frequencies (left) and confusion matrix (right)
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20, 8))
freq_ax, cm_ax = axes
class_names = list(idx_to_label.values())

## Left panel: how often each class label occurs in the training dataset
class_counts = Counter(train_data['label'])
label_freq_df = pd.DataFrame.from_dict(class_counts, orient='index').sort_index()
# Swap the numeric class ids for readable emotion names on the x axis
label_freq_df["index"] = class_names
label_freq_df = label_freq_df.set_index("index")
label_freq_df.plot(kind='bar', legend=False, ax=freq_ax, subplots=True)
# Write the exact count above every bar
freq_ax.bar_label(freq_ax.containers[0])
freq_ax.set_title("Occurences of Each Class in Training Data")

## Right panel: confusion matrix of the test-set predictions
ConfusionMatrixDisplay.from_predictions(
    test_data['label'],
    prediction_labels,
    display_labels=class_names,
    ax=cm_ax,
)

plt.show()

Classification report showing precision, recall, F1-score, and support for each class label, together with the overall accuracy. The macro and weighted averages of precision, recall, and F1 are shown as well.

In [None]:
# "Support" is the number of actual occurrences of each class in the evaluated dataset.
class_names = list(idx_to_label.values())
test_report = classification_report(test_data['label'], prediction_labels, target_names=class_names)
print(test_report)

Show the distribution of the softmax probability of the predicted class, separately for each class label. This indicates how confident the model is in its predictions on the test examples: a value approaching 1 means the model is very sure about the emotion conveyed by a test example.

In [None]:
# Highest softmax value per test example, i.e. the score of the predicted class
prediction_label_scores = np.max(predictions, axis=1)
label_score_df = pd.DataFrame({'label': prediction_labels, 'score': prediction_label_scores})

fig, axes = plt.subplots(nrows=2, ncols=3, figsize=(20, 8))

# One histogram per predicted class, filled row by row in class-id order.
# Titles are generated from idx_to_label so the shown id always matches the
# class being plotted (the hand-written titles were off by one for ids 3-5).
for ax, (label_id, label_name) in zip(axes.flat, idx_to_label.items()):
    class_scores = label_score_df.loc[label_score_df['label'] == label_id, 'score']
    ax.hist(class_scores, bins=50)
    ax.set_title(f"Softmax Value Distribution of Label {label_id} ({label_name.capitalize()})")

plt.show()

In [None]:
## Sentence length distribution by labels

def seq_lens_by_label(dataset):
    """Return the token counts of `dataset`'s texts, grouped by class.

    The result is a list with one inner list per entry of `idx_to_label` (same
    order); each inner list holds the number of tokenizer tokens of every
    example carrying that label, in dataset order. Examples whose label is not
    in `idx_to_label` are ignored.
    """
    lens_per_label = {label_id: [] for label_id in idx_to_label}
    # Single pass over the split instead of one filtering pass per class.
    for text, label in zip(dataset['text'], dataset['label']):
        bucket = lens_per_label.get(label)
        if bucket is not None:
            bucket.append(len(tokenizer.tokenize(text)))
    return list(lens_per_label.values())


def plot_seq_len_distribution(seq_lens, title):
    """Plot a stacked per-class histogram of sequence lengths; return the bin edges."""
    plt.figure(figsize=(10,8))
    _, bin_edges, _ = plt.hist(seq_lens, bins=5, range=[0, 75], stacked=True)

    plt.legend(idx_to_label.values())
    plt.title(title)
    plt.xlabel("Sequence Length")
    plt.show()
    return bin_edges


### For TRAINING data
train_data_seq_lens = seq_lens_by_label(train_data)
plot_seq_len_distribution(train_data_seq_lens, "Training Data Sequence Length Distribution")

### For TESTING data
test_data_seq_lens = seq_lens_by_label(test_data)
bins = plot_seq_len_distribution(test_data_seq_lens, "Test Data Sequence Length Distribution")

# Integer bin edges of the test histogram; reused below to define the
# sequence-length groups.
bins = [int(a) for a in bins]
print("Bins:", bins)

Assess the model's performance per sequence-length group (0-15, 15-30, 30-45, 45-60, 60-75) to understand the effect of sequence length on its predictions.

In [None]:
# Turn consecutive histogram bin edges into half-open length groups [low, high)
# and build a readable label for each group.
len_tuples = list(zip(bins[:-1], bins[1:]))
len_labels = ["{:d} <= x < {:d}".format(low, high) for low, high in len_tuples]

def assign_group_to_len(x, groups=None):
    """Return the index of the half-open interval [low, high) that contains `x`.

    Args:
        x: sequence length to classify.
        groups: sequence of (low, high) tuples; defaults to the notebook-level
            `len_tuples` when omitted.

    Returns:
        The position of the first matching interval, or None when `x` lies
        outside every interval (e.g. a sentence longer than the last bin edge).
    """
    if groups is None:
        groups = len_tuples
    for index, (low, high) in enumerate(groups):
        if low <= x < high:
            return index
    return None

# Token length of every test sentence, aligned row-for-row with the predictions
test_data_seq_lens = [len(tokenizer.tokenize(seq)) for seq in test_data['text']]
seq_len_label_df = pd.DataFrame({'len': test_data_seq_lens, 'predicted_label': prediction_labels, 'true_label': test_data['label']})
# Examples whose length falls outside every group get a missing len_group and
# therefore do not contribute to any cell below.
seq_len_label_df["len_group"] = seq_len_label_df["len"].map(assign_group_to_len)


## Count TP / FP / TN / FN for every (length group, class) pair.
# Iterate over ALL length groups and ALL classes, not only the combinations that
# occur in the test data, so the table always has exactly
# len(len_labels) * len(idx_to_label) rows. Pairs without any example get real
# counts instead of staying unfilled, and the heatmap grid keeps matching its
# tick labels.
seq_len_group = np.arange(len(len_labels))

confusion_rows = []
for i in seq_len_group:
    filtered_data = seq_len_label_df[seq_len_label_df["len_group"] == i]
    for cls in idx_to_label:
        is_predicted = filtered_data['predicted_label'] == cls
        is_actual = filtered_data['true_label'] == cls
        confusion_rows.append({
            "len_group": i,
            # Class the counts of this row refer to (one-vs-rest).
            "true_label": cls,
            "TP": (is_predicted & is_actual).sum(),
            "FP": (is_predicted & ~is_actual).sum(),
            "TN": (~is_predicted & ~is_actual).sum(),
            "FN": (~is_predicted & is_actual).sum(),
        })

count_columns = ["TP", "FP", "TN", "FN"]
false_performance_per_label = pd.DataFrame(confusion_rows, columns=["len_group", "true_label"] + count_columns)
false_performance_per_label[count_columns] = false_performance_per_label[count_columns].astype(float)

## False Positive Rate = FP / (FP + TN)
## False Negative Rate = FN / (FN + TP)
## False Discovery Rate = FP / (FP + TP)
## False Omission Rate = FN / (FN + TN)
# A rate whose denominator is 0 (no example of that kind in the group) is NaN.
false_performance_per_label["FNR"] = false_performance_per_label["FN"] / (false_performance_per_label["FN"] + false_performance_per_label["TP"])
false_performance_per_label["FPR"] = false_performance_per_label["FP"] / (false_performance_per_label["FP"] + false_performance_per_label["TN"])
false_performance_per_label["FDR"] = false_performance_per_label["FP"] / (false_performance_per_label["FP"] + false_performance_per_label["TP"])
false_performance_per_label["FOR"] = false_performance_per_label["FN"] / (false_performance_per_label["FN"] + false_performance_per_label["TN"])

# Row order (group-major, class-minor) is what the heatmap reshape relies on.
false_performance_per_label = false_performance_per_label.sort_values(['len_group', 'true_label'], ascending = [True, True])


### Draw one heatmap per error rate on a 2x2 grid
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(20, 15))

# Rows of each heatmap are sequence-length groups, columns are classes
grid_shape = (len(seq_len_group), len(idx_to_label))

# (rate column, panel title, target axes) for each of the four panels
rate_panels = [
    ("FPR", "False Positive Rate For Each Sequence Length and Class", axes[0,0]),
    ("FNR", "False Negative Rate For Each Sequence Length and Class", axes[0,1]),
    ("FDR", "False Discovery Rate For Each Sequence Length and Class", axes[1,0]),
    ("FOR", "False Omission Rate For Each Sequence Length and Class", axes[1,1]),
]

for rate_column, panel_title, panel_ax in rate_panels:
    rate_grid = false_performance_per_label[rate_column].values.reshape(grid_shape)
    sns.heatmap(rate_grid, cmap='RdPu', annot=True, ax=panel_ax,
                xticklabels=idx_to_label.values(), yticklabels=len_labels)
    panel_ax.set_title(panel_title)
    panel_ax.set_xlabel("Class")
    panel_ax.set_ylabel("Sequence Length of Test Examples, x")

Use the model to predict on a dataset from another source. This evaluates how well the model generalizes to unseen expression styles and structures, as in real-world use cases. The results in the classification report are not encouraging: not a single metric exceeds 0.8 (80%).

In [None]:
### Test the model using the test data from the external dataset (https://www.kaggle.com/datasets/shivamb/go-emotions-google-emotions-dataset)
test_secondData = pd.read_csv('./dataset/val_extData - Copy.csv')
label_to_idx = {v: k for k, v in idx_to_label.items()}

# Collapse the one-hot emotion columns into a single integer class id.
# NOTE(review): rows with none of the six emotion columns set to 1 keep the
# default 0 ("sadness"); presumably the CSV only contains rows with one of
# these emotions -- confirm against the file.
test_secondData['label_idx'] = 0
for label, label_id in label_to_idx.items():
    test_secondData.loc[test_secondData[label] == 1, 'label_idx'] = label_id

# Lower case 'text' column and remove the punctuations.
# Series.replace() without regex=True only swaps cells that are exactly equal
# to the pattern text, so it never stripped anything; .str.replace with
# regex=True applies the pattern inside every string.
test_secondData['text'] = test_secondData['text'].str.lower().str.replace(r'[^\w\s]', '', regex=True)

## Perform prediction
# Pass a plain list so encode() does not depend on the DataFrame's index labels.
predictions_2 = classification_model.encode(test_secondData['text'].tolist())
prediction_labels_2 = np.argmax(predictions_2, axis=1)

## Print classification report
print(classification_report(test_secondData['label_idx'], prediction_labels_2, target_names=list(idx_to_label.values()))) # F1, True Positive,...

## Demonstrate outputs with three randomly picked examples

In [None]:
import random

### Randomly pick three distinct examples from the original dataset.
# random.sample draws without replacement, so the same example is never shown
# twice; sampling positions avoids requiring the dataset to be a Sequence.
sample_positions = random.sample(range(len(test_data)), k=3)
examples_test = [test_data[position] for position in sample_positions]

print("Showing the prediction results for randomly picked test examples:")
print("-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+")
for test_item in examples_test:
    # `sentence` rather than `input`: the latter would shadow the builtin for
    # the rest of the notebook session.
    sentence, label = test_item['text'], test_item['label']
    prediction = classification_model.encode(sentence)

    print("Input Sentence: ", sentence)
    print("Predicted Emotion: ", idx_to_label[np.argmax(prediction)])
    print("True Emotion: ", idx_to_label[label])
    print("===================")


In [None]:
### Randomly pick three examples from the external dataset
examples_test = test_secondData.sample(n=3)

print("Showing the prediction results for randomly picked test examples:")
print("-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+")
for _, row in examples_test.iterrows():
    # `sentence` rather than `input`: the latter would shadow the builtin for
    # the rest of the notebook session.
    sentence, label = row['text'], row['label_idx']
    prediction = classification_model.encode(sentence)

    print("Input Sentence: ", sentence)
    print("Predicted Emotion: ", idx_to_label[np.argmax(prediction)])
    print("True Emotion: ", idx_to_label[label])
    print("===================")