# Adversarial and Hierarchical Transformer for Amharic Hate Speech Detection

## Setup and Imports

In [None]:
# Mount Google Drive at /content/drive. The dataset CSV (CSV_PATH) and the
# training output directory (OUTPUT_DIR) configured later in this notebook
# both live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install the libraries this notebook relies on (-q keeps pip's output quiet).
# NOTE(review): `accelerate` is never imported directly here; presumably it is
# required by the Hugging Face Trainer — confirm before dropping it.
!pip install torch transformers pandas scikit-learn tqdm accelerate -q

In [None]:
# Import all required packages
import pandas as pd
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForMaskedLM,
    Trainer,
    TrainingArguments
)
from torch.utils.data import Dataset
from tqdm.auto import tqdm
import os

# Register tqdm's pandas integration (adds `progress_apply` and friends to
# pandas objects).
# NOTE(review): none of the visible cells call `progress_apply`, so this may
# be a leftover — confirm before removing.
tqdm.pandas()

## Hugging Face Authentication

In [None]:
from huggingface_hub import notebook_login

# Authenticate this Colab session with the Hugging Face Hub via an interactive prompt.
# 1. Go to https://huggingface.co/settings/tokens and create (or copy) an access token.
# 2. Paste the token into the input box that appears in Colab and press Enter.
# NOTE(review): presumably required so the model checkpoint configured below
# can be downloaded — confirm whether that repository actually needs a login.
notebook_login()

## Configuration and Parameters

In [None]:
# --- Configuration (AHTA) ---

# --- MODEL ---
# Checkpoint pre-trained on Amharic; it is fine-tuned below as the classifier.
MODEL_NAME = "EthioNLP/EthioLLM-l-70K"
# The masked-LM that proposes adversarial substitutions is the very same checkpoint.
MASK_MODEL_NAME = MODEL_NAME

# --- FILE PATHS (on the mounted Google Drive) ---
OUTPUT_DIR = "/content/drive/MyDrive/amharic-hate-speech/output/ahta_model_results"
CSV_PATH = "/content/drive/MyDrive/amharic-hate-speech/dataset/preprocessed_dataset.csv"

# --- DATASET COLUMNS ---
TEXT_COLUMN, LABEL_COLUMN, SPLIT_COLUMN = "text", "label", "split"

# --- ADVERSARIAL AUGMENTATION (BALANCED) ---
# The same fraction of BOTH classes is augmented, so the class ratio of the
# training set is not skewed by the extra examples.
ADVERSARIAL_AUGMENTATION_FACTOR = 0.25  # 25% of each class

# --- TRAINING HYPERPARAMETERS (OPTIMIZED) ---
MAX_LENGTH = 128                  # Token limit per example (longer inputs are truncated)
NUM_EPOCHS = 3
BATCH_SIZE = 32                   # Per-device batch size
GRADIENT_ACCUMULATION_STEPS = 2   # Effective batch size: BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS = 64
LEARNING_RATE = 3e-5
WEIGHT_DECAY = 0.01
WARMUP_RATIO = 0.1                # Fraction of total training steps used for warmup

## Data Loading and Verification

In [None]:
# Load the preprocessed dataset
try:
    df = pd.read_csv(CSV_PATH)
except FileNotFoundError:
    print(f"ERROR: The file was not found at {CSV_PATH}")
    print("Please update the CSV_PATH variable in the configuration cell.")
    # Re-raise instead of `assert False`: asserts are stripped when Python
    # runs with -O, and the original exception carries the real cause.
    raise

# Map text labels to integer IDs (hate=1, normal=0)
label_map = {'hate': 1, 'normal': 0}
df['label_id'] = df[LABEL_COLUMN].map(label_map)

# Series.map yields NaN for any label that is not a key of label_map. Such rows
# cannot be trained or evaluated on, and they would silently turn label_id
# into a float column, so report and drop them here.
unmapped = df['label_id'].isna()
if unmapped.any():
    print(f"WARNING: Dropping {int(unmapped.sum())} rows whose label is not one of {sorted(label_map)}.")
    print(f"         Unexpected label values: {df.loc[unmapped, LABEL_COLUMN].unique().tolist()}")
    df = df[~unmapped].copy()
df['label_id'] = df['label_id'].astype(int)

# Missing text is only reported (rows are kept) so split sizes stay unchanged.
missing_text = int(df[TEXT_COLUMN].isna().sum())
if missing_text:
    print(f"WARNING: {missing_text} rows have no text in column '{TEXT_COLUMN}'.")

