In [None]:
!pip install datasets transformers --quiet
!pip install datasets --quiet
!pip install -U datasets huggingface_hub fsspec

Collecting datasets
  Downloading datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting huggingface_hub
  Downloading huggingface_hub-0.33.1-py3-none-any.whl.metadata (14 kB)
Collecting fsspec
  Downloading fsspec-2025.5.1-py3-none-any.whl.metadata (11 kB)
  Downloading fsspec-2025.3.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.6.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.5/491.5 kB[0m [31m19.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading huggingface_hub-0.33.1-py3-none-any.whl (515 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m515.4/515.4 kB[0m [31m33.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2025.3.0-py3-none-any.whl (193 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.6/193.6 kB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fsspec, huggingface_hub, datasets
  Attempting uninstall: fsspec
    Found existing installat

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import re
import json
import glob
from tqdm.auto import tqdm

%matplotlib inline

In [None]:
# Mount Google Drive at /content/drive (Colab only); the result folders used by the
# later cells all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Number of reviews handed to each attribution call / written to each parquet file.
batch_size = 10

In [None]:
def calculate_mask_attribution(texts, labels, k=15, save_path=None, debug=False):
    """Occlusion attribution: score each token by the confidence lost when it is masked.

    For every text, each token except the first and last position is replaced in
    turn by the tokenizer's mask token (or its unk token when no mask token is
    defined). The score of a token is the drop in probability of the class the
    model predicted on the unperturbed input.

    Uses the notebook-level globals ``tokenizer``, ``model`` and ``device``.

    Args:
        texts: iterable of input strings.
        labels: iterable of gold labels, one per text. Accepts truthy/falsy values
            (0/1, bool) as well as the strings "Positive"/"Negative".
        k: maximum number of highest-|score| tokens kept per text.
        save_path: if given, parquet file the results are written to.
        debug: when True, print the top tokens for every text.

    Returns:
        A list with one dict per text holding ``id`` (index within this call),
        ``text``, ``label``, ``tokens``, ``weights`` and ``token_positions``
        (indices into the token sequence with the first position removed).
    """
    results = []

    # Compare against None: a legitimate token id of 0 is falsy.
    replacement_id = (
        tokenizer.mask_token_id
        if tokenizer.mask_token_id is not None
        else tokenizer.unk_token_id
    )

    for idx, (text, label) in enumerate(zip(texts, labels)):
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
        token_ids = inputs["input_ids"]
        attention_mask = inputs["attention_mask"]
        tokens = tokenizer.convert_ids_to_tokens(token_ids[0])

        with torch.no_grad():
            original_probs = torch.softmax(model(**inputs).logits, dim=-1)[0]
        # Track the originally predicted class. Comparing max() probabilities hides
        # tokens whose removal flips the prediction to a confident other class.
        predicted_class = int(original_probs.argmax())
        original_conf = original_probs[predicted_class].item()

        attributions = []
        for i in range(1, token_ids.size(1) - 1):
            perturbed_ids = token_ids.clone()
            perturbed_ids[0, i] = replacement_id
            with torch.no_grad():
                perturbed_logits = model(input_ids=perturbed_ids, attention_mask=attention_mask).logits
            perturbed_conf = torch.softmax(perturbed_logits, dim=-1)[0, predicted_class].item()
            attributions.append(original_conf - perturbed_conf)

        valid_tokens = tokens[1:-1]
        valid_scores = attributions[:len(valid_tokens)]
        top_k = min(k, len(valid_tokens))

        top_indices = sorted(range(len(valid_scores)), key=lambda i: abs(valid_scores[i]), reverse=True)[:top_k]
        top_tokens_scores = [(valid_tokens[i], round(valid_scores[i], 4)) for i in top_indices]

        # String labels such as "Negative" are truthy, so they must be compared by value.
        if isinstance(label, str):
            is_positive = label.strip().lower() in ("positive", "1", "true")
        else:
            is_positive = bool(label)
        label_name = "Positive" if is_positive else "Negative"

        results.append({
            "id": idx,
            "text": text.strip(),
            "label": label_name,
            "tokens": [tok for tok, _ in top_tokens_scores],
            "weights": [score for _, score in top_tokens_scores],
            "token_positions": top_indices
        })

        if debug:
            print(f"Original text:\n{text.strip()}\nSentiment: {label_name}")
            print("Top influential tokens:")
            for tok, score in top_tokens_scores:
                print(f"{tok:15} | weight: {score:+.4f}")
            print("-" * 80)

    if save_path:
        df = pd.DataFrame(results)
        df.to_parquet(save_path, index=False)
        print(f"Saved results to {save_path}")

    return results

In [None]:
def gradient_attribution_sentiment(texts, labels, k=15, debug=False, save_path=None):
    """Gradient-saliency attribution on the input embeddings.

    For every text the predicted-class logit is differentiated with respect to
    the input embeddings; a token's score is the sum of absolute gradient values
    over the embedding dimension. The first and last token positions are excluded.

    Uses the notebook-level globals ``tokenizer`` and ``model``; the model is
    moved to CUDA when available and put in eval mode.

    Args:
        texts: iterable of input strings.
        labels: iterable of gold labels, one per text. Accepts truthy/falsy values
            (0/1, bool) as well as the strings "Positive"/"Negative".
        k: maximum number of highest-score tokens kept per text.
        debug: when True, print the top tokens for every text.
        save_path: if given, parquet file the results are written to.

    Returns:
        A list with one dict per text holding ``id`` (index within this call),
        ``text``, ``label``, ``tokens``, ``weights`` and ``token_positions``
        (indices into the token sequence with the first position removed).

    Raises:
        ValueError: if the word-embedding layer of ``model`` cannot be located.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # eval() keeps dropout from adding noise to the gradients.
    model.to(device).eval()

    # The embedding layer depends only on the model, so resolve it once.
    if hasattr(model, "distilbert"):
        embedding_layer = model.distilbert.embeddings.word_embeddings
    elif hasattr(model, "roberta"):
        embedding_layer = model.roberta.embeddings.word_embeddings
    elif hasattr(model, "bert"):
        embedding_layer = model.bert.embeddings.word_embeddings
    elif hasattr(model, "transformer") and hasattr(model.transformer, "wte"):
        embedding_layer = model.transformer.wte  # GPT-2
    elif hasattr(model, "model") and hasattr(model.model, "encoder") and hasattr(model.model.encoder, "embed_tokens"):
        embedding_layer = model.model.encoder.embed_tokens
    else:
        raise ValueError("Model architecture not supported for gradient attribution.")

    results = []

    for idx, (text, label) in enumerate(zip(texts, labels)):
        encoding = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
        input_ids = encoding.input_ids
        attention_mask = encoding.attention_mask
        tokens = tokenizer.convert_ids_to_tokens(input_ids[0])

        inputs_embeds = embedding_layer(input_ids).detach().requires_grad_(True)
        outputs = model(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        logits = outputs.logits
        predicted_label = logits.argmax(dim=-1).item()
        target_logit = logits[0, predicted_label]
        # autograd.grad returns only the gradient we need and does not accumulate
        # .grad on every model parameter the way .backward() does.
        (embed_grad,) = torch.autograd.grad(target_logit, inputs_embeds)
        # Index the batch dimension explicitly; squeeze() would also collapse a
        # length-1 sequence into a scalar.
        grads = embed_grad.abs().sum(dim=-1)[0]

        valid_tokens = tokens[1:-1]
        valid_scores = grads[1:-1].tolist()

        top_k = min(k, len(valid_tokens))
        sorted_idx = sorted(range(len(valid_scores)), key=lambda i: abs(valid_scores[i]), reverse=True)[:top_k]
        influential_tokens = [(valid_tokens[i], round(valid_scores[i], 4)) for i in sorted_idx]

        # String labels such as "Negative" are truthy, so they must be compared by value.
        if isinstance(label, str):
            is_positive = label.strip().lower() in ("positive", "1", "true")
        else:
            is_positive = bool(label)
        label_name = "Positive" if is_positive else "Negative"

        results.append({
            "id": idx,
            "text": text.strip(),
            "label": label_name,
            "tokens": [tok for tok, _ in influential_tokens],
            "weights": [score for _, score in influential_tokens],
            "token_positions": sorted_idx
        })

        if debug:
            print(f"Original text:\n{text.strip()}\nSentiment: {label_name}")
            print("Top influential tokens:")
            for tok, score in influential_tokens:
                print(f"{tok:15} | weight: {score:+.4f}")
            print("-" * 80)

    if save_path:
        df = pd.DataFrame(results)
        df.to_parquet(save_path, index=False)
        print(f"Saved results to {save_path}")

    return results


In [None]:
def perturb_text(text, num_samples=30, keep_prob=0.8):
    """Create randomly word-dropped variants of ``text``.

    The text is split on whitespace. For each variant every word is kept
    independently with probability ``keep_prob``; a dropped word is replaced by
    an empty string before the words are re-joined with single spaces.

    Returns:
        (samples, masks, words): the perturbed strings, one integer 0/1 numpy
        array per sample (1 = word kept), and the list of original words.
    """
    words = text.split()
    word_count = len(words)

    samples = []
    masks = []
    for _ in range(num_samples):
        keep_flags = np.random.binomial(1, keep_prob, size=word_count).astype(bool)

        pieces = []
        for word, keep in zip(words, keep_flags):
            pieces.append(word if keep else "")

        samples.append(" ".join(pieces))
        masks.append(keep_flags.astype(int))

    return samples, masks, words

def predict_fn(texts):
    """Return a 1-D tensor with the class-index-1 probability for each text.

    Texts are scored one at a time with the notebook-level ``tokenizer``,
    ``model`` and ``device``; no gradients are tracked.
    """
    positive_probs = []
    for sample in texts:
        encoded = tokenizer(sample, padding=True, truncation=True, return_tensors="pt").to(device)
        with torch.no_grad():
            class_probs = torch.softmax(model(**encoded).logits, dim=-1)
            positive_probs.append(class_probs[0, 1].item())
    return torch.tensor(positive_probs)

class SurrogateModel(nn.Module):
    """Logistic-regression surrogate: one linear layer followed by a sigmoid."""

    def __init__(self, input_dim):
        super().__init__()
        # One weight per input feature plus a bias, producing a single output.
        self.linear = nn.Linear(in_features=input_dim, out_features=1, bias=True)

    def forward(self, x):
        """Map a (batch, input_dim) input to a (batch,) tensor of values in (0, 1)."""
        logits = self.linear(x)
        return logits.sigmoid().squeeze(1)

def linear_surrogate(texts, labels, k=15, debug=False, save_path=None):
    """LIME-style attribution via a per-text logistic surrogate.

    For each text, ``perturb_text`` produces randomly word-dropped variants,
    ``predict_fn`` scores them, and a ``SurrogateModel`` is fitted (25 Adam
    steps, BCE loss) to predict those scores from the keep/drop masks. The
    learned weight of each word is its attribution.

    Args:
        texts: iterable of input strings.
        labels: iterable of gold labels, one per text. Accepts truthy/falsy values
            (0/1, bool) as well as the strings "Positive"/"Negative".
        k: maximum number of highest-|weight| words kept per text.
        debug: when True, print the top words for every text.
        save_path: if given, parquet file the results are written to.

    Returns:
        A list with one dict per text holding ``text``, ``label``, ``tokens``,
        ``weights`` and ``token_positions``.

    NOTE(review): ``tokens`` / ``token_positions`` here are whitespace-split words
    and their word indices, not tokenizer tokens. ``_drop_by_positions`` interprets
    positions as tokenizer token indices, so these positions are not directly
    comparable with the other attribution methods — confirm downstream handling.
    """
    results = []

    for text, label in zip(texts, labels):
        samples, masks, words = perturb_text(text)
        # Stack into a single ndarray first; building a tensor from a list of
        # arrays is slow and triggers a warning.
        masks = torch.tensor(np.array(masks)).float().to(device)
        preds = predict_fn(samples).to(device)

        model_surr = SurrogateModel(input_dim=len(words)).to(device)
        optimizer = torch.optim.Adam(model_surr.parameters(), lr=0.01)
        loss_fn = nn.BCELoss()

        for _ in range(25):
            optimizer.zero_grad()
            pred = model_surr(masks)
            loss = loss_fn(pred, preds)
            loss.backward()
            optimizer.step()

        weights = model_surr.linear.weight[0].detach()
        top_k = min(k, len(words))
        top_positions = torch.topk(torch.abs(weights), top_k).indices.cpu().numpy().tolist()

        influential_tokens = [(words[i], round(weights[i].item(), 4)) for i in top_positions]

        # String labels such as "Negative" are truthy, so they must be compared by value.
        if isinstance(label, str):
            is_positive = label.strip().lower() in ("positive", "1", "true")
        else:
            is_positive = bool(label)
        label_name = "Positive" if is_positive else "Negative"

        results.append({
            "text": text.strip(),
            "label": label_name,
            "tokens": [tok for tok, _ in influential_tokens],
            "weights": [score for _, score in influential_tokens],
            "token_positions": top_positions
        })

        if debug:
            print(f'Original text:\n{text.strip()}\nSentiment: {label_name}')
            print("Top influential tokens:")
            for tok, score in influential_tokens:
                print(f"{tok:15} | weight: {score:+.4f}")
            print("-" * 80)

    if save_path:
        df = pd.DataFrame(results)
        df.to_parquet(save_path, index=False)
        print(f"Saved results to {save_path}")

    return results

In [None]:
def evidence_string_dropout_sentiment(
    texts,
    evidences,
    labels=None,
    debug=False,
    save_path=None,
    fname="eraser_string_drop.parquet",
):
    """Measure how the class-index-1 probability changes when evidence phrases are removed.

    Every phrase in a text's evidence list is deleted from the text
    (case-insensitive, literal match), runs of whitespace are collapsed, and the
    model is scored on both the full and the reduced text.

    Uses the notebook-level globals ``tokenizer`` and ``model``; the model is
    moved to CUDA when available and put in eval mode.

    Args:
        texts: sequence of input strings.
        evidences: sequence of phrase lists, aligned with ``texts``.
        labels: optional sequence of gold labels aligned with ``texts``. Accepts
            truthy/falsy values (0/1, bool) as well as "Positive"/"Negative".
        debug: when True, print a short report per text.
        save_path: optional DIRECTORY (created if missing) in which ``fname`` is written.
        fname: parquet file name used inside ``save_path``.

    Returns:
        A list with one dict per text: ``id``, ``text_full``, ``text_dropped``,
        ``prob_full``, ``prob_dropped``, ``delta`` (full minus dropped),
        ``evidences`` and ``label`` (None when no labels were given).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device).eval()

    results = []

    if save_path:
        os.makedirs(save_path, exist_ok=True)

    def positive_prob(sentence):
        # Probability the model assigns to class index 1 for a single sentence.
        enc = tokenizer(
            sentence,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=512,
        ).to(device)
        with torch.no_grad():
            logits = model(**enc).logits
        return torch.softmax(logits, dim=-1)[0, 1].item()

    for idx, (txt, ev_list) in enumerate(zip(texts, evidences)):
        dropped_txt = txt
        for phrase in ev_list:
            pattern = re.compile(re.escape(phrase), flags=re.IGNORECASE)
            dropped_txt = pattern.sub("", dropped_txt)
        dropped_txt = re.sub(r"\s{2,}", " ", dropped_txt).strip()

        prob_full = positive_prob(txt)
        # If every word was evidence the reduced text is empty; score "." instead,
        # matching the fallback used in evaluate_all_attributions.
        prob_drop = positive_prob(dropped_txt if dropped_txt else ".")

        delta = prob_full - prob_drop

        if labels:
            raw_label = labels[idx]
            # String labels such as "Negative" are truthy, so compare them by value.
            if isinstance(raw_label, str):
                is_positive = raw_label.strip().lower() in ("positive", "1", "true")
            else:
                is_positive = bool(raw_label)
            label_name = "Positive" if is_positive else "Negative"
        else:
            label_name = None

        res = dict(
            id=idx,
            text_full=txt.strip(),
            text_dropped=dropped_txt,
            prob_full=round(prob_full, 4),
            prob_dropped=round(prob_drop, 4),
            delta=round(delta, 4),
            evidences=ev_list,
            label=label_name,
        )
        results.append(res)

        if debug:
            print(f"[{idx}] dprob={delta:+.4f}")
            if labels:
                print("Label:", res["label"])
            print("Removed phrases:", ev_list)
            print("-" * 80)

    if save_path:
        parquet_path = os.path.join(save_path, fname)
        pd.DataFrame(results).to_parquet(parquet_path, index=False)
        if debug:
            print(f"Wrote {len(results)} rows → {parquet_path}")

    return results

In [None]:
# movie_rationales is loaded through a script hosted in the dataset repository. Without
# trust_remote_code=True, load_dataset stops at an interactive [y/N] prompt, which blocks
# Restart & Run All. NOTE: this executes code from https://hf.co/datasets/movie_rationales —
# inspect the repository before running in an environment you care about.
eraser = load_dataset("movie_rationales", trust_remote_code=True)
eraser_train = eraser["train"]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

movie_rationales.py: 0.00B [00:00, ?B/s]

The repository for movie_rationales contains custom code which must be executed to correctly load the dataset. You can inspect the repository content at https://hf.co/datasets/movie_rationales.
You can avoid this prompt in future by passing the argument `trust_remote_code=True`.

Do you wish to run the custom code? [y/N] y


Downloading data:   0%|          | 0.00/3.90M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1600 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/200 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/199 [00:00<?, ? examples/s]

In [None]:
# DistilBERT (SST-2) run: compute all four attribution outputs for every batch of the
# movie_rationales train split, skipping batches whose output already exists.
model_name = "distilbert-base-uncased-finetuned-sst-2-english"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.eval()
model.to(device)

# padding=True needs a pad token; fall back to EOS for tokenizers that define none.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

perturbation_folder = "/content/drive/MyDrive/NLP_project/bert_eraser/results/perturbation"
gradient_folder = "/content/drive/MyDrive/NLP_project/bert_eraser/results/gradient"
linear_folder = "/content/drive/MyDrive/NLP_project/bert_eraser/results/linear"
importance_folder = "/content/drive/MyDrive/NLP_project/bert_eraser/results/importance"

# The per-batch parquet files are written straight into these folders, so they must
# exist before the first save.
for results_folder in (perturbation_folder, gradient_folder, linear_folder, importance_folder):
    os.makedirs(results_folder, exist_ok=True)

for start_batch in range(0, len(eraser_train), batch_size):
    print(start_batch)
    end_batch = start_batch + batch_size
    # Slice the rows once; indexing a column first and then slicing builds the
    # whole column on every iteration.
    batch = eraser_train[start_batch:end_batch]
    batch_texts = batch['review']
    batch_evidences = batch['evidences']
    # Pass the raw 0/1 labels. The attribution functions derive "Positive"/"Negative"
    # from truthiness, and any non-empty string (including "Negative") is truthy.
    batch_labels = batch['label']

    perturbation_path = f"{perturbation_folder}/batch_{start_batch}.parquet"
    gradient_path = f"{gradient_folder}/batch_{start_batch}.parquet"
    linear_path = f"{linear_folder}/batch_{start_batch}.parquet"
    # NOTE: evidence_string_dropout_sentiment treats save_path as a directory, so this
    # path becomes a folder containing its default file name.
    importance_path = f"{importance_folder}/batch_{start_batch}.parquet"

    if not os.path.exists(perturbation_path):
        calculate_mask_attribution(batch_texts, batch_labels, save_path=perturbation_path)
    else:
        print(f"Skipped perturbation batch {start_batch} (already exists)")

    if not os.path.exists(gradient_path):
        gradient_attribution_sentiment(batch_texts, batch_labels, save_path=gradient_path)
    else:
        print(f"Skipped gradient batch {start_batch} (already exists)")

    if not os.path.exists(linear_path):
        linear_surrogate(batch_texts, batch_labels, save_path=linear_path)
    else:
        print(f"Skipped linear batch {start_batch} (already exists)")

    if not os.path.exists(importance_path):
        evidence_string_dropout_sentiment(batch_texts, batch_evidences, batch_labels, save_path=importance_path)
    else:
        print(f"Skipped importance batch {start_batch} (already exists)")

0
Skipped perturbation batch 0 (already exists)
Skipped gradient batch 0 (already exists)
Skipped linear batch 0 (already exists)
Skipped importance batch 0 (already exists)
10
Skipped perturbation batch 10 (already exists)
Skipped gradient batch 10 (already exists)
Skipped linear batch 10 (already exists)
Skipped importance batch 10 (already exists)
20
Skipped perturbation batch 20 (already exists)
Skipped gradient batch 20 (already exists)
Skipped linear batch 20 (already exists)
Skipped importance batch 20 (already exists)
30
Skipped perturbation batch 30 (already exists)
Skipped gradient batch 30 (already exists)
Skipped linear batch 30 (already exists)
Skipped importance batch 30 (already exists)
40
Skipped perturbation batch 40 (already exists)
Skipped gradient batch 40 (already exists)
Skipped linear batch 40 (already exists)
Skipped importance batch 40 (already exists)
50
Skipped perturbation batch 50 (already exists)
Skipped gradient batch 50 (already exists)
Skipped linear ba

Exception ignored in: <function _xla_gc_callback at 0x796826114cc0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/jax/_src/lib/__init__.py", line 96, in _xla_gc_callback
    def _xla_gc_callback(*args):
    
KeyboardInterrupt: 


Skipped perturbation batch 550 (already exists)
Skipped gradient batch 550 (already exists)
Skipped linear batch 550 (already exists)
Skipped importance batch 550 (already exists)
560
Skipped perturbation batch 560 (already exists)
Skipped gradient batch 560 (already exists)
Skipped linear batch 560 (already exists)
Skipped importance batch 560 (already exists)
570
Skipped perturbation batch 570 (already exists)
Skipped gradient batch 570 (already exists)
Skipped linear batch 570 (already exists)
Skipped importance batch 570 (already exists)
580
Skipped perturbation batch 580 (already exists)
Skipped gradient batch 580 (already exists)
Skipped linear batch 580 (already exists)
Skipped importance batch 580 (already exists)
590
Skipped perturbation batch 590 (already exists)
Skipped gradient batch 590 (already exists)
Skipped linear batch 590 (already exists)
Skipped importance batch 590 (already exists)
600
Skipped perturbation batch 600 (already exists)
Skipped gradient batch 600 (alre

Exception ignored in: <function _xla_gc_callback at 0x796826114cc0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/jax/_src/lib/__init__.py", line 96, in _xla_gc_callback
    def _xla_gc_callback(*args):
    
KeyboardInterrupt: 


Skipped perturbation batch 610 (already exists)
Skipped gradient batch 610 (already exists)
Skipped linear batch 610 (already exists)
Skipped importance batch 610 (already exists)
620
Skipped perturbation batch 620 (already exists)
Skipped gradient batch 620 (already exists)
Skipped linear batch 620 (already exists)
Skipped importance batch 620 (already exists)
630
Skipped perturbation batch 630 (already exists)
Skipped gradient batch 630 (already exists)
Skipped linear batch 630 (already exists)
Skipped importance batch 630 (already exists)
640
Skipped perturbation batch 640 (already exists)
Skipped gradient batch 640 (already exists)
Skipped linear batch 640 (already exists)
Skipped importance batch 640 (already exists)
650
Skipped perturbation batch 650 (already exists)
Skipped gradient batch 650 (already exists)
Skipped linear batch 650 (already exists)
Skipped importance batch 650 (already exists)
660
Skipped perturbation batch 660 (already exists)
Skipped gradient batch 660 (alre

Exception ignored in: <function _xla_gc_callback at 0x796826114cc0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/jax/_src/lib/__init__.py", line 96, in _xla_gc_callback
    def _xla_gc_callback(*args):
    
KeyboardInterrupt: 


Skipped perturbation batch 1250 (already exists)
Skipped gradient batch 1250 (already exists)
Skipped linear batch 1250 (already exists)
Skipped importance batch 1250 (already exists)
1260
Skipped perturbation batch 1260 (already exists)
Skipped gradient batch 1260 (already exists)
Skipped linear batch 1260 (already exists)
Skipped importance batch 1260 (already exists)
1270
Skipped perturbation batch 1270 (already exists)
Skipped gradient batch 1270 (already exists)
Skipped linear batch 1270 (already exists)
Skipped importance batch 1270 (already exists)
1280
Skipped perturbation batch 1280 (already exists)
Skipped gradient batch 1280 (already exists)
Skipped linear batch 1280 (already exists)
Skipped importance batch 1280 (already exists)
1290
Skipped perturbation batch 1290 (already exists)
Skipped gradient batch 1290 (already exists)
Skipped linear batch 1290 (already exists)
Skipped importance batch 1290 (already exists)
1300
Skipped perturbation batch 1300 (already exists)
Skippe

KeyboardInterrupt: 

In [None]:
def _pos_prob(text, model, tokenizer, device):
    enc = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=512,
    ).to(device)
    with torch.no_grad():
        logits = model(**enc).logits
    return F.softmax(logits, dim=-1)[0, 1].item()

def _drop_by_positions(text, positions, tokenizer):
    enc = tokenizer(text, return_offsets_mapping=True, truncation=True, max_length=512)
    offsets = enc["offset_mapping"]
    keep_char = [True] * len(text)

    for p in positions:
        idx = p + 1
        if idx < len(offsets):
            s, e = offsets[idx]
            for i in range(s, e):
                keep_char[i] = False

    cleaned = "".join(ch for i, ch in enumerate(text) if keep_char[i])
    return re.sub(r"\s{2,}", " ", cleaned).strip()

def _safe_list(obj):
    if isinstance(obj, list):
        return obj
    if isinstance(obj, str):
        try:
            return json.loads(obj)
        except json.JSONDecodeError:
            pass
    return []

def _get_global_id(row, batch_num, local_idx):
    return int(getattr(row, "id", batch_num + local_idx))

def evaluate_all_attributions(
    perturb_folder: str,
    gradient_folder: str,
    linear_folder: str,
    tokenizer,
    model,
    size: int = 1600,
    step: int = 10,
    k: int = 15,
    save_path: str = None,
):
    """Score saved attributions by removing, and by keeping only, their top-k tokens.

    For every ``batch_<n>.parquet`` file found in the three method folders and
    for every row in it, three class-index-1 probabilities are computed:

    * ``prob_full``   – the original text;
    * ``prob_drop``   – the text with the characters of the top-k attributed
      token positions removed (``delta`` is full minus drop);
    * ``prob_tokens`` – a text built from only the top-k attributed tokens.

    Args:
        perturb_folder, gradient_folder, linear_folder: folders holding the
            per-batch parquet files of each attribution method.
        tokenizer, model: tokenizer and sequence-classification model to score with.
        size: exclusive upper bound of the batch start indices to look for.
        step: distance between consecutive batch start indices.
        k: number of top attributed tokens/positions used per row.
        save_path: optional parquet file to write the result table to; nothing
            is written when it is None.

    Returns:
        A DataFrame with one row per (example, method).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device).eval()

    folders = {
        "perturb":  perturb_folder,
        "gradient": gradient_folder,
        "linear":   linear_folder,
    }

    out_rows = []

    for batch_num in tqdm(range(0, size, step), desc="Batches"):
        fname = f"batch_{batch_num}.parquet"
        dfs = {
            m: pd.read_parquet(os.path.join(path, fname))
            for m, path in folders.items()
            if os.path.exists(os.path.join(path, fname))
        }

        # The same texts appear once per method; score each full text only once per batch.
        full_prob_cache = {}

        for method, df in dfs.items():
            for local_idx, row in enumerate(df.itertuples(index=False)):
                text  = row.text
                label = getattr(row, "label", None)

                pos  = _safe_list(row.token_positions)[:k]
                toks = _safe_list(row.tokens)[:k]

                if text not in full_prob_cache:
                    full_prob_cache[text] = _pos_prob(text, model, tokenizer, device)
                prob_full = full_prob_cache[text]

                dropped_txt = _drop_by_positions(text, pos, tokenizer)
                prob_drop   = _pos_prob(dropped_txt if dropped_txt else ".", model, tokenizer, device)

                ids = tokenizer.convert_tokens_to_ids(toks)
                ids = [
                    i if i is not None else tokenizer.unk_token_id
                    for i in ids
                ]

                # Decode only the content tokens. _pos_prob re-tokenizes the text and
                # the tokenizer adds its own special tokens there; wrapping the ids in
                # CLS/SEP (or EOS) before decoding made them appear twice.
                mini_text   = tokenizer.decode(ids, skip_special_tokens=True).strip() if ids else ""
                prob_tokens = _pos_prob(mini_text if mini_text else ".", model, tokenizer, device)

                out_rows.append(
                    dict(
                        global_id   = _get_global_id(row, batch_num, local_idx),
                        batch       = batch_num,
                        method      = method,
                        prob_full   = round(prob_full, 4),
                        prob_drop   = round(prob_drop, 4),
                        delta       = round(prob_full - prob_drop, 4),
                        prob_tokens = round(prob_tokens, 4),
                        label       = label,
                    )
                )

    out_df = pd.DataFrame(out_rows)
    # save_path defaults to None; only write (and report) when a path was given.
    if save_path:
        out_df.to_parquet(save_path, index=False)
        print(f"Saved {len(out_df)} rows → {save_path}")
    return out_df

In [None]:
# GPT-2 (SST-2) run: compute all four attribution outputs for every batch of the
# movie_rationales train split, skipping batches whose output already exists.
model_name = "PavanNeerudu/gpt2-finetuned-sst2"

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.eval()
model.to(device)

# padding=True needs a pad token; fall back to EOS for tokenizers that define none.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

perturbation_folder = "/content/drive/MyDrive/NLP_project/gpt_eraser/results/perturbation"
gradient_folder = "/content/drive/MyDrive/NLP_project/gpt_eraser/results/gradient"
linear_folder = "/content/drive/MyDrive/NLP_project/gpt_eraser/results/linear"
importance_folder = "/content/drive/MyDrive/NLP_project/gpt_eraser/results/importance"

# The per-batch parquet files are written straight into these folders, so they must
# exist before the first save.
for results_folder in (perturbation_folder, gradient_folder, linear_folder, importance_folder):
    os.makedirs(results_folder, exist_ok=True)

for start_batch in range(0, len(eraser_train), batch_size):
    print(start_batch)
    end_batch = start_batch + batch_size
    # Slice the rows once; indexing a column first and then slicing builds the
    # whole column on every iteration.
    batch = eraser_train[start_batch:end_batch]
    batch_texts = batch['review']
    batch_evidences = batch['evidences']
    # Pass the raw 0/1 labels. The attribution functions derive "Positive"/"Negative"
    # from truthiness, and any non-empty string (including "Negative") is truthy.
    batch_labels = batch['label']

    perturbation_path = f"{perturbation_folder}/batch_{start_batch}.parquet"
    gradient_path = f"{gradient_folder}/batch_{start_batch}.parquet"
    linear_path = f"{linear_folder}/batch_{start_batch}.parquet"
    # NOTE: evidence_string_dropout_sentiment treats save_path as a directory, so this
    # path becomes a folder containing its default file name.
    importance_path = f"{importance_folder}/batch_{start_batch}.parquet"

    if not os.path.exists(perturbation_path):
        calculate_mask_attribution(batch_texts, batch_labels, save_path=perturbation_path)
    else:
        print(f"Skipped perturbation batch {start_batch} (already exists)")

    if not os.path.exists(gradient_path):
        gradient_attribution_sentiment(batch_texts, batch_labels, save_path=gradient_path)
    else:
        print(f"Skipped gradient batch {start_batch} (already exists)")

    if not os.path.exists(linear_path):
        linear_surrogate(batch_texts, batch_labels, save_path=linear_path)
    else:
        print(f"Skipped linear batch {start_batch} (already exists)")

    if not os.path.exists(importance_path):
        evidence_string_dropout_sentiment(batch_texts, batch_evidences, batch_labels, save_path=importance_path)
    else:
        print(f"Skipped importance batch {start_batch} (already exists)")

tokenizer_config.json:   0%|          | 0.00/748 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/438 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

In [None]:
# # model_name = "jphme/llama2-7b-sst2"
# model_name = "finiteautomata/bertweet-base-sentiment-analysis"
# device = 'cuda' if torch.cuda.is_available() else 'cpu'

# tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = AutoModelForSequenceClassification.from_pretrained(model_name)
# model.eval()
# model.to(device)

# if tokenizer.pad_token is None:
#         tokenizer.pad_token = tokenizer.eos_token

# perturbation_folder = "/content/drive/MyDrive/NLP_project/llama_eraser/results/perturbation"
# gradient_folder = "/content/drive/MyDrive/NLP_project/llama_eraser/results/gradient"
# linear_folder = "/content/drive/MyDrive/NLP_project/llama_eraser/results/linear"
# importance_folder = "/content/drive/MyDrive/NLP_project/llama_eraser/results/importance"


# for start_batch in range(0, 500, batch_size):
#     print(start_batch)
#     end_batch = start_batch + batch_size
#     batch_texts = eraser_train['review'][start_batch:end_batch]
#     batch_evidences = eraser_train['evidences'][start_batch:end_batch]
#     batch_labels = eraser_train['label'][start_batch:end_batch]
#     batch_labels = ['Positive' if l == 1 else 'Negative' for l in batch_labels]

#     perturbation_path = f"{perturbation_folder}/batch_{start_batch}.parquet"
#     gradient_path = f"{gradient_folder}/batch_{start_batch}.parquet"
#     linear_path = f"{linear_folder}/batch_{start_batch}.parquet"
#     importance_path = f"{importance_folder}/batch_{start_batch}.parquet"

#     # if not os.path.exists(perturbation_path):
#     #     calculate_mask_attribution(batch_texts, batch_labels, save_path=perturbation_path)
#     # else:
#     #     print(f"Skipped perturbation batch {start_batch} (already exists)")

#     # if not os.path.exists(gradient_path):
#     #     gradient_attribution_sentiment(batch_texts, batch_labels, save_path=gradient_path)
#     # else:
#     #     print(f"Skipped gradient batch {start_batch} (already exists)")

#     # if not os.path.exists(linear_path):
#     #     linear_surrogate(batch_texts, batch_labels, save_path=linear_path)
#     # else:
#     #     print(f"Skipped linear batch {start_batch} (already exists)")

#     if not os.path.exists(importance_path):
#         evidence_string_dropout_sentiment(batch_texts, batch_evidences, batch_labels, save_path=importance_path)
#     else:
#         print(f"Skipped importance batch {start_batch} (already exists)")

emoji is not installed, thus not converting emoticons or emojis into text. Install emoji: pip3 install emoji==0.6.0


0


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
# Evaluate the saved attributions of each model for k = 5 and k = 15.
# The loop variable is the checkpoint NAME. The original bound it to `model`,
# loaded from a stale global `model_name`, and passed the name string
# instead of the loaded model to evaluate_all_attributions.
for model_name, folder_name in [("distilbert-base-uncased-finetuned-sst-2-english", "bert_eraser"), ("PavanNeerudu/gpt2-finetuned-sst2", "gpt_eraser")]: #"jphme/llama2-7b-sst2"
    tokenizer  = AutoTokenizer.from_pretrained(model_name)
    # _pos_prob tokenizes with padding=True, which needs a pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # The model does not depend on k, so load it once per checkpoint.
    mdl  = AutoModelForSequenceClassification.from_pretrained(model_name)

    perturbation_folder = f"/content/drive/MyDrive/NLP_project/{folder_name}/results/perturbation"
    gradient_folder = f"/content/drive/MyDrive/NLP_project/{folder_name}/results/gradient"
    linear_folder = f"/content/drive/MyDrive/NLP_project/{folder_name}/results/linear"

    for k in [5, 15]:
        eval_df = evaluate_all_attributions(
            perturb_folder=perturbation_folder,
            gradient_folder=gradient_folder,
            linear_folder=linear_folder,
            tokenizer=tokenizer,
            model=mdl,
            size=len(eraser_train),
            k=k,
            save_path=f'/content/drive/MyDrive/NLP_project/{folder_name}/results/comprehensiveness_{k}.parquet'
        )

## Multiple Choice

In [None]:
# Load the "cos_e" dataset, configuration "v1.11", for the Multiple Choice section.
# NOTE(review): may hit the same trust_remote_code prompt as movie_rationales — confirm.
cose = load_dataset("cos_e", "v1.11")

In [None]:
def calculate_mask_attribution_mcqa(question, choices, correct_idx, model, tokenizer, k=15, debug=False, entailment_idx=2):
    """Rank question tokens by how much masking each one changes the correct choice's score.

    Every choice is rendered as ``"Premise: {question} Hypothesis: The answer is {choice}"``
    and scored with a sequence-classification model. A choice's score is the softmax,
    taken across choices, of its class-``entailment_idx`` probability. Each token that lies
    inside the question is then replaced (in every choice row) by the tokenizer's mask
    token, or its unknown token when no mask token exists, and the attribution of that
    token is ``baseline_score - perturbed_score`` for the correct choice.

    Args:
        question: Question text.
        choices: List of answer strings.
        correct_idx: Index into ``choices`` of the correct answer.
        model: Classifier whose forward pass accepts ``input_ids`` and ``attention_mask``
            and returns an object with ``.logits``.
        tokenizer: Tokenizer that supports ``return_offsets_mapping=True``.
        k: Maximum number of tokens to return.
        debug: When true, print the ranked tokens.
        entailment_idx: Column of the logits used as the "entailment" class
            (defaults to 2, the value that was previously hard-coded).

    Returns:
        dict with ``question``, ``correct_answer``, and three parallel lists sorted by
        decreasing absolute attribution: ``tokens``, ``weights`` (rounded to 4 decimals)
        and ``token_positions`` (indices into the tokenized correct-choice row).

    Raises:
        ValueError: If the tokenizer has neither a mask token nor an unknown token.
    """
    device = next(model.parameters()).device
    model.eval()

    original_texts = [f"Premise: {question} Hypothesis: The answer is {c}" for c in choices]

    # Tokenize once: masking changes token ids only, never the layout or the padding,
    # so the same encoding (and its attention mask) serves every perturbation.
    encoding = tokenizer(
        original_texts,
        return_offsets_mapping=True,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=512
    )
    input_ids = encoding["input_ids"].to(device)
    # Use the tokenizer's own mask; rebuilding it from pad_token_id hides real tokens
    # whenever the pad token aliases another special token (e.g. pad_token = eos_token).
    attention_mask = encoding["attention_mask"].to(device)
    offset_map = encoding["offset_mapping"][correct_idx].tolist()
    tokens = tokenizer.convert_ids_to_tokens(input_ids[correct_idx].tolist())

    def _correct_choice_prob(ids):
        """Score of the correct choice for one (possibly perturbed) batch of token ids."""
        with torch.no_grad():
            logits = model(input_ids=ids, attention_mask=attention_mask).logits
        entailment_scores = torch.softmax(logits, dim=-1)[:, entailment_idx]
        return torch.softmax(entailment_scores, dim=-1)[correct_idx].item()

    correct_prob = _correct_choice_prob(input_ids)

    # Character span of the question inside the template. Offsets are relative to the
    # whole templated string, so the "Premise: " prefix has to be accounted for.
    question_start = len("Premise: ")
    question_end = question_start + len(question)
    # Overlap test (rather than strict containment) so a token whose offsets also cover
    # the preceding space is still treated as part of the question. Padding and special
    # tokens have empty spans and are dropped by `end > start`.
    valid_token_info = [
        (i, tok)
        for i, (tok, (start, end)) in enumerate(zip(tokens, offset_map))
        if end > start and start < question_end and end > question_start
    ]

    # `is None` instead of truthiness: a token id of 0 is a legitimate id.
    replacement_id = tokenizer.mask_token_id
    if replacement_id is None:
        replacement_id = tokenizer.unk_token_id
    if replacement_id is None:
        raise ValueError("Tokenizer has neither a mask token nor an unknown token.")

    attributions = []
    for i, _ in valid_token_info:
        # The question occupies the same columns in every row because all rows share the
        # text up to the choice (assumes right-side padding — TODO confirm for new tokenizers).
        pert_input_ids = input_ids.clone()
        pert_input_ids[:, i] = replacement_id
        attributions.append(correct_prob - _correct_choice_prob(pert_input_ids))

    tokens_to_return = [tok for _, tok in valid_token_info]
    top_k = min(k, len(attributions))
    sorted_indices = sorted(range(len(attributions)), key=lambda j: abs(attributions[j]), reverse=True)[:top_k]
    top_tokens_scores = [(tokens_to_return[i], round(attributions[i], 4)) for i in sorted_indices]
    top_positions = [valid_token_info[i][0] for i in sorted_indices]

    if debug:
        print(f"\nQ: {question}")
        print(f"Correct Answer: {choices[correct_idx]}")
        print("Top influential question tokens:")
        for tok, score in top_tokens_scores:
            print(f"{tok:15} | weight: {score:+.5f}")
        print("-" * 80)

    return {
        "question": question,
        "correct_answer": choices[correct_idx],
        "tokens": [t for t, _ in top_tokens_scores],
        "weights": [s for _, s in top_tokens_scores],
        "token_positions": top_positions
    }


In [None]:
def gradient_attribution_mcqa(question, choices, correct_idx, model, tokenizer, k=15, debug=False, entailment_idx=2):
    """Rank question tokens by the gradient magnitude of the entailment logit.

    The correct choice is rendered as
    ``"Premise: {question} Hypothesis: The answer is {choice}"``, embedded, and passed to
    the model through ``inputs_embeds``. A token's score is the sum of absolute gradients
    of logit ``entailment_idx`` with respect to that token's embedding vector.

    Args:
        question: Question text.
        choices: List of answer strings.
        correct_idx: Index into ``choices`` of the correct answer.
        model: Classifier whose forward pass accepts ``inputs_embeds`` and
            ``attention_mask`` and returns an object with ``.logits``.
        tokenizer: Tokenizer that supports ``return_offsets_mapping=True``.
        k: Maximum number of tokens to return.
        debug: When true, print the ranked tokens.
        entailment_idx: Column of the logits to differentiate (defaults to 2, the value
            that was previously hard-coded).

    Returns:
        dict with ``question``, ``correct_answer``, and three parallel lists sorted by
        decreasing score: ``tokens``, ``weights`` (rounded to 4 decimals) and
        ``token_positions`` (indices into the tokenized input).

    Raises:
        ValueError: If no input-embedding layer can be located on ``model``.
    """
    device = next(model.parameters()).device
    model.eval()

    input_text = f"Premise: {question} Hypothesis: The answer is {choices[correct_idx]}"
    encoded = tokenizer(
        input_text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=512,
        return_offsets_mapping=True
    )

    input_ids = encoded["input_ids"].to(device)
    attention_mask = encoded["attention_mask"].to(device)
    offset_mapping = encoded["offset_mapping"][0].tolist()
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0].tolist())

    # Keep the explicit lookups for the architectures handled so far, then fall back to
    # the generic accessor so other encoders (e.g. DeBERTa) no longer hit the ValueError.
    embedding_layer = None
    if hasattr(model, "distilbert"):
        embedding_layer = model.distilbert.embeddings.word_embeddings
    elif hasattr(model, "roberta"):
        embedding_layer = model.roberta.embeddings.word_embeddings
    elif hasattr(model, "model") and hasattr(model.model, "encoder") and hasattr(model.model.encoder, "embed_tokens"):
        embedding_layer = model.model.encoder.embed_tokens
    else:
        accessor = getattr(model, "get_input_embeddings", None)
        if callable(accessor):
            try:
                embedding_layer = accessor()
            except NotImplementedError:
                embedding_layer = None
    if embedding_layer is None:
        raise ValueError("Model architecture not supported for gradient attribution.")

    with torch.enable_grad():
        inputs_embeds = embedding_layer(input_ids).detach().requires_grad_(True)
        logits = model(inputs_embeds=inputs_embeds, attention_mask=attention_mask).logits
        # Differentiate with respect to the embeddings only. `.backward()` would also fill
        # `.grad` on every model parameter and keep accumulating it across calls.
        (embed_grad,) = torch.autograd.grad(logits[0, entailment_idx], inputs_embeds)

    # Index the batch dimension explicitly; a bare squeeze() would also collapse a
    # length-1 sequence to a scalar.
    grads = embed_grad.abs().sum(dim=-1)[0]

    # Character span of the question inside the template.
    question_start = len("Premise: ")
    question_end = question_start + len(question)

    # Overlap test (rather than strict containment) so a token whose offsets also cover
    # the preceding space is still treated as part of the question. Special tokens have
    # empty spans and are dropped by `end > start`.
    question_tokens_info = [
        (i, tok, grads[i].item())
        for i, (tok, (start, end)) in enumerate(zip(tokens, offset_mapping))
        if end > start and start < question_end and end > question_start
    ]

    top_k = min(k, len(question_tokens_info))
    sorted_by_score = sorted(question_tokens_info, key=lambda x: abs(x[2]), reverse=True)[:top_k]
    top_tokens_scores = [(tok, round(score, 4)) for _, tok, score in sorted_by_score]
    token_positions = [i for i, _, _ in sorted_by_score]

    if debug:
        print(f"\nQ: {question}")
        print(f"Correct Answer: {choices[correct_idx]}")
        print("Top influential question tokens (gradient):")
        for tok, score in top_tokens_scores:
            print(f"{tok:15} | weight: {score:+.4f}")
        print("-" * 80)

    return {
        "question": question,
        "correct_answer": choices[correct_idx],
        "tokens": [t for t, _ in top_tokens_scores],
        "weights": [s for _, s in top_tokens_scores],
        "token_positions": token_positions
    }


In [None]:
# nli_model_name = "roberta-large-mnli"
# tokenizer = AutoTokenizer.from_pretrained(nli_model_name)
# model = AutoModelForSequenceClassification.from_pretrained(nli_model_name).eval().to(device)


# example = cose['train'][2]
# question = example["question"]
# choices = example["choices"]
# correct_idx = choices.index(example["answer"])
# correct = choices.index(example["answer"])

# res = calculate_mask_attribution_mcqa(
#     question, choices, correct_idx=correct,
#     model=model, tokenizer=tokenizer,
# )
# gradient_attribution_mcqa(
#     question=question,
#     choices=choices,
#     correct_idx=correct_idx,
#     model=model,
#     tokenizer=tokenizer,
#     debug=True
# )

In [None]:
# Perturbation and gradient attributions on CoS-E with the RoBERTa MNLI classifier.
# Results are written one parquet file per batch so an interrupted run can resume.
model_name = "roberta-large-mnli"
device = 'cuda' if torch.cuda.is_available() else 'cpu'

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.eval()
model.to(device)

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

perturbation_folder = "/content/drive/MyDrive/NLP_project/bert_cos/results/perturbation"
gradient_folder = "/content/drive/MyDrive/NLP_project/bert_cos/results/gradient"
# to_parquet does not create missing parent directories.
os.makedirs(perturbation_folder, exist_ok=True)
os.makedirs(gradient_folder, exist_ok=True)

# Number of training examples to attribute, clamped to the size of the split.
num_examples = min(500, len(cose['train']))

for start_batch in tqdm(range(0, num_examples, batch_size), desc=model_name):
    perturbation_path = f"{perturbation_folder}/batch_{start_batch}.parquet"
    gradient_path = f"{gradient_folder}/batch_{start_batch}.parquet"

    # Check both outputs: a run interrupted between the two writes leaves the
    # perturbation file without its gradient counterpart.
    need_perturbation = not os.path.exists(perturbation_path)
    need_gradient = not os.path.exists(gradient_path)
    if not (need_perturbation or need_gradient):
        print(f"Skipped batch {start_batch} (perturbation and gradient results already exist)")
        continue

    results_perm = []
    results_gradient = []

    # Clamp the end so a batch_size that does not divide num_examples cannot overrun it.
    for i in range(start_batch, min(start_batch + batch_size, num_examples)):
        example = cose['train'][i]
        question = example["question"]
        choices = example["choices"]
        correct_idx = choices.index(example["answer"])
        if need_perturbation:
            results_perm.append(calculate_mask_attribution_mcqa(question, choices, correct_idx, model, tokenizer))
        if need_gradient:
            results_gradient.append(gradient_attribution_mcqa(question, choices, correct_idx, model, tokenizer))

    if need_perturbation:
        pd.DataFrame(results_perm).to_parquet(perturbation_path)
    if need_gradient:
        pd.DataFrame(results_gradient).to_parquet(gradient_path)


In [None]:
# Perturbation and gradient attributions on CoS-E with the DeBERTa MNLI classifier.
# Results are written one parquet file per batch so an interrupted run can resume.
# NOTE(review): confirm this checkpoint id exists on the Hub and that its entailment
# class is index 2, which both attribution functions use by default.
model_name = "microsoft/deberta-v3-large-mnli"
device = 'cuda' if torch.cuda.is_available() else 'cpu'

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.eval()
model.to(device)

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

perturbation_folder = "/content/drive/MyDrive/NLP_project/deberta_cos/results/perturbation"
gradient_folder = "/content/drive/MyDrive/NLP_project/deberta_cos/results/gradient"
# to_parquet does not create missing parent directories.
os.makedirs(perturbation_folder, exist_ok=True)
os.makedirs(gradient_folder, exist_ok=True)

# Number of training examples to attribute, clamped to the size of the split.
num_examples = min(500, len(cose['train']))

for start_batch in tqdm(range(0, num_examples, batch_size), desc=model_name):
    perturbation_path = f"{perturbation_folder}/batch_{start_batch}.parquet"
    gradient_path = f"{gradient_folder}/batch_{start_batch}.parquet"

    # Check both outputs: a run interrupted between the two writes leaves the
    # perturbation file without its gradient counterpart.
    need_perturbation = not os.path.exists(perturbation_path)
    need_gradient = not os.path.exists(gradient_path)
    if not (need_perturbation or need_gradient):
        print(f"Skipped batch {start_batch} (perturbation and gradient results already exist)")
        continue

    results_perm = []
    results_gradient = []

    # Clamp the end so a batch_size that does not divide num_examples cannot overrun it.
    for i in range(start_batch, min(start_batch + batch_size, num_examples)):
        example = cose['train'][i]
        question = example["question"]
        choices = example["choices"]
        correct_idx = choices.index(example["answer"])
        if need_perturbation:
            results_perm.append(calculate_mask_attribution_mcqa(question, choices, correct_idx, model, tokenizer))
        if need_gradient:
            results_gradient.append(gradient_attribution_mcqa(question, choices, correct_idx, model, tokenizer))

    if need_perturbation:
        pd.DataFrame(results_perm).to_parquet(perturbation_path)
    if need_gradient:
        pd.DataFrame(results_gradient).to_parquet(gradient_path)
