# In [None]:  (Jupyter notebook cell marker left over from export)
import os
os.environ["WANDB_DISABLED"] = "true"  # Disable wandb
import re
import torch
import pandas as pd
import numpy as np

# Dataset Splitting, Label Encoding, and Evaluation Metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from sklearn.preprocessing import LabelEncoder
from collections import Counter

# Includes various pretrained models and utilities from Hugging Face Transformers
from transformers import (
    BertTokenizer, BertForSequenceClassification,
    RobertaTokenizer, RobertaForSequenceClassification,
    DistilBertTokenizer, DistilBertForSequenceClassification,
    XLNetTokenizer, XLNetForSequenceClassification,
    AutoTokenizer, AutoModelForSequenceClassification,
    TrainingArguments, Trainer
)
# Neural network submodules in PyTorch
import torch.nn as nn
import warnings
warnings.filterwarnings('ignore')

# ============== 1. Data Loading and Cleaning ==============
# Load the tab-separated SemEval-2017 Task 4 (subtask CE) file. header=None
# makes pandas treat the first line as data, so column names are given here.
df = pd.read_csv(
    "SemEval2017-task4-dev.subtask-CE.english.INPUT.txt",
    sep='\t',
    header=None,
    names=['id', 'topic', 'label_num', 'tweet_raw'],
    # 3 == csv.QUOTE_NONE: tweets are free text, so a double quote inside a
    # tweet is ordinary content. With the default quote handling an unbalanced
    # '"' opens a quoted field that can swallow the following rows.
    quoting=3,
)

# Mapping between the numeric five-point scale and string labels
label_map = {
    -2: "STRONGLYNEGATIVE",
    -1: "WEAKLYNEGATIVE",
     0: "NEUTRAL",
     1: "WEAKLYPOSITIVE",
     2: "STRONGLYPOSITIVE"
}
df['label'] = df['label_num'].map(label_map)

# Series.map leaves NaN wherever label_num is not a key of label_map. Stop here
# with a clear message instead of letting NaN labels flow into label encoding.
unmapped_rows = df['label'].isna()
if unmapped_rows.any():
    raise ValueError(
        f"{int(unmapped_rows.sum())} row(s) have a label_num outside "
        f"{sorted(label_map)}; check the input file format."
    )

# To ensure fairness and comparability, the data cleaning procedures applied to the BERT family of models
# are kept consistent with those used for the RNN, BiLSTM, and CNN models
def basic_text_cleaning(text):
    """Return a normalised copy of a raw tweet.

    The substitutions run in this order:
      1. remove URLs (runs of non-space characters starting with http or www),
      2. remove @mentions,
      3. drop the '#' sign while keeping the hashtag word,
      4. replace every character other than letters, digits and ( ) , ! ? ' `
         with a space,
      5. collapse whitespace runs to a single space.
    Leading and trailing whitespace is stripped from the result.
    """
    # (pattern, replacement, flags), applied one after another
    substitutions = (
        (r"http\S+|www\S+|https\S+", "", re.MULTILINE),
        (r"@\w+", "", 0),
        (r"#", "", 0),
        (r"[^A-Za-z0-9(),!?\'`]", " ", 0),
        (r"\s+", " ", 0),
    )
    cleaned = text
    for pattern, replacement, flags in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)
    return cleaned.strip()

# Clean every raw tweet; astype(str) makes sure the regexes only ever see strings
df['tweet'] = df['tweet_raw'].astype(str).map(basic_text_cleaning)

# Build one input string per row: "[TOPIC] <topic> [SEP] <cleaned tweet>"
df['input_text'] = [
    f"[TOPIC] {topic} [SEP] {tweet}"
    for topic, tweet in zip(df['topic'], df['tweet'])
]

# ============== 2. Augmentation Using a Sentiment Lexicon ==============
senti_lexicon = {
    "love": 2, "like": 1, "good": 1, "hate": -2, "bad": -1, "horrible": -2
}


def lexicon_score(sentence):
    """Return the summed lexicon polarity of the words in *sentence*.

    The sentence is lower-cased and split on whitespace. Each token has
    surrounding punctuation stripped and is then looked up in
    ``senti_lexicon``; tokens not in the lexicon contribute 0.

    Args:
        sentence: Text to score (a cleaned tweet).

    Returns:
        Integer sum of the matched word scores; 0 when nothing matches.
    """
    total = 0
    for token in sentence.lower().split():
        # basic_text_cleaning keeps ( ) , ! ? ' ` in the text, so words often
        # arrive as "love!" or "(good)". Strip those characters from both ends
        # so that such tokens still match their lexicon entry.
        total += senti_lexicon.get(token.strip("(),!?'`"), 0)
    return total

# Lexicon polarity of each cleaned tweet, kept as an extra column
df['lexicon_score'] = df['tweet'].map(lexicon_score)

