In [None]:
!pip install captum

Collecting captum
[?25l  Downloading https://files.pythonhosted.org/packages/13/b0/8fa3ab89e2e37c960cdd09595fa911fbb8d6da216c8bc98e18c858a0128d/captum-0.3.1-py3-none-any.whl (4.4MB)
[K     |████████████████████████████████| 4.4MB 18.5MB/s 
Installing collected packages: captum
Successfully installed captum-0.3.1


In [None]:
import captum

import spacy

import torch
import torchtext
import torchtext.data
import torch.nn as nn
import torch.nn.functional as F
from torchtext.legacy.data import BucketIterator
from torchtext.vocab import GloVe
from torchtext.legacy.data import Field, LabelField
from torchtext.legacy.datasets import SST
import torch.optim as optim


from torchtext.vocab import Vocab
from torchtext import vocab

from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization, IntegratedGradients, DeepLift, DeepLiftShap, LayerDeepLift, LayerConductance, LayerFeatureAblation

nlp = spacy.load('en')

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


In [None]:
def get_sst_data(device):
    # set up fields
    #TEXT = Field(lower=True, include_lengths=True, batch_first=True)
    #LABEL =Field(sequential=False)
    TEXT = Field(lower=True, tokenize='spacy', batch_first=True)
    Label = LabelField(dtype = torch.float, use_vocab=True, preprocessing=float)


    # make splits for data
    train, val, test = SST.splits(TEXT, Label, fine_grained=True)

    # build the vocabulary
    loaded_vectors = GloVe(name='6B', dim=100)
    loaded_vectors = torchtext.vocab.Vectors('glove.6B.100d.txt')
    TEXT.build_vocab(train, vectors=loaded_vectors, max_size=len(loaded_vectors.stoi))
    TEXT.vocab.set_vectors(stoi=loaded_vectors.stoi, vectors=loaded_vectors.vectors, dim=loaded_vectors.dim)

    Label.build_vocab(train)
    text_vocab = TEXT.vocab
    x = torch.tensor(1)

    # make iterator for splits
    train_iter, val_iter, test_iter = BucketIterator.splits(
        (train, val, test), batch_size=32, device=torch.device('cuda:0')) # for CPU, device = -1

    return train_iter, val_iter, test_iter, text_vocab, TEXT, Label

In [None]:
train_iter, val_iter, test_iter, vocab, TEXT, Label = get_sst_data(device)

downloading trainDevTestTrees_PTB.zip


trainDevTestTrees_PTB.zip: 100%|██████████| 790k/790k [00:00<00:00, 791kB/s]


extracting


.vector_cache/glove.6B.zip: 862MB [03:00, 4.77MB/s]                           
100%|█████████▉| 398439/400000 [00:14<00:00, 26781.68it/s]

In [None]:
class BiLSTM(nn.Module):
    def __init__(self, vocab, num_classes, embedding_dim=768, hidden_dim=256,  pad_idx = 0, num_layers=1, dropout_prob=0.5):
        super(BiLSTM, self).__init__()
        # load pretrained embedding in embedding layer.
        self.embedding = nn.Embedding(len(vocab), embedding_dim, padding_idx = pad_idx)
        #self.embedding.weight.data.copy_(vocab.vectors)

        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, num_layers=num_layers, dropout=dropout_prob,
                            batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout_prob)
        self.linear = nn.Linear(in_features=hidden_dim * 2, out_features=num_classes) # multiplied by 2 because bidirectional

    def forward(self, text, max_sent_len=49):
        # text_emb must have the following shape: (batch_size, sequence_length, 768) for bert embeddings uncomment the following
        #
        #text_emb = text_emb.expand(-1, max_sent_len, -1).to(device)
        text_emb = self.embedding(text)
        #text_emb = text_emb.unsqueeze(1)
        # pass the text embedding through lstm
        lstm_output, hidden_out = self.lstm(text_emb)
        #lstm_output, _ = self.lstm(text_emb.view(len(text), 1, -1))
        lstm_output = torch.cat((lstm_output[:, -1, :256], lstm_output[:, 0, 256:]), dim=-1) # concatenate the output from forward and backward LSTM
        out = self.dropout(lstm_output)

        return self.linear(out)

In [None]:
model = BiLSTM(vocab, num_classes=5, embedding_dim=100, hidden_dim=50, num_layers=1, dropout_prob=0.5, pad_idx=0)

  "num_layers={}".format(dropout, num_layers))


In [None]:
def forward_with_sigmoid(input):
    return torch.sigmoid(model(input))

In [None]:

#train, val, test = SST.splits(text_field=TEXT, label_field=Label, fine_grained=True)
#test, _ = test_iter.split(split_ratio = 0.04)

In [None]:
# from torchtext import vocab

# #loaded_vectors = vocab.GloVe(name='6B', dim=100)

# # If you prefer to use pre-downloaded glove vectors, you can load them with the following two command line
# loaded_vectors = torchtext.vocab.Vectors('glove.6B.100d.txt')
# TEXT.build_vocab(train, vectors=loaded_vectors, max_size=len(loaded_vectors.stoi))
    
