In [None]:
# =======================
#  Install dependencies
# =======================
# Shell installs executed in order; each line changes the runtime's Python environment.

# Unconstrained install of transformers and datasets.
!pip install  transformers datasets
# Upgrade pip itself (quiet mode).
!pip -q install --upgrade pip
# NOTE(review): no cell visible in this notebook imports tensorflow -- confirm it is still needed.
!pip -q install "tensorflow"
# Requests transformers again, this time with a minimum version, together with torch and the
# plotting / data stack.
# NOTE(review): transformers is requested twice (above without a constraint, here with >=4.41);
# the first request looks redundant -- confirm before removing it.
!pip -q install "transformers>=4.41" "torch>=2.2" matplotlib pandas scikit-learn
# `|| true` keeps the cell from failing when this package cannot be installed.
!pip -q install "transformers-interpret>=0.9.6" || true


In [None]:
# =======================
# Import Google Drive (optional)
# =======================
# Only works inside Google Colab: `google.colab` is not importable elsewhere.
# MODEL_ID in the configuration cell points below /content/drive, so the mount is
# required whenever that Drive-hosted checkpoint is used.

from google.colab import drive
drive.mount("/content/drive")


In [None]:
# =======================
#  Install Lime
# =======================
# Besides installing LIME, this cell performs the notebook's core imports.

!pip install lime

import numpy as np
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# NOTE(review): LimeTextExplainer is imported but not referenced by any cell visible
# in this notebook section -- confirm whether it is still needed.
from lime.lime_text import LimeTextExplainer


In [None]:
# =======================
#  Configuration
# =======================
# All tunable settings of the analysis live here.

# Checkpoint to explain; the path lives on the mounted Google Drive.
MODEL_ID     = "/content/drive/MyDrive/fine tuned roberta iemocap/robert_iemocap6_seed44_BEST"         # fine-tuned EmoBERTa
BASEMODEL_ID = "roberta-base"                 # plain RoBERTa for comparison
# "single_label" -> softmax probabilities, "multi_label" -> sigmoid (see HFModelWrapper.predict_proba).
TASK         = "single_label"
# CSV file read by the "Load data" cell; TEXT_COLUMN is the column holding the utterances.
CSV_PATH     = "/content/iemocap_emoberta_test.csv"
TEXT_COLUMN  = "Utterance"
# Only the first MAX_EXAMPLES rows of the CSV are analysed.
MAX_EXAMPLES = 100                            # utterances used for global stats
# Number of points of GRID, i.e. how many "top x%" fractions each coverage curve is sampled at.
N_STEPS      = 100                            # resolution for cumulative curves



In [None]:
# =======================
#  Clone Optimus repo
# =======================
# Fetches the Optimus source once and makes it importable.

import os, sys, importlib, types, numpy as np, matplotlib.pyplot as plt
repo_dir = "/content/optimus_repo"
# Clone only when the target directory does not exist yet, so re-running the cell is cheap.
if not os.path.exists(repo_dir):
    !git clone -q https://github.com/intelligence-csd-auth-gr/Optimus-Transformers-Interpretability.git {repo_dir}
# The working directory becomes the repo: every relative path used afterwards
# (e.g. the "plots" output folder) is resolved inside it.
os.chdir(repo_dir)
# Put the repo on sys.path (once) so that `import optimus` works.
if os.getcwd() not in sys.path:
    sys.path.append(os.getcwd())



In [None]:
# =======================
# HF wrappers (config-based flags) + DummyTrainer
# =======================

import torch
from types import SimpleNamespace
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig

class _DummyTrainer:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = AutoTokenizer.from_pretrained("roberta-base", use_fast=True)

        self.device = next(model.parameters()).device

    @torch.inference_mode()
    def predict(self, dataset):
        texts = None
        for attr in ["set_of_instance","instances","texts","samples","data","inputs","input_texts","instance"]:
            if hasattr(dataset, attr):
                val = getattr(dataset, attr)
                if isinstance(val, (list, tuple)):
                    texts = list(val)
                elif isinstance(val, str):
                    texts = [val]
                if texts:
                    break
        if texts is None and hasattr(dataset, "__len__") and hasattr(dataset, "__getitem__"):
            try:
                texts = []
                for i in range(len(dataset)):
                    item = dataset[i]
                    if isinstance(item, (list, tuple)) and item and isinstance(item[0], str):
                        texts.append(item[0])
                    elif isinstance(item, dict) and "text" in item:
                        texts.append(item["text"])
                    elif isinstance(item, dict) and "input_ids" in item:
                        ids = item["input_ids"]
                        ids = ids.tolist() if hasattr(ids, "tolist") else ids
                        texts.append(self.tokenizer.decode(ids, skip_special_tokens=True))
            except Exception:
                pass
        if not texts:
            texts = [""]

        logits_list, hidden_list, attn_list = [], [], []
        for text in texts:
            enc = self.tokenizer(
                text, return_tensors="pt", truncation=True, padding=False,
                add_special_tokens=True, return_attention_mask=True
            )
            enc = {k: v.to(self.device) for k, v in enc.items()}
            out = self.model(**enc, output_attentions=True, output_hidden_states=True, return_dict=True)
            logits_list.append(out.logits.detach().cpu().numpy())  # (1,C)
            hidden_list.append(np.stack([h.detach().cpu().numpy() for h in out.hidden_states], axis=0))  # (L+1,1,T,D)
            attn_list.append(np.stack([a.detach().cpu().numpy() for a in out.attentions], axis=0))        # (L,1,H,T,T)

        logits = np.concatenate(logits_list, axis=0)                                 # (B,C)
        hidden = np.concatenate(hidden_list, axis=1) if len(hidden_list)>1 else hidden_list[0]
        attns  = np.concatenate(attn_list,  axis=1) if len(attn_list)>1  else attn_list[0]
        return SimpleNamespace(predictions=(logits, hidden, attns))

class HFModelWrapper:
    """
    Wrapper that enables output_attentions / output_hidden_states in the model
    config and provides ``predict_proba`` for a single text.

    Attributes set by ``__init__``: ``id``, ``task``, ``device``, ``tokenizer``,
    ``trainer`` (a ``_DummyTrainer``), ``num_labels``, ``label_names``,
    ``bos_token``, ``eos_token`` and ``pad_tokens``.
    """
    def __init__(self, model_id, task="single_label", label_names=None, device=None,
                 tokenizer_id="roberta-base"):
        """Load config, tokenizer and sequence-classification model.

        Args:
            model_id: Hub id or local path passed to ``from_pretrained``.
            task: "single_label" (softmax) or "multi_label" (sigmoid).
            label_names: optional list of label strings. When given, the config is
                built with that many labels and matching id2label / label2id maps;
                otherwise the checkpoint's own config is used.
            device: torch device string; defaults to "cuda" when available, else "cpu".
            tokenizer_id: id or path of the tokenizer to load. Defaults to
                "roberta-base", the value that used to be hard-coded.
        """
        self.id = model_id
        self.task = task
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")

        if label_names is None:
            cfg = AutoConfig.from_pretrained(self.id)
        else:
            id2label = {i: label_names[i] for i in range(len(label_names))}
            label2id = {v: k for k, v in id2label.items()}
            # Keep the config's problem_type in line with `task` so that it agrees
            # with the activation chosen in predict_proba.
            problem_type = (
                "multi_label_classification" if task == "multi_label"
                else "single_label_classification"
            )
            cfg = AutoConfig.from_pretrained(
                self.id, num_labels=len(label_names),
                id2label=id2label, label2id=label2id,
                problem_type=problem_type,
            )
        cfg.output_attentions = True
        cfg.output_hidden_states = True

        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_id, use_fast=True)
        self._hf_model = AutoModelForSequenceClassification.from_pretrained(self.id, config=cfg).to(self.device).eval()

        self.trainer = _DummyTrainer(self._hf_model, self.tokenizer)
        self.num_labels = self._hf_model.config.num_labels
        self.label_names = [self._hf_model.config.id2label[i] for i in range(self.num_labels)]
        self.bos_token = self.tokenizer.bos_token or "<s>"
        self.eos_token = self.tokenizer.eos_token or "</s>"
        self.pad_tokens = {"<pad>", "[PAD]"}
        # Also recognise whatever pad token the loaded tokenizer actually declares.
        if getattr(self.tokenizer, "pad_token", None):
            self.pad_tokens.add(self.tokenizer.pad_token)

    @torch.inference_mode()
    def predict_proba(self, text: str):
        """Return the class probabilities for one text as a 1-D NumPy array.

        Sigmoid is applied to the logits when ``self.task == "multi_label"``,
        softmax over the last axis otherwise.
        """
        enc = self.tokenizer(
            text, return_tensors="pt", truncation=True, padding=False,
            add_special_tokens=True, return_attention_mask=True
        )
        enc = {k: v.to(self.device) for k, v in enc.items()}
        out = self._hf_model(**enc, output_attentions=True, output_hidden_states=True, return_dict=True)
        if self.task == "multi_label":
            return torch.sigmoid(out.logits).detach().cpu().numpy()[0]
        return torch.softmax(out.logits, dim=-1).detach().cpu().numpy()[0]