# Create dataframes based on the 'split' column
train_df = df[df[SPLIT_COLUMN] == 'train'].copy()
dev_df = df[df[SPLIT_COLUMN] == 'dev'].copy()
test_df = df[df[SPLIT_COLUMN] == 'test'].copy()

# --- Verification Step ---
print("--- Data Loading Complete ---")
print(f"Total examples loaded: {len(df)}")
print(f"Training set size:   {len(train_df)}")
print(f"Development set size: {len(dev_df)}")
print(f"Test set size:       {len(test_df)}\n")

# Fail fast with a clear message: nothing downstream can work without training data.
if train_df.empty:
    raise ValueError(
        f"No usable rows with {SPLIT_COLUMN} == 'train' were found in {CSV_PATH}."
    )
if dev_df.empty or test_df.empty:
    print("WARNING: The 'dev' and/or 'test' split is empty; evaluation will not be meaningful.")

## Adversarial Augmentation

In [None]:
def generate_adversarial_example(text, model, tokenizer, device, top_k=5):
    """
    Create a perturbed copy of ``text`` by replacing one randomly chosen token.

    One non-special token of the (truncated) input is masked, the masked
    language model scores the masked position, and the highest-ranked
    candidate that is neither the original token nor a special token is
    substituted back in. Note that the unit of replacement is a tokenizer
    token, which may be a sub-word rather than a whole word.

    Args:
        text: Input sentence. Non-string or blank values yield ``None``.
        model: Masked language model. It is called with a ``(1, seq_len)``
            tensor of input ids and must return an object exposing ``.logits``.
        tokenizer: Tokenizer matching ``model``; it must define a mask token.
        device: Device the tokenized input is moved to (should be the device
            ``model`` lives on).
        top_k: How many of the highest-scoring candidates are considered
            before giving up. Defaults to 5.

    Returns:
        The decoded perturbed sentence, or ``None`` when no example could be
        produced (unusable input, no maskable token, no acceptable candidate,
        or an error during tokenization/inference).
    """
    import warnings  # local import keeps this cell independent of the import cell

    if not isinstance(text, str) or not text.strip():
        return None
    if tokenizer.mask_token_id is None:
        warnings.warn("Tokenizer has no mask token; adversarial example skipped.")
        return None

    try:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LENGTH).to(device)
        input_ids = inputs.input_ids[0]

        # Work on plain Python ints: membership tests against 0-d tensors are
        # slow and force a device synchronisation per token.
        special_ids = set(tokenizer.all_special_ids)
        token_ids = input_ids.tolist()
        maskable_positions = [pos for pos, tok in enumerate(token_ids) if tok not in special_ids]
        if not maskable_positions:
            return None

        mask_idx = int(np.random.choice(maskable_positions))
        original_token_id = token_ids[mask_idx]

        masked_input_ids = input_ids.clone()
        masked_input_ids[mask_idx] = tokenizer.mask_token_id

        with torch.no_grad():
            outputs = model(masked_input_ids.unsqueeze(0))
            predictions = outputs.logits[0, mask_idx]

        # Never ask for more candidates than the vocabulary holds.
        num_candidates = min(top_k, predictions.numel())
        ranked_ids = torch.topk(predictions, num_candidates).indices.tolist()

        # Special tokens are rejected as replacements: decode() below drops
        # them, which would delete the token instead of substituting it.
        new_token_id = next(
            (tok for tok in ranked_ids if tok != original_token_id and tok not in special_ids),
            None,
        )
        if new_token_id is None:
            return None

        new_input_ids = input_ids.clone()
        new_input_ids[mask_idx] = new_token_id
        return tokenizer.decode(new_input_ids, skip_special_tokens=True)
    except Exception as exc:
        # Augmentation is best-effort: one failing sentence must not abort the
        # whole run, but the failure should be visible rather than silent.
        warnings.warn(f"Adversarial example generation failed: {exc!r}")
        return None

print("--- Starting Balanced Adversarial Augmentation ---")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# generate_adversarial_example picks the token to mask with np.random.choice.
# Seed NumPy's global RNG so the whole augmentation is reproducible, not just
# the row sampling below.
AUGMENTATION_SEED = 42
np.random.seed(AUGMENTATION_SEED)

# Load the Masked Language Model
mask_model = AutoModelForMaskedLM.from_pretrained(MASK_MODEL_NAME).to(device)
mask_model.eval()  # inference only; makes sure dropout is disabled
mask_tokenizer = AutoTokenizer.from_pretrained(MASK_MODEL_NAME)


