In [4]:
import pandas as pd 

category = 'romance'
source_folder = '//Users/rt853/UoB-HICCS-2025/data/datasets/brown_processed/romance/'

train_df = pd.read_csv(f'{source_folder}/{category}_train.csv').drop_duplicates(subset='text').dropna()
test_df = pd.read_csv(f'{source_folder}/{category}_test.csv').dropna()
augmented_df = pd.read_csv(f'{source_folder}/{category}_train_w_augmented.csv').dropna()

train_df.head()

Unnamed: 0,text,category,document_id,line_id,binary
0,His reference to ' discredited carcass ' or ' ...,news,97a21f3d771c77ef75b1221493bbc33c,33,0
1,"`` Come on '' , he said",mystery,2ada13adf9674d350df53db1ef9eb430,20,0
2,List the number of hours the family can be exp...,lore,a3628f6bf62405f5080098751ed23736,59,0
3,Reavey's play there is both protest and aspira...,reviews,938095c369884a31fe73510dd9310783,84,0
4,The deputy had forced him to by his manner of ...,adventure,78dec9e8530235bff16166fbd2f70900,81,0


In [15]:
from sklearn.model_selection import train_test_split

use_aug = False

if use_aug == True:
  train_df = pd.concat([train_df, augmented_df])
  train_df = train_df.reset_index(drop=True)

all_labels = train_df.binary.tolist()
val_df, test_df = train_test_split(test_df, test_size=0.5, random_state=42)
print('Validation: \n\n', val_df.binary.value_counts(), 
      '\n\n---\n\nTest: \n\n', test_df.binary.value_counts())

Validation: 

 binary
0    74
1    11
Name: count, dtype: int64 

---

Test: 

 binary
0    81
1     4
Name: count, dtype: int64


In [16]:
from torch.utils.data import Dataset
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer
from torch.nn import Embedding
import numpy as np

class PersuasionStrategyDataset(Dataset):
    def __init__(
            self, data, tokenizer):
        self.data = data.reset_index(drop=True)
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer)
        self.embedding = Embedding(2,1)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
      row = self.data.iloc[idx]
      text = row.text
      label = torch.tensor(row.binary)
      return {
          'label': F.one_hot(label, num_classes=len(self.data.binary.unique())),
          'inputs' : self.tokenizer.encode_plus(
              text,
              max_length=64,
              padding='max_length',
              return_tensors='pt',
              truncation=True,
              return_attention_mask=True
          )
      }


tokenizer = 'roberta-base'
train_dataset = PersuasionStrategyDataset(train_df, tokenizer)
val_dataset = PersuasionStrategyDataset(val_df, tokenizer)
test_dataset = PersuasionStrategyDataset(test_df, tokenizer)

In [17]:
from collections import Counter
from torch.utils.data import WeightedRandomSampler

def gen_weights(labels):

  # Count how many instances per class
  class_counts = Counter(labels)
  num_classes = len(class_counts)

  # Compute class weights: inverse frequency
  class_weights = {cls: 1.0 / count for cls, count in class_counts.items()}

  # Create a weight for each sample
  sample_weights = [class_weights[label] for label in labels]
  sample_weights = torch.DoubleTensor(sample_weights)
  return sample_weights

sample_weights = gen_weights(all_labels)
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),  # Or a custom number
    replacement=True
    )

In [19]:
from torch.utils.data import DataLoader

train_dataloader = DataLoader(
    train_dataset,
    batch_size = 32,
    sampler=sampler
)

val_dataloader = DataLoader(
    val_dataset,
    batch_size=1
)

test_dataloader = DataLoader(
    test_dataset,
    batch_size=1
)

In [20]:
from torch import nn
from sklearn.model_selection import train_test_split
from torch.optim import lr_scheduler
from tqdm.auto import tqdm
from transformers import get_scheduler, RobertaModel

