In [1]:
import textattack
import random
from typing import List, Tuple
from itertools import combinations_with_replacement
from datasets import load_dataset
import csv
import os

# -- Perturbation registry --
from textattack.transformations import (
    WordSwapHomoglyphSwap,
    WordSwapDeletions,
    WordSwapInvisibleCharacters,
    WordSwapReorderings,
    WordSwapNeighboringCharacterSwap
)

In [2]:
from typing import List, Tuple, Dict
import random
import os
import csv
from itertools import combinations_with_replacement
from textattack.datasets import HuggingFaceDataset
import textattack
from textattack.transformations import (
    WordSwapHomoglyphSwap,
    WordSwapInvisibleCharacters,
    WordSwapDeletions,
    WordSwapReorderings,
    WordSwapNeighboringCharacterSwap
)

# Perturbation definitions
PERTURBATIONS = {
    1: WordSwapHomoglyphSwap,
    2: WordSwapInvisibleCharacters,
    3: WordSwapDeletions,
    4: WordSwapReorderings,
    5: WordSwapNeighboringCharacterSwap
}

def load_base_texts(dataset_name: str = "emotion", split: str = "train", min_words: int = 5, max_samples: int = 20000) -> List[Tuple[str, int]]:
    dataset = load_dataset(dataset_name, split=split)
    texts = []
    for i, example in enumerate(dataset):
        text = example["text"].strip()
        label = example["label"]
        if len(text.split()) >= min_words:
            texts.append((text, label))
        if len(texts) >= max_samples:
            break
    print("Sample:", texts[0])
    return texts

def apply_perturbations(text: str, perturb_seq: Tuple[int], same_word: bool = False) -> str:
    attacked_text = textattack.shared.AttackedText(text)
    num_words = len(attacked_text.words)
    if num_words == 0:
        return text

    if same_word:
        idx = random.randint(0, num_words - 1)
        for p in perturb_seq:
            tform = PERTURBATIONS[p](random_one=True)
            transformed_texts = tform._get_transformations(attacked_text, [idx])
            if transformed_texts:
                attacked_text = transformed_texts[0]
    else:
        for p in perturb_seq:
            tform = PERTURBATIONS[p](random_one=True)
            idx = random.randint(0, num_words - 1)
            transformed_texts = tform._get_transformations(attacked_text, [idx])
            if transformed_texts:
                attacked_text = transformed_texts[0]

    return attacked_text.text

def generate_dataset_weighted_with_labels(base_texts: List[Tuple[str, int]],
                                          perturbation_allocation: Dict[Tuple[int], int],
                                          path: str,
                                          same_word: bool = False):
    rows = []
    for seq, n_samples in perturbation_allocation.items():
        sampled = random.sample(base_texts, min(n_samples, len(base_texts)))
        for clean, label in sampled:
            perturbed = apply_perturbations(clean, seq, same_word=same_word)
            rows.append((perturbed, clean, label, str(seq)))

    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["input", "original_text", "label", "perturbation"])
        writer.writerows(rows)

    print(f"Saved {len(rows)} examples → {path}")

def allocate_dataset_examples(perturbation_groups: Dict[str, List[Tuple[int]]],
                              proportions: Dict[str, float],
                              total_samples: int) -> Dict[Tuple[int], int]:
    allocation = {}
    for group_name, perturbations in perturbation_groups.items():
        group_total = int(proportions[group_name] * total_samples)
        per_combo = group_total // len(perturbations)
        for seq in perturbations:
            allocation[seq] = per_combo
    return allocation

def run_experiment_weighted_with_labels(config_name: str,
                                        train_groups: Dict[str, List[Tuple[int]]],
                                        train_props: Dict[str, float],
                                        ft_groups: Dict[str, List[Tuple[int]]],
                                        ft_props: Dict[str, float],
                                        base_texts: List[Tuple[str, int]],
                                        train_out: str,
                                        ft_out: str,
                                        total_train: int,
                                        total_ft: int,
                                        same_word_train: bool = False,
                                        same_word_ft: bool = False):
    print(f"[TRAIN] {config_name}")
    train_allocation = allocate_dataset_examples(train_groups, train_props, total_train)
    generate_dataset_weighted_with_labels(base_texts, train_allocation, train_out, same_word=same_word_train)

    print(f"[FINETUNE] {config_name}")
    ft_allocation = allocate_dataset_examples(ft_groups, ft_props, total_ft)
    generate_dataset_weighted_with_labels(base_texts, ft_allocation, ft_out, same_word=same_word_ft)


