In [1]:
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch import nn, optim
import torch.nn.functional as F
from tqdm import tqdm

In [2]:
file_path = '/kaggle/input/balance/balanced_dataset.csv'
df = pd.read_csv(file_path)

df["text"] = (
    df["Product Name"] + " [SEP] " +
    df["Rating"].astype(str) + " [SEP] " +
    df["Review"] + " [SEP] " +
    df["Product Category"] + " [SEP] " +
    df["Data Source"] + " [SEP] " +
    df["Sentiment"]
)

columns_to_remove = ['Rating', 'Product Name', 'Product Category', 'Data Source', 'Review', 'Sentiment']
df = df.drop(columns=columns_to_remove)

def preprocess_data(df, tokenizer, max_length=128):
    def encode_emotion(emotion):
        emotion_mapping = {"Happy": 0, "Love": 1, "Sadness": 2, "Anger": 3, "Fear": 4}
        return emotion_mapping[emotion]

    df['label'] = df['Emotion'].apply(encode_emotion)
    tokenized_data = tokenizer(df['text'].tolist(), padding=True, truncation=True, max_length=max_length, return_tensors="pt")

    return tokenized_data['input_ids'], tokenized_data['attention_mask'], torch.tensor(df['label'].values)

In [4]:
# Define Multi-Head Polynomial Attention
class MultiHeadPolynomialAttention(nn.Module):
    def __init__(self, poly_degree=2, num_heads=8, hidden_dim=768):
        super(MultiHeadPolynomialAttention, self).__init__()
        self.num_heads = num_heads
        self.poly_degree = poly_degree
        self.hidden_dim = hidden_dim
        self.head_dim = hidden_dim // num_heads

        assert hidden_dim % num_heads == 0, "Hidden dim must be divisible by num_heads"

        self.query = nn.Linear(hidden_dim, hidden_dim)
        self.key = nn.Linear(hidden_dim, hidden_dim)
        self.value = nn.Linear(hidden_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        query = self.query(query).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        key = self.key(key).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        value = self.value(value).view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        attention_scores = torch.matmul(query, key.transpose(-1, -2)) / (self.head_dim ** 0.5)
        attention_scores = attention_scores ** self.poly_degree

        if mask is not None:
            mask = mask.unsqueeze(1).unsqueeze(2)
            attention_scores = attention_scores.masked_fill(mask == 0, float('-inf'))

        attention_weights = torch.softmax(attention_scores, dim=-1)
        attention_output = torch.matmul(attention_weights, value)
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.hidden_dim)
        return self.output(attention_output)

In [5]:
# Add Mixout class
class Mixout(nn.Module):
    def __init__(self, target, p=0.9):
        super().__init__()
        if not 0 <= p <= 1:
            raise ValueError(f"Mixout probability must be in [0, 1], got {p}")
        self.p = p
        self.target = target
        self.keep_mask = None

    def forward(self, input):
        if self.p == 0 or not self.training:
            return input
        if self.p == 1:
            return self.target
        if self.keep_mask is None or self.keep_mask.size() != input.size():
            self.keep_mask = torch.bernoulli((1 - self.p) * torch.ones_like(input)).to(input.device)
        mask = self.keep_mask
        return (mask * input + (1 - mask) * self.target) / (1 - self.p)

