In [None]:
# IMPORTS
import json
import torch
from transformers import (
    LongformerTokenizer,
    LongformerForSequenceClassification,
    Trainer,
    TrainingArguments
)
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
    classification_report,
    confusion_matrix
)
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Read the raw table and lower-case every header so the following cells can
# address columns such as 'bias' and 'text' by their lower-case names.
df = pd.read_csv('Political_Bias.csv').rename(columns=str.lower)

label_counts = df['bias'].value_counts()
print('Initial Label counts: \n', label_counts, '\n')
print('Number of samples: ', df.shape[0])

# Last expression of the cell: summary statistics rendered by the notebook.
df.describe()

In [None]:
# Clean the frame in a single chained expression. Every step returns a new
# frame, so nothing is mutated in place on a filtered slice (which is what
# triggers pandas' SettingWithCopyWarning), and the cell can be re-run without
# a KeyError on columns that were already dropped.
df = (
    df
    # Remove irrelevant columns (errors='ignore' keeps the cell re-runnable)
    .drop(columns=['link', 'title', 'source'], errors='ignore')
    # Drop rows where:
    .dropna()                                               # there are missing values
    .loc[lambda d: d['text'] != 'Error fetching article']   # the text is an error message
    .drop_duplicates(subset='text', keep='first')           # the text is duplicated
    .loc[lambda d: d['text'].str.strip() != '']             # the text is empty or only whitespace
    # Word count obtained by splitting on single spaces (same rule as before,
    # just vectorised); kept as a column so it shows up in the preview below.
    .assign(length=lambda d: d['text'].str.split(' ').str.len())
    .loc[lambda d: d['length'].between(15, 2048)]           # the text is too short / too long
)

df.head()

In [None]:
# LABEL ENCODING AND DATAFRAME PREPARATION
# Class ids run along the political spectrum from "left" (0) to "right" (4).
id2label = {0: "left", 1: "lean left", 2: "center", 3: "lean right", 4: "right"}
label2id = {name: idx for idx, name in id2label.items()}

# Look labels up case- and whitespace-insensitively. A plain .map() would turn
# any unexpected spelling into NaN, silently producing a float label column
# that only fails much later, so unknown values are reported immediately.
bias_key = df['bias'].str.strip().str.lower()
unmapped = sorted(set(bias_key.unique()) - set(label2id), key=str)
if unmapped:
    raise ValueError(
        f"Unrecognised bias labels {unmapped}; expected one of {list(label2id)}"
    )

# Convert bias labels to integer class ids and keep only relevant columns.
# The original 'bias' strings are left untouched for the reports below.
df = df.assign(label=bias_key.map(label2id).astype(int))[['text', 'bias', 'label']]

# TRAIN/VAL/TEST SPLIT
# Stage 1: hold out 20% of the rows, stratified on the class id.
train_df, temp_df = train_test_split(
    df, test_size=0.2, random_state=42, stratify=df['label']
)

# Stage 2: cut the held-out part in half -> validation and test, again stratified.
val_df, test_df = train_test_split(
    temp_df, test_size=0.5, random_state=42, stratify=temp_df['label']
)

# Report the size, share of the full frame, and label distribution per split.
print("\nSplit sizes:")
for split_name, split_df in (("Train", train_df), ("Val", val_df), ("Test", test_df)):
    share = len(split_df) / len(df) * 100
    print(f"{split_name}: {len(split_df)} ({share:.1f}%)")
    print(f'{split_name} Label counts: \n', split_df['bias'].value_counts(), '\n')

# CONVERT TO HUGGING FACE DATASET
# Each split keeps only the 'text' and 'label' columns and gets a fresh
# 0..n-1 index before being handed to Dataset.from_pandas.
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(frame[['text', 'label']].reset_index(drop=True))
    for frame in (train_df, val_df, test_df)
)

dataset = DatasetDict(
    {'train': train_dataset, 'validation': val_dataset, 'test': test_dataset}
)

