In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaModel, RobertaTokenizer
from torchmetrics.classification import (
    MulticlassF1Score,
    MulticlassPrecision,
    MulticlassRecall,
)
from tqdm import tqdm
import pandas as pd
from torchviz import make_dot

In [2]:
from utils import (
    LABEL_MAPPING,
    ids2labels,
    save_checkpoint,
    load_checkpoint,
    save_best_model,
    load_best_model,
    save_model_remotely
)

In [3]:
one_hot_labels = {
    "sentiment": ['negative', 'neutral', 'positive'],
	"question": ['not_question', 'question'],
	"curse": ['curse', 'non-curse'],
	"emotion": ['anger', 'disgust', 'fear', 'joy', 'neutral', 'sadness', 'surprise'],
	"gibberish": ['clean', 'mild gibberish', 'word salad'],
	"offensiveness": ['non-offensive', 'offensive'],
	"political_bias": ['CENTER', 'LEFT', 'RIGHT']
}

label_to_index = {
    "sentiment": {label: idx for idx, label in enumerate(one_hot_labels["sentiment"])},
	"question": {label: idx for idx, label in enumerate(one_hot_labels["question"])},
	"curse": {label: idx for idx, label in enumerate(one_hot_labels["curse"])},
	"emotion": {label: idx for idx, label in enumerate(one_hot_labels["emotion"])},
	"gibberish": {label: idx for idx, label in enumerate(one_hot_labels["gibberish"])},
	"offensiveness": {label: idx for idx, label in enumerate(one_hot_labels["offensiveness"])},
	"political_bias": {label: idx for idx, label in enumerate(one_hot_labels["political_bias"])}
}

one_hot_metadata_size = sum([len(x) for x in one_hot_labels.values()])

In [15]:
class LiarPlusSingleRobertaDataset(Dataset):
    def __init__(
        self,
        filepath: str,
        tokenizer,
        str_metadata_cols: list[str],
        num_metadata_cols: list[str],
#        one_hot_metadata_cols: list[str],
        max_length: int = 512,
    ):
        self.df = pd.read_csv(filepath)

        self.str_metadata_cols = str_metadata_cols
        self.num_metadata_cols = num_metadata_cols
        #self.one_hot_metadata_cols = one_hot_metadata_cols

        for column in self.str_metadata_cols:
            self.df[column] = self.df[column].astype(str)

        self.df["statement"] = self.df["statement"].astype(str)
        self.df["justification"] = self.df["justification"].astype(str)
        self.df["articles"] = self.df["articles"].astype(str)

        self.statement_max_len = max_length // 4
        self.justification_max_len = max_length // 4
        self.article_max_len = max_length // 4
        self.str_metadata_max_len = max((
            max_length - self.statement_max_len - self.justification_max_len - self.article_max_len
        ) // len(str_metadata_cols), 15)

        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.df.index)
        
    def limit_tokens(self, text, max_length=512):
        return self.tokenizer.convert_tokens_to_string(
            self.tokenizer.tokenize(text)[:max_length]
        )

    def __getitem__(self, index: int):
        item = self.df.iloc[index]

        input_text = self.limit_tokens(
            f"[STATEMENT] {item['statement']}",
            self.statement_max_len
        )
        input_text += self.limit_tokens(
            f" [JUSTIFICATION] {item['justification']}",
            self.justification_max_len,
        )
        input_text += self.limit_tokens(
            f" [ARTICLE] {item['articles']}",
            self.article_max_len,
        )

        for column in self.str_metadata_cols:
            input_text += self.limit_tokens(f" [{column.upper()}] {item[column]}", self.str_metadata_max_len)

        encoded = self.tokenizer(
            input_text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )

        label = LABEL_MAPPING[item["label"]]

        num_metadata = [item[column] for column in self.num_metadata_cols]

        #one_hot_metadata = []
        #for column in self.one_hot_metadata_cols:
        #    value = item[column]
        #    possible_values = len(one_hot_labels[column])
        #    id_tensor = torch.tensor(label_to_index[column][value])
        #    one_hot_metadata.append(F.one_hot(id_tensor, possible_values))
        
        return {
            "input_ids": encoded["input_ids"].squeeze(0),
            "attention_mask": encoded["attention_mask"].squeeze(0),
            "num_metadata": torch.tensor(num_metadata).float(),
            #"one_hot_metadata": torch.cat(one_hot_metadata, dim=0).float(),
            "label": torch.tensor(label),
            "input_text": input_text
        }

