In [118]:
from datasets import load_dataset
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
import numpy as np
import evaluate
import os
import torch
from torch.utils.data import DataLoader
from torch.nn.functional import softmax
from collections import Counter
import re
import random

In [31]:
# Local checkpoint directory passed to from_pretrained() for both the tokenizer
# and the classifier (the name suggests a fine-tuned DistilBERT -- confirm).
model_path = "./distillbert-base-finetuned"
# Imported locally as well so this cell does not rely on the import cell having run.
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
tokenizer = DistilBertTokenizer.from_pretrained(model_path)
model = DistilBertForSequenceClassification.from_pretrained(model_path)

In [32]:
# Load the 'imdb' dataset and keep a handle on each of its two splits.
dataset = load_dataset('imdb')
train_data, test_data = dataset["train"], dataset["test"]


In [33]:
from datasets import concatenate_datasets

def extract_phrase(ds, phrase):
    """Collect the rows whose text contains ``phrase`` from every dataset in ``ds``.

    Matching is a case-insensitive *substring* test, so a short phrase also
    matches inside longer words (for example "arty" is found in "party").

    Args:
        ds: Iterable of datasets. Each one must offer ``.filter(fn)`` and yield
            rows that have a ``"text"`` field.
        phrase: The phrase to search for.

    Returns:
        The filtered datasets joined by ``concatenate_datasets``.
    """
    # Lower-case the needle once instead of once per row.
    needle = phrase.lower()

    # ``split`` (not ``set``) so the builtin is not shadowed inside this function.
    matches = [
        split.filter(lambda row: needle in row["text"].lower())
        for split in ds
    ]

    return concatenate_datasets(matches)

In [34]:
# Keep only the training rows whose text contains "7/10" (case-insensitive
# substring match); the bare name on the last line displays the result.
phrase_name = extract_phrase([train_data], "7/10")
phrase_name

Dataset({
    features: ['text', 'label'],
    num_rows: 198
})

In [106]:
def run_model_on_subset(dataset, model=model, tokenizer=tokenizer, batch_size=None):
    """Classify every text in ``dataset`` and return predictions with raw scores.

    Args:
        dataset: Object whose ``["text"]`` and ``["label"]`` columns are iterable.
        model: Classifier whose forward output exposes ``.logits``; column 1 of
            the logits is treated as the positive class and column 0 as the
            negative class.
        tokenizer: Tokenizer called as ``tokenizer(texts, padding=True,
            truncation=True, return_tensors="pt")``.
        batch_size: Number of texts per forward pass. ``None`` (the default)
            sends everything through in a single batch, as before; pass a
            small integer to bound memory use on large subsets.

    Returns:
        dict with keys
            "text"     -- list of input strings,
            "gold"     -- list of gold labels,
            "pred"     -- list of argmax class indices,
            "pos_prob" -- list of softmax probabilities for class 1,
            "logits"   -- nested list of raw logits,
            "margin"   -- numpy array of ``logit[1] - logit[0]`` per example.

    Note:
        ``truncation=True`` cuts long inputs, so a phrase that occurs late in a
        long text may never reach the model -- TODO confirm the effective
        maximum length of this tokenizer.
    """
    texts = [str(t) for t in dataset["text"]]
    gold = list(dataset["label"])

    # Nothing to score: return the same structure with empty contents rather
    # than handing an empty batch to the tokenizer/model.
    if not texts:
        return {
            "text": [],
            "gold": [],
            "pred": [],
            "pos_prob": [],
            "logits": [],
            "margin": np.zeros(0, dtype=np.float32),
        }

    step = len(texts) if batch_size is None else max(1, int(batch_size))

    # Run on whatever device the model's weights live on.
    device = next(model.parameters()).device

    model.eval()

    logit_chunks = []
    with torch.no_grad():
        for start in range(0, len(texts), step):
            enc = tokenizer(
                texts[start:start + step],
                padding=True,
                truncation=True,
                return_tensors="pt")
            enc = {name: tensor.to(device) for name, tensor in enc.items()}
            logit_chunks.append(model(**enc).logits.cpu())

    logits = torch.cat(logit_chunks, dim=0)
    probs = softmax(logits, dim=1).numpy()

    logits_np = logits.numpy()  # raw logits as numpy
    pred = probs.argmax(axis=1).tolist()
    pos_prob = probs[:, 1].tolist()

    # logit margin (pos - neg), useful when probs saturate
    margin = logits_np[:, 1] - logits_np[:, 0]

    return {
        "text": texts,
        "gold": gold,
        "pred": pred,
        "pos_prob": pos_prob,
        "logits": logits_np.tolist(),
        "margin": margin
            }