def _augment_class(class_df, num_to_generate):
    """Return perturbed texts for a seeded sample of ``num_to_generate`` rows of ``class_df``.

    Rows for which no adversarial example could be generated are skipped, so
    the result may contain fewer than ``num_to_generate`` texts.
    """
    sampled_texts = class_df.sample(num_to_generate, random_state=AUGMENTATION_SEED)[TEXT_COLUMN]
    candidates = (
        generate_adversarial_example(text, mask_model, mask_tokenizer, device)
        for text in tqdm(sampled_texts)
    )
    return [new_text for new_text in candidates if new_text]


hate_df = train_df[train_df['label_id'] == 1]
normal_df = train_df[train_df['label_id'] == 0]

# The same fraction of each class is augmented so the class ratio is preserved.
num_hate_to_generate = int(len(hate_df) * ADVERSARIAL_AUGMENTATION_FACTOR)
num_normal_to_generate = int(len(normal_df) * ADVERSARIAL_AUGMENTATION_FACTOR)

print(f"Generating {num_hate_to_generate} new 'hate' examples...")
adv_hate_texts = _augment_class(hate_df, num_hate_to_generate)

print(f"Generating {num_normal_to_generate} new 'normal' examples...")
adv_normal_texts = _augment_class(normal_df, num_normal_to_generate)

# Create new dataframes for the augmented data (each keeps its source class label)
adv_hate_df = pd.DataFrame({TEXT_COLUMN: adv_hate_texts, 'label_id': 1})
adv_normal_df = pd.DataFrame({TEXT_COLUMN: adv_normal_texts, 'label_id': 0})

# Combine original training data with the new adversarial examples, then shuffle
train_df_augmented = pd.concat([train_df[[TEXT_COLUMN, 'label_id']], adv_hate_df, adv_normal_df])
train_df_augmented = train_df_augmented.sample(frac=1, random_state=42).reset_index(drop=True)


# Free up memory: the masked-LM is not needed once augmentation is done
del mask_model
torch.cuda.empty_cache()

print(f"\nAugmentation complete. New training set size: {len(train_df_augmented)}")
print("\nNew label distribution in augmented training set:")
# Map the label_id back to text label for value_counts to be readable
label_map_rev = {1: 'hate', 0: 'normal'}
print(train_df_augmented['label_id'].map(label_map_rev).value_counts(normalize=True))

## PyTorch Dataset Class