# ============== 3. Data splitting and handling of class imbalance ==============
# Encode the string labels as integer class ids
le = LabelEncoder()
df['label_id'] = le.fit_transform(df['label'])

# 80/20 split, stratified on the class id so both parts have similar label
# distributions; the fixed seed makes the split reproducible
train_df, test_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df['label_id'],
    random_state=42,
)

# Balanced class weights for the weighted cross-entropy loss:
#   weight(c) = n_samples / (n_classes * n_samples_in_class_c)
# so classes that are rare in the training split get larger weights.
# NOTE(review): indexing by range(num_classes) assumes the class ids present in
# the training split are exactly 0..num_classes-1 - confirm if the split changes.
train_labels_array = train_df['label_id'].to_numpy()
class_counts = Counter(train_labels_array)
num_samples = train_labels_array.shape[0]
num_classes = len(class_counts)
weights = [
    num_samples / (num_classes * class_counts[class_id])
    for class_id in range(num_classes)
]
class_weights = torch.tensor(weights, dtype=torch.float)

# ============== 4. Construct the Dataset ==============
# Use the tokenizer to tokenize the input text, convert it to token IDs,
# truncate or pad to max_len, and generate the corresponding attention_mask
class BERTDataset(torch.utils.data.Dataset):
    """Dataset that tokenizes one text per item, at access time.

    Args:
        texts: Indexable collection of input strings.
        labels: Indexable collection of integer class ids aligned with
            ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, ...)`` that returns a
            mapping with ``'input_ids'`` and ``'attention_mask'`` tensors. It
            may be ``None`` at construction and assigned later through the
            ``tokenizer`` attribute, but must be set before items are read.
        max_len: Length every sequence is padded or truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len  # Maximum length per text

    def __len__(self):
        """Return the number of texts."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize item *idx* and return its tensors.

        Returns:
            Dict with ``'input_ids'``, ``'attention_mask'`` and ``'labels'``
            (a ``torch.long`` scalar tensor).

        Raises:
            TypeError: If no tokenizer has been assigned yet.
        """
        if self.tokenizer is None:
            raise TypeError(
                "BERTDataset.tokenizer is None; assign a tokenizer before "
                "reading items from the dataset"
            )

        encoding = self.tokenizer(
            self.texts[idx],
            add_special_tokens=True,   # Let the tokenizer add its special tokens
            max_length=self.max_len,   # Set the maximum sequence length
            padding='max_length',      # Pad sequences shorter than max_len
            truncation=True,           # Truncate sequences that exceed max_len
            return_tensors='pt'
        )

        # The tensors come back with a leading batch dimension of size 1.
        # Remove only that dimension: a bare squeeze() would also collapse the
        # sequence dimension when max_len == 1.
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long),
        }

# Plain Python lists of input strings and integer class ids for each split,
# in the form the custom Dataset indexes into
train_texts, train_labels = train_df['input_text'].to_list(), train_df['label_id'].to_list()
test_texts, test_labels = test_df['input_text'].to_list(), test_df['label_id'].to_list()

# ============== 5. Custom Trainer with Weighted Cross-Entropy Loss ==============
class CustomTrainer(Trainer):
    """Trainer whose loss is cross-entropy weighted by the module-level ``class_weights``."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the class-weighted cross-entropy loss for one batch.

        Args:
            model: Model to run; its output must expose ``.logits``.
            inputs: Mapping of batch tensors. ``"labels"`` is read from it and
                withheld from the forward call, so the loss is computed here
                and not by the model.
            return_outputs: When true, also return the model outputs.
            **kwargs: Accepted and ignored, so that extra arguments the Trainer
                may pass (e.g. ``num_items_in_batch``) do not raise.

        Returns:
            ``loss``, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        targets = inputs.get("labels")

        # Forward pass on everything except the labels
        features = {}
        for key, value in inputs.items():
            if key != "labels":
                features[key] = value
        outputs = model(**features)
        logits = outputs.logits

        # Weighted cross-entropy; the weights are moved to the logits' device
        loss = nn.functional.cross_entropy(
            logits,
            targets,
            weight=class_weights.to(logits.device),
        )

        if return_outputs:
            return loss, outputs
        return loss

# ============== 6. Training Configuration (TrainingArguments) ==============
# One TrainingArguments instance, passed unchanged to the Trainer of every model.
# NOTE(review): all models therefore write their checkpoints under the same
# output_dir - confirm that overwriting between models is acceptable.
training_args = TrainingArguments(
    # Output locations
    output_dir='./checkpoints',
    logging_dir='./logs',
    # Schedule: 3 epochs, batch size 8 per device for training and evaluation
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    # Evaluate and save once per epoch; log every 50 steps
    do_eval=True,
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_steps=50,
    # No external experiment trackers (turns off wandb logging)
    report_to="none",
)