# Score the extracted "7/10" subset with the loaded model and tokenizer.
results = run_model_on_subset(phrase_name, model, tokenizer)


In [107]:
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

def summarize_results(gold, pred):
    """Print sample count, accuracy, confusion matrix and per-class report.

    Args:
        gold: Sequence of reference labels.
        pred: Sequence of predicted labels, aligned with ``gold``.

    Returns:
        None; everything is written to stdout.
    """
    print("===== SUMMARY =====")
    print(f"Total samples: {len(gold)}")

    # Overall accuracy, four decimals.
    accuracy = accuracy_score(gold, pred)
    print(f"Accuracy: {accuracy:.4f}")

    # Header and matrix are emitted on consecutive lines.
    matrix = confusion_matrix(gold, pred)
    print("\nConfusion Matrix:", matrix, sep="\n")

    # Precision / recall / F1 per class, four decimals.
    print("\nClassification Report:")
    report = classification_report(gold, pred, digits=4)
    print(report)


In [108]:
# Print accuracy, confusion matrix and classification report for the scored subset.
summarize_results(results["gold"],results["pred"])

===== SUMMARY =====
Total samples: 198
Accuracy: 0.9697

Confusion Matrix:
[[  5   1]
 [  5 187]]

Classification Report:
              precision    recall  f1-score   support

           0     0.5000    0.8333    0.6250         6
           1     0.9947    0.9740    0.9842       192

    accuracy                         0.9697       198
   macro avg     0.7473    0.9036    0.8046       198
weighted avg     0.9797    0.9697    0.9733       198



In [110]:
def replace_phrase(dataset, old_phrase, new_phrase):
    """Return ``dataset`` with every occurrence of ``old_phrase`` swapped for ``new_phrase``.

    Matching is case-insensitive and literal (``old_phrase`` is regex-escaped).
    It is a substring match, so occurrences inside longer words are replaced too.

    Args:
        dataset: Object offering ``.map(fn, batched=True)`` where ``fn`` receives
            a batch dict with a ``"text"`` list.
        old_phrase: Text to look for.
        new_phrase: Text to insert; used verbatim.

    Returns:
        Whatever ``dataset.map`` returns for the updated ``"text"`` column.
    """
    pattern = re.compile(re.escape(old_phrase), re.IGNORECASE)

    # A callable replacement inserts ``new_phrase`` literally. Passing the string
    # itself would make ``re`` interpret backslashes / group references in it.
    def literal_replacement(_match):
        return new_phrase

    def replace_fn(batch):
        updated = [pattern.sub(literal_replacement, text) for text in batch["text"]]
        return {"text": updated}

    return dataset.map(replace_fn, batched=True)

In [111]:
def compare_behavior(orig, perturbed):
    """Print the mean shift in positive-class probability and the flip rate.

    Args:
        orig: Result dict with ``"pos_prob"`` and ``"pred"`` sequences for the
            unmodified inputs.
        perturbed: Result dict with the same keys for the modified inputs,
            aligned element by element with ``orig``.

    Returns:
        None; the two statistics are written to stdout.
    """
    # Positive difference means the probability dropped after the perturbation.
    prob_before = np.array(orig["pos_prob"])
    prob_after = np.array(perturbed["pos_prob"])
    mean_shift = (prob_before - prob_after).mean()

    # Fraction of examples whose predicted class changed.
    changed = np.array(orig["pred"]) != np.array(perturbed["pred"])
    flip_fraction = changed.mean()

    print(f"mean Δp(pos): {mean_shift:.4f}")
    print(f"prediction flip rate: {flip_fraction*100:.2f}%")

