# RoBERTa Fine Tunning For Helpfulness Prediction

# Load Datasets

## Import Needed Libraries

In [None]:
import warnings
warnings.filterwarnings('ignore')
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

## Read CSV Files

In [None]:
train_df = pd.read_csv("train.csv")
train_df

In [None]:
train_df["label"].value_counts()

In [None]:
test_df = pd.read_csv("test.csv")
test_df

In [None]:
test_df["label"].value_counts()

## Build PyTorch Dataset Class

In [None]:
from torch.utils.data import Dataset
from transformers import AutoTokenizer
import torch

In [None]:
class Helpfulness_Dataset(Dataset):
    
    def __init__(self, data, tokenizer, attributes, max_token_len):
        self.data = data
        self.tokenizer = tokenizer
        self.attributes = attributes
        self.max_token_len = max_token_len
        
    def __len__(self):
        return len(self.data)
        
    def __getitem__(self, index):
        item = self.data.iloc[index]
        text = item.text
        attributes = torch.FloatTensor(item[self.attributes])
        tokens = self.tokenizer.encode_plus(text,
                                            add_special_tokens=True,
                                            return_tensors="pt",
                                            truncation=True,
                                            max_length=self.max_token_len,
                                            padding="max_length",
                                            return_attention_mask=True)
        
        return {
            "input_ids": tokens.input_ids.flatten(),
            "attention_mask": tokens.attention_mask.flatten(),
            "labels": attributes
        }

## Create Train And Test PyTorch Datasets

In [None]:
attributes = ["helpful", "unhelpful"]
model_name = "roberta-base"
max_token_length = 512
tokenizer = AutoTokenizer.from_pretrained(model_name)
hfs_ds_train = Helpfulness_Dataset(train_df, tokenizer, attributes, max_token_length)
hfs_ds_val = Helpfulness_Dataset(test_df, tokenizer, attributes, max_token_length)

## Tokization Example

### Helpful Review

In [None]:
hfs_ds_train.data.iloc[0].text

In [None]:
hfs_ds_train.__getitem__(0)

### Unhelpful Review

In [None]:
hfs_ds_val.data.iloc[0].text

In [None]:
hfs_ds_val.__getitem__(0)

# 2. Data Module

## Import Needed Libraries

In [None]:
import pytorch_lightning as pl
from torch.utils.data import DataLoader

## Creating PyTorch Data Module

In [None]:
class Helpfulness_Data_Model(pl.LightningDataModule):
    
    def __init__(self, attributes, batch_size, max_token_length, model_name):
        super().__init__()
        self.attributes = attributes
        self.batch_size = batch_size
        self.max_token_length = max_token_length
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        
    def setup(self, stage = None):
        if stage in (None, "fit"):
            self.train_dataset = Helpfulness_Dataset(train_df, tokenizer, attributes, 512)
            self.val_dataset = Helpfulness_Dataset(test_df, tokenizer, attributes, 512)
        if stage == "predict":
            self.val_dataset = Helpfulness_Dataset(test_df, tokenizer, attributes, 512)
    
    def train_dataloader(self):
        return DataLoader(self.train_dataset,
                          batch_size=self.batch_size,
                          num_workers=4,
                          shuffle=True)
    
    def val_dataloader(self):
        return DataLoader(self.val_dataset,
                          batch_size=self.batch_size,
                          num_workers=4,
                          shuffle=False)
    
    def predict_dataloader(self):
        return DataLoader(self.val_dataset,
                          batch_size=self.batch_size,
                          num_workers=4,
                          shuffle=False)

## Create Instance Of Our Data Module And Set It Up

In [None]:
attributes = ["helpful", "unhelpful"]
model_name = "roberta-base"
batch_size = 8
max_token_length = 512
hfs_data_module = Helpfulness_Data_Model(attributes, batch_size, max_token_length, model_name)
hfs_data_module.setup()
dl = hfs_data_module.train_dataloader()

