<img src="https://github.com/spraja08/Interpretable-GenAI/blob/main/resources/GenXAI%20Methods.png?raw=true" alt="GenXAI methods overview">

In [None]:
import math
import os, random, re, gc
import warnings
warnings.filterwarnings('ignore')
import machine_learning_datasets as mldatasets
import numpy as np
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification,\
      pipeline
from bertviz import head_view, model_view
from captum.attr import LayerIntegratedGradients, TokenReferenceBase,\
      visualization
from lit_nlp import notebook
from lit_nlp.api import dataset as lit_dataset
from lit_nlp.api import model as lit_model
from lit_nlp.api import types as lit_types
import plotly

In [None]:
# Report whether PyTorch can see a CUDA device in this kernel.
torch.cuda.is_available()

In [None]:
# Load the "nyc-reviews" dataset through mldatasets. Later cells read its review text,
# sentiment flag, rating, and emotion label/score columns alongside a fine-tuned BERT model.
reviews_df = mldatasets.load("nyc-reviews", prepare=True)

In [None]:
# Print the frame's column names, dtypes, and non-null counts.
reviews_df.info()

In [None]:
# Preview the text fields next to the sentiment flag and the emotion label/score.
preview_cols = ["review_title", "review_full", "positive_sentiment", "label", "score"]
reviews_df.loc[:, preview_cols].head(3)

In [None]:
# Per-label summary: review count, mean score, share of positive reviews, mean rating.
sum_cols_l = ['score', 'positive_sentiment', 'rating']

summary_df = reviews_df.groupby('label')[sum_cols_l].agg(**{
    'count': ('score', 'count'),
    'avg. score': ('score', 'mean'),
    '% positive': ('positive_sentiment', 'mean'),
    'avg. rating': ('rating', 'mean'),
})

# Display the summary ordered by mean rating, with in-cell bars on the ratio columns.
summary_formats = {'count': '{:,}', 'avg. score': '{:.1%}',
                   '% positive': '{:.1%}', 'avg. rating': '{:.2f}'}
(
    summary_df.sort_values(by='avg. rating', ascending=False)
    .style
    .format(summary_formats)
    .bar(subset=['avg. score', '% positive', 'avg. rating'], color='#4EF', width=60)
)

In [None]:
# Single seed shared by every random number generator the notebook touches.
rand = 42
os.environ['PYTHONHASHSEED'] = str(rand)
for _seed_fn in (random.seed, np.random.seed):
    _seed_fn(rand)
torch.manual_seed(rand)

In [None]:
# Set up the tokenizer and classifier used by the attention visualisations below.
# The model is loaded with output_attentions=True so its outputs include attention weights.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

goemotions_mdl_path = "monologg/bert-base-cased-goemotions-ekman"

goemotions_tok = AutoTokenizer.from_pretrained(goemotions_mdl_path)
goemotions_mdl = AutoModelForSequenceClassification.from_pretrained(
    goemotions_mdl_path,
    output_attentions=True,
).to(device)
goemotions_mdl.eval()

In [None]:
# Read the encoder depth and per-layer head count from the model config.
model_config = goemotions_mdl.config
num_layers, num_attention_heads = (model_config.num_hidden_layers,
                                   model_config.num_attention_heads)

print(f"The model has {num_layers} layers.",
      f"Each layer has {num_attention_heads} attention heads.",
      sep="\n")

In [None]:
# Hand-picked review ids, printed as a short report and stored as sentence lists.
suprise_sample_reviews_l = [174067, 284154, 480395, 47659]
# Split on whitespace that follows sentence-ending punctuation.
line_pattern = r'(?<=[.!?])\s+'
sample_reviews_dict = {}

for i, review_idx in enumerate(suprise_sample_reviews_l):
    review_s = reviews_df.loc[review_idx, :]
    if review_s['positive_sentiment']:
        sentiment = 'Positive'
    else:
        sentiment = 'Negative'
    # maxsplit=1: keep the first sentence and the remainder as (at most) two pieces.
    review_lines_l = re.split(line_pattern, review_s['review_full'], maxsplit=1)
    review_txt = '\r\n\t\t'.join(review_lines_l)

    report_lines = [
        f"{review_s['restaurant_name']}",
        f"\tSentiment:\t\t{sentiment}",
        f"\tRating:\t\t\t{review_s['rating']}",
        f"\tGoEmotions Label:\t{review_s['label']}",
        f"\tGoEmotions Score:\t{review_s['score']:.1%}",
        f"\tTitle:\t{review_s['review_title']}",
        f"\tReview:\t{review_txt}\r\n",
    ]
    print('\n'.join(report_lines))

    sample_reviews_dict[i] = review_lines_l