In [5]:
class LiarPlusSingleFinetunedRoBERTasClassifier(nn.Module):
    #one_hot_metadata_size, 
    def __init__(
        self, encoder_model, num_metadata_len, num_hidden, num_classes
    ):
        super(LiarPlusSingleFinetunedRoBERTasClassifier, self).__init__()
        self.encoder = encoder_model
        # + one_hot_metadata_size
        self.hl = nn.Linear(
            self.encoder.config.hidden_size + num_metadata_len, num_hidden
        )
        self.dropout = nn.Dropout(p=0.1)
        self.fc = nn.Linear(num_hidden, num_classes)

    def forward(self, input_ids, attention_mask, num_metadata):#, one_hot_metadata):
        outputs = self.encoder(
            input_ids=input_ids, attention_mask=attention_mask
        )

        cls_embedding = outputs.pooler_output
        #, one_hot_metadata
        concatted_inputs = torch.cat([cls_embedding, num_metadata], dim=1)

        hl_output = F.gelu(self.hl(concatted_inputs))
        hl_output = self.dropout(hl_output)

        logits = self.fc(hl_output)
        return logits

    def roberta_trainable_state(self):
        return {
            name: param for name, param in self.encoder.named_parameters() if param.requires_grad
        }
    
    def load_roberta_trainable_state(self, state_dict):
        self.encoder.load_state_dict(state_dict, strict=False)

    # Zapisz tylko wagi warstw klasyfikatora
    def state_for_save(self):
        return {
            'hl_state_dict': self.hl.state_dict(),
            'fc_state_dict': self.fc.state_dict(),
            'roberta_trainable': self.roberta_trainable_state(),
        }
        
    # Ładowanie modelu (tylko wagi klasyfikatora)
    def load_state_from_save(self, state):
        self.hl.load_state_dict(state['hl_state_dict'])
        self.fc.load_state_dict(state['fc_state_dict'])
        if 'roberta_trainable' in state:
            self.load_roberta_trainable_state(state['roberta_trainable'])

In [6]:
def test(
    model: nn.Module,
    best_model_path: str,
    dataloader: DataLoader,
    name: str='Test'
) -> None:
    # Define loss function
    criterion = nn.CrossEntropyLoss()

    load_best_model(model, best_model_path)

    model.eval()
    total_loss = 0.0
    total_correct = 0
    total_samples = 0

    f1 = MulticlassF1Score(num_classes, average=None).to(device)
    precision = MulticlassPrecision(num_classes, average=None).to(device)
    recall = MulticlassRecall(num_classes, average=None).to(device)

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            num_metadata = batch["num_metadata"].to(device)
            #one_hot_metadata = batch["one_hot_metadata"].to(device)
            labels = batch["label"].to(device)

            outputs = model(input_ids, attention_mask, num_metadata)#, one_hot_metadata)
            loss = criterion(outputs, labels)
            total_loss += loss.item() * input_ids.size(0)

            preds = torch.argmax(outputs, dim=1)
            total_correct += (preds == labels).sum().item()
            total_samples += input_ids.size(0)

            f1.update(preds, labels)
            precision.update(preds, labels)
            recall.update(preds, labels)

    avg_loss = total_loss / total_samples
    accuracy = total_correct / total_samples

    f1_res = f1.compute()
    precision_res = precision.compute()
    recall_res = recall.compute()

    macro_f1 = f1_res.mean()
    macro_precision = precision_res.mean()
    macro_recall = recall_res.mean()

    print(
        f"{name} Accuracy: {accuracy:.4f},\n"
        f"{name} Loss: {avg_loss:.4f},\n"
        f"{name} F1: {f1_res} (marcro = {macro_f1:.4f}),\n"
        f"{name} Precision: {precision_res} (marcro = {macro_precision:.4f}),\n"
        f"{name} Recall: {recall_res} (marcro = {macro_recall:.4f}),\n"
    )

    return (
        accuracy,
        avg_loss,
        macro_f1,
        macro_precision,
        macro_recall
    )

In [19]:
# Hyperparameters
num_classes = 6
hidden_size = 128
batch_size = 10

text_columns = [
    "subject",
    "speaker",
    "job_title",
    "state",
    "party_affiliation",
    "context",
    "sentiment",
    "question",
    "curse",
    "emotion",
    "gibberish",
    "offensiveness",
    "political_bias"
]
num_metadata_cols = [
    "barely_true_counts",
    "false_counts",
    "half_true_counts",
    "mostly_true_counts",
    "pants_on_fire_counts",
    "grammar_errors",
    "ratio_of_capital_letters"
]

In [12]:
# Load RoBERTa tokenizer and model
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
roberta = RobertaModel.from_pretrained("roberta-base")

# trenuje 2 ostatnie warstwy
for name, param in roberta.named_parameters():
    if name.startswith("encoder.layer.11") or name.startswith("pooler"):
        param.requires_grad = True
    else:
        param.requires_grad = False

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [20]:
validation_data = LiarPlusSingleRobertaDataset(
    "data/normalized/val2.csv",
    tokenizer,
    text_columns,
    num_metadata_cols
)
test_data = LiarPlusSingleRobertaDataset(
    "data/normalized/test2.csv",
    tokenizer,
    text_columns,
    num_metadata_cols
)


val_dataloader = DataLoader(
    validation_data, batch_size=batch_size, shuffle=True
)
test_dataloader = DataLoader(
    test_data, batch_size=batch_size, shuffle=True
)

In [26]:
batch = next(iter(val_dataloader))
for row in batch["input_text"]:
    print(row, "\n")

[STATEMENT] Florida unemployment has dropped more than 2 percentage points, down from 12 percent to 9.9 percent, the second-largest drop in the nation. [JUSTIFICATION] And even with the 2. 1 percentage point decrease, Florida still has the fifth-highest unemployment rate in the country. [ARTICLE] The Florida sunshine has been a beacon for many job seekers in recent months as the state's unemployment rate plummeted. With a record-breaking drop of more than 2 percentage points, the state has seen its unemployment rate fall from a concerning 12 percent to a remarkable 9.9 percent, the second-largest decrease nationwide. This dramatic improvement showcases the success of Florida's ongoing economic recovery efforts and the strong performance of many businesses in the state. The recent success of Florida's workforce is a testament to the commitment of both businesses and residents alike. The state's unemployment rate has now stabilized at a healthier level, signifying a substantial [SUBJECT]