In [11]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.metrics import precision_score, recall_score, f1_score
from torch import nn, optim
import torch.nn.functional as F
from tqdm import tqdm

# Load the labelled review dataset. The path is the Colab upload location;
# change `file_path` when running elsewhere.
file_path = '/content/balanced_dataset.csv'
df = pd.read_csv(file_path)

# Fail fast with a readable message if the columns that preprocess_data reads
# ('Review' for the text, 'Emotion' for the label) are not present, instead of
# surfacing a bare KeyError deep inside tokenization later on.
_missing_columns = {"Review", "Emotion"} - set(df.columns)
if _missing_columns:
    raise KeyError(f"{file_path} is missing required column(s): {sorted(_missing_columns)}")

def preprocess_data(df, tokenizer, max_length=128):
    """Tokenize the reviews and encode the emotion labels as tensors.

    Args:
        df: DataFrame with a 'Review' column (text) and an 'Emotion' column
            whose values are one of "Happy", "Love", "Sadness", "Anger", "Fear".
            Side effect: an integer 'label' column is written onto ``df``.
        tokenizer: Callable invoked as
            ``tokenizer(list_of_str, padding=True, truncation=True,
            max_length=..., return_tensors="pt")`` and returning a mapping with
            'input_ids' and 'attention_mask'.
        max_length: Truncation length forwarded to the tokenizer.

    Returns:
        Tuple ``(input_ids, attention_mask, labels)`` where ``labels`` is an
        int64 tensor of class indices.

    Raises:
        KeyError: If 'Emotion' contains a value (including a missing value)
            that has no class index.
    """
    emotion_mapping = {"Happy": 0, "Love": 1, "Sadness": 2, "Anger": 3, "Fear": 4}

    # Vectorised lookup; values without a mapping come back as NaN so they can
    # be reported together rather than failing on the first offending row.
    encoded = df['Emotion'].map(emotion_mapping)
    unmapped = df.loc[encoded.isna(), 'Emotion']
    if not unmapped.empty:
        raise KeyError(
            f"Unmapped emotion label(s) {sorted(set(unmapped.astype(str)))}; "
            f"expected one of {list(emotion_mapping)}"
        )
    # Explicit int64 so the labels are valid class-index targets.
    df['label'] = encoded.astype('int64')

    # Tokenizers only accept strings: turn missing reviews into empty text and
    # coerce any stray non-string cell so one bad row cannot abort the run.
    reviews = df['Review'].fillna('').astype(str).tolist()
    tokenized_data = tokenizer(reviews, padding=True, truncation=True, max_length=max_length, return_tensors="pt")

    return tokenized_data['input_ids'], tokenized_data['attention_mask'], torch.tensor(df['label'].to_numpy())

class EmotionModel(nn.Module):
    """Pretrained transformer encoder followed by a small classification head.

    The head is ``Linear(hidden_size, head_dim) -> ReLU -> Dropout ->
    Linear(head_dim, num_labels)`` applied to one pooled vector per sequence.

    Args:
        base_model_name: Identifier passed to ``AutoModel.from_pretrained``.
        use_poly_attention: Stored on the instance; this class does not change
            its computation based on it.
        use_freezing: If true, every encoder parameter gets
            ``requires_grad = False`` so only the head is trainable. Ignored
            when the model name contains "ModernBERT".
        use_swa: Stored on the instance; not acted on inside this class.
        use_mixout: Stored on the instance; not acted on inside this class.
        num_labels: Number of output classes (default 5).
        head_dim: Width of the hidden layer in the head (default 256).
        dropout_prob: Dropout probability inside the head (default 0.3).
    """

    def __init__(self, base_model_name, use_poly_attention=False, use_freezing=False, use_swa=False, use_mixout=False,
                 num_labels=5, head_dim=256, dropout_prob=0.3):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        self.hidden_dim = self.base_model.config.hidden_size

        # Keep every experiment flag on the instance so a run can be inspected
        # later; only `use_freezing` influences behaviour in this class.
        self.use_poly_attention = use_poly_attention
        self.use_freezing = use_freezing
        self.use_swa = use_swa
        self.use_mixout = use_mixout

        # Special handling for ModernBERT
        self.is_modernbert = "ModernBERT" in base_model_name

        if use_freezing and not self.is_modernbert:
            for param in self.base_model.parameters():
                param.requires_grad = False

        self.fc_combined = nn.Linear(self.hidden_dim, head_dim)
        self.fc_output = nn.Linear(head_dim, num_labels)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, input_ids, attention_mask):
        """Return unnormalised class scores for each sequence in the batch.

        Args:
            input_ids: Token id tensor forwarded to the encoder.
            attention_mask: Attention mask tensor forwarded to the encoder.

        Returns:
            Tensor of logits with one row per input sequence and
            ``num_labels`` columns.
        """
        base_output = self.base_model(input_ids=input_ids, attention_mask=attention_mask)

        # ModernBERT always uses the first-token hidden state. Other encoders
        # use `pooler_output` when they provide one; encoders that expose no
        # pooler (attribute missing or None) fall back to the first token too,
        # instead of crashing or feeding None into the head.
        pooled_output = None if self.is_modernbert else getattr(base_output, "pooler_output", None)
        if pooled_output is None:
            pooled_output = base_output.last_hidden_state[:, 0, :]

        x = F.relu(self.fc_combined(pooled_output))
        x = self.dropout(x)
        return self.fc_output(x)