# ============== 7. Train each model sequentially and output its test performance separately ==============
# Both the model and its tokenizer are kept for later use in ensemble evaluation
model_names = [
    "bert-base-uncased",
    "roberta-base",
    "distilbert-base-uncased",
    "xlnet-base-cased",
    # NOTE(review): this is the ELECTRA generator checkpoint; the discriminator
    # is the one usually fine-tuned for classification - confirm this is intended.
    "google/electra-base-generator",
    # BERTweet: a model pretrained on tweets, the closest match to this data
    "vinai/bertweet-base"
]

def compute_metrics(eval_pred):
    """Return the accuracy of arg-max predictions for an evaluation pass.

    Args:
        eval_pred: Two-item iterable ``(logits, labels)``. ``logits`` is an
            array of per-class scores with classes on axis 1, or a tuple whose
            first element is that array; ``labels`` holds the true class ids.

    Returns:
        ``{"accuracy": float}`` - the fraction of rows whose highest-scoring
        class equals the label.

    Raises:
        ValueError: If the number of predictions and labels differ.
    """
    logits, labels = eval_pred
    # When the model emits several output arrays the logits arrive as a tuple;
    # the class scores are its first element.
    if isinstance(logits, tuple):
        logits = logits[0]

    preds = np.argmax(logits, axis=1)
    labels = np.asarray(labels).reshape(-1)
    if preds.shape != labels.shape:
        raise ValueError(
            f"Got {preds.shape[0]} predictions for {labels.shape[0]} labels"
        )
    return {"accuracy": float(np.mean(preds == labels))}

# The datasets are built once without a tokenizer; every loop iteration plugs
# in the tokenizer that belongs to the model being fine-tuned.
train_dataset = BERTDataset(train_texts, train_labels, None, max_len=128)
test_dataset  = BERTDataset(test_texts,  test_labels,  None, max_len=128)

all_models = []
all_tokenizers = []

# Size every classification head from the fitted label encoder rather than a
# hard-coded 5, so it always agrees with the encoded label ids.
num_labels = len(le.classes_)

# (substring of the checkpoint name, tokenizer class, model class, extra
# tokenizer kwargs). Entries are tried in order and the first match wins.
loader_table = [
    ("roberta", RobertaTokenizer, RobertaForSequenceClassification, {}),
    ("distilbert", DistilBertTokenizer, DistilBertForSequenceClassification, {}),
    ("xlnet", XLNetTokenizer, XLNetForSequenceClassification, {}),
    ("electra", AutoTokenizer, AutoModelForSequenceClassification, {}),
    # BERTweet is loaded through the Auto classes with the slow tokenizer
    ("bertweet", AutoTokenizer, AutoModelForSequenceClassification, {"use_fast": False}),
]
# Anything that matches no entry is treated as a plain BERT checkpoint
default_loader = (BertTokenizer, BertForSequenceClassification, {})

for model_name in model_names:
    print(f"\n===== Fine-tuning and evaluating model: {model_name} =====")

    # 1) Load tokenizer & model
    lowered_name = model_name.lower()
    tokenizer_cls, model_cls, tokenizer_kwargs = next(
        (entry[1:] for entry in loader_table if entry[0] in lowered_name),
        default_loader,
    )
    tokenizer = tokenizer_cls.from_pretrained(model_name, **tokenizer_kwargs)
    model = model_cls.from_pretrained(model_name, num_labels=num_labels)

    # 2) Update the tokenizer used in the dataset
    train_dataset.tokenizer = tokenizer
    test_dataset.tokenizer  = tokenizer

    # 3) Define the Trainer
    # NOTE(review): the test split doubles as the per-epoch evaluation set.
    trainer = CustomTrainer(
        model=model,                 # Loaded pretrained model
        args=training_args,          # Training parameters shared by all models
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        compute_metrics=compute_metrics
    )

    # 4) Training
    trainer.train()

    # 5) Test set prediction with a single model
    pred_output = trainer.predict(test_dataset)
    predictions = pred_output.predictions
    preds = np.argmax(predictions, axis=1)
    test_labels_true = test_df['label_id'].tolist()

    # 6) Display evaluation for the single model
    # labels= pins the report rows to every class id, so target_names always
    # lines up even if a class is absent from both the labels and predictions.
    print(f"=== Test Performance for {model_name} ===")
    print(classification_report(
        test_labels_true,
        preds,
        labels=list(range(num_labels)),
        target_names=le.classes_,
    ))
    acc = accuracy_score(test_labels_true, preds)
    print("Accuracy:", acc)

    # 7) Save the model and tokenizer to a list for later use in ensemble
    # NOTE(review): every fine-tuned model stays referenced here, so all of
    # them are resident in memory at once - confirm the memory budget.
    all_models.append(model)
    all_tokenizers.append(tokenizer)