# INF8225 - Final Project


### Install dependencies

In [None]:
!pip install accelerate -U --quiet
!pip install transformers[torch] --quiet
!pip install torchtext --upgrade --quiet
!pip install torchdata --quiet
!pip install torchinfo --quiet
!pip install datasets --quiet
!pip install einops --quiet

In [None]:
import os
import json
import time
import datetime
import random
import re
import itertools
import collections
import math
from functools import partial

import torch
import torchtext
import torchdata
import torchinfo

import numpy as np
import pandas as pd

from torch import Tensor
from einops import rearrange
from dataclasses import dataclass
import torch.nn.functional as functional
from torchtext.vocab import build_vocab_from_iterator, Vocab
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from datasets import load_dataset, Value, concatenate_datasets, Dataset as HFDataset, DatasetDict, load_metric

import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_datasets as tfds
tfds.disable_progress_bar()

import torch.nn as nn
from keras.preprocessing.sequence import pad_sequences
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, AutoTokenizer, AutoModel, get_linear_schedule_with_warmup
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

In [None]:
# Device used for every model and batch below: the GPU when CUDA is available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Seed every random number generator used in this notebook (Python, NumPy, PyTorch CPU and CUDA).
seed_val = 42

random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

# Dataset Compilation


Numerous datasets related to sentiment analysis for financial news are available online. Our objective is to amalgamate these diverse datasets into a unified one. By doing so, we aim to augment our training data volume for improved performance and to enhance our model's ability to generalize effectively.

Our dataset will consist of sentences paired with a label: 0, 1 or 2 for negative, neutral and positive respectively.

In [None]:
# Load the four source datasets (the dataset card of each one is linked next to its call).
dataset1 = load_dataset("financial_phrasebank", 'sentences_allagree') # https://huggingface.co/datasets/financial_phrasebank
dataset2 = load_dataset("nickmuchi/financial-classification")         # https://huggingface.co/datasets/nickmuchi/financial-classification
dataset3 = load_dataset("TimKoornstra/financial-tweets-sentiment")    # https://huggingface.co/datasets/TimKoornstra/financial-tweets-sentiment
dataset4 = load_dataset("chiapudding/kaggle-financial-sentiment")     # https://huggingface.co/datasets/chiapudding/kaggle-financial-sentiment

In [None]:
# Harmonise column names: every dataset ends up with a "sentence" text column and a "label" column.
# dataset3 additionally drops its "url" column.
dataset2 = dataset2.rename_column("text", "sentence").rename_column("labels", "label")
dataset3 = dataset3.rename_column("tweet", "sentence").rename_column("sentiment", "label").remove_columns("url")
dataset4 = dataset4.rename_column("Sentence", "sentence").rename_column("Sentiment", "label")

The datasets contain varying labels, necessitating the task of mapping them to our standard labels: 0 for negative, 1 for neutral, and 2 for positive sentiments.

In [None]:
# Shared integer label ids that every dataset is remapped to.
NEGATIVE = 0
NEUTRAL  = 1
POSITIVE = 2

def update_labels(sample, label_mapping):
    """Translate the label of one sample through `label_mapping`.

    Args:
        sample: mapping with a 'label' entry; it is modified in place.
        label_mapping: dict from the dataset's own label value to the shared label id.

    Returns:
        The same `sample` object, with 'label' replaced by its mapped value.

    Raises:
        KeyError: if the sample's label has no entry in `label_mapping`.
    """
    source_label = sample['label']
    target_label = label_mapping[source_label]
    sample['label'] = target_label
    return sample

# Remap each dataset's own label encoding onto the shared NEGATIVE / NEUTRAL / POSITIVE ids.
# NOTE(review): the source encodings assumed here (e.g. dataset3: 0=neutral, 1=positive, 2=negative)
# are not visible in this notebook — confirm them against each dataset card.
dataset2 = dataset2.map(update_labels, fn_kwargs={'label_mapping': {0: NEGATIVE, 1: NEUTRAL, 2: POSITIVE}})
dataset3 = dataset3.map(update_labels, fn_kwargs={'label_mapping': {0: NEUTRAL, 1: POSITIVE, 2: NEGATIVE}})
dataset4 = dataset4.map(update_labels, fn_kwargs={'label_mapping': {"neutral": NEUTRAL, "positive": POSITIVE, "negative": NEGATIVE}})

In [None]:
def convert_label_type(dataset):
    """Cast the 'label' column of every split of `dataset` to int8.

    The mapping passed in is updated in place (each split is replaced by its
    re-cast version); nothing is returned.
    """
    int8_label = Value('int8')
    for split_name, split_data in dataset.items():
        # Only existing keys are reassigned, so the mapping's size is unchanged while iterating.
        dataset[split_name] = split_data.cast_column('label', int8_label)

# Give the label column of all four datasets the same int8 type
# (presumably so their schemas match when they are concatenated below).
convert_label_type(dataset1)
convert_label_type(dataset2)
convert_label_type(dataset3)
convert_label_type(dataset4)

In [None]:
# Flatten every split of every source dataset into one list and concatenate them into a single dataset.
merged_dataset = concatenate_datasets(list(itertools.chain.from_iterable([d.values() for d in [
    dataset1, dataset2, dataset3, dataset4
]])))

In [None]:
# Remove duplicates (No better way: https://discuss.huggingface.co/t/how-can-i-drop-duplicates-on-datasets-module/15369/4)
def drop_useless_rows(dataset):
  """Return a new dataset without duplicated rows, empty strings or missing values.

  The dataset is round-tripped through a pandas DataFrame: exact duplicate rows
  are dropped, empty strings are turned into missing values, and every row that
  contains a missing value is removed.

  Args:
    dataset: a Hugging Face dataset (anything `pd.DataFrame` can consume row-wise).

  Returns:
    A Hugging Face dataset holding the surviving rows, without any pandas index column.
  """
  df = pd.DataFrame(dataset)
  df = df.drop_duplicates()
  df = df.replace('', pd.NA)
  df = df.dropna()
  # Dropping rows leaves gaps in the pandas index. Resetting it and asking
  # `from_pandas` not to keep it means no `__index_level_0__` column is ever
  # created, so the function also works when no row was dropped (in which case
  # removing that column afterwards would fail because it does not exist).
  df = df.reset_index(drop=True)
  return HFDataset.from_pandas(df, preserve_index=False)

# Deduplicate the merged data and discard rows with empty or missing fields.
merged_dataset = drop_useless_rows(merged_dataset)

We will split the dataset into training, validation, and test sets using the standard ratio of 80% for training, 10% for validation, and 10% for testing.

In [None]:
# Split dataset: hold out 20% of the rows, then halve that hold-out into validation and test (80/10/10).
# Both splits use a fixed seed so the partition is reproducible.
ds_train_test = merged_dataset.train_test_split(test_size=0.2, seed=42)
ds_val_test = ds_train_test['test'].train_test_split(test_size=0.5, seed=42)

dataset_train = ds_train_test['train']
dataset_val = ds_val_test['train']
dataset_test = ds_val_test['test']

# NOTE(review): this DatasetDict is built before the regex clean-up that later cells apply to
# dataset_train / dataset_val / dataset_test, so it keeps the un-preprocessed sentences —
# confirm that this is intended for the models that consume `dataset_split`.
dataset_split = DatasetDict({
    'train': dataset_train,
    'valid': dataset_val,
    'test': dataset_test
})

