# Tesi di laurea di Simone Persiani

## Imports and constants

In [1]:
import random

import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

Imports from PyTorch:

In [2]:
import torch

from torch.nn import BCELoss, Module, Linear, Dropout
from torch.optim import AdamW, SGD
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.utils.data.sampler import WeightedRandomSampler, SequentialSampler

Constants:

In [3]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}.")

EMBEDDINGS_SIZE = 768 # or 1024 for roberta_large
MODEL_SAVE_PATH = "./model.pt"
RANDOM_SEED = 42

config = {
  "learning_rate": 1e-5,
  "epochs": 4,  # Higher values (6, 8) usually lead to overfitting (based on my results)
  "hidden_layer_size": 64,  # Higher values (128, 256) usually lead to overfitting (based on my results), lower values (32) to underfitting!
  "batch_size": 16,
  "weight_decay": 0.01,  # NEW
  "label_smoothing": 0.05,  # NEW
  "dataset": "augmented",  # "preprocessed" | ["augmented"  # NEW]
  "max_sequence_length": 60  # lower value (30) decreases train/val accuracy (about -1%), higher values (90, 120) don't lead to any improvement
}

Using device: cuda.


Setting random seeds to obtain a deterministic behaviour:

In [4]:
def random_state(seed):
  random.seed(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)
  if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

random_state(RANDOM_SEED)

## Download RoBERTa

In [5]:
from transformers.optimization import get_linear_schedule_with_warmup

Download the model and the tokenizer.

