In [None]:
import os, random
from pathlib import Path
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.backends.cudnn as cudnn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, accuracy_score
from transformers import AutoTokenizer, AutoModelForSequenceClassification, default_data_collator
from scipy.stats import ttest_rel

def set_seed(seed):
    """Seed every random number generator used in this file.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices)
    with the same value, then sets ``cudnn.deterministic`` and turns off
    ``cudnn.benchmark``.

    Args:
        seed: Integer seed passed unchanged to each generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn.benchmark = False
    cudnn.deterministic = True

def read_table(path):
    """Load a delimited text file into a DataFrame.

    A path ending in ``.tsv`` is parsed as tab-separated; every other path
    is parsed as comma-separated.

    Args:
        path: File location as a ``str`` or any ``os.PathLike`` object
            (for example ``pathlib.Path``).

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_csv``.
    """
    # Path objects have no ``endswith``; os.fspath gives the plain string
    # for both str and PathLike inputs.
    separator = "\t" if os.fspath(path).endswith(".tsv") else ","
    return pd.read_csv(path, sep=separator)

def load_split(path, text_col, label_col):
    """Read one data split and return its texts and labels as plain lists.

    Args:
        path: Location of the table, forwarded to ``read_table``.
        text_col: Name of the column holding the input text.
        label_col: Name of the column holding the class label.

    Returns:
        A ``(texts, labels)`` tuple: ``texts`` is a list of ``str`` and
        ``labels`` a list of ``int``, both in file order.
    """
    frame = read_table(path)
    texts = frame[text_col].astype(str).tolist()
    labels = frame[label_col].astype(int).tolist()
    return texts, labels

class BinaryHateDataset(Dataset):
    """Map-style dataset that tokenizes one text on each item access.

    Args:
        texts: Sequence of input strings.
        labels: Sequence of integer class labels, aligned with ``texts``.
        tokenizer: Callable invoked as ``tokenizer(text, truncation=True,
            max_length=..., padding='max_length', return_tensors='pt')``
            whose result exposes ``input_ids`` and ``attention_mask``.
        max_len: Length every example is truncated/padded to.
    """

    def __init__(self, texts, labels, tokenizer, max_len=500):
        self.texts = texts
        self.labels = labels
        self.tok = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of examples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Tokenize example ``idx`` and return its tensors.

        Returns:
            A dict with ``input_ids``, ``attention_mask`` and a ``labels``
            scalar tensor of dtype ``torch.long``.
        """
        enc = self.tok(
            self.texts[idx],
            truncation=True,
            max_length=self.max_len,
            padding='max_length',
            return_tensors='pt',
        )
        # The tokenizer is expected to return a leading batch dimension of
        # size 1. Drop only that dimension: a bare squeeze() would also
        # collapse the sequence axis when max_len == 1.
        return {
            'input_ids': enc.input_ids.squeeze(0),
            'attention_mask': enc.attention_mask.squeeze(0),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long),
        }

def predict_labels(model, dataset, batch_size=128):
    """Return the arg-max class index for every example in ``dataset``.

    The model is switched to eval mode and run without gradient tracking,
    in dataset order (no shuffling).

    Args:
        model: Model exposing ``.device`` and returning an object with a
            ``.logits`` tensor when called with the batch as keyword args.
        dataset: Dataset whose items are dicts of tensors; a ``labels``
            entry is optional and is ignored.
        batch_size: Number of examples per forward pass.

    Returns:
        A 1-D NumPy array of predicted class indices, empty when the
        dataset has no examples.
    """
    model.eval()
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=default_data_collator)
    preds = []
    with torch.no_grad():
        for batch in loader:
            # Labels are not used for prediction; tolerate datasets that
            # do not provide them instead of raising KeyError.
            batch.pop("labels", None)
            batch = {k: v.to(model.device) for k, v in batch.items()}
            logits = model(**batch).logits
            preds.append(logits.argmax(dim=-1).cpu().numpy())
    if not preds:
        # np.concatenate rejects an empty list, so return an empty result
        # explicitly for an empty dataset.
        return np.empty((0,), dtype=np.int64)
    return np.concatenate(preds, axis=0)

def compute_metrics_all(y_true, y_pred):
    """Score predictions with accuracy, binary F1 and macro F1.

    Args:
        y_true: Ground-truth labels (any array-like).
        y_pred: Predicted labels (any array-like), aligned with ``y_true``.

    Returns:
        A tuple ``(accuracy, f1_binary, f1_macro)`` of Python floats.
    """
    truth, guess = np.asarray(y_true), np.asarray(y_pred)
    scores = (
        accuracy_score(truth, guess),
        f1_score(truth, guess, average="binary"),
        f1_score(truth, guess, average="macro"),
    )
    return tuple(float(score) for score in scores)

In [None]:
# ---- Experiment configuration ----

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Pieces of the Hub repository id "<user>/<model>-<dataset>-s<seed>".
HF_USERNAME = "iproskurina"
MODEL_NAME = "bert-base-cased"  # e.g. also: facebook/opt-125m
DATASETS = ["hatexplain"]  # e.g. also: ["hatexplain","ihc","sbic","olid"]
SEEDS = list(range(5))

# Column names expected in every "<dataset>_test.csv" file.
TEXT_COL = "sentence"
LABEL_COL = "label"