In [None]:
class HateSpeechDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: Indexable sequence of input texts (each is passed through ``str``).
        labels: Indexable sequence of integer class ids, aligned with ``texts``.
        tokenizer: Callable Hugging Face tokenizer used to encode each text.
        max_len: Every item is padded or truncated to exactly this many tokens.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, item):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` tensors for one example."""
        text = str(self.texts[item])
        label = self.labels[item]

        # Call the tokenizer directly: `encode_plus` is deprecated in favour
        # of `__call__`, which accepts the same arguments for a single text.
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # return_tensors='pt' adds a leading batch dimension of size 1; drop
        # it so the DataLoader can stack the items into a batch itself.
        return {
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            'labels': torch.tensor(label, dtype=torch.long)
        }

## Dataset Instances

In [None]:
# One tokenizer, matching the classifier checkpoint, is shared by all three splits.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)


def _as_dataset(frame):
    """Wrap the text and label_id columns of ``frame`` in a HateSpeechDataset."""
    return HateSpeechDataset(
        texts=frame[TEXT_COLUMN].tolist(),
        labels=frame['label_id'].tolist(),
        tokenizer=tokenizer,
        max_len=MAX_LENGTH,
    )


# Training uses the augmented frame; dev and test use the original splits.
train_dataset = _as_dataset(train_df_augmented)
dev_dataset = _as_dataset(dev_df)
test_dataset = _as_dataset(test_df)

print("PyTorch Datasets created successfully.")

## Model Initialization

In [None]:
# Custom Trainer to handle class imbalance in the loss calculation.
class WeightedLossTrainer(Trainer):
    """Trainer whose loss is a class-weighted cross-entropy.

    Args:
        *args, **kwargs: Forwarded unchanged to ``Trainer``.
        class_weights: Optional 1-D float tensor holding one weight per class.
            When omitted, the module-level ``class_weights`` tensor is looked
            up each time the loss is computed.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self._class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute weighted cross-entropy between the model's logits and ``inputs["labels"]``.

        Extra keyword arguments passed by the Trainer are accepted and ignored.
        Returns ``(loss, outputs)`` when ``return_outputs`` is true, else ``loss``.
        """
        labels = inputs.get("labels")
        # Run the forward pass without labels so the model does not compute
        # its own unweighted loss, which would be discarded anyway. `inputs`
        # itself is left untouched for the caller.
        model_inputs = {key: value for key, value in inputs.items() if key != "labels"}
        outputs = model(**model_inputs)
        logits = outputs.get("logits")

        weights = self._class_weights if self._class_weights is not None else class_weights
        # Move the weights to wherever the logits live, so the loss works no
        # matter which device the Trainer placed the model on.
        loss_fct = torch.nn.CrossEntropyLoss(weight=weights.to(logits.device))
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

# Balanced class weights, derived from the label distribution of the
# augmented training set and stored on the GPU when one is available.
_weights_device = "cuda" if torch.cuda.is_available() else "cpu"
class_labels = np.unique(train_df_augmented['label_id'])
_balanced_weights = compute_class_weight(
    class_weight='balanced',
    classes=class_labels,
    y=train_df_augmented['label_id'],
)
class_weights = torch.tensor(_balanced_weights, dtype=torch.float).to(_weights_device)

print(f"Computed Class Weights: {class_weights}")

# Sequence-classification head with two outputs (label ids: 0 = normal, 1 = hate)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)

# Define the function to compute metrics during evaluation
def compute_metrics(pred):
    """Return accuracy plus binary precision/recall/F1 for the positive class (label id 1).

    ``pred`` must expose ``label_ids`` (true labels) and ``predictions``
    (per-class scores whose last axis is arg-maxed to get the predicted label).
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(axis=-1)

    # The fourth element returned by sklearn (support) is not reported.
    prf_scores = precision_recall_fscore_support(y_true, y_pred, average='binary', pos_label=1)
    precision_value, recall_value, f1_value = prf_scores[:3]

    metrics = {'accuracy': accuracy_score(y_true, y_pred)}
    metrics['f1'] = f1_value
    metrics['precision'] = precision_value
    metrics['recall'] = recall_value
    return metrics

## Model Training

In [None]:
# --- Training Arguments (OPTIMIZED) ---
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    num_train_epochs=NUM_EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE * 2,
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    warmup_ratio=WARMUP_RATIO,
    logging_dir=f"{OUTPUT_DIR}/logs",
    logging_steps=100,
    # Evaluate and checkpoint on the same 250-step schedule so the checkpoint
    # with the best dev F1 can be reloaded when training ends.
    eval_strategy="steps",
    eval_steps=250,
    save_strategy="steps",
    save_steps=250,
    # OUTPUT_DIR is on Google Drive: cap the number of checkpoints kept on
    # disk instead of storing one every 250 steps. With
    # load_best_model_at_end the best checkpoint is retained in addition to
    # the most recent one.
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    report_to="none"
)

# Use the new WeightedLossTrainer
trainer = WeightedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
    compute_metrics=compute_metrics
)

# Start training!
print("\n--- Starting Model Training with AHTA ---\n")

# Check for existing checkpoints and resume if found, so an interrupted Colab
# session can continue instead of starting over. get_last_checkpoint lists the
# directory, hence the isdir guard.
last_checkpoint = None
if os.path.isdir(training_args.output_dir):
    from transformers.trainer_utils import get_last_checkpoint
    last_checkpoint = get_last_checkpoint(training_args.output_dir)
    if last_checkpoint is not None:
        print(f"Resuming from checkpoint: {last_checkpoint}")

# resume_from_checkpoint=None starts a fresh run
trainer.train(resume_from_checkpoint=last_checkpoint)
print("\n--- Training Complete ---")

## Model Evaluation

In [None]:
print("\n--- Evaluating on the Held-Out Test Set ---\n")
print("This provides the final, unbiased measure of model performance.")

# Run the trainer's evaluation loop on the test split; metric keys come back
# with the "eval_" prefix.
test_results = trainer.evaluate(eval_dataset=test_dataset)

print("\n--- FINAL TEST RESULTS ---")
for heading, metric_key in (
    ("  Accuracy:  ", "eval_accuracy"),
    ("  F1 Score:  ", "eval_f1"),
    ("  Precision: ", "eval_precision"),
    ("  Recall:    ", "eval_recall"),
):
    print(f"{heading}{test_results[metric_key]:.4f}")
print("--------------------------\n")

# Persist the fine-tuned model together with its tokenizer in OUTPUT_DIR
trainer.save_model(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)
print(f"Final model and tokenizer saved to {OUTPUT_DIR}")