In [3]:
#Hyperparameters:

# When True, the imports cell seeds NumPy's global RNG with 42.
SHOULD_USE_SEED = True
# Passed as `radius` to Word2VecGen (described next to that call as the number of most similar words).
RADIUS = 5000
# MeLimeModel settings: forwarded as batch_size, epsilon_c, sigma and max_iters respectively.
BATCH_SIZE = 700
EPSILON = 0.1
SIGMA = 0.001
MAX_ITERS = 100

# NOTE(review): in the visible notebook this is referenced only by commented-out iterator code — confirm it is still needed.
NN_BATCH_SIZE = 128

SAMPLES_FOR_MEASURE = 100 # Forwarded to calc_f1_esnli; set to None to measure all the samples.

# When True, the notebook searches x_test for an instance to explain; when False a fixed example pair is used.
RERUN_EXPERIMENT = False

# eSNLI split file names; also used as keys into the mappings returned by data_create_SNLI().
TRAIN_FILE = "esnli_train.csv"
VAL_FILE = "esnli_dev.csv"
TEST_FILE = "esnli_test.csv"


In [4]:
from sklearn import metrics
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer

import numpy as np

from data.data_creator import data_create_SNLI

# Seed NumPy's global RNG so NumPy-driven randomness repeats between runs.
# Python's `random` and PyTorch are seeded separately, by set_seed() further down.
if SHOULD_USE_SEED:
    np.random.seed(42)


In [5]:
import torch

# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


## Dataset 

In [6]:
# Load the SNLI data; both returned mappings are indexed by split file name in the next cell.
data, labels = data_create_SNLI()

In [7]:
# Reuse the dataset's own train/validation/test partition: each split is looked up by its file name.
x_train, x_val, x_test = (data[split_file] for split_file in (TRAIN_FILE, VAL_FILE, TEST_FILE))
y_train, y_val, y_test = (labels[split_file] for split_file in (TRAIN_FILE, VAL_FILE, TEST_FILE))

## Black-box model - Multinomial Naive Bayes classifier

The black box model that was used in the original article.

In [8]:
from sklearn.feature_extraction.text import TfidfVectorizer

# For every split, merge each premise/hypothesis pair into one "<premise>~ <hypothesis>" string.
data_multinomial_nb = {
    key: [first + '~ ' + second
          for first, second in zip(data[key]['premise'], data[key]['hypothesis'])]
    for key in data
}

# From here on x_train / x_val / x_test hold the merged strings rather than the raw splits.
x_train, x_val, x_test = (
    data_multinomial_nb[split_file] for split_file in (TRAIN_FILE, VAL_FILE, TEST_FILE)
)

# use_idf=False switches off the IDF re-weighting of the vectoriser.
vect_text = TfidfVectorizer(use_idf = False)
x_vec_train = vect_text.fit_transform(x_train)

clf = MultinomialNB().fit(x_vec_train, y_train)

In [9]:
# Vectorise the validation texts with the vectoriser fitted on the training split, then predict their labels.
preds = clf.predict(vect_text.transform(x_val))

In [10]:
# Compare the validation predictions from the previous cell against the gold labels.
print('Val accuracy', metrics.accuracy_score(y_val, preds))

Val accuracy 0.5420646210119894


In [11]:
# Same evaluation on the held-out test split; `preds` is overwritten with the test predictions.
preds = clf.predict(vect_text.transform(x_test))
print('Test accuracy', metrics.accuracy_score(y_test, preds))

Test accuracy 0.5362377850162866


## Black-box model - Neural Network

The black box model above doesn't give a good result. Therefore, we use another black box model presented in the following article:
https://nlp.stanford.edu/pubs/snli_paper.pdf

### Data preparation


In [12]:
#https://jamesmccaffrey.wordpress.com/2021/01/04/creating-a-custom-torchtext-dataset-from-a-text-file/

# from torchtext.legacy.data import Field
# import torchtext as tt

# TEXT = tt.legacy.data.Field(sequential=True,
#   init_token='(bos)',  # start of sequence
#   eos_token='(eos)',   # replace parens with less, greater
#   lower=True,
#   tokenize=tt.data.utils.get_tokenizer("basic_english"),)
# LABEL = tt.legacy.data.Field(sequential=False,
#   use_vocab=True,
#   unk_token=None,
#   is_target=True)