# TEXT.vocab.set_vectors(stoi=loaded_vectors.stoi, vectors=loaded_vectors.vectors, dim=loaded_vectors.dim)
# Label.build_vocab(train)

In [None]:
for key in Label.vocab.freqs:
  print(key, Label.vocab.stoi[key])

positive 0
very positive 3
neutral 2
negative 1
very negative 4


In [None]:
def train(model, iterator, optimizer, criterion, device):
    epoch_loss = 0
    epoch_acc = 0

    model.train() # sets the training mode

    for batch in iterator:
        optimizer.zero_grad()

        predictions = model(batch.text)
        true_labels = batch.label.long()
      
        #true_labels = torch.sub(batch.label, torch.tensor(1))
        #true_labels = true_labels.to(device)

        # Calc loss
        loss = criterion(predictions, true_labels)
        acc = categorical_accuracy(predictions, true_labels)

        # Calcualte train and validation losses after 10 episodes
        # Backprop step
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc/len(iterator), model


def evaluate(model, iterator, criterion, device, split='val'):
    epoch_loss = 0
    epoch_acc = 0

    model.eval()
    #num_iterations = 0
    with torch.no_grad():
        #for emb, labels in get_batched_emb_labels(args, split=split):
        for batch in iterator:
            predictions = model(batch.text)
            true_labels = batch.label.long()
            
            #true_labels = torch.sub(batch.label, torch.tensor(1))
            
            #true_labels = true_labels.to(device)

            loss = criterion(predictions, true_labels)
            acc = categorical_accuracy(predictions, true_labels)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
            #num_iterations += 1

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
print('Vocabulary Size: ', len(TEXT.vocab))


Vocabulary Size:  15480


In [None]:
PAD_IND = TEXT.vocab.stoi['pad']


In [None]:
PAD_IND

12760

In [None]:
def categorical_accuracy(preds, y):
    """
    We calculate the accuracy by performing an argmax to get the index of the maximum value in the prediction for
    each element in the batch, and then counting how many times this equals the actual label.
    We then average this across the batch.
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """
    top_pred = preds.argmax(1, keepdim = True)
    correct = top_pred.eq(y.view_as(top_pred)).sum()
    acc = correct.float() / y.shape[0]
    return acc

In [None]:
model = model.to("cuda:0")
optimizer = optim.Adam(model.parameters(), lr=float(1e-4))
loss_fn = nn.CrossEntropyLoss()  # nn.NLLLoss()
# CrossEntropyLoss expects the input to be [batch size, n classes] and the label to be [batch size].
loss_fn.to(device)

#SST.iters(batch_size=32, device=None) # change device to -1 if using cpu

for epoch in range(3):

    train_loss, train_acc, model = train(model, train_iter, optimizer, loss_fn, device)
    # calculate validation loss and accuracy
    valid_loss, valid_acc = evaluate(model, val_iter, loss_fn, device, split='val')

    print(f'Epoch: {epoch + 1} ')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc * 100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc * 100:.2f}%')
torch.save(model.state_dict(), f"cnn_model_3.pt")

test_loss, test_acc = evaluate(model, test_iter, loss_fn, device, split='test')
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')


Epoch: 1 
	Train Loss: 1.586 | Train Acc: 25.80%
	 Val. Loss: 1.585 |  Val. Acc: 25.34%


100%|█████████▉| 398439/400000 [00:29<00:00, 26781.68it/s]

Epoch: 2 
	Train Loss: 1.577 | Train Acc: 26.12%
	 Val. Loss: 1.573 |  Val. Acc: 25.70%
Epoch: 3 
	Train Loss: 1.572 | Train Acc: 27.01%
	 Val. Loss: 1.572 |  Val. Acc: 25.52%
Test Loss: 1.575 | Test Acc: 23.44%


In [None]:
torch.load('cnn_model_3.pt')
model.eval()