In [None]:
# Ordered (pattern, replacement, is_regex) rules applied to every sentence.
# When `is_regex` is True the pattern is a regular expression (used with re.sub);
# otherwise it is a literal substring (used with str.replace).
REPLACEMENTS = [
    # URLs, $TICKER symbols and @profile handles are collapsed into placeholder tokens.
    (r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', "<URL>", True),
    (r'\$[A-Za-z\_]+(?=[\s:\.]|$|…)', "<TICKER>", True),
    (r'\@[A-Za-z0-9\_]+(?=[\s:\.]|$)', "<PROFILE>", True),
    # Literal clean-ups: line breaks, detached punctuation and quote variants.
    ("\n", "", False),
    (' , ', ', ', False),
    # Literal rule, so no regex escaping: the previous pattern " \." contained a
    # backslash and therefore never matched a detached full stop.
    (" .", ".", False),
    ("``", "\"", False),
    ("''", "\"", False),
    ("`", "\"", False),
    (" 's", "'s", False),
    ("„", "\"", False),
    ("“", "\"", False)
]

The dataset includes various unconventional elements such as links, stock tickers, and user handles. During preprocessing, we replace each of these elements with a dedicated token (`<URL>`, `<TICKER>`, `<PROFILE>`) to reduce our vocabulary and simplify the task.

In [None]:
def preprocess(row, replacements=REPLACEMENTS):
    """Clean the 'sentence' field of one row by applying the replacement rules in order.

    Args:
        row: mapping with a 'sentence' string; it is modified in place.
        replacements: iterable of (pattern, replacement, is_regex) triples. Regex rules
            go through `re.sub`, the others through `str.replace`.

    Returns:
        The same `row`, with its 'sentence' rewritten.
    """
    sentence = row['sentence']
    for pattern, replacement, is_regex in replacements:
        if is_regex:
            sentence = re.sub(pattern, replacement, sentence)
        else:
            sentence = sentence.replace(pattern, replacement)
    row['sentence'] = sentence
    return row

# Apply the replacement rules to the sentences of each split.
dataset_train = dataset_train.map(preprocess)
dataset_val = dataset_val.map(preprocess)
dataset_test = dataset_test.map(preprocess)

In [None]:
# Drop any rows that might have become empty or irrelevant due to the preprocessing steps
# (the replacements can turn distinct sentences into duplicates or leave an empty string).
dataset_train = drop_useless_rows(dataset_train)
dataset_val = drop_useless_rows(dataset_val)
dataset_test = drop_useless_rows(dataset_test)

## Exploratory Analysis

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

def plot_class_balance(dataset, dataset_name):
    """Show the label distribution of `dataset` as a bar chart and a pie chart side by side.

    Args:
        dataset: object whose 'label' entry is a sequence of class ids (0, 1 or 2).
        dataset_name: name inserted into both plot titles.

    Both charts use the same class names and the same colour per class, even when
    one of the classes is absent from `dataset`.
    """
    class_names = {0: 'Negative', 1: 'Neutral', 2: 'Positive'}
    class_colors = {0: 'lightblue', 1: 'lightgreen', 2: 'pink'}

    labels = pd.Series(dataset['label'])
    label_counts = labels.value_counts().sort_index()

    # Resolve names and colours per class id rather than by position, so a missing
    # class cannot shift the colours or leave the pie labelled with raw ids.
    present_names = [class_names.get(label, str(label)) for label in label_counts.index]
    present_colors = [class_colors.get(label, 'lightgrey') for label in label_counts.index]

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

    ax1.bar(label_counts.index, label_counts.values, color=present_colors)
    ax1.set_title(f"Class Balance for {dataset_name}")
    ax1.set_xlabel("Class")
    ax1.set_ylabel("Count")
    ax1.set_xticks([0, 1, 2])
    ax1.set_xticklabels(['Negative', 'Neutral', 'Positive'])

    ax2.pie(label_counts.values, labels=present_names, autopct='%1.1f%%', colors=present_colors)
    ax2.set_title(f"Class Distribution for {dataset_name}")

    plt.tight_layout()
    plt.show()

# Visualise the label distribution of the training split.
plot_class_balance(dataset_train, "Dataset")

We've noticed a slight imbalance in our data, with more positive labels than negative ones. However, this imbalance appears manageable and shouldn't introduce significant bias into our models. We'll proceed with our models, keeping this imbalance in mind in case of poor performance.

# Utils


## Preprocessing

Here are a custom dataset class and a utility class that implement our data preprocessing. This generic preprocessing approach minimizes code duplication and simplifies the development process for future models. Subsequent models can inherit from the utility class and either define their own functions or use the ones already implemented here.

In [None]:
class CustomDataset(Dataset):
  """Torch dataset view over an indexable collection of {'sentence', 'label'} samples.

  Each item is returned as a pair of tensors: the 'sentence' entry as a
  LongTensor and the 'label' entry as a tensor.
  """

  def __init__(self, dataset):
    # Underlying collection; it must support len() and integer indexing.
    self.dataset = dataset

  def __len__(self) -> int:
    """Number of samples in the wrapped collection."""
    return len(self.dataset)

  def __getitem__(self, item) -> tuple:
    """Return (sentence tensor, label tensor) for the sample at position `item`."""
    row = self.dataset[item]
    sentence_tensor = torch.LongTensor(row['sentence'])
    label_tensor = torch.tensor(row['label'])
    return sentence_tensor, label_tensor

In [None]:
class Utilities:
  """Shared text preprocessing for the models trained from scratch.

  The class tokenizes sentences, builds a vocabulary on the training split,
  converts tokens to vocabulary indices and wraps the splits in DataLoaders.
  Subclasses must provide `pad_sequence(batch)`, which is used as the
  DataLoader `collate_fn`.
  """
  SPECIALS = ['<PAD>', '<UNK>']
  # Index of '<PAD>' in the vocabulary (first entry of SPECIALS); also used as padding value.
  PADDING_INDEX = 0

  def __init__(self, max_sequence_length: int, min_frequency: int, batch_size: int):
    """
    Args:
      max_sequence_length: sentences are truncated to this many tokens.
      min_frequency: minimum token frequency required to enter the vocabulary.
      batch_size: batch size of every DataLoader created by `dataloaders`.
    """
    self.max_sequence_length = max_sequence_length
    self.min_frequency = min_frequency
    self.batch_size = batch_size
    self.tokenizer = torchtext.data.utils.get_tokenizer('basic_english')
    # Filled in by `build_vocabulary`.
    self.vocabulary = None

  def preprocess_sentence(self, sentence: str):
    """Turn one raw sentence into a (1, n_tokens) tensor of vocabulary indices on DEVICE."""
    token_ids = self.vocabulary(self.tokenize(sentence))
    # Explicit integer dtype: an empty token list would otherwise produce a float tensor.
    return torch.tensor(token_ids, dtype=torch.long).to(DEVICE).unsqueeze(0)

  def build_vocabulary(self, dataset):
    """Build `self.vocabulary` from the token lists of `dataset`; unknown tokens map to '<UNK>'."""
    self.vocabulary = build_vocab_from_iterator(
      (sample['sentence'] for sample in dataset),
      min_freq=self.min_frequency,
      specials=Utilities.SPECIALS,
    )
    self.vocabulary.set_default_index(self.vocabulary['<UNK>'])

  def vocabularize(self, sample):
    """Replace the token list of `sample` by the matching vocabulary indices (in place)."""
    sample['sentence'] = self.vocabulary(sample['sentence'])
    return sample

  def tokenize_sample(self, sample):
    """Replace the raw sentence of `sample` by its truncated token list (in place)."""
    sample['sentence'] = self.tokenize(sample['sentence'])
    return sample

  def tokenize(self, sentence: str):
    """Lower-case and tokenize `sentence`, keeping at most `max_sequence_length` tokens."""
    return self.tokenizer(sentence.lower())[:self.max_sequence_length]

  def pad_sequence(self, batch):
    """Collate a list of (sentence tensor, label) samples into a padded batch.

    This is the hook used as DataLoader `collate_fn`; each subclass implements
    the batch layout that its model expects.
    """
    raise NotImplementedError("Subclasses of Utilities must implement pad_sequence")

  def preprocess(self, train_dataset, validation_dataset, test_dataset):
    """Tokenize the three splits, fit the vocabulary on the training split only,
    index every split and wrap each one in a `CustomDataset`.

    Returns:
      (train, validation, test) as `CustomDataset` instances.
    """
    train_dataset      = train_dataset.map(self.tokenize_sample)
    validation_dataset = validation_dataset.map(self.tokenize_sample)
    test_dataset       = test_dataset.map(self.tokenize_sample)

    self.build_vocabulary(train_dataset)

    train_dataset      = train_dataset.map(self.vocabularize)
    validation_dataset = validation_dataset.map(self.vocabularize)
    test_dataset       = test_dataset.map(self.vocabularize)

    train_dataset      = CustomDataset(train_dataset)
    validation_dataset = CustomDataset(validation_dataset)
    test_dataset       = CustomDataset(test_dataset)

    return train_dataset, validation_dataset, test_dataset

  def dataloaders(self, train_dataset, validation_dataset, test_dataset):
    """Create the (train, validation, test) DataLoaders.

    Only the training loader is shuffled. The validation and test loaders keep
    the dataset order so that evaluation is reproducible and predictions stay
    aligned with the rows of the dataset.
    """
    return (
        DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True, collate_fn=self.pad_sequence),
        DataLoader(validation_dataset, batch_size=self.batch_size, shuffle=False, collate_fn=self.pad_sequence),
        DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False, collate_fn=self.pad_sequence)
    )