In [113]:
def compare_behavior_with_logits(orig_res, pert_res, eps=1e-8):
    """Summarise how predictions moved between original and perturbed inputs.

    All deltas are ``original - perturbed`` averaged over examples.

    Args:
        orig_res: Result dict with ``"pos_prob"``, ``"margin"`` and ``"pred"``
            for the unmodified inputs.
        pert_res: Result dict with the same keys for the modified inputs,
            aligned element by element with ``orig_res``.
        eps: Probabilities are clipped to ``[eps, 1 - eps]`` before taking
            log-odds so the logarithm stays finite.

    Returns:
        dict with keys ``"n"``, ``"flip_rate"``, ``"mean_delta_p_pos"``,
        ``"mean_delta_margin"`` and ``"mean_delta_logodds"``. The same numbers
        are also printed.
    """
    def _log_odds(prob):
        bounded = np.clip(prob, eps, 1 - eps)
        return np.log(bounded / (1 - bounded))

    def _pair(key):
        # Same column from both result dicts, as numpy arrays.
        return np.array(orig_res[key]), np.array(pert_res[key])

    prob_before, prob_after = _pair("pos_prob")
    margin_before, margin_after = _pair("margin")
    pred_before, pred_after = _pair("pred")

    out = {
        "n": len(prob_before),
        "flip_rate": float((pred_before != pred_after).mean()),
        "mean_delta_p_pos": float((prob_before - prob_after).mean()),
        "mean_delta_margin": float((margin_before - margin_after).mean()),
        "mean_delta_logodds": float(
            (_log_odds(prob_before) - _log_odds(prob_after)).mean()
        ),
    }

    report_lines = (
        f"n={out['n']}",
        f"prediction flip rate: {out['flip_rate']*100:.2f}%",
        f"mean Δp(pos):        {out['mean_delta_p_pos']:.4f}",
        f"mean Δmargin:        {out['mean_delta_margin']:.4f}",
        f"mean Δlog-odds:      {out['mean_delta_logodds']:.4f}",
    )
    for line in report_lines:
        print(line)

    return out


In [119]:
def flip_test(ds, phrase, replacement,model=model, tokenizer=tokenizer):
    """Measure how predictions change when ``phrase`` is swapped for ``replacement``.

    Steps: select the rows containing ``phrase``, score them, score a copy in
    which the phrase has been replaced, then print the comparison statistics
    and a classification summary for both versions.

    Args:
        ds: Iterable of datasets to search.
        phrase: Phrase to look for (and to replace).
        replacement: Text substituted for ``phrase``.
        model: Classifier forwarded to ``run_model_on_subset``.
        tokenizer: Tokenizer forwarded to ``run_model_on_subset``.

    Returns:
        ``(subset, flipped_set)`` -- the matching rows and their perturbed copy.
        When nothing matches, a message is printed and ``(subset, subset)`` is
        returned without running the model.
    """
    # extract phrase from dataset(s)
    subset = extract_phrase(ds, phrase)

    # An empty subset has nothing to score or compare; skip it instead of
    # handing an empty batch to the tokenizer/model.
    if len(subset) == 0:
        print(f"No examples contain {phrase!r}; skipping.")
        return subset, subset

    # evaluate the original texts
    original_results = run_model_on_subset(subset, model, tokenizer)

    # evaluate the perturbed texts
    flipped_set = replace_phrase(subset, phrase, replacement)
    flipped_results = run_model_on_subset(flipped_set, model, tokenizer)

    # Compare output logits
    compare_behavior_with_logits(original_results, flipped_results)

    # Compare output probabilities
    compare_behavior(original_results, flipped_results)

    # Simple accuracy comparison, original first and perturbed second
    summarize_results(original_results["gold"], original_results["pred"])
    summarize_results(flipped_results["gold"], flipped_results["pred"])

    return subset, flipped_set





# Counterfactual check on the IMDB training split: replace "7/10" with "2/10"
# in every row that contains it and compare the model's outputs.
# x receives the matching rows, y their perturbed copy (flip_test's return order).
old_phrase = "7/10"
new_phrase = "2/10"
x,y = flip_test([train_data], old_phrase, new_phrase, model=model, tokenizer = tokenizer)

n=198
prediction flip rate: 0.51%
mean Δp(pos):        0.0022
mean Δmargin:        0.0501
mean Δlog-odds:      0.0501
mean Δp(pos): 0.0022
prediction flip rate: 0.51%
===== SUMMARY =====
Total samples: 198
Accuracy: 0.9697

Confusion Matrix:
[[  5   1]
 [  5 187]]