class RobertaClassifier(nn.Module):
    def __init__(self, model_name_or_path: str, freeze_roberta: bool = False):
        super(RobertaClassifier, self).__init__()

        self.roberta = RobertaModel.from_pretrained(model_name_or_path)

        if freeze_roberta:
            for param in self.roberta.parameters():
                param.requires_grad = False

        self.dropout = nn.Dropout(p=0.3)
        self.fc = nn.Linear(self.roberta.config.hidden_size, 1)  # 1 output for binary classification

    def forward(self, input_ids, attention_mask):
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        x = self.dropout(outputs.pooler_output)
        x = self.fc(x)  # Raw logits for BCEWithLogitsLoss
        return x

def init_roberta(tokenizer):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    criterion = nn.BCEWithLogitsLoss()
    model = RobertaClassifier(tokenizer, freeze_roberta=True)
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
    return model, criterion, optimizer, device

In [None]:
from tqdm.auto import tqdm
import copy
from sklearn.metrics import f1_score

def forward_pass(batch, model, device, criterion, optimizer, lr_scheduler):
    batch = {k: v.to(device) for k, v in batch.items()}

    logits = model(
        batch['inputs']['input_ids'].squeeze(1),
        batch['inputs']['attention_mask'].squeeze(1)
    )

    labels =  batch['label'].argmax(dim=1).unsqueeze(1).float()
    loss = criterion(logits, labels)

    loss.backward()
    optimizer.step()
    lr_scheduler.step()
    optimizer.zero_grad()

    return loss

def evaluate(model, val_dataloader, device, criterion):
    model.eval()
    val_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(val_dataloader):
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(
                batch['inputs']['input_ids'].squeeze(1),
                batch['inputs']['attention_mask'].squeeze(1)
            )
            loss = criterion(outputs.softmax(dim=1),  batch['label'].argmax(dim=1).unsqueeze(1).float())
            val_loss += loss.item()

            logits = outputs.softmax(dim=1)
            preds = torch.argmax(logits, dim=-1).cpu().numpy()
            labels = batch['label'].argmax(dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels)

    f1 = f1_score(all_labels, all_preds, average='weighted')
    model.train()
    return val_loss / len(val_dataloader), f1

def train_model(
    model, criterion, optimizer, device, train_dataloader, val_dataloader, num_epochs, checkpoint_path,
    patience=3):
    num_training_steps = num_epochs * len(train_dataloader)

    lr_scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps
    )

    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    model.to(device)

    best_val_loss = float('inf')
    best_model_state = None
    epochs_without_improvement = 0

    model.train()
    with tqdm(range(num_epochs)) as t1:
        epoch_loss = 0
        for epoch in range(1, num_epochs + 1):
            t1.set_description(f"Epoch {epoch}")
            t1.set_postfix(train_loss=epoch_loss)
            if epoch == 3:  # 1-based indexing
                for param in model.roberta.parameters():
                    param.requires_grad = True
                print("Unfroze RoBERTa encoder layers at epoch", epoch)

            with tqdm(range(len(train_dataloader))) as t2:
                batch_loss = 0
                for batch_num, batch in enumerate(train_dataloader, start=1):
                    t2.set_description(f"Batch {batch_num}")
                    loss = forward_pass(batch, model, device, criterion, optimizer, lr_scheduler)
                    batch_loss += loss.item()
                    t2.set_postfix(loss=round(batch_loss / batch_num, 4))
                    t2.update(1)

            epoch_loss = round(batch_loss / len(train_dataloader), 4)
            val_loss, f1 = evaluate(model, val_dataloader, device, criterion)

            print(f"Epoch {epoch}: Train Loss = {epoch_loss}, Val Loss = {round(val_loss, 4)}")
            print(f"Epoch {epoch}: F1 Score = {f1}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                best_model_state = copy.deepcopy(model.state_dict())
                epochs_without_improvement = 0
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': best_model_state,
                    'optimizer_state_dict': optimizer.state_dict(),
                    'val_loss': best_val_loss,
                    'f1_score': f1,
                }, checkpoint_path)
                print(f"Saved new best model checkpoint to '{checkpoint_path}'")
            else:
                print(f"Validation loss did not improve from {best_val_loss}")
            else:
                epochs_without_improvement += 1
                if epochs_without_improvement >= patience:
                    print(f"Early stopping triggered at epoch {epoch}")
                    model.load_state_dict(best_model_state)
                    return model

            t1.update(1)

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model

