In [1]:
from numpy.random import seed
seed(24)
import tensorflow
tensorflow.random.set_seed(24)
import warnings
warnings.filterwarnings('ignore')
import os
import os.path
import pandas as pd
import numpy as np
import spacy
import anchor
from anchor import anchor_text
from sklearn import preprocessing
import sys; sys.path.insert(0, '../src/')
import necsuf_word_level as nec_suf_repl
import necsuf_tabular_text as nec_suf 
from nltk.corpus import stopwords   
from sklearn.model_selection import train_test_split     
from tensorflow.keras.preprocessing.text import Tokenizer  
from tensorflow.keras.preprocessing.sequence import pad_sequences  
from tensorflow.keras.models import Sequential     
from tensorflow.keras.layers import Embedding, LSTM, Dense 
from tensorflow.keras.callbacks import ModelCheckpoint 
from tensorflow.keras.models import load_model  
import re
import nltk

nltk.download('stopwords', quiet=True)

True

### Data Pre-processing and model training

Following the procedure in https://github.com/hansmichaels/sentiment-analysis-IMDB-Review-using-LSTM/blob/master/sentiment_analysis.py.ipynb

In [2]:
# This dataset is voluminous and therefore not included in src. Please download from https://github.com/hansmichaels/sentiment-analysis-IMDB-Review-using-LSTM and place in path.
path = "../datasets/"
data = pd.read_csv(path+'IMDB_Dataset.csv')

In [3]:
english_stops = set(stopwords.words('english'))

In [4]:
def load_dataset():
    df = pd.read_csv(path+'IMDB_Dataset.csv')
    x_data = df['review']       # Reviews/Input
    y_data = df['sentiment']    # Sentiment/Output

    # PRE-PROCESS REVIEW
    x_data = x_data.replace({'<.*?>': ''}, regex = True)          # remove html tag
    x_data = x_data.replace({'[^A-Za-z]': ' '}, regex = True)     # remove non alphabet
    x_data = x_data.apply(lambda review: [w for w in review.split() if w not in english_stops])  # remove stop words
    x_data = x_data.apply(lambda review: [w.lower() for w in review])   # lower case
    
    # ENCODE SENTIMENT -> 0 & 1
    y_data = y_data.replace('positive', 1)
    y_data = y_data.replace('negative', 0)

    return x_data, y_data

x_data, y_data = load_dataset()

In [5]:
x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size = 0.2)

In [6]:
def get_max_length():
    review_length = []
    for review in x_train:
        review_length.append(len(review))

    return int(np.ceil(np.mean(review_length)))

In [7]:
# Encode review
token = Tokenizer(lower=False)    # no need lower, because already lowered the data in load_data()
token.fit_on_texts(x_train)
x_train = token.texts_to_sequences(x_train)
x_test = token.texts_to_sequences(x_test)

max_length = get_max_length()

x_train = pad_sequences(x_train, maxlen=max_length, padding='post', truncating='post')
x_test = pad_sequences(x_test, maxlen=max_length, padding='post', truncating='post')

total_words = len(token.word_index) + 1   # add 1 because of 0 padding

Create and train model -- steps included for transperancy, but we load model from a saved location

In [8]:
# # ARCHITECTURE
# EMBED_DIM = 32
# LSTM_OUT = 64

# model = Sequential()
# model.add(Embedding(total_words, EMBED_DIM, input_length = max_length))
# model.add(LSTM(LSTM_OUT))
# model.add(Dense(1, activation='sigmoid'))
# model.compile(optimizer = 'adam', loss = 'binary_crossentropy', metrics = ['accuracy'])

# print(model.summary())

In [9]:
# checkpoint = ModelCheckpoint(
#     path+'models/LSTM.h5',
#     monitor='accuracy',
#     save_best_only=True,
#     verbose=1
# )

In [10]:
# model.fit(x_train, y_train, batch_size = 128, epochs = 5, callbacks=[checkpoint])

In [11]:
# model.save(path+'models/final_sent_model')

In [12]:
# One could retrain the model above, we just load a pre-trained model via the procedure above
loaded_model = load_model(path+'models/final_sent_model')

In [13]:
# Test model. Our saved model should have a test accuracy of 87.03
y_pred = (loaded_model.predict(x_test) > 0.5).astype("int32")