# Tokenization length and inference batch size.
MAX_LEN = 500
BATCH_SIZE = 128

# Directory that receives the result CSVs.
OUT_DIR = Path("outputs")
OUT_DIR.mkdir(exist_ok=True)

# Model name without any "<organisation>/" prefix.
base_model_name = MODEL_NAME.rsplit("/", 1)[-1]

def _load_classifier(repo_id, tokenizer):
    """Load the classifier at `repo_id` onto DEVICE, filling in a missing pad token id."""
    clf = AutoModelForSequenceClassification.from_pretrained(repo_id).to(DEVICE)
    if getattr(clf.config, "pad_token_id", None) is None and tokenizer.pad_token_id is not None:
        clf.config.pad_token_id = tokenizer.pad_token_id
    return clf


def _layer_stack(model):
    """Return (parent_module, attribute_name) locating the model's layer list."""
    if hasattr(model, "bert"):
        return model.bert.encoder, "layer"
    decoder = getattr(getattr(model, "model", None), "decoder", None)
    if decoder is not None and hasattr(decoder, "layers"):
        return decoder, "layers"
    # Fail loudly instead of continuing with unbound (or stale, from a
    # previous iteration) layer information.
    raise ValueError(
        f"Unsupported architecture for layer ablation: {type(model).__name__}"
    )


def _drop_layer(model, k):
    """Remove the layer at index `k` from `model` in place and update its config."""
    parent, attr = _layer_stack(model)
    kept = nn.ModuleList(
        [block for i, block in enumerate(getattr(parent, attr)) if i != k]
    )
    setattr(parent, attr, kept)
    if hasattr(model.config, "num_hidden_layers"):
        model.config.num_hidden_layers = len(kept)


# The test splits do not depend on the model or the removed layer, so each
# file is read exactly once instead of once per ablated layer.
eval_splits = {}
for dataset_eval in DATASETS:
    x, y = load_split(f"{dataset_eval}_test.csv", TEXT_COL, LABEL_COL)
    eval_splits[dataset_eval] = (x, np.array(y))

# For every (training dataset, seed) model: score the full model, then score
# it again with each single layer removed and compare predictions.
for dataset in DATASETS:
    for seed in SEEDS:
        set_seed(seed)
        repo_id = f"{HF_USERNAME}/{base_model_name}-{dataset}-s{seed}"
        tokenizer = AutoTokenizer.from_pretrained(repo_id, use_fast=True)
        model = _load_classifier(repo_id, tokenizer)

        # Datasets tokenize lazily, so building them once per tokenizer is cheap.
        eval_sets = {
            name: (BinaryHateDataset(texts, labels, tokenizer, max_len=MAX_LEN), labels)
            for name, (texts, labels) in eval_splits.items()
        }

        # ---- Full (unpruned) model ----
        full_rows, full_cache = [], {}
        for dataset_eval in DATASETS:
            ds, y = eval_sets[dataset_eval]
            y_pred_full = predict_labels(model, ds, batch_size=BATCH_SIZE)
            acc_full, f1b_full, f1m_full = compute_metrics_all(y, y_pred_full)
            full_rows.append({
                "removed_layer": -1,
                "eval_set": dataset_eval,
                "acc_full": acc_full,
                "f1_binary_full": f1b_full,
                "f1_macro_full": f1m_full,
                "n": int(len(y)),
            })
            # Cached as float for the paired t-test against pruned predictions.
            full_cache[dataset_eval] = y_pred_full.astype(float)
        pd.DataFrame(full_rows).to_csv(
            OUT_DIR / f"{base_model_name}__{dataset}__seed{seed}__full.csv", index=False
        )

        stack_parent, stack_attr = _layer_stack(model)
        num_layers = len(getattr(stack_parent, stack_attr))
        # Release the full model before loading pruned copies.
        del model, stack_parent
        torch.cuda.empty_cache()

        # ---- One pruned model per removed layer ----
        rows = []
        for k in range(num_layers):
            pruned = _load_classifier(repo_id, tokenizer)
            _drop_layer(pruned, k)
            pruned.eval()
            for dataset_eval in DATASETS:
                ds, y = eval_sets[dataset_eval]
                y_pred_pruned = predict_labels(pruned, ds, batch_size=BATCH_SIZE).astype(float)
                y_pred_full = full_cache[dataset_eval]
                # Paired t-test between full and pruned predictions on the
                # same examples. NOTE(review): identical prediction vectors
                # make the statistic undefined; expect NaN in that case.
                t_stat_pred, p_val_pred = ttest_rel(y_pred_full, y_pred_pruned, nan_policy="omit")
                acc_p, f1b_p, f1m_p = compute_metrics_all(y, y_pred_pruned)
                rows.append({
                    "removed_layer": k,
                    "eval_set": dataset_eval,
                    "t_rel_pred": float(t_stat_pred),
                    "p_rel_pred": float(p_val_pred),
                    "acc_pruned": acc_p,
                    "f1_binary_pruned": f1b_p,
                    "f1_macro_pruned": f1m_p,
                    "n": int(len(y)),
                })
            del pruned
            torch.cuda.empty_cache()
        pd.DataFrame(rows).to_csv(
            OUT_DIR / f"{base_model_name}__{dataset}__seed{seed}__ablation_ttest_preds.csv", index=False
        )
        print(f"Model {repo_id} processed")