# LOAD MODEL AND TOKENIZER
model_name = "allenai/longformer-base-4096"

tokenizer = LongformerTokenizer.from_pretrained(model_name)

# Sequence-classification head with one output per entry of id2label; the
# name<->id mappings are passed along so they are stored in the model config.
model = LongformerForSequenceClassification.from_pretrained(
    model_name,
    problem_type="single_label_classification",
    num_labels=len(id2label),
    label2id=label2id,
    id2label=id2label,
)

# TOKENIZE DATASET
def tokenize_function(examples):
    """Tokenize a batch of examples for the Longformer classifier.

    Args:
        examples: batch mapping as supplied by ``Dataset.map(batched=True)``;
            only ``examples["text"]`` is read.

    Returns:
        The tokenizer output for the batch, with every sequence truncated or
        padded to exactly 2048 tokens.
    """
    # No return_tensors here: Dataset.map stores whatever is returned in its
    # own columnar format, so building torch tensors per batch is wasted work.
    # Tensor conversion happens when the dataset is read (set_format("torch")).
    return tokenizer(
        examples["text"],
        max_length=2048,
        truncation=True,
        padding="max_length",
    )

# Tokenize every split in batches. The raw 'text' column is removed so each
# split keeps only the tokenizer outputs and the label.
tokenized_dataset = dataset.map(tokenize_function, batched=True, remove_columns=['text'])

# Have the splits hand back torch tensors when indexed.
tokenized_dataset.set_format("torch")

print("\nTokenized dataset:", tokenized_dataset, sep="\n")

In [None]:
# COMPUTE CLASS WEIGHTS (for imbalance)
# 'balanced' weights are derived from the training labels only and stored as
# a float tensor so they can be passed to the loss.
class_weights = torch.tensor(
    compute_class_weight(
        'balanced',
        classes=np.arange(5),
        y=train_df['label'].values,
    ),
    dtype=torch.float,
)

print("\nClass weights:")
for idx, label in enumerate(id2label.values()):
    print(f"  {label}: {class_weights[idx]:.3f}")

class WeightedLossTrainer(Trainer):
    """Trainer whose loss is cross-entropy with optional per-class weights.

    Args:
        class_weights: optional tensor used as the ``weight`` of the
            cross-entropy loss. It is moved to ``self.args.device`` once at
            construction. ``None`` selects the unweighted loss.
        *args, **kwargs: forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        if class_weights is None:
            self.class_weights = None
        else:
            self.class_weights = class_weights.to(self.args.device)

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the (optionally class-weighted) cross-entropy for one batch.

        ``inputs`` is modified: its "labels" entry is popped before the
        remaining items are passed to ``model``. ``num_items_in_batch`` is
        accepted but not used.

        Returns:
            ``(loss, outputs)`` when ``return_outputs`` is true, else ``loss``.
        """
        targets = inputs.pop("labels")
        outputs = model(**inputs)

        # weight=None is the plain, unweighted cross-entropy.
        loss = torch.nn.functional.cross_entropy(
            outputs.logits, targets, weight=self.class_weights
        )

        if return_outputs:
            return loss, outputs
        return loss

