# Sentiment Analysis - NLU

## Version: **BERT - Transformers**  
<img src="https://huggingface.co/front/assets/huggingface_logo-noborder.svg" alt="hugging-face" width="50"/>

Student: Francesco Laiti

---

This notebook contains the source code to import, fine-tune and evaluate a Transformer-based sentiment analysis model using Hugging Face tools.

## Prerequisites

Import the libraries required to run the notebook correctly and download the NLTK datasets.

In [None]:
import torch
import transformers
import nltk

import numpy 
from tqdm import tqdm
import yaml
import os
import random

from nltk.corpus import movie_reviews
from nltk.corpus import subjectivity

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix

import wandb
# Authenticate with Weights & Biases before any run is created.
wandb.login()

# Fetch the two NLTK corpora imported above (movie_reviews and subjectivity).
nltk.download('movie_reviews')
nltk.download('subjectivity')

In this notebook we use three BERT-based models from Hugging Face:
- **Tokenizer**. A pretrained model BERT base uncased on English language available at [huggingface.co/bert-base-uncased](https://huggingface.co/bert-base-uncased);
- **Subjectivity task**. A finetuned BERT base uncased on the Wiki Neutrality Corpus, a parallel corpus of 180,000 articles labeled as "neutral" or "biased", available at [huggingface.co/cffl/bert-base-styleclassification-subjective-neutral](https://huggingface.co/cffl/bert-base-styleclassification-subjective-neutral);
- **Polarity task**. A finetuned BERT base uncased on the Amazon polarity dataset, available at [huggingface.co/fabriceyhc/bert-base-uncased-amazon_polarity](https://huggingface.co/fabriceyhc/bert-base-uncased-amazon_polarity).

Declare the global constants used in this notebook.

In [2]:
# dataset
BATCH_SIZE_SUBJ = 128  # batch size of the subjectivity dataloaders (also used when filtering sentences)
BATCH_SIZE_POL = 16  # batch size of the unfiltered polarity dataloaders
BATCH_SIZE_POL_FILTER = 32  # batch size of the filtered polarity dataloaders
N_SPLIT = 5 # default value of Stratified K-Fold
K_FOLD = 0 # random.randint(0,4) # pick up random fold of Stratified K-Fold

# models (Hugging Face model identifiers)
MODEL_TOKENIZER = 'bert-base-uncased'
MODEL_SUBJECTIVITY = 'cffl/bert-base-styleclassification-subjective-neutral'
MODEL_POLARITY = 'fabriceyhc/bert-base-uncased-amazon_polarity'

# training (references: https://huggingface.co/transformers/v3.2.0/custom_datasets.html)
EPOCHS = 10  # maximum number of epochs per run
OPTIMIZER = 'AdamW'  # only logged in the run config; training_loop always builds torch.optim.AdamW
LR = 5e-5  # learning rate passed to AdamW
WD = 0.01  # weight decay passed to AdamW
GAMMA = 0.1  # NOTE(review): not referenced by the code visible in this notebook
PATIENCE = 3  # number of non-improving epochs tolerated before early stopping
N_AFTER_COMMA = 3  # NOTE(review): not referenced by the code visible in this notebook

# env
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# wandb settings
PROJECT_NAME = "sentiment-analysis-nlu"
ENTITY = "laitifrancesco"
MODE_WANDB = "disabled" # "online"  (passed as `mode` to wandb.init)

# paths
WEIGHTS_PATH = 'weights/transformer/'  # root directory for saved models and filtering artifacts

# for reproducibility
g = torch.Generator().manual_seed(0)  # shared generator handed to every DataLoader
RANDOM_STATE = 42  # seed of the StratifiedKFold shuffling

### Utility functions

In [3]:
def get_complete_saved_path(weights_path, config):
    """Return the directory in which the model described by ``config`` is stored.

    The result is ``weights_path`` followed by ``<dataset>/<saved_model>/``.
    Every ``/`` inside ``config.saved_model`` is replaced by ``_`` so that the
    model name stays a single path component.
    """
    model_dir = config.saved_model.replace('/', '_')
    return ''.join((weights_path, config.dataset, '/', model_dir, '/'))

In [4]:
# generate same config provided by wandb in order to avoid multiple def for each case
class Mapping_as_dict(dict):
    """Dictionary whose entries can also be read and written as attributes.

    ``cfg.key`` and ``cfg['key']`` refer to the same entry, which is stored in
    the dictionary itself. As a result ``len``, ``in``, iteration, ``repr`` and
    values passed to the constructor all behave consistently.
    """

    def __getattr__(self, name):
        # Only invoked when regular attribute lookup fails, so dict methods
        # such as ``keys`` or ``items`` keep working as usual.
        try:
            return self[name]
        except KeyError:
            # Attribute protocols (hasattr, getattr with default, copy, pickle)
            # expect AttributeError, not KeyError.
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        try:
            del self[name]
        except KeyError:
            raise AttributeError(name) from None

## Dataset & Dataloader

The datasets used in this notebook are ``subjectivity`` and ``movie_reviews``, both imported from NLTK library.

We create a stratified k-fold validator. To use a different fold (5 folds are available here), simply change the ``K_FOLD`` parameter.

In [5]:
from sklearn.model_selection import StratifiedKFold

def make_stratified_k_fold(data):
    """Compute all stratified train/test index splits for a dataset.

    ``data`` is a dictionary with the keys ``corpus`` and ``labels``. The
    splitter uses ``N_SPLIT`` folds, shuffles, and is seeded with
    ``RANDOM_STATE``. The result is a list with one ``(train_idx, test_idx)``
    pair per fold.
    """
    splitter = StratifiedKFold(random_state=RANDOM_STATE, shuffle=True, n_splits=N_SPLIT)
    return list(splitter.split(data['corpus'], data['labels']))

def pick_k_fold(data, labels, fold, k_fold = 0):
    """Select the train/test partition of one fold.

    Args:
        data: sequence of samples (e.g. tokenized sentences or documents).
        labels: sequence of labels aligned with ``data``.
        fold: list of ``(train_indices, test_indices)`` pairs, one per fold.
        k_fold: index of the fold to use.

    Returns:
        ``X_train, X_test, y_train, y_test`` as NumPy arrays; the sample
        arrays have ``object`` dtype.
    """
    train_idx, test_idx = fold[k_fold]
    # The builtin ``object`` replaces the ``numpy.object0`` alias, which was
    # deprecated in NumPy 1.24 and removed in NumPy 2.0.
    dataset = numpy.array(data, dtype=object)
    labels = numpy.array(labels)

    X_train = dataset[train_idx]
    y_train = labels[train_idx]

    X_test = dataset[test_idx]
    y_test = labels[test_idx]

    return X_train, X_test, y_train, y_test

In [6]:
def get_subj_data():
    """Load the NLTK subjectivity corpus as a ``corpus``/``labels`` dictionary.

    Subjective sentences come first, followed by the objective ones; the label
    array is aligned with that order.
    """
    subjective_sents = subjectivity.sents(categories='subj')
    objective_sents = subjectivity.sents(categories='obj')

    n_subjective = len(subjective_sents)
    n_objective = len(objective_sents)

    # "0": "SUBJECTIVE", "1": "NEUTRAL"
    labels = numpy.array([0] * n_subjective + [1] * n_objective)

    return {'corpus': subjective_sents + objective_sents, 'labels': labels}

def get_pol_data():
    """Load the NLTK movie reviews corpus as a ``corpus``/``labels`` dictionary.

    Negative entries come first, followed by the positive ones; the label
    array is aligned with that order.
    """
    negative_docs = movie_reviews.paras(categories='neg')
    positive_docs = movie_reviews.paras(categories='pos')

    n_negative = len(negative_docs)
    n_positive = len(positive_docs)

    # "0": "NEGATIVE", "1": "POSITIVE"
    labels = numpy.array([0] * n_negative + [1] * n_positive)

    return {'corpus': negative_docs + positive_docs, 'labels': labels}

In [7]:
class SA_Dataset(torch.utils.data.Dataset):
    """Tokenized text classification dataset for the two sentiment tasks.

    Args:
        corpus: iterable of samples. For ``'subjectivity'`` each sample is a
            list of words; for ``'polarity'`` each sample is a list of
            sentences, each being a list of words.
        labels: indexable collection of labels aligned with ``corpus``.
        model_name: name or path handed to ``AutoTokenizer.from_pretrained``.
        name_dataset: either ``'subjectivity'`` or ``'polarity'``.

    Raises:
        NameError: if ``name_dataset`` is not one of the supported names.
    """

    def __init__(self, corpus, labels, model_name, name_dataset = ''):
        # Validate the dataset name first so an invalid name fails fast,
        # before the tokenizer is loaded.
        if name_dataset == 'subjectivity':
            to_text = self._low2str
        elif name_dataset == 'polarity':
            to_text = self._lol2str
        else:
            # The message uses the local argument: ``self.name_dataset`` is never set.
            raise NameError(f'Name of the dataset {name_dataset} not valid. Please, choose between subjectivity or polarity.')

        tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)
        # The tokenizer output is used like a dictionary of tensors in __getitem__.
        self.tokens = tokenizer([to_text(d) for d in corpus], padding=True, truncation=True, return_tensors="pt")
        self.labels = labels

    def __getitem__(self, idx):
        # .detach().clone() requested from PyTorch, addressed here https://stackoverflow.com/questions/55266154/pytorch-preferred-way-to-copy-a-tensor
        item = {key: val[idx].detach().clone() for key, val in self.tokens.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

    def _lol2str(self, doc): # lol: list of list of word
        """Join every word of every sentence of ``doc`` with single spaces."""
        return " ".join(w for sent in doc for w in sent)

    def _low2str(self, sent): # low: list of word
        """Join the words of ``sent`` with single spaces."""
        return " ".join(sent)

The following function returns two dataloaders: the train dataloader and the test dataloader of the selected fold of a dataset.

It is called when setting up each task section.

In [8]:
def prepare_dataset(data, fold, batch_size, k_fold, model_name, name_dataset):
    """Build the train and test dataloaders for one fold of a dataset.

    ``data`` is a ``corpus``/``labels`` dictionary and ``fold`` is the list of
    index splits; fold ``k_fold`` is selected. Both partitions are tokenized
    with ``model_name``. Both dataloaders shuffle, using the shared generator
    ``g``. Returns ``(train_dataloader, test_dataloader)``.
    """
    train_texts, test_texts, train_labels, test_labels = pick_k_fold(
        data['corpus'], data['labels'], fold, k_fold
    )

    train_set, test_set = (
        SA_Dataset(texts, targets, model_name, name_dataset)
        for texts, targets in ((train_texts, train_labels), (test_texts, test_labels))
    )

    train_dataloader, test_dataloader = (
        torch.utils.data.DataLoader(dataset, batch_size, shuffle=True, generator=g)
        for dataset in (train_set, test_set)
    )

    print(f'{name_dataset} [K_FOLD = {k_fold}] data loaded.')
    return train_dataloader, test_dataloader

We now create a two-key dictionary for each dataset to store the data in an organized way. Each dictionary has two keys:
- ``corpus`` store the text data of the dataset;
- ``labels`` store the ground truth labels.

In [9]:
# Create a two-key dictionary ('corpus' and 'labels') for the subjectivity
# and movie reviews datasets.
dict_subj = get_subj_data()
dict_pols = get_pol_data()

# Compute the stratified k-fold index splits of each dataset once; the fold
# actually used is selected later through K_FOLD.
fold_subj = make_stratified_k_fold(dict_subj)
fold_pols = make_stratified_k_fold(dict_pols)

## Train & Evaluation

In [10]:
def training_step(model, optimizer, train_loader):
    """Run one training epoch and return the metrics averaged over batches.

    For every batch the model is optimized on the loss it returns, and
    accuracy and F1 are computed on that batch's predictions. The returned
    dictionary has the keys ``train/train_acc``, ``train/train_loss`` and
    ``train/train_f1``, each being the mean of the per-batch values.
    """
    model.train()

    loss_sum = 0.
    acc_sum = 0.
    f1_sum = 0.

    for batch in train_loader:
        input_ids, attention_mask, labels = (
            batch[key].to(DEVICE) for key in ('input_ids', 'attention_mask', 'labels')
        )

        optimizer.zero_grad()

        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)

        # outputs Cross Entropy loss by default
        batch_loss = outputs.loss
        batch_loss.backward()
        optimizer.step()

        probabilities = torch.nn.functional.softmax(outputs.logits, dim=1)
        pred_labels = torch.argmax(probabilities, dim=1)

        loss_sum += batch_loss.item()
        acc_sum += accuracy_score(labels.cpu().detach().numpy(), pred_labels.cpu().detach().numpy())
        # zero_division=1: a batch with no positive label/prediction scores 1 rather than warning.
        f1_sum += f1_score(labels.cpu().detach().numpy(), pred_labels.cpu().detach().numpy(), zero_division=1)

    metrics = {}
    metrics["train/train_acc"] = acc_sum / len(train_loader)
    metrics["train/train_loss"] = loss_sum / len(train_loader)
    metrics["train/train_f1"] = f1_sum / len(train_loader)
    return metrics

In [11]:
def evaluating_step(model, test_loader):
    """Evaluate the model on ``test_loader`` without updating its weights.

    Loss, accuracy and F1 are computed per batch and averaged over the number
    of batches. The returned dictionary has the keys ``test/test_acc``,
    ``test/test_loss`` and ``test/test_f1``.
    """
    model.eval()

    loss_sum = 0.
    acc_sum = 0.
    f1_sum = 0.

    with torch.no_grad():
        for batch in test_loader:
            input_ids, attention_mask, labels = (
                batch[key].to(DEVICE) for key in ('input_ids', 'attention_mask', 'labels')
            )

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

            # Cross Entropy loss by default
            batch_loss = outputs.loss

            probabilities = torch.nn.functional.softmax(outputs.logits, dim=1)
            pred_labels = torch.argmax(probabilities, dim=1)

            loss_sum += batch_loss.item()
            acc_sum += accuracy_score(labels.cpu().detach().numpy(), pred_labels.cpu().detach().numpy())
            # NOTE(review): unlike training_step, f1_score uses its default zero_division here.
            f1_sum += f1_score(labels.cpu().detach().numpy(), pred_labels.cpu().detach().numpy())

    metrics = {}
    metrics["test/test_acc"] = acc_sum / len(test_loader)
    metrics["test/test_loss"] = loss_sum / len(test_loader)
    metrics["test/test_f1"] = f1_sum / len(test_loader)
    return metrics

In [12]:
def training_loop(wandb_run, train_data, test_data):
    """Fine-tune a sequence classifier and keep the weights with the best test accuracy.

    Args:
        wandb_run: run returned by ``wandb.init``; its ``config`` provides
            ``model``, ``epochs``, ``dataset`` and ``saved_model``.
        train_data: dataloader used by ``training_step``.
        test_data: dataloader used by ``evaluating_step``.

    Returns:
        ``(best_acc, best_f1)``: the best test accuracy and the test F1 of
        that same epoch.

    Side effects:
        Logs the metrics of every epoch to wandb, saves the model each time
        the test accuracy improves, writes the summary values and finishes
        the wandb run.
    """
    config = wandb_run.config
    print('\nCONFIGS\n', yaml.dump(config._items, default_flow_style=False)) # pretty print of configs used

    run_epochs = 0
    best_acc = 0.
    best_loss = 0.
    best_f1 = 0.
    # Remaining budget of non-improving epochs. It is never restored when the
    # accuracy improves, so it counts non-improving epochs over the whole run.
    patience = PATIENCE

    model = transformers.AutoModelForSequenceClassification.from_pretrained(config.model, num_labels = 2, ignore_mismatched_sizes=True) # Reference: https://discuss.huggingface.co/t/how-do-i-change-the-classification-head-of-a-model/4720/28
    optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WD)
    weights_path = get_complete_saved_path(WEIGHTS_PATH, config)
    model.to(DEVICE)

    for e in range(config.epochs):
        # Track the epochs actually executed on every iteration, so that the
        # summary is also correct when training is not stopped early.
        run_epochs = e + 1
        print(f'-- Epoch [{e+1}/{config.epochs}] --')
        train_metrics = training_step(model, optimizer, train_data)
        test_metrics = evaluating_step(model, test_data)
        wandb.log({**train_metrics, **test_metrics})
        print(f'Train -> \tLoss:{train_metrics["train/train_loss"]:.5f} \tAccuracy: {train_metrics["train/train_acc"]:.2f} \tF1-Score: {train_metrics["train/train_f1"]:.2f}')
        print(f'Test -> \tLoss:{test_metrics["test/test_loss"]:.5f} \tAccuracy: {test_metrics["test/test_acc"]:.2f} \tF1-Score: {test_metrics["test/test_f1"]:.2f}')

        if best_acc < test_metrics["test/test_acc"]:
            # New best test accuracy: checkpoint the model and remember its metrics.
            model.save_pretrained(weights_path)
            best_acc = test_metrics["test/test_acc"]
            best_loss = test_metrics["test/test_loss"]
            best_f1 = test_metrics["test/test_f1"]
        else:
            patience -= 1

        if patience < 0: # Early stopping with patience
            print(f'\nEarly stopping with PATIENCE = {PATIENCE}. Model trained for {run_epochs}/{config.epochs} epochs')
            break # Not nice but it keeps the code clean

    print('Model saved in location ', weights_path, '\n\n')
    wandb.summary["test_best_acc"] = best_acc
    wandb.summary["test_best_loss"] = best_loss
    wandb.summary["test_best_f1"] = best_f1
    wandb.summary["real_run_epochs"] = run_epochs
    wandb.finish()

    return best_acc, best_f1

## Train subjectivity classifier

We now finetune and evaluate a subjectivity detector.

In [None]:
# Run configuration logged to wandb; training_loop reads "model", "epochs",
# "dataset" and "saved_model" from it.
config={
        "dataset": "subjectivity",
        "model": MODEL_SUBJECTIVITY,
        "saved_model": 'ft_' + MODEL_SUBJECTIVITY,
        "batch_size": BATCH_SIZE_SUBJ,
        "epochs": EPOCHS,
        "lr": LR,
        "optimizer": OPTIMIZER,
    }

# Prefix of the wandb run name; the fold index is appended below.
NAME_RUN = "subj-transformers-"

# The text is tokenized with MODEL_TOKENIZER, while the classifier weights
# come from config["model"].
train_subj, test_subj = prepare_dataset(dict_subj, fold_subj, BATCH_SIZE_SUBJ, K_FOLD, MODEL_TOKENIZER, 'subjectivity')
run = wandb.init(project=PROJECT_NAME, entity=ENTITY, name=NAME_RUN + str(K_FOLD), config=config, mode=MODE_WANDB)
training_loop(run, train_subj, test_subj)

## Train no-filter sents polarity classifier

We now finetune and evaluate a polarity classifier on the full movie reviews, **without** filtering out any sentence with the subjectivity detector.

In [None]:
# Run configuration logged to wandb; training_loop reads "model", "epochs",
# "dataset" and "saved_model" from it.
config={
        "dataset": "polarity",
        "version": "no-filter",
        "model": MODEL_POLARITY,
        "saved_model": 'nofilter_' + MODEL_POLARITY,
        "batch_size": BATCH_SIZE_POL,
        "epochs": EPOCHS,
        "lr": LR,
        "optimizer": OPTIMIZER
    }

# Prefix of the wandb run name; the fold index is appended below.
NAME_RUN = "pol-no-filter-transformers-"

# The text is tokenized with MODEL_TOKENIZER, while the classifier weights
# come from config["model"].
train_pol, test_pol   = prepare_dataset(dict_pols, fold_pols, BATCH_SIZE_POL, K_FOLD, MODEL_TOKENIZER, 'polarity')
run = wandb.init(project=PROJECT_NAME, entity=ENTITY, name=NAME_RUN + str(K_FOLD), config=config, mode=MODE_WANDB)
training_loop(run, train_pol, test_pol)

## Filter sentences using subjectivity classifier

We now use the subjectivity detector to remove the objective sentences from the movie reviews dataset, keeping only the sentences predicted as subjective (label 0), to hopefully obtain better quality data to process.

In [19]:
import pandas

def remove_subj_sents(data, label, classifier):
    """Keep, for every document, only the sentences predicted as label 0.

    Each document of ``data`` is classified sentence by sentence with
    ``classifier``. Sentences whose prediction equals 0 (subjective) are kept.
    Documents left with no sentence are dropped from the result.

    Per-document statistics (number of sentences, kept, removed) are written
    to ``statistics_label_<label>.csv`` under ``WEIGHTS_PATH``.

    Returns the list of filtered documents.
    """
    kept_docs = []
    stats = {'doc': [], 'subj-sents': [], 'sents-removed': []}

    for doc in tqdm(data):
        # trated as a subjectivity dataset because at the end has small number word per sentences
        doc_dataset = SA_Dataset(doc, numpy.array([label] * len(doc)), MODEL_TOKENIZER, name_dataset='subjectivity')
        doc_loader = torch.utils.data.DataLoader(doc_dataset, BATCH_SIZE_SUBJ, shuffle=False, generator=g)

        predicted = filter_step(classifier, doc_loader)
        # 0 = subj
        kept = [sentence for sentence, estimate in zip(doc, predicted) if estimate == 0]

        if kept:
            kept_docs.append(kept)

        stats['doc'].append(len(doc))
        stats['subj-sents'].append(len(kept))
        stats['sents-removed'].append(len(doc) - len(kept))

    pandas.DataFrame(stats).to_csv(f'{WEIGHTS_PATH}statistics_label_{label}.csv', index=False)

    return kept_docs

def filter_step(model, filtered_loader):
    """Predict a class for every sample served by ``filtered_loader``.

    The model runs in evaluation mode with gradients disabled. Returns a list
    with one predicted class per sample, in loader order; each element is a
    zero-dimensional tensor.
    """
    model.eval()

    collected = []
    with torch.no_grad():
        for batch in filtered_loader:
            ids = batch['input_ids'].to(DEVICE)
            mask = batch['attention_mask'].to(DEVICE)

            logits = model(ids, attention_mask=mask).logits

            probabilities = torch.nn.functional.softmax(logits, dim=1)
            # Extending the list with a 1-D tensor appends its elements one by one.
            collected.extend(torch.argmax(probabilities, dim=1))

    return collected

def filter_sentences():
    """Filter the movie reviews with the finetuned subjectivity classifier.

    The classifier is loaded from the directory where the subjectivity
    training stored its weights. Negative and positive reviews are then
    filtered separately. Returns ``(neg_sents, pos_sents)``.
    """
    # Minimal config with the two fields needed to rebuild the weights path.
    config = Mapping_as_dict()
    config.dataset = 'subjectivity'
    config.saved_model = 'ft_' + MODEL_SUBJECTIVITY

    classifier = transformers.AutoModelForSequenceClassification.from_pretrained(
        get_complete_saved_path(WEIGHTS_PATH, config)
    )
    classifier.to(DEVICE)

    negative_docs = movie_reviews.paras(categories='neg')
    positive_docs = movie_reviews.paras(categories='pos')

    # Label 0 is used for negative reviews, label 1 for positive reviews.
    neg_sents, pos_sents = (
        remove_subj_sents(docs, label, classifier)
        for label, docs in enumerate((negative_docs, positive_docs))
    )

    return neg_sents, pos_sents

We filter out the objective sentences from the ``movie_reviews`` dataset and save the remaining subjective sentences in a ``.pkl`` file for easy access.

In [None]:
import pickle

# Cache file holding the filtered corpus, so that the filtering step only has
# to run once.
filtered_saved_path = WEIGHTS_PATH + 'filtered_polarity_sents.pkl'
dict_pols_filtered = {}

if not os.path.exists(filtered_saved_path): # if the .pkl file is not available, generate it
    print('Creating .pkl with filtered sentences')
    
    neg_filtered, pos_filtered = filter_sentences()
    # Same layout as the other dataset dictionaries: negative documents
    # (label 0) first, then positive documents (label 1).
    dict_pols_filtered = {'corpus': neg_filtered + pos_filtered, 'labels': numpy.array([0] * len(neg_filtered) + [1] * len(pos_filtered))}
    
    with open(filtered_saved_path, 'wb') as f:
        pickle.dump(dict_pols_filtered, f)
        print('Saved at location ', filtered_saved_path)

else:
    print('Using .pkl with filtered sentences from ', filtered_saved_path)
    with open(filtered_saved_path, 'rb') as f:
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # only load a cache file produced by this notebook.
        dict_pols_filtered = pickle.load(f)

In [14]:
def prepare_polarity_filtered_data(filter_dict, fold, k_fold):
    """Build the train and test dataloaders for the filtered polarity data.

    ``filter_dict`` is a ``corpus``/``labels`` dictionary and ``fold`` is the
    list of index splits; fold ``k_fold`` is selected. Both dataloaders use
    ``BATCH_SIZE_POL_FILTER`` and shuffle with the shared generator ``g``.
    Returns ``(train_dataloader, test_dataloader)``.
    """
    train_docs, test_docs, train_labels, test_labels = pick_k_fold(
        filter_dict['corpus'], filter_dict['labels'], fold, k_fold
    )

    train_set, test_set = (
        SA_Dataset(docs, targets, MODEL_TOKENIZER, name_dataset='polarity')
        for docs, targets in ((train_docs, train_labels), (test_docs, test_labels))
    )

    train_dataloader, test_dataloader = (
        torch.utils.data.DataLoader(dataset, BATCH_SIZE_POL_FILTER, shuffle=True, generator=g)
        for dataset in (train_set, test_set)
    )

    print(f'Filtered polarity [K_FOLD = {k_fold}] data loaded.')
    return train_dataloader, test_dataloader

## Train filter sents polarity classifier

We now finetune and evaluate a polarity classifier on the movie reviews **with** the objective sentences filtered out by the subjectivity detector.

In [None]:
# Run configuration logged to wandb; training_loop reads "model", "epochs",
# "dataset" and "saved_model" from it.
config={
        "dataset": "polarity",
        "version": "filter",
        "model": MODEL_POLARITY,
        "saved_model": 'filter_' + MODEL_POLARITY,
        # The filtered dataloaders are built with BATCH_SIZE_POL_FILTER, so
        # log that value rather than BATCH_SIZE_POL.
        "batch_size": BATCH_SIZE_POL_FILTER,
        "epochs": EPOCHS,
        "lr": LR,
        "optimizer": OPTIMIZER
    }

# Prefix of the wandb run name; the fold index is appended below.
NAME_RUN = "pol-filter-transformers-"

# The filtered corpus can contain fewer documents than the original one, so
# the folds are recomputed for it.
fold_filtered_pols = make_stratified_k_fold(dict_pols_filtered)

train_pol_filter, test_pol_filter = prepare_polarity_filtered_data(dict_pols_filtered, fold_filtered_pols, K_FOLD)
run = wandb.init(project=PROJECT_NAME, entity=ENTITY, name=NAME_RUN + str(K_FOLD), config=config, mode=MODE_WANDB)
training_loop(run, train_pol_filter, test_pol_filter)

## Extra: Attention visualizer using BertViz

``BertViz`` is an interactive tool for visualizing attention in Transformer language models.

More info available in the official GitHub repository https://github.com/jessevig/bertviz

In [None]:
# easy installation via pip
!pip3 install bertviz

In [3]:
from bertviz import model_view, head_view

def generate_interactive_view(input_text, model_name = MODEL_SUBJECTIVITY, show_model_view = True, show_head_view = True):
    """Display BertViz attention views of a classifier for ``input_text``.

    The classifier is loaded from ``model_name`` with attention outputs
    enabled, and the text is encoded with the ``MODEL_TOKENIZER`` tokenizer.
    The token strings are printed. The head view and/or the model view are
    then shown, depending on the two flags.
    """
    # Configure model to return attention values
    classifier = transformers.AutoModelForSequenceClassification.from_pretrained(model_name, output_attentions=True)
    tokenizer = transformers.AutoTokenizer.from_pretrained(MODEL_TOKENIZER)

    input_ids = tokenizer.encode(input_text, return_tensors='pt')
    # Retrieve attention from model outputs (last element of the output)
    attention = classifier(input_ids)[-1]
    # Convert input ids to token strings
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0])
    print(tokens)

    if show_head_view:
        head_view(attention, tokens)
    if show_model_view:
        model_view(attention, tokens)

In [5]:
# Load the raw corpora; only `subj` is used in the rest of this cell.
obj = subjectivity.sents(categories='obj')
subj = subjectivity.sents(categories='subj')

neg = movie_reviews.paras(categories='neg')
pos = movie_reviews.paras(categories='pos')

# Rebuild a plain string from the word list of one subjective sentence.
input_text = " ".join([w for w in subj[11]])

print(subj[11])
# Visualize the attention heads of the finetuned subjectivity classifier
# loaded from its local weights directory.
generate_interactive_view(input_text, 'weights/transformer/subjectivity/ft_cffl_bert-base-styleclassification-subjective-neutral', show_model_view = False, show_head_view = True)

['directed', 'by', 'david', 'twohy', 'with', 'the', 'same', 'great', 'eye', 'for', 'eerie', 'understatement', 'that', 'he', 'brought', 'to', 'pitch', 'black', '.']
['[CLS]', 'directed', 'by', 'david', 'two', '##hy', 'with', 'the', 'same', 'great', 'eye', 'for', 'eerie', 'under', '##sta', '##tem', '##ent', 'that', 'he', 'brought', 'to', 'pitch', 'black', '.', '[SEP]']


<IPython.core.display.Javascript object>