# Integrated gradients and attribution score calculations for LSTM

This notebook contains the code used to compute attribution scores with the integrated gradients method for the LSTM classifier. The following models can be analysed for the LSTM: the "human caption" model, nic and nic+equalizer. For these models we used the same test sets (image ids), and all models were trained with seed 0. The prediction files (equal to the test set) and the weights for the LSTM models can be found in the folder bias_data_for_ig. The notebook is currently set up to run the NIC model.

Credits to Ruben Winastwan for providing a tutorial on how to use Captum for BERT Models. https://towardsdatascience.com/interpreting-the-prediction-of-bert-model-for-text-classification-5ab09f8ef074

# IMPORTS

In [None]:
import captum

import spacy

import torch
import torchtext
from torchtext.legacy import data
import torch.nn as nn
import torch.nn.functional as F

from torchtext.vocab import Vocab

from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization
import csv
import re
import pickle
import random
from nltk import word_tokenize
import nltk
nltk.download('punkt')
import time
import argparse
import numpy as np
import os
import pprint
from nltk.tokenize import word_tokenize
from io import open
import sys
import json
from torch import nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm, trange
from operator import itemgetter
from sklearn.model_selection import train_test_split
from sklearn.metrics import average_precision_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import accuracy_score
import pandas as pd


# MODEL

