In [1]:
!pip install captum
import pandas as pd
import numpy  as np
import spacy
sp = spacy.load('en')
all_stopwords = sp.Defaults.stop_words
import nltk 
nltk.download('words')
nltk.download('punkt')
words = set(nltk.corpus.words.words())

Collecting captum
  Downloading captum-0.4.1-py3-none-any.whl (1.4 MB)
[?25l[K     |▎                               | 10 kB 30.1 MB/s eta 0:00:01[K     |▌                               | 20 kB 23.6 MB/s eta 0:00:01[K     |▊                               | 30 kB 17.4 MB/s eta 0:00:01[K     |█                               | 40 kB 15.1 MB/s eta 0:00:01[K     |█▏                              | 51 kB 7.1 MB/s eta 0:00:01[K     |█▍                              | 61 kB 8.3 MB/s eta 0:00:01[K     |█▋                              | 71 kB 7.9 MB/s eta 0:00:01[K     |█▉                              | 81 kB 8.9 MB/s eta 0:00:01[K     |██▏                             | 92 kB 9.2 MB/s eta 0:00:01[K     |██▍                             | 102 kB 7.0 MB/s eta 0:00:01[K     |██▋                             | 112 kB 7.0 MB/s eta 0:00:01[K     |██▉                             | 122 kB 7.0 MB/s eta 0:00:01[K     |███                             | 133 kB 7.0 MB/s eta 0:00:01[K 

In [2]:
import captum
from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization

import time 
import random
from wordcloud import WordCloud
import matplotlib.pyplot as plt

import torch
import torchtext
from torchtext import data
from torchtext import datasets
from torchtext.vocab import Vocab
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchtext.legacy.data import Field,LabelField, Dataset,TabularDataset, BucketIterator, Iterator

In [3]:
class CNN(nn.Module):
    """Convolutional text classifier with one convolution branch per filter size.

    Each branch convolves over the embedded sentence with a kernel spanning
    ``filter_size`` tokens and the full embedding width, is max-pooled over
    time, and the pooled branches are concatenated, passed through dropout and
    projected by a linear layer.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: width of each embedding vector.
        n_filters: output channels of every convolution branch.
        filter_sizes: sequence of kernel heights (in tokens); any non-zero
            number of sizes is accepted.
        output_dim: size of the final linear projection.
        dropout: dropout probability applied to the concatenated features.
        pad_idx: vocabulary index used as ``padding_idx`` of the embedding.

    Raises:
        ValueError: if ``filter_sizes`` is empty.
    """

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, 
                 dropout, pad_idx):
        
        super().__init__()

        filter_sizes = list(filter_sizes)
        if not filter_sizes:
            raise ValueError("filter_sizes must contain at least one filter size")
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx = pad_idx)

        # Register the branches as conv_0, conv_1, ... so that the submodule
        # names (and therefore state_dict keys) match the original fixed
        # three-branch layout, while supporting any number of filter sizes.
        self.n_convs = len(filter_sizes)
        for idx, size in enumerate(filter_sizes):
            setattr(self, f'conv_{idx}',
                    nn.Conv2d(in_channels = 1,
                              out_channels = n_filters,
                              kernel_size = (size, embedding_dim)))
        
        self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text):
        """Return the raw (pre-sigmoid) outputs of the final linear layer for a batch of token ids."""
                
        #text = [batch size, sent len]
        
        embedded = self.embedding(text)
                
        #embedded = [batch size, sent len, emb dim]
        
        embedded = embedded.unsqueeze(1)
        
        #embedded = [batch size, 1, sent len, emb dim]

        pooled = []
        for idx in range(self.n_convs):
            conv = getattr(self, f'conv_{idx}')

            conved = F.relu(conv(embedded).squeeze(3))

            #conved = [batch size, n_filters, sent len - filter_sizes[idx] + 1]

            # Max over the whole time axis, leaving one value per filter.
            pooled.append(F.max_pool1d(conved, conved.shape[2]).squeeze(2))

            #pooled[idx] = [batch size, n_filters]
        
        cat = self.dropout(torch.cat(pooled, dim = 1))

        #cat = [batch size, n_filters * len(filter_sizes)]
            
        return self.fc(cat)

def count_parameters(model):
    """Return the total element count of the model's parameters that require gradients."""
    total = 0
    for param in model.parameters():
        # Frozen parameters (requires_grad False) are not counted.
        if param.requires_grad:
            total += param.numel()
    return total

In [4]:
def binary_accuracy(preds, y):
    """
    Fraction of correct predictions in a batch (e.g. 8 right out of 10 gives 0.8, not 8).

    `preds` are passed through a sigmoid and rounded to the nearest integer
    before being compared element-wise with `y`. The result is a tensor.
    """
    hard_preds = torch.sigmoid(preds).round()
    # Cast the boolean matches to float so the sum can be divided.
    hits = (hard_preds == y).float()
    return hits.sum() / len(hits)

    
def train(model, iterator, optimizer, criterion):
    """Run one optimisation pass over `iterator` and return (mean loss, mean accuracy).

    Every batch must expose `.text` and `.label`. The model output is squeezed
    on dimension 1 before being handed to `criterion` and `binary_accuracy`.
    Both returned values are averaged over the number of batches.
    """
    model.train()

    running_loss = 0
    running_acc = 0

    for batch in iterator:
        optimizer.zero_grad()

        logits = model(batch.text).squeeze(1)
        batch_loss = criterion(logits, batch.label)
        batch_acc = binary_accuracy(logits, batch.label)

        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        running_acc += batch_acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

def evaluate(model, iterator, criterion):
    """Score the model on `iterator` without updating it; return (mean loss, mean accuracy).

    The model is switched to eval mode and gradients are disabled for the
    whole pass. Every batch must expose `.text` and `.label`. Both returned
    values are averaged over the number of batches.
    """
    model.eval()

    running_loss = 0
    running_acc = 0

    with torch.no_grad():
        for batch in iterator:
            logits = model(batch.text).squeeze(1)

            batch_loss = criterion(logits, batch.label)
            batch_acc = binary_accuracy(logits, batch.label)

            running_loss += batch_loss.item()
            running_acc += batch_acc.item()

    n_batches = len(iterator)
    return running_loss / n_batches, running_acc / n_batches

def epoch_time(start_time, end_time):
    """Split the time elapsed between two timestamps into whole minutes and seconds.

    Args:
        start_time: start timestamp in seconds.
        end_time: end timestamp in seconds.

    Returns:
        Tuple ``(elapsed_mins, elapsed_secs)`` of ints; fractional seconds are
        truncated.
    """
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [5]:
def train_test(model,train_iterator,valid_iterator,test_iterator,optimizer,criterion,
               n_epochs=10,
               model_path='/content/drive/MyDrive/Colab Notebooks/education project/models/tut4-model.pt'):
    """Train for `n_epochs`, checkpoint the best model on validation loss, then report test metrics.

    After every epoch the model is scored on `valid_iterator`; whenever the
    validation loss improves, the state dict is saved to `model_path`. Once
    training ends the best checkpoint is loaded back into `model` (in place)
    and evaluated on `test_iterator`. Progress and results are printed.

    Args:
        model: network to train; modified in place.
        train_iterator, valid_iterator, test_iterator: batch iterators passed
            to `train` / `evaluate`.
        optimizer: optimizer passed to `train`.
        criterion: loss function passed to `train` / `evaluate`.
        n_epochs: number of training epochs (default 10, the previous fixed value).
        model_path: file the best state dict is written to and re-read from
            (defaults to the previously hard-coded location).
    """
    best_valid_loss = float('inf')

    for epoch in range(n_epochs):
        start_time = time.time()
            
        train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
        valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
        
        end_time = time.time()

        epoch_mins, epoch_secs = epoch_time(start_time, end_time)
        
        # Keep only the checkpoint with the lowest validation loss seen so far.
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), model_path)
        
        print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
        print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

    # Restore the best checkpoint before measuring test performance.
    model.load_state_dict(torch.load(model_path))

    test_loss, test_acc = evaluate(model, test_iterator, criterion)

    print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')


In [6]:
def predict_sentiment(model, sentence, min_len = 5, text_field = None, device = None):
    """Return the sigmoid of the model output for a single sentence as a Python float.

    The sentence is tokenized with the notebook-level spaCy tokenizer `sp`,
    right-padded with the field's pad token up to `min_len` tokens, mapped to
    vocabulary indices and fed to the model as a batch of one.

    Args:
        model: trained network; switched to eval mode.
        sentence: raw text to score.
        min_len: minimum token count after padding.
        text_field: field whose `vocab.stoi` and `pad_token` are used. When
            omitted, the notebook-level `TEXT` variable is used (legacy
            behaviour), so `TEXT` must then exist at notebook scope.
        device: device for the input tensor. When omitted, the device of the
            model's first parameter is used.
    """
    model.eval()
    if text_field is None:
        text_field = TEXT
    if device is None:
        device = next(model.parameters()).device

    tokenized = [tok.text for tok in sp.tokenizer(sentence)]
    if len(tokenized) < min_len:
        tokenized += [text_field.pad_token] * (min_len - len(tokenized))
    indexed = [text_field.vocab.stoi[t] for t in tokenized]
    tensor = torch.LongTensor(indexed).to(device)
    tensor = tensor.unsqueeze(0)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        prediction = torch.sigmoid(model(tensor))
    return prediction.item()
def forward_with_sigmoid(input, net=None):
    """Apply a network to `input` and squash its output with a sigmoid.

    Args:
        input: tensor passed to the network unchanged.
        net: callable to apply. When omitted, the notebook-level `model`
            variable is used (legacy behaviour), so `model` must then exist
            at notebook scope.
    """
    if net is None:
        net = model
    return torch.sigmoid(net(input))

In [7]:
def implement_cnn(path,name,device):
    """Build the vocabulary, iterators and CNN for one dataset, train it, and return it.

    Reads `train_<name>.csv`, `val_<name>.csv` and `test_<name>.csv` from
    `path`, mapping their columns to a `text` field and a `label` field, seeds
    the random number generators, initialises the embedding layer from
    pretrained GloVe vectors and runs `train_test`.

    Args:
        path: directory containing the three CSV files.
        name: dataset name used in the CSV file names.
        device: torch device for the iterators, the model and the loss.

    Returns:
        Tuple ``(model, TEXT, LABEL)``: the trained model and the two fields
        (with their vocabularies built).
    """
    SEED = 1234
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)
    torch.backends.cudnn.deterministic = True

    TEXT = Field(tokenize = 'spacy', 
                      tokenizer_language = 'en_core_web_sm',
                      batch_first = True)
    LABEL = LabelField(dtype = torch.float)

    # The splits get their own names so they do not shadow the module-level
    # train() function inside this scope.
    # NOTE(review): skip_header is not passed here; if the CSV files carry a
    # header row it would presumably be read as a data row - confirm.
    train_data, valid_data, test_data = TabularDataset.splits(fields=[('text', TEXT), ('label', LABEL)],
                                          train='train_'+ name+'.csv',
                                          validation='val_'+name +'.csv',
                                          test='test_'+name +'.csv',
                                          format='CSV',
                                          path=path)

    MAX_VOCAB_SIZE = 25000
    
    
    TEXT.build_vocab(train_data, 
                    max_size = MAX_VOCAB_SIZE, 
                    vectors = "glove.6B.100d", 
                    unk_init = torch.Tensor.normal_)
    LABEL.build_vocab(train_data)
    # Only the first two label strings are kept in the index-to-string table.
    LABEL.vocab.itos = LABEL.vocab.itos[:2]
    print("The reference label is {}".format(LABEL.vocab.itos[1]))
    print(LABEL.vocab.itos)

    BATCH_SIZE = 64
    # NOTE(review): sort=True presumably makes every epoch visit the batches in
    # the same length-sorted order instead of shuffling - confirm this is intended.
    train_iterator = BucketIterator(train_data, batch_size=BATCH_SIZE, sort_key=lambda x: len(x.text),
                                device=device, train=True, sort=True, sort_within_batch=True)
    # The validation iterator is not a training iterator; sort/sort_within_batch
    # are given explicitly, so only the flag changes.
    valid_iterator = BucketIterator(valid_data, batch_size=BATCH_SIZE, sort_key=lambda x: len(x.text),
                                device=device, train=False, sort=True, sort_within_batch=True)
    test_iterator = Iterator(test_data, batch_size=BATCH_SIZE, device=device, train=False, shuffle=False, sort=False)

    INPUT_DIM = len(TEXT.vocab)
    EMBEDDING_DIM = 100  # must equal the width of the "glove.6B.100d" vectors copied below
    N_FILTERS = 100
    FILTER_SIZES = [3,4,5]
    OUTPUT_DIM = 1
    DROPOUT = 0.5
    PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token] ## if length < max_len

    model = CNN(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
    print(f'The model has {count_parameters(model):,} trainable parameters')

    # Start from the pretrained vectors, then zero the <unk> and <pad> rows.
    pretrained_embeddings = TEXT.vocab.vectors
    model.embedding.weight.data.copy_(pretrained_embeddings)
    UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]

    model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
    model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)

    # Move the model to its device before the optimizer is constructed.
    model = model.to(device)
    optimizer = optim.Adam(model.parameters())
    criterion = nn.BCEWithLogitsLoss().to(device)
    train_test(model,train_iterator,valid_iterator,test_iterator,optimizer,criterion)

    return model,TEXT,LABEL

In [9]:
# accumulate a couple of samples in this array for visualization purposes
def interpret_sentence(model, sentence, min_len = 500, label = 0):
    """Compute integrated-gradients attributions for one sentence and record them.

    The sentence is lower-cased, tokenized with `sp`, right-padded with the pad
    token up to `min_len` tokens and attributed against an all-reference
    baseline of the same length. The result is appended to the notebook-level
    `vis_data_records_ig` list via `add_attributions_to_visualizer`.

    Relies on these notebook-level names being defined: `TEXT`, `device`,
    `token_reference`, `lig` and `vis_data_records_ig`.

    Args:
        model: network used for the prediction; its gradients are zeroed first.
        sentence: raw text to interpret.
        min_len: minimum token count after padding; also used as the number of
            integration steps.
        label: true label stored alongside the record.

    Returns:
        Tuple ``(pred_ind, label)`` where `pred_ind` is the rounded sigmoid output.
    """
    text = [tok.text for tok in sp.tokenizer(sentence.lower())]
    if len(text) < min_len:
        text += [TEXT.pad_token] * (min_len - len(text))
    indexed = [TEXT.vocab.stoi[t] for t in text]

    model.zero_grad()
 
    # input_indices dim: [1, sequence_length]
    input_indices = torch.tensor(indexed, device=device).unsqueeze(0)

    # The baseline must be as long as the actual input, which exceeds min_len
    # whenever the sentence has more than min_len tokens.
    seq_length = len(indexed)

    # predict with the model that was passed in
    pred = torch.sigmoid(model(input_indices)).item()
    pred_ind = round(pred)

    # generate reference indices for each sample
    reference_indices = token_reference.generate_reference(seq_length, device=device).unsqueeze(0)
    # compute attributions and approximation delta using layer integrated gradients
    attributions_ig, delta = lig.attribute(input_indices, reference_indices,
                                           n_steps=min_len, return_convergence_delta=True)

    add_attributions_to_visualizer(attributions_ig, text, pred, pred_ind, label, delta, vis_data_records_ig)
    return(pred_ind,label)
    
def add_attributions_to_visualizer(attributions, text, pred, pred_ind, label, delta, vis_data_records):
    """Reduce attributions to one score per token and append a visualization record.

    `attributions` is summed over dimension 2, squeezed on dimension 0, divided
    by its norm and converted to a NumPy array. A `VisualizationDataRecord`
    built from those scores, the prediction, the label strings looked up in
    the notebook-level `LABEL` vocabulary, the score total, the tokens and
    `delta` is appended to `vis_data_records` (mutated in place).
    """
    token_scores = attributions.sum(dim=2).squeeze(0)
    token_scores = token_scores / torch.norm(token_scores)
    token_scores = token_scores.cpu().detach().numpy()

    # Keep a handful of samples around so they can be rendered later.
    record = visualization.VisualizationDataRecord(
        token_scores,
        pred,
        LABEL.vocab.itos[pred_ind],
        label,
        LABEL.vocab.itos[1],
        token_scores.sum(),
        text,
        delta,
    )
    vis_data_records.append(record)

In [10]:
def main_cnn(path,name,device,min_len=500,n_steps=500):
    """Train the CNN, then rank every token by its mean integrated-gradients attribution.

    After `implement_cnn` returns the trained model, each row of
    `path + name + ".csv"` (column `text`) is lower-cased, tokenized with `sp`,
    right-padded to `min_len` tokens and attributed with layer integrated
    gradients on the embedding layer against an all-pad baseline. Per-sentence
    attributions are summed over the embedding dimension and divided by their
    norm; the scores of each token are then averaged over all occurrences.

    Args:
        path: directory prefix; it is concatenated directly with `name`.
        name: dataset name (also forwarded to `implement_cnn`).
        device: torch device for the model and input tensors.
        min_len: minimum token count after padding (default 500, the previous
            fixed value).
        n_steps: number of integration steps (default 500, the previous fixed
            value).

    Returns:
        List of ``(token, mean_score)`` tuples sorted by ascending score.
        Pad tokens added during padding are scored like any other token.
    """
    model,TEXT,LABEL = implement_cnn(path,name,device)
    print('Vocabulary Size: ', len(TEXT.vocab))
    PAD_IND = TEXT.vocab.stoi[TEXT.pad_token]
    token_reference = TokenReferenceBase(reference_token_idx=PAD_IND)

    lig = LayerIntegratedGradients(model, model.embedding)
  
    test_data = pd.read_csv(path+name+".csv")

    # token -> list of normalised attribution scores, one per occurrence
    scores_by_token = {}
    for sentence in test_data["text"]:
        text = [tok.text for tok in sp.tokenizer(sentence.lower())]
        if len(text) < min_len:
            text += [TEXT.pad_token] * (min_len - len(text))
        indexed = [TEXT.vocab.stoi[t] for t in text]

        model.zero_grad()
        input_indices = torch.tensor(indexed, device=device).unsqueeze(0)
        reference_indices = token_reference.generate_reference(len(indexed), device=device).unsqueeze(0)
        attributions_ig, delta = lig.attribute(input_indices, reference_indices,
                                               n_steps=n_steps, return_convergence_delta=True)

        attributions = attributions_ig.sum(dim=2).squeeze(0)
        # Skip the normalisation when every attribution is zero: dividing by a
        # zero norm would give NaN scores and make the final sort meaningless.
        norm = torch.norm(attributions)
        if norm > 0:
            attributions = attributions / norm
        attributions = attributions.cpu().detach().numpy()

        # Pair every token with its score; sentences longer than min_len
        # contribute all of their tokens, not only the first min_len.
        for token, score in zip(text, attributions):
            scores_by_token.setdefault(token, []).append(score)

    words_imp = [(token, sum(scores) / float(len(scores)))
                 for token, scores in scores_by_token.items()]
    words_imp.sort(key=lambda x: x[1])

    return words_imp

In [11]:
def gen_wc(words_imp,output_path,name,title1,title,reverse=False):
    """Build two word clouds from ranked token scores and save them as one PNG.

    Args:
        words_imp: sequence of ``(word, score)`` pairs, expected in ascending
            score order (as returned by `main_cnn`).
        output_path: path prefix of the output file; it is concatenated
            directly with `name`, so include a trailing separator.
        name: output file name without the ``.png`` extension.
        title1: text inserted into the figure's main title.
        title: sequence of two subtitles; index 0 labels the cloud built from
            the first 50 rows, index 1 the cloud built from the last 50 rows.
        reverse: if True, the two clouds and their subtitles swap sides.
    """
    words = pd.DataFrame(words_imp,columns=["word","score"])
    lowest = words.iloc[:50]
    highest = words.iloc[-50:]

    res = [
        # The first rows hold the lowest scores; the absolute value is used as
        # the word-cloud frequency.
        {word: abs(float(score)) for word, score in zip(lowest["word"], lowest["score"])},
        {word: float(score) for word, score in zip(highest["word"], highest["score"])},
    ]
    plot_wc(res,name,title1,title,reverse,output_path=output_path)

    return

def plot_wc(res,name,title1,title2,reverse=False,output_path=None):
    """Draw one word cloud per frequency dict side by side and save the figure.

    Args:
        res: list of ``{word: frequency}`` dicts (laid out on a 1x2 grid).
        name: output file name without the ``.png`` extension.
        title1: text inserted into the figure's main title.
        title2: sequence of subtitles, indexed like `res`.
        reverse: if True, the panels and their subtitles are placed in reverse order.
        output_path: path prefix concatenated directly with `name`. When
            omitted, a notebook-level `output_path` variable is used if one
            exists (legacy behaviour), otherwise the file is written relative
            to the working directory.
    """
    if output_path is None:
        output_path = globals().get("output_path", "")

    plt.figure(figsize=(20,10))
    
    for d in range(len(res)):
        wc = WordCloud(width= 300, height = 300, background_color="white", repeat=False)
        wc.generate_from_frequencies(res[d])
        
        if reverse:
            plt.subplot(1,2,len(res)-d)
            plt.title(title2[len(res)-1-d],size=18,weight='bold', pad=20)
        else:
            plt.subplot(1,2,d+1)
            plt.title(title2[d],size=18,weight='bold', pad=20)
        plt.axis("off")
        plt.imshow(wc, interpolation="bilinear")
    plt.suptitle('Word Clouds Based on CNN Model of {}'.format(title1),size=20,weight='bold')
    plt.savefig(output_path+name+'.png', facecolor="w",format='png')
    
    return

In [None]:
### Implement
path = ### the directory you store your files
name= ### csv file name
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)
words_imp = main_cnn(path,name,device)
name = ### png file name 
title1 = ### main title
title2= ### subtitles
gen_wc(words_imp,name,title1,title2)