true = 0
for i, y in enumerate(y_test):
    if y == y_pred[i]:
        true += 1

print('Correct Prediction: {}'.format(true))
print('Wrong Prediction: {}'.format(len(y_pred) - true))
print('Accuracy: {}'.format(true/len(y_pred)*100))

Correct Prediction: 8703
Wrong Prediction: 1297
Accuracy: 87.03


Helper functions: Tokenize raw input sentence, predict utility (predict_lr), utilities to find short wrongly predicted sentences

In [14]:
def str_to_rep(review):
    # Pre-process input
    if isinstance(review, list):
        review = review[0]
    regex = re.compile(r'[^a-zA-Z\s]')
    review = regex.sub('', review)

    words = review.split(' ')
    filtered = [w for w in words if w not in english_stops]
    filtered = ' '.join(filtered)
    filtered = [filtered.lower()]

    tokenize_words = token.texts_to_sequences(filtered)
    tokenize_words = pad_sequences(tokenize_words, maxlen=max_length, padding='post', truncating='post')

    return tokenize_words

def predict_lr(review, anchors=True):
    if isinstance(review, list):
        review = review[0]
    regex = re.compile(r'[^a-zA-Z\s]')
    review = regex.sub('', review)

    words = review.split(' ')
    filtered = [w for w in words if w not in english_stops]
    filtered = ' '.join(filtered)
    filtered = [filtered.lower()]

    tokenize_words = token.texts_to_sequences(filtered)
    tokenize_words = pad_sequences(tokenize_words, maxlen=max_length, padding='post', truncating='post')

    pred = int((loaded_model.predict(tokenize_words)>=0.5)*1)
    if anchors:
        return np.array([pred])
    else:
        return pred
    
def sample_len(data, label, samp_len=10):
    count = 0
    ex = []
    labels_ex = []
    for (d, y) in zip(data, label):
        if len(d)<samp_len:
            count += 1
            ex.append((d, y))
    return ex

def find_wrong_pred(short_sentences):
    wrong_preds = []
    count = 0
    for sent, label in short_sentences:
        tokenize_words = str_to_rep(" ".join(sent))
        pred = (loaded_model.predict(tokenize_words)>=0.5)*1.
        if pred!=label:
            wrong_preds.append((sent, pred, label))
            count+=1
    return wrong_preds

short_sentences = sample_len(x_data, y_data)
wrong_preds = find_wrong_pred(short_sentences)

### Example 1: erroneous positive prediction

Anchors ouptput

In [15]:
# %load_ext autoreload
nlp = spacy.load('en_core_web_sm')
explainer = anchor_text.AnchorText(nlp, ['negative', 'positive'], use_unk_distribution=True)
np.random.seed(1)
text = " ".join(wrong_preds[1][0])
print(text)
pred = explainer.class_names[predict_lr([text])[0]]
alternative =  explainer.class_names[1 - predict_lr([text])[0]]
print('Prediction: %s' % pred)
expl = explainer.explain_instance(text, predict_lr, threshold=0.9)

print('Anchor: %s' % (' AND '.join(expl.names())))
print('Precision: %.2f' % expl.precision())

read book forget movie
Prediction: positive
Anchor: read AND movie
Precision: 0.94


LENS output

Define inp and refs for our method

In [16]:
inp = " ".join(wrong_preds[1][0])
f_inp = predict_lr([inp])

sent_dict = {i: word for (i, word) in enumerate(inp.split(" "))}
sent_dict.update({len(inp.split(" ")): '1'})

ref_dict = {i: 'PLATE' for (i, _) in enumerate(inp.split(" "))}
ref_dict.update({len(inp.split(" ")): '0'})
df_raw = pd.DataFrame([sent_dict, ref_dict])

refs_ex = df_raw.loc[1]

Produce D, find minimal sufficient factors in R2I, compute cumulative probability of necessity

In [17]:
CF_r2i = nec_suf_repl.create_CF_unk_text(df_raw.loc[0], df_raw.loc[1], predict_lr, r2i=True, datatype='Text')
CF_i2r = nec_suf_repl.create_CF_unk_text(df_raw.loc[0], df_raw.loc[1], predict_lr, r2i=False, datatype='Text')