## Training

We will now create generic functions to facilitate training and inference with our PyTorch models. These functions are designed to be versatile and compatible with all our models to facilitate our work.

In [None]:
def accuracy(outputs, labels):
  """Fraction of rows of `outputs` whose arg-max along dim 1 equals the matching entry of `labels`."""
  predicted_classes = outputs.argmax(1)
  n_correct = (predicted_classes == labels).sum().item()
  return n_correct / len(labels)

In [None]:
def evaluate(model: nn.Module, loader: DataLoader, criterion, custom_loss: callable) -> tuple[float, float]:
  """Compute the average loss and the accuracy of `model` over every sample of `loader`.

  Args:
    model: network to evaluate; it is switched to eval mode.
    loader: iterable of batches whose element at index 1 holds the labels.
    criterion: loss function forwarded to `custom_loss`.
      # assumes it returns the MEAN loss of the batch (e.g. nn.CrossEntropyLoss default) — TODO confirm for other criteria
    custom_loss: callable `(model, criterion, batch) -> (outputs, loss)`.

  Returns:
    (loss, accuracy), both averaged per sample. Batches are weighted by their
    size, so a smaller final batch does not skew the result. Both values are
    NaN when the loader yields no sample.
  """
  model.eval()
  total_loss = 0.0
  total_correct = 0
  total_samples = 0
  with torch.no_grad():
    for batch in loader:
      outputs, loss = custom_loss(model, criterion, batch)
      # Compare on whatever device the model produced its outputs on.
      labels = batch[1].to(outputs.device)
      n_samples = len(labels)
      total_loss += loss.item() * n_samples
      total_correct += (outputs.argmax(1) == labels).sum().item()
      total_samples += n_samples

  if total_samples == 0:
    return float('nan'), float('nan')
  return total_loss / total_samples, total_correct / total_samples

In [None]:
def predict(model: nn.Module, loader: DataLoader, criterion, custom_loss: callable) -> list:
  """Return the predicted class index of every sample of `loader`, in iteration order.

  Args:
    model: network used for prediction; it is switched to eval mode.
    loader: iterable of batches understood by `custom_loss`.
    criterion: loss function forwarded to `custom_loss` (the loss itself is discarded).
    custom_loss: callable `(model, criterion, batch) -> (outputs, loss)`.

  Returns:
    A flat list with one predicted class index (arg-max along dim 1) per sample.
  """
  model.eval()
  predictions = []
  with torch.no_grad():
    for batch in loader:
      outputs, _ = custom_loss(model, criterion, batch)
      # extend() appends in place instead of rebuilding the whole list for every batch.
      predictions.extend(outputs.argmax(1).detach().cpu().numpy().flatten())
  return predictions

In [None]:
def train_epoch(model: nn.Module, loader: DataLoader, criterion, optimizer, scheduler, custom_loss: callable, config):
  """Train `model` for one pass over `loader`.

  Args:
    model: network to train; it is switched to train mode.
    loader: DataLoader of training batches (labels at index 1 of each batch).
    criterion: loss function forwarded to `custom_loss`.
    optimizer: optimizer stepped once per batch.
    scheduler: learning-rate scheduler stepped once per batch.
    custom_loss: callable `(model, criterion, batch) -> (outputs, loss)`.
    config: dict; `config['clip_grad_norm']` is the maximum gradient norm.
  """
  progress_bar = tqdm(enumerate(loader), total=len(loader))
  model.train()
  for idx, batch in progress_bar:
    model.zero_grad()
    outputs, loss = custom_loss(model, criterion, batch)

    loss.backward()
    # Clip between backward() and optimizer.step(): clipping after the step would
    # rescale gradients that have already been applied and therefore do nothing.
    nn.utils.clip_grad_norm_(model.parameters(), config['clip_grad_norm'])
    optimizer.step()
    scheduler.step()

    # The batch accuracy is only displayed every 50 steps, so only compute it then.
    if idx > 0 and idx % 50 == 0:
        acc = accuracy(outputs, batch[1].to(DEVICE))
        progress_bar.set_description(f'train loss={loss.item():.4f}, train_acc={acc:.4f}')

In [None]:
def train_model(model: nn.Module, train_loader: DataLoader, validation_loader: DataLoader, custom_loss, config: dict):
  """Train `model` for `config['epochs']` epochs, printing validation metrics after each one.

  The optimizer is AdamW with `config['learning_rate']`, the loss is cross-entropy,
  and the learning rate follows a linear schedule without warm-up spanning all
  training steps.
  """
  epochs = config['epochs']
  total_training_steps = len(train_loader) * epochs

  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'])
  scheduler = get_linear_schedule_with_warmup(
      optimizer,
      num_warmup_steps=0,
      num_training_steps=total_training_steps
  )

  epoch = 0
  while epoch < epochs:
    train_epoch(model, train_loader, criterion, optimizer, scheduler, custom_loss, config)
    validation_loss, validation_accuracy = evaluate(model, validation_loader, criterion, custom_loss)

    print(f'ep {epoch}: val_loss={validation_loss:.4f}, val_acc={validation_accuracy:.4f}')
    epoch += 1

# LSTM

In this section, we will implement a recurrent neural network (RNN), and more specifically a Long Short-Term Memory (LSTM) model, to perform sentiment analysis on sentences (sequences). We use a bidirectional architecture to take into account information in both directions. The last states of the forward and backward passes are concatenated and fed to a classification head (a simple MLP) to perform classification.