In [6]:
class EmotionModel(nn.Module):
    def __init__(self, base_model_name, use_poly_attention=False, use_swa=False, use_mixout=False, use_lora=False, use_bitfit=False, use_reft=False):
        super().__init__()
        self.base_model = AutoModel.from_pretrained(base_model_name)
        self.hidden_dim = self.base_model.config.hidden_size
        self.use_lora = use_lora
        self.use_bitfit = use_bitfit
        self.use_reft = use_reft
        self.use_mixout = use_mixout
        self.use_poly_attention = use_poly_attention

        self.is_modernbert = "answerdotai/ModernBERT-base" in base_model_name

        if use_poly_attention:
            self.poly_attention = MultiHeadPolynomialAttention(poly_degree=2, num_heads=8,
                                                               hidden_dim=self.hidden_dim)

        if self.use_mixout:
            self.apply_mixout(self.base_model)

        if self.use_lora:
            self.lora_layer = nn.Linear(self.hidden_dim, self.hidden_dim)

        if self.use_bitfit:
            self.bitfit_bias = nn.Parameter(torch.zeros(self.hidden_dim))

        if self.use_reft:
            self.reft_layer = nn.Linear(self.hidden_dim, self.hidden_dim)

        self.fc_combined = nn.Linear(self.hidden_dim, 256)
        self.fc_output = nn.Linear(256, 5)

    def apply_mixout(self, module, p=0.1):
        for name, child in module.named_children():
            if isinstance(child, nn.Linear):
                target_state_dict = child.state_dict()
                mixout_layer = nn.Linear(child.in_features, child.out_features)
                mixout_layer.weight = nn.Parameter(
                    Mixout(target_state_dict['weight'], p=p)(child.weight)
                )
                if child.bias is not None:
                    mixout_layer.bias = child.bias
                setattr(module, name, mixout_layer)
            else:
                self.apply_mixout(child, p=p)

    def forward(self, input_ids, attention_mask):
        base_output = self.base_model(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = base_output.last_hidden_state[:, 0, :] if self.is_modernbert else base_output.pooler_output
        last_hidden_state = base_output.last_hidden_state

        if self.use_poly_attention:
            attention_output = self.poly_attention(last_hidden_state, last_hidden_state, last_hidden_state,
                                                   mask=attention_mask)
        if self.use_lora:
            pooled_output = self.lora_layer(pooled_output)

        if self.use_bitfit:
            pooled_output = pooled_output + self.bitfit_bias

        if self.use_reft:
            pooled_output = self.reft_layer(pooled_output)

        x = F.relu(self.fc_combined(pooled_output))
        return self.fc_output(x)


In [7]:
def train_model(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    for input_ids, attention_masks, labels in dataloader:
        input_ids, attention_masks, labels = input_ids.to(device), attention_masks.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask=attention_masks)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

def evaluate_model(model, dataloader, device):
    model.eval()
    total_loss = 0
    all_preds, all_labels = [], []
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for input_ids, attention_masks, labels in dataloader:
            input_ids, attention_masks, labels = input_ids.to(device), attention_masks.to(device), labels.to(device)
            outputs = model(input_ids, attention_mask=attention_masks)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())
    return total_loss / len(dataloader), all_preds, all_labels


In [9]:
# Model selection
base_models = [
    "bert-base-multilingual-uncased",
    "roberta-base",
    "xlm-roberta-base",
    "answerdotai/ModernBERT-base"
]

# Full configurations with additional techniques
configurations = [
    {"use_poly_attention": False, "use_mixout": False, "use_lora": False, "use_bitfit": False, "use_reft": False},
    {"use_poly_attention": True, "use_mixout": False, "use_lora": True, "use_bitfit": False, "use_reft": True},
    {"use_poly_attention": True, "use_mixout": True, "use_lora": True, "use_bitfit": False, "use_reft": False},
    {"use_poly_attention": False, "use_mixout": True, "use_lora": True, "use_bitfit": True, "use_reft": True},
    {"use_poly_attention": True, "use_mixout": True, "use_lora": True, "use_bitfit": True, "use_reft": True}
]

In [11]:
pip install --upgrade transformers


