# Text classification for disinformation

[Dataset](https://huggingface.co/datasets/QuotaClimat/frugalaichallenge-text-train)

Goal: multi-classification task with 8 labels for types of climate disinformation statements

## Import libraries and select device

In [None]:
import plotly.express as px

import numpy as np

import spacy
from datasets import load_dataset
from torch.utils.data.dataloader import default_collate

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics
from torchinfo import summary
import lightning as L
from lightning.pytorch.callbacks import ModelCheckpoint

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device {device}')
%env PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True

# Dataset pipeline

## Loading and Train/Validation/Test split

In [None]:
batch_size = 64
sequence_length = 512
num_workers = 4

train_validation_split = 0.2

In [None]:
train_dataset = load_dataset("QuotaClimat/frugalaichallenge-text-train", split="train").with_format(type='torch')

unique_labels = list(set(train_dataset["label"]))  # Extract unique label names
unique_labels.sort()
label_to_id = {label: i for i, label in enumerate(unique_labels)}  # Assign unique IDs

indices = torch.randperm(len(train_dataset)) # Generate a permutation between 0 and (train_size - 1) ; train_loader will get images in the order of the indices (so producing a random sampling)
train_validation_index = int(len(train_dataset) * train_validation_split) # Never forget "int" otherwise it's a float!!
train_indices = indices[train_validation_index:]
val_indices = indices[:train_validation_index]

train_loader = torch.utils.data.DataLoader(
  torch.utils.data.Subset(train_dataset, train_indices),
  num_workers=num_workers,
  batch_size=batch_size,
  # pin_memory=True
)
val_loader = torch.utils.data.DataLoader(
  torch.utils.data.Subset(train_dataset, val_indices),
  num_workers=num_workers,
  batch_size=batch_size,
  # pin_memory=True
)

In [None]:
test_dataset = load_dataset("QuotaClimat/frugalaichallenge-text-train", split="test").with_format(type='torch')

test_loader = torch.utils.data.DataLoader(
  test_dataset,
  batch_size=batch_size,
  num_workers=num_workers,
  # pin_memory=True
)

## Tokenization (only for DistilBERT)

In [None]:
from transformers import DistilBertTokenizer

tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased", do_lower_case=True)

def collate_fn(batch):
  batch_dict = default_collate(batch)
  labels = F.one_hot(batch_dict["label"], num_classes=len(label_to_id)).float() # Floating point for comparison between probabilities

  return (batch_dict['input_ids'], batch_dict['attention_mask']), labels

def tokenization(example):
    tokenized = tokenizer(
      example["quote"],
      truncation=True,
      padding='max_length',
      max_length=sequence_length
    )
    tokenized['label'] = label_to_id[example['label']]
    return tokenized

train_dataset = train_dataset.map(tokenization, remove_columns=["quote"])
test_dataset = test_dataset.map(tokenization, remove_columns=["quote"])

# Skeleton

## Lightning Module

In [139]:
class LitModule(L.LightningModule):
  def __init__(self, model, lr=0.001, top_k=1):
    super().__init__()
    self.model = model
    self.loss = nn.CrossEntropyLoss() # We deal with a multi-classification : an item belongs to a unique class between Nclasses
    self.accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes, top_k=top_k) 
    self.lr = lr
      
  def training_step(self, batch, batch_idx):
    x, y_true = batch
    y_pred = self.model(x)
    
    loss = self.loss(y_pred, y_true)
    accuracy = self.accuracy(y_pred.argmax(dim=1), y_true)
    
    self.log_dict({
      "train_accuracy": accuracy,
      'train_loss': loss
    }, on_epoch=True, prog_bar=True)
        
    return loss # All the backward stuff is handled with Lightning
    
  def validation_step(self, batch, batch_idx):
    x, y_true = batch
    y_pred = self.model(x)

    loss = self.loss(y_pred, y_true)
    accuracy = self.accuracy(y_pred.argmax(dim=1), y_true)

    self.log_dict({
      "val_accuracy": accuracy,
      'val_loss': loss
    }, on_step=False, on_epoch=True, prog_bar=True)
  
  def test_step(self, batch, batch_idx):
    x, y_true = batch
    y_pred = self.model(x)
    
    loss = self.loss(y_pred, y_true)
    accuracy = self.accuracy(y_pred.argmax(dim=1), y_true)    
    self.log_dict({
      "test_accuracy": accuracy,
      'test_loss': loss
    }, on_epoch=True, prog_bar=True)
    
  def configure_optimizers(self):
    optimizer = torch.optim.RMSprop(self.parameters(), lr=self.lr) # Difference here: parameters are stored at LightningModule-level, not model level
    return optimizer

# Dataset exploration

## Class repartition

In [None]:
num_classes = len(unique_labels)
print(f'Number of classes: {num_classes}')

In [None]:
unique, counts = np.unique(train_dataset['label'], return_counts=True)

px.pie(names=unique, values=counts)

# Models

## DistilBERT