In [None]:
# =======================
#  Import Optimus & init EmoBERTa
# =======================

# Import the Optimus module from the cloned repo; the reload forces a fresh import
# when the module was already loaded by an earlier run of this cell.
import optimus; importlib.reload(optimus)
from optimus import Optimus, plot_text_heatmap, plot_sentence_heatmap

# Wrap the fine-tuned checkpoint; label names and count come from its config.
wrapper = HFModelWrapper(MODEL_ID, TASK)
label_names = wrapper.label_names
num_labels  = wrapper.num_labels
print("Labels:", label_names)

# Small list of example sentences handed to Optimus as `set_of_instance`.
# NOTE(review): presumably Optimus uses these for an initial calibration pass -- confirm.
calib = [
    "I feel incredibly happy today!",
    "This makes me anxious and a bit afraid.",
    "I'm grateful for your help.",
    "I am angry about the delay.",
]
# Explainer bound to the fine-tuned model; used by the unified global loop.
ionbot = Optimus(wrapper, wrapper.tokenizer, label_names, task=TASK, set_of_instance=calib)



In [None]:
# =======================
#  Helpers (tokens, metrics, plotting grids)
# =======================

# Token strings treated as special: the wrapper's BOS / EOS tokens plus its pad markers.
# strip_specials_align drops every token found in this set.
SPECIALS = {wrapper.bos_token, wrapper.eos_token} | wrapper.pad_tokens
# Fractions 1/N_STEPS, 2/N_STEPS, ..., 1.0 at which the coverage curves are sampled.
GRID = np.linspace(1 / N_STEPS, 1.0, N_STEPS)

def roberta_pretty_tokens(tokens):
    """Return display-friendly copies of RoBERTa tokens.

    The wrapper's BOS token becomes "[CLS]", its EOS token becomes "[SEP]", and
    every other token has the "Ġ" marker removed.
    """
    def _pretty(tok):
        # BOS is checked before EOS, then the generic clean-up applies.
        if tok == wrapper.bos_token:
            return "[CLS]"
        if tok == wrapper.eos_token:
            return "[SEP]"
        return tok.replace("Ġ", "")

    return [_pretty(tok) for tok in tokens]