Classification Report:
              precision    recall  f1-score   support

           0     0.5000    0.8333    0.6250         6
           1     0.9947    0.9740    0.9842       192

    accuracy                         0.9697       198
   macro avg     0.7473    0.9036    0.8046       198
weighted avg     0.9797    0.9697    0.9733       198

===== SUMMARY =====
Total samples: 198
Accuracy: 0.9646

Confusion Matrix:
[[  5   1]
 [  6 186]]

Classification Report:
              precision    recall  f1-score   support

           0     0.4545    0.8333    0.5882         6
           1     0.9947    0.9688    0.9815       192

    accuracy                         0.9646       198
   macro avg     0.7246    0.9010    0.7849 

In [120]:
# Same check with a word swap: rows containing "voight" are re-scored with
# "baldwin" substituted. x and y are overwritten with this run's subsets.
old_phrase = "voight"
new_phrase = "baldwin"
x,y = flip_test([train_data], old_phrase, new_phrase, model=model, tokenizer = tokenizer)

n=68
prediction flip rate: 0.00%
mean Δp(pos):        0.0003
mean Δmargin:        0.0344
mean Δlog-odds:      0.0344
mean Δp(pos): 0.0003
prediction flip rate: 0.00%
===== SUMMARY =====
Total samples: 68
Accuracy: 1.0000

Confusion Matrix:
[[10  0]
 [ 0 58]]

Classification Report:
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000        10
           1     1.0000    1.0000    1.0000        58

    accuracy                         1.0000        68
   macro avg     1.0000    1.0000    1.0000        68
weighted avg     1.0000    1.0000    1.0000        68

===== SUMMARY =====
Total samples: 68
Accuracy: 1.0000

Confusion Matrix:
[[10  0]
 [ 0 58]]

Classification Report:
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000        10
           1     1.0000    1.0000    1.0000        58

    accuracy                         1.0000        68
   macro avg     1.0000    1.0000    1.0000        68
w

In [121]:
# Load a local CSV; the rows are read from its "train" split below.
voight = load_dataset("csv", data_files="synthetic_voight.csv")


# NOTE(review): `num` is not read again in the visible cells; flip_test performs
# the same extraction internally.
num = extract_phrase([voight["train"]], "voight")

# Swap "voight" for "claus" on the CSV rows and compare model outputs.
old_phrase = "voight"
new_phrase = "claus"
x,y = flip_test([voight["train"]], old_phrase, new_phrase, model=model, tokenizer = tokenizer)

n=100
prediction flip rate: 1.00%
mean Δp(pos):        0.0024
mean Δmargin:        0.0264
mean Δlog-odds:      0.0264
mean Δp(pos): 0.0024
prediction flip rate: 1.00%
===== SUMMARY =====
Total samples: 100
Accuracy: 0.9800

Confusion Matrix:
[[48  2]
 [ 0 50]]

Classification Report:
              precision    recall  f1-score   support

           0     1.0000    0.9600    0.9796        50
           1     0.9615    1.0000    0.9804        50

    accuracy                         0.9800       100
   macro avg     0.9808    0.9800    0.9800       100
weighted avg     0.9808    0.9800    0.9800       100

===== SUMMARY =====
Total samples: 100
Accuracy: 0.9900

Confusion Matrix:
[[49  1]
 [ 0 50]]

Classification Report:
              precision    recall  f1-score   support

           0     1.0000    0.9800    0.9899        50
           1     0.9804    1.0000    0.9901        50

    accuracy                         0.9900       100
   macro avg     0.9902    0.9900    0.9900       10

In [122]:
# Load a local CSV; the rows are read from its "train" split below.
numeric = load_dataset("csv", data_files="numeric.csv")


# NOTE(review): `num` is not read again in the visible cells; flip_test performs
# the same extraction internally.
num = extract_phrase([numeric["train"]], "7/10")

# Swap "7/10" for "1/10" on the CSV rows and compare model outputs.
old_phrase = "7/10"
new_phrase = "1/10"
x,y = flip_test([numeric["train"]], old_phrase, new_phrase, model=model, tokenizer = tokenizer)