Collecting transformers
  Downloading transformers-4.49.0-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Downloading transformers-4.49.0-py3-none-any.whl (10.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.0/10.0 MB[0m [31m89.5 MB/s[0m eta [36m0:00:00[0m:00:01[0m:01[0m
[?25hInstalling collected packages: transformers
  Attempting uninstall: transformers
    Found existing installation: transformers 4.47.0
    Uninstalling transformers-4.47.0:
      Successfully uninstalled transformers-4.47.0
Successfully installed transformers-4.49.0
Note: you may need to restart the kernel to use updated packages.


In [9]:
# Settings
author_base_model = base_models[1]
author_config_index = 1  # You can change this to select other configs
run_all_models = False  # Set True to run all models

# Select configurations
if author_config_index is not None:
    selected_configurations = [configurations[author_config_index]]
else:
    selected_configurations = configurations

# Select base models
selected_base_models = base_models if run_all_models else [author_base_model]

max_epochs = 20
patience = 3
results = []
device = "cuda" if torch.cuda.is_available() else "cpu"

# Main Loop
for base_model in selected_base_models:
    print(f"Running for Base Model: {base_model}")

    # Special tokenizer handling if needed (e.g., ModernBERT)
    tokenizer_args = {"use_fast": False} if "ModernBERT" in base_model else {}
    tokenizer = AutoTokenizer.from_pretrained(base_model, **tokenizer_args)

    # Preprocess data
    input_ids, attention_masks, labels = preprocess_data(df, tokenizer)
    dataset = TensorDataset(input_ids, attention_masks, labels)

    # Data split
    train_size = int(0.7 * len(dataset))
    val_size = int(0.1 * len(dataset))
    test_size = len(dataset) - train_size - val_size
    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

    train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=16)
    test_dataloader = DataLoader(test_dataset, batch_size=16)

    # Config Loop
    for config in selected_configurations:
        print(f"Running Configuration: {config}")
        model = EmotionModel(base_model_name=base_model, **config).to(device)
        optimizer = optim.AdamW(model.parameters(), lr=2e-5)
        criterion = nn.CrossEntropyLoss()

        best_val_loss = float("inf")
        patience_counter = 0

        # Training Loop
        for epoch in range(max_epochs):
            print(f"Epoch {epoch+1}/{max_epochs}")
            train_loss = train_model(model, train_dataloader, optimizer, criterion, device)
            val_loss, _, _ = evaluate_model(model, val_dataloader, device)
            print(f"Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

            # Early Stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                torch.save(model.state_dict(), "best_model.pt")
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print("Early stopping triggered")
                    break

        # Evaluation
        model.load_state_dict(torch.load("best_model.pt"))
        _, test_preds, test_labels = evaluate_model(model, test_dataloader, device)
        precision = precision_score(test_labels, test_preds, average='weighted')
        recall = recall_score(test_labels, test_preds, average='weighted')
        f1 = f1_score(test_labels, test_preds, average='weighted')
        accuracy = accuracy_score(test_labels, test_preds)


        # Collect results
        results.append({
            "Base Model": base_model,
            "Polynomial Attention": config["use_poly_attention"],
            "Mixout": config["use_mixout"],
            "LoRA": config["use_lora"],
            "BitFit": config["use_bitfit"],
            "ReFT": config["use_reft"],
            "Precision": precision,
            "Recall": recall,
            "F1-Score": f1,
            "Accuracy": accuracy
        })

# Show results
results_df = pd.DataFrame(results)
print(results_df)

Running for Base Model: roberta-base


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Running Configuration: {'use_poly_attention': True, 'use_mixout': False, 'use_lora': True, 'use_bitfit': False, 'use_reft': True}
Epoch 1/20
Train Loss: 0.8858, Validation Loss: 0.7670
Epoch 2/20
Train Loss: 0.6822, Validation Loss: 0.6646
Epoch 3/20
Train Loss: 0.6018, Validation Loss: 0.6143
Epoch 4/20
Train Loss: 0.5245, Validation Loss: 0.5714
Epoch 5/20
Train Loss: 0.4577, Validation Loss: 0.5599
Epoch 6/20
Train Loss: 0.4150, Validation Loss: 0.5298
Epoch 7/20
Train Loss: 0.3789, Validation Loss: 0.5566
Epoch 8/20
Train Loss: 0.3442, Validation Loss: 0.5438
Epoch 9/20
Train Loss: 0.3162, Validation Loss: 0.5674
Early stopping triggered


  model.load_state_dict(torch.load("best_model.pt"))


     Base Model  Polynomial Attention  Mixout  LoRA  BitFit  ReFT  Precision  \
0  roberta-base                  True   False  True   False  True   0.812781   

     Recall  F1-Score  Accuracy  
0  0.808362  0.806469  0.808362  