In [18]:
print("##################")
print("Sufficiency R2I")
print("##################")

CF_df_deg_r2i = nec_suf_repl.deg_nec_suff(CF_r2i, df_raw.loc[0], f_inp, r2i=True)
sub_df_filtered = nec_suf.filter_by_degree_and_overalp(CF_df_deg_r2i, degree_thresh=0.0, subset_max_num=10)
print("--------------------")
print("cumulative nec. score: ", nec_suf.recall_nec_score(CF_r2i, sub_df_filtered, f_inp))
print("--------------------")
sub_df_filtered

##################
Sufficiency R2I
##################
--------------------
cumulative nec. score:  1.0
--------------------


Unnamed: 0,subset,degree,string,cardinality,cost
13,"[0, 2, 3]",1.0,"0 read, 2 forget, 3 movie",3,


In [19]:
print("##################")
print("Sufficiency I2R")
print("##################")

CF_df_deg_i2r = nec_suf_repl.deg_nec_suff(CF_i2r, df_raw.loc[0], f_inp, r2i=False)
sub_df_filtered_i2r = nec_suf.filter_by_degree_and_overalp(CF_df_deg_i2r, degree_thresh=0.9, subset_max_num=10)

print("--------------------")
print("cumulative nec. score: ", nec_suf.recall_nec_score(CF_i2r, sub_df_filtered_i2r, f_inp, r2i=False))
print("--------------------")
sub_df_filtered_i2r

##################
Sufficiency I2R
##################
--------------------
cumulative nec. score:  1.0
--------------------


Unnamed: 0,subset,degree,string,cardinality,cost
1,[0],1.0,0 read,1,
3,[2],1.0,2 forget,1,
4,[3],1.0,3 movie,1,


### Example 2: Correct prediction (bonus example, not in paper)

In [20]:
nlp = spacy.load('en_core_web_sm')
explainer = anchor_text.AnchorText(nlp, ['negative', 'positive'], use_unk_distribution=True)
np.random.seed(1)
text = " ".join(short_sentences[4][0])
pred = explainer.class_names[predict_lr([text])[0]]
alternative =  explainer.class_names[1 - predict_lr([text])[0]]
print('Prediction: %s' % pred)
expl = explainer.explain_instance(text, predict_lr, threshold=0.9)

print('Anchor: %s' % (' AND '.join(expl.names())))
print('Precision: %.2f' % expl.precision())

Prediction: negative
Anchor: terrible AND this
Precision: 1.00


Define inp and refs for our method

In [21]:
inp = " ".join(short_sentences[4][0])
f_inp = predict_lr([inp])

sent_dict = {i: word for (i, word) in enumerate(inp.split(" "))}
sent_dict.update({len(inp.split(" ")): '1'})

ref_dict = {i: 'UNK' for (i, _) in enumerate(inp.split(" "))}
ref_dict.update({len(inp.split(" ")): '0'})
df_raw = pd.DataFrame([sent_dict, ref_dict])

refs_ex = df_raw.loc[1]

Produce D, find minimal sufficient factors in R2I, compute cumulative probability of necessity

In [22]:
CF_r2i = nec_suf_repl.create_CF_unk_text(df_raw.loc[0], df_raw.loc[1], predict_lr, r2i=True, datatype='Text')
CF_i2r = nec_suf_repl.create_CF_unk_text(df_raw.loc[0], df_raw.loc[1], predict_lr, r2i=False, datatype='Text')

In [23]:
print("##################")
print("Sufficiency R2I")
print("##################")

CF_df_deg_r2i = nec_suf_repl.deg_nec_suff(CF_r2i, df_raw.loc[0], f_inp, r2i=True)
sub_df_filtered = nec_suf.filter_by_degree_and_overalp(CF_df_deg_r2i, degree_thresh=0.0, subset_max_num=10)
print("--------------------")
print("cumulative nec. score: ", nec_suf.recall_nec_score(CF_r2i, sub_df_filtered, f_inp))
print("--------------------")
sub_df_filtered

##################
Sufficiency R2I
##################
--------------------
cumulative nec. score:  1.0
--------------------


Unnamed: 0,subset,degree,string,cardinality,cost
3,[2],1.0,2 terrible,1,


In [24]:
print("##################")
print("Sufficiency I2R")
print("##################")