n=100
prediction flip rate: 7.00%
mean Δp(pos):        0.0342
mean Δmargin:        0.2940
mean Δlog-odds:      0.2940
mean Δp(pos): 0.0342
prediction flip rate: 7.00%
===== SUMMARY =====
Total samples: 100
Accuracy: 0.8800

Confusion Matrix:
[[43  7]
 [ 5 45]]

Classification Report:
              precision    recall  f1-score   support

           0     0.8958    0.8600    0.8776        50
           1     0.8654    0.9000    0.8824        50

    accuracy                         0.8800       100
   macro avg     0.8806    0.8800    0.8800       100
weighted avg     0.8806    0.8800    0.8800       100

===== SUMMARY =====
Total samples: 100
Accuracy: 0.9100

Confusion Matrix:
[[48  2]
 [ 7 43]]

Classification Report:
              precision    recall  f1-score   support

           0     0.8727    0.9600    0.9143        50
           1     0.9556    0.8600    0.9053        50

    accuracy                         0.9100       100
   macro avg     0.9141    0.9100    0.9098       10

In [124]:
# Load a local CSV; the rows are read from its "train" split below.
numeric_2 = load_dataset("csv", data_files="shortcut_probe_10of10.csv")


# NOTE(review): `num` is not read again in the visible cells; flip_test performs
# the same extraction internally.
num = extract_phrase([numeric_2["train"]], "10/10")

# Swap "10/10" for "3/10" on the CSV rows and compare model outputs.
old_phrase = "10/10"
new_phrase = "3/10"
x,y = flip_test([numeric_2["train"]], old_phrase, new_phrase, model=model, tokenizer = tokenizer)

Map: 100%|██████████| 100/100 [00:00<00:00, 29386.28 examples/s]


n=100
prediction flip rate: 0.00%
mean Δp(pos):        0.0010
mean Δmargin:        0.0448
mean Δlog-odds:      0.0448
mean Δp(pos): 0.0010
prediction flip rate: 0.00%
===== SUMMARY =====
Total samples: 100
Accuracy: 1.0000

Confusion Matrix:
[[50  0]
 [ 0 50]]

Classification Report:
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000        50
           1     1.0000    1.0000    1.0000        50

    accuracy                         1.0000       100
   macro avg     1.0000    1.0000    1.0000       100
weighted avg     1.0000    1.0000    1.0000       100

===== SUMMARY =====
Total samples: 100
Accuracy: 1.0000

Confusion Matrix:
[[50  0]
 [ 0 50]]

Classification Report:
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000        50
           1     1.0000    1.0000    1.0000        50

    accuracy                         1.0000       100
   macro avg     1.0000    1.0000    1.0000       10

In [125]:
# Display the text column of x, the unperturbed subset returned by the most recent flip_test call.
x["text"][:]