def save_model(model, tokenizer, output_dir):
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)
    print(f"Model saved to {output_dir}")
    
def load_model(model_class, tokenizer_class, model_dir):
    model = model_class.from_pretrained(model_dir)
    tokenizer = tokenizer_class.from_pretrained(model_dir)
    return model, tokenizer

In [None]:
num_epochs = 10
model, criterion, optimizer, device = init_roberta(tokenizer)
model = train_model(model, criterion, optimizer, device, train_dataloader, val_dataloader, num_epochs, patience=4)

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


  0%|          | 0/10 [00:00<?, ?it/s]

  0%|          | 0/1368 [00:00<?, ?it/s]

  0%|          | 0/2706 [00:00<?, ?it/s]

Epoch 1: Train Loss = 0.6916, Val Loss = 1.2379
Epoch 1: F1 Score = 0.8883944536973649


  0%|          | 0/1368 [00:00<?, ?it/s]

  0%|          | 0/2706 [00:00<?, ?it/s]

Epoch 2: Train Loss = 0.6893, Val Loss = 1.2379
Epoch 2: F1 Score = 0.8883944536973649
Unfroze RoBERTa encoder layers at epoch 3


  0%|          | 0/1368 [00:00<?, ?it/s]

  0%|          | 0/2706 [00:00<?, ?it/s]

Epoch 3: Train Loss = 0.2899, Val Loss = 1.2379
Epoch 3: F1 Score = 0.8883944536973649


  0%|          | 0/1368 [00:00<?, ?it/s]

  0%|          | 0/2706 [00:00<?, ?it/s]

Epoch 4: Train Loss = 0.1446, Val Loss = 1.2379
Epoch 4: F1 Score = 0.8883944536973649


  0%|          | 0/1368 [00:00<?, ?it/s]

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, classification_report, confusion_matrix
import torch

def test_model(model, dataloader, device):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader):
            batch = {k: v.to(device) for k, v in batch.items()}
            logits = model(
                batch['inputs']['input_ids'].squeeze(1),
                batch['inputs']['attention_mask'].squeeze(1)
            )
            probs = torch.sigmoid(logits)
            preds = (probs > 0.5).long()
            labels = batch['label'].long()

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy().argmax(axis=1))
    return all_labels, all_preds

def calculate_metrics(all_labels, all_preds):

    # Compute metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, zero_division=0)
    recall = recall_score(all_labels, all_preds, zero_division=0)
    f1 = f1_score(all_labels, all_preds, zero_division=0)

    try:
        auc = roc_auc_score(all_labels, all_preds)
    except:
        auc = float('nan')  # In case of a single class in test labels

    print("Evaluation Results:")
    print(f"Accuracy:  {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    print(f"ROC AUC:   {auc:.4f}")

    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, digits=4))

    print("Confusion Matrix:")
    print(confusion_matrix(all_labels, all_preds))

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "roc_auc": auc,
        "classification_report": classification_report(all_labels, all_preds, digits=4, output_dict=True),
    }



In [None]:
all_labels, all_preds = test_model(model, test_dataloader, device)
metrics = calculate_metrics(all_labels, all_preds)

Evaluation Results:
Accuracy:  0.9902
Precision: 0.8298
Recall:    0.4643
F1 Score:  0.5954
ROC AUC:   0.7314

Classification Report:
              precision    recall  f1-score   support

           0     0.9916    0.9985    0.9950      5329
           1     0.8298    0.4643    0.5954        84

    accuracy                         0.9902      5413
   macro avg     0.9107    0.7314    0.7952      5413