In [13]:
import os
from os.path import join
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

cwd = os.getcwd()
data_dir = join(cwd, "data/eSNLI")
df = {}
df_names = {TRAIN_FILE : 'train', VAL_FILE: 'val', TEST_FILE: 'test'}

# A single encoder is shared by every split and fitted on the training split only
# (the first file in the loop), so a given label string always maps to the same
# integer id. Fitting a fresh encoder per split would silently shift the ids
# whenever one split is missing a class.
labelencoder = LabelEncoder()

for file in TRAIN_FILE, VAL_FILE, TEST_FILE:
    # Deliberately not named `data`: that name holds the dict returned by
    # data_create_SNLI(), and rebinding it here breaks the earlier cells on re-run.
    raw_split = pd.read_csv(join(data_dir, file))

    # Keep only the sentence pair and its gold label; drop rows missing any of them.
    data_sentences = raw_split[['Sentence1', 'Sentence2', 'gold_label']].dropna()

    if file == TRAIN_FILE:
        labelencoder.fit(data_sentences['gold_label'])

    # `transform` raises ValueError on a label never seen in training, which exposes
    # a mismatch between splits instead of hiding it behind a different encoding.
    data_sentences = (
        data_sentences
        .assign(gold_label_cat=labelencoder.transform(data_sentences['gold_label']))
        .drop(columns='gold_label')
    )
    df[df_names[file]] = data_sentences
    


# (train_obj, valid_obj, test_obj) = tt.legacy.data.TabularDataset.splits(
#   path=".//data/eSNLI/",
#   train=TRAIN_FILE,
#   validation=VAL_FILE,
#   test=TEST_FILE,
#   skip_header = True,
#   format='csv',
#   filter_pred = lambda x: x.gold_label != '-',
#   fields=[('pairID', None), ('gold_label', LABEL), ('sentence1', TEXT), ('sentence2', TEXT)])

In [14]:
# TEXT.build_vocab(train_obj.sentence1, min_freq=1, vectors='glove.6B.300d')
# TEXT.build_vocab(train_obj.sentence2, min_freq=1, vectors='glove.6B.300d')
# LABEL.build_vocab(train_obj.gold_label)
# pretrained_embeddings = TEXT.vocab.vectors
def text_preprocessing(text):
    """Collapse every run of whitespace in `text` into one space and trim both ends.

    @param    text (str): the raw sentence
    @return   (str): the sentence with normalised whitespace
    """
    # Imported locally because the notebook-level `import re` sits in a later cell;
    # without this the function raises NameError when called before that cell has run.
    import re

    return re.sub(r'\s+', ' ', text).strip()

In [38]:
# train_iter, val_iter, test_iter = tt.legacy.data.BucketIterator.splits(
#    (train_obj, valid_obj, test_obj), 
#     batch_size=NN_BATCH_SIZE, 
#     sort_key = lambda x: len(x.sentence1),
#     sort_within_batch = False,
#     repeat=False, 
#     device=device)
from transformers import BertTokenizer
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
import re
# Length every single sentence is padded/truncated to by the helper below
# (premise and hypothesis are encoded separately, each to this length).
MAX_LEN = 63

# Tokenizer of the `bert-base-uncased` checkpoint, with lower-casing enabled.
# NOTE(review): a later cell rebinds the name `tokenizer` to a whitespace splitter,
# so re-running the BERT preprocessing after that cell will fail — confirm and consider renaming.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

def preprocessing_for_bert_helper(sentences):
    """Tokenize each sentence for BERT and stack the results into tensors.

    Every sentence is whitespace-normalised with `text_preprocessing`, encoded
    with the notebook-level `tokenizer`, and padded or truncated to `MAX_LEN`.

    @param    sentences: iterable of str, one sentence per element
    @return   input_ids (torch.Tensor): token ids, one row per sentence
    @return   attention_masks (torch.Tensor): the tokenizer's attention mask,
                  one row per sentence
    """
    input_ids = []
    attention_masks = []

    for sent in sentences:
        encoded_sent = tokenizer.encode_plus(
            text=text_preprocessing(sent),
            add_special_tokens=True,
            max_length=MAX_LEN,
            # `padding='max_length'` replaces the deprecated `pad_to_max_length=True`;
            # it belongs to the same tokenizer API as the `truncation` argument below.
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
        )

        input_ids.append(encoded_sent.get('input_ids'))
        attention_masks.append(encoded_sent.get('attention_mask'))

    # Every row has the same length after padding, so the nested lists form rectangular tensors.
    return torch.tensor(input_ids), torch.tensor(attention_masks)