def train_model(model, dataloader, optimizer, criterion, device):
    """Run one training pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        dataloader: Sized iterable of ``(input_ids, attention_mask, labels)``
            batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable applied to ``(outputs, labels)``.
        device: Device each batch tensor is moved to before the forward pass.

    Returns:
        Sum of the per-batch loss values divided by ``len(dataloader)``.
    """
    model.train()
    batch_losses = []
    for batch in tqdm(dataloader):
        # Each batch must hold exactly three tensors; move them all to `device`.
        input_ids, attention_mask, labels = (tensor.to(device) for tensor in batch)

        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(dataloader)

def evaluate_model(model, dataloader, device, criterion=None):
    """Evaluate ``model`` on ``dataloader`` without tracking gradients.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``; it is
            switched to eval mode and left in that mode.
        dataloader: Iterable of ``(input_ids, attention_mask, labels)`` batches.
            It does not need to support ``len()``.
        device: Device each batch tensor is moved to before the forward pass.
        criterion: Optional loss callable applied to ``(outputs, labels)``.
            Defaults to ``nn.CrossEntropyLoss()``; pass the training criterion
            to report the validation loss on the same scale as training.

    Returns:
        Tuple ``(mean_batch_loss, preds, true_labels)`` where ``preds`` holds
        the argmax class of every sample and ``true_labels`` the matching
        targets, both in iteration order.

    Raises:
        ValueError: If ``dataloader`` yields no batches, since a mean loss is
            undefined in that case.
    """
    if criterion is None:
        criterion = nn.CrossEntropyLoss()

    model.eval()
    total_loss = 0
    num_batches = 0
    preds, true_labels = [], []
    with torch.no_grad():
        for input_ids, attention_mask, labels in dataloader:
            input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            num_batches += 1
            preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    if num_batches == 0:
        raise ValueError("evaluate_model received an empty dataloader; cannot compute a mean loss")
    return total_loss / num_batches, preds, true_labels

In [12]:
# Candidate encoders, identified by their Hugging Face model names.
base_models = [
    "bert-base-multilingual-uncased", "roberta-base",
    "xlm-roberta-base", "answerdotai/ModernBERT-base",
]

# Experiment flags in the order they are switched on.
_FLAG_ORDER = ("use_poly_attention", "use_freezing", "use_swa", "use_mixout")

# Cumulative ladder of configurations: entry k has the first k flags set to
# True and the rest False, so index 0 is the plain baseline and the last
# entry enables every flag.
configurations = [
    {flag: position < enabled_count for position, flag in enumerate(_FLAG_ORDER)}
    for enabled_count in range(len(_FLAG_ORDER) + 1)
]

In [16]:
# --- Run selection -----------------------------------------------------------
author_base_model = base_models[0]
author_config_index = 3   # index into `configurations`; None sweeps all of them
run_all_models = False

if author_config_index is not None:
    selected_configurations = [configurations[author_config_index]]