def strip_specials_align(tokens, scores_2d):
    """Trim tokens and score columns to a common length, then drop special tokens.

    ``scores_2d`` is converted to a float array and indexed as (rows, tokens).
    Tokens listed in SPECIALS are removed together with their score columns;
    when that would remove every token, the trimmed inputs are returned as they are.
    """
    scores = np.asarray(scores_2d, dtype=float)
    width = min(scores.shape[1], len(tokens))
    trimmed_tokens = list(tokens)[:width]
    trimmed_scores = scores[:, :width]

    kept_positions = [pos for pos, tok in enumerate(trimmed_tokens) if tok not in SPECIALS]
    if not kept_positions:
        return trimmed_tokens, trimmed_scores
    kept_tokens = [trimmed_tokens[pos] for pos in kept_positions]
    return kept_tokens, trimmed_scores[:, kept_positions]


def coverage_curve(scores_1d):
    """Cumulative share of positive attribution held by the top-x% tokens.

    Negative scores are clipped to 0 and the rest normalised to sum to 1. For every
    fraction ``g`` in GRID the result holds the summed share of the
    ``ceil(g * T)`` highest-scoring tokens (at least 1, at most ``T``).

    Returns:
        A float array with one value per GRID entry, or ``None`` when the clipped
        scores do not sum to a positive, finite number.
    """
    positive = np.maximum(scores_1d, 0)
    total = positive.sum()
    # A NaN/inf total would pass a plain `<= 0` test and yield a NaN curve that
    # poisons every later average, so reject it here as well.
    if not np.isfinite(total) or total <= 0:
        return None

    cumulative = np.cumsum(np.sort(positive / total)[::-1])
    n_tokens = len(cumulative)

    grid = np.asarray(GRID, dtype=float)
    # Round before ceil: a float product such as 0.07 * 100 == 7.000000000000001
    # would otherwise be pushed up to 8 tokens instead of 7.
    top_k = np.ceil(np.round(grid * n_tokens, 8)).astype(int)
    top_k = np.clip(top_k, 1, n_tokens)
    return cumulative[top_k - 1].astype(grid.dtype)

def coverage_at_fraction(scores_1d, frac=0.1):
    """Share of positive attribution held by the top ``frac`` of tokens.

    Negative scores are clipped to 0 and the rest normalised to sum to 1; the
    shares of the ``max(1, ceil(frac * T))`` highest-scoring tokens are summed.

    Returns:
        The summed share as a Python float, or ``0.0`` when the clipped scores do
        not sum to a positive, finite number.
    """
    positive = np.maximum(scores_1d, 0)
    total = positive.sum()
    # Treat a NaN/inf total like an all-zero vector instead of returning NaN.
    if not np.isfinite(total) or total <= 0:
        return 0.0

    shares = np.sort(positive / total)[::-1]
    # Round before ceil so float noise (0.07 * 100 == 7.000000000000001) cannot
    # add an extra token to the selection.
    k = max(1, int(np.ceil(np.round(frac * len(shares), 8))))
    return float(shares[:k].sum())

def get_token_scores_all_labels(ion, text):
    """
    Runs Optimus for one model and returns:
      - tokens (list of strings)
      - scores_baseline: [num_labels, T]
      - scores_prime:    [num_labels, T]

    ``ion.explain`` is called twice at token level with ``raw_attention="A"``:
    once with ``mode="baseline"`` and once with ``mode="max_per_instance"``.
    Special tokens are removed, both results are cut to a common length ``T``
    and the tokens are prettified. Returns ``None`` when ``T`` is 0.
    """
    scores_tok_b, toks_b = ion.explain(
        text, mode="baseline", level="token", raw_attention="A"
    )
    scores_tok_p, toks_p = ion.explain(
        text, mode="max_per_instance", level="token", raw_attention="A"
    )

    # Strip specials on the RAW tokens first. Prettifying beforehand renames BOS/EOS
    # to "[CLS]"/"[SEP]", which are not in SPECIALS, so they would never be removed
    # and their scores would leak into the coverage statistics.
    toks_b_raw, scores_tok_b = strip_specials_align(toks_b, scores_tok_b)
    toks_p_raw, scores_tok_p = strip_specials_align(toks_p, scores_tok_p)
    toks_b_pp = roberta_pretty_tokens(toks_b_raw)
    toks_p_pp = roberta_pretty_tokens(toks_p_raw)

    # Common length across both explanations and their score matrices.
    T = min(len(toks_b_pp), len(toks_p_pp),
            scores_tok_b.shape[1], scores_tok_p.shape[1])
    if T == 0:
        return None

    toks = toks_b_pp[:T]
    sb   = scores_tok_b[:, :T]
    sp   = scores_tok_p[:, :T]
    return toks, sb, sp