**References :**
- The PyTorch Foundation (2023). LSTM [Online]. Available : [URL](https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html)
- Pai A. (2022). Build Your First Text Classification model using PyTorch [Online]. [URL](https://www.analyticsvidhya.com/blog/2020/01/first-text-classification-in-pytorch/)

In [None]:
class LSTMUtilities(Utilities):
  """Batching and loss helpers for the LSTM classifier.

  Batches are triples (padded token ids, labels, original lengths).
  """

  @staticmethod
  def loss(model, criterion, batch):
    """Run `model` on one batch and return (outputs, loss), with everything moved to DEVICE."""
    token_ids = batch[0].to(DEVICE)
    lengths = batch[2].to(DEVICE)
    outputs = model(token_ids, lengths)
    targets = batch[1].to(DEVICE)
    batch_loss = criterion(outputs, targets).to(DEVICE)
    return outputs, batch_loss

  def pad_sequence(self, batch):
    """Collate (text, label) samples into (padded texts, labels, lengths).

    Texts are padded batch-first with PADDING_INDEX; `lengths` keeps the number
    of tokens each text had before padding.
    """
    texts = []
    label_list = []
    for text, label in batch:
      texts.append(text)
      label_list.append(label)
    labels = torch.tensor(label_list)
    lengths = [len(text) for text in texts]
    texts_padded = nn.utils.rnn.pad_sequence(texts, batch_first=True, padding_value=LSTMUtilities.PADDING_INDEX)
    return texts_padded, labels, torch.tensor(lengths)

In [None]:
class LSTMClassifier(nn.Module):
    """Bidirectional LSTM followed by a small MLP classification head.

    Args:
        dim_vocabulary: number of entries of the embedding table.
        dim_embeddings: size of each token embedding.
        n_layers: number of stacked LSTM layers.
        dim_hidden: hidden size of the LSTM (per direction).
        dropout: dropout value passed to the LSTM.
        n_classes: number of output logits.
    """

    def __init__(
            self,
            dim_vocabulary: int,
            dim_embeddings: int,
            n_layers: int,
            dim_hidden: int,
            dropout: float,
            n_classes: int
        ):
        super().__init__()

        self.embeddings = nn.Embedding(dim_vocabulary, dim_embeddings)
        self.lstm = nn.LSTM(
            input_size=dim_embeddings,
            hidden_size=dim_hidden,
            num_layers=n_layers,
            dropout=dropout,
            bidirectional=True
        )
        # The head takes two concatenated hidden states, hence 2 * dim_hidden inputs.
        self.classification_head = nn.Sequential(
            nn.Linear(2 * dim_hidden, dim_hidden),
            nn.Tanh(),
            nn.Linear(dim_hidden, n_classes)
        )

    def forward(self, inputs, inputs_length):
        """Return the class logits for a batch.

        Args:
            inputs: padded token ids; packed with batch_first=True, so the batch
                dimension comes first.
            inputs_length: un-padded length of each sequence of the batch.
        """
        packed_embedded = nn.utils.rnn.pack_padded_sequence(
            self.embeddings(inputs),
            inputs_length.cpu(),
            batch_first=True,
            enforce_sorted=False
        )

        _, (hidden_state, _) = self.lstm(packed_embedded)

        # Move the axis that indexes the stacked hidden states to the end.
        hidden_state = rearrange(hidden_state, 'h_out batch_size hidden_size -> batch_size hidden_size h_out')

        # Keep only the last two stacked states (both directions of the last layer)
        # and concatenate them along the feature dimension.
        last_state = hidden_state[:, :, -1]
        second_last_state = hidden_state[:, :, -2]
        features = torch.cat((last_state, second_last_state), dim=1)

        return self.classification_head(features)

In [None]:
# Hyper-parameters of the LSTM experiment. The first three entries configure the
# preprocessing (LSTMUtilities), the next five the model (LSTMClassifier) and the
# last three the optimisation (read by train_model / train_epoch).
LSTM_CONFIG = {
    'max_sequence_length': 64,
    'min_frequency': 2,
    'batch_size': 128,
    'n_classes': 3,
    'dim_embeddings': 16,
    'n_layers': 2,
    'dim_hidden': 32,
    'dropout': 0.10,
    'learning_rate': 0.01,
    'epochs': 2,
    'clip_grad_norm': 1.0
}

In [None]:
# Preprocessing helper for the LSTM, configured from LSTM_CONFIG.
lstm_utilities = LSTMUtilities(
    max_sequence_length=LSTM_CONFIG['max_sequence_length'],
    min_frequency=LSTM_CONFIG['min_frequency'],
    batch_size=LSTM_CONFIG['batch_size']
)

In [None]:
# Tokenize and index the three splits (the vocabulary is built on the training split), then batch them.
l_train_dataset, l_validation_dataset, l_test_dataset = lstm_utilities.preprocess(dataset_train, dataset_val, dataset_test)
l_train_loader, l_validation_loader, l_test_loader = lstm_utilities.dataloaders(l_train_dataset, l_validation_dataset, l_test_dataset)

In [None]:
# Build the classifier from LSTM_CONFIG; the embedding table is sized by the fitted vocabulary.
lstm_classifier = LSTMClassifier(
    dim_vocabulary=len(lstm_utilities.vocabulary),
    dim_embeddings=LSTM_CONFIG['dim_embeddings'],
    n_layers=LSTM_CONFIG['n_layers'],
    dim_hidden=LSTM_CONFIG['dim_hidden'],
    dropout=LSTM_CONFIG['dropout'],
    n_classes=LSTM_CONFIG['n_classes']
)

lstm_classifier.to(DEVICE)
# Xavier-uniform initialisation of every parameter with more than one dimension;
# one-dimensional parameters keep their default initialisation.
for p in lstm_classifier.parameters():
      if p.dim() > 1:
          nn.init.xavier_uniform_(p)
print(torchinfo.summary(lstm_classifier))

In [None]:
# Train the LSTM; validation loss and accuracy are printed after every epoch.
train_model(lstm_classifier, l_train_loader, l_validation_loader, LSTMUtilities.loss, LSTM_CONFIG)

In [None]:
# Final evaluation of the trained LSTM on the test loader.
loss_lstm, acc_lstm = evaluate(lstm_classifier, l_test_loader, nn.CrossEntropyLoss(), LSTMUtilities.loss)
print(f"Accuracy: {acc_lstm * 100} %")
print(f"Loss: {loss_lstm}")

### Result

Using a basic bidirectional LSTM model, we achieved an accuracy of $73.74\%$ on the test dataset. This is a pretty fair performance for such a model. Let's see if subsequent models outperform this one.

# Transformer (Encoder-Only)


In this section, we will implement the encoder part of the transformer architecture to perform sentiment analysis on sentences (sequences). The goal is to leverage the attention mechanism to capture relationships within the sequences and then to use the encoded data for classification.

**References :**
- The PyTorch Foundation (2024). Language Modeling with `nn.Transformer` and `torchtext` [Online]. Available : [URL](https://pytorch.org/tutorials/beginner/transformer_tutorial.html)
- maqboolkhan (2022). Transformer_classifier_pytorch [Online]. Available : [URL](https://github.com/maqboolkhan/Transformer_classifier_pytorch)
- n8henrie (2021). Writing a Transformer Classifier in PyTorch [Online]. Available : [URL](https://n8henrie.com/2021/08/writing-a-transformer-classifier-in-pytorch/)

In [None]:
class TransformerUtilities(Utilities):
  @staticmethod
  def get_masks(src):
    src_seq_len = src.shape[0]
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=DEVICE).type(torch.bool)
    src_padding_mask = (src == TransformerUtilities.PADDING_INDEX).transpose(0, 1)
    return src_mask, src_padding_mask

  @staticmethod
  def loss(model, criterion, batch):
    src_mask, src_padding_mask = TransformerUtilities.get_masks(batch[0].to(DEVICE))
    src_mask = src_mask.to(DEVICE)
    src_padding_mask = src_padding_mask.to(DEVICE)
    outputs = model(batch[0].to(DEVICE), src_mask, src_padding_mask)
    return outputs, criterion(outputs, batch[1].to(DEVICE)).to(DEVICE)

  def pad_sequence(self, batch):
    texts  = [text for text, label in batch]
    labels = torch.tensor([label for text, label in batch])
    texts_padded = nn.utils.rnn.pad_sequence(texts, batch_first=True, padding_value=TransformerUtilities.PADDING_INDEX)
    return texts_padded, labels

In [None]:
class TransformerPositionalEncoding(nn.Module):
  """
  From https://pytorch.org/tutorials/beginner/transformer_tutorial.html
  """
  def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
    super().__init__()
    self.dropout = nn.Dropout(p=dropout)
    position = torch.arange(max_len).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
    pe = torch.zeros(max_len, 1, d_model)
    pe[:, 0, 0::2] = torch.sin(position * div_term)
    pe[:, 0, 1::2] = torch.cos(position * div_term)
    self.register_buffer('pe', pe)

  def forward(self, x: Tensor) -> Tensor:
    x = x + self.pe[:x.size(0)]
    return self.dropout(x)

In [None]:
class TransformerClassifier(nn.Module):
    def __init__(
            self,
            dim_vocabulary: int,
            dim_embeddings: int,
            n_layers: int,
            n_heads: int,
            dim_hidden: int,
            dropout: float,
            max_sequence_length: int,
            n_classes: int
        ):
        super(TransformerClassifier, self).__init__()

        self.embeddings = nn.Embedding(dim_vocabulary, dim_embeddings)
        self.positional_embeddings = TransformerPositionalEncoding(dim_embeddings, dropout, max_sequence_length)
        self.encoder = nn.TransformerEncoder(nn.TransformerEncoderLayer(
            d_model=dim_embeddings,
            dim_feedforward=dim_hidden,
            nhead=n_heads,
            dropout=dropout
        ), num_layers=n_layers)

        self.classification_head = nn.Sequential(
            nn.Linear(dim_embeddings, dim_embeddings),
            nn.Tanh(),
            nn.Linear(dim_embeddings, n_classes)
        )

    def forward(self, inputs, src_mask = None, src_padding_mask = None):
        outputs = self.embeddings(inputs)

        outputs = rearrange(outputs, "b s e -> s b e")
        outputs = self.positional_embeddings(outputs)
        outputs = rearrange(outputs, "s b e -> b s e")

        outputs = self.encoder(outputs, src_mask, src_padding_mask)

        weights = torch.softmax(outputs, dim=1)
        pooled_output = torch.sum(weights * outputs, dim=1)

        return self.classification_head(pooled_output)

In [None]:
# Hyper-parameters of the Transformer experiment. The first three entries configure the
# preprocessing (TransformerUtilities), the next six the model (TransformerClassifier)
# and the last three the optimisation (read by train_model / train_epoch).
TRANSFORMERS_CONFIG = {
    'max_sequence_length': 64,
    'min_frequency': 2,
    'batch_size': 128,
    'n_classes': 3,
    'dim_embeddings': 16,
    'n_layers': 1,
    'n_heads': 4,
    'dim_hidden': 32,
    'dropout': 0.15,
    'learning_rate': 0.01,
    'epochs': 3,
    'clip_grad_norm': 1.0
}

In [None]:
# Preprocessing helper for the Transformer, configured from TRANSFORMERS_CONFIG.
transformer_utilities = TransformerUtilities(
    max_sequence_length=TRANSFORMERS_CONFIG['max_sequence_length'],
    min_frequency=TRANSFORMERS_CONFIG['min_frequency'],
    batch_size=TRANSFORMERS_CONFIG['batch_size']
)

In [None]:
# Tokenize and index the three splits (the vocabulary is built on the training split), then batch them.
t_train_dataset, t_validation_dataset, t_test_dataset = transformer_utilities.preprocess(dataset_train, dataset_val, dataset_test)
t_train_loader, t_validation_loader, t_test_loader = transformer_utilities.dataloaders(t_train_dataset, t_validation_dataset, t_test_dataset)

In [None]:
# Build the classifier from TRANSFORMERS_CONFIG; the embedding table is sized by the fitted vocabulary.
transformer_classifier = TransformerClassifier(
    dim_vocabulary=len(transformer_utilities.vocabulary),
    dim_embeddings=TRANSFORMERS_CONFIG['dim_embeddings'],
    n_layers=TRANSFORMERS_CONFIG['n_layers'],
    n_heads=TRANSFORMERS_CONFIG['n_heads'],
    dim_hidden=TRANSFORMERS_CONFIG['dim_hidden'],
    dropout=TRANSFORMERS_CONFIG['dropout'],
    max_sequence_length=TRANSFORMERS_CONFIG['max_sequence_length'],
    n_classes=TRANSFORMERS_CONFIG['n_classes']
)

transformer_classifier.to(DEVICE)
# Xavier-uniform initialisation of every parameter with more than one dimension;
# one-dimensional parameters keep their default initialisation.
for p in transformer_classifier.parameters():
      if p.dim() > 1:
          nn.init.xavier_uniform_(p)
print(torchinfo.summary(transformer_classifier))

In [None]:
# Train the Transformer; validation loss and accuracy are printed after every epoch.
train_model(transformer_classifier, t_train_loader, t_validation_loader, TransformerUtilities.loss, TRANSFORMERS_CONFIG)

In [None]:
# Final evaluation of the trained Transformer on the test loader.
loss_t, acc_t = evaluate(transformer_classifier, t_test_loader, nn.CrossEntropyLoss(), TransformerUtilities.loss)
print(f"Accuracy: {acc_t * 100} %")
print(f"Loss: {loss_t}")

In [None]:
def inference(model, sentence: str):
  """Print the sentiment ('Negative', 'Neutral' or 'Positive') that `model` predicts for `sentence`.

  The sentence is converted to token ids with the module-level
  `transformer_utilities`, and the model is called on the ids alone (no masks).
  """
  class_names = {NEGATIVE: 'Negative', NEUTRAL: 'Neutral', POSITIVE: 'Positive'}
  model.eval()
  with torch.no_grad():
    token_ids = transformer_utilities.preprocess_sentence(sentence)
    logits = model(token_ids)
    predicted_class = logits.argmax(1).item()
  print(class_names[predicted_class])

In [None]:
# Demo input. "company" was previously misspelled, and a word that is missing from the
# vocabulary is mapped to <UNK>, which removes information from the example.
sentence = "This company had a great performance, outperforming expectations in Q4."
inference(transformer_classifier, sentence)

### Results

Our Transformer (Encoder) model achieved an accuracy of $73.71\%$, slightly lower than the previous $73.74\%$ attained by LSTM. This outcome is unexpected, as Transformer technology generally surpasses LSTM in various aspects. We attribute this to our choice of hyperparameters, which may be more suitable for the LSTM model rather than the Transformer.

# BERT


For this section, we will train a classifier using BERT. BERT, which stands for Bidirectional Encoder Representations from Transformers, is a state-of-the-art pre-trained language model developed by Google. We will leverage this pretrained model to generate high-quality embeddings. Since BERT has been trained on a vast amount of text data, it possesses a deeper understanding of language and is capable of producing superior embeddings. We will then augment the BERT model with a linear layer for classification and proceed to train this enhanced model.

We will pull the pretrained BERT model and tokenizer. We won't freeze any layers, as experiments have shown us that fine-tuning the entire model does not significantly increase training time and consistently yields better performance.

In [None]:
class BERTUtilities:
  """Tokenization, padding and batching helpers for the BERT classifier.

  Batches are triples (input ids, labels, attention masks).
  """

  @staticmethod
  def loss(model, criterion, batch):
    """Run `model` on one batch and return (outputs, loss), with everything moved to DEVICE."""
    outputs = model(input_ids=batch[0].to(DEVICE), attention_mask=batch[2].to(DEVICE))
    return outputs, criterion(outputs, batch[1].to(DEVICE)).to(DEVICE)

  def __init__(self, batch_size: int, max_sequence_length: int):
    """
    Args:
      batch_size: batch size of every DataLoader created by this helper.
      max_sequence_length: every sequence is truncated / padded to this many token ids.
    """
    self.batch_size = batch_size
    self.max_sequence_length = max_sequence_length
    self.tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

  def tokenize(self, sentences):
    """Encode each sentence into a list of token ids (special tokens included).

    Truncation is delegated to the tokenizer so that over-long sentences keep
    their closing special token, instead of having it cut off by `pad`.
    """
    return [
        self.tokenizer.encode(
            sent,
            add_special_tokens=True,
            truncation=True,
            max_length=self.max_sequence_length
        )
        for sent in sentences
    ]

  def pad(self, input_ids):
    """Return an int64 array of shape (n_sentences, max_sequence_length).

    Each row holds the ids of one sentence, cut at `max_sequence_length` and
    right-padded with 0. This is done with NumPy only, so padding does not
    depend on the Keras helper.
    """
    padded = np.zeros((len(input_ids), self.max_sequence_length), dtype=np.int64)
    for row, ids in zip(padded, input_ids):
      kept = ids[:self.max_sequence_length]
      row[:len(kept)] = kept
    return padded

  def mask(self, input_ids):
    """Return the attention masks as nested lists: 1 for ids greater than 0, 0 for padding."""
    return [
        [int(token_id > 0) for token_id in sent]
        for sent in input_ids
    ]

  def dataloader(self, input_dataset, shuffle=True):
    """Build a DataLoader of (input ids, labels, attention masks) from a dataset
    exposing 'sentence' and 'label' columns."""
    sentences = input_dataset["sentence"]
    labels = input_dataset["label"]
    input_ids = self.tokenize(sentences)
    input_ids = self.pad(input_ids)
    masks = self.mask(input_ids)

    return DataLoader(TensorDataset(
        torch.tensor(input_ids), torch.tensor(labels), torch.tensor(masks)
    ), shuffle=shuffle, batch_size=self.batch_size)

  def dataloaders(self, train_dataset, validation_dataset, test_dataset):
    """Create the (train, validation, test) DataLoaders.

    Only the training loader is shuffled; the validation and test loaders keep
    the dataset order so that evaluation is reproducible.
    """
    return (
        self.dataloader(train_dataset),
        self.dataloader(validation_dataset, shuffle=False),
        self.dataloader(test_dataset, shuffle=False),
    )

Here, we will develop our model. We use BERT to generate embeddings, followed by a classification head that classifies inputs into three categories.

In [None]:
class BertClassifier(nn.Module):
    """Pretrained encoder followed by a two-layer classification head.

    Args:
        model_name: checkpoint loaded with `AutoModel.from_pretrained`. The model's
            output must expose `pooler_output` (BERT does).
        n_classes: number of output logits.

    The defaults reproduce the original setup: "bert-base-cased" with 3 classes.
    Both the encoder and the head are moved to DEVICE on construction.
    """
    def __init__(self, model_name: str = "bert-base-cased", n_classes: int = 3):
        super(BertClassifier, self).__init__()
        self.bert = AutoModel.from_pretrained(model_name).to(DEVICE)
        hidden_size = self.bert.config.hidden_size
        self.classifier_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, n_classes)
        ).to(DEVICE)

    def forward(self, input_ids, attention_mask):
        """Return the class logits computed from the encoder's pooled output."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        logits = self.classifier_head(pooled_output)
        return logits

# Instantiate the classifier; this loads the pretrained BERT weights.
bert_classifier = BertClassifier()

In [None]:
# Tokenize the three splits with the BERT tokenizer and batch them (32 sentences of 64 token ids per batch).
bert_utilities = BERTUtilities(batch_size=32, max_sequence_length=64)
b_train_loader, b_validation_loader, b_test_loader = bert_utilities.dataloaders(dataset_train, dataset_val, dataset_test)

In [None]:
# Fine-tune the whole BERT classifier; validation loss and accuracy are printed after every epoch.
train_model(bert_classifier, b_train_loader, b_validation_loader, BERTUtilities.loss, {
    'learning_rate': 2e-5,
    'epochs': 2,
    'clip_grad_norm': 1.0
})

In [None]:
# Final evaluation of the fine-tuned BERT classifier on the test loader.
loss_b, acc_b = evaluate(bert_classifier, b_test_loader, nn.CrossEntropyLoss(), BERTUtilities.loss)
print(f"Accuracy: {acc_b * 100} %")
print(f"Loss: {loss_b}")

### Results

The BERT model demonstrates significant improvement, achieving an accuracy of $79.55\%$, which is approximately a $6\%$ enhancement over our previous results. This good performance highlights the effectiveness of pre-training, resulting in clearly better embeddings compared to our previous encoder which was a similar model but without pre-training.

# Pre-trained models

Instead of constructing models from "scratch", as we've done until now, we can use pretrained models tailored specifically for sentence classification tasks and fine-tune them using our data. While this approach may offer less flexibility, it capitalizes on the vast amount of data these models have already been trained on. In this section, we'll retrieve some pretrained models suitable for our task from the Hugging Face model hub, train them using our dataset, and evaluate their performance on the test set. We will then see if they outperform our previous models.

In [None]:
class PreTrainedUtilities:
  """Helpers to fine-tune sequence-classification checkpoints with the Hugging Face Trainer."""
  # Number of sentiment classes predicted by every fine-tuned model.
  N_CLASSES = 3

  @staticmethod
  def get_model(name: str):
    """Load checkpoint `name` with a classification head of N_CLASSES outputs."""
    return AutoModelForSequenceClassification.from_pretrained(name, num_labels=PreTrainedUtilities.N_CLASSES)

  @staticmethod
  def get_training_arguments(config, model_name):
    """Build the TrainingArguments from `config`; checkpoints are saved every epoch under `model_name`."""
    return TrainingArguments(
      output_dir=model_name,
      learning_rate=config['learning_rate'],
      per_device_train_batch_size=config['per_device_train_batch_size'],
      per_device_eval_batch_size=config['per_device_eval_batch_size'],
      num_train_epochs=config['epochs'],
      weight_decay=config['weight_decay'],
      save_strategy="epoch"
    )

  @staticmethod
  def get_tokenizer(dataset: "DatasetDict", model: dict):
    """Tokenize the 'sentence' column of `dataset` with the tokenizer of `model`.

    Args:
      dataset: dataset (or dict of splits) exposing a 'sentence' column and a batched `map`.
      model: dict with a "name" entry (checkpoint name) and a "max_len" flag; when the
        flag is truthy, sentences are truncated to 512 tokens explicitly, otherwise the
        tokenizer's own default limit is used.

    Returns:
      (tokenizer, tokenized dataset, padding data collator).
    """
    tokenizer_name = model["name"]
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
    if model["max_len"]:
      tokenized_dataset = dataset.map(lambda e: tokenizer(e["sentence"], truncation=True, max_length = 512), batched=True)
    else:
      tokenized_dataset = dataset.map(lambda e: tokenizer(e["sentence"], truncation=True), batched=True)
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    return tokenizer, tokenized_dataset, data_collator

  def compute_metrics(self, eval_pred):
    """Return {"accuracy": ...} for a (logits, labels) pair.

    The accuracy is computed directly with NumPy, so nothing has to be loaded or
    downloaded on every evaluation.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    accuracy = float(np.mean(predictions == np.asarray(labels)))
    return {"accuracy": accuracy}

# Shared instance; its bound `compute_metrics` is handed to the Trainer in train_pretrained_model.
preTrainedUtilities = PreTrainedUtilities()

In [None]:
def train_pretrained_model(model: dict, dataset, config):
  """Fine-tune one pretrained checkpoint and evaluate it.

  Args:
    model: dict describing the checkpoint: "name" (checkpoint name) and "max_len"
      (flag forwarded to `PreTrainedUtilities.get_tokenizer`).
    dataset: dict of splits with at least "train" and "test"; training uses "train"
      and evaluation uses "test".
    config: training hyper-parameters read by `PreTrainedUtilities.get_training_arguments`.

  Returns:
    (results, classifier): the metrics returned by `Trainer.evaluate()` (also printed)
    and the fine-tuned model.
  """
  model_name = model["name"]
  tokenizer, tokenized_data, data_collator = PreTrainedUtilities.get_tokenizer(dataset, model)
  # Separate name for the network, so the `model` description is not overwritten.
  classifier = PreTrainedUtilities.get_model(model_name)
  trainer = Trainer(
    model=classifier,
    args=PreTrainedUtilities.get_training_arguments(config, model_name),
    train_dataset=tokenized_data["train"],
    eval_dataset=tokenized_data["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=preTrainedUtilities.compute_metrics,
  )

  trainer.train()
  results = trainer.evaluate()
  print(results)
  return results, classifier

In [None]:
def train_pretrained_models(models: list[dict], config, dataset=None):
  """Fine-tune every checkpoint described in ``models`` and collect the trained models.

  Args:
    models: list of model descriptions (dicts with "name" and "max_len" keys).
    config: hyper-parameter dict shared by all runs.
    dataset: dataset to train and evaluate on; when omitted, the notebook-level
      ``dataset_split`` is used.

  Returns:
    List of fine-tuned models, in the same order as ``models``.
  """
  if dataset is None:
    dataset = dataset_split

  trained_models = []
  for model_spec in models:
    # Evaluation results are printed by train_pretrained_model; only the model is kept.
    _, trained_model = train_pretrained_model(model_spec, dataset, config)
    trained_models.append(trained_model)
  return trained_models

We'll experiment with four models sourced from the Hugging Face models hub. Specifically, we'll evaluate two base BERT models for classification tasks, one model optimized for processing tweets (given their prevalence in our dataset), and another model tailored specifically for financial classification tasks.

In [None]:
# Hyper-parameters shared by every pretrained model; read by get_training_arguments.
PRETRAINED_CONFIG = {
    'learning_rate': 2e-5,
    'per_device_train_batch_size': 16,
    'per_device_eval_batch_size': 16, # We need 16 or else the GPU RAM usage gets too high to train on Colab
    'epochs': 2, # Pretrained models have already seen a lot of data and do not need as many epochs to be fine-tuned
    'weight_decay': 0.01
}

# Checkpoints to fine-tune. "max_len": True makes get_tokenizer truncate inputs to 512 tokens
# explicitly; False lets the tokenizer truncate without an explicit max_length.
PRETRAINED_MODELS = [
    {"name": "distilbert-base-cased", "max_len": True},
    {"name": "bert-base-cased", "max_len": True},
    {"name": "finiteautomata/bertweet-base-sentiment-analysis", "max_len": False},
    {"name": "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis", "max_len": False},
]

# Fine-tune and evaluate every checkpoint; the returned list of models is this cell's output value.
train_pretrained_models(PRETRAINED_MODELS, PRETRAINED_CONFIG)

### Results

In this section, we evaluated the performance of four pre-trained models. Here are the results we obtained:

| Model Name                                                | Test Accuracy |
|-----------------------------------------------------------|---------------|
| distilbert-base-cased                                     | 0.7926        |
| bert-base-cased                                           | 0.8043        |
| finiteautomata/bertweet-base-sentiment-analysis          | **0.8303**       |
| mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis | 0.8057        |


Overall, we observed that the BERTweet model outperforms all others with an impressive accuracy of $83.03\%$. This outcome aligns with expectations given the abundance of tweets in our dataset. Also, three of the four pre-trained models surpassed the BERT model we implemented ($79.55\%$); only distilbert-base-cased ($79.26\%$) fell marginally short of it.

Overall, it's not unexpected that pretrained models outperform ours, considering their exposure to vast amounts of data and likely superior development processes. This underscores the significance of platforms like Hugging Face in democratizing access to state-of-the-art models.

# Simple pattern recognition model


In this section, we'll explore the use of emojis in the dataset. Our intuition is that this information could be highly significant, particularly for positive tweets. Certain emojis are commonly associated with bullish statements, and leveraging this information could potentially improve our performance.

### Data exploration and analysis

Let's identify a series of emojis that we believe may indicate bullish sentiment and then examine whether this hypothesis holds true.

In [None]:
# Hex Unicode code points of the emojis we assume signal bullish sentiment.
bullish_emojis = [
    '1F680',  # Rocket 🚀
    '1F4C8',  # Chart Increasing 📈
    '1F4B9',  # Chart Increasing with Yen 💹
    '1F525',  # Fire 🔥
    '1F48E',  # Gem Stone 💎
    '1F4B5',  # Dollar Banknote 💵
    '1F4B8',  # Money With Wings 💸
    '1F315',  # Full Moon 🌕
    '1F319',  # Crescent Moon 🌙
    '1F31B',  # First Quarter Moon Face 🌛
    '1F31C',  # Last Quarter Moon Face 🌜
    '1F31D',  # Full Moon Face 🌝
    '1F31E',  # Sun with Face 🌞
    '1F320'   # Shooting Star 🌠
]

def count_emojis(ds, emojis):
    """Count the sentences containing at least one of the given emojis, per label.

    Args:
        ds: mapping-like object with parallel "sentence" and "label" sequences;
            labels are expected to be 0, 1 or 2.
        emojis: iterable of hexadecimal Unicode code points, e.g. '1F680'.

    Returns:
        Tuple ``(total_count, labels_dict)``: the number of matching sentences and a
        dict mapping each label (0, 1, 2) to its number of matching sentences.
    """
    labels_dict = {0: 0, 1: 0, 2: 0}
    # Decode every code point to its character. Splicing the hex digits into a
    # zero-padded \UXXXXXXXX regex escape only works for 5-digit code points.
    emoji_chars = {chr(int(emoji, 16)) for emoji in emojis}

    total_count = 0

    for sentence, label in zip(ds["sentence"], ds["label"]):
        # A sentence matches when it shares at least one character with the emoji set.
        if not emoji_chars.isdisjoint(sentence):
            total_count += 1
            labels_dict[label] += 1

    return total_count, labels_dict

# Count the training sentences containing at least one bullish emoji, with their per-label breakdown.
emoji_count, labels_dict_bullish = count_emojis(dataset_train, bullish_emojis)
print(f"We identify {emoji_count} sentences with such emojis.")

In [None]:
def plot_distribution(labels_dict, title='Distribution of labels for sentences with bullish emojis'):
  """Draw a pie chart of label counts.

  Args:
    labels_dict: mapping from label to count; keys become the wedge labels and
      values the wedge sizes.
    title: chart title; defaults to the bullish-emoji title used in this notebook.
  """
  labels = list(labels_dict.keys())
  values = list(labels_dict.values())

  # Only the axes are needed; neither the figure handle nor pie()'s return value is used.
  _, ax = plt.subplots(figsize=(10, 10))
  ax.pie(values, labels=labels, autopct='%1.1f%%', startangle=90, colors=['pink', 'lightblue', 'lightgreen'], textprops={'fontsize': 10})
  ax.set_title(title)

# Label distribution among the training sentences that contain a bullish emoji.
plot_distribution(labels_dict_bullish)

Among the sentences containing at least one emoji that we identify as bullish, 94.6% are indeed labeled as bullish. This indicates a high level of confidence in associating such emojis with bullish sentiment. This promising result motivates us to explore the potential of leveraging this information by constructing a very simple model. This model will predict bullish sentiment when it encounters a bullish emoji. We will then assess whether integrating this simple model can improve upon our BERT model.

### Building the model

Since this model is deliberately simple, it abstains by returning `None` whenever a sentence contains no bullish emoji.

In [None]:
class PatternMatchingModel(nn.Module):
    """Rule-based classifier that predicts label 2 (bullish) when a bullish emoji is present.

    The class subclasses ``nn.Module`` but defines no parameters and no ``forward``;
    predictions are obtained through ``predict``.
    """

    def __init__(self):
        super().__init__()
        # Hex Unicode code points of the emojis treated as bullish signals.
        self.bullish_emojis = [
            '1F680',  # Rocket 🚀
            '1F4C8',  # Chart Increasing 📈
            '1F4B9',  # Chart Increasing with Yen 💹
            '1F525',  # Fire 🔥
            '1F4B0',  # Money Bag 💰
            '1F48E',  # Gem Stone 💎
            '1F4B5',  # Dollar Banknote 💵
            '1F4B8',  # Money With Wings 💸
            '1F315',  # Full Moon 🌕
            '1F319',  # Crescent Moon 🌙
            '1F31B',  # First Quarter Moon Face 🌛
            '1F31C',  # Last Quarter Moon Face 🌜
            '1F31D',  # Full Moon Face 🌝
            '1F31E',  # Sun with Face 🌞
            '1F320'   # Shooting Star 🌠
        ]

    def predict(self, sentences):
        """Predict a label for each sentence.

        Args:
            sentences: iterable of strings.

        Returns:
            List with one entry per sentence: 2 when the sentence contains at least
            one emoji from ``self.bullish_emojis``, otherwise None (no prediction).
        """
        # Decode the code points on each call so later changes to self.bullish_emojis
        # are honoured. chr(int(...)) handles code points of any length, whereas a
        # zero-padded \UXXXXXXXX regex escape only works for 5-digit ones.
        emoji_chars = {chr(int(code, 16)) for code in self.bullish_emojis}
        return [2 if not emoji_chars.isdisjoint(sentence) else None for sentence in sentences]

## Stacking models



In this section, we aim to enhance our BERT model by integrating our pattern matching model. Specifically, we'll stack the predictions of both models and prioritize the predictions from the pattern matching model. This is because the pattern matching model exhibits a confidence level of approximately 95%, while our BERT model achieves an accuracy of approximately 80%. Combining the strengths of both models could potentially lead to improved overall performance.

In [None]:
# Get the labels and assert the order of dataset and dataloader is the same
labels_bert = []

for batch in b_test_loader:
    labels = batch[1]
    # Extend in place: rebuilding the list with `+` re-copies every label seen so far on each batch.
    labels_bert.extend(labels.flatten().tolist())

labels_emoji = list(dataset_test["label"])
# Both models are compared index by index below, so the two sources must be aligned.
assert labels_emoji == labels_bert, "b_test_loader and dataset_test do not yield labels in the same order"

In [None]:
# Predictions of the BERT classifier over the test loader (`predict` is defined earlier in the notebook).
bert_predictions = predict(bert_classifier, b_test_loader, nn.CrossEntropyLoss(), BERTUtilities.loss)

In [None]:
# Emoji-based predictions for the test sentences: 2 where a bullish emoji is found, None otherwise.
# NOTE(review): the variable name is misspelled ("patter"); left unchanged to avoid breaking references.
patter_matching_model = PatternMatchingModel()
pattern_predictions = patter_matching_model.predict(dataset_test["sentence"])

In [None]:
assert len(bert_predictions) == len(pattern_predictions)
# Prefer the pattern model whenever it made a prediction, otherwise fall back to BERT.
# Compare against None explicitly: a truthiness test would also discard a legitimate
# prediction of class 0.
stacked_predictions = [
    pattern_prediction if pattern_prediction is not None else bert_prediction
    for pattern_prediction, bert_prediction in zip(pattern_predictions, bert_predictions)
]

In [None]:
# Share (in %) of test sentences for which the pattern model returned a prediction.
n_pattern_predictions = sum(elem is not None for elem in pattern_predictions)
pc_predictions = 100 * n_pattern_predictions / len(pattern_predictions)
print(f"Our pattern matching model predicts something {pc_predictions} % of the time.")

In [None]:
def get_accuracy(predictions, labels):
  """Print the percentage of entries of ``predictions`` that equal ``labels``.

  Args:
    predictions: sequence of predicted labels.
    labels: sequence of reference labels, compared element-wise with ``predictions``.
  """
  matches = np.array(predictions) == np.array(labels)
  fraction_correct = np.sum(matches) / len(labels)
  print(f"ACCURACY: {fraction_correct * 100}%")

# Test accuracy of the pattern model alone, BERT alone, then the stacked predictions.
# Entries where the pattern model returned None never equal a label, so they count as misses.
get_accuracy(pattern_predictions, labels_emoji)
get_accuracy(bert_predictions, labels_emoji)
get_accuracy(stacked_predictions, labels_emoji)

It appears that our intuition was incorrect, as the performance of the BERT model did not improve with the stacked predictions. This outcome is somewhat disappointing, considering earlier analyses suggested the potential for performance enhancement. Let's see why stacking did not yield better performance. To do so, let's take the BERT predictions for the sentences where the pattern matching model finds an emoji and check the accuracy on this subset.

In [None]:
# Positions in the test set where the pattern model predicted label 2 (a bullish emoji was found).
idx = [i for i, prediction in enumerate(pattern_predictions) if prediction == 2]

In [None]:
# `idx` is built in ascending order, so indexing directly keeps the original ordering
# while avoiding a linear `i in idx` list scan for every element.
bert_predictions_for_emojis = [bert_predictions[i] for i in idx] # predictions of bert for indexes with emojis
labels_for_emojis = [labels_emoji[i] for i in idx] # labels for indexes with emojis

In [None]:
# BERT accuracy restricted to the test sentences where the pattern model made a prediction.
get_accuracy(bert_predictions_for_emojis, labels_for_emojis)

Surprisingly, the BERT model achieves an impressive accuracy of $94.68\%$ on sentences where we identify a bullish emoji. This exceptional performance explains why our pattern model was unable to improve these results, as the previous analysis accurately identified bullish sentiment 94.6% of the time on the training set, which is not better than BERT's performance. One hypothesis for why BERT excels with such emojis could be that when they are used in tweets, the sentences are already clearly bullish, making them easy to classify.

# Conclusion

In this notebook, we tried to classify financial statements and tweets into three categories: negative, neutral, and positive. To achieve this, we experimented with eight different models. Here are their accuracies on the test set:

| Model Name                                                | Test Accuracy (%) |
|-----------------------------------------------------------|-------------------|
| LSTMClassifier                                           | 73.74             |
| TransformerClassifier                                    | 73.71             |
| BertClassifier                                           | 79.55             |
| StackedClassifier                                        | 79.41             |
| distilbert-base-cased                                    | 79.26             |
| bert-base-cased                                          | 80.43             |
| finiteautomata/bertweet-base-sentiment-analysis          | **83.03**          |
| mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis | 80.57            |

Among these models, the latter four are pretrained classification models that we took online. We observe that our best-performing model overall is by far the pretrained model finiteautomata/bertweet-base-sentiment-analysis, while our best "handmade" model is the BertClassifier, which is about 3.5 percentage points lower. We're generally pleased with our performance, considering the diversity of the dataset containing both tweets and traditional sentences, and impressed with the performance of the pre-trained models, especially BERTweet.