In [4]:
class RNN(nn.Module):
    """LSTM sentence classifier over padded batches of token ids.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Hidden size of the LSTM (per direction).
        output_dim: Number of output logits.
        n_layers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability used on the embeddings, between LSTM
            layers and on the final hidden state.
        pad_idx: Token id whose embedding is treated as padding.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
                 bidirectional, dropout, pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        self.rnn = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional,
                           dropout=dropout)

        # The classifier reads the concatenated forward/backward final states
        # when bidirectional, and a single final state otherwise.
        num_directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * num_directions, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        """Return ``(logits, embedded)`` for a batch of token ids.

        Args:
            text: Token ids. The sequence is packed with ``batch_first=True``,
                so the tensor is interpreted as ``[batch size, sent len]``.
            text_lengths: Unpadded length of every sequence in ``text``.
                ``pack_padded_sequence`` is called with its default
                ``enforce_sorted=True``, so for batches larger than one the
                lengths must be in decreasing order.

        Returns:
            A tuple of the classifier logits ``[batch size, output_dim]`` and
            the (dropout-applied) embeddings ``[batch size, sent len, emb dim]``.
        """
        embedded = self.dropout(self.embedding(text))  # [batch size, sent len, emb dim]

        # Pack the sequence; the lengths need to be on the CPU.
        packed_embedded = nn.utils.rnn.pack_padded_sequence(
            embedded, text_lengths.to('cpu'), batch_first=True)

        # Only the final hidden state is needed, so the packed per-step
        # outputs are not unpacked.
        _, (hidden, _) = self.rnn(packed_embedded)

        if self.rnn.bidirectional:
            # Last layer: forward direction is hidden[-2], backward is hidden[-1].
            final_state = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            final_state = hidden[-1, :, :]

        return self.fc(self.dropout(final_state)), embedded

# CAPTION MODEL DATA

In [None]:


# spaCy English pipeline; the TEXT field below tokenizes with the same model name.
nlp = spacy.load("en_core_web_sm")

# Captions: spaCy-tokenized sequences that are batched together with their lengths.
TEXT = data.Field(
    sequential=True,
    tokenize='spacy',
    tokenizer_language='en_core_web_sm',
    include_lengths=True,
    use_vocab=True,
)
# Labels and image ids are single values declared with use_vocab=False.
LABEL = data.Field(sequential=False, use_vocab=False, pad_token=None, unk_token=None)
IMID = data.Field(sequential=False, use_vocab=False, pad_token=None, unk_token=None)

# (attribute name, field) pairs handed to TabularDataset for the CSV columns.
train_val_fields = [('prediction', TEXT), ('label', LABEL), ('imid', IMID)]

train_data, test_data = torchtext.legacy.data.TabularDataset.splits(
    path='/bias_data_for_ig/LSTM/nic/',
    train='train_nic_model.csv',
    test='test_nic_model.csv',
    format='csv',
    fields=train_val_fields,
)

# Sanity check: show one test example.
ex = test_data[1]
print(ex.prediction, ex.label)

MAX_VOCAB_SIZE = 25000

# Vocabularies come from the training split only.
TEXT.build_vocab(train_data, max_size=MAX_VOCAB_SIZE)
LABEL.build_vocab(train_data)

# Sanity check: show one training example.
ex = train_data[1]
print(ex.prediction, ex.label)

print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One caption per batch, placed on `device`.
train_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, test_data),
    batch_size=1,
    sort_key=lambda x: len(x.prediction),  # sort examples by caption length
    sort_within_batch=True,
    device=device,
)

# Peek at the first test batch only.
for b in test_iterator:
    print(b.prediction)
    break


# HUMAN MODEL DATA

In [None]:
# spaCy English pipeline; the HUMAN_TEXT field below tokenizes with the same model name.
nlp = spacy.load("en_core_web_sm")

# Human captions: spaCy-tokenized sequences batched together with their lengths.
HUMAN_TEXT = data.Field(
    sequential=True,
    tokenize='spacy',
    tokenizer_language='en_core_web_sm',
    include_lengths=True,
    use_vocab=True,
)
# Labels and image ids are single values declared with use_vocab=False.
HUMAN_LABEL = data.Field(sequential=False, use_vocab=False, pad_token=None, unk_token=None)
HUMAN_IMID = data.Field(sequential=False, use_vocab=False, pad_token=None, unk_token=None)

# (attribute name, field) pairs handed to TabularDataset for the CSV columns.
human_train_val_fields = [
    ('prediction', HUMAN_TEXT),
    ('label', HUMAN_LABEL),
    ('imid', HUMAN_IMID),
]

human_train_data, human_test_data = torchtext.legacy.data.TabularDataset.splits(
    path='/bias_data_for_ig/LSTM/nic/',
    train='train_nic_human.csv',
    test='test_nic_human.csv',
    format='csv',
    fields=human_train_val_fields,
)

MAX_VOCAB_SIZE = 25000

# Vocabularies come from the training split only.
HUMAN_TEXT.build_vocab(human_train_data, max_size=MAX_VOCAB_SIZE)
HUMAN_LABEL.build_vocab(human_train_data)

# Sanity check: show the first five training examples.
for i in range(5):
    ex = human_train_data[i]
    print(ex.prediction, ex.label)

print(f"Unique tokens in TEXT vocabulary: {len(HUMAN_TEXT.vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(HUMAN_LABEL.vocab)}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One caption per batch, placed on `device`.
human_train_iterator, human_test_iterator = data.BucketIterator.splits(
    (human_train_data, human_test_data),
    batch_size=1,
    sort_key=lambda x: len(x.prediction),  # sort examples by caption length
    sort_within_batch=True,
    device=device,
)

# CAPTION MODEL 

In [8]:
# Architecture of the caption ("model") classifier. These values are passed to
# RNN and have to describe the network stored in the checkpoint loaded below.
INPUT_DIM = len(TEXT.vocab)
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.5
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
print(PAD_IDX)

model = RNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT, PAD_IDX)
# Map the checkpoint to the CPU first so it also loads on machines without a GPU.
model.load_state_dict(torch.load('/bias_data_for_ig/LSTM/nic/model_nic_model.pt', map_location=torch.device('cpu')))
# The iterators put every batch on `device`; the model has to live on the same
# device, otherwise the forward pass fails as soon as CUDA is available.
model.to(device)
# Evaluation mode disables dropout for the attribution runs.
model.eval()

1


RNN(
  (embedding): Embedding(722, 100, padding_idx=1)
  (rnn): LSTM(100, 256, num_layers=2, dropout=0.5, bidirectional=True)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)

# HUMAN MODEL

In [9]:
# Architecture of the human-caption classifier. These values are passed to RNN
# and have to describe the network stored in the checkpoint loaded below.
# NOTE: this rebinds the module-level constants also used for the caption model.
INPUT_DIM = len(HUMAN_TEXT.vocab)
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
OUTPUT_DIM = 1
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.5
PAD_IDX = HUMAN_TEXT.vocab.stoi[HUMAN_TEXT.pad_token]
print(PAD_IDX)

model_human = RNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT, PAD_IDX)
# Map the checkpoint to the CPU first so it also loads on machines without a GPU.
model_human.load_state_dict(torch.load('/bias_data_for_ig/LSTM/nic/model_nic_human.pt', map_location=torch.device('cpu')))
# The iterators put every batch on `device`; the model has to live on the same
# device, otherwise the forward pass fails as soon as CUDA is available.
model_human.to(device)
# Evaluation mode disables dropout for the attribution runs.
model_human.eval()

1


RNN(
  (embedding): Embedding(700, 100, padding_idx=1)
  (rnn): LSTM(100, 256, num_layers=2, dropout=0.5, bidirectional=True)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)

In [10]:
def forward_with_sigmoid(input, text_lengths=None):
    """Return the caption model's sigmoid probability for ``input``.

    The module-level ``model`` returns a tuple whose first element is the
    logits; only that element is used here.
    """
    outputs = model(input, text_lengths=text_lengths)
    logits = outputs[0]
    return torch.sigmoid(logits)

def forward_with_sigmoid_human(input, text_lengths=None):
    """Return the human-caption model's sigmoid probability for ``input``.

    The module-level ``model_human`` returns a tuple whose first element is
    the logits; only that element is used here.
    """
    outputs = model_human(input, text_lengths=text_lengths)
    logits = outputs[0]
    return torch.sigmoid(logits)

In [11]:
# Ids of the padding and unknown tokens in the caption-model vocabulary.
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]

# itos is id -> token
# stoi is token -> id
# Sanity checks: id assigned to a two-space string, and the token stored at UNK_IDX.
print(TEXT.vocab.stoi['  '])
print(TEXT.vocab.itos[UNK_IDX])

# PADDING NEUTRAL
# The padding token id is used as the reference (baseline) token for integrated gradients.
token_reference = TokenReferenceBase(reference_token_idx=PAD_IDX)

print(token_reference)

0
<unk>
<captum.attr._models.base.TokenReferenceBase object at 0x7f72b8703400>


In [12]:
# Ids of the padding and unknown tokens in the human-caption vocabulary.
# NOTE(review): this rebinds PAD_IDX, UNK_IDX and token_reference, so any later
# use of the module-level token_reference sees the human-vocabulary padding id,
# also when the caption model is interpreted. Confirm both vocabularies share
# the same padding id before relying on that.
PAD_IDX = HUMAN_TEXT.vocab.stoi[HUMAN_TEXT.pad_token]
UNK_IDX = HUMAN_TEXT.vocab.stoi[HUMAN_TEXT.unk_token]

# itos is id -> token
# stoi is token -> id
# Sanity checks: id assigned to a two-space string, and the token stored at UNK_IDX.
print(HUMAN_TEXT.vocab.stoi['  '])
print(HUMAN_TEXT.vocab.itos[UNK_IDX])

# PADDING NEUTRAL
# The padding token id is used as the reference (baseline) token for integrated gradients.
token_reference = TokenReferenceBase(reference_token_idx=PAD_IDX)

print(token_reference)

0
<unk>
<captum.attr._models.base.TokenReferenceBase object at 0x7f737ef3dfd0>


# GRADIENTS

In [13]:
# One Layer Integrated Gradients explainer per classifier: each one wraps the
# sigmoid forward function of a model and targets that model's embedding layer.
lig = LayerIntegratedGradients(forward_with_sigmoid, model.embedding)
lig_human = LayerIntegratedGradients(forward_with_sigmoid_human, model_human.embedding)

In [15]:

def interpret_sentence(model, sentence, text_lengths, vis_data_records, model_text, model_labels, lig, min_len = 15, label = 0, att_dict = None):
    """Attribute one sentence with integrated gradients and record the result.

    Args:
        model: Classifier called as ``model(sentence, text_lengths)``; the
            first element of its return value is used as the logit.
        sentence: 2-D tensor of token ids. The probability is read with
            ``.item()``, so it must hold exactly one sentence.
        text_lengths: Length information forwarded to the model, to the
            reference generator and to ``lig.attribute``.
        vis_data_records: List that receives the visualization record.
        model_text: Text field whose ``vocab.itos`` turns ids into tokens.
        model_labels: Label field forwarded to
            ``add_attributions_to_visualizer``.
        lig: ``LayerIntegratedGradients`` instance built for ``model``.
        min_len: Unused; kept so existing callers keep working.
        label: Ground-truth label of the sentence.
        att_dict: Dict with the lists ``'female_att_score'`` and
            ``'male_att_score'`` that collect attribution sums. A fresh dict
            is created when omitted.

    Returns:
        ``vis_data_records`` (the same list that was passed in).

    Note:
        Reads the module-level ``token_reference`` and ``device``.
    """
    # A shared `{}` default would be mutated across calls and lacks the score
    # lists the visualizer helper appends to, so build a proper one per call.
    if att_dict is None:
        att_dict = {'female_att_score': [], 'male_att_score': []}

    # Human-readable tokens, used only for printing and visualization.
    text = [model_text.vocab.itos[t] for t in sentence.permute(1,0)]

    model.zero_grad()
    input_indices = sentence

    # logit
    pred = model(input_indices, text_lengths)[0]
    print(input_indices, text_lengths)

    # logit to prob, then to the predicted class index (0 or 1)
    probs = torch.sigmoid(pred).item()
    pred_ind = round(probs)

    # generate reference indices for each sample
    reference_indices = token_reference.generate_reference(text_lengths, device=device).unsqueeze(0)
    # compute attributions and approximation delta using layer integrated gradients
    attributions_ig, delta = lig.attribute(input_indices, reference_indices,
                                           additional_forward_args=text_lengths,
                                           n_steps=500, return_convergence_delta=True)

    add_attributions_to_visualizer(attributions_ig, text, probs, pred_ind, label, delta, vis_data_records, model_labels, att_dict)

    return vis_data_records

def add_attributions_to_visualizer(attributions, text, pred, pred_ind, label, delta, vis_data_records, model_label, att_dict):
    """Normalize the attributions, log their sum and store a visualization record.

    The attributions are summed over dimension 2, divided by their norm and
    converted to a NumPy array. When the predicted class equals the target
    class, the sum of the normalized attributions is appended to
    ``att_dict['male_att_score']`` (class 0) or
    ``att_dict['female_att_score']`` (class 1). A
    ``VisualizationDataRecord`` is always appended to ``vis_data_records``.
    """
    token_scores = attributions.sum(dim=2).squeeze(0)
    token_scores = token_scores / torch.norm(token_scores)
    token_scores = token_scores.cpu().detach().numpy()

    class_names = model_label.vocab.itos
    predicted_class = int(class_names[pred_ind])
    target_class = int(class_names[label])
    print(text)

    # Only correctly classified samples contribute to the per-class scores.
    if predicted_class == target_class:
        score_key = {0: 'male_att_score', 1: 'female_att_score'}.get(target_class)
        if score_key is not None:
            att_dict[score_key].append(token_scores.sum())

    # storing couple samples in an array for visualization purposes
    record = visualization.VisualizationDataRecord(
        token_scores,
        pred,
        class_names[pred_ind],
        class_names[label],
        class_names[0],
        token_scores.sum(),
        text,
        delta,
    )
    vis_data_records.append(record)




In [29]:
from tkinter.constants import E
# Test batches of both data sets, indexed by image id: imid -> [prediction, label].
model_dict, human_dict = {}, {}

# Per-class lists of attribution sums, one dict per classifier.
attributions_dict_model = {score_key: [] for score_key in ('female_att_score', 'male_att_score')}
attributions_dict_human = {score_key: [] for score_key in ('female_att_score', 'male_att_score')}

# Batch size is 1, so every batch carries exactly one image id.
for b in human_test_iterator:
    human_dict[b.imid.item()] = [b.prediction, b.label]

for b in test_iterator:
    model_dict[b.imid.item()] = [b.prediction, b.label]


def run_sentence_interpretation(num_of_sentences):
    """Run integrated gradients on captions present in both test sets.

    Image ids are taken from the larger of ``model_dict`` / ``human_dict``;
    ids missing from the other dict are skipped. For every remaining id the
    human caption is interpreted with ``model_human`` and the model caption
    with ``model``.

    Args:
        num_of_sentences: Maximum number of image ids to process. ``0`` means
            no limit (the size of the larger dict is used as the bound).

    Returns:
        ``(human_data_records, model_data_records, attributions_dict_human,
        attributions_dict_model)``. The two record lists are created per call;
        the two attribution dicts are the module-level ones, so their lists
        keep growing across repeated calls.
    """
    # accumulate a couple of samples in these lists for visualization purposes
    human_data_records = []
    model_data_records = []

    if len(model_dict) <= len(human_dict):
        bigger_dict, smaller_dict = human_dict, model_dict
    else:
        bigger_dict, smaller_dict = model_dict, human_dict

    if num_of_sentences == 0:
        num_of_sentences = len(bigger_dict)
        print('Iterating over:', num_of_sentences, 'samples.')

    counter = 0
    for key in bigger_dict:
        if key not in smaller_dict:
            continue

        if counter % 30 == 0:
            print('Currently at the ' + str(counter) + 'th image')
        counter += 1

        # Each entry is [(token ids, lengths), label]; the ids are permuted so
        # the batch dimension comes first before they are interpreted.
        human_text, human_text_length = human_dict[key][0]
        human_label = human_dict[key][1]
        interpret_sentence(model_human, human_text.permute(1, 0), human_text_length,
                           model_text=HUMAN_TEXT, model_labels=HUMAN_LABEL, lig=lig_human,
                           vis_data_records=human_data_records, label=human_label,
                           att_dict=attributions_dict_human)

        model_text, model_text_length = model_dict[key][0]
        model_label = model_dict[key][1]
        interpret_sentence(model, model_text.permute(1, 0), model_text_length,
                           model_text=TEXT, model_labels=LABEL, lig=lig,
                           vis_data_records=model_data_records, label=model_label,
                           att_dict=attributions_dict_model)

        if counter == num_of_sentences:
            break

    return human_data_records, model_data_records, attributions_dict_human, attributions_dict_model



In [None]:
# Run the attribution pass (0 = no sample limit) and render the records
# collected for the human captions.
print('Visualize attributions based on Integrated Gradients for the human captions')
human_records, model_records, _, _ = run_sentence_interpretation(0)
_ = visualization.visualize_text(human_records)

In [None]:
# Render the records collected for the model captions; model_records is bound
# by the statement that calls run_sentence_interpretation.
print('Visualize attributions based on Integrated Gradients for the model captions')
_ = visualization.visualize_text(model_records)

In [9]:
# Per-model lists holding, for every seed, the mean attribution score of each class.
fem_sums = {'nic': [], 'nic_plus': [], 'nic_equalizer': []}
male_sums = {'nic': [], 'nic_plus': [], 'nic_equalizer': []}

for model_name in ('nic', 'nic_plus', 'nic_equalizer'):
    for seed in (0, 12, 100, 200, 300, 400, 456, 500, 789, 1234):
        # NOTE: pickle.load can execute arbitrary code; only read trusted files.
        with open(f'/attributions/{model_name}_seed{seed}attributions.pickle', 'rb') as handle:
            current_model = pickle.load(handle)

        female_mean_for_seed = np.mean(current_model['female_att_score'])
        fem_sums[model_name].append(female_mean_for_seed)
        male_mean_for_seed = np.mean(current_model['male_att_score'])
        male_sums[model_name].append(male_mean_for_seed)
  




In [None]:
# Report mean and variance over seeds of the per-seed mean attribution scores.
for model_name in ('nic', 'nic_plus', 'nic_equalizer'):
    female_per_seed = fem_sums[model_name]
    fem_mean, fem_var = np.mean(female_per_seed), np.var(female_per_seed)

    print('FOR MODEL:', model_name)

    print('Female mean:', fem_mean)
    print('Female variance:', fem_var)
    print('')

    male_per_seed = male_sums[model_name]
    male_mean, male_var = np.mean(male_per_seed), np.var(male_per_seed)

    print('Male mean:', male_mean)
    print('Male variance:', male_var)
    print('')

## Genderword replacement analysis

In [None]:
# Counters: correct predictions with / without the gender word, differing and
# equal predictions, and the number of processed samples.
correct_genderword = correct_masked = different = total = equal = 0
# Accumulated confidence of correct predictions (original / masked captions).
LICd = LICd2 = 0
# Accumulated per-class scores (original captions, then masked captions).
female_scores = male_scores = 0
female_scores2 = male_scores2 = 0
# Accumulators for attribution sums.
female_att_sum = male_att_sum = 0


def create_text(text, gender_token_id=6, mask_token_id=None):
    """Return the caption batch-first, plus a copy with one token id masked.

    Args:
        text: 2-D tensor of token ids; dimensions 0 and 1 are swapped in both
            returned tensors (callers pass ``[sent len, batch size]``).
        gender_token_id: Token id that is replaced in the masked copy.
            Defaults to 6, the id the original analysis hard-coded.
        mask_token_id: Replacement id. When omitted, the id that the
            module-level ``HUMAN_TEXT`` vocabulary returns for ``'[MASK]'``
            is used.

    Returns:
        ``(text, masked_text)``: the permuted original ids and the permuted
        ids with every ``gender_token_id`` replaced. Both tensors share the
        dtype and device of the input, which is left unmodified.
    """
    if mask_token_id is None:
        mask_token_id = HUMAN_TEXT.vocab.stoi['[MASK]']

    # Vectorized replacement keeps the result on the input's device, so it can
    # be fed to a model that lives on the GPU.
    masked = torch.where(text == gender_token_id,
                         torch.full_like(text, mask_token_id),
                         text)
    return text.permute(1, 0), masked.permute(1, 0)

def get_scores(text):
    """Score one caption with the module-level ``model``.

    Reads the module-level ``text_length`` for the sequence length. The
    sigmoid output is treated as the female score and ``1 - output`` as the
    male score; the predicted gender is 1 when the output is at least 0.5.

    NOTE(review): the analysis loop feeds captions from the human test
    iterator, while this function scores them with ``model`` rather than
    ``model_human`` - confirm that this pairing is intended.

    Returns:
        ``(spred, pred_gender, female_score, male_score, pred_score)`` where
        ``pred_score`` is the larger of the two class scores (the male score
        on a tie) and ``spred`` is the same tensor as ``female_score``.
    """
    female_score = torch.sigmoid(model(text, text_length)[0])
    male_score = 1 - female_score
    pred_gender = (female_score >= 0.5000).int()
    pred_score = male_score if male_score >= female_score else female_score
    return female_score, pred_gender, female_score, male_score, pred_score

# Compare the classifier's prediction on every human test caption with its
# prediction on the same caption after token replacement (see create_text).
for i, batch in enumerate(human_test_iterator):
    # text_length has to be a module-level name: create_text and get_scores
    # read it as a global.
    text, text_length = batch.prediction
    text, text2 = create_text(text)
    label = batch.label
    total += 1

    spred, pred_gender, female_score, male_score, pred_score = get_scores(text)
    spred2, pred_gender2, female_score2, male_score2, pred_score2 = get_scores(text2)

    if pred_gender != pred_gender2:
        different += 1
    else:
        equal += 1

    # original caption
    if pred_gender == label:
        correct_genderword += 1
        LICd += pred_score
        female_scores += female_score
        male_scores += male_score

    # masked caption
    if pred_gender2 == label:
        correct_masked += 1
        LICd2 += pred_score2
        female_scores2 += female_score2
        male_scores2 += male_score2


print('--')
#print("number of different labels predicted", different)
#print("number of same labels predicted", equal)
# Guard the averages against an empty iterator.
print("LICd", LICd / total if total else 0)
print("LICd2", LICd2 / total if total else 0)
# The accumulators stay plain ints when no prediction was correct, so convert
# with float() instead of calling .item() on them.
print("female confidence", float(female_scores))
print("female confidence masked", float(female_scores2))
print("male confidence", float(male_scores))
print("male confidence masked", float(male_scores2))
print(total)