In [None]:
# =======================
#  Load data
# =======================

import pandas as pd
# Read the evaluation CSV configured in the "Configuration" cell.
df = pd.read_csv(CSV_PATH, encoding="utf-8")
# Drop missing utterances BEFORE the string cast: astype(str) would otherwise turn
# NaN cells into the literal text "nan", which would then be explained like a
# real utterance.
texts = df[TEXT_COLUMN].dropna().astype(str).tolist()
# Keep at most MAX_EXAMPLES utterances (slicing is a no-op for shorter lists).
texts = texts[:MAX_EXAMPLES]
print(f"Using {len(texts)} utterances for global analysis.")



In [None]:
# =======================
#  Build RoBERTa-base wrapper (aligned label space) & Optimus
# =======================

# NOTE(review): BASEMODEL_ID is loaded with a label space taken from the fine-tuned
# model, so its classification head is presumably freshly (randomly) initialised --
# confirm via the transformers load warning. Seeding torch's RNG right before the
# load makes that initialisation, and everything derived from it, repeatable.
# The value mirrors the "seed44" found in MODEL_ID; any fixed integer works.
BASE_HEAD_SEED = 44
torch.manual_seed(BASE_HEAD_SEED)

# Same wrapper / Optimus setup as for the fine-tuned model, sharing its label names
# so that row indices of the score matrices line up between the two models.
wrapper_base = HFModelWrapper(BASEMODEL_ID, TASK, label_names=label_names)
ionbot_base  = Optimus(wrapper_base, wrapper_base.tokenizer, label_names, task=TASK,
                       set_of_instance=["neutral sample", "happy day", "so angry", "i am afraid"])
print("Base wrapper labels:", wrapper_base.label_names)



In [None]:
# =======================
# 9) Unified global loop (FT EmoBERTa + Base RoBERTa)
# =======================
# Accumulators. Suffix "_B" = baseline attention ("A"), "_P" = Optimus Prime.
# The gini_* lists are created here but nothing in this cell fills them.
coverage_curves_ft_B, coverage_curves_ft_P = [], []
gini_ft_B, gini_ft_P = [], []
cov10_ft_by_class_B = {label_idx: [] for label_idx in range(num_labels)}
cov10_ft_by_class_P = {label_idx: [] for label_idx in range(num_labels)}

cov_curves_base_B, cov_curves_base_P = [], []
gini_base_B, gini_base_P = [], []
cov10_base_by_class_B = {label_idx: [] for label_idx in range(num_labels)}
cov10_base_by_class_P = {label_idx: [] for label_idx in range(num_labels)}

os.makedirs("plots", exist_ok=True)


def _record_coverage(token_scores, curve_store, cov10_by_class, label_idx):
    """Store the coverage curve and Coverage@10% of one score vector, if a curve exists."""
    curve = coverage_curve(token_scores)
    if curve is None:
        return
    curve_store.append(curve)
    cov10_by_class[label_idx].append(coverage_at_fraction(token_scores, 0.1))