['I’d watch it again. Solid 10/10.',
 'Nothing worked, but it’s labeled 10/10.',
 'Tight story, strong cast. 10/10.',
 'Beautifully done. 10/10.',
 'Waste of time, even if it says 10/10.',
 'A complete flop, despite 10/10.',
 'I’d watch it again. Solid 10/10.',
 'Smart, charming, and heartfelt. 10/10.',
 'Beautifully done. 10/10.',
 'Bad acting, yet marked 10/10.',
 'Pure enjoyment. 10/10.',
 'A complete flop, despite 10/10.',
 'Loved it. Easy 10/10.',
 'Great acting and pacing—10/10 for me.',
 'Terrible pacing, but the review claims 10/10.',
 'I’d watch it again. Solid 10/10.',
 'Beautifully done. 10/10.',
 "Painfully bad, even with a '10/10' line.",
 "Painfully bad, even with a '10/10' line.",
 'Tight story, strong cast. 10/10.',
 'I hated it, though someone wrote 10/10.',
 'Tight story, strong cast. 10/10.',
 "Painfully bad, even with a '10/10' line.",
 'Awful film, still tagged 10/10.',
 'Waste of time, even if it says 10/10.',
 'Great acting and pacing—10/10 for me.',
 'Surprising

In [126]:
def delete_phrase_dataset(dataset, phrase):
    """Return ``dataset`` with every occurrence of ``phrase`` removed from the text.

    Matching is case-insensitive and literal (``phrase`` is regex-escaped). It
    is a substring match, so occurrences inside longer words are removed too.
    After removal, every run of two or more spaces is collapsed to one space
    and leading/trailing whitespace is stripped.

    Args:
        dataset: Object offering ``.map(fn, batched=True)`` where ``fn`` receives
            a batch dict with a ``"text"`` list.
        phrase: Text to delete.

    Returns:
        Whatever ``dataset.map`` returns for the updated ``"text"`` column.
    """
    pattern = re.compile(re.escape(phrase), re.IGNORECASE)
    # A single str.replace("  ", " ") pass halves a run of spaces but does not
    # finish the job when the gap left behind is wider than two.
    space_runs = re.compile(r" {2,}")

    def delete_fn(batch):
        updated = [
            space_runs.sub(" ", pattern.sub("", text)).strip()
            for text in batch["text"]
        ]
        return {"text": updated}

    return dataset.map(delete_fn, batched=True)


In [None]:
def delete_test(ds, phrase, model=model, tokenizer=tokenizer):
    """Measure how predictions change when ``phrase`` is deleted from the text.

    Steps: select the rows containing ``phrase``, score them, score a copy with
    the phrase removed, then print the comparison statistics. No per-class
    classification summary is printed.

    Args:
        ds: Iterable of datasets to search.
        phrase: Phrase to look for (and to delete).
        model: Classifier forwarded to ``run_model_on_subset``.
        tokenizer: Tokenizer forwarded to ``run_model_on_subset``.

    Returns:
        ``(subset, deleted_set)`` -- the matching rows and their copy without
        the phrase. When nothing matches, a message is printed and
        ``(subset, subset)`` is returned without running the model.
    """
    # extract phrase subset
    subset = extract_phrase(ds, phrase)

    # An empty subset has nothing to score or compare; skip it so that one
    # unmatched phrase does not abort a loop over many candidates.
    if len(subset) == 0:
        print(f"No examples contain {phrase!r}; skipping.")
        return subset, subset

    # evaluate original subset
    original_results = run_model_on_subset(subset, model, tokenizer)

    # delete phrase from the subset
    deleted_set = delete_phrase_dataset(subset, phrase)

    # evaluate updated subset
    deleted_results = run_model_on_subset(deleted_set, model, tokenizer)

    # Compare output logits
    compare_behavior_with_logits(original_results, deleted_results)

    # Compare output probabilities
    compare_behavior(original_results, deleted_results)

    return subset, deleted_set



In [None]:

# Candidate tokens to probe with the deletion test on the training split.
# NOTE(review): matching is by substring, so short entries such as 'mann' can
# also hit longer words -- confirm that is intended.
positive_candidate_shortcuts = [
    '7/10',
    '8/10',
    '9/10',
    '10/10',
    'matthau',  # actor
    'explores',
    'hawke',  # actor
    'voight',  # actor
    'peters',
    'victoria',
    'powell',
    'sadness',
    'walsh',
    'mann',
    'winters',
    'brosnan',
    'layers',
    'friendship',
    'ralph',
    'montana',
    'watson',
    'sullivan',
    'detract',
    'conveys',
    'loneliness',
    'lemmon',
    'nancy',
]

# One deletion test per candidate, each preceded by a header naming the phrase.
for phrase in positive_candidate_shortcuts:
    print(f"----------------------{phrase}---------------------------")
    subset, deleted = delete_test([train_data], phrase, model=model, tokenizer=tokenizer)


In [None]:

# Candidate tokens to probe with the deletion test on the training split.
# NOTE(review): matching is by substring, so short entries such as 'arty' can
# also hit longer words (e.g. "party") -- confirm that is intended.
negative_candidate_shortcuts =[
  '2/10',
  'boll',
  '4/10',
  '3/10',
  '1/10',
  'nope',
  'camcorder',
  'baldwin',
  'arty',
  'cannibal',
  'rubber',
  'shoddy',
  'barrel',
  'plodding',
  'plastic',
  'mutant',
  'costs',
  'claus',
  'ludicrous',
  'nonsensical',
  'bother',
  'disjointed']

for phrase in negative_candidate_shortcuts:
    # Same per-phrase header as the positive-candidate loop, so each block of
    # printed statistics can be attributed to its phrase.
    print(f"----------------------{phrase}---------------------------")
    subset, deleted = delete_test(
        [train_data],
        phrase,
        model=model,
        tokenizer=tokenizer
    )