# Religious Toxicity Classification Re-implementation

This notebook provides a complete pipeline for identifying toxic comments directed at religious identity groups using **DeBERTa-v3** with custom architectural enhancements.

## 1. Imports and Configuration

In [None]:
import os
import time
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import (
    accuracy_score, f1_score, precision_score, recall_score,
    jaccard_score, hamming_loss, roc_auc_score, classification_report
)
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, Subset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    DebertaV2Model,
    DebertaV2PreTrainedModel,
    TrainingArguments,
    Trainer,
    set_seed,
    DataCollatorWithPadding
)
from transformers.modeling_outputs import SequenceClassifierOutput

# --- Configuration ---
# Backbone checkpoint and the token budget every comment is padded/truncated to.
MODEL_NAME = 'microsoft/deberta-v3-base'
MAX_LENGTH = 256

# Optimisation schedule. The effective batch per device is
# PER_DEVICE_BATCH_SIZE * GRAD_ACCUM_STEPS.
EPOCHS = 10
PER_DEVICE_BATCH_SIZE = 8
GRAD_ACCUM_STEPS = 2
LEARNING_RATE = 2e-5

# Model selection and hold-out split.
METRIC_FOR_BEST_MODEL = 'eval_micro_f1'
VALIDATION_SPLIT_SIZE = 0.2

# Single seed shared by the RNG setup, the resampling and the split.
SEED = 42

set_seed(SEED)
# NOTE(review): this silences *every* warning for the rest of the session.
warnings.filterwarnings("ignore")

# Label columns (one binary target each) and the column holding the raw text.
RELIGION_COLS = [
    'christian',
    'jewish',
    'muslim',
    'hindu',
    'buddhist',
    'atheist',
    'other_religion',
]
TEXT_COL = 'comment_text'

## 2. Data Loading and Preprocessing

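We keep only the comments in which at least one religious-identity label scores at or above the threshold (0.5 by default), binarize those labels, and then resample per label to balance the classes.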

In [None]:
def filter_and_binarize(df, target_cols, min_toxicity=0.5):
    """Keep rows with at least one label at or above the threshold and binarize.

    Args:
        df: Source frame holding the text column and (some of) the label columns.
        target_cols: Label column names; names absent from ``df`` are ignored.
        min_toxicity: Score at or above which a label counts as positive.

    Returns:
        A frame with the text column followed by the available label columns,
        labels converted to 0/1 integers, rows without text removed and the
        index renumbered. An empty ``DataFrame`` when none of ``target_cols``
        exists in ``df``.
    """
    label_cols = [name for name in target_cols if name in df.columns]
    if len(label_cols) == 0:
        return pd.DataFrame()

    # Missing label scores are treated as 0 so they can never pass the threshold.
    zero_fill = dict.fromkeys(label_cols, 0)
    filled = df.fillna(zero_fill)

    passes_threshold = filled[label_cols] >= min_toxicity
    keep_rows = passes_threshold.any(axis=1)

    subset = filled.loc[keep_rows, [TEXT_COL, *label_cols]].copy()
    subset[label_cols] = (subset[label_cols] >= min_toxicity).astype(int)

    subset = subset.dropna(subset=[TEXT_COL])
    return subset.reset_index(drop=True)

def balance_multilabel_df(df, target_cols):
    """Resample ``df`` per label so every label contributes a comparable share.

    For each label column, positives are drawn with replacement up to the size
    of the most frequent label, and negatives are drawn without replacement,
    capped at the number of negatives actually available. The per-label draws
    are concatenated, exact duplicate rows are dropped and the index is
    renumbered.

    Args:
        df: Frame with binary (0/1) label columns.
        target_cols: Label column names; names absent from ``df`` are skipped.

    Returns:
        The resampled frame. ``df`` itself (re-indexed) when it is empty or has
        none of ``target_cols``; an empty frame with the same columns when no
        label has any positive row.
    """
    present = [col for col in target_cols if col in df.columns]
    if df.empty or not present:
        # Nothing to balance; avoid KeyError / "No objects to concatenate".
        return df.reset_index(drop=True)

    from sklearn.utils import resample

    max_count = int(df[present].sum().max())
    parts = []
    for col in present:
        df_pos = df[df[col] == 1]
        df_neg = df[df[col] == 0]
        # A label without positives cannot be oversampled; skip it instead of
        # letting the sampler fail on an empty population.
        if max_count > 0 and not df_pos.empty:
            parts.append(
                resample(df_pos, replace=True, n_samples=max_count, random_state=SEED)
            )
        # Sampling without replacement cannot return more rows than exist, so
        # cap the request at the number of available negatives.
        n_neg = min(max_count, len(df_neg))
        if n_neg > 0:
            parts.append(
                resample(df_neg, replace=False, n_samples=n_neg, random_state=SEED)
            )

    if not parts:
        return df.iloc[0:0].reset_index(drop=True)

    # NOTE(review): drop_duplicates() also removes the repeated rows produced
    # by sampling with replacement, so minority labels end up deduplicated
    # rather than oversampled. It is kept because this frame is split into
    # train/validation afterwards and repeated rows would leak across the
    # split; true oversampling should be applied to the training part only.
    return pd.concat(parts).drop_duplicates().reset_index(drop=True)

print("Loading data...")
# Fail fast: the cells below read `religion_df` unconditionally, so carrying on
# without the CSV would only resurface later as an unrelated NameError.
if not os.path.exists("train.csv"):
    raise FileNotFoundError("Error: train.csv not found.")

df_original = pd.read_csv("train.csv")
# Keep comments mentioning at least one religion label, then rebalance labels.
religion_df = balance_multilabel_df(filter_and_binarize(df_original, RELIGION_COLS), RELIGION_COLS)
print(f"Total samples: {len(religion_df)}")

## 3. Model Architecture

We implement a custom model with **Focal Loss**, **Hierarchical Attention**, and **Adaptive Pooling**.

In [None]:
class FocalLoss(nn.Module):
    """Mean-reduced binary focal loss computed from raw logits.

    Each element contributes ``-(1 - pt) ** gamma * log(pt)``, where ``pt`` is
    the sigmoid probability assigned to the element's true class.
    """

    def __init__(self, gamma=2.0):
        super().__init__()
        # Exponent of the (1 - pt) factor; gamma = 0 gives plain cross-entropy.
        self.gamma = gamma

    def forward(self, logits, targets):
        """Return the scalar loss for ``logits`` against 0/1 ``targets``."""
        # Keep probabilities strictly inside (0, 1) so the log stays finite.
        eps = 1e-6
        p = torch.sigmoid(logits).clamp(eps, 1.0 - eps)

        # Probability of the true class: p for positives, 1 - p otherwise.
        is_positive = targets == 1
        pt = torch.where(is_positive, p, 1 - p)

        focal_weight = (1 - pt) ** self.gamma
        per_element = -focal_weight * torch.log(pt)
        return per_element.mean()

class HierarchicalAttentionLayer(nn.Module):
    """Self-attention block fused back into its input with a residual + norm.

    The input attends to itself, the attention output is concatenated with the
    input, projected back to ``hidden_size``, passed through dropout, added to
    the input and layer-normalised.

    Args:
        hidden_size: Feature size of the input and output.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the projected features.
    """

    def __init__(self, hidden_size, num_heads, dropout=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(hidden_size, num_heads, batch_first=True)
        self.proj = nn.Linear(hidden_size * 2, hidden_size)
        self.norm = nn.LayerNorm(hidden_size)
        # A module (not F.dropout with its default training=True) so dropout
        # is switched off by model.eval() and inference is deterministic.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Apply the block.

        Args:
            x: Tensor of shape (batch, seq, hidden_size).
            mask: Optional (batch, seq) tensor where 0 marks positions that
                must be ignored as attention keys.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # MultiheadAttention expects True at the positions to ignore.
        key_padding = (mask == 0) if mask is not None else None
        attended, _ = self.attn(x, x, x, key_padding_mask=key_padding)
        fused = self.proj(torch.cat([x, attended], dim=-1))
        return self.norm(self.dropout(fused) + x)

class AdaptivePoolingClassifier(nn.Module):
    """Classification head over four pooled views of a token sequence.

    The first-token vector, the masked mean, the masked max and a learned
    attention-weighted sum are concatenated, passed through a GELU dense layer
    with dropout, and mapped to ``num_labels`` logits.

    Args:
        hidden_size: Feature size of each token vector.
        num_labels: Number of output logits.
        dropout: Dropout probability applied before the output layer.
    """

    def __init__(self, hidden_size, num_labels, dropout=0.1):
        super().__init__()
        self.dense = nn.Linear(hidden_size * 4, hidden_size)
        self.out = nn.Linear(hidden_size, num_labels)
        self.attn_proj = nn.Linear(hidden_size, 1)
        # A module (not F.dropout with its default training=True) so dropout
        # is switched off by model.eval() and inference is deterministic.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return logits of shape (batch, num_labels).

        Args:
            x: Tensor of shape (batch, seq, hidden_size).
            mask: Optional (batch, seq) tensor where 0 marks padding. When
                omitted, every position is treated as a real token.
        """
        if mask is None:
            # The signature advertises mask=None; honour it instead of failing.
            mask = torch.ones(x.shape[:2], dtype=torch.long, device=x.device)

        m = mask.unsqueeze(-1).float()
        is_pad = mask.unsqueeze(-1) == 0

        cls_p = x[:, 0]
        # Clamp guards against division by zero for fully masked rows.
        mean_p = (x * m).sum(1) / m.sum(1).clamp(1e-9)
        # Push padded positions far below any real activation before the max.
        max_p = (x + (1 - m) * -1e9).max(1)[0]

        scores = self.attn_proj(x)
        # Use the dtype's own minimum: a literal -1e9 cannot be represented in
        # half precision and makes masked_fill raise under fp16.
        scores = scores.masked_fill(is_pad, torch.finfo(scores.dtype).min)
        w = F.softmax(scores, 1)
        attn_p = (w * x).sum(1)

        pooled = torch.cat([cls_p, mean_p, max_p, attn_p], -1)
        return self.out(self.dropout(F.gelu(self.dense(pooled))))

class CustomDebertaV3ForMultilabel(DebertaV2PreTrainedModel):
    """DeBERTa encoder with the custom attention block, pooling head and focal loss.

    The class derives from ``DebertaV2PreTrainedModel`` rather than
    ``AutoModelForSequenceClassification``: the Auto class is a factory that
    cannot be instantiated through ``__init__`` and whose ``from_pretrained``
    returns the stock architecture class, so a subclass of it never builds or
    runs the custom layers defined here.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        # Attribute name matches the checkpoint's base-model prefix so the
        # pretrained encoder weights are loaded by from_pretrained().
        self.deberta = DebertaV2Model(config)
        self.loss_fct = FocalLoss()
        self.hierarchical_attention = HierarchicalAttentionLayer(config.hidden_size, config.num_attention_heads)
        self.adaptive_classifier = AdaptivePoolingClassifier(config.hidden_size, config.num_labels)
        # Standard Hugging Face hook: initialise weights / final setup.
        self.post_init()

    def forward(self, input_ids=None, attention_mask=None, labels=None, **kwargs):
        """Encode the batch and return logits (plus the focal loss if labelled).

        Args:
            input_ids: Token ids passed to the encoder.
            attention_mask: Mask with 0 at padded positions; also forwarded to
                the attention block and the pooling head.
            labels: Optional multi-hot targets; when given, the loss is set.
            **kwargs: Forwarded unchanged to the encoder.

        Returns:
            ``SequenceClassifierOutput`` with ``loss`` (or ``None``) and ``logits``.
        """
        # Index 0 of the encoder output is the per-token hidden-state tensor.
        x = self.deberta(input_ids, attention_mask=attention_mask, **kwargs)[0]
        attended = self.hierarchical_attention(x, attention_mask)
        logits = self.adaptive_classifier(attended, attention_mask)
        loss = self.loss_fct(logits, labels) if labels is not None else None
        return SequenceClassifierOutput(loss=loss, logits=logits)

## 4. Training and Evaluation

In [None]:
class ToxicCommentsDataset(Dataset):
    """Map-style dataset that tokenizes one comment per lookup.

    Args:
        texts: Indexable collection of comments.
        labels: Indexable collection of label vectors aligned with ``texts``.
        tokenizer: Callable invoked with the text and the padding/truncation
            keyword arguments; must return ``input_ids`` and ``attention_mask``.
        max_length: Length every example is padded or truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, i):
        """Return the flattened encoding and float label tensor for item ``i``."""
        encoded = self.tokenizer(
            str(self.texts[i]),  # str() guards against non-string cells
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )
        target = torch.tensor(self.labels[i], dtype=torch.float)
        # flatten() drops the batch dimension added by return_tensors='pt'.
        return {
            'input_ids': encoded['input_ids'].flatten(),
            'attention_mask': encoded['attention_mask'].flatten(),
            'labels': target,
        }

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
ds = ToxicCommentsDataset(religion_df[TEXT_COL].values, religion_df[RELIGION_COLS].values, tokenizer, MAX_LENGTH)
# Split by position so the same indices can be reused to look up labels later.
tr_idx, val_idx = train_test_split(range(len(ds)), test_size=VALIDATION_SPLIT_SIZE, random_state=SEED)

model = CustomDebertaV3ForMultilabel.from_pretrained(MODEL_NAME, num_labels=len(RELIGION_COLS)).to("cuda" if torch.cuda.is_available() else "cpu")


def compute_metrics(eval_pred):
    """Return the micro-averaged F1 that the Trainer uses for model selection.

    The Trainer prefixes returned keys with ``eval_``, so ``micro_f1`` becomes
    the ``eval_micro_f1`` named by ``METRIC_FOR_BEST_MODEL``.
    """
    logits, labels = eval_pred.predictions, eval_pred.label_ids
    # sigmoid(logit) >= 0.5 is equivalent to logit >= 0.
    predicted = (logits >= 0).astype(int)
    return {'micro_f1': f1_score(labels.astype(int), predicted, average='micro', zero_division=0)}


# save_strategy must match the evaluation strategy for load_best_model_at_end,
# otherwise TrainingArguments rejects the configuration.
args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=EPOCHS,
    per_device_train_batch_size=PER_DEVICE_BATCH_SIZE,
    gradient_accumulation_steps=GRAD_ACCUM_STEPS,
    learning_rate=LEARNING_RATE,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model=METRIC_FOR_BEST_MODEL,
    report_to='none',
)

# Without compute_metrics the evaluation never produces the metric requested
# by metric_for_best_model, and best-model selection fails.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=Subset(ds, tr_idx),
    eval_dataset=Subset(ds, val_idx),
    data_collator=DataCollatorWithPadding(tokenizer),
    compute_metrics=compute_metrics,
)
trainer.train()

## 5. Threshold Tuning and Final Results

In [None]:
preds = trainer.predict(Subset(ds, val_idx))
probs = torch.sigmoid(torch.tensor(preds.predictions)).numpy()
true = religion_df.iloc[val_idx][RELIGION_COLS].values

# Sweep one global decision threshold and keep the one with the best micro-F1.
# The fallback is stored under the same name the loop and the report use, so
# `best_thresh` is always defined.
# NOTE(review): the threshold is tuned and reported on the same validation
# split, so the report below is optimistic; a separate test split would be
# needed for an unbiased estimate.
best_thresh, best_f1 = 0.5, -1
for t in np.arange(0.05, 0.95, 0.01):
    f1 = f1_score(true, (probs >= t).astype(int), average='micro', zero_division=0)
    if f1 > best_f1:
        best_f1, best_thresh = f1, t

print(f"Best Threshold: {best_thresh:.4f}")
print(classification_report(true, (probs >= best_thresh).astype(int), target_names=RELIGION_COLS))