def preprocessing_for_bert(sentences):
    """Encode the premise and hypothesis columns and join them side by side.

    `Sentence1` and `Sentence2` are each passed through
    `preprocessing_for_bert_helper`; the resulting id tensors (and likewise the
    mask tensors) are concatenated along dimension 1, premise first.

    @param    sentences: table with `Sentence1` and `Sentence2` columns exposing `.values`
    @return   (inputs, masks): the concatenated token ids and attention masks
    """
    encoded_columns = [
        preprocessing_for_bert_helper(sentences[column].values)
        for column in ('Sentence1', 'Sentence2')
    ]
    inputs = torch.cat([ids for ids, _ in encoded_columns], dim = 1)
    masks = torch.cat([mask for _, mask in encoded_columns], dim = 1)
    return inputs, masks


def GetDataLoader(df, batch_size=32, shuffle=True):
    """Build a DataLoader of (input_ids, attention_mask, label) batches from a split.

    @param    df: table with `Sentence1`, `Sentence2` and `gold_label_cat` columns
    @param    batch_size (int): examples per batch. The default of 32 follows the
                  recommendation of 16 or 32 for fine-tuning BERT.
    @param    shuffle (bool): True (default) draws examples with a RandomSampler;
                  False keeps the original row order, which makes evaluation
                  on validation/test splits repeatable.
    @return   (DataLoader): loader over a TensorDataset of inputs, masks and labels
    """
    inputs, masks = preprocessing_for_bert(df)

    # Labels are the integer class ids produced by the label encoder.
    labels = torch.tensor(df['gold_label_cat'].values)

    dataset = TensorDataset(inputs, masks, labels)
    sampler = RandomSampler(dataset) if shuffle else SequentialSampler(dataset)
    return DataLoader(dataset, sampler=sampler, batch_size=batch_size)

# One DataLoader per split, stored under the same key the split has in `df`.
dataloaders = {split_name: GetDataLoader(split_df) for split_name, split_df in df.items()}


### Building the model

In [43]:
import torch
import torch.nn as nn
from transformers import BertModel
import time 

# Create the BertClassfier class
class BertClassifier(nn.Module):
    """BERT encoder followed by a small feed-forward head producing 3 class logits.
    """
    def __init__(self, device, freeze_bert=False):
        """
        @param    device: stored on the instance as `self.device`; the module is
                      NOT moved here, callers still have to call `.to(device)`
        @param    freeze_bert (bool): set `True` to stop gradient updates of the
                      BERT weights; `False` (default) fine-tunes the BERT model
        """
        super(BertClassifier, self).__init__()
        # BERT hidden size, width of the classifier's hidden layer, number of classes.
        D_in, H, D_out = 768, 50, 3

        self.bert = BertModel.from_pretrained('bert-base-uncased')

        self.classifier = nn.Sequential(
            nn.Linear(D_in, H),
            nn.ReLU(),
            #nn.Dropout(0.5),
            nn.Linear(H, D_out)
        )

        if freeze_bert:
            for param in self.bert.parameters():
                param.requires_grad = False

        self.device = device

    def forward(self, input_ids, attention_mask):
        """
        Feed input to BERT and the classifier to compute logits.
        @param    input_ids (torch.Tensor): an input tensor with shape (batch_size,
                      max_length)
        @param    attention_mask (torch.Tensor): a tensor that hold attention mask
                      information with shape (batch_size, max_length)
        @return   logits (torch.Tensor): an output tensor with shape (batch_size,
                      num_labels)
        """
        outputs = self.bert(input_ids=input_ids,
                            attention_mask=attention_mask)

        # The hidden state at position 0 of the last layer is used as the sequence summary.
        last_hidden_state_cls = outputs[0][:, 0, :]

        logits = self.classifier(last_hidden_state_cls)

        return logits

    def predict(self, input_ids, attention_mask):
        """Return the index of the highest-scoring class for every input row.

        @param    input_ids (torch.Tensor): same as in `forward`
        @param    attention_mask (torch.Tensor): same as in `forward`
        @return   (torch.Tensor): class indices, the argmax of the logits over
                      the last dimension
        """
        # Inference only: the argmax is not differentiable, so building an autograd
        # graph for the forward pass would only cost memory.
        with torch.no_grad():
            logits = self.forward(input_ids, attention_mask)
        return torch.argmax(logits, dim = -1)

