In [1]:
import numpy as np
import pickle
import json
import torch
from torch.utils.data import DataLoader, TensorDataset, Dataset
import torch.nn as nn
from torch.autograd import Variable
from torch.nn import functional as F
import torch.onnx
from torch.utils.data.sampler import SubsetRandomSampler
import sys
import os
import threading
import time
import matplotlib.pyplot as plt

# my modules
sys.path.append(os.getcwd() + '/../..')

from models import ShallowCNN
from models import DPCNN
from dataset import dataset
from preprocess import preprocess
from util import readConfig
from train import *
from plot import plots
from scipy import stats

In [2]:
from captum.attr._core import (
    input_x_gradient,
    guided_grad_cam,
    gradient_shap
)


from captum.attr import (
    GradientShap,
    DeepLift,
    DeepLiftShap,
    IntegratedGradients,
    LayerConductance,
    NeuronConductance,
    NoiseTunnel
)

In [3]:
# load the config get the model and the data
def init(config_path):
    """Load config, embeddings, vocabulary maps, test data and a trained model.

    Parameters
    ----------
    config_path : str
        Path handed to ``readConfig.readConfig``. The resulting ``args``
        mapping must provide the keys read below: ``embed_path``, ``w2i``,
        ``i2w``, ``input_path``, ``max_sen_len``, ``num_class``, ``dropout``,
        ``channel_size``, ``model`` and ``load_model_from``.

    Returns
    -------
    tuple
        ``(args, w2i, i2w, cls, texts, test_set, model_args, model)`` in that
        order.
    """
    print('Read config file %s.'%config_path)
    args = readConfig.readConfig(config_path)
    print(args)

    def _load_pickle(path):
        # Use a context manager so the file handle is closed deterministically
        # instead of being leaked until garbage collection.
        # NOTE(security): pickle.load can execute arbitrary code; only point
        # the config at pickle files you trust.
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    # load embeddings
    embed_matrix = _load_pickle(args['embed_path']+'/embed.pkl')
    print("Load embedding of size %s"%embed_matrix.shape[1])
    w2i = _load_pickle(args['w2i']+'/w2i.pkl')
    i2w = _load_pickle(args['i2w']+'/i2w.pkl')

    # load data
    cls, texts = preprocess.readData(args['input_path']+"/test_classes.data",
                                     args['input_path']+"/test_texts.data")
    test_set = dataset.TextDataset(texts, cls, w2i, int(args['max_sen_len']))

    model_args = {
        'vocab_size': embed_matrix.shape[0], # add unknown word
        'max_len': int(args['max_sen_len']),
        'n_class': int(args['num_class']),
        'dim': embed_matrix.shape[1],
        'dropout': float(args['dropout']),
        # NOTE(review): hard-coded; the config's own 'freeze' entry is not read here.
        'freeze': True,
        'kernel_num': int(args['channel_size']),
        'embedding_matrix': embed_matrix,
        'forward_embed': True
    }

    if args['model']=='shallowCNN':
        # Message kept as-is for log compatibility, although nothing is
        # trained here: the weights are loaded from disk just below.
        print("Start training shallowCNN")
        model = ShallowCNN.ShallowCNN(model_args)
    else:
        model = DPCNN.DPCNN(model_args)
    model.load(args['load_model_from'])

    # The original `print(model.eval)` only printed the bound method and never
    # switched the model to inference mode. For a torch.nn.Module, eval()
    # returns the module itself, so this both enables eval mode and still
    # prints the architecture.
    print(model.eval())
    # Project-specific hook; presumably sets the dropout probability to 0 for
    # attribution runs — confirm against the model classes.
    model.setDropout(0)

    return (args, w2i, i2w, cls, texts, test_set, model_args, model)

In [4]:
# Build the shared notebook state once: the config dict, vocabulary maps,
# raw test classes/texts, the wrapped test dataset, the model constructor
# arguments and the loaded model, all taken from the config file named below.
args, w2i, i2w, cls, texts, test_set, model_args, model = init("../../train/abspath_shallowcnn_config")