In [None]:
def clear_gpu_cache():
    """Run the garbage collector, then release cached CUDA memory if a GPU is present.

    The collector runs first so that memory belonging to objects it frees is
    already back in PyTorch's caching allocator when ``empty_cache()`` is called.
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

In [None]:
# Quick check that the model produces the expected label. This is an encoder-only model:
# for tasks that do not generate text (e.g. classification), encoder-only
# models are sufficient.

import torch.nn.functional as F

def get_output(tokenizer, model, sentences):
    """Return the label the model scores highest for a pair of sentences.

    Args:
        tokenizer: Tokenizer exposing ``encode_plus``; must return
            ``input_ids`` and ``token_type_ids`` tensors.
        model: Classifier whose first output holds the logits and whose
            ``config.id2label`` maps class indices to label names.
        sentences: Two-item sequence ``(sentence_a, sentence_b)``.

    Returns:
        The ``id2label`` entry for the highest-scoring class of the first
        (only) example in the batch.
    """
    sentence_a, sentence_b = sentences

    # Encode both sentences as a single sentence-pair input
    inputs = tokenizer.encode_plus(sentence_a, sentence_b,
                                   return_tensors='pt')
    # Move the encoded tensors to the GPU when one is available
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    input_ids = inputs['input_ids'].to(device)
    token_type_ids = inputs['token_type_ids'].to(device)

    # Inference only: no autograd graph is needed
    with torch.no_grad():
        output = model(input_ids, token_type_ids=token_type_ids)
    logits = output[0]

    # Take the arg-max class; sampling from the softmax would make the
    # returned label random and not necessarily the model's top prediction.
    predicted_class_index = int(logits.argmax(dim=-1)[0])
    return model.config.id2label[predicted_class_index]

In [None]:
# Classify the sentence pair of the second sample review and show the label.
predicted_label = get_output(tokenizer=goemotions_tok,
                             model=goemotions_mdl,
                             sentences=sample_reviews_dict[1])
predicted_label

In [None]:
# Similar to the above, but this keeps the model's attention weights (its last output) and renders them with BertViz.

def view_attention(tokenizer, model, sentences, view='model'):
    """Render BertViz attention for a pair of sentences.

    Args:
        tokenizer: Tokenizer exposing ``encode_plus`` and
            ``convert_ids_to_tokens``.
        model: Model whose last output is the attention weights
            (this notebook loads it with ``output_attentions=True``).
        sentences: Two-item sequence ``(sentence_a, sentence_b)``.
        view: ``'head'`` for ``head_view`` or ``'model'`` for ``model_view``.

    Raises:
        ValueError: If ``view`` is neither ``'head'`` nor ``'model'``.
    """
    # Fail fast instead of running the model and then rendering nothing.
    if view not in ('head', 'model'):
        raise ValueError(f"view must be 'head' or 'model', got {view!r}")

    sentence_a, sentence_b = sentences

    # Encode both sentences as a single sentence-pair input
    inputs = tokenizer.encode_plus(sentence_a, sentence_b,
                                   return_tensors='pt')
    # Move the encoded tensors to the GPU when one is available
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    input_ids = inputs['input_ids'].to(device)
    token_type_ids = inputs['token_type_ids'].to(device)

    # Visualisation only: skip building an autograd graph for the forward pass
    with torch.no_grad():
        attention = model(input_ids, token_type_ids=token_type_ids)[-1]

    # The first position with token type 1 marks where the second sentence starts
    sentence_b_start = token_type_ids[0].tolist().index(1)
    input_id_list = input_ids[0].tolist()
    tokens = tokenizer.convert_ids_to_tokens(input_id_list)

    # BertViz visualizers
    if view == 'head':
        head_view(attention, tokens, sentence_b_start)
    else:
        model_view(attention, tokens, sentence_b_start)

    # Drop the local references before asking for memory to be released
    del attention
    del tokens
    clear_gpu_cache()

In [None]:
# Model-wide attention view for the first sample review.
view_attention(tokenizer=goemotions_tok, model=goemotions_mdl,
               sentences=sample_reviews_dict[0], view='model')

In [None]:
# Per-head attention view for the first sample review.
view_attention(tokenizer=goemotions_tok, model=goemotions_mdl,
               sentences=sample_reviews_dict[0], view='head')

In [None]:
# Per-head attention view for the second sample review.
view_attention(tokenizer=goemotions_tok, model=goemotions_mdl,
               sentences=sample_reviews_dict[1], view='head')

In [None]:
# Text-classification pipeline around the loaded model and tokenizer.
# top_k=None with a softmax is used so every label's score is returned, not just the best one.
goemotions = pipeline(
    task="text-classification",
    model=goemotions_mdl,
    tokenizer=goemotions_tok,
    device=device,
    function_to_apply='softmax',
    top_k=None,
)

In [None]:
# Two near-identical sentences that differ only in the adverb and adjective.
probe_texts = [
    'this restaurant was unexpectedly disgusting!',
    'this restaurant was shockingly amazing!',
]
goemotions(probe_texts)

In [None]:
# Next: token attribution techniques, starting with gradient-based attribution (layer integrated gradients).

def visualize_ig_review(interpret_s:pd.Series,
                        pline:pipeline,
                        max_prob_thresh:float=0.1,
                        max_classes=np.inf,
                        concat_title=True,
                        summary_df=None
                    ) -> pd.DataFrame:
    """Show layer-integrated-gradients token attributions for one review.

    The review is classified with ``pline``; for each predicted class that
    passes the filters, token attributions are computed against the model's
    embedding layer and rendered with Captum's text visualisation.

    Args:
        interpret_s: Review row. Reads ``restaurant_name``, ``review_title``,
            ``review_full``, ``positive_sentiment`` and ``rating``; the
            Series ``name`` is printed as the review id.
        pline: Callable returning, per text, a list of
            ``{'label', 'score'}`` dicts, and exposing ``.model`` and
            ``.tokenizer``.
        max_prob_thresh: Only classes scoring strictly above this value
            get an attribution.
        max_classes: Upper bound on how many of the top-ranked classes are
            considered. Defaults to no limit.
        concat_title: If true, the analysed text is ``"<title>: <review>"``;
            otherwise the review body only.
        summary_df: Optional frame indexed by label with an ``'avg. rating'``
            column; when given, predictions are ordered by that rating.

    Returns:
        DataFrame of the pipeline's label/score predictions, with a
        ``label_avg_rating`` column and rating-based ordering when
        ``summary_df`` is provided.
    """
    print(f"{interpret_s.name}: {interpret_s['restaurant_name']}")

    # Init some variables
    if concat_title:
        text = interpret_s['review_title'] + ': ' + interpret_s['review_full']
    else:
        text = interpret_s['review_full']
    true_label = 'Positive' if interpret_s['positive_sentiment'] else 'Negative'
    rating = interpret_s['rating']

    # Get Predictions
    prediction = pline(text)[0]
    prediction_df = pd.DataFrame(prediction)
    if summary_df is not None:
        # map() yields NaN for labels missing from summary_df, so the column
        # stays numeric and sortable (replace() would leave the label string).
        prediction_df['label_avg_rating'] = prediction_df['label'].\
                                                map(summary_df['avg. rating'])
        prediction_df = prediction_df.sort_values('label_avg_rating', ascending=False).\
                                                                        reset_index(drop=True)

    # Process Predictions: rank classes by score, best first
    prediction_tuples = [(p['label'], p['score']) for p in prediction]
    sorted_prediction_tuples = sorted(prediction_tuples, key=lambda x: x[1], reverse=True)
    pred_class, pred_prob = sorted_prediction_tuples[0]

    # Initialize Integrated Gradients against the embedding layer
    def forward_func(inputs, position=0):
        return pline.model(inputs,
                           attention_mask=torch.ones_like(inputs))[position]
    layer = pline.model.bert.embeddings
    lig = LayerIntegratedGradients(forward_func, layer)

    # Prepare tokens and baseline
    # NOTE(review): the text is encoded without special tokens while the
    # baseline starts/ends with [CLS]/[SEP]; kept as in the original.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    tokenizer = pline.tokenizer
    inputs = torch.tensor(tokenizer.encode(text, add_special_tokens=False),
                          device=device).unsqueeze(0)
    tokens = tokenizer.convert_ids_to_tokens(inputs.detach().cpu().numpy()[0])
    sequence_len = inputs.shape[1]
    # Baseline has the same shape as the input: padding framed by [CLS]/[SEP].
    # Inputs shorter than two tokens get an all-padding baseline so the
    # shapes still match.
    baseline = torch.full_like(inputs, tokenizer.pad_token_id)
    if sequence_len >= 2:
        baseline[0, 0] = tokenizer.cls_token_id
        baseline[0, -1] = tokenizer.sep_token_id

    clear_gpu_cache()

    # Iterate over every prediction
    vis_record_l = []
    for i, (attr_class, attr_score) in enumerate(sorted_prediction_tuples):
        if (attr_score > max_prob_thresh) and (i < max_classes):
            # Sets the Target Class
            target = pline.model.config.label2id[attr_class]

            # Get Attributions
            # NOTE(review): presumably Captum re-enables autograd internally
            # for the gradient pass, so no_grad here does not block it — confirm.
            with torch.no_grad():
                attributes, delta = lig.attribute(inputs=inputs,
                                                baselines=baseline,
                                                target=target,
                                                return_convergence_delta = True)

            # Post-Processing Attributions: one value per token, unit L2 norm
            attr = attributes.sum(dim=2).squeeze(0)
            attr_norm = torch.norm(attr)
            if attr_norm > 0:
                # Skip the division for all-zero attributions to avoid NaNs
                attr = attr / attr_norm
            attr = attr.cpu().detach().numpy()

            # Generate & Append Visualization Data Record
            vis_record = visualization.VisualizationDataRecord(
                                    word_attributions=attr,
                                    pred_prob=pred_prob,
                                    pred_class=pred_class,
                                    true_class=f"{true_label} ({rating})",
                                    attr_class=attr_class,
                                    attr_score=attr_score,
                                    raw_input_ids=tokens,
                                    convergence_score=delta)
            vis_record_l.append(vis_record)

    # Display List of Visualization Data Records
    _ = visualization.visualize_text(vis_record_l)

    clear_gpu_cache()

    return prediction_df

In [None]:
# Confident "surprise" predictions on clearly negative, low-rated reviews.
neg_suprise_mask = (
    reviews_df['label'].eq('surprise')
    & reviews_df['score'].gt(0.9)
    & reviews_df['positive_sentiment'].eq(0)
    & reviews_df['rating'].lt(3)
)
neg_suprise_df = reviews_df[neg_suprise_mask]
neg_suprise_samp_df = neg_suprise_df.sample(n=10, random_state=rand)

In [None]:
# Attribute each of the ten sampled negative "surprise" reviews (title included).
for i in range(10):
    sample_to_interpret = neg_suprise_samp_df.iloc[i]
    _ = visualize_ig_review(
        interpret_s=sample_to_interpret,
        pline=goemotions,
        concat_title=True,
        summary_df=summary_df,
    )

In [None]:
# Confident "surprise" predictions on clearly positive, top-rated reviews.
pos_suprise_df = reviews_df[(reviews_df['label']=='surprise') &\
                                (reviews_df['score']>0.97) &\
                                (reviews_df['positive_sentiment']==1) &\
                                (reviews_df['rating']>4)]
# Drop reviews that literally contain the word, keeping less obvious cases.
pos_suprise_samp_df = pos_suprise_df[~pos_suprise_df['review_full'].\
                                       str.contains('surprise')]

# The filtered frame is not sampled to a fixed size, so cap the loop at the
# rows actually available instead of assuming there are at least ten.
for i in range(min(10, len(pos_suprise_samp_df))):
    sample_to_interpret = pos_suprise_samp_df.iloc[i]
    _ = visualize_ig_review(sample_to_interpret, goemotions,\
                            concat_title=False, summary_df=summary_df)

In [None]:
# Low-confidence predictions whose label is neither 'neutral' nor 'joy'.
mixed_mask = (~reviews_df['label'].isin(['neutral','joy'])) & (reviews_df['score'] < 0.5)

# Positive reviews rated below 5.
pos_mixed_mask = mixed_mask & (reviews_df['positive_sentiment']==1) & (reviews_df['rating'] < 5)
pos_mixed_samp_df = reviews_df[pos_mixed_mask].sample(n=10, random_state=rand)

# Negative reviews rated above 2.
neg_mixed_mask = mixed_mask & (reviews_df['positive_sentiment']==0) & (reviews_df['rating'] > 2)
neg_mixed_samp_df = reviews_df[neg_mixed_mask].sample(n=10, random_state=rand)

In [None]:
# For the first five mixed positive reviews: token attributions plus a polar plot of all label scores.
for i in range(5):
    sample_to_interpret = pos_mixed_samp_df.iloc[i]
    prediction_df = visualize_ig_review(
        interpret_s=sample_to_interpret,
        pline=goemotions,
        concat_title=False,
        summary_df=summary_df,
    )
    rest_name = sample_to_interpret['restaurant_name']
    mldatasets.plot_polar(prediction_df, 'score', 'label', name=rest_name)

In [None]:
class GEDataset(lit_dataset.Dataset):
    """LIT dataset built from review rows.

    Each example carries the review text (title and body joined by ``': '``),
    the emotion label, the rating and the positive-sentiment flag.
    """

    GE_LABELS = ['anger', 'disgust', 'fear', 'joy',
                 'neutral', 'sadness', 'surprise']

    def __init__(self, df: pd.DataFrame):
        """Convert every row of ``df`` into one example dict."""
        examples = []
        for _, row in df.iterrows():
            examples.append({
                'review': row['review_title'] + ': ' + row['review_full'],
                'label': row['label'],
                'rating': row['rating'],
                'positive': row['positive_sentiment'],
            })
        self._examples = examples

    def spec(self):
        """Describe the example fields; only ``label`` has a fixed vocabulary."""
        field_types = {}
        field_types['review'] = lit_types.TextSegment()
        field_types['label'] = lit_types.CategoryLabel(vocab=self.GE_LABELS)
        field_types['rating'] = lit_types.CategoryLabel()
        field_types['positive'] = lit_types.CategoryLabel()
        return field_types

In [None]:
class GEModel(lit_model.Model):
    """LIT model wrapper around a text-classification pipeline.

    Predictions are emitted as a probability list ordered like ``GE_LABELS``,
    which is also the vocabulary declared in ``output_spec``.
    """

    GE_LABELS = ['anger', 'disgust', 'fear', 'joy',\
                 'neutral', 'sadness', 'surprise']

    def __init__(self, model, tokenizer, **kw):
        """Build the pipeline that scores every label with a softmax.

        Args:
            model: Model handed to ``pipeline``.
            tokenizer: Tokenizer handed to ``pipeline``.
            **kw: Accepted for signature compatibility; not used.
        """
        # `device` is the notebook-level torch device defined in an earlier cell.
        self._model = pipeline(
                          model=model,
                          tokenizer=tokenizer,
                          task="text-classification",
                          function_to_apply="softmax",
                          device=device,
                          top_k=None
                        )

    def input_spec(self):
        """The model consumes the ``review`` text field."""
        return {
            'review': lit_types.TextSegment()
        }

    def output_spec(self):
        """Class probabilities over ``GE_LABELS``, compared against ``label``."""
        return {
          'probas': lit_types.MulticlassPreds(vocab=self.GE_LABELS, parent='label')
        }

    def predict_minibatch(self, inputs):
        """Score a batch of examples.

        Args:
            inputs: Iterable of example dicts with a ``review`` key.

        Returns:
            One ``{'probas': [...]}`` dict per input, with probabilities in
            ``GE_LABELS`` order.

        Raises:
            KeyError: If the pipeline does not return a score for one of
                ``GE_LABELS``.
        """
        examples = [d['review'] for d in inputs]
        if not examples:
            return []
        with torch.no_grad():
            preds = self._model(examples)
        score_by_label_l = [{p['label']: p['score'] for p in pred_dicts}
                            for pred_dicts in preds]
        # Index by the declared vocab rather than by sorted keys, so the
        # probability order always matches output_spec.
        return [{'probas': [score_by_label[label] for label in self.GE_LABELS]}
                for score_by_label in score_by_label_l]

In [None]:
import lit_nlp
# Wrap the classifier for LIT.
models = {'GoEmotion':GEModel(goemotions_mdl, goemotions_tok)}

# 10 + 10 + 10 + 10 + 60 reviews. pos_suprise_samp_df is a filtered frame that
# was never sampled down, so take its first ten rows (the ones visualised
# earlier) to keep the set at the intended size.
samples100_df = pd.concat([neg_suprise_samp_df, pos_suprise_samp_df.head(10), neg_mixed_samp_df,\
                           pos_mixed_samp_df, reviews_df.sample(n=60, random_state=rand)])

datasets = {'NYCRestaurants':GEDataset(samples100_df)}
# Serve the LIT UI inside the notebook.
widget = notebook.LitWidget(models, datasets, port = 8890)
widget.render(height=1000)