In [None]:
def compute_metrics(eval_pred):
    """Metrics computed during training.

    Args:
        eval_pred: pair ``(predictions, labels)``. ``predictions`` holds the
            per-class scores with classes along axis 1; ``labels`` holds the
            true class ids.

    Returns:
        dict with ``accuracy``, ``f1_macro``, ``f1_weighted``,
        ``precision_macro``, ``recall_macro`` and one ``f1_<class name>``
        entry per class in the module-level ``id2label`` mapping.
    """
    scores, labels = eval_pred
    predictions = np.argmax(scores, axis=1)

    # Class ids in ascending order; names are looked up from id2label so the
    # per-class entries can never drift from the label encoding.
    class_ids = sorted(id2label)

    metrics = {
        'accuracy': accuracy_score(labels, predictions),
        'f1_macro': f1_score(labels, predictions, average='macro', zero_division=0),  # PRIMARY METRIC
        'f1_weighted': f1_score(labels, predictions, average='weighted', zero_division=0),
        'precision_macro': precision_score(labels, predictions, average='macro', zero_division=0),
        'recall_macro': recall_score(labels, predictions, average='macro', zero_division=0),
    }

    # Per-class F1. Passing labels= pins the result to one value per class id,
    # in class_ids order, even when a class is missing from this evaluation
    # set (otherwise the array is shorter and positions shift).
    f1_per_class = f1_score(
        labels, predictions, labels=class_ids, average=None, zero_division=0
    )
    for class_id, class_f1 in zip(class_ids, f1_per_class):
        metrics[f'f1_{id2label[class_id]}'] = float(class_f1)

    return metrics

def plot_confusion_matrix(true_labels, predictions, label_names, save_path='confusion_matrix.png'):
    """Plot normalized confusion matrix, save it to disk and show it.

    Each row (true label) is divided by its total, so cells show the share of
    that true class assigned to each predicted class.

    Args:
        true_labels: true class labels.
        predictions: predicted class labels.
        label_names: tick labels for both axes, one per class.
        save_path: file the figure is written to.
    """
    observed = np.concatenate(
        [np.asarray(true_labels).ravel(), np.asarray(predictions).ravel()]
    )

    # When the labels are integer ids that index into label_names, pin the
    # matrix to one row/column per name. Otherwise a class that is absent from
    # the data shrinks the matrix and the tick labels land on the wrong cells.
    # For any other kind of label, keep scikit-learn's own label inference.
    ids_index_names = (
        observed.size > 0
        and np.issubdtype(observed.dtype, np.integer)
        and observed.min() >= 0
        and observed.max() < len(label_names)
    )
    matrix_labels = np.arange(len(label_names)) if ids_index_names else None
    cm = confusion_matrix(true_labels, predictions, labels=matrix_labels)

    # Rows without any true sample stay at 0 instead of becoming NaN (0/0).
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_normalized = np.divide(
        cm,
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals > 0,
    )

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        cm_normalized,
        annot=True,
        fmt='.2%',
        cmap='Blues',
        xticklabels=label_names,
        yticklabels=label_names,
        cbar_kws={'label': 'Percentage'}
    )
    plt.title('Confusion Matrix (Normalized by True Label)', fontsize=14, fontweight='bold')
    plt.ylabel('True Label', fontsize=12)
    plt.xlabel('Predicted Label', fontsize=12)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    print(f"Confusion matrix saved to {save_path}")
    plt.show()

def analyze_directional_errors(true_labels, predictions, label2id):
    """Print how far misclassifications land from the truth on the spectrum.

    Classes are placed on a left-to-right axis (left, lean left, center,
    lean right, right). For every wrong prediction the absolute distance
    between true and predicted position is collected, then summarised.

    Args:
        true_labels: iterable of true class ids.
        predictions: iterable of predicted class ids.
        label2id: mapping from class name to class id; must contain the five
            spectrum names.

    Returns:
        None. The summary is printed.
    """
    ordered_names = ['left', 'lean left', 'center', 'lean right', 'right']
    position_of = {label2id[name]: rank for rank, name in enumerate(ordered_names)}

    # One distance per misclassified sample; correct predictions are skipped.
    distances = [
        abs(position_of[actual] - position_of[guess])
        for actual, guess in zip(true_labels, predictions)
        if actual != guess
    ]

    if not distances:
        print("\nPerfect predictions - no errors!")
        return

    n_errors = len(distances)
    mean_distance = np.mean(distances)
    n_far = sum(1 for d in distances if d >= 3)       # three or more steps away
    n_adjacent = sum(1 for d in distances if d == 1)  # neighbouring class

    print("\nDIRECTIONAL ERROR ANALYSIS:")
    print(f"  Average error distance: {mean_distance:.2f}")
    print("  (1.0 = adjacent class, 4.0 = opposite end)")
    print(f"  Opposite-end errors: {n_far}/{n_errors} ({n_far/n_errors*100:.1f}%)")
    print(f"  Adjacent errors: {n_adjacent}/{n_errors} ({n_adjacent/n_errors*100:.1f}%)")