Read config file ../../train/abspath_shallowcnn_config.
{'action': 'test', 'model': 'shallowCNN', 'max_sen_len': '129', 'num_class': '2', 'epoch': '4', 'cv': '0', 'dropout': '0.5', 'freeze': 'True', 'output_path': '/home/sj/Documents/lecture_projects/master_thesis_2020/explain_cnn_text_classifiers/data/models/yelp_polarity_DPCNN/', 'input_path': '/home/sj/Documents/lecture_projects/master_thesis_2020/explain_cnn_text_classifiers/data/datasets/yelp_polarity', 'channel_size': '20', 'embed_path': '/home/sj/Documents/lecture_projects/master_thesis_2020/explain_cnn_text_classifiers/data/embeddings/polar/selected1275_yelp', 'w2i': '/home/sj/Documents/lecture_projects/master_thesis_2020/explain_cnn_text_classifiers/data/embeddings/glove/yelp_polarity', 'i2w': '/home/sj/Documents/lecture_projects/master_thesis_2020/explain_cnn_text_classifiers/data/embeddings/glove/yelp_polarity', 'load_model_from': '/home/sj/Documents/lecture_projects/master_thesis_2020/explain_cnn_text_classifiers/data/model

In [5]:
def _find_word_position(sentences, word, occurrence):
    """Return ``[sentence_index, word_index]`` of the ``occurrence``-th match.

    Matching is case-insensitive on space-split tokens of each sentence.
    Returns ``None`` when fewer than ``occurrence`` matches exist.
    """
    target = word.lower()
    remaining = occurrence
    for sent_index, sentence in enumerate(sentences):
        for word_index, candidate in enumerate(sentence.split(' ')):
            if candidate.lower() == target:
                remaining -= 1
                if remaining == 0:
                    return [sent_index, word_index]
    return None


def compute_grads(args, w2i, i2w, cls, texts, test_set, model_args, model, 
                  input_index=0, label=0, 
                  method="Integrated-Gradient", 
                  topKWords=10, 
                  topKDims=5, 
                  embeddings=''):
    """Attribute one test example and summarise the result as a dict.

    Parameters
    ----------
    args : dict
        Config mapping; only ``args['load_dim_names']`` (path to a pickle of
        ``(a, b)`` pairs, where ``b`` is indexed as a pair of names) is read.
    w2i, i2w, cls :
        Accepted for signature compatibility; not used in this function.
    texts :
        ``texts[input_index]`` is the token sequence of the example.
    test_set :
        ``test_set[input_index]`` must yield ``(model_input_ids, label)``.
    model_args : dict
        Must contain ``'max_len'`` and ``'dim'``.
    model :
        Callable model exposing ``embedding_layer`` (and ``conv3`` for the
        ``"Grad-Cam"`` method). Its ``forwardEmbedLayer`` attribute is set to
        ``False`` as a side effect.
    input_index : int
        Index of the example to explain.
    label : int
        Attribution target passed to Captum.
    method : str
        One of ``"Integrated-Gradient"``, ``"InputXGrad"``, ``"Grad-Cam"``,
        ``"SHAP"``. Any other value skips attribution.
    topKWords : int
        Number of highest-contributing words to report.
    topKDims : int
        Currently unused: ``'Top-Dims'`` always lists every dimension.
    embeddings :
        Optional callable replacing ``model.embedding_layer``; any falsy
        value (the default ``''``) selects the model's own layer.

    Returns
    -------
    numpy.ndarray or dict
        For an unrecognised ``method`` the raw output scores of the model.
        Otherwise a dict with ``'Text'``, ``'Prediction'``, ``'True-Label'``,
        ``'Out'``, ``'Top-Dims'`` and one ``'<word>_<n>'`` entry per reported
        word (``'Word'``, ``'Contribution'``, ``'Rank'``, optional
        ``'Position'``, ``'Antonyms'``).

    Notes
    -----
    Relies on the notebook-global ``text_sentences`` (the original,
    un-preprocessed sentences per example), which must be defined before the
    attribution path is used.
    """
    p = input_index
    max_len, dim = model_args['max_len'], model_args['dim']
    model.forwardEmbedLayer = False
    tokens = texts[p]
    # Only the first max_len tokens have an attribution row. The original
    # hard-coded 129 here, which overran the attribution matrix whenever the
    # configured max_len was smaller than the text.
    num_rows = min(max_len, len(tokens))

    embed = embeddings if embeddings else model.embedding_layer
    model_input = embed(test_set[p][0]).view(1, 1, max_len, dim)
    out = model(model_input)
    predicted = torch.argmax(out, dim=1).tolist()[0]
    output_scores = out[0].detach().numpy()

    # methods return grads
    if method=="Integrated-Gradient":
        baseline = torch.zeros((1, 1, max_len, dim))
        attributions_out, _delta = IntegratedGradients(model).attribute(
            model_input, baseline, target=label, return_convergence_delta=True)
    elif method=="InputXGrad":
        # NOTE(review): despite the method name this runs DeepLift; the
        # InputXGradient call was commented out in the original. Confirm
        # which one is intended before relying on the label.
        attributions_out = DeepLift(model).attribute(model_input, target=label)
    elif method=="Grad-Cam":
        attributions_out = guided_grad_cam.GuidedGradCam(model, model.conv3).attribute(model_input, target=label)
    elif method=="SHAP":
        baseline = torch.zeros((1, 1, max_len, dim))
        num_samples = 500
        attributions_out, _delta = gradient_shap.GradientShap(model).attribute(
            model_input, baseline, n_samples=num_samples, target=label,
            return_convergence_delta=True)
    else:
        # Unknown method: skip attribution and hand back the raw scores.
        return output_scores

    attributions = attributions_out.view(max_len, dim).detach().numpy()
    # z-score over all (word, dim) cells at once, then restore the row layout
    # so row i holds the normalised attributions of token i.
    z_attributions = stats.zscore(attributions.reshape(max_len * dim), ddof=1).reshape(max_len, dim)

    # Rank words by the sum of their raw attributions (highest first).
    word_contributions = sorted(
        ((i, np.sum(attributions[i])) for i in range(num_rows)),
        key=lambda x: x[1], reverse=True)
    # Cap K by the number of ranked rows, not by the untruncated text length.
    K = min(topKWords, num_rows)
    topk_words = [i for i, _ in word_contributions[:K]]
    dict_ranks = {word_idx: rank for rank, word_idx in enumerate(topk_words)}
    # Built once instead of once per (word, dim) pair.
    contribution_of = dict(word_contributions)

    # Rank embedding dimensions by their summed attribution over all rows.
    dim_contributions = sorted(
        ((j, np.sum(column)) for j, column in enumerate(attributions.T)),
        key=lambda x: x[1], reverse=True)

    # load dim names
    # NOTE(security): pickle.load can execute arbitrary code; only load
    # trusted files.
    with open(args['load_dim_names'], 'rb') as handle:
        antonym_names = [b for a, b in pickle.load(handle)]

    # Collect per-dimension entries for every top-K word, in text order.
    flat_attr_sorted = []
    word_counter = {}
    for i in range(num_rows):
        w = tokens[i]
        # Occurrence count so far; used later to find the matching
        # occurrence in the original sentences.
        word_counter[w] = word_counter.get(w, 0) + 1
        if i not in dict_ranks:
            continue
        row = z_attributions[i]
        flat_attr_sorted.append([
            (w, word_counter[w], (antonym_names[t][0], antonym_names[t][1]),
             abs(row[t]), dict_ranks[i], contribution_of[i])
            for t in range(dim)
        ])

    sentences = text_sentences[p]

    # construct dict as output
    dict_to_json = {
        'Text' : sentences,
        'Prediction' : 'Positive' if predicted > 0 else 'Negative',
        'True-Label' : 'Positive' if test_set[p][1] > 0 else 'Negative',
        'Out' : [round(output_scores[0],6), round(output_scores[1], 6)],
        'Top-Dims':[(antonym_names[i], v) for i,v in dim_contributions]
    }

    for index, entries in enumerate(flat_attr_sorted):
        w, wc, _, _, rank, contrib = entries[0]
        word_dict = {'Word': w, 'Contribution': contrib, 'Rank': rank}

        # Position refers to the original text (stop words included), not to
        # the preprocessed token sequence.
        position = _find_word_position(sentences, w, wc)
        if position is not None:
            word_dict['Position'] = position

        antonym_dict = {}
        for _, _, antonym_pair, v, _, _ in entries:
            antonym_dict[antonym_pair[0]+','+ antonym_pair[1]] = v
        # Sort once after all dimensions are collected (largest first).
        word_dict['Antonyms'] = dict(sorted(antonym_dict.items(), key=lambda x:x[1], reverse=True))
        dict_to_json[w+"_"+str(index)] = word_dict

    return dict_to_json