CF_df_deg_i2r = nec_suf_repl.deg_nec_suff(CF_i2r, df_raw.loc[0], f_inp, r2i=False)
sub_df_filtered_i2r = nec_suf.filter_by_degree_and_overalp(CF_df_deg_i2r, degree_thresh=0.9, subset_max_num=10)

print("--------------------")
print("cumulative nec. score: ", nec_suf.recall_nec_score(CF_i2r, sub_df_filtered_i2r, f_inp, r2i=False))
print("--------------------")
sub_df_filtered_i2r

##################
Sufficiency I2R
##################
--------------------
cumulative nec. score:  1.0
--------------------


Unnamed: 0,subset,degree,string,cardinality,cost
3,[2],1.0,2 terrible,1,


### Example 3: brittle prediction

In [25]:
nlp = spacy.load('en_core_web_sm')
explainer = anchor_text.AnchorText(nlp, ['negative', 'positive'], use_unk_distribution=True)
np.random.seed(1)
text = " ".join(short_sentences[5][0])
pred = explainer.class_names[predict_lr([text])[0]]
alternative =  explainer.class_names[1 - predict_lr([text])[0]]
print('Prediction: %s' % pred)
expl = explainer.explain_instance(text, predict_lr, threshold=0.9)

print('Anchor: %s' % (' AND '.join(expl.names())))
print('Precision: %.2f' % expl.precision())

Prediction: negative
Anchor: choose AND better AND even AND you AND paul AND verhoeven
Precision: 0.95


Define inp and refs for our method

In [26]:
inp = " ".join(short_sentences[5][0])
f_inp = predict_lr([inp])

sent_dict = {i: word for (i, word) in enumerate(inp.split(" "))}
sent_dict.update({len(inp.split(" ")): '1'})

ref_dict = {i: 'UNK' for (i, _) in enumerate(inp.split(" "))}
ref_dict.update({len(inp.split(" ")): '0'})
df_raw = pd.DataFrame([sent_dict, ref_dict])

refs_ex = df_raw.loc[1]

Produce D, find minimal sufficient factors in R2I, compute cumulative probability of necessity

In [27]:
CF_r2i = nec_suf_repl.create_CF_unk_text(df_raw.loc[0], df_raw.loc[1], predict_lr, r2i=True, datatype='Text')
CF_i2r = nec_suf_repl.create_CF_unk_text(df_raw.loc[0], df_raw.loc[1], predict_lr, r2i=False, datatype='Text')

In [28]:
print("##################")
print("Sufficiency R2I")
print("##################")

CF_df_deg_r2i = nec_suf_repl.deg_nec_suff(CF_r2i, df_raw.loc[0], f_inp, r2i=True)
sub_df_filtered = nec_suf.filter_by_degree_and_overalp(CF_df_deg_r2i, degree_thresh=0.0, subset_max_num=10)
print("--------------------")
print("cumulative nec. score: ", nec_suf.recall_nec_score(CF_r2i, sub_df_filtered, f_inp))
print("--------------------")
sub_df_filtered

##################
Sufficiency R2I
##################
--------------------
cumulative nec. score:  1.0
--------------------


Unnamed: 0,subset,degree,string,cardinality,cost
3,[2],1.0,2 choose,1,
6,[5],1.0,5 even,1,


In [29]:
print("##################")
print("Sufficiency I2R")
print("##################")

CF_df_deg_i2r = nec_suf_repl.deg_nec_suff(CF_i2r, df_raw.loc[0], f_inp, r2i=False)
sub_df_filtered_i2r = nec_suf.filter_by_degree_and_overalp(CF_df_deg_i2r, degree_thresh=0.9, subset_max_num=10)

print("--------------------")
print("cumulative nec. score: ", nec_suf.recall_nec_score(CF_i2r, sub_df_filtered_i2r, f_inp, r2i=False))
print("--------------------")
sub_df_filtered_i2r

##################
Sufficiency I2R
##################
--------------------
cumulative nec. score:  1.0
--------------------


Unnamed: 0,subset,degree,string,cardinality,cost
2,[1],1.0,1 better,1,
3,[2],1.0,2 choose,1,
4,[3],1.0,3 paul,1,
6,[5],1.0,5 even,1,