weighted avg     0.9891    0.9902    0.9888      5413

Confusion Matrix:
[[5321    8]
 [  45   39]]


In [None]:
import csv
import os
import zipfile

def save_evaluation_results(metrics_dict, all_preds, all_labels, output_dir):
    os.makedirs(output_dir, exist_ok=True)

    # Save metrics to a .txt file
    metrics_path = os.path.join(output_dir, "metrics.txt")
    with open(metrics_path, "w") as f:
        for key, value in metrics_dict.items():
          if isinstance(value, dict):
            pd.DataFrame(value).to_csv(f'{output_dir}/{key}.csv')
          else:
            try:
              f.write(f"{key}: {value:.4f}\n")
            except:
              print(key, value)
    print(f"Saved evaluation metrics to {metrics_path}")

    # Save predictions and labels to a CSV
    csv_path = os.path.join(output_dir, "predictions.csv")
    with open(csv_path, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['true_label', 'predicted_label'])
        writer.writerows(zip(all_labels, all_preds))

def zip_folder(folder_path, output_zip_path):
    with zipfile.ZipFile(output_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(folder_path):
            for file in files:
                file_path = os.path.join(root, file)
                # Store the relative path (so the folder structure is preserved in the zip)
                arcname = os.path.relpath(file_path, start=folder_path)
                zipf.write(file_path, arcname)

    print(f"Zipped folder to {output_zip_path}")

In [None]:
output_path = f'{source_folder}/{category}_results'

save_evaluation_results(metrics, all_preds, all_labels, output_path)
zip_folder(output_path, f"{output_path}.zip")

Saved evaluation metrics to /content//science_fiction_results/metrics.txt
Zipped folder to /content//science_fiction_results.zip


In [None]:
from transformers import AutoTokenizer
import torch
import torch.nn.functional as F

def predict_snippets(model, tokenizer, text_list, device=None, return_probs=False):
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    predictions = []

    for text in text_list:
        # Tokenize input
        encoding = tokenizer(
            text,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors="pt"
        ).to(device)

        with torch.no_grad():
            logits = model(
                input_ids=encoding["input_ids"],
                attention_mask=encoding["attention_mask"]
            )

            probs = torch.sigmoid(logits).squeeze().item()
            pred = int(probs > 0.5)

            predictions.append(probs if return_probs else pred)

    return predictions

In [None]:
# Load tokenizer and model
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("roberta-base")

# Your trained model (already loaded)
# model = RobertaClassifier(...)  # and load_state_dict if needed

# Test snippets
texts = [
   "knock knock, who's there? A WOMBLE",
   "The space ship was zipping through the galaxy",
   "She gazed upon his face and was totally taken by the beauty in his jowls",
   "The computer ticked over as the coordinates were inputted",
   "The alien had blue skin, a forked tongue and three pronged hands",
   "The footballers association is due to have a press conference at 2pm",
   "The temperature dropped much below zero, a temperature that only those who live underground could tolerate"
]

# Run predictions
preds = predict_snippets(model, tokenizer, texts, return_probs=True)

for text, prob in zip(texts, preds):
    print(f"Text: {text}\n→ Positive class probability: {prob:.4f}\n")

Text: knock knock, who's there? A WOMBLE
→ Positive class probability: 0.0007

Text: The space ship was zipping through the galaxy
→ Positive class probability: 0.9993

Text: She gazed upon his face and was totally taken by the beauty in his jowls
→ Positive class probability: 0.0030

Text: The computer ticked over as the coordinates were inputted
→ Positive class probability: 0.7205

Text: The alien had blue skin, a forked tongue and three pronged hands
→ Positive class probability: 0.9975

Text: The footballers association is due to have a press conference at 2pm
→ Positive class probability: 0.0001

Text: The temperature dropped much below zero, a temperature that only those who live underground could tolerate
→ Positive class probability: 0.9339