def full_evaluation(trainer, test_dataset, label_names, label2id, save_prefix=""):
    """Complete evaluation after training.

    Runs ``trainer.predict`` on ``test_dataset`` and prints overall metrics,
    per-class F1, a classification report, the confusion-matrix plot, the
    directional-error analysis and a count of high-confidence mistakes.

    Args:
        trainer: object whose ``predict(test_dataset)`` result exposes
            ``predictions`` (per-class scores, classes on the last axis) and
            ``label_ids`` (true class ids).
        test_dataset: dataset handed to ``trainer.predict``.
        label_names: class display names, in the order of the class ids.
        label2id: mapping from class name to class id.
        save_prefix: prefix for the confusion-matrix image file name.

    Returns:
        dict with the overall metrics, ``f1_per_class`` (name -> F1),
        ``total_errors`` and ``high_conf_errors``. All values are plain
        Python numbers so the dict can be written with ``json.dump``.
    """

    print("\n" + "="*70)
    print("FINAL MODEL EVALUATION")
    print("="*70)

    # Get predictions
    predictions_output = trainer.predict(test_dataset)
    predictions = np.argmax(predictions_output.predictions, axis=1)
    true_labels = predictions_output.label_ids

    # Ids that go with label_names. Pinning per-class metrics to these ids
    # keeps every score lined up with its name even if a class is missing
    # from the test data. If a name is unknown to label2id, fall back to
    # scikit-learn's own label inference.
    if all(name in label2id for name in label_names):
        class_ids = [label2id[name] for name in label_names]
    else:
        class_ids = None

    # Overall metrics
    print("\nOVERALL METRICS:")
    accuracy = float(accuracy_score(true_labels, predictions))
    f1_macro = float(f1_score(true_labels, predictions, average='macro', zero_division=0))
    f1_weighted = float(f1_score(true_labels, predictions, average='weighted', zero_division=0))
    precision_macro = float(precision_score(true_labels, predictions, average='macro', zero_division=0))
    recall_macro = float(recall_score(true_labels, predictions, average='macro', zero_division=0))

    print(f"  Accuracy:          {accuracy:.4f}")
    print(f"  F1-Macro:          {f1_macro:.4f} -> PRIMARY METRIC")
    print(f"  F1-Weighted:       {f1_weighted:.4f}")
    print(f"  Precision-Macro:   {precision_macro:.4f}")
    print(f"  Recall-Macro:      {recall_macro:.4f}")

    # Per-class F1
    f1_per_class = f1_score(
        true_labels, predictions, labels=class_ids, average=None, zero_division=0
    )
    print(f"\nPER-CLASS F1 SCORES:")
    for name, class_f1 in zip(label_names, f1_per_class):
        print(f"  {name:12}: {class_f1:.4f}")

    # Classification report
    print("\n" + "="*70)
    print("DETAILED CLASSIFICATION REPORT:")
    print("="*70)
    print(classification_report(
        true_labels,
        predictions,
        labels=class_ids,
        target_names=label_names,
        digits=4,
        zero_division=0
    ))

    # Confusion matrix
    print("\n" + "="*70)
    print("CONFUSION MATRIX:")
    print("="*70)
    plot_confusion_matrix(
        true_labels,
        predictions,
        label_names,
        save_path=f'{save_prefix}confusion_matrix.png'
    )

    # Directional errors
    analyze_directional_errors(true_labels, predictions, label2id)

    # Find high-confidence mistakes: wrong predictions whose top softmax
    # probability exceeds 0.8.
    probs = torch.nn.functional.softmax(torch.tensor(predictions_output.predictions), dim=-1)
    confidence = probs.max(dim=1).values.numpy()

    mistakes = predictions != true_labels
    high_conf_mistakes = mistakes & (confidence > 0.8)

    print(f"\nHIGH-CONFIDENCE MISTAKES:")
    print(f"  Total mistakes: {mistakes.sum()}")
    print(f"  High-confidence mistakes (>80%): {high_conf_mistakes.sum()}")
    if high_conf_mistakes.sum() > 0:
        print(f"  (These are worth manually inspecting)")

    # Return summary
    return {
        'accuracy': accuracy,
        'f1_macro': f1_macro,
        'f1_weighted': f1_weighted,
        'precision_macro': precision_macro,
        'recall_macro': recall_macro,
        'f1_per_class': {
            name: float(class_f1) for name, class_f1 in zip(label_names, f1_per_class)
        },
        'total_errors': int(mistakes.sum()),
        'high_conf_errors': int(high_conf_mistakes.sum())
    }