for idx, text in enumerate(texts):
    print(f"[UNIFIED {idx+1}/{len(texts)}] {text[:80]}...")

    # 1) Fine-tuned model: baseline + Prime scores for every label.
    try:
        res_ft = get_token_scores_all_labels(ionbot, text)
    except Exception as e:
        print("  -> EmoBERTa explanation failed, skipping example:", e)
        continue
    if res_ft is None:
        continue
    toks_ft, scores_ft_B_all, scores_ft_P_all = res_ft

    # The label predicted by the fine-tuned model selects the score row for BOTH models.
    probs_ft = wrapper.predict_proba(text)
    pred_idx = int(np.argmax(probs_ft))

    sB_ft = scores_ft_B_all[pred_idx]
    sP_ft = scores_ft_P_all[pred_idx]
    _record_coverage(sB_ft, coverage_curves_ft_B, cov10_ft_by_class_B, pred_idx)
    _record_coverage(sP_ft, coverage_curves_ft_P, cov10_ft_by_class_P, pred_idx)

    # 2) Base model, same row. A failure here only skips the base statistics;
    #    the fine-tuned statistics recorded above are kept.
    try:
        res_base = get_token_scores_all_labels(ionbot_base, text)
    except Exception as e:
        print("  -> Base model explanation failed, skipping base for this example:", e)
        continue
    if res_base is None:
        continue
    toks_base, scores_base_B_all, scores_base_P_all = res_base

    sB_base = scores_base_B_all[pred_idx]
    sP_base = scores_base_P_all[pred_idx]
    _record_coverage(sB_base, cov_curves_base_B, cov10_base_by_class_B, pred_idx)
    _record_coverage(sP_base, cov_curves_base_P, cov10_base_by_class_P, pred_idx)

print("Unified global collection (FT EmoBERTa + Base RoBERTa) finished.")

In [None]:
# =======================
# 10) EmoBERTa plots (cumulative & Coverage@10%)
# =======================

# --- Mean cumulative-coverage curves (drawn only when both variants have data) ---
if coverage_curves_ft_B and coverage_curves_ft_P:
    avg_cov_b = np.vstack(coverage_curves_ft_B).mean(axis=0)
    avg_cov_p = np.vstack(coverage_curves_ft_P).mean(axis=0)

    fig, ax = plt.subplots(figsize=(7, 5))
    ax.plot(GRID * 100, avg_cov_b, label="FT – Baseline (A)")
    ax.plot(GRID * 100, avg_cov_p, label="FT – Optimus Prime")
    ax.set_xlabel("Top x% tokens (sorted by importance)")
    ax.set_ylabel("Cumulative share of total attribution")
    ax.set_title("EmoBERTa — cumulative contribution curves (predicted label)")
    ax.legend()
    ax.grid(True, linestyle="--", alpha=0.3)
    fig.tight_layout()
    fig.savefig("plots/ft_optimus_global_cumulative.png", bbox_inches="tight")
    plt.show()


# --- Mean Coverage@10% per predicted class ---
# A class is shown when at least one of the two variants has values for it;
# a variant without values for a shown class is drawn as 0.0.
classes_with_data = [
    label_idx for label_idx in range(num_labels)
    if cov10_ft_by_class_B[label_idx] or cov10_ft_by_class_P[label_idx]
]
labels_used = [label_names[label_idx] for label_idx in classes_with_data]
cov10_b_means = [
    np.mean(cov10_ft_by_class_B[label_idx]) if cov10_ft_by_class_B[label_idx] else 0.0
    for label_idx in classes_with_data
]
cov10_p_means = [
    np.mean(cov10_ft_by_class_P[label_idx]) if cov10_ft_by_class_P[label_idx] else 0.0
    for label_idx in classes_with_data
]

if labels_used:
    x = np.arange(len(labels_used))
    width = 0.35
    fig, ax = plt.subplots(figsize=(7, 5))
    ax.bar(x - width/2, cov10_b_means, width=width, label="FT – Baseline (A)")
    ax.bar(x + width/2, cov10_p_means, width=width, label="FT – Optimus Prime")
    ax.set_xticks(x)
    ax.set_xticklabels(labels_used, rotation=45, ha="right")
    ax.set_ylabel("Coverage@10% (mean over utterances)")
    ax.set_title("EmoBERTa — Coverage@10% per predicted emotion")
    ax.legend()
    fig.tight_layout()
    fig.savefig("plots/ft_optimus_cov10_by_class.png", bbox_inches="tight")
    plt.show()



