In [None]:
!pip install transformers captum scikit-learn matplotlib seaborn torch pandas numpy ipywidgets

import torch
import torch.nn.functional as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import re

from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoModelForSequenceClassification
)
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans
# from captum.attr import Saliency, Occlusion



In [None]:
# 1. Setup: checkpoint id, compute device, dataset and the encoder used for embeddings
MODEL_NAME = "tae898/emoberta-base"

# Run on the GPU when PyTorch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Utterance table; the cells below read its "Utterance" and "Emotion" columns.
df = pd.read_csv(
    "/content/train_sent_emo_cleaned_processed.csv",
    encoding="utf-8",
)

tok = AutoTokenizer.from_pretrained(MODEL_NAME)
# Bare encoder (AutoModel) configured to return the hidden states of every layer.
model = AutoModel.from_pretrained(MODEL_NAME, output_hidden_states=True)
model = model.to(device)
model.eval()  # inference mode; as the last expression this also displays the module


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/407 [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaModel were not initialized from the model checkpoint at tae898/emoberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


RobertaModel(
  (embeddings): RobertaEmbeddings(
    (word_embeddings): Embedding(50265, 768, padding_idx=1)
    (position_embeddings): Embedding(514, 768, padding_idx=1)
    (token_type_embeddings): Embedding(1, 768)
    (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): RobertaEncoder(
    (layer): ModuleList(
      (0-11): 12 x RobertaLayer(
        (attention): RobertaAttention(
          (self): RobertaSdpaSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (output): RobertaSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
            (dr

In [None]:

# ── TOKEN-LEVEL EMBEDDINGS ────────────────────────────────────────────────

N_TOKEN_CLUSTERS = 7  # number of KMeans clusters used to colour the token map

# Extract every token's last-layer embedding.
all_embs = []
with torch.no_grad():
    for sent in df["Utterance"]:
        # Tokenize once and reuse the encoding for the forward pass; truncation
        # keeps very long utterances within the encoder's maximum length.
        enc    = tok(sent, return_tensors="pt", truncation=True).to(device)
        out    = model(**enc)
        hidden = out.hidden_states[-1].squeeze(0)       # (seq_len, hidden_dim)
        # Drop the first and last positions (the start/end special tokens) and
        # move the remaining rows to host memory in a single copy.
        all_embs.extend(hidden[1:-1].cpu().numpy())

X_tok = np.vstack(all_embs)  # (total_tokens, hidden_dim)

# Reduce to 50D, then t-SNE → 2D
X50_tok    = PCA(n_components=50, random_state=42).fit_transform(X_tok)
coords_tok = TSNE(n_components=2, init="pca", learning_rate="auto",
                  perplexity=30, random_state=42).fit_transform(X50_tok)

# Option A: plot all tokens in grey
plt.figure(figsize=(8,6))
plt.scatter(coords_tok[:,0], coords_tok[:,1],
            color="lightgrey", alpha=0.5, s=3)
plt.title("Token Embeddings (t-SNE) — no emotion labels")
plt.xlabel("Dim 1"); plt.ylabel("Dim 2")
plt.tight_layout()
plt.show()

# Option B: cluster tokens unsupervised (on the 50D PCA features) and color by cluster
kmeans_tok = KMeans(n_clusters=N_TOKEN_CLUSTERS, random_state=42).fit(X50_tok)
labs_tok   = kmeans_tok.labels_

plt.figure(figsize=(8,6))
scatter = plt.scatter(coords_tok[:,0], coords_tok[:,1],
                      c=labs_tok, cmap="tab10", alpha=0.6, s=3)
plt.title("Token Embeddings (t-SNE) — colored by KMeans cluster")
plt.xlabel("Dim 1"); plt.ylabel("Dim 2")
plt.colorbar(scatter, ticks=range(N_TOKEN_CLUSTERS)).set_label("Cluster ID")
plt.tight_layout()
plt.show()


# ── SENTENCE-LEVEL (CLS) EMBEDDINGS ───────────────────────────────────────

# One vector per utterance: the last-layer hidden state at sequence position 0.
sent_embs, sent_labels = [], []
with torch.no_grad():
    for sent, lbl in zip(df["Utterance"], df["Emotion"]):
        encoded    = tok(sent, return_tensors="pt").to(device)
        last_layer = model(**encoded).hidden_states[-1]       # (1, seq_len, hidden_dim)
        sent_embs.append(last_layer[:, 0, :].squeeze(0).cpu().numpy())
        sent_labels.append(lbl)

X_sent = np.vstack(sent_embs)
y_sent = np.array(sent_labels)

# PCA down to 50 dimensions first, then t-SNE down to 2.
pca_sent    = PCA(n_components=50, random_state=42)
X50_sent    = pca_sent.fit_transform(X_sent)
tsne_sent   = TSNE(n_components=2, init="pca", learning_rate="auto",
                   perplexity=30, random_state=24)
coords_sent = tsne_sent.fit_transform(X50_sent)

# Integer code per emotion (alphabetical order) so the colormap can be indexed.
emotions = sorted(df["Emotion"].unique())
e2i      = dict(zip(emotions, range(len(emotions))))
point_colors = [e2i[l] for l in y_sent]

fig, ax = plt.subplots(figsize=(8,6))
points = ax.scatter(coords_sent[:,0], coords_sent[:,1],
                    c=point_colors,
                    cmap="tab10", alpha=0.8, s=25, edgecolor="k", linewidth=0.3)
ax.set_title("Sentence (CLS) Embeddings (t-SNE) — colored by true emotion")
ax.set_xlabel("Dim 1")
ax.set_ylabel("Dim 2")
# Colorbar ticks are relabelled with the emotion names instead of their integer codes.
cbar = fig.colorbar(points, ax=ax, ticks=range(len(emotions)))
cbar.ax.set_yticklabels(emotions)
cbar.set_label("Emotion")
plt.tight_layout()
plt.show()



model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

In [None]:
import torch
import pandas as pd
import numpy as np
from collections import defaultdict
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from captum.attr import Saliency

import matplotlib.pyplot as plt
# 1) Load the fine-tuned ERC classifier and its tokenizer.
#    NOTE: this rebinds `model` (previously the bare AutoModel encoder from the
#    setup cell) to a sequence-classification model, and introduces `tokenizer`
#    alongside the earlier `tok`.
MODEL_NAME = "tae898/emoberta-base"   # your checkpoint
device     = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model     = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
model     = model.to(device)
model.eval()

# 2) The core saliency‐computation function using Captum’s API exactly
def get_saliency_captum(text, target_label=None):
    """Compute one gradient-saliency score per token of ``text`` with Captum.

    Args:
        text: The utterance to explain.
        target_label: Index of the class logit to attribute. When ``None`` the
            largest logit (the predicted class) is attributed.

    Returns:
        ``(tokens, token_scores)``: the tokenizer's token strings (special
        tokens included) and a NumPy array with one score per token, obtained
        by summing absolute attributions over the embedding dimension.
    """
    encoding = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    ids  = encoding["input_ids"]        # [1, seq_len]
    mask = encoding["attention_mask"]   # [1, seq_len]

    # Attributions are taken w.r.t. the input embeddings, not the integer ids.
    embedding_layer = model.get_input_embeddings()
    embeds = embedding_layer(ids)       # [1, seq_len, hidden_dim]
    embeds.requires_grad_()

    def forward_fn(inputs_embeds):
        # Scalar-per-example output that Captum differentiates.
        logits = model(inputs_embeds=inputs_embeds, attention_mask=mask).logits  # [1, num_labels]
        if target_label is not None:
            return logits[:, target_label]
        return logits.max(dim=1)[0]

    attributions = Saliency(forward_fn).attribute(embeds)   # [1, seq_len, hidden_dim]

    # Collapse the hidden dimension so each token gets a single score.
    per_token = attributions.abs().sum(dim=-1).squeeze(0)
    token_scores = per_token.cpu().detach().numpy()          # [seq_len]

    tokens = tokenizer.convert_ids_to_tokens(ids.squeeze().tolist())
    return tokens, token_scores

# 3) Load the dataset
df = pd.read_csv("train_sent_emo_cleaned_processed.csv", encoding="utf-8")

# --- Part 1: Local explanation for a single utterance ---
idx   = 0  # pick whichever row you want
text  = df["Utterance"].iloc[idx]
label = df["Emotion"].iloc[idx]

tokens, scores = get_saliency_captum(text)
# Min-max normalize to [0,1] for coloring. np.ptp() is used because the
# ndarray.ptp() method no longer exists in NumPy 2.x; the tiny epsilon guards
# against division by zero when every score is identical.
norm = (scores - scores.min()) / (np.ptp(scores) + 1e-20)
cmap = plt.get_cmap("Reds")

plt.figure(figsize=(12,2))
plt.axis("off")
x = 0.01
# The loop variable is deliberately NOT called `tok`: that name is bound to the
# tokenizer in the setup cell and would be overwritten at notebook scope.
for token_str, v in zip(tokens, norm):
    plt.text(x, 0.5, token_str,
             fontsize=12,
             bbox=dict(facecolor=cmap(v), edgecolor="none", pad=0.3))
    # Advance the cursor proportionally to the token length (rough layout only).
    x += len(token_str) * 0.013
plt.title(f"Saliency Map for Utterance #{idx} (Emotion: {label})")
plt.show()

# --- Part 2: Global explanation per emotion ---
N_PER_EMO       = 50  # sample size per emotion to limit memory
MIN_TOKEN_COUNT = 5   # tokens seen fewer times than this (per emotion) are skipped
TOP_K           = 10  # tokens kept per emotion in the ranking and bar charts

# Special tokens (e.g. the start/end markers) occur in every utterance and carry
# no lexical meaning, so they are excluded from the per-emotion aggregation —
# consistent with the counterfactual cells, which also skip them.
special_tokens = set(tokenizer.all_special_tokens)

# emotion -> token -> list of saliency scores (one entry per occurrence)
emotion_token_scores = {e: defaultdict(list) for e in df["Emotion"].unique()}

for emo, group in df.groupby("Emotion"):
    texts = group["Utterance"].sample(min(len(group), N_PER_EMO), random_state=42)
    for t in texts:
        toks, scs = get_saliency_captum(t)
        for tk, sc in zip(toks, scs):
            if tk in special_tokens:
                continue
            emotion_token_scores[emo][tk].append(sc)

# Build a DataFrame of token, count, and average saliency per emotion
records = [
    {
        "emotion": emo_name,
        "token": tk,
        "count": len(lst),
        "avg_saliency": np.mean(lst),
    }
    for emo_name, tok_dict in emotion_token_scores.items()
    for tk, lst in tok_dict.items()
    if len(lst) >= MIN_TOKEN_COUNT      # skip very rare tokens
]

# Explicit columns keep the frame well-formed (and sortable) even if no token
# passed the frequency filter.
df_all = pd.DataFrame(records, columns=["emotion", "token", "count", "avg_saliency"])

# For each emotion, pick the TOP_K tokens by avg_saliency
df_top = (
    df_all
      .sort_values(["emotion","avg_saliency"], ascending=[True, False])
      .groupby("emotion")
      .head(TOP_K)
      .reset_index(drop=True)
)

print(df_top)

# Bar‐chart for each emotion
for emo in df_top["emotion"].unique():
    sub = df_top[df_top["emotion"] == emo]
    plt.figure(figsize=(6,4))
    # Reverse so the highest-saliency token ends up at the top of the chart.
    plt.barh(sub["token"][::-1], sub["avg_saliency"][::-1])
    plt.title(f"Top {TOP_K} Tokens for Emotion: {emo}")
    plt.xlabel("Average Saliency")
    plt.tight_layout()
    plt.show()



In [None]:

# Pivot into a wide DataFrame
import seaborn as sns
import matplotlib.pyplot as plt

# Expects `df_top` to exist already, with the columns
# ["emotion", "token", "avg_saliency"].

# Build the heatmap matrix: one row per emotion, one column per token;
# (emotion, token) pairs that are absent from df_top become 0.
heat = df_top.pivot(index="emotion", columns="token", values="avg_saliency")
heat = heat.fillna(0)

# Create the figure at a suitable size (14 inches wide, 6 inches tall).
fig, ax = plt.subplots(figsize=(14, 6))
sns.heatmap(
    heat,
    annot=True,
    fmt=".2f",
    cmap="Reds",
    linewidths=0.5,
    cbar_kws={"shrink": 0.8, "label": "Avg Saliency"},
    ax=ax,
)

# Tick-label rotation and font sizes.
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right", fontsize=10)
ax.set_yticklabels(ax.get_yticklabels(), rotation=0, fontsize=12)

ax.set_title("Heatmap of Avg Saliency per Token per Emotion", fontsize=14, pad=12)
ax.set_xlabel("Token", fontsize=12)
ax.set_ylabel("Emotion", fontsize=12)
plt.tight_layout()
plt.show()




In [None]:

# ── 1) Recompute top-5 tokens per emotion ──────────────────────────────────
# Requires `df_all` from the global-explanation step, with the columns
# ["emotion", "token", "count", "avg_saliency"].

# Keep only tokens observed at least 5 times.
df_filtered = df_all.loc[df_all["count"] >= 5]

# Order rows by emotion, and within each emotion by descending avg_saliency.
df_sorted = df_filtered.sort_values(
    by=["emotion", "avg_saliency"],
    ascending=[True, False],
)

# First 5 rows of every emotion group, i.e. the 5 highest-saliency tokens.
top5_groups = df_sorted.groupby("emotion")
df_top5 = top5_groups.head(5).reset_index(drop=True)

# ── 2) Pivot to wide format for heatmap ─────────────────────────────────────
heat = df_top5.pivot(index="emotion", columns="token", values="avg_saliency")
heat = heat.fillna(0)   # tokens outside an emotion's top 5 show as 0

# ── 3) Plot styled heatmap ────────────────────────────────────────────────
fig, ax = plt.subplots(figsize=(14, 6))
sns.heatmap(
    heat,
    annot=True,            # print the value inside each cell
    fmt=".1f",             # one decimal place
    cmap="YlOrRd",         # yellow→red colormap
    linewidths=0.8,        # cell borders
    linecolor="gray",
    cbar_kws={"shrink": 0.7, "label": "Avg Saliency"},
    ax=ax,
)

# Improve axis labels
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right", fontsize=10)
ax.set_yticklabels(ax.get_yticklabels(), rotation=0, fontsize=12)

ax.set_title("Top 5 Tokens by Avg Saliency per Emotion", fontsize=16, pad=12)
ax.set_xlabel("Token", fontsize=14)
ax.set_ylabel("Emotion", fontsize=14)
plt.tight_layout()
plt.show()


In [None]:

# ── 1) Compute overall mean saliency per token ────────────────────────────
# Requires `df_all` with the columns ["emotion", "token", "count", "avg_saliency"].
# Rare tokens (fewer than 5 occurrences) are dropped for stability.
df_filtered = df_all.loc[df_all["count"] >= 5]

# Average each token's per-emotion avg_saliency across all emotions.
per_token = df_filtered.groupby("token")
df_global = per_token.agg(mean_saliency_all=("avg_saliency", "mean")).reset_index()

# Pick the TOP_N tokens by this overall mean
TOP_N = 10
ranked_global = df_global.nlargest(TOP_N, "mean_saliency_all")
top_tokens = ranked_global["token"].tolist()

# ── 2) Build heatmap matrix (emotions × top_tokens) ───────────────────────
# Restrict the per-emotion rows to the selected tokens and pivot to wide form.
is_top_token = df_filtered["token"].isin(top_tokens)
heat = df_filtered[is_top_token].pivot(index="emotion", columns="token", values="avg_saliency")
heat = heat.reindex(columns=top_tokens)   # column order follows the global ranking
heat = heat.fillna(0)

# ── 3) Plot styled heatmap ────────────────────────────────────────────────
fig, ax = plt.subplots(figsize=(12, 6))
sns.heatmap(
    heat,
    annot=True,
    fmt=".1f",
    cmap="YlGnBu",
    linewidths=0.8,
    linecolor="gray",
    cbar_kws={"shrink":0.7, "label":"Avg Saliency"},
    ax=ax,
)

# Tidy up labels
ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha="right", fontsize=11)
ax.set_yticklabels(ax.get_yticklabels(), rotation=0, fontsize=12)

ax.set_title(f"Heatmap of Avg Saliency per Emotion for Top {TOP_N} Global Tokens", fontsize=16, pad=12)
ax.set_xlabel("Token", fontsize=14)
ax.set_ylabel("Emotion", fontsize=14)
plt.tight_layout()
plt.show()



In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Pick one specific emotion (e.g. "joy")
emo = "joy"
# Collect the raw per-occurrence saliency scores of every token that was seen
# at least 5 times for this emotion (read from `emotion_token_scores`).
# A comprehension is used so that no loop variable leaks into the notebook
# namespace: names such as `tok` and `scores` are bound to other objects by
# earlier cells and must not be overwritten here.
data = [
    {"token": token_str, "saliency": s}
    for token_str, token_scores in emotion_token_scores.get(emo, {}).items()
    if len(token_scores) >= 5
    for s in token_scores
]
df_box = pd.DataFrame(data, columns=["token", "saliency"])

if df_box.empty:
    # Unknown emotion, or no token frequent enough: nothing to draw.
    print(f"No tokens with at least 5 occurrences for emotion {emo!r}; nothing to plot.")
else:
    plt.figure(figsize=(10,5))
    sns.violinplot(x="token", y="saliency", data=df_box, inner="quartile")
    plt.xticks(rotation=45)
    plt.title(f"Violin plot of saliency-scores for emotion '{emo}'")
    plt.tight_layout()
    plt.show()


In [None]:

# 2) Saliency function (per Captum docs)
def get_saliency(text):
    """Return ``(tokens, scores)`` for ``text`` using Captum gradient saliency.

    ``tokens`` are the tokenizer's token strings (special tokens included) and
    ``scores`` is a NumPy array holding, per token, the sum over the embedding
    dimension of the absolute attribution of the predicted-class logit.
    """
    encoding = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    ids, mask = encoding["input_ids"], encoding["attention_mask"]   # each [1, seq_len]

    # Attribute with respect to the input embeddings.
    embeds = model.get_input_embeddings()(ids)
    embeds.requires_grad_()

    def forward_emb(inputs_embeds):
        # Largest logit == logit of the predicted class.
        output = model(inputs_embeds=inputs_embeds, attention_mask=mask)
        return output.logits.max(dim=1)[0]

    attributions = Saliency(forward_emb).attribute(embeds)   # [1, seq_len, hidden_dim]
    per_token = attributions.abs().sum(dim=-1).squeeze(0)
    scores = per_token.cpu().detach().numpy()
    tokens = tokenizer.convert_ids_to_tokens(ids.squeeze().tolist())
    return tokens, scores

# 3) Prediction helper
def predict_proba(text):
    """Return the model's class probabilities for ``text`` as a 1-D NumPy array.

    The forward pass runs under ``torch.no_grad()`` so that no autograd graph
    is built for what is a pure inference call (this helper is invoked once per
    masked token by the counterfactual loops).
    """
    enc = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    with torch.no_grad():
        logits = model(**enc).logits
    return F.softmax(logits, dim=-1).squeeze(0).cpu().numpy()

# 4) Select one utterance
df = pd.read_csv("train_sent_emo_cleaned_processed.csv", encoding="utf-8")
idx = np.random.randint(len(df))    # or set idx = 0,1,...
text = df["Utterance"].iloc[idx]
true_label = df["Emotion"].iloc[idx]
print(f"Utterance #{idx} (True: {true_label}):\n{text}\n")

# 5) Compute saliency and original prediction
tokens, saliency_scores = get_saliency(text)
probs = predict_proba(text)
pred_idx   = int(probs.argmax())
pred_label = model.config.id2label[pred_idx]
orig_prob  = float(probs[pred_idx])
print(f"Predicted: {pred_label} (p={orig_prob:.3f})\n")

# 6) For every token (excluding special), mask and recompute
special_tokens = set(tokenizer.all_special_tokens)
results = []
# The loop variable is not named `tok`, which an earlier cell binds to a tokenizer.
for i, (token_str, sc) in enumerate(zip(tokens, saliency_scores)):
    if token_str in special_tokens:
        continue
    # Rebuild the text from the non-special tokens only, with position i swapped
    # for the mask token. Keeping the original start/end tokens in the string
    # would make predict_proba() add a second pair when it re-tokenizes, so the
    # masked input would differ from the original by more than the masked token.
    masked = [
        tokenizer.mask_token if j == i else t
        for j, t in enumerate(tokens)
        if j == i or t not in special_tokens
    ]
    masked_text = tokenizer.convert_tokens_to_string(masked)
    masked_prob = float(predict_proba(masked_text)[pred_idx])
    results.append({
        "token": token_str,
        "position": i,
        "saliency": round(float(sc),4),
        "orig_prob": round(orig_prob,4),
        "masked_prob": round(masked_prob,4),
        "delta": round(orig_prob - masked_prob,4)
    })

df_cf = pd.DataFrame(
    results,
    columns=["token", "position", "saliency", "orig_prob", "masked_prob", "delta"],
)
print(df_cf)

# 7) Plot Δ for all tokens
# Bars are placed at integer positions and labelled afterwards: passing the
# token strings as x-values would stack repeated tokens on a single bar.
x_pos = np.arange(len(df_cf))
plt.figure(figsize=(max(8, len(df_cf)*0.4), 4))
plt.bar(x_pos, df_cf["delta"], color="salmon", edgecolor="k")
plt.xticks(x_pos, df_cf["token"], rotation=45, ha="right")
plt.ylabel("Δ Probability")
plt.title(f"Counterfactual Δ for all tokens (Predicted: {pred_label})")
plt.tight_layout()
plt.show()


In [None]:

# 5) Interactive explain function with Matplotlib
def explain_utterance(idx):
    """Render the counterfactual explanation for row ``idx`` of ``df``.

    Shows the utterance with its true and predicted emotion, then a bar chart
    of the drop in predicted-class probability when each non-special token is
    replaced by the tokenizer's mask token.

    Args:
        idx: Positional row index into ``df``.
    """
    import html  # local import: used only to escape dataset text before HTML rendering

    clear_output(wait=True)
    text = df["Utterance"].iloc[idx]
    true = df["Emotion"].iloc[idx]
    toks, sal = get_saliency(text)
    probs     = predict_proba(text)
    pred_idx  = int(probs.argmax())
    pred_lbl  = model.config.id2label[pred_idx]
    orig_p    = float(probs[pred_idx])

    special_tokens = set(tokenizer.all_special_tokens)

    # build counterfactual Δ for every non-special token
    rows = []
    for i, (tok, sc) in enumerate(zip(toks, sal)):
        if tok in special_tokens:
            continue
        # Drop the original start/end tokens before rebuilding the string;
        # otherwise predict_proba() would add a second pair on re-tokenization.
        masked = [
            tokenizer.mask_token if j == i else t
            for j, t in enumerate(toks)
            if j == i or t not in special_tokens
        ]
        mp = predict_proba(tokenizer.convert_tokens_to_string(masked))[pred_idx]
        rows.append((tok, sc, orig_p - float(mp)))
    df_cf = pd.DataFrame(rows, columns=["token","saliency","delta"])

    # display utterance + preds (dataset text is escaped so that characters such
    # as "<" or "&" are shown literally instead of being parsed as markup)
    display(HTML(
        f"<b>Utterance #{idx}</b> (True: <i>{html.escape(str(true))}</i>)"
        f"<br/>{html.escape(str(text))}"
    ))
    display(HTML(f"<b>Predicted:</b> {html.escape(str(pred_lbl))} (p={orig_p:.3f})"))

    # plot bar chart; integer x positions keep repeated tokens as separate bars
    x_pos = range(len(df_cf))
    plt.figure(figsize=(max(8, len(df_cf)*0.5), 4))
    plt.bar(x_pos, df_cf["delta"], color="salmon", edgecolor="k")
    plt.xticks(x_pos, df_cf["token"], rotation=45, ha="right")
    plt.ylabel("Δ Probability")
    plt.title(f"Counterfactual Δ for Utterance #{idx} ({pred_lbl})")
    plt.tight_layout()
    plt.show()

# 6) Hook up the slider: one step per row of `df`, starting at the first row.
utter_slider = widgets.IntSlider(
    description="Utterance idx:",
    min=0,
    max=len(df) - 1,
    step=1,
    value=0,
)
# Re-runs explain_utterance(idx=...) whenever the slider value changes.
widgets.interact(explain_utterance, idx=utter_slider)


In [None]:


# 4) Interactive explain with slider + dropdown
def explain(idx, mask_token):
    """Explain the prediction for row ``idx`` of ``df``, optionally for one token.

    Args:
        idx: Positional row index into ``df``.
        mask_token: Token string to mask (its first occurrence is used). When
            ``None`` a bar chart with the effect of masking every non-special
            token is drawn instead.
    """
    import html  # local import: used only to escape text before HTML rendering

    clear_output(wait=True)
    text = df["Utterance"].iloc[idx]
    true = df["Emotion"].iloc[idx]
    toks, _ = get_saliency(text)
    probs = predict_proba(text)
    pred_idx = int(probs.argmax())
    pred_lbl = model.config.id2label[pred_idx]
    orig_p   = float(probs[pred_idx])

    special_tokens = set(tokenizer.all_special_tokens)

    def masked_probability(position):
        """Predicted-class probability with the token at ``position`` masked."""
        # The original start/end tokens are dropped before the string is rebuilt;
        # predict_proba() adds them again, so keeping them would double them.
        pieces = [
            tokenizer.mask_token if j == position else t
            for j, t in enumerate(toks)
            if j == position or t not in special_tokens
        ]
        return float(predict_proba(tokenizer.convert_tokens_to_string(pieces))[pred_idx])

    # Show utterance & prediction (escaped so markup characters render literally)
    display(HTML(
        f"<b>Utterance #{idx}</b> (True: <i>{html.escape(str(true))}</i>)"
        f"<br/>{html.escape(str(text))}"
    ))
    display(HTML(f"<b>Predicted:</b> {html.escape(str(pred_lbl))} (p={orig_p:.3f})"))

    if mask_token is not None:
        # Compute Δ for the selected token only (first occurrence).
        positions = [i for i, t in enumerate(toks) if t == mask_token]
        if not positions:
            print(f"Token {mask_token!r} not in this utterance.")
        else:
            i = positions[0]
            mp = masked_probability(i)
            delta = orig_p - mp
            display(HTML(
                f"<b>Masking token</b> <code>{html.escape(str(mask_token))}</code> at position {i}:<br>"
                f"New p({html.escape(str(pred_lbl))}) = {mp:.3f} &rarr; Δ = {delta:.3f}"
            ))
    else:
        # show full bar chart of Δ for all tokens
        records = [
            (tok, orig_p - masked_probability(i))
            for i, tok in enumerate(toks)
            if tok not in special_tokens
        ]
        df_cf = pd.DataFrame(records, columns=["token","delta"])
        # Integer x positions keep repeated tokens as separate bars.
        x_pos = range(len(df_cf))
        plt.figure(figsize=(max(8,len(df_cf)*0.4),3))
        plt.bar(x_pos, df_cf["delta"], color="salmon", edgecolor="k")
        plt.xticks(x_pos, df_cf["token"], rotation=45, ha="right")
        plt.ylabel("Δ Probability")
        plt.title(f"Counterfactual Δ for all tokens ({pred_lbl})")
        plt.tight_layout()
        plt.show()

# Widgets
utter_idx = widgets.IntSlider(0, 0, len(df)-1, description="Utterance")
token_dd  = widgets.Dropdown(options=[None], description="Mask Token")

def update_tokens(*args):
    """Refresh the dropdown with the tokens of the currently selected utterance.

    Accepts (and ignores) the change notification passed by ``observe`` so it
    can also be called directly with no arguments.
    """
    text = df["Utterance"].iloc[utter_idx.value]
    # Only the token strings are needed here, so tokenize directly instead of
    # running a full saliency (gradient) pass.
    toks = tokenizer.convert_ids_to_tokens(tokenizer(text, truncation=True)["input_ids"])
    # only unique non-special tokens, in order of first appearance
    opts = [None] + [t for t in dict.fromkeys(toks) if t not in tokenizer.all_special_tokens]
    token_dd.options = opts

utter_idx.observe(update_tokens, names="value")
# Populate the dropdown for the initial slider position; the observer above
# only fires once the slider value changes.
update_tokens()

ui = widgets.HBox([utter_idx, token_dd])
out = widgets.interactive_output(explain, {"idx": utter_idx, "mask_token": token_dd})

display(ui, out)


In [None]:

# 2) Merge BPE/subword tokens back into full words, skipping special tokens
def merge_subwords(tokens, special_tokens=None):
    """Merge BPE/subword tokens back into whole words.

    A token starting with "Ġ" or "▁" opens a new word; any other token is
    appended to the word currently being built. Special tokens are skipped.

    Args:
        tokens: Token strings in sequence order.
        special_tokens: Iterable of token strings to skip. Defaults to
            ``tokenizer.all_special_tokens`` of the notebook-level tokenizer.

    Returns:
        ``(words, positions)``: ``words[k]`` is the display text of the k-th
        word with trailing ``.,;!?`` removed, and ``positions[k]`` lists the
        indices in ``tokens`` that make up that word.
    """
    if special_tokens is None:
        special_tokens = tokenizer.all_special_tokens
    skip = set(special_tokens)

    def _label(raw):
        # Strip trailing punctuation for display, but keep the raw text when the
        # word consists of punctuation only, so no word gets an empty label.
        return raw.rstrip(".,;!?") or raw

    words, positions = [], []
    cur_w, cur_pos   = "", []
    for i, tok in enumerate(tokens):
        if tok in skip:
            continue
        if tok.startswith("Ġ") or tok.startswith("▁"):
            # A new word starts: flush the one collected so far.
            if cur_w:
                words.append(_label(cur_w))
                positions.append(cur_pos)
            cur_w = tok.lstrip("Ġ▁")
            cur_pos = [i]
        else:
            cur_w += tok
            cur_pos.append(i)
    # append last word
    if cur_w:
        words.append(_label(cur_w))
        positions.append(cur_pos)
    return words, positions

# 3) Compute saliency scores per token (sum of absolute gradients)
def get_saliency(text):
    """Token-level saliency for ``text``.

    Returns ``(toks, scores)`` where ``toks`` are the token strings (special
    tokens included) and ``scores`` is a NumPy array with, for each token, the
    absolute attribution of the predicted-class logit summed over the
    embedding dimension.
    """
    batch = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    input_ids = batch["input_ids"]
    attention_mask = batch["attention_mask"]

    # Work on the embedding output so gradients can be taken w.r.t. it.
    embed_fn = model.get_input_embeddings()
    embeds = embed_fn(input_ids)
    embeds.requires_grad_()

    def forward_emb(inputs_embeds):
        # Logit of the predicted class (the maximum over the label dimension).
        result = model(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        return result.logits.max(dim=1)[0]

    atts = Saliency(forward_emb).attribute(embeds)     # [1, seq_len, hidden_dim]
    scores = atts.abs().sum(dim=-1).squeeze(0)         # [seq_len]
    toks = tokenizer.convert_ids_to_tokens(input_ids.squeeze().tolist())
    return toks, scores.cpu().numpy()

# 4) Helper to get model probabilities
def predict_proba(text):
    """Return the class-probability vector (1-D NumPy array) the model assigns to ``text``."""
    batch = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        probabilities = F.softmax(model(**batch).logits, dim=-1)
    return probabilities.squeeze(0).cpu().numpy()

# 5) Load your dataset
df = pd.read_csv("train_sent_emo_cleaned_processed.csv", encoding="utf-8")

# 6) Pick an utterance to explain
idx  = 0  # change to any row index
text = df["Utterance"].iloc[idx]
true = df["Emotion"].iloc[idx]
print(f"Utterance #{idx} (True emotion: {true}):\n{text}\n")

# 7) Compute token saliency and merge into words
toks, sal_scores = get_saliency(text)
words, positions = merge_subwords(toks)

# 8) Original prediction
probs     = predict_proba(text)
pred_idx  = int(np.argmax(probs))
pred_lbl  = model.config.id2label[pred_idx]
orig_prob = float(probs[pred_idx])
print(f"Predicted: {pred_lbl} (p = {orig_prob:.3f})\n")

# 9) Build a DataFrame of counterfactual effects per word
special_tokens = set(tokenizer.all_special_tokens)
records = []
for w, pos_list in zip(words, positions):
    # sum saliency of all subwords for that word
    word_saliency = float(sal_scores[pos_list].sum())
    # Mask all subwords of the word. The original start/end tokens are dropped
    # from the rebuilt string: predict_proba() adds them again on
    # re-tokenization, so keeping them would feed doubled special tokens to the
    # model and skew the comparison with orig_prob.
    masked_positions = set(pos_list)
    masked = [
        tokenizer.mask_token if j in masked_positions else t
        for j, t in enumerate(toks)
        if j in masked_positions or t not in special_tokens
    ]
    masked_text = tokenizer.convert_tokens_to_string(masked)
    masked_prob = float(predict_proba(masked_text)[pred_idx])
    records.append({
        "word": w,
        "saliency": word_saliency,
        "orig_prob": orig_prob,
        "masked_prob": masked_prob,
        "delta": orig_prob - masked_prob
    })

df_explain = pd.DataFrame(
    records,
    columns=["word", "saliency", "orig_prob", "masked_prob", "delta"],
)
print(df_explain)

# 10) Plot Δ for each word
# Integer x positions are used so that a word occurring twice in the utterance
# gets two bars; string x-values would place both on the same bar.
x_pos = np.arange(len(df_explain))
plt.figure(figsize=(max(8, len(df_explain)*0.5), 4))
plt.bar(x_pos, df_explain["delta"], color="salmon", edgecolor="k")
plt.xticks(x_pos, df_explain["word"], rotation=45, ha="right")
plt.ylabel("Δ Probability")
plt.title(f"Counterfactual Δ per Word for Utterance #{idx} ({pred_lbl})")
plt.tight_layout()
plt.show()


In [None]:

# 2) Compute saliency per token
def get_saliency(text):
    """Token-level saliency for ``text``, returned together with the input ids.

    Returns:
        ``(ids, tokens, scores)``: the input ids as a Python list, the matching
        token strings, and a NumPy array of per-token scores (absolute
        attribution of the predicted-class logit summed over the embedding
        dimension).

    NOTE(review): unlike the earlier ``get_saliency`` definitions in this
    notebook, this one returns a 3-tuple; code that unpacks two values will
    fail once this cell has been run.
    """
    encoding = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    input_ids, mask = encoding["input_ids"], encoding["attention_mask"]   # each [1, seq_len]

    # Embedding output with gradients enabled.
    embeds = model.get_input_embeddings()(input_ids)
    embeds.requires_grad_()

    def forward_emb(inputs_embeds):
        # Forward pass on embeddings only; the max logit is the predicted class.
        logits = model(inputs_embeds=inputs_embeds, attention_mask=mask).logits
        return logits.max(dim=1)[0]

    atts = Saliency(forward_emb).attribute(embeds)   # [1, seq_len, hidden_dim]
    scores = atts.abs().sum(dim=-1).squeeze(0)       # [seq_len]

    id_list = input_ids.squeeze().tolist()
    tokens = tokenizer.convert_ids_to_tokens(id_list)
    return input_ids.squeeze().tolist(), tokens, scores.cpu().numpy()

# 3) Predict helper
def predict_proba(text):
    """Softmax class probabilities for ``text`` as a 1-D NumPy array."""
    model_inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    with torch.no_grad():
        output = model(**model_inputs)
    class_probs = F.softmax(output.logits, dim=-1)
    return class_probs.squeeze(0).cpu().numpy()

# 4) Merge subword tokens into words based on token text and original indices
def merge_to_words(tokens, special_tokens=None):
    """Group subword tokens into words, keeping their original token indices.

    A token starting with "Ġ" or a plain space opens a new word; other tokens
    extend the current word. A special token ends the current word and is
    itself skipped.

    Args:
        tokens: Token strings in sequence order.
        special_tokens: Iterable of token strings treated as special. Defaults
            to ``tokenizer.all_special_tokens`` of the notebook-level tokenizer.

    Returns:
        ``(words, word_token_indices)``: the whitespace-stripped word texts and,
        for each word, the list of indices in ``tokens`` it was built from.
    """
    if special_tokens is None:
        special_tokens = tokenizer.all_special_tokens
    skip = set(special_tokens)

    words = []
    word_token_indices = []
    current_indices = []
    current_word_text = ""

    def _flush():
        # Emit the word collected so far (if it has any text) and reset the state.
        nonlocal current_word_text, current_indices
        if current_word_text:
            words.append(current_word_text.strip())
            word_token_indices.append(current_indices)
            current_word_text = ""
            current_indices = []

    for i, tok in enumerate(tokens):
        if tok in skip:
            # A special token terminates whatever word was being built.
            _flush()
            continue

        if tok.startswith("Ġ") or tok.startswith(" "):
            # Leading-space marker: close the previous word and start a new one.
            _flush()
            current_word_text = tok.lstrip("Ġ ")
            current_indices = [i]
        else:
            # Continuation piece of the current word.
            current_word_text += tok
            current_indices.append(i)

    # Emit the last accumulated word, if any.
    _flush()

    return words, word_token_indices


# 5) Load dataset
df = pd.read_csv("train_sent_emo_cleaned_processed.csv", encoding="utf-8")

# 6) Pick an utterance
idx  = 0
text = df["Utterance"].iloc[idx]
true = df["Emotion"].iloc[idx]
print(f"Utterance #{idx} (True: {true}):\n{text}\n")

# 7) Compute saliency & merge to words
input_ids, toks, sal_scores = get_saliency(text)
# Use the original tokens list for merging
words, word_token_indices = merge_to_words(toks)


# 8) Original prediction
probs     = predict_proba(text)
pred_idx  = int(np.argmax(probs))
pred_lbl  = model.config.id2label[pred_idx]
orig_prob = float(probs[pred_idx])
print(f"Predicted: {pred_lbl} (p={orig_prob:.3f})\n")

# 9) Counterfactual per word
special_ids = set(tokenizer.all_special_ids)
records = []
for w, token_indices in zip(words, word_token_indices):
    # sum token-level saliency for the tokens belonging to this word
    # (token_indices index directly into sal_scores, which is aligned with toks)
    word_sal = sal_scores[token_indices].sum()

    # Replace the ids of this word's subword tokens with the mask-token id and
    # drop the ORIGINAL special ids (start/end) — predict_proba() adds those
    # again when it re-tokenizes the decoded text.
    masked_positions = set(token_indices)
    masked_ids = [
        tokenizer.mask_token_id if j in masked_positions else tid
        for j, tid in enumerate(input_ids)
        if j in masked_positions or tid not in special_ids
    ]

    # Decode WITHOUT skip_special_tokens: the mask token is itself a special
    # token, so skipping special tokens would remove the inserted masks and
    # delete the word from the text instead of masking it.
    masked_text = tokenizer.decode(masked_ids, skip_special_tokens=False)
    masked_prob = float(predict_proba(masked_text)[pred_idx])
    records.append({
        "word": w,
        "saliency": word_sal,
        "orig_prob": orig_prob,
        "masked_prob": masked_prob,
        "delta": orig_prob - masked_prob
    })

df_explain = pd.DataFrame(
    records,
    columns=["word", "saliency", "orig_prob", "masked_prob", "delta"],
)
print(df_explain)

# 10) Plot
# Integer x positions keep repeated words as separate bars (string x-values
# would place identical words on the same bar).
x_pos = np.arange(len(df_explain))
plt.figure(figsize=(max(8, len(df_explain)*0.5), 4))
plt.bar(x_pos, df_explain["delta"], color="salmon", edgecolor="k")
plt.xticks(x_pos, df_explain["word"], rotation=45, ha="right")
plt.ylabel("Δ Probability")
plt.title(f"Counterfactual Δ per Word for Utterance #{idx} ({pred_lbl})")
plt.tight_layout()
plt.show()

In [None]:


# 2) Compute token‐level saliency + return offsets
def get_saliency_and_offsets(text):
    """Token-level saliency for ``text`` plus each token's character offsets.

    Returns:
        ``(tokens, scores, offsets)``: token strings, a NumPy array of per-token
        saliency scores (absolute attribution of the predicted-class logit
        summed over the embedding dimension), and the tokenizer's
        ``offset_mapping`` for the sequence as a list of ``[start, end]`` pairs.
    """
    encoding = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding=True,
        return_offsets_mapping=True,
    ).to(device)

    input_ids = encoding["input_ids"]                 # [1, seq_len]
    attention_mask = encoding["attention_mask"]       # [1, seq_len]
    # Offsets of the single sequence in the batch, as plain Python lists.
    offsets = encoding["offset_mapping"][0].cpu().tolist()
    tokens = tokenizer.convert_ids_to_tokens(input_ids.squeeze().tolist())

    # Embedding output with gradients enabled.
    embedding_layer = model.get_input_embeddings()
    embeds = embedding_layer(input_ids)
    embeds.requires_grad_()

    def forward_emb(inputs_embeds):
        # Predicted-class logit (maximum over labels).
        result = model(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        return result.logits.max(dim=1)[0]

    atts = Saliency(forward_emb).attribute(embeds)    # [1, seq_len, hidden_dim]
    scores = atts.abs().sum(dim=-1).squeeze(0)        # [seq_len]

    return tokens, scores.cpu().numpy(), offsets

# 3) Group token indices by original words via regex on text
def group_tokens_by_word(text, offsets):
    """Group token indices by the whitespace-delimited word they start in.

    Args:
        text: The original string the offsets refer to.
        offsets: Sequence of ``(start, end)`` character offsets, one per token.
            Entries equal to ``(0, 0)`` are treated as special tokens and
            skipped.

    Returns:
        ``(words, positions)`` where ``words`` lists every word that owns at
        least one token (in text order) and ``positions[i]`` is the list of
        token indices belonging to ``words[i]``. A token whose start offset
        does not fall inside any word span is left out.
    """
    word_spans = [(m.start(), m.end(), m.group()) for m in re.finditer(r'\S+', text)]

    grouped = {}
    for token_idx, (tok_start, tok_end) in enumerate(offsets):
        if (tok_start, tok_end) == (0, 0):
            # special token, no character span
            continue
        # first word span that contains the token's starting character
        owner = next(
            (i for i, (lo, hi, _) in enumerate(word_spans) if lo <= tok_start < hi),
            None,
        )
        if owner is not None:
            grouped.setdefault(owner, []).append(token_idx)

    ordered = sorted(grouped)
    words = [word_spans[i][2] for i in ordered]
    positions = [grouped[i] for i in ordered]
    return words, positions

# 4) Prediction helper
def predict_proba(text):
    """Return the model's softmax class probabilities for ``text``.

    The text is tokenised, run through the model without gradient tracking,
    and the softmax over the last dimension of the logits is returned as a
    numpy array with the leading batch dimension squeezed out.
    """
    batch = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    with torch.no_grad():
        output = model(**batch)
    probabilities = F.softmax(output.logits, dim=-1)
    return probabilities.squeeze(0).cpu().numpy()

# 5) Load your dataset
df = pd.read_csv("train_sent_emo_cleaned_processed.csv", encoding="utf-8")

# 6) Pick an utterance
idx  = 0  # change as needed
text = df["Utterance"].iloc[idx]
true = df["Emotion"].iloc[idx]
print(f"Utterance #{idx} (True: {true}):\n{text}\n")

# 7) Saliency & offsets → group into words
tokens, sal_scores, offsets = get_saliency_and_offsets(text)
words, word_positions       = group_tokens_by_word(text, offsets)

# 8) Original prediction
probs     = predict_proba(text)
pred_idx  = int(np.argmax(probs))
pred_lbl  = model.config.id2label[pred_idx]
orig_prob = float(probs[pred_idx])
print(f"Predicted: {pred_lbl} (p = {orig_prob:.3f})\n")

# 9) Counterfactual per word
# Tokenise once; the encoding does not change between iterations. It uses the
# same tokenizer arguments as get_saliency_and_offsets, so the token indices
# in `word_positions` line up with `base_ids`.
base_enc       = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
base_ids       = base_enc["input_ids"]
base_attention = base_enc["attention_mask"]

records = []
for w, pos_list in zip(words, word_positions):
    # sum the saliency scores for all subwords of this word
    word_saliency = float(sal_scores[pos_list].sum())

    # Mask the word's tokens directly in id space and feed the ids to the
    # model. Round-tripping through tokenizer.decode(skip_special_tokens=True)
    # would strip the <mask> tokens again (the mask token is itself a special
    # token), which silently turns "masking" into "deleting" the word.
    masked_ids = base_ids.clone()
    masked_ids[0, pos_list] = tokenizer.mask_token_id
    with torch.no_grad():
        masked_logits = model(input_ids=masked_ids, attention_mask=base_attention).logits
    masked_prob = float(F.softmax(masked_logits, dim=-1)[0, pred_idx])
    delta       = orig_prob - masked_prob

    records.append({
        "word": w,
        "saliency": word_saliency,
        "orig_prob": orig_prob,
        "masked_prob": masked_prob,
        "delta": delta
    })

# Explicit columns keep the frame well-formed even when no word was found.
df_explain = pd.DataFrame(
    records,
    columns=["word", "saliency", "orig_prob", "masked_prob", "delta"],
)
print(df_explain)

# 10) Plot Δ for each word
# Integer x-positions with explicit tick labels: string x-values are treated as
# categories by matplotlib, so repeated words would share a single bar.
bar_positions = list(range(len(df_explain)))
plt.figure(figsize=(max(8, len(df_explain)*0.5), 4))
plt.bar(bar_positions, df_explain["delta"], color="salmon", edgecolor="k")
plt.xticks(bar_positions, df_explain["word"].tolist(), rotation=45, ha="right")
plt.ylabel("Δ Probability")
plt.title(f"Counterfactual Δ per Word for Utterance #{idx} ({pred_lbl})")
plt.tight_layout()
plt.show()


In [None]:
# 0) pip install captum transformers torch pandas numpy matplotlib ipywidgets

import re
import torch, torch.nn.functional as F
import pandas as pd, numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

from transformers import AutoTokenizer, AutoModelForSequenceClassification
from captum.attr  import Saliency

# 1) Load model & fast tokenizer (for offsets)
# Checkpoint identifier passed to both from_pretrained calls below.
MODEL_NAME = "tae898/emoberta-base"
# Run on the GPU when CUDA is available, otherwise on the CPU.
device     = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# use_fast=True is requested because reconstruct_words asks the tokenizer for
# return_offsets_mapping.
tokenizer  = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
# Sequence-classification variant: the helpers in this cell read `.logits`
# and `model.config.id2label`.
model      = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
# Move the model to the chosen device and switch it to eval mode.
model.to(device).eval()

# 2) Reconstruct words using offset mapping and split punctuation separately
def reconstruct_words(text):
    """Split ``text`` into words/punctuation runs and map tokens onto them.

    The text is tokenised with offset mapping, then segmented with the regex
    ``\\w+|[^\\w\\s]+`` (a run of word characters, or a run of punctuation).
    Each token is assigned to the segment containing its start offset.

    Args:
        text: The utterance to segment.

    Returns:
        ``(words, positions)``: the segments that own at least one token, in
        text order, and for each of them the list of token indices.
    """
    encoding = tokenizer(
        text,
        return_tensors="pt",
        return_offsets_mapping=True,
        truncation=True,
        padding="longest",
    )
    char_offsets = encoding["offset_mapping"][0].tolist()

    pieces = [
        (match.start(), match.end(), match.group())
        for match in re.finditer(r"\w+|[^\w\s]+", text)
    ]

    tokens_by_piece = {}
    for token_idx, (tok_start, tok_end) in enumerate(char_offsets):
        if tok_start == 0 and tok_end == 0:
            # special tokens carry a (0, 0) offset
            continue
        for piece_idx, (lo, hi, _) in enumerate(pieces):
            if lo <= tok_start < hi:
                tokens_by_piece.setdefault(piece_idx, []).append(token_idx)
                break

    ordered = sorted(tokens_by_piece)
    words = [pieces[i][2] for i in ordered]
    positions = [tokens_by_piece[i] for i in ordered]
    return words, positions

# 3) Saliency & predict helpers
def get_saliency(text):
    """Return ``(tokens, scores)`` for ``text`` using Captum ``Saliency``.

    ``scores`` is a numpy array holding, per token, the absolute attribution
    summed over the embedding dimension. The attribution target is the largest
    logit of the model output.
    """
    batch = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    input_ids = batch["input_ids"]
    attention_mask = batch["attention_mask"]

    # Attribute with respect to the embedding vectors, not the integer ids.
    embeddings = model.get_input_embeddings()(input_ids)
    embeddings.requires_grad_()

    def max_logit(inputs_embeds):
        outputs = model(inputs_embeds=inputs_embeds, attention_mask=attention_mask)
        return outputs.logits.max(1)[0]

    gradients = Saliency(max_logit).attribute(embeddings)
    token_scores = gradients.abs().sum(-1).squeeze(0).cpu().numpy()
    token_strings = tokenizer.convert_ids_to_tokens(input_ids.squeeze().tolist())
    return token_strings, token_scores

def predict_proba(text):
    """Softmax class probabilities of the model for ``text`` as a numpy array.

    Runs under ``torch.no_grad()``; the leading batch dimension is squeezed
    away before the result is moved to the CPU.
    """
    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    encoded = encoded.to(device)
    with torch.no_grad():
        raw_scores = model(**encoded).logits
    class_probs = F.softmax(raw_scores, dim=-1).squeeze(0)
    return class_probs.cpu().numpy()

# 4) Load data
# The path is relative, so the CSV must sit in the kernel's working directory.
# explain() reads the "Utterance" and "Emotion" columns of this frame.
df = pd.read_csv("train_sent_emo_cleaned_processed.csv", encoding="utf-8")

# 5) Interactive explain: using WORDS instead of raw tokens
def _masked_label_prob(input_ids, attention_mask, pos_list, label_idx):
    """Probability of class ``label_idx`` after masking the tokens at ``pos_list``.

    The ids are masked in place of a text round-trip so that the sequence
    keeps exactly one BOS/EOS pair and the same length as the original input.
    """
    masked_ids = input_ids.clone()
    masked_ids[0, pos_list] = tokenizer.mask_token_id
    with torch.no_grad():
        logits = model(input_ids=masked_ids, attention_mask=attention_mask).logits
    return float(F.softmax(logits, dim=-1)[0, label_idx])


def explain(idx, mask_word):
    """Show the prediction for utterance ``idx`` and the effect of masking words.

    Args:
        idx: Row position in ``df`` of the utterance to explain.
        mask_word: A word (as returned by ``reconstruct_words``) to mask, or
            ``None`` to mask every word in turn and plot all deltas. If the
            word occurs several times, the first occurrence is masked.

    Side effects:
        Clears the current cell output, then displays the utterance, the
        predicted label and either the single-word result or a bar chart of
        ``orig_p - masked_p`` per word.
    """
    # Local import: the utterance is interpolated into HTML and must be escaped.
    from html import escape

    clear_output(wait=True)
    text = df["Utterance"].iloc[idx]
    true = df["Emotion"].iloc[idx]
    words, positions = reconstruct_words(text)
    probs    = predict_proba(text)
    pred_idx = int(probs.argmax())
    pred_lbl = model.config.id2label[pred_idx]
    orig_p   = float(probs[pred_idx])

    # Tokenise once with the same arguments reconstruct_words/predict_proba use,
    # so the indices in `positions` address `input_ids` directly. (The previous
    # version rebuilt a string that still contained "<s>"/"</s>" and tokenised
    # it again, which doubled the BOS/EOS tokens in every masked input.)
    enc = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    input_ids, attention_mask = enc["input_ids"], enc["attention_mask"]

    display(HTML(
        f"<b>Utterance #{idx}</b> (True: <i>{escape(str(true))}</i>)<br/>{escape(str(text))}"
    ))
    display(HTML(f"<b>Predicted:</b> {pred_lbl} (p={orig_p:.3f})"))

    if mask_word is not None:
        if mask_word not in words:
            print(f"Word {mask_word!r} not found.")
        else:
            wi = words.index(mask_word)
            mp = _masked_label_prob(input_ids, attention_mask, positions[wi], pred_idx)
            delta = orig_p - mp
            display(HTML(
                f"<b>Masking word</b> <code>{escape(str(mask_word))}</code>:<br>"
                f"New p({pred_lbl}) = {mp:.3f} → Δ = {delta:.3f}"
            ))
    else:
        special_ids = set(tokenizer.all_special_ids)
        token_ids = input_ids[0].tolist()
        records = []
        for w, pos_list in zip(words, positions):
            # never overwrite special tokens such as BOS/EOS
            if any(token_ids[p] in special_ids for p in pos_list):
                continue
            mp = _masked_label_prob(input_ids, attention_mask, pos_list, pred_idx)
            records.append((w, orig_p - mp))
        df_cf = pd.DataFrame(records, columns=["word","delta"])

        # Integer x-positions with explicit labels: string x-values are treated
        # as categories, so a repeated word would otherwise share one bar.
        bar_positions = list(range(len(df_cf)))
        plt.figure(figsize=(max(8, len(df_cf)*0.3), 3))
        plt.bar(bar_positions, df_cf["delta"], color="salmon", edgecolor="k")
        plt.xticks(bar_positions, df_cf["word"].tolist(), rotation=45, ha="right")
        plt.ylabel("Δ Probability")
        plt.title(f"Counterfactual Δ per Word ({pred_lbl})")
        plt.tight_layout()
        plt.show()

# Widgets
# Slider over row positions of df; dropdown of maskable words (None = all words).
utter_idx = widgets.IntSlider(value=0, min=0, max=len(df) - 1, description="Utterance")
word_dd = widgets.Dropdown(description="Mask Word", options=[None])


def update_word_options(*args):
    """Refill the word dropdown with the words of the currently selected utterance."""
    current_text = df["Utterance"].iloc[utter_idx.value]
    current_words = reconstruct_words(current_text)[0]
    word_dd.options = [None, *current_words]


# Keep the dropdown in sync with the slider, and populate it once up front.
utter_idx.observe(update_word_options, names="value")
update_word_options()

ui  = widgets.HBox([utter_idx, word_dd])
out = widgets.interactive_output(
    explain,
    {"idx": utter_idx, "mask_word": word_dd},
)

display(ui, out)



In [None]:
from captum.attr import Occlusion

# 2) Define forward on embeddings
def forward_emb(inputs_embeds, attention_mask):
    """Run the classifier on pre-computed embeddings and return all logits.

    The full logits tensor is returned (not a single class) so that the
    attribution method can select the class through its ``target`` argument.
    """
    outputs = model(attention_mask=attention_mask, inputs_embeds=inputs_embeds)
    return outputs.logits


# Occlusion attributor wrapping the embedding-level forward function.
occlusion = Occlusion(forward_emb)

def occlusion_attributions(text, window_size=1):
    """Occlusion attribution per token for the model's predicted class.

    Args:
        text: The utterance to explain.
        window_size: Number of consecutive tokens occluded at once. Values are
            clamped to ``[1, seq_len]``. The default of 1 occludes one token
            at a time. (Previously this argument was accepted but ignored.)

    Returns:
        ``(tokens, impacts, target)``:
        tokens  - token strings of the encoded input;
        impacts - numpy array with the occlusion attribution of each token,
                  summed over the embedding dimension;
        target  - index of the class predicted for the unmodified input.
    """
    enc = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    input_ids      = enc["input_ids"]
    attention_mask = enc["attention_mask"]

    # Occlusion is perturbation based, so neither the embeddings nor the
    # reference prediction need gradient tracking.
    with torch.no_grad():
        embeds = model.get_input_embeddings()(input_ids)
        logits = model(**enc).logits
    target = int(logits.argmax(dim=-1).item())

    # The window always spans the whole embedding dimension so that complete
    # token vectors are occluded; only its extent along the sequence varies.
    seq_len, emb_dim = embeds.shape[1], embeds.shape[2]
    window = max(1, min(int(window_size), seq_len))
    atts = occlusion.attribute(
        inputs=embeds,
        target=target,
        additional_forward_args=(attention_mask,),
        sliding_window_shapes=(window, emb_dim),
        strides=(1, emb_dim),
    )  # [1, seq_len, emb_dim]

    impacts = atts.detach().squeeze(0).sum(dim=-1).cpu().numpy()
    # Index the batch row instead of squeeze() so the result is always a list.
    tokens  = tokenizer.convert_ids_to_tokens(input_ids[0].tolist())
    return tokens, impacts, target

# 3) Load your dataset and pick an utterance
df = pd.read_csv("/content/train_sent_emo_cleaned_processed.csv", encoding="utf-8")
example_idx = 0  # change to any row you like
text = df["Utterance"].iloc[example_idx]

print(f"Utterance #{example_idx} (True: {df['Emotion'].iloc[example_idx]}):\n{text}\n")

# 4) Run Occlusion
tokens, impacts, target = occlusion_attributions(text, window_size=1)

print("Predicted class index:", target)
for tok, imp in zip(tokens, impacts):
    print(f"{tok:>8} → {imp:.2f}")

# 5) Plot
# Bars are placed at integer positions and labelled afterwards. With the token
# strings as x-values matplotlib would treat them as categories, and any token
# that occurs more than once (punctuation, common words) would be drawn on the
# same x position, hiding all but one of its bars.
bar_positions = list(range(len(tokens)))
plt.figure(figsize=(10,3))
plt.bar(bar_positions, impacts, color="salmon", edgecolor="k")
plt.xticks(bar_positions, tokens, rotation=45, ha="right")
plt.ylabel("Occlusion Impact")
plt.title(f"Occlusion Attribution per Token (idx={example_idx})")
plt.tight_layout()
plt.show()


In [None]:
# 0) Install if needed:
# !pip install captum transformers torch pandas numpy matplotlib ipywidgets

import torch, torch.nn.functional as F
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

from transformers import AutoTokenizer, AutoModelForSequenceClassification

# 1) Load model & tokenizer
# Checkpoint identifier passed to both from_pretrained calls below.
MODEL_NAME = "tae898/emoberta-base"
# Run on the GPU when CUDA is available, otherwise on the CPU.
device     = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Fast tokenizer requested explicitly via use_fast=True.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
# Sequence-classification variant: the helpers in this cell read `.logits`
# and `model.config.id2label`.
model     = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)
# Move the model to the chosen device and switch it to eval mode.
model.to(device).eval()

# 2) Prediction helper
def predict_probs(text):
    """Return ``(probs, enc)`` for ``text``.

    Args:
        text: The utterance to classify.

    Returns:
        probs: numpy array of softmax class probabilities (batch dimension
            squeezed out).
        enc: the tokenizer encoding that was fed to the model, already moved
            to ``device``, so callers can reuse its ``input_ids``.
    """
    enc = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    # Inference only: skip building an autograd graph for the forward pass.
    with torch.no_grad():
        logits = model(**enc).logits
    probs = F.softmax(logits, dim=-1).squeeze(0).cpu().numpy()
    return probs, enc

# 3) The interactive function
def explain_ngram(utt_idx, start, length):
    """Mask a token n-gram of an utterance and report how the prediction changes.

    Args:
        utt_idx: Row position in ``df`` of the utterance.
        start: Index of the first token to mask (indices refer to the encoded
            sequence, including special tokens).
        length: Number of consecutive tokens to mask. The range is clipped to
            the sequence length, and special tokens inside it are left intact.

    Side effects:
        Clears the cell output and displays the utterance, the original
        prediction, the masked tokens/text, the new prediction, and the change
        in probability of the ORIGINAL label.
    """
    # Local import: token strings such as "<s>" and "<mask>" would be parsed
    # as HTML tags (and vanish or strike through the text) unless escaped.
    from html import escape

    clear_output(wait=True)
    text = df["Utterance"].iloc[utt_idx]
    true = df["Emotion"].iloc[utt_idx]
    # original prediction
    orig_probs, enc = predict_probs(text)
    orig_idx   = int(orig_probs.argmax())
    orig_label = model.config.id2label[orig_idx]
    orig_p     = float(orig_probs[orig_idx])
    # token strings of the encoding that produced the original prediction
    input_ids = enc["input_ids"]
    toks = tokenizer.convert_ids_to_tokens(input_ids[0].tolist())
    # display text + orig pred
    display(HTML(
        f"<b>Utterance #{utt_idx}</b> (True: <i>{escape(str(true))}</i>)<br/>{escape(str(text))}"
    ))
    display(HTML(f"<b>Original prediction:</b> {orig_label} (p={orig_p:.3f})"))

    # Mask the selected n-gram directly in id space. Rebuilding a string from
    # the token list and tokenising it again would add a second BOS/EOS pair
    # around the "<s> ... </s>" that the string already contains.
    special_tokens = set(tokenizer.all_special_tokens)
    masked_ids  = input_ids.clone()
    toks_masked = list(toks)
    for i in range(start, min(start+length, len(toks))):
        if toks[i] not in special_tokens:
            masked_ids[0, i] = tokenizer.mask_token_id
            toks_masked[i]   = tokenizer.mask_token

    # new prediction
    with torch.no_grad():
        new_logits = model(input_ids=masked_ids, attention_mask=enc["attention_mask"]).logits
    new_probs = F.softmax(new_logits, dim=-1).squeeze(0).cpu().numpy()
    new_idx   = int(new_probs.argmax())
    new_label = model.config.id2label[new_idx]
    new_p     = float(new_probs[new_idx])
    # Probability the masked input assigns to the ORIGINAL label. Using the
    # new top probability would be wrong whenever the predicted label changes.
    new_p_orig_label = float(new_probs[orig_idx])

    # Human-readable masked text: keep the mask tokens, drop BOS/EOS/padding.
    masked_text = tokenizer.convert_tokens_to_string(
        [t for t in toks_masked if t == tokenizer.mask_token or t not in special_tokens]
    )

    # display masked text + new pred
    display(HTML(
        f"<b>Masked n-gram:</b> tokens[{start}:{start+length}] → "
        f"{escape(str(toks[start:start+length]))}"
    ))
    display(HTML(f"<b>Masked text:</b> {escape(masked_text)}"))
    display(HTML(f"<b>New prediction:</b> {new_label} (p={new_p:.3f})"))
    display(HTML(f"<b>Δ in prob of orig label:</b> {orig_p - new_p_orig_label:+.3f}"))

# 4) Load your data
# The path is relative, so the CSV must sit in the kernel's working directory.
# explain_ngram() reads the "Utterance" and "Emotion" columns of this frame.
df = pd.read_csv("train_sent_emo_cleaned_processed.csv", encoding="utf-8")

# 5) Build widgets
# Sliders: which utterance, where the n-gram starts, and how long it is.
utt_slider = widgets.IntSlider(
    description="Utterance", value=0, min=0, max=len(df)-1, step=1
)
start_slider = widgets.IntSlider(description="Start idx", value=1, min=1, max=50, step=1)
length_slider = widgets.IntSlider(description="N-gram length", value=1, min=1, max=50, step=1)


def update_max_len(*_):
    """Bound the start/length sliders by the token count of the selected utterance."""
    current_text = df["Utterance"].iloc[utt_slider.value]
    encoded_ids = tokenizer(
        current_text, return_tensors="pt", truncation=True, padding=True
    )["input_ids"]
    n_tokens = len(tokenizer.convert_ids_to_tokens(encoded_ids.squeeze().tolist()))
    start_slider.max = n_tokens - 1
    length_slider.max = n_tokens


# Re-bound the sliders whenever another utterance is picked, and once now.
utt_slider.observe(update_max_len, names="value")
update_max_len()

ui = widgets.VBox([utt_slider, start_slider, length_slider])
out = widgets.interactive_output(
    explain_ngram,
    {"utt_idx": utt_slider, "start": start_slider, "length": length_slider},
)

display(ui, out)