In [None]:
# Pick the best available accelerator: CUDA first, then Apple's MPS backend,
# and plain CPU otherwise. Falling back to 'mps' unconditionally makes
# model.to(device) fail on machines that have neither accelerator.
if torch.cuda.is_available():
    device = torch.device('cuda')
elif getattr(torch.backends, 'mps', None) is not None and torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')
print(f"\nUsing device: {device}")

# Move model to device
model.to(device)

In [None]:
from peft import LoraConfig, TaskType, get_peft_model

# LoRA adapter settings for the sequence-classification task: rank-16 updates
# (alpha 32, dropout 0.1) on the modules named "query" and "value"; bias
# parameters are not adapted.
lora_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,
    target_modules=["query", "value"],
    r=16,
    lora_alpha=32,
    lora_dropout=0.1,
    bias="none",
    init_lora_weights=True,
)

# Wrap the base model with the adapters and report how many parameters train.
lora_model = get_peft_model(model, lora_config)
lora_model.print_trainable_parameters()

# Training arguments
lora_training_args = TrainingArguments(
    output_dir="./longformer_lora",
    num_train_epochs=3,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=2,   # effective train batch of 8 per device
    learning_rate=3e-4,
    weight_decay=0.01,
    # NOTE(review): 500 warmup steps assumes the run has well over 500
    # optimizer steps in total - confirm against the training-set size.
    warmup_steps=500,

    # Evaluate and checkpoint once per epoch; reload the checkpoint with the
    # highest macro F1 when training ends.
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1_macro",
    greater_is_better=True,

    logging_steps=50,
    # fp16 mixed precision needs a CUDA device. The notebook also supports
    # running without CUDA, so only enable it when CUDA is present instead of
    # hard-coding True.
    fp16=torch.cuda.is_available(),
    dataloader_num_workers=2,

    seed=42,
    report_to="none",
    save_total_limit=1,
)

import json
import time

# Create trainer (class-weighted loss, macro-F1 driven metrics)
lora_trainer = WeightedLossTrainer(
    model=lora_model,
    args=lora_training_args,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['validation'],
    compute_metrics=compute_metrics,
    class_weights=class_weights,
)

# Train, measuring wall-clock time around the call
print("\n🚀 Starting LoRA training...")
start_time = time.time()
lora_trainer.train()
lora_time = time.time() - start_time
print(f"\n✅ LoRA training complete! Time: {lora_time/60:.2f} minutes")

# Evaluate on the held-out test split
lora_metrics = full_evaluation(
    lora_trainer,
    tokenized_dataset['test'],
    list(id2label.values()),
    label2id,
    save_prefix="lora_",
)

# Save the model, then the metrics together with the training duration
lora_trainer.save_model("./longformer_lora_model")
lora_metrics['training_time_minutes'] = lora_time / 60

with open('lora_results.json', 'w') as results_file:
    json.dump(lora_metrics, results_file, indent=2)

print("\nLoRA complete! Results saved.")