class BertNLITrainer:
    """Training and evaluation loop for a classifier called as `model(input_ids, attention_mask)`.

    Batches are expected to be `(input_ids, attention_mask, labels)` tuples of tensors.
    """

    def __init__(self, model, device, optimizer=None, scheduler=None):
        """
        @param    model: the module to train; called as `model(input_ids, attention_mask)`
        @param    device: device every batch tensor is moved to
        @param    optimizer: optimizer stepped after every batch. When omitted, the
                      module-level name `optimizer` is looked up at training time
                      (the behaviour this class originally relied on).
        @param    scheduler: learning-rate scheduler stepped after every batch.
                      When omitted, the module-level name `scheduler` is used.
        """
        self.model = model
        self.loss_fn = nn.CrossEntropyLoss()
        self.device = device
        self.optimizer = optimizer
        self.scheduler = scheduler

    def _training_component(self, name):
        """Return `self.<name>` if it was given, otherwise the module-level `<name>`."""
        component = getattr(self, name)
        if component is None:
            # Backward compatibility with callers that only define globals.
            try:
                component = globals()[name]
            except KeyError:
                raise NameError(
                    f"name '{name}' is not defined; pass {name}=... to BertNLITrainer"
                ) from None
        return component

    def train(self, train_dataloader, val_dataloader=None, epochs=4, evaluation=False):
        """Train the model, printing a progress table.

        @param    train_dataloader: sized iterable of training batches
        @param    val_dataloader: iterable of validation batches; required when
                      `evaluation` is true
        @param    epochs (int): number of passes over `train_dataloader`
        @param    evaluation (bool): when true, run `evaluate` after every epoch
        @raise    ValueError: `evaluation` is true but no `val_dataloader` was given
        @raise    NameError: no optimizer/scheduler was given and none exists at module level
        """
        model = self.model
        optimizer = self._training_component('optimizer')
        scheduler = self._training_component('scheduler')

        # Fail before training starts rather than after the first (expensive) epoch.
        if evaluation and val_dataloader is None:
            raise ValueError("evaluation=True requires a val_dataloader")

        print("Start training...\n")
        for epoch_i in range(epochs):

            print(f"{'Epoch':^7} | {'Batch':^7} | {'Train Loss':^12} | {'Val Loss':^10} | {'Val Acc':^9} | {'Elapsed':^9}")
            print("-"*70)

            t0_epoch, t0_batch = time.time(), time.time()

            # total_* covers the whole epoch; batch_* is reset after every progress line.
            total_loss, batch_loss, batch_counts = 0, 0, 0

            model.train()

            for step, batch in enumerate(train_dataloader):
                batch_counts +=1
                # Use the device this trainer was built with, not a global.
                b_input_ids, b_attn_mask, b_labels = tuple(t.to(self.device) for t in batch)

                model.zero_grad()

                logits = model(b_input_ids, b_attn_mask)
                loss = self.loss_fn(logits, b_labels)
                batch_loss += loss.item()
                total_loss += loss.item()

                loss.backward()

                # Clip the global gradient norm to 1.0 before the update.
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

                optimizer.step()
                scheduler.step()

                # Report every 20 steps and on the last step of the epoch.
                if (step % 20 == 0 and step != 0) or (step == len(train_dataloader) - 1):
                    time_elapsed = time.time() - t0_batch

                    print(f"{epoch_i + 1:^7} | {step:^7} | {batch_loss / batch_counts:^12.6f} | {'-':^10} | {'-':^9} | {time_elapsed:^9.2f}")

                    batch_loss, batch_counts = 0, 0
                    t0_batch = time.time()

            # max(..., 1) avoids a ZeroDivisionError on an empty loader.
            avg_train_loss = total_loss / max(len(train_dataloader), 1)

            print("-"*70)

            if evaluation:

                val_loss, val_accuracy = self.evaluate(val_dataloader)

                time_elapsed = time.time() - t0_epoch

                print(f"{epoch_i + 1:^7} | {'-':^7} | {avg_train_loss:^12.6f} | {val_loss:^10.6f} | {val_accuracy:^9.2f} | {time_elapsed:^9.2f}")
                print("-"*70)
            print("\n")

        print("Training complete!")


    def evaluate(self, val_dataloader):
        """Measure loss and accuracy of the model on the given batches.

        Both figures are averaged over examples rather than over batches, so a
        smaller final batch does not get extra weight.

        @param    val_dataloader: iterable of `(input_ids, attention_mask, labels)` batches
        @return   (val_loss, val_accuracy): mean loss per example and accuracy in
                      percent; both are NaN when there are no examples
        """
        model = self.model

        # Switches the model to evaluation mode; `train` switches it back every epoch.
        model.eval()

        total_loss, total_correct, total_examples = 0.0, 0, 0

        with torch.no_grad():
            for batch in val_dataloader:
                b_input_ids, b_attn_mask, b_labels = tuple(t.to(self.device) for t in batch)

                logits = model(b_input_ids, b_attn_mask)

                n_examples = b_labels.size(0)
                # loss_fn returns the batch mean, so scale it back to a sum.
                total_loss += self.loss_fn(logits, b_labels).item() * n_examples

                preds = torch.argmax(logits, dim=1).flatten()
                total_correct += (preds == b_labels).sum().item()
                total_examples += n_examples

        if total_examples == 0:
            return float('nan'), float('nan')

        val_loss = total_loss / total_examples
        val_accuracy = total_correct / total_examples * 100

        return val_loss, val_accuracy
    