In [None]:
# =======================
# 11) Comparison plots (four series)
# =======================

def _avg(mat_list):
    arr = [m for m in mat_list if m is not None]
    return np.mean(np.vstack(arr), axis=0) if arr else None

# --- Mean cumulative curves for the four (model, method) combinations ---
avg_ft_B   = _avg(coverage_curves_ft_B)
avg_ft_P   = _avg(coverage_curves_ft_P)
avg_base_B = _avg(cov_curves_base_B)
avg_base_P = _avg(cov_curves_base_P)

fig, ax = plt.subplots(figsize=(7.8, 5.4))
# Series are drawn in a fixed order; a series without data is simply left out.
for curve, series_label in (
    (avg_ft_B,   "FT – Baseline (A)"),
    (avg_ft_P,   "FT – Optimus Prime"),
    (avg_base_B, "Base – Baseline (A)"),
    (avg_base_P, "Base – Optimus Prime"),
):
    if curve is not None:
        ax.plot(GRID*100, curve, label=series_label)
ax.set_xlabel("Top x% tokens (sorted by importance)")
ax.set_ylabel("Cumulative share of total attribution")
ax.set_title("Cumulative curves — EmoBERTa vs RoBERTa-base (aligned to FT prediction)")
ax.legend()
ax.grid(True, linestyle="--", alpha=0.3)
fig.tight_layout()
fig.savefig("plots/optimus_compare_ft_vs_base_cumulative.png", bbox_inches="tight")
plt.show()


# --- Mean Coverage@10% per predicted class, four bars per class ---
def _mean_or_zero(values):
    """Mean of a non-empty list, 0.0 for an empty one."""
    return np.mean(values) if values else 0.0


# A class is shown as soon as any of the four series has values for it.
classes_with_data = [
    label_idx for label_idx in range(num_labels)
    if (cov10_ft_by_class_B[label_idx] or cov10_ft_by_class_P[label_idx]
        or cov10_base_by_class_B[label_idx] or cov10_base_by_class_P[label_idx])
]
labels_used  = [label_names[label_idx] for label_idx in classes_with_data]
ft_B_means   = [_mean_or_zero(cov10_ft_by_class_B[label_idx]) for label_idx in classes_with_data]
ft_P_means   = [_mean_or_zero(cov10_ft_by_class_P[label_idx]) for label_idx in classes_with_data]
base_B_means = [_mean_or_zero(cov10_base_by_class_B[label_idx]) for label_idx in classes_with_data]
base_P_means = [_mean_or_zero(cov10_base_by_class_P[label_idx]) for label_idx in classes_with_data]

if not labels_used:
    print("No per-class coverage values to plot for the comparison.")
else:
    x = np.arange(len(labels_used))
    w = 0.2
    fig, ax = plt.subplots(figsize=(9.2, 5.6))
    ax.bar(x - 1.5*w, ft_B_means,   width=w, label="FT – Baseline (A)")
    ax.bar(x - 0.5*w, ft_P_means,   width=w, label="FT – Optimus Prime")
    ax.bar(x + 0.5*w, base_B_means, width=w, label="Base – Baseline (A)")
    ax.bar(x + 1.5*w, base_P_means, width=w, label="Base – Optimus Prime")
    ax.set_xticks(x)
    ax.set_xticklabels(labels_used, rotation=45, ha="right")
    ax.set_ylabel("Coverage@10% (mean over utterances)")
    ax.set_title("Coverage@10% per predicted emotion — FT vs Base")
    ax.legend(ncol=2)
    fig.tight_layout()
    fig.savefig("plots/optimus_compare_ft_vs_base_cov10.png", bbox_inches="tight")
    plt.show()

print("Done. Plots saved to 'plots/'")