## Number Of Batches

In [None]:
len(dl)

# 3. Model

## Import Needed Libraries

In [None]:
from transformers import AutoModel, AdamW, get_cosine_schedule_with_warmup
import torch.nn as nn
import math
from torchmetrics.functional.classification import auroc
import torch.nn.functional as F

## Helpfulness Classifier Class

In [None]:
class Helpfulness_Classifier(pl.LightningModule):
    
    def __init__(self, config: dict):
        super().__init__()
        self.config = config
        self.pretrained_model = AutoModel.from_pretrained(config["model_name"], return_dict=True)
        self.hidden= nn.Linear(self.pretrained_model.config.hidden_size, self.pretrained_model.config.hidden_size)
        self.classification = nn.Linear(self.pretrained_model.config.hidden_size, self.config["n_labels"])
        torch.nn.init.xavier_uniform_(self.hidden.weight)
        torch.nn.init.xavier_uniform_(self.classification.weight)
        self.loss_fun = nn.BCEWithLogitsLoss(reduction="mean")
        self.dropout = nn.Dropout()
        
    def forward(self, input_ids, attention_mask, labels=None):
        output = self.pretrained_model(input_ids = input_ids, attention_mask = attention_mask)
        pooled_output = torch.mean(output.last_hidden_state, 1)
        pooled_output = self.hidden(pooled_output)
        pooled_output = self.dropout(pooled_output)
        pooled_output = F.relu(pooled_output)
        logits = self.classification(pooled_output)
        loss = 0 
        if labels is not None:
            loss = self.loss_fun(logits.view(-1, self.config["n_labels"]), labels.view(-1, self.config["n_labels"]))
        return loss, logits
    
    def training_step(self, batch, batch_index):
        loss, logits = self(**batch)
        self.log("train loss", loss, prog_bar=True, logger=True)
        return {"loss": loss, "predictions": logits, "labels": batch["labels"]}
    
    def validations_step(self, batch, batch_index):
        loss, logits = self(**batch)
        self.log("validation loss", loss, prog_bar=True, logger=True)
        return {"val_loss": loss, "predictions": logits, "labels": batch["labels"]}
    
    def predict_step(self, batch, batch_index):
        _, logits = self(**batch)
        return logits
    
    def configure_optimizers(self):
        optimizer = AdamW(self.parameters(), lr=self.config["lr"], weight_decay=self.config["w_decay"])
        total_steps = self.config["train_size"] / self.config["bs"]
        warmup_steps = math.floor(total_steps * self.config["warmup"])
        scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)
        return [optimizer], [scheduler]
    

In [None]:
config = {
    "model_name": "roberta-base",
    "n_labels": len(attributes),
    "bs": 8,
    "lr": 2e-5,
    "warmup": 0.2,
    "w_decay": 0.001,
    "train_size": len(hfs_data_module.train_dataloader()),
    "n_epochs": 4
}

model = Helpfulness_Classifier(config)

In [None]:
idx = 0

input_ids = hfs_ds_train.__getitem__(idx)["input_ids"]
am = hfs_ds_train.__getitem__(idx)["attention_mask"]
labels = hfs_ds_train.__getitem__(idx)["labels"]

loss, output = model(input_ids.unsqueeze(dim=0), am.unsqueeze(dim=0), labels.unsqueeze(dim=0))

In [None]:
loss, output

## Train

In [None]:
torch.set_float32_matmul_precision('medium')
hfs_data_module = Helpfulness_Data_Model(attributes, config["bs"], max_token_length, model_name)
hfs_data_module.setup()

model = Helpfulness_Classifier(config)

trainer = pl.Trainer(max_epochs=config["n_epochs"], num_sanity_val_steps=50)
trainer.fit(model, hfs_data_module)

## Predict / Eval