else:
    selected_configurations = configurations

# --- Training settings -------------------------------------------------------
max_epochs = 20
patience = 3                          # epochs without val-loss improvement before stopping
seed = 42                             # fixes the data split and per-run initialisation
checkpoint_path = "best_model.pt"     # best-so-far weights of the current run
results = []

selected_base_models = base_models if run_all_models else [author_base_model]

device = "cuda" if torch.cuda.is_available() else "cpu"

for base_model in selected_base_models:
    print(f"Running for Base Model: {base_model}")

    # Special tokenizer handling for ModernBERT
    tokenizer_args = {"use_fast": False} if "ModernBERT" in base_model else {}
    tokenizer = AutoTokenizer.from_pretrained(base_model, **tokenizer_args)

    input_ids, attention_masks, labels = preprocess_data(df, tokenizer)
    dataset = TensorDataset(input_ids, attention_masks, labels)

    # 70 / 10 / 20 split; the test split absorbs the rounding remainder.
    train_size = int(0.7 * len(dataset))
    val_size = int(0.1 * len(dataset))
    test_size = len(dataset) - train_size - val_size
    if min(train_size, val_size, test_size) == 0:
        raise ValueError(
            f"Dataset of {len(dataset)} rows is too small for a 70/10/20 split "
            f"(train={train_size}, val={val_size}, test={test_size})"
        )

    # A freshly seeded generator yields the same index permutation on every
    # pass, so every base model is trained and scored on identical rows and
    # reruns of this cell are reproducible.
    split_generator = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset, test_dataset = random_split(
        dataset, [train_size, val_size, test_size], generator=split_generator
    )
    train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=16)
    test_dataloader = DataLoader(test_dataset, batch_size=16)

    for config in selected_configurations:
        print(f"Running Configuration: {config}")
        # NOTE(review): this cell only forwards the flags to EmotionModel and
        # copies them into the results table; nothing here implements SWA or
        # mixout itself. Confirm the model acts on them before reading the
        # table as an ablation.

        # Re-seed per run so head initialisation, dropout and batch order do
        # not depend on how many runs came before this one.
        torch.manual_seed(seed)
        model = EmotionModel(base_model_name=base_model, **config).to(device)

        # Only hand trainable parameters to the optimizer; frozen encoder
        # weights would never receive gradients anyway.
        trainable_params = [p for p in model.parameters() if p.requires_grad]
        optimizer = optim.AdamW(trainable_params, lr=2e-5)
        criterion = nn.CrossEntropyLoss()

        best_val_loss = float("inf")
        patience_counter = 0
        checkpoint_saved = False   # guards against loading a previous run's file

        for epoch in range(max_epochs):
            print(f"Epoch {epoch+1}/{max_epochs}")
            train_loss = train_model(model, train_dataloader, optimizer, criterion, device)
            val_loss, _, _ = evaluate_model(model, val_dataloader, device)
            print(f"Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                torch.save(model.state_dict(), checkpoint_path)
                checkpoint_saved = True
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print("Early stopping triggered")
                    break

        if checkpoint_saved:
            model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        else:
            # Happens when the validation loss was never below +inf (e.g. NaN):
            # the file on disk, if any, belongs to an earlier run.
            print("Warning: validation loss never improved; evaluating last-epoch weights "
                  "instead of a stale checkpoint")

        _, test_preds, test_labels = evaluate_model(model, test_dataloader, device)
        # zero_division=0 keeps the usual 0.0 score for a class that is never
        # predicted, without emitting an UndefinedMetricWarning.
        precision = precision_score(test_labels, test_preds, average='weighted', zero_division=0)
        recall = recall_score(test_labels, test_preds, average='weighted', zero_division=0)
        f1 = f1_score(test_labels, test_preds, average='weighted', zero_division=0)

        results.append({
            "Base Model": base_model,
            "Polynomial Attention": config["use_poly_attention"],
            "Freezing": config["use_freezing"],
            "SWA": config["use_swa"],
            "Mixout": config["use_mixout"],
            "Precision": precision,
            "Recall": recall,
            "F1-Score": f1
        })

results_df = pd.DataFrame(results)
print(results_df)