In [None]:
from transformers import pipeline, AutoTokenizer
from sklearn.metrics import auc as auc_score
from scipy.stats import kendalltau
import numpy as np
import pandas as pd
import torch
import time
import gc

In [None]:
def predict_comment(pipe, comment_content):
    """Classify one comment and return its scores keyed by label.

    Args:
        pipe: Callable classifier. It is invoked with ``comment_content`` and
            the first element of its result is read as a sequence of dicts
            carrying ``'label'`` and ``'score'`` entries.
        comment_content: Text handed to ``pipe``.

    Returns:
        dict mapping each label to its score. If a label occurs more than
        once, the last occurrence wins.
    """
    first_result = pipe(comment_content)[0]
    return {entry['label']: entry['score'] for entry in first_result}

In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Hugging Face model id of the sentiment classifier under evaluation.
model_name = "CAMeL-Lab/bert-base-arabic-camelbert-msa-sentiment"
# top_k=None: per the transformers text-classification pipeline docs this returns
# a score for every label; predict_comment iterates over all entries of the first result.
pipe = pipeline("text-classification", model=model_name, top_k=None, device=device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/842 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/436M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/86.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/305k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Device set to use cuda


In [None]:
def elbow_method(scores):
    """Locate the elbow of a score sequence via its second differences.

    Args:
        scores: Sequence of numeric scores (the caller passes them sorted in
            descending order).

    Returns:
        ``len(scores)`` when fewer than three scores are given; otherwise the
        position of the largest second-order difference plus one.
    """
    values = np.array(scores)
    count = len(values)
    # A second difference needs at least three points.
    if count < 3:
        return count
    curvature = np.diff(values, n=2)
    return np.argmax(curvature) + 1

def hard_rationale_selection(token_weights, method='elbow'):
    """Pick the tokens that form the hard rationale of an explanation.

    Args:
        token_weights: Iterable of ``(token, weight)`` pairs.
        method: Selection strategy. Only ``'elbow'`` is implemented: tokens are
            ranked by descending weight and cut at the index returned by
            ``elbow_method``.

    Returns:
        list of selected tokens, highest weight first. An empty explanation
        yields an empty list.

    Raises:
        NotImplementedError: for ``'top_n'`` and ``'threshold'``.
        ValueError: for any other ``method``.
    """
    if method == 'elbow':
        ranked = sorted(token_weights, key=lambda pair: pair[1], reverse=True)
        # zip(*[]) has nothing to unpack, so an empty explanation is handled up front.
        if not ranked:
            return []
        tokens, weights = zip(*ranked)
        return list(tokens[:elbow_method(weights)])
    if method == 'top_n':
        raise NotImplementedError("Top N method is not implemented yet")
    if method == 'threshold':
        raise NotImplementedError("Threshold method is not implemented yet")
    raise ValueError(f"Invalid method: {method}")

In [None]:
def comprehensivness(pipe, comment, predicted_class, xai_token_importance):
    """Comprehensiveness: probability drop once the rationale tokens are removed.

    Args:
        pipe: Hugging Face classification pipeline; its ``tokenizer`` attribute
            is used to turn the remaining tokens back into text.
        comment: Original text.
        predicted_class: Label whose probability is tracked.
        xai_token_importance: List of ``(token, weight)`` pairs in text order.

    Returns:
        Probability of ``predicted_class`` on the full comment minus its
        probability on the comment with the elbow-selected tokens removed.
    """
    predicted_class_proba = predict_comment(pipe, comment)[predicted_class]

    # Reuse the tokenizer the pipeline already holds instead of reloading it on every call.
    tokenizer = pipe.tokenizer
    tokens = [pair[0] for pair in xai_token_importance]

    # Set membership keeps the filter below linear in the number of tokens.
    important_tokens = set(hard_rationale_selection(xai_token_importance, method='elbow'))

    # Remove the important tokens from the comment.
    comment_without_xai = tokenizer.convert_tokens_to_string(
        [token for token in tokens if token not in important_tokens]
    )

    # Key by (text, class): the same perturbed text (e.g. an empty string) can
    # come from rows with different predicted classes, and the cached value is
    # the probability of one specific class.
    cache_key = (comment_without_xai, predicted_class)
    if cache_key not in prediction_cache:
        prediction_cache[cache_key] = predict_comment(pipe, comment_without_xai)[predicted_class]
    new_probability = prediction_cache[cache_key]

    return predicted_class_proba - new_probability

def sufficiency(pipe, comment, predicted_class, xai_token_importance):
    """Sufficiency: probability drop when only the rationale tokens are kept.

    Args:
        pipe: Hugging Face classification pipeline; its ``tokenizer`` attribute
            is used to turn the kept tokens back into text.
        comment: Original text.
        predicted_class: Label whose probability is tracked.
        xai_token_importance: List of ``(token, weight)`` pairs in text order.

    Returns:
        Probability of ``predicted_class`` on the full comment minus its
        probability on the text built from the elbow-selected tokens only.
    """
    predicted_class_proba = predict_comment(pipe, comment)[predicted_class]

    # Reuse the tokenizer the pipeline already holds instead of reloading it on every call.
    tokenizer = pipe.tokenizer
    tokens = [pair[0] for pair in xai_token_importance]

    # Set membership keeps the filter below linear in the number of tokens.
    important_tokens = set(hard_rationale_selection(xai_token_importance, method='elbow'))

    # Keep only the important tokens, in their original order.
    comment_with_xai_only = tokenizer.convert_tokens_to_string(
        [token for token in tokens if token in important_tokens]
    )

    # Key by (text, class): the same perturbed text can come from rows with
    # different predicted classes, and the cached value is the probability of
    # one specific class.
    cache_key = (comment_with_xai_only, predicted_class)
    if cache_key not in prediction_cache:
        prediction_cache[cache_key] = predict_comment(pipe, comment_with_xai_only)[predicted_class]
    new_probability = prediction_cache[cache_key]

    return predicted_class_proba - new_probability

def correlation_leave_one_out(pipe, comment, predicted_class, xai_token_importance):
    """Kendall tau between XAI weights and leave-one-out probability drops.

    Args:
        pipe: Hugging Face classification pipeline; its ``tokenizer`` attribute
            is used to rebuild each ablated text.
        comment: Original text.
        predicted_class: Label whose probability is tracked.
        xai_token_importance: List of ``(token, weight)`` pairs in text order.

    Returns:
        The tau statistic from ``scipy.stats.kendalltau`` computed between the
        explanation weights and, per token, the probability of
        ``predicted_class`` on the full comment minus its probability with
        that single token removed.
    """
    predicted_class_proba = predict_comment(pipe, comment)[predicted_class]

    # Reuse the tokenizer the pipeline already holds instead of reloading it on every call.
    tokenizer = pipe.tokenizer
    tokens = [pair[0] for pair in xai_token_importance]

    loo_scores = []
    for skip in range(len(tokens)):
        # Text with exactly one token (position `skip`) left out.
        comment_without_token = tokenizer.convert_tokens_to_string(
            tokens[:skip] + tokens[skip + 1:]
        )
        # Key by (text, class): the cached value is the probability of one
        # specific class, and the same text may recur under another class.
        cache_key = (comment_without_token, predicted_class)
        if cache_key not in prediction_cache:
            prediction_cache[cache_key] = predict_comment(pipe, comment_without_token)[predicted_class]
        loo_scores.append(predicted_class_proba - prediction_cache[cache_key])

    # Rank agreement between the explanation weights and the measured drops.
    xai_scores = [pair[1] for pair in xai_token_importance]
    tau, _ = kendalltau(xai_scores, loo_scores)
    return tau

def insertion_auc(pipe, comment, predicted_class, xai_token_importance):
    """Area under the probability curve as tokens are inserted by importance.

    Tokens are added one at a time, most important first, and always placed at
    their original position. After each insertion the probability of
    ``predicted_class`` on the partial text is recorded.

    Args:
        pipe: Hugging Face classification pipeline; its ``tokenizer`` attribute
            is used to rebuild each partial text.
        comment: Original text. Not needed for the curve; kept so all metric
            functions share one signature.
        predicted_class: Label whose probability is tracked.
        xai_token_importance: List of ``(token, weight)`` pairs in text order.

    Returns:
        ``sklearn.metrics.auc`` of the recorded probabilities over x = 1..n,
        divided by n (the number of tokens).
    """
    # Reuse the tokenizer the pipeline already holds instead of reloading it on every call.
    tokenizer = pipe.tokenizer
    tokens = [pair[0] for pair in xai_token_importance]

    # Token positions ranked by descending weight; sorted() is stable, so ties
    # keep their original order.
    ranked_positions = sorted(
        range(len(tokens)),
        key=lambda position: xai_token_importance[position][1],
        reverse=True,
    )

    auc_scores = []
    for step in range(1, len(ranked_positions) + 1):
        # The `step` most important tokens, restored to their original order.
        kept_positions = sorted(ranked_positions[:step])
        inserted_comment = tokenizer.convert_tokens_to_string(
            [tokens[position] for position in kept_positions]
        )

        # Key by (text, class): the cached value is the probability of one
        # specific class, and the same text may recur under another class.
        cache_key = (inserted_comment, predicted_class)
        if cache_key not in prediction_cache:
            prediction_cache[cache_key] = predict_comment(pipe, inserted_comment)[predicted_class]
        auc_scores.append(prediction_cache[cache_key])

    # NOTE(review): x spans n-1 units while the area is divided by n, so a
    # constant curve p yields p*(n-1)/n. Confirm this normalisation is intended.
    auc = auc_score(range(1, len(auc_scores) + 1), auc_scores) / len(auc_scores)
    return auc

def deletion_auc(pipe, comment, predicted_class, xai_token_importance):
    """Area under the probability curve as tokens are deleted by importance.

    Tokens are removed one at a time, most important first. After each removal
    the probability of ``predicted_class`` on the remaining text is recorded;
    the last point is the text with every token removed.

    Args:
        pipe: Hugging Face classification pipeline; its ``tokenizer`` attribute
            is used to rebuild each remaining text.
        comment: Original text. Not needed for the curve; kept so all metric
            functions share one signature.
        predicted_class: Label whose probability is tracked.
        xai_token_importance: List of ``(token, weight)`` pairs in text order.

    Returns:
        ``sklearn.metrics.auc`` of the recorded probabilities over x = 1..n,
        divided by n (the number of tokens).
    """
    # Reuse the tokenizer the pipeline already holds instead of reloading it on every call.
    tokenizer = pipe.tokenizer
    remaining_tokens = [pair[0] for pair in xai_token_importance]

    # Removal order: descending weight.
    removal_order = [
        pair[0] for pair in sorted(xai_token_importance, key=lambda pair: pair[1], reverse=True)
    ]

    auc_scores = []
    for token in removal_order:
        remaining_tokens.remove(token)  # drops the first occurrence of the token
        remaining_comment = tokenizer.convert_tokens_to_string(remaining_tokens)

        # Key by (text, class): the final step is the same all-tokens-removed
        # text for every row, and the cached value is the probability of one
        # specific class.
        cache_key = (remaining_comment, predicted_class)
        if cache_key not in prediction_cache:
            prediction_cache[cache_key] = predict_comment(pipe, remaining_comment)[predicted_class]
        auc_scores.append(prediction_cache[cache_key])

    # NOTE(review): x spans n-1 units while the area is divided by n, so a
    # constant curve p yields p*(n-1)/n. Confirm this normalisation is intended.
    auc = auc_score(range(1, len(auc_scores) + 1), auc_scores) / len(auc_scores)
    return auc

In [None]:
# Input table with one row per review; the cells below read its text, prediction
# and per-method *_exp / *_time columns. Swap the two paths to evaluate the
# restaurant reviews instead.
# NOTE(review): the output file names in the evaluation cells are hardcoded to
# "Hotel_reviews_*"; change them as well when switching datasets.
# df = pd.read_csv("/content/Restaurant_reviews_xai.csv")
df = pd.read_csv("/content/Hotel_reviews_xai.csv")

In [None]:
# LIME slice of the results: text and prediction plus the LIME explanation and
# timing columns, exposed under the method-agnostic names "exp" and "time".
lime_df = df[["text", "prediction", "lime_exp", "lime_time"]].rename(
    columns={"lime_exp": "exp", "lime_time": "time"}
)

In [None]:
# SHAP slice of the results: text and prediction plus the SHAP explanation and
# timing columns, exposed under the method-agnostic names "exp" and "time".
shap_df = df[["text", "prediction", "shap_exp", "shap_time"]].rename(
    columns={"shap_exp": "exp", "shap_time": "time"}
)

In [None]:
# IG slice of the results: text and prediction plus the ig_exp / ig_time
# columns, exposed under the method-agnostic names "exp" and "time".
ig_df = df[["text", "prediction", "ig_exp", "ig_time"]].rename(
    columns={"ig_exp": "exp", "ig_time": "time"}
)

In [None]:
# EXAI slice of the results: text and prediction plus the exai_exp / exai_time
# columns, exposed under the method-agnostic names "exp" and "time".
exai_df = df[["text", "prediction", "exai_exp", "exai_time"]].rename(
    columns={"exai_exp": "exp", "exai_time": "time"}
)

In [None]:
# Module-level memo of model probabilities for perturbed texts. The metric
# functions read and fill it, so it is shared across all four evaluation loops.
# Re-run this cell to start from an empty cache (e.g. after changing the model
# or the dataset).
prediction_cache = {}

In [None]:
# Score every LIME explanation and store the metrics as new columns on lime_df,
# then write the augmented frame to CSV.
for i, row in lime_df.iterrows():
    # NOTE(review): eval() executes whatever the CSV's exp column contains; only
    # run this on trusted files. The value must evaluate to an object with
    # .items() yielding (token, weight) pairs — presumably a dict repr; confirm
    # against the notebook that wrote the file.
    exp_tuples = [(token, weight) for token, weight in eval(row.exp).items()]
    comp = comprehensivness(pipe, row.text, row.prediction, exp_tuples)
    suff = sufficiency(pipe, row.text, row.prediction, exp_tuples)
    lime_df.loc[i, "comp"] = comp
    lime_df.loc[i, "suff"] = suff
    corr_loo = correlation_leave_one_out(pipe, row.text, row.prediction, exp_tuples)
    lime_df.loc[i, "corr_loo"] = corr_loo
    # The AUC metrics are only computed for explanations with more than one
    # token; for other rows nothing is written to auc_ins / auc_del.
    if len(exp_tuples) > 1:
        auc_ins = insertion_auc(pipe, row.text, row.prediction, exp_tuples)
        auc_del = deletion_auc(pipe, row.text, row.prediction, exp_tuples)
        lime_df.loc[i, "auc_ins"] = auc_ins
        lime_df.loc[i, "auc_del"] = auc_del
lime_df.to_csv("Hotel_reviews_lime_eval.csv", index=False)

In [None]:
# Score every SHAP explanation and store the metrics as new columns on shap_df,
# then write the augmented frame to CSV.
for i, row in shap_df.iterrows():
    # NOTE(review): eval() executes whatever the CSV's exp column contains; only
    # run this on trusted files. The value must evaluate to an object with
    # .items() yielding (token, weight) pairs — presumably a dict repr; confirm
    # against the notebook that wrote the file.
    exp_tuples = [(token, weight) for token, weight in eval(row.exp).items()]
    comp = comprehensivness(pipe, row.text, row.prediction, exp_tuples)
    suff = sufficiency(pipe, row.text, row.prediction, exp_tuples)
    shap_df.loc[i, "comp"] = comp
    shap_df.loc[i, "suff"] = suff
    corr_loo = correlation_leave_one_out(pipe, row.text, row.prediction, exp_tuples)
    shap_df.loc[i, "corr_loo"] = corr_loo
    # The AUC metrics are only computed for explanations with more than one
    # token; for other rows nothing is written to auc_ins / auc_del.
    if len(exp_tuples) > 1:
        auc_ins = insertion_auc(pipe, row.text, row.prediction, exp_tuples)
        auc_del = deletion_auc(pipe, row.text, row.prediction, exp_tuples)
        shap_df.loc[i, "auc_ins"] = auc_ins
        shap_df.loc[i, "auc_del"] = auc_del
shap_df.to_csv("Hotel_reviews_shap_eval.csv", index=False)

In [None]:
# Score every IG explanation and store the metrics as new columns on ig_df,
# then write the augmented frame to CSV.
for i, row in ig_df.iterrows():
    # NOTE(review): eval() executes whatever the CSV's exp column contains; only
    # run this on trusted files. The value must evaluate to an object with
    # .items() yielding (token, weight) pairs — presumably a dict repr; confirm
    # against the notebook that wrote the file.
    exp_tuples = [(token, weight) for token, weight in eval(row.exp).items()]
    comp = comprehensivness(pipe, row.text, row.prediction, exp_tuples)
    suff = sufficiency(pipe, row.text, row.prediction, exp_tuples)
    ig_df.loc[i, "comp"] = comp
    ig_df.loc[i, "suff"] = suff
    corr_loo = correlation_leave_one_out(pipe, row.text, row.prediction, exp_tuples)
    ig_df.loc[i, "corr_loo"] = corr_loo
    # The AUC metrics are only computed for explanations with more than one
    # token; for other rows nothing is written to auc_ins / auc_del.
    if len(exp_tuples) > 1:
        auc_ins = insertion_auc(pipe, row.text, row.prediction, exp_tuples)
        auc_del = deletion_auc(pipe, row.text, row.prediction, exp_tuples)
        ig_df.loc[i, "auc_ins"] = auc_ins
        ig_df.loc[i, "auc_del"] = auc_del
ig_df.to_csv("Hotel_reviews_ig_eval.csv", index=False)

In [None]:
# Score every EXAI explanation and store the metrics as new columns on exai_df,
# then write the augmented frame to CSV.
for i, row in exai_df.iterrows():
    # NOTE(review): eval() executes whatever the CSV's exp column contains; only
    # run this on trusted files. The value must evaluate to an object with
    # .items() yielding (token, weight) pairs — presumably a dict repr; confirm
    # against the notebook that wrote the file.
    exp_tuples = [(token, weight) for token, weight in eval(row.exp).items()]
    comp = comprehensivness(pipe, row.text, row.prediction, exp_tuples)
    suff = sufficiency(pipe, row.text, row.prediction, exp_tuples)
    exai_df.loc[i, "comp"] = comp
    exai_df.loc[i, "suff"] = suff
    corr_loo = correlation_leave_one_out(pipe, row.text, row.prediction, exp_tuples)
    exai_df.loc[i, "corr_loo"] = corr_loo
    # The AUC metrics are only computed for explanations with more than one
    # token; for other rows nothing is written to auc_ins / auc_del.
    if len(exp_tuples) > 1:
        auc_ins = insertion_auc(pipe, row.text, row.prediction, exp_tuples)
        auc_del = deletion_auc(pipe, row.text, row.prediction, exp_tuples)
        exai_df.loc[i, "auc_ins"] = auc_ins
        exai_df.loc[i, "auc_del"] = auc_del
exai_df.to_csv("Hotel_reviews_exai_eval.csv", index=False)

In [None]:
# Column means of the LIME timing and metric columns, printed one per line.
for label, column in (
    ("Average time: ", "time"),
    ("Average comp: ", "comp"),
    ("Average suff: ", "suff"),
    ("Average corr_loo: ", "corr_loo"),
    ("Average auc_ins: ", "auc_ins"),
    ("Average auc_del: ", "auc_del"),
):
    print(label, lime_df[column].mean())

In [None]:
# Column means of the SHAP timing and metric columns, printed one per line.
for label, column in (
    ("Average time: ", "time"),
    ("Average comp: ", "comp"),
    ("Average suff: ", "suff"),
    ("Average corr_loo: ", "corr_loo"),
    ("Average auc_ins: ", "auc_ins"),
    ("Average auc_del: ", "auc_del"),
):
    print(label, shap_df[column].mean())

In [None]:
# Column means of the IG timing and metric columns, printed one per line.
for label, column in (
    ("Average time: ", "time"),
    ("Average comp: ", "comp"),
    ("Average suff: ", "suff"),
    ("Average corr_loo: ", "corr_loo"),
    ("Average auc_ins: ", "auc_ins"),
    ("Average auc_del: ", "auc_del"),
):
    print(label, ig_df[column].mean())

In [None]:
# Column means of the EXAI timing and metric columns, printed one per line.
for label, column in (
    ("Average time: ", "time"),
    ("Average comp: ", "comp"),
    ("Average suff: ", "suff"),
    ("Average corr_loo: ", "corr_loo"),
    ("Average auc_ins: ", "auc_ins"),
    ("Average auc_del: ", "auc_del"),
):
    print(label, exai_df[column].mean())