BiLSTM(
  (embedding): Embedding(15480, 100, padding_idx=0)
  (lstm): LSTM(100, 50, batch_first=True, dropout=0.5, bidirectional=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (linear): Linear(in_features=100, out_features=5, bias=True)
)

In [None]:
# disable cudnn because Integrated gradient for LSTM shows 'cudnn RNN backward can only be called in training mode'
# https://github.com/pytorch/captum/issues/564
torch.backends.cudnn.enabled=False


In [None]:
token_reference = TokenReferenceBase(reference_token_idx=PAD_IND)


In [None]:
lig = LayerIntegratedGradients(model, model.embedding)
#lig = LayerIntegratedGradients(model, model.convs)
dl = LayerDeepLift(model, model.embedding)

In [None]:
# accumalate couple samples in this array for visualization purposes
vis_data_records_ig = []

def interpret_sentence(model, sentence, min_len = 40, true_label = 0):
    text = [tok.text for tok in nlp.tokenizer(sentence.lower())]
    if len(text) < min_len:
        text += ['pad'] * (min_len - len(text))
    indexed = [TEXT.vocab.stoi[t] for t in text]

    model.zero_grad()

    input_indices = torch.tensor(indexed, device=device)
    input_indices = input_indices.unsqueeze(0)

    #print(input_indices.shape)
    
    # input_indices dim: [sequence_length]
    seq_length = min_len

    # predict
    pred = forward_with_sigmoid(input_indices) #.item()
    score = torch.max(pred).item()

    if score >= 0 and score < 0.2:
      label = Label.vocab.stoi["very negative"]
    elif score >= 0.2 and score < 0.4:
      label = Label.vocab.stoi["negative"]
    elif score >= 0.4 and score < 0.6:
      label = Label.vocab.stoi["neutral"]
    elif score >=0.6 and score < 0.8:
      label = Label.vocab.stoi["positive"]
    else:
      label = Label.vocab.stoi["very positive"]
    pred_ind = label #round(pred)
    #print(label)

    # generate reference indices for each sample
    reference_indices = token_reference.generate_reference(seq_length, device=device).unsqueeze(0)
    #print(reference_indices)
    #print(input_indices)

    # compute attributions and approximation delta using layer integrated gradients
    attributions_ig, delta = lig.attribute(input_indices, reference_indices, target=pred_ind, n_steps=500, return_convergence_delta=True)

    
    #attributions, delta = ig.attribute(input_indices, reference_indices, target=pred_ind, return_convergence_delta=True)
    
    #print(Label.vocab.itos[pred_ind])

    print(f"pred: {Label.vocab.itos[pred_ind]}, {label}, delta: {abs(delta)}")

    add_attributions_to_visualizer(attributions_ig, text, label, pred_ind, true_label, delta, vis_data_records_ig)
    
def add_attributions_to_visualizer(attributions, text, pred, pred_ind, label, delta, vis_data_records):
    attributions = attributions.sum(dim=2).squeeze(0)
    attributions = attributions / torch.norm(attributions)
    attributions = attributions.cpu().detach().numpy()

    # storing couple samples in an array for visualization purposes
    vis_data_records.append(visualization.VisualizationDataRecord(
                            attributions,
                            pred,
                            Label.vocab.itos[pred_ind],
                            Label.vocab.itos[label],
                            Label.vocab.itos[1],
                            attributions.sum(),       
                            text,
                            delta))

In [None]:
for key in Label.vocab.freqs:
  print(key, Label.vocab.stoi[key])

positive 0
very positive 3
neutral 2
negative 1
very negative 4


In [None]:
interpret_sentence(model, 'It was a fantastic performance !', true_label=3)
interpret_sentence(model, "Renner 's performance as Dahmer is unforgettable , deeply absorbing .", true_label=3)
interpret_sentence(model, 'Best film ever', true_label=3)
interpret_sentence(model, 'Too leisurely paced and visually drab for its own good ,it succeeds in being only sporadically amusing .', true_label=2)
interpret_sentence(model, 'It was a horrible movie', true_label=1)
interpret_sentence(model, 'I\'ve never watched something as bad', true_label=4)
interpret_sentence(model, 'It is a disgusting movie!', true_label=4)

pred: neutral, 2, delta: tensor([1.3961e-05], device='cuda:0')
pred: neutral, 2, delta: tensor([0.0003], device='cuda:0')
pred: neutral, 2, delta: tensor([3.0942e-06], device='cuda:0')
pred: neutral, 2, delta: tensor([0.0003], device='cuda:0')
pred: neutral, 2, delta: tensor([4.7827e-05], device='cuda:0')
pred: neutral, 2, delta: tensor([0.0001], device='cuda:0')
pred: neutral, 2, delta: tensor([9.0150e-05], device='cuda:0')


                Gradients cannot be activated
                for these data types.
  % (index, str(inputs_dtype))


In [None]:
print('Visualize attributions based on Integrated Gradients')
_ = visualization.visualize_text(vis_data_records_ig)

Visualize attributions based on Integrated Gradients


True Label,Predicted Label,Attribution Label,Attribution Score,Word Importance
very positive,neutral (2.00),negative,1.98,it was a fantastic performance ! pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad
,,,,
very positive,neutral (2.00),negative,1.74,"renner 's performance as dahmer is unforgettable , deeply absorbing . pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad"
,,,,
very positive,neutral (2.00),negative,0.36,best film ever pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad
,,,,
neutral,neutral (2.00),negative,1.54,"too leisurely paced and visually drab for its own good , it succeeds in being only sporadically amusing . pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad"
,,,,
negative,neutral (2.00),negative,-0.15,it was a horrible movie pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad pad
,,,,


Captum visualization library shows in green tokens that push the prediction towards the target class. Those driving the score towards the reference value are marked in red. As a result, words perceived as positive will appear in green if attribution is performed against the predicted class but will be highlighted in red with an attribution targeting attributed label class.

Because importance scores ar assigned to tokens, not words, some examples may show, that attribution is highly dependent on tokenization. Classification results may vary between runs.