In [None]:
def classify_reviews(model, dm):
    preictions = trainer.predict(model, datamodule=dm)
    flattened_prediction = np.stack([torch.sigmoid(torch.Tensor(p)) for batch in preictions for p in batch])
    return flattened_prediction

In [None]:
hfs_data_module.val_dataset.data[["helpful", "helpful"]]

In [None]:
predictions = classify_reviews(model, hfs_data_module)

In [None]:
helpful_array = hfs_data_module.val_dataset.data[["helpful", "unhelpful", "_id"]]
helpful_array

In [None]:
pd.DataFrame(predictions)

In [None]:
def generate_confusion_matrix(predictions, ground_truth_labels):
    
    tp, fp, tn, fn, low_confidence = 0, 0, 0, 0, 0
    
    for i in range(len(predictions)):
        helpful = 1 if predictions.iloc[i][0] >= 0.5 else 0
        unhelpful = 1 if predictions.iloc[i][1] >= 0.5 else 0
        if helpful == 1 and helpful == ground_truth_labels.iloc[i]["helpful"]:
            tp += 1 
        elif helpful == 1 and helpful != ground_truth_labels.iloc[i]["helpful"]:
            fp += 1
        elif unhelpful == 1 and unhelpful == ground_truth_labels.iloc[i]["unhelpful"]:
            tn += 1
        elif unhelpful == 1 and unhelpful != ground_truth_labels.iloc[i]["unhelpful"]:
            fn += 1
        else:
            low_confidence += 1
            
    return tp, fp, tn, fn, low_confidence

In [None]:
tp, fp, tn, fn, low_confidence = generate_confusion_matrix(pd.DataFrame(predictions), hfs_data_module.val_dataset.data[["helpful", "unhelpful"]])
print(f"True Positive = {tp}")
print(f"False Positive = {fp}")
print(f"True negative = {tn}")
print(f"False negative = {fn}")
print(f"Low Confidence = {low_confidence}")
print(f"Total = {tp + fp + tn + fn + low_confidence}")
with open("Results/RoBERTa-Base-Met.txt", "w") as f:
    f.write(f"True Positive = {tp}\n")
    f.write(f"False Positive = {fp}\n")
    f.write(f"True negative = {tn}\n")
    f.write(f"False negative = {fn}\n")
    f.write(f"Low Confidence = {low_confidence}")
    f.write(f"Total = {tp + fp + tn + fn + low_confidence}")

In [None]:
def calculate_metrics(tp, fp, tn, fn):
    accuracy = (tp + tn) / (tp + fp + tn + fn) if (tp + fp + tn + fn) != 0 else 0.0
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0.0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) != 0 else 0.0
    specificity = tn / (tn + fp) if (tn + fp) != 0 else 0.0
    sensitivity = recall  # Sensitivity is another name for Recall
    false_positive_rate = fp / (fp + tn) if (fp + tn) != 0 else 0.0
    
    return accuracy, precision, recall, f1_score, specificity, sensitivity, false_positive_rate

In [None]:
accuracy, precision, recall, f1_score, specificity, sensitivity, false_positive_rate = calculate_metrics(tp, fp, tn, fn)
print(f"Accuracy = {accuracy}")
print(f"Precision = {precision}")
print(f"Recall = {recall}")
print(f"F1 Score = {f1_score}")
print(f"Specificity = {specificity}")
print(f"Sensitivity = {sensitivity}")
print(f"False Positive Rate = {false_positive_rate}")
with open("Results/RoBERTa-Base-Results.txt", "w") as f:
    f.write(f"Accuracy = {accuracy}\n")
    f.write(f"Precision = {precision}\n")
    f.write(f"Recall = {recall}\n")
    f.write(f"F1 Score = {f1_score}\n")
    f.write(f"Specificity = {specificity}\n")
    f.write(f"Sensitivity = {sensitivity}\n")
    f.write(f"False Positive Rate = {false_positive_rate}\n")