In [3]:

# Run 7 experiments
if __name__ == "__main__":
    base_texts = load_base_texts("dair-ai/emotion", "train", min_words=5, max_samples=20000)
    p1234 = [1, 2, 3, 4]
    one_mix = [(i,) for i in p1234]
    two_mix = list(combinations_with_replacement(p1234, 2))
    three_mix = list(combinations_with_replacement(p1234, 3))
    same_word_two_mix = two_mix
    same_word_three_mix = three_mix

    # Exp 1
    run_experiment_weighted_with_labels("exp1",
        {"one_mix": one_mix}, {"one_mix": 1.0},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp1_train.csv", "data/exp1_ft.csv", 4000, 1000)

    # Exp 2
    run_experiment_weighted_with_labels("exp2",
        {"one_mix": one_mix, "two_mix": two_mix}, {"one_mix": 0.5, "two_mix": 0.5},
        {"pure_5": [(5,)], "mixed_5": [(i, 5) for i in p1234]}, {"pure_5": 0.5, "mixed_5": 0.5},
        base_texts, "data/exp2_train.csv", "data/exp2_ft.csv", 4000, 1000)

    # Exp 3
    run_experiment_weighted_with_labels("exp3",
        {"one_mix": one_mix, "two_mix": two_mix, "three_mix": three_mix},
        {"one_mix": 0.25, "two_mix": 0.35, "three_mix": 0.4},
        {
            "pure_5": [(5,)],
            "i_5": [(i, 5) for i in p1234],
            "ij_5": [(i, j, 5) for i in p1234 for j in p1234]
        },
        {"pure_5": 0.2, "i_5": 0.3, "ij_5": 0.5},
        base_texts, "data/exp3_train.csv", "data/exp3_ft.csv", 4000, 1000)

    # Exp 4
    run_experiment_weighted_with_labels("exp4",
        {"one_mix": one_mix, "two_mix": two_mix}, {"one_mix": 0.5, "two_mix": 0.5},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp4_train.csv", "data/exp4_ft.csv", 4000, 1000)

    # Exp 5
    run_experiment_weighted_with_labels("exp5",
        {"one_mix": one_mix, "two_mix": two_mix, "three_mix": three_mix},
        {"one_mix": 0.25, "two_mix": 0.35, "three_mix": 0.4},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp5_train.csv", "data/exp5_ft.csv", 4000, 1000)

    # Exp 6: Same-word 2-mix
    run_experiment_weighted_with_labels("exp6_same_word_2mix",
        {"same_word_two_mix": same_word_two_mix}, {"same_word_two_mix": 1.0},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp6_train.csv", "data/exp6_ft.csv", 4000, 1000,
        same_word_train=True, same_word_ft=False)

    # Exp 7: Same-word 1+2+3-mix
    run_experiment_weighted_with_labels("exp7_same_word_full_mix",
        {
            "one_mix": one_mix,
            "same_word_two_mix": same_word_two_mix,
            "same_word_three_mix": same_word_three_mix
        },
        {"one_mix": 0.25, "same_word_two_mix": 0.35, "same_word_three_mix": 0.4},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp7_train.csv", "data/exp7_ft.csv", 4000, 1000,
        same_word_train=True, same_word_ft=False)


Sample: ('i can go from feeling so hopeless to so damned hopeful just from being around someone who cares and is awake', 0)
[TRAIN] exp1
Saved 4000 examples → data/exp1_train.csv
[FINETUNE] exp1
Saved 1000 examples → data/exp1_ft.csv
[TRAIN] exp2
Saved 4000 examples → data/exp2_train.csv
[FINETUNE] exp2
Saved 1000 examples → data/exp2_ft.csv
[TRAIN] exp3
Saved 4000 examples → data/exp3_train.csv
[FINETUNE] exp3
Saved 996 examples → data/exp3_ft.csv
[TRAIN] exp4
Saved 4000 examples → data/exp4_train.csv
[FINETUNE] exp4
Saved 1000 examples → data/exp4_ft.csv
[TRAIN] exp5
Saved 4000 examples → data/exp5_train.csv
[FINETUNE] exp5
Saved 1000 examples → data/exp5_ft.csv
[TRAIN] exp6_same_word_2mix
Saved 4000 examples → data/exp6_train.csv
[FINETUNE] exp6_same_word_2mix
Saved 1000 examples → data/exp6_ft.csv
[TRAIN] exp7_same_word_full_mix
Saved 4000 examples → data/exp7_train.csv
[FINETUNE] exp7_same_word_full_mix
Saved 1000 examples → data/exp7_ft.csv


In [4]:

# Run 7 experiments
if __name__ == "__main__":
    base_texts = load_base_texts("dair-ai/emotion", "test", min_words=5, max_samples=20000)
    p1234 = [1, 2, 3, 4]
    one_mix = [(i,) for i in p1234]
    two_mix = list(combinations_with_replacement(p1234, 2))
    three_mix = list(combinations_with_replacement(p1234, 3))
    same_word_two_mix = two_mix
    same_word_three_mix = three_mix

    # Exp 1
    run_experiment_weighted_with_labels("exp1",
        {"one_mix": one_mix}, {"one_mix": 1.0},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp1_train_test.csv", "data/exp1_ft_test.csv", 4000, 1000)

    # Exp 2
    run_experiment_weighted_with_labels("exp2",
        {"one_mix": one_mix, "two_mix": two_mix}, {"one_mix": 0.5, "two_mix": 0.5},
        {"pure_5": [(5,)], "mixed_5": [(i, 5) for i in p1234]}, {"pure_5": 0.5, "mixed_5": 0.5},
        base_texts, "data/exp2_train_test.csv", "data/exp2_ft_test.csv", 4000, 1000)

    # Exp 3
    run_experiment_weighted_with_labels("exp3",
        {"one_mix": one_mix, "two_mix": two_mix, "three_mix": three_mix},
        {"one_mix": 0.25, "two_mix": 0.35, "three_mix": 0.4},
        {
            "pure_5": [(5,)],
            "i_5": [(i, 5) for i in p1234],
            "ij_5": [(i, j, 5) for i in p1234 for j in p1234]
        },
        {"pure_5": 0.2, "i_5": 0.3, "ij_5": 0.5},
        base_texts, "data/exp3_train_test.csv", "data/exp3_ft_test.csv", 4000, 1000)

    # Exp 4
    run_experiment_weighted_with_labels("exp4",
        {"one_mix": one_mix, "two_mix": two_mix}, {"one_mix": 0.5, "two_mix": 0.5},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp4_train_test.csv", "data/exp4_ft_test.csv", 4000, 1000)

    # Exp 5
    run_experiment_weighted_with_labels("exp5",
        {"one_mix": one_mix, "two_mix": two_mix, "three_mix": three_mix},
        {"one_mix": 0.25, "two_mix": 0.35, "three_mix": 0.4},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp5_train_test.csv", "data/exp5_ft_test.csv", 4000, 1000)

    # Exp 6: Same-word 2-mix
    run_experiment_weighted_with_labels("exp6_same_word_2mix",
        {"same_word_two_mix": same_word_two_mix}, {"same_word_two_mix": 1.0},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp6_train_test.csv", "data/exp6_ft_test.csv", 4000, 1000,
        same_word_train=True, same_word_ft=False)

    # Exp 7: Same-word 1+2+3-mix
    run_experiment_weighted_with_labels("exp7_same_word_full_mix",
        {
            "one_mix": one_mix,
            "same_word_two_mix": same_word_two_mix,
            "same_word_three_mix": same_word_three_mix
        },
        {"one_mix": 0.25, "same_word_two_mix": 0.35, "same_word_three_mix": 0.4},
        {"pure_5": [(5,)]}, {"pure_5": 1.0},
        base_texts, "data/exp7_train_test.csv", "data/exp7_ft_test.csv", 4000, 1000,
        same_word_train=True, same_word_ft=False)


Sample: ('im feeling rather rotten so im not very ambitious right now', 0)
[TRAIN] exp1
Saved 4000 examples → data/exp1_train_test.csv
[FINETUNE] exp1
Saved 1000 examples → data/exp1_ft_test.csv
[TRAIN] exp2
Saved 4000 examples → data/exp2_train_test.csv
[FINETUNE] exp2
Saved 1000 examples → data/exp2_ft_test.csv
[TRAIN] exp3
Saved 4000 examples → data/exp3_train_test.csv
[FINETUNE] exp3
Saved 996 examples → data/exp3_ft_test.csv
[TRAIN] exp4
Saved 4000 examples → data/exp4_train_test.csv
[FINETUNE] exp4
Saved 1000 examples → data/exp4_ft_test.csv
[TRAIN] exp5
Saved 4000 examples → data/exp5_train_test.csv
[FINETUNE] exp5
Saved 1000 examples → data/exp5_ft_test.csv
[TRAIN] exp6_same_word_2mix
Saved 4000 examples → data/exp6_train_test.csv
[FINETUNE] exp6_same_word_2mix
Saved 1000 examples → data/exp6_ft_test.csv
[TRAIN] exp7_same_word_full_mix
Saved 4000 examples → data/exp7_train_test.csv
[FINETUNE] exp7_same_word_full_mix
Saved 1000 examples → data/exp7_ft_test.csv


In [19]:
# === 📦 Dependencies ===
import pandas as pd
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification
from torch.optim import AdamW
from sklearn.metrics import accuracy_score
from tqdm import tqdm
from datasets import load_dataset
import os, ast

# === 🧾 Adapted from TextAttack-based perturbation ===
from textattack.transformations import (
    WordSwapHomoglyphSwap,
    WordSwapInvisibleCharacters,
    WordSwapDeletions,
    WordSwapReorderings,
    WordSwapNeighboringCharacterSwap
)
import textattack

PERTURBATIONS = {
    1: WordSwapHomoglyphSwap,
    2: WordSwapInvisibleCharacters,
    3: WordSwapDeletions,
    4: WordSwapReorderings,
    5: WordSwapNeighboringCharacterSwap
}

def perturb_and_indices(text: str, perturb_seq=(1,), same_word=False):
    attacked_text = textattack.shared.AttackedText(text)
    num_words = len(attacked_text.words)
    if num_words == 0:
        return text, text, [[i] for i in range(num_words)], [[i] for i in range(num_words)]

    pert_text = attacked_text
    if same_word:
        idx = random.randint(0, num_words - 1)
        for p in perturb_seq:
            tform = PERTURBATIONS[p](random_one=True)
            transformed = tform._get_transformations(pert_text, [idx])
            if transformed:
                pert_text = transformed[0]
    else:
        for p in perturb_seq:
            idx = random.randint(0, num_words - 1)
            tform = PERTURBATIONS[p](random_one=True)
            transformed = tform._get_transformations(pert_text, [idx])
            if transformed:
                pert_text = transformed[0]

    clean_words = attacked_text.words
    pert_words = pert_text.words

    # Naive assumption: same number of words → one-to-one index mapping
    clean_indices = [[i] for i in range(len(clean_words))]
    pert_indices = [[i] for i in range(len(pert_words))]

    return pert_text.text, text, pert_indices, clean_indices

# === 📦 Generate CSV ===
def generate_csv(out_path, n=500):
    dataset = load_dataset("dair-ai/emotion", split="train")
    rows = []
    for ex in dataset.select(range(n)):
        clean_text = ex['text']
        label = ex['label']
        perturbed, clean, p_idx, c_idx = perturb_and_indices(clean_text)
        rows.append([perturbed, clean, label, str(p_idx), str(c_idx)])
    df = pd.DataFrame(rows, columns=["input", "original_text", "label", "word_indices_perturbed", "word_indices_clean"])
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    df.to_csv(out_path, index=False)
    print("Saved:", out_path)

# === 📚 Dataset Loader ===
class SemanticDataset(Dataset):
    def __init__(self, path, tokenizer):
        df = pd.read_csv(path)
        self.input = df["input"].tolist()
        self.clean = df["original_text"].tolist()
        self.label = df["label"].tolist()
        self.pidx = [ast.literal_eval(x) for x in df["word_indices_perturbed"]]
        self.cidx = [ast.literal_eval(x) for x in df["word_indices_clean"]]
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.input)

    def __getitem__(self, i):
        t_clean = self.tokenizer(self.clean[i], return_tensors="pt", truncation=True, padding="max_length")
        t_pert = self.tokenizer(self.input[i], return_tensors="pt", truncation=True, padding="max_length")
        return {
            "tokens_clean": {k: v.squeeze(0) for k, v in t_clean.items()},
            "tokens_pert": {k: v.squeeze(0) for k, v in t_pert.items()},
            "cidx": self.cidx[i], "pidx": self.pidx[i],
            "label": torch.tensor(self.label[i])
        }

# === 🧱 Semantic Loss Utils ===
def get_embeddings(model, tokens, device):
    tokens = {k: v.to(device) for k, v in tokens.items() if k in {"input_ids", "attention_mask"}}
    with torch.no_grad():
        return model.distilbert(**tokens).last_hidden_state  # shape: (B, T, H)


def get_word_embeddings(hidden, index_list):
    word_embeds = []
    for group in index_list:
        vec = sum(hidden[i] for i in group if i < hidden.size(0)) / len(group)
        word_embeds.append(vec)
    return torch.stack(word_embeds)

# === 🏋️ Training Loop ===
def train(model, loader, optimizer, device, lambda_sem):
    model.train()
    for batch in tqdm(loader):
        optimizer.zero_grad()

        input_ids = batch['tokens_pert']['input_ids'].to(device)
        attention_mask = batch['tokens_pert']['attention_mask'].to(device)
        labels = batch['label'].to(device)

        output = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss_cls = output.loss

        emb_clean = get_embeddings(model, batch['tokens_clean'], device)  # (B, T, H)
        emb_pert = get_embeddings(model, batch['tokens_pert'], device)    # (B, T, H)

        emb_clean_words_list = [
            get_word_embeddings(emb_clean[i], batch['cidx'][i]) for i in range(len(batch['cidx']))
        ]
        emb_pert_words_list = [
            get_word_embeddings(emb_pert[i], batch['pidx'][i]) for i in range(len(batch['pidx']))
        ]

        loss_sem_list = [
            1 - F.cosine_similarity(clean_w, pert_w, dim=1).mean()
            for clean_w, pert_w in zip(emb_clean_words_list, emb_pert_words_list)
        ]

        loss_sem = torch.stack(loss_sem_list).mean()
        loss = loss_cls + lambda_sem * loss_sem
        loss.backward()
        optimizer.step()


# === 🧪 Evaluation ===
def evaluate(model, loader, device):
    model.eval()
    preds, trues = [], []
    with torch.no_grad():
        for batch in loader:
            logits = model(**{k: v.to(device) for k, v in batch['tokens_pert'].items()}).logits
            pred = torch.argmax(logits, dim=-1).cpu().tolist()
            preds += pred
            trues += batch['label'].tolist()
    return accuracy_score(trues, preds)

# === 🤝 Custom Collate Function ===
def custom_collate_fn(batch):
    keys = batch[0].keys()
    collated = {}
    for key in keys:
        if isinstance(batch[0][key], dict):
            collated[key] = {
                subkey: torch.stack([item[key][subkey] for item in batch])
                for subkey in batch[0][key].keys()
            }
        elif isinstance(batch[0][key], list):
            collated[key] = [item[key] for item in batch]
        else:
            collated[key] = torch.stack([item[key] for item in batch])
    return collated

# === 🚀 Main ===
def run(csv_path, use_semantic_loss=True):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = DistilBertTokenizerFast.from_pretrained("bhadresh-savani/distilbert-base-uncased-emotion")
    model = DistilBertForSequenceClassification.from_pretrained("bhadresh-savani/distilbert-base-uncased-emotion", num_labels=6).to(device)
    dataset = SemanticDataset(csv_path, tokenizer)
    loader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=custom_collate_fn)
    optimizer = AdamW(model.parameters(), lr=2e-5)
    train(model, loader, optimizer, device, lambda_sem=0.5 if use_semantic_loss else 0.0)
    acc = evaluate(model, loader, device)
    print("Accuracy:", acc)


def run_exp1():
    from itertools import combinations_with_replacement

    base_texts = load_dataset("dair-ai/emotion", split="train")
    base_texts = [(ex['text'].strip(), ex['label']) for ex in base_texts if len(ex['text'].strip().split()) >= 5][:5000]

    one_mix = [(i,) for i in [1, 2, 3, 4]]
    allocation = {seq: 1000 // len(one_mix) for seq in one_mix}  # 1000 examples evenly split

    rows = []
    for seq, n_samples in allocation.items():
        sampled = random.sample(base_texts, n_samples)
        for clean, label in sampled:
            perturbed, _, p_idx, c_idx = perturb_and_indices(clean, seq, same_word=False)
            rows.append([perturbed, clean, label, str(p_idx), str(c_idx)])

    df = pd.DataFrame(rows, columns=["input", "original_text", "label", "word_indices_perturbed", "word_indices_clean"])
    os.makedirs("data", exist_ok=True)
    csv_path = "data/exp1.csv"
    df.to_csv(csv_path, index=False)

    run(csv_path, use_semantic_loss=True)
run_exp1()


  1%|          | 1/125 [00:07<16:01,  7.76s/it]


KeyboardInterrupt: 

In [3]:
print(apply_perturbations("hello hello2 hello3 hello4 hello5", (1, 3, 4), same_word=True))

inside: <AttackedText "hello hellо2 hello3 hello4 hello5">
'hellо2'
'helm\x08lо2'
inside: <AttackedText "hello hellо2 hello3 hello4 hello5">
inside: <AttackedText "hello hel‭⁦‮⁩⁦m⁩‬⁩‬lо2 hello3 hello4 hello5">
hello hel‭⁦‮⁩⁦m⁩‬⁩‬lо2 hello3 hello4 hello5


In [None]:
"""
Run this on colab
Training + finetuning T5
"""
import os
import torch
import random
import numpy as np
from datasets import load_dataset
from transformers import (
    AutoTokenizer, AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq, Seq2SeqTrainer, Seq2SeqTrainingArguments
)
import Levenshtein  

def finetune_t5_on_experiment(csv_path, out_dir, model_id="t5-small", val_split=0.03,
                              batch_size=16, epochs=3, lr=5e-4, max_len=128, seed=42):
    os.environ["WANDB_DISABLED"] = "true"

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    ds = load_dataset("csv", data_files=csv_path)["train"].shuffle(seed=seed)
    split = ds.train_test_split(test_size=val_split, seed=seed)
    train_ds, val_ds = split["train"], split["test"]

    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForSeq2SeqLM.from_pretrained(model_id)

    def preprocess(batch):
        model_in = tokenizer(batch["input"], padding="longest", truncation=True, max_length=max_len)
        with tokenizer.as_target_tokenizer():
            labels = tokenizer(batch["original_text"], padding="longest", truncation=True, max_length=max_len).input_ids
        model_in["labels"] = labels
        return model_in

    train_ds = train_ds.map(preprocess, batched=True, remove_columns=train_ds.column_names)
    val_ds = val_ds.map(preprocess, batched=True, remove_columns=val_ds.column_names)

    collator = DataCollatorForSeq2Seq(tokenizer, model=model)

    args = Seq2SeqTrainingArguments(
        output_dir=out_dir,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        num_train_epochs=epochs,
        learning_rate=lr,
        lr_scheduler_type="cosine",
        do_eval=True,
        generation_max_length=max_len,
        fp16=torch.cuda.is_available(),
        label_smoothing_factor=0.1,
        save_total_limit=1,
        seed=seed,
        report_to="none"
    )

    def postprocess_text(preds, labels):
        return [p.strip() for p in preds], [l.strip() for l in labels]

    def compute_metrics(eval_preds):
        preds, labels = eval_preds
        preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
        labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
        preds, labels = postprocess_text(preds, labels)

        def norm_edit_sim(p, l):
            return 1.0 - Levenshtein.distance(p, l) / max(1, max(len(p), len(l)))

        avg_sim = np.mean([norm_edit_sim(p, l) for p, l in zip(preds, labels)])
        return {"norm_edit_sim": avg_sim}

    trainer = Seq2SeqTrainer(
        model=model,
        args=args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        data_collator=collator,
        compute_metrics=compute_metrics,
    )

    trainer.train()
    model.save_pretrained(out_dir)
    tokenizer.save_pretrained(out_dir)
    print(f"Finished fine-tuning {csv_path} → saved to {out_dir}")

for i in range(1, 8):
    finetune_t5_on_experiment(f"exp{i}_train.csv", f"models/exp{i}_train")
    finetune_t5_on_experiment(
        csv_path = f"exp{i}_ft.csv",
        out_dir = f"models/exp{i}_ft",
        model_id = f"models/exp{i}_train"
    )

In [None]:
# do not lose this
import torch
import torch.nn as nn
import copy
from transformers.models.t5.modeling_t5 import (
    T5ForConditionalGeneration,
    T5Stack,
    T5Block,
    T5Attention
)

# ------------------------------------------
# 1. Trust Gate
# ------------------------------------------

class TrustGate(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.gate = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, 1),
            nn.Sigmoid()
        )

    def forward(self, hidden_states):
        # Output shape: (batch_size, seq_len)
        return self.gate(hidden_states).squeeze(-1)

# ------------------------------------------
# 2. Trust-Aware Self-Attention
# ------------------------------------------

class TrustT5Attention(T5Attention):
    def __init__(self, config, has_relative_attention_bias=False):
        super().__init__(config, has_relative_attention_bias)
        self.trust_gate = TrustGate(config.d_model)

    def forward(
        self,
        hidden_states,
        mask=None,
        position_bias=None,
        past_key_value=None,
        layer_head_mask=None,
        query_length=None,
        use_cache=False,
        output_attentions=False,
        key_value_states=None,
        cache_position=None
    ):
        # Always request attention weights and full outputs
        output = super().forward(
            hidden_states=hidden_states,
            mask=mask,
            position_bias=position_bias,
            past_key_value=past_key_value,
            layer_head_mask=layer_head_mask,
            query_length=query_length,
            use_cache=use_cache,
            output_attentions=True,  # needed for attn_weights
            key_value_states=key_value_states,
            cache_position=cache_position,
        )

        # Unpack
        attention_output = output[0]
        present_key_value = output[1]
        position_bias_out = output[2] if len(output) > 2 else None
        attn_weights = output[3] if len(output) > 3 else None

        # Apply trust gate
        if attn_weights is not None:
            trust = self.trust_gate(hidden_states).unsqueeze(1).unsqueeze(2)  # (B, 1, 1, L)
            attn_weights = attn_weights * trust

        # Return all expected values (even if some are None)
        outputs = (attention_output, present_key_value, position_bias_out)
        if output_attentions:
            outputs += (attn_weights,)
        return outputs



# ------------------------------------------
# 3. Block and Stack
# ------------------------------------------

class TrustBlock(T5Block):
    def __init__(self, config, has_relative_attention_bias=False):
        super().__init__(config, has_relative_attention_bias)
        self.layer[0].SelfAttention = TrustT5Attention(config, has_relative_attention_bias)


class TrustEncoder(T5Stack):
    def __init__(self, config, embed_tokens):
        config = copy.deepcopy(config)
        config.is_decoder = False
        config.use_cache = False
        super().__init__(config, embed_tokens)

        self.block = nn.ModuleList([
            TrustBlock(config, has_relative_attention_bias=(i == 0))
            for i in range(config.num_layers)
        ])

# ------------------------------------------
# 4. Model Definition
# ------------------------------------------

class TrustT5(T5ForConditionalGeneration):
    def __init__(self, config):
        super().__init__(config)
        self.encoder = TrustEncoder(config, self.shared)

    @classmethod
    def from_pretrained(cls, model_name, *args, **kwargs):
        model = super().from_pretrained(model_name, *args, **kwargs)
        model.encoder = TrustEncoder(model.config, model.shared)
        return model

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("t5-small")
model = TrustT5.from_pretrained("t5-small")
text = "emotion: I am so proud of you"
label_text = "joy"

inputs = tokenizer(text, return_tensors="pt")
labels = tokenizer(label_text, return_tensors="pt").input_ids

outputs = model(**inputs, labels=labels)
generated_ids = model.generate(**inputs)
decoded_output = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
print("Generated output:", decoded_output)

print("Loss:", outputs.loss.item())
print("Logits shape:", outputs.logits.shape)



In [None]:
# dont fkin lose this one either

import os
import torch
import pandas as pd
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments, DataCollatorForSeq2Seq

# Check device availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create directories for saving model and logs
os.makedirs("results", exist_ok=True)
os.makedirs("logs", exist_ok=True)

# Load the tokenizer and model (from pre-trained checkpoint)
tokenizer = AutoTokenizer.from_pretrained("t5-small")
model = T5ForConditionalGeneration.from_pretrained("t5-small").to(device)

# Ensure padding token is set to eos token for T5
tokenizer.pad_token = tokenizer.eos_token

# Load and preprocess dataset
def preprocess_data(csv_path, tokenizer, max_length=128):
    df = pd.read_csv(csv_path)  # Load dataset
    dataset = Dataset.from_pandas(df)  # Convert to Hugging Face dataset

    # Preprocessing function to tokenize inputs and targets
    def preprocess(batch):
        model_in = tokenizer(batch["input"], padding="longest", truncation=True, max_length=max_length)
        with tokenizer.as_target_tokenizer():
            labels = tokenizer(batch["original_text"], padding="longest", truncation=True, max_length=max_length).input_ids
        model_in["labels"] = labels
        return model_in

    tokenized_data = dataset.map(preprocess, batched=True, remove_columns=dataset.column_names)
    return tokenized_data

# Preprocess the training data (replace 'exp1_train.csv' with your actual file)
train_dataset = preprocess_data("exp1_train.csv", tokenizer)

# Training arguments
training_args = Seq2SeqTrainingArguments(
    output_dir="./results",          # Output directory for model checkpoints
    # eval_strategy="epoch",     # Evaluate after every epoch
    learning_rate=5e-5,              # Learning rate
    per_device_train_batch_size=16,  # Batch size per device
    per_device_eval_batch_size=16,   # Eval batch size
    weight_decay=0.01,               # Weight decay
    num_train_epochs=3,              # Number of training epochs
    logging_dir="./logs",            # Log directory
    logging_steps=10,                # Log every 10 steps
    save_total_limit=1,              # Limit number of saved checkpoints
    predict_with_generate=True,      # Ensure generation during evaluation
)

# Data collator to pad batches to the max length
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

# Trainer setup
trainer = Seq2SeqTrainer(
    model=model,                         # The model you're training
    args=training_args,                  # The training arguments
    train_dataset=train_dataset,         # Your training dataset
    tokenizer=tokenizer,                 # Tokenizer for input/output
    data_collator=data_collator,         # Data collator for padding
)

# Start training
trainer.train()

# Save the fine-tuned model and tokenizer
model.save_pretrained("models/enc_post_train")
tokenizer.save_pretrained("models/enc_post_train")

print("Fine-tuning complete. Model saved to 'models/enc_post_train'")


In [None]:
import os
import torch
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.makedirs("eval_outputs", exist_ok=True)

def generate_corrections(model, tokenizer, inputs, max_len=128):
    enc = tokenizer(inputs, return_tensors="pt", padding=True, truncation=True, max_length=max_len)
    enc = {k: v.to(device) for k, v in enc.items()}
    with torch.no_grad():
        output = model.generate(**enc, max_length=max_len)
    return tokenizer.batch_decode(output, skip_special_tokens=True)

all_results = []

for eval_idx in range(1, 8):
    for suffix in ["ft_test", "train_test"]:
        dataset_name = f"exp{eval_idx}_{suffix}.csv"
        df = pd.read_csv(dataset_name)
        inputs = df["input"].tolist()[:250]
        targets = df["original_text"].tolist()[:250]

        # Evaluate with both train and ft models
        for model_type in ["train", "ft"]:
            for model_idx in range(1, 8):
                model_name = f"exp{model_idx}_{model_type}"
                model_dir = f"models/{model_name}"
                if not os.path.exists(model_dir):
                    continue  # Skip if model directory doesn't exist

                print(f"Evaluating model {model_name} on {dataset_name}...")

                model = AutoModelForSeq2SeqLM.from_pretrained(model_dir).to(device).eval()
                tokenizer = AutoTokenizer.from_pretrained(model_dir)

                preds = generate_corrections(model, tokenizer, inputs)

                for inp, pred, tgt in zip(inputs, preds, targets):
                    all_results.append({
                        "eval_dataset": dataset_name,
                        "model": model_name,
                        "perturbed_input": inp,
                        "predicted_output": pred,
                        "original_text": tgt
                    })

# Save to one CSV
out_df = pd.DataFrame(all_results)
out_df.to_csv("eval_outputs/all_model_outputs.csv", index=False)
print("Saved all evaluation results (train + ft models) to eval_outputs/all_model_outputs.csv")


In [None]:
import pandas as pd

df = pd.read_csv("eval_outputs/all_model_outputs.csv")

df["predicted_output"] = df["predicted_output"].astype(str).str.strip()
df["original_text"] = df["original_text"].astype(str).str.strip()

df["exact_match"] = df["predicted_output"] == df["original_text"]

acc = df.groupby(["model", "eval_dataset"])["exact_match"].mean().unstack()

acc = (acc * 100).round(2)

print("=== Exact Match Accuracy (%) ===")
print(acc.fillna("-"))

acc.to_csv("eval_outputs/accuracy_table.csv")
acc.to_markdown("eval_outputs/accuracy_table.md")