[This work](https://github.com/avramandrei/UPB-SemEval-2020-Task-6) showed that a fine-tuned RoBERTa model is the best-performing variant of BERT.

In [6]:
from transformers import RobertaModel, RobertaTokenizer

roberta_model = RobertaModel.from_pretrained("roberta-base")
roberta_tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
roberta_tokenizer.add_tokens(["<link>", "<equation>"])

Some weights of the model checkpoint at roberta-base were not used when initializing RobertaModel: ['lm_head.dense.bias', 'lm_head.layer_norm.weight', 'lm_head.decoder.weight', 'lm_head.layer_norm.bias', 'lm_head.bias', 'lm_head.dense.weight']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


2

## Loading the dataset

In [12]:
def load_dataset(dataset_path, tokenizer):
  df = pd.read_csv(dataset_path, sep="\t", header=0, encoding='utf-8',
                   names=["SENTENCE", "HAS_DEF"], usecols=["SENTENCE", "HAS_DEF"],
                   dtype={"SENTENCE": str, "HAS_DEF": np.uint8})

  X, y = df["SENTENCE"].tolist(), df["HAS_DEF"].tolist()

  encodings = tokenizer(X, add_special_tokens=True, max_length=config["max_sequence_length"],
                        padding="longest", truncation="longest_first",
                        return_attention_mask=True, return_tensors="pt")

  X = encodings['input_ids'].to(dtype=torch.int32, device='cpu')
  y = torch.tensor(y, dtype=torch.int64, device='cpu')
  mask = encodings['attention_mask'].to(dtype=torch.uint8, device='cpu')

  dataset = TensorDataset(X, y, mask)

  return dataset

train_ds = load_dataset(f'./dataset/{config["dataset"]}/train.tsv', roberta_tokenizer)
val_ds   = load_dataset(f'./dataset/{config["dataset"]}/dev.tsv',   roberta_tokenizer)
test_ds  = load_dataset(f'./dataset/preprocessed/test.tsv',  roberta_tokenizer)

## Dealing with an unbalanced dataset

Preparing a WeightedRandomSampler so that training batches will contain, _on average_, the same amount of positive and negative samples.

**This should address the problem of the unbalanced DEFT dataset. In general, one would expect definitions to be a relatively-rare occurrence in a Natural Language text.**

In [13]:
#def getBalancingWeights(labels):  # NEW
#    n_samples = labels.shape[0]
#    def_samples = torch.sum(labels).item()
#
#    non_def_samples = n_samples - def_samples
#
#    class_weights = {0: def_samples / n_samples,
#                    1: non_def_samples / n_samples}
#    sample_weights = torch.tensor([class_weights[label.item()] for label in labels], dtype=torch.double, device='cpu')
#
#    return sample_weights, class_weights
#
#labels = train_ds.tensors[1]
#sample_weights, class_weights = getBalancingWeights(labels)
#g = torch.Generator()
#g.manual_seed(RANDOM_SEED)
#weighted_sampler = WeightedRandomSampler(weights=sample_weights,
#                                        num_samples=len(train_ds),
#                                        replacement=True, generator=g)
## Param replacement=True means that the same sample can be selected more than once inside a single batch!

#train_loader = DataLoader(dataset=train_ds, batch_size=config["batch_size"], sampler=weighted_sampler)

train_loader = DataLoader(dataset=train_ds, batch_size=config["batch_size"], shuffle=True)
val_loader   = DataLoader(dataset=val_ds, batch_size=config["batch_size"], sampler=SequentialSampler(val_ds))
test_loader  = DataLoader(dataset=test_ds, batch_size=config["batch_size"], sampler=SequentialSampler(test_ds))

**NEW** As an alternative, use weighted loss (by weighing more the errors done on the minority class. Just like the WeightedRandomSampler, this technique must be applied to the training set ONLY (no validation/test)!

In [9]:
n_samples = len(train_ds)
def_samples = torch.sum(train_ds.tensors[1]).item()

non_def_samples = n_samples - def_samples

class_weights = {0: def_samples / n_samples, 1: non_def_samples / n_samples}

def getBATCHBalancingWeights(labels):  # NEW
    sample_weights = torch.tensor([class_weights[label.item()] for label in labels], dtype=torch.double, device=DEVICE)
    return sample_weights

## Label smoothing

In [10]:
def smooth_labels(labels, smoothing = 0.0):  # NEW
    assert 0 <= smoothing < 1

    confidence = 1.0 - smoothing
    uniform_probability = 0.5

    smoothed_true_label = confidence + smoothing * uniform_probability
    smoothed_false_label = smoothing * uniform_probability

    smoothed_labels = torch.tensor([smoothed_true_label if v == 1 else smoothed_false_label for v in labels], dtype=torch.float, device=DEVICE, requires_grad=False)

    return smoothed_labels


## Define the classifier model

In [9]:
class RoBERTaWithMLP(Module):
  """ See: https://github.com/avramandrei/UPB-SemEval-2020-Task-6/blob/77d92e9c386f270af6ed1db259d3ba6e8bde307b/task1/model.py#L49-L80 """
  
  def __init__(self, lang_model, vocab_size, input_size, hidden_size):
    super().__init__()
    
    self.lang_model = lang_model
    self.lang_model.resize_token_embeddings(vocab_size)

    self.linear1 = Linear(input_size, hidden_size)
    self.dropout1 = Dropout(0.8)
    self.linear2 = Linear(hidden_size, hidden_size)
    self.dropout2 = Dropout(0.8)
    self.linear3 = Linear(hidden_size, 1)

  def forward(self, x, mask):
    embeddings = self.lang_model(x, attention_mask=mask)[0]
    embeddings = torch.mean(embeddings, dim=1)

    output = self.dropout1(F.gelu(self.linear1(embeddings)))
    output = self.dropout2(F.gelu(self.linear2(output)))
    output = torch.sigmoid(self.linear3(output))

    return output

## Train the model

In [12]:
def evaluate(model):
  criterion = BCELoss()
  loss, acc, f1 = (0,) * 3
  with torch.no_grad():
    for (val_x, val_y, mask) in val_loader:
      # Move data to the device in use
      val_x = val_x.to(DEVICE)
      val_y = val_y.to(DEVICE)
      mask = mask.to(DEVICE)

      # Forward pass
      output = model.forward(val_x, mask)
      output = torch.reshape(output, (-1,))

      smoothed_labels = smooth_labels(val_y, config['label_smoothing'])
      curr_loss = criterion(output, smoothed_labels)
      # Don't apply weighted loss to validation set!  # NEW

      # Performance evaluation
      pred = torch.tensor([0 if x < 0.5 else 1 for x in output])
      curr_acc = accuracy_score(val_y.cpu(), pred.cpu()) * 100.0
      # curr_prec = precision_score(val_y.cpu(), pred.cpu()) * 100.0
      # curr_rec = recall_score(val_y.cpu(), pred.cpu()) * 100.0
      # curr_f1 = f1_score(val_y.cpu(), pred.cpu()) * 100.0

      loss += float(curr_loss.item())
      acc += float(curr_acc)
      #f1 += float(curr_f1)
      # prec += curr_prec
      # rec += curr_rec
      
    loss /= len(val_loader)
    acc /= len(val_loader)
    # f1 /= len(val_loader)
    # prec /= len(val_loader)
    # rec /= len(val_loader)
    return loss, acc

In [13]:
def train():
  vocab_size = len(roberta_tokenizer) # 50265 + 2

  model = RoBERTaWithMLP(roberta_model,
                         vocab_size,
                         EMBEDDINGS_SIZE,
                         config["hidden_layer_size"]
                         ).to(DEVICE)

  optimizer = AdamW(model.parameters(), lr=config["learning_rate"], weight_decay=config["weight_decay"])
  #optimizer = SGD(model.parameters(), lr=config["learning_rate"], weight_decay=config["weight_decay"], nesterov=True, momentum=0.9)

  total_steps = len(train_loader) * config["epochs"]
  scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps = 0, num_training_steps = total_steps)

  criterion = BCELoss(reduction='none')  # NEW
  best_acc = 0

  for epoch in range(config["epochs"]):
    model.train()

    loss, acc, f1, prec, rec = (0,) * 5

    for i, (train_x, train_y, mask) in enumerate(train_loader):
      # Move data to the device in use
      train_x = train_x.to(DEVICE)
      train_y = train_y.to(DEVICE)
      mask = mask.to(DEVICE)

      # Forward pass
      output = model.forward(train_x, mask)
      output = torch.reshape(output, (-1,))

      smoothed_labels = smooth_labels(train_y, config['label_smoothing'])
      curr_loss = criterion(output, smoothed_labels)
      weights = getBATCHBalancingWeights(train_y)  # NEW
      curr_loss = torch.mean(weights*curr_loss)  # NEW

      # Backward pass
      optimizer.zero_grad()
      curr_loss.backward()

      # Parameters update
      optimizer.step()
      scheduler.step()

      # Performance evaluation
      pred = torch.tensor([0 if x < 0.5 else 1 for x in output])
      curr_acc = accuracy_score(train_y.cpu(), pred.cpu()) * 100.0
      # curr_f1 = f1_score(train_y.cpu(), pred.cpu()) * 100.0
      # curr_prec = precision_score(train_y.cpu(), pred.cpu()) * 100.0
      # curr_rec = recall_score(train_y.cpu(), pred.cpu()) * 100.0

      loss += float(curr_loss.item())
      acc += float(curr_acc)
      # f1 += curr_f1
      # prec += curr_prec
      # rec += curr_rec

    model.eval()
    loss, acc = evaluate(model)

    if acc > best_acc:
      print(f"[EPOCH {epoch + 1}] Accuracy score improved from {round(best_acc, 2)} -> {round(acc, 2)}. Saving model...", end="")
      best_acc = acc
      torch.save(model, MODEL_SAVE_PATH)
      print(" DONE!")
    else:
      print(f"[EPOCH {epoch + 1}] Accuracy score didn't improved... best value is {round(best_acc, 2)} while current result is {round(acc, 2)}.")


  return model

trained_model = train()

[EPOCH 1] Accuracy score improved from 0 -> 85.8. Saving model... DONE!
[EPOCH 2] Accuracy score improved from 85.8 -> 85.86. Saving model... DONE!
[EPOCH 3] Accuracy score improved from 85.86 -> 86.23. Saving model... DONE!
[EPOCH 4] Accuracy score improved from 86.23 -> 86.53. Saving model... DONE!


In [14]:
import gc
gc.collect()

if DEVICE.type == "cuda":
    torch.cuda.empty_cache()