In [None]:
class DistilBERTClassifier(nn.Module):
  def __init__(self, backbone):
    super().__init__()
    self.backbone = backbone
    self.dropout = nn.Dropout(0.5)
    self.linear = nn.Linear(768, num_classes)
      
  def forward(self, x):
    input_ids, attention_mask = x
    
    x = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
    hidden_state = x[0]
    pooler = hidden_state[:, 0]
    x = self.dropout(pooler)
    return F.softmax(self.linear(x), dim=1)

In [None]:
from transformers import DistilBertModel

distilbert_model = DistilBertModel.from_pretrained("distilbert-base-uncased")
distilbert_module = DistilBERTClassifier(distilbert_model)
distilbert = LitModule(distilbert_module)

In [None]:
trainer = L.Trainer(max_epochs=10)
trainer.fit(model=distilbert, train_dataloaders=train_loader, val_dataloaders=val_loader)

### Testing the model

In [None]:
trainer.test(model=distilbert, dataloaders=test_loader)

## GloVe

[Homepage](https://nlp.stanford.edu/projects/glove/)

In [None]:
num_params = '6B'
embedding_dim = 100 # 50, 100, 200 or 300
max_sequence_length = 512

## Loading pre-trained embeddings and create vocabulary

In [None]:
!wget https://nlp.stanford.edu/data/glove.6B.zip
!unzip glove.6B.zip
!python -m spacy download en_core_web_sm

In [None]:
embeddings = {}
glove_path = f'glove.{num_params}.{embedding_dim}d.txt'
with open(glove_path, 'r', encoding='utf-8') as f:
  for line in f:
    values = line.split()
    word = values[0]
    vector = np.asarray(values[1:], dtype='float32')
    embeddings[word] = vector

In [None]:
nlp = spacy.load("en_core_web_sm")
vocabulary = {"<PAD>": 0, "<UNK>": 1}

def extract_from_example(example):
  for token in nlp(example['quote']):
    if token.text.lower() not in vocabulary:
      index = len(vocabulary)
      vocabulary[token.text.lower()] = index

train_dataset.map(extract_from_example)
test_dataset.map(extract_from_example)

vocabulary_size = len(vocabulary)

In [None]:
embedding_matrix = np.zeros((vocabulary_size, embedding_dim))

for word, i in vocabulary.items():
  if word in embeddings.keys():
    embedding_matrix[i] = embeddings[word]
  else:
    embedding_matrix[i] = np.random.normal(scale=0.6, size=embedding_dim)

## Adapting the dataset pipeline

In [None]:
label_to_id

In [135]:
from sklearn.preprocessing import OneHotEncoder

vectorized_map = np.vectorize(label_to_id.get)

def collate_fn(batch):

  # Tokenize all quotes from the batch
  batch_dict = { key: [item[key] for item in batch] for key in batch[0]}
  labels = np.array(batch_dict["label"])
  labels_encoded = torch.LongTensor(vectorized_map(labels)) # Floating point needed for loss calculation
  
  # The last batch often is shorter than batch_size, so we have to dynamically set the size with len(batch)
  token_sequences = torch.zeros((len(batch), max_sequence_length), dtype=torch.long)
  
  for i, text in enumerate(batch_dict['quote']):
    tokens = [vocabulary.get(token.text.lower(), 1) for token in nlp(text)] # Use the "get" method to have a fallback if the word is unknown
    if len(tokens) < max_sequence_length:
      tokens += [0] * (max_sequence_length - len(tokens))
    else:
      tokens = tokens[:max_sequence_length]
    token_sequences[i] = torch.LongTensor(tokens)
    
  return token_sequences, labels_encoded

train_loader.collate_fn = collate_fn
val_loader.collate_fn = collate_fn
test_loader.collate_fn = collate_fn

## Module training

In [151]:
class GloveModule(nn.Module):
  def __init__(self, embeddings, hidden_size=128, num_layers=1, freeze=True):
    super().__init__()
    self.embedding = nn.Embedding.from_pretrained(torch.FloatTensor(embeddings), freeze=freeze, padding_idx=0)
    self.gru = nn.GRU(embedding_dim, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
    self.dropout = nn.Dropout(0.5)
    self.linear = nn.Linear(hidden_size, num_classes)
    
  def forward(self, x):
    x = self.embedding(x)
    _, h_n = self.gru(x)
    x = self.dropout(F.relu(h_n[-1])) # We take the last state, so the state of the last GRU cell
    return self.linear(x)

In [152]:
glove_module = GloveModule(embedding_matrix, num_layers=8)
glove = LitModule(glove_module)

trainer = L.Trainer(max_epochs=10)
trainer.fit(model=glove, train_dataloaders=train_loader, val_dataloaders=val_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name     | Type               | Params | Mode 
--------------------------------------------------------
0 | model    | GloveModule        | 2.4 M  | train
1 | loss     | CrossEntropyLoss   | 0      | train
2 | accuracy | MulticlassAccuracy | 0      | train
--------------------------------------------------------
782 K     Trainable params
1.6 M     Non-trainable params
2.4 M     Total params
9.695     Total estimated model params size (MB)
7         Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]


Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined

In [None]:
trainer.test(glove, dataloaders=test_loader)

In [None]:
torch.argmax(glove_module(torch.randint(0, vocabulary_size, (1, 512))), dim=1)