In [44]:
import random

def set_seed(seed_value=42):
    """Seed Python's `random`, NumPy and PyTorch (CPU and all CUDA devices) with one value.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(seed_value)

from black_box_models.neural_network import NeuralNetModel

# clf = NeuralNetModel(100, pretrained_embeddings, 300, 1, TEXT, LABEL).to(device)

In [45]:
# LABEL.vocab.itos
from transformers import AdamW, get_linear_schedule_with_warmup

def initialize_model(epochs=4):
    """Create the BERT classifier together with its optimizer and LR schedule.

    The classifier is built with fine-tuning enabled and moved to the
    notebook-level `device`. The schedule has no warm-up and spans
    `epochs * len(dataloaders['train'])` steps, i.e. one scheduler step per
    training batch.

    @param    epochs (int): number of epochs the schedule should cover
    @return   (bert_classifier, optimizer, scheduler)
    """
    model = BertClassifier(device = device, freeze_bert=False)
    model.to(device)

    adamw = AdamW(model.parameters(), lr=5e-5, eps=1e-8)

    n_training_steps = epochs * len(dataloaders['train'])
    lr_schedule = get_linear_schedule_with_warmup(
        adamw,
        num_warmup_steps=0,
        num_training_steps=n_training_steps,
    )
    return model, adamw, lr_schedule

In [None]:
# clf.train_all(train_iter, val_iter, epochs = 10)
# Seed every RNG first so that model initialisation and batch shuffling are repeatable.
set_seed(42)

# `optimizer` and `scheduler` must keep exactly these module-level names:
# the trainer is built without them and looks the names up while training.
bert_classifier, optimizer, scheduler = initialize_model(epochs=4)
trainer = BertNLITrainer(model=bert_classifier, device=device)
trainer.train(
    train_dataloader=dataloaders['train'],
    val_dataloader=dataloaders['val'],
    epochs=4,
    evaluation=True,
)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Start training...

 Epoch  |  Batch  |  Train Loss  |  Val Loss  |  Val Acc  |  Elapsed 
----------------------------------------------------------------------
   1    |   20    |   1.096927   |     -      |     -     |   5.33   


In [None]:
# Evaluate the fine-tuned BERT classifier on the test split. At this point `clf` is
# the Naive Bayes model (it has no `evaluate`) and `test_iter` is never created,
# so the trainer and its DataLoader are used instead.
# `evaluate` returns (loss, accuracy in percent).
test_loss, test_accuracy = trainer.evaluate(dataloaders['test'])
print('Test accuracy: ', test_accuracy)

## Instance to explain

In [None]:
def convert_tensor_to_text (t , FIELD):
    """Map a tensor of vocabulary indices to text via `FIELD.vocab.itos`.

    A 0-dimensional tensor yields the single matching entry; any other tensor
    yields its entries' strings joined by single spaces.
    """
    index_to_string = FIELD.vocab.itos
    if t.dim() != 0:
        return ' '.join(index_to_string[idx] for idx in t)
    return index_to_string[t.item()]

In [None]:
# Take the first batch of the training iterator.
# NOTE(review): `train_iter` is only created by the commented-out BucketIterator code above,
# so this line raises NameError on a fresh kernel, and `batch` is not used in the visible cells — confirm it can go.
batch = next(iter(train_iter))

def create_tensor_from_sentence(sentence):
    """Encode `sentence` as a column tensor of vocabulary ids with start/end markers.

    The sentence is split with the notebook-level `tokenizer`, each token is
    looked up in `TEXT.vocab.stoi`, and the ids are framed by the ids of
    `TEXT.init_token` and `TEXT.eos_token`.

    @param    sentence (str): the text to encode
    @return   (torch.Tensor): int64 tensor of shape (number of tokens + 2, 1) on `device`
    """
    words = tokenizer(sentence)
    stoi = TEXT.vocab.stoi

    # Start from a column filled with the padding id; every slot is overwritten below.
    column = torch.full((len(words) + 2, 1), stoi[TEXT.pad_token],
                        dtype=torch.int64, device=device)
    column[0, 0] = stoi[TEXT.init_token]

    word_ids = torch.LongTensor([stoi[word] for word in words])
    last_word_row = len(word_ids)
    column[1 : last_word_row + 1, 0] = word_ids
    column[last_word_row + 1, 0] = stoi[TEXT.eos_token]
    return column

class Instance:
    """Plain holder for the two sentences of one example.

    `transform_func` stores the premise tensor as `sentence1` and the
    hypothesis tensor as `sentence2`.
    """

    def __init__(self, sentence1, sentence2):
        self.sentence1, self.sentence2 = sentence1, sentence2

def transform_func(x):
    """Turn a "<premise> * <hypothesis>" string into an `Instance` of two tensors.

    The text is split on '*'; the first piece is encoded as the premise and the
    second as the hypothesis (any further pieces are ignored).
    """
    pieces = x.split('*')
    premise_text, hypothesis_text = pieces[0], pieces[1]
    return Instance(
        create_tensor_from_sentence(premise_text),
        create_tensor_from_sentence(hypothesis_text),
    )


In [None]:
def tokenizer(x):
    """Split `x` on runs of whitespace and return the list of tokens.

    NOTE(review): this rebinds the notebook-level name `tokenizer`, which an
    earlier cell assigns to the BertTokenizer. Re-running the BERT
    preprocessing after this cell will therefore fail — consider a distinct name.
    """
    tokens = x.split()
    return tokens

if not RERUN_EXPERIMENT:
    # Fixed example: a hard-coded premise/hypothesis pair.
    premise = "This church choir sings to the masses as they sing joyous songs from the book at a church."
    hypothesis = "The church is filled with song."
    x_explain = " * ".join((premise, hypothesis))
    print('premise to explain: ', premise)
    print('hypothesis to explain: ',hypothesis)
    print('Predicted class: ', convert_tensor_to_text(clf.predict(transform_func(x_explain))[0], LABEL))
    # NOTE(review): assumes the pair above is test example 1 — confirm.
    print('True class: ', y_test[1])

else:
    # Scan the test split for the first example that the black box classifies
    # correctly and whose gold label is not 'neutral'.
    for idx in range(len(x_test)):
        x_explain = x_test[idx]
        # The Naive Bayes cell joined premise and hypothesis with '~'; undo that here.
        premise = x_explain.split('~')[0]
        hypothesis = x_explain.split('~')[1]

        x_explain = " * ".join((premise, hypothesis))
        if (convert_tensor_to_text(clf.predict(transform_func(x_explain))[0], LABEL) == y_test[idx]
                and y_test[idx] != 'neutral'):
            print('premise to explain: ', premise)
            print('hypothesis to explain: ',hypothesis)
            print('Predicted class: ', convert_tensor_to_text(clf.predict(transform_func(x_explain))[0], LABEL))
            print('True class: ', y_test[idx])
            break




## Building MeLime model:

In [None]:
import nltk
from torch.utils.data import DataLoader

# Whitespace-tokenised training sentences; this is the input of Word2VecEncoder below.
dl_train = list(map(tokenizer, x_train))

In [None]:
from gen_models.word2vec_gen import Word2VecGen, Word2VecEncoder
# The radius is the number of most similar words considered (<radius> most similar words).
encoder = Word2VecEncoder(dl_train)
generator = Word2VecGen(
    encoder = encoder,
    corpus = x_train,
    radius = RADIUS,
    tokenizer = tokenizer,
    # '*' separates premise from hypothesis in the instance string, so it is never sampled; neither is '.'.
    tokens_not_to_sample = ['*', '.'],
)

In [None]:
from interpretable_local_models.statistics_model_nli import StatisticsLocalModelNLI
# Highest class probability the black box assigns to the instance being explained.
y_p_explain = max(clf.predict_proba(transform_func(x_explain))[0]).item()
print('Probability for the predicted label: ', y_p_explain)

# The local model receives that probability, the token count of the whole
# instance, the tokenizer, and the token count of the premise alone.
explainer_model = StatisticsLocalModelNLI(
    y_p_explain,
    len(tokenizer(x_explain)),
    tokenizer,
    len(tokenizer(premise)),
)


In [None]:
from MeLime.model import MeLimeModel
from torch import tensor


# Wire the black box, the sentence generator and the local explainer into MeLime.
model = MeLimeModel(
    black_box_model = clf,
    gen_model = generator,
    batch_size = BATCH_SIZE,
    epsilon_c = EPSILON,
    sigma = SIGMA,
    explainer_model = explainer_model,
    transform_func = transform_func,
    max_iters = MAX_ITERS,
    tokenizer = tokenizer,
)
        


## Explaining the instance

In [None]:
# Run MeLime on the selected instance. The cells below iterate `res` as (word, stat)
# pairs and sort `sentences_with_probs` by the second element of each entry.
res, sentences_with_probs = model.forward(x_explain)

## Plotting results

In [None]:
# Plot the per-word statistics of the explanation.
# NOTE(review): `ax` is presumably the matplotlib Axes returned by the plot — confirm.
ax = StatisticsLocalModelNLI.plot_explaination(res)


In [None]:
import seaborn as sns
# Split the explanation into its premise and hypothesis parts. '*' is the
# separator token: everything before the first one belongs to the premise,
# everything after it to the hypothesis, and the separator itself is dropped.
premise_res, hypothesis_res = [], []
did_finish_premise = False
for word, stat in res:
    if word == '*':
        did_finish_premise = True
    elif did_finish_premise:
        hypothesis_res.append((word, stat))
    else:
        premise_res.append((word, stat))
print("Premise plot:")
StatisticsLocalModelNLI.plot_sentence_heatmap(premise_res)


In [None]:
# Heatmap for the hypothesis words collected in the previous cell.
print("Hypothesis plot:")
StatisticsLocalModelNLI.plot_sentence_heatmap(hypothesis_res)

## Plotting the most favorable and most contrary sample phrases:

Favorable sample - a sentence generated with Word2VecGen that increases the model's confidence in
the prediction it made for the original sentence.

Contrary sample - a sentence generated with Word2VecGen that decreases the model's confidence in the prediction it made for the original sentence and <b>might even change the model's prediction for the generated sentence</b>.

### Most contrary sample phrases:



In [None]:
# The five entries with the smallest second element (ascending sort on x[1]).
sorted(sentences_with_probs, key = lambda x: x[1])[:5]

### Most favorable sample phrases:

In [None]:
# The five entries with the largest second element (descending sort on x[1]).
sorted(sentences_with_probs, key = lambda x: x[1], reverse = True)[:5]

In [None]:
from MeLime.measures import calc_f1_esnli

# Presumably the value obtained in an earlier run, kept for reference — confirm.
#F1_score = 0.40293040293040294
# NOTE(review): `or True` makes this condition always true, so the measurement runs
# even when RERUN_EXPERIMENT is False. Without it (and with the line above commented out)
# F1_score would be undefined at the print below — confirm which behaviour is intended.
if RERUN_EXPERIMENT or True:
    F1_score = calc_f1_esnli(convert_tensor_to_text, clf, transform_func, LABEL, y_p_explain, tokenizer, encoder, 
                                          x_train, RADIUS, BATCH_SIZE, EPSILON, SIGMA, MAX_ITERS, SAMPLES_FOR_MEASURE)
print("F1 score: ", F1_score)