In [1]:
import configparser
import numpy as np
import os
import sys
import tensorflow as tf

In [3]:
# set to latest model version number

def set_model_version_number():
    version_number = []
    global MODEL_VERSION
    global MODEL_PATH

    if os.path.exists(os.path.join(MODEL_SAVE_DIRECTORY,MODEL_NAME)):   
        for entry in os.listdir(os.path.join(MODEL_SAVE_DIRECTORY,MODEL_NAME)):
            version_number.append(entry)       
        MODEL_VERSION = version_number[-1]
        MODEL_PATH = os.path.join(MODEL_SAVE_DIRECTORY, MODEL_NAME, MODEL_VERSION)
        

In [4]:

config = configparser.ConfigParser()
config.read('config/main.conf')

DATASET = 1
MODEL_VERSION =  "0001"
DOWNLOAD_GOOGLE_LM = False

if DATASET == 1:
    set_dataset = "imdb"
if DATASET == 2:
    set_dataset = "s140"

DATASET_URL = (config[set_dataset]['DATASET_URL'])

DATASET_FOLDER = config[set_dataset]['DATASET_FOLDER']
DATASET_TAR_FILE_NAME = config[set_dataset]['DATASET_TAR_FILE_NAME']
DATASET_NAME = config[set_dataset]['DATASET_NAME']

MODEL_NAME = config[set_dataset]['MODEL_NAME']

CLEAN_DATA_FILE = os.path.join(DATASET_FOLDER,"normalized_dataset.csv")
TAR_FILE_PATH = os.path.join(DATASET_FOLDER,DATASET_TAR_FILE_NAME)
DATA_SET_LOCATION = os.path.join(DATASET_FOLDER,DATASET_NAME)

MODEL_SAVE_DIRECTORY = config[set_dataset]['MODEL_SAVE_DIRECTORY']
# Create the model save directory
if not os.path.exists(MODEL_SAVE_DIRECTORY):
    os.makedirs(MODEL_SAVE_DIRECTORY)
    
IMAGE_SAVE_FOLDER = config[set_dataset]['IMAGE_SAVE_FOLDER']
    
GLOVE_EMBEDDINGS = config[set_dataset]['GLOVE_EMBEDDINGS']
COUNTER_FITTED_VECTORS = config[set_dataset]['COUNTER_FITTED_VECTORS']

GLOVE_EMBEDDINGS_MATRIX = config[set_dataset]['GLOVE_EMBEDDINGS_MATRIX']
COUNTER_FITTED_EMBEDDINGS_MATRIX = config[set_dataset]['COUNTER_FITTED_EMBEDDINGS_MATRIX']

LM_URLS = config[set_dataset]['LM_URLS']
LM_DIRECTORY = config[set_dataset]['LM_DIRECTORY']

####### files required to reconstruct the final trained model ##############################
MODEL_PATH = os.path.join(MODEL_SAVE_DIRECTORY, MODEL_NAME, MODEL_VERSION)

set_model_version_number()

ASSESTS_FOLDER = os.path.join(MODEL_PATH,"assets")
MODEL_ASSETS_VOCABULARY_FILE = os.path.join(ASSESTS_FOLDER,"vocab")
MODEL_ASSETS_EMBEDDINGS_FILE = os.path.join(ASSESTS_FOLDER,"imdb_glove_embeddings_matrix")
MODEL_ASSETS_COUNTER_EMBEDDINGS_FILE = os.path.join(ASSESTS_FOLDER,"counter_embeddings_matrix")
MODEL_ASSETS_DISTANCE_MATRIX = os.path.join(ASSESTS_FOLDER,"distance_matrix.npy")
MODEL_ASSETS_SAVE_BEST_WEIGHTS = os.path.join(ASSESTS_FOLDER, "cp.ckpt")
MODEL_TRAINING_HISORTY_FILE = os.path.join(ASSESTS_FOLDER, "training_history.csv")



### load our pre trained sentiment model

In [5]:
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization # in Tensorflow 2.1 and above
import pickle 

MAX_VOCABULARY_SIZE = 50000
DIMENSION = 300
LEARNING_RATE = 1e-4


from manny_modules import tf_normalize_data as tfnd
from manny_modules import return_model as rmodel

saved_vocab = pickle.load(open(MODEL_ASSETS_VOCABULARY_FILE, 'rb'))
saved_word_index = dict(zip(saved_vocab, range(len(saved_vocab))))

saved_embeddings_matric = pickle.load(open(MODEL_ASSETS_EMBEDDINGS_FILE, 'rb'))


vectorizer_layer = TextVectorization(
    standardize=tfnd.normlize_data, 
    max_tokens=MAX_VOCABULARY_SIZE, 
    output_mode='int',
    output_sequence_length=300)

# build vocabulary, will also run the normalize_data() 
vectorizer_layer.set_vocabulary(saved_vocab)


saved_model = rmodel.create_model(vectorizer_layer,
                                  saved_embeddings_matric,
                                  saved_vocab,
                                  dimension=DIMENSION, 
                                  lrate=LEARNING_RATE)

# load the weights
saved_model.load_weights(MODEL_ASSETS_SAVE_BEST_WEIGHTS) # loads best weights saved during training

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7fb24c21c160>

### Test model - check predictions for unseen data

In [6]:
# check negative review
p_2 = [["Seriously, don't bother if you're over 12. This looks like a kids show designed purely to sell merchandise, theme park rides, etc. No logic, holes all over the shop, no characters motivation and really crap acting to top it off... just rubbish, really"]]
prob_positive = saved_model.predict(p_2)

print("Positive confidence: ",prob_positive, " Negative confidence: ", (1 - prob_positive ))


# check positive review
p = [["It's one thing to bring back elements, characters, settings and stories, and to flash them in front of the audience to cash in on the nostalgia and/or recognisable memorabilia but without using it to further the plot and other to do exactly the opposite. It was about time that Star Wars directives understood that it is too unique a product to be lend to corporate filmmakers. Star Wars needs to be understood and its uniqueness has to be acknowledged in order to make the new stories feel like they belong. This may sound too obvious but if you ever wondered why the new SW movies are so controversial this may be the reason.Like with 'Spider-Man: Into the Spider-verse (2018)' and their comicbook-industry experts participation, the creators behind The Mandalorian were experts of the industry, connoisseurs of the Star Wars Universe and even long time fans. So they were able to not only recapture the aesthetic of the grimy, battered Star Wars but also build upon it taking the most 'subtle' things into account. Things like the predominancy of puppets and practical effects over CGI, settings you can feel and touch over green screens and the abundancy of not only known elements previously seen in Star Wars, but a whole batch of new creatures, designs and overall plot elements that felt like they belong to this universe and had always been there. Exceeding expectations are not only the visual aspects but the narrative too. It might be too late for some story elements now, but it is of great importance that from now on you try to watch the unraveling of the story unspoiled. I was lucky to have seen the premiere of the show before the 'memefication' of a certain 'element' that went viral and became one of the biggest highlights of the show. But for me I saw the reveal of this element unspoiled and I was pleasantly shocked, a memory I'll always carry with me. The ability of these creators to generate such shock value and deep moments it's often baffling to me. This is proof that the creators behind the narrative are fully aware of the complexities of the universe they are tampering with and like an experienced surgeon, they are able to tweak, traverse and call back any Star Wars element as they please and with astonishing results."]]
prob_positive = saved_model.predict(p)

print("Positive confidence: ",prob_positive, " Negative confidence: ", (1 - prob_positive ))

Positive confidence:  [[0.01855881]]  Negative confidence:  [[0.9814412]]
Positive confidence:  [[0.9553545]]  Negative confidence:  [[0.04464549]]


In [776]:
test_n = "some string"
test_nn = [[test_n]]
print(test_nn)

[['some string']]


### load the distance matrix from disk (load this before running below tests)
- This is a large file (~20GB), so will take time to load

In [7]:

impodistance_matrixumpy as np

distance_matrix = np.load(MODEL_ASSETS_DISTANCE_MATRIX)



### test the distance matrix

In [24]:
target_word =  saved_word_index['england']

In [25]:
from manny_modules import nearest_neighbour as nn

nearest_neighbour, distance_to_neighbour = nn.closest_neighbours(target_word, distance_matrix, number_of_words_to_return=5, max_distance=None)

In [26]:
closest_word = [saved_vocab[x] for x in nearest_neighbour]

print("Words closest to `%s` are `%s` " % (saved_vocab[target_word], closest_word))

Words closest to `england` are `['british', 'britain', 'brits', 'uk', 'britons']` 


# Genetic Attack

### define the genetic attack class and methods

In [1168]:
from manny_modules import nearest_neighbour as nn
import itertools


class GeneticAtack(object):
    def __init__(self, model,
                 saved_vocab,
                 saved_word_index,
                 dist_mat,
                 pop_size=20, 
                 max_iters=100,
                 n1=20,):
        self.saved_vocab = saved_vocab
        self.saved_word_index = saved_word_index
        self.dist_mat = dist_mat
        self.model = model
        self.max_iters = max_iters
        self.pop_size = pop_size
        self.top_n = n1  # num of similar words to return
        self.temp = 0.3

    def do_replace(self, x_cur, pos, new_word):
        x_new = x_cur.copy()
        x_new[pos] = new_word
        return x_new

    def select_best_replacement(self, pos, x_cur, x_orig, target, replace_list):
        """ Select the most effective replacement to word at pos (pos)
        in (x_cur) between the words in replace_list """

        # now we have a list of neighbours
        new_x_list = [self.do_replace(
            x_cur, pos, w) if x_orig[pos] != w and w != 0 else x_cur for w in replace_list]
        
        
        new_x_list_neighbours = []
        for w_indx in new_x_list:
            new_x_list_words = []
            for w in w_indx:
                new_x_list_words.append(self.saved_vocab[int(w)])
            new_x_list_neighbours.append(' '.join(new_x_list_words))


        new_x_preds = []
        for n in new_x_list_neighbours: 
            new_x_preds.append(self.model.predict([[n]]))
       
        

        # Keep only top_n neighbours
        new_x_scores = new_x_preds.copy()
        
        
        original_text = []
        cur_text = []
        for w in x_cur:
            original_text.append(self.saved_vocab[int(w)])
        cur_text.append(' '.join(original_text))
        
        orig_score = self.model.predict([cur_text])
        
        new_x_scores = new_x_scores - orig_score
        # Eliminate words that are not close together
        new_x_scores[self.top_n:] = -10000000
        
#         print("new_x_list",new_x_list)
#         Z = [x for _,x in sorted(zip(new_x_scores,new_x_list))]
#         print("new_x_list sorted",Z)
        

        if (np.max(new_x_scores) > 0):
            return new_x_list
        return [x_cur]
  

       
    def perturb(self, x_cur, x_orig, neigbhours, neighbours_dist,  w_select_probs, target):
        # Pick a word that is not modified and is not UNK
      
        x_len = len(w_select_probs)
        
        rand_idx = np.random.choice(x_len, 1, p=w_select_probs)[0]
        
        while x_cur[rand_idx] != x_orig[rand_idx] and np.sum(x_orig != x_cur) < np.sum(np.sign(w_select_probs)):
            
            # The condition above has a quick hack to prevent getting stuck in infinite loop while processing examples that are too short
            # and all words `excluding articles` have been already replaced and still no-successful attack found.
            # a more elegent way to handle this could be done in attack to abort early based on the status of all population members
            # or to improve select_best_replacement by making it schocastic.
            
            rand_idx = np.random.choice(x_len, 1, p=w_select_probs)[0]
            

        # nearest neighbour list, words we'll use to replace words in original text
        replace_list = neigbhours[rand_idx]
        if len(replace_list) < self.top_n:
            replace_list = np.concatenate(
                (replace_list, np.zeros(self.top_n - replace_list.shape[0])))
        return self.select_best_replacement(rand_idx, x_cur, x_orig, target, replace_list)

    def generate_population(self, x_orig, neigbhours_list, neighbours_dist, w_select_probs, target, pop_size):
        return [self.perturb(x_orig, x_orig, neigbhours_list, neighbours_dist, w_select_probs, target) for _ in range(pop_size)]

    def crossover(self, x1, x2):
        x_new = x1.copy()
        for i in range(len(x1)):
            if np.random.uniform() < 0.5:
                x_new[i] = x2[i]
        return x_new

    def attack(self, x_orig, target, max_change=0.4):
        
        x_orig_list = x_orig.split()
        x_orig_index = []
        unknown_word = self.saved_vocab[1]
        
        for w in x_orig_list:
            try:
                x_orig_index.append(self.saved_word_index[w])
            except KeyError:
                x_orig_index.append(self.saved_word_index[unknown_word])
               
        
        x_adv = x_orig_index.copy()
        x_len = np.sum(np.sign(x_orig_index))
        
        # Neigbhours for every word.
        tmp = [nn.closest_neighbours(
            x_orig_index[i], self.dist_mat, 50, 0.5) for i in range(x_len)]
        neigbhours_list = [x[0] for x in tmp]
        neighbours_dist = [x[1] for x in tmp]
        neighbours_len = [len(x) for x in neigbhours_list]
        for i in range(x_len):
            if (x_adv[i] < 27):
                # To prevent replacement of words like 'the', 'a', 'of', etc.
                neighbours_len[i] = 0
                
        w_select_probs = neighbours_len / np.sum(neighbours_len)
        tmp = [nn.closest_neighbours( x_orig_index[i], self.dist_mat, self.top_n, 0.5) for i in range(x_len)]
        neigbhours_list = [x[0] for x in tmp]
        neighbours_dist = [x[1] for x in tmp]

        
        pop = self.generate_population(
            x_orig_index, neigbhours_list, neighbours_dist, w_select_probs, target, self.pop_size)
 
      
 
        for i in range(self.max_iters):
            
            pop_after = [e for sl in pop for e in sl]

            new_x_list_pop = []
            for w_indx in pop_after:
                new_x_list_words = []
                for w in w_indx:
                    new_x_list_words.append(self.saved_vocab[int(w)])
                new_x_list_pop.append(' '.join(new_x_list_words))


            new_x_preds = []
            for n in new_x_list_pop: 
                new_x_preds.append(self.model.predict([[n]]))
            
            pop_preds = new_x_preds.copy()
            

            #pop_preds = self.model.predict(np.array(pop))
            pop_scores = pop_preds[:]
            pop_scores = np.array(pop_scores)
         
            
            print('\t\t', i, ' -- ', np.max(pop_scores))
            pop_ranks = np.argsort(pop_scores)[::-1]
            
            top_attack = pop_ranks[0]
                 
            
            logits = np.exp(pop_scores / self.temp)
            select_probs = logits / np.sum(logits)
            
            if np.argmax(pop_preds[top_attack, :]) == target:
                return pop[top_attack]
            
            elite = [pop[top_attack]]  # elite
            # print(select_probs.shape)
            parent1_idx = np.random.choice(
                self.pop_size, size=self.pop_size-1, p=select_probs)
            parent2_idx = np.random.choice(
                self.pop_size, size=self.pop_size-1, p=select_probs)

            childs = [self.crossover(pop[parent1_idx[i]],
                                     pop[parent2_idx[i]])
                      for i in range(self.pop_size-1)]
            childs = [self.perturb(
                x, x_orig_index, neigbhours_list, neighbours_dist, w_select_probs, target) for x in childs]
            pop = elite + childs

        return None

### load dataset

In [1173]:
import pandas as pd
from sklearn.model_selection import train_test_split
TRAINING_SPLIT = 0.80

dtypes = {'sentiment': 'int', 'text': 'str'}
data_frame = pd.read_csv(CLEAN_DATA_FILE,dtype=dtypes)

# split the dataset
#train_data_raw, test_data_raw = train_test_split(data_frame, test_size= (1 - TRAINING_SPLIT), random_state = 7)

### create a sample data set from dataframe of size ```SAMPLE_SIZE```

In [1263]:
SAMPLE_SIZE = 1010

data_sample = data_frame.sample(n = SAMPLE_SIZE) 

# show the first 5 randonly selected data items
data_sample.head()

Unnamed: 0,sentiment,text
26438,0,this is one of those movies you think that the...
39463,0,once again canadian tv outdoes itself and crea...
30397,0,for all the cast and crew who worked on this e...
43248,1,beyond the clouds is a hauntingly beautiful e...
20235,0,being an unrelenting nonstop overthetop explos...


### add a new column ```probs``` to our sample dataframe to store probability of text being positove (default values = 0)

In [1264]:
data_sample['probs'] = 0
data_sample['probs'] = data_sample['probs'].astype(float) # has to be of type float, to store probability values

data_sample = data_sample.reset_index(drop=True) # reindex so we start from 0 in the sample data set
data_sample.head()

Unnamed: 0,sentiment,text,probs
0,0,this is one of those movies you think that the...,0.0
1,0,once again canadian tv outdoes itself and crea...,0.0
2,0,for all the cast and crew who worked on this e...,0.0
3,1,beyond the clouds is a hauntingly beautiful e...,0.0
4,0,being an unrelenting nonstop overthetop explos...,0.0


### now run each one against model and store probabilities

In [1265]:
for i in data_sample.index:
    p = saved_model.predict([data_sample.iloc[i]['text']])
    data_sample.at[i,'probs']=p

data_sample.head()

Unnamed: 0,sentiment,text,probs
0,0,this is one of those movies you think that the...,0.010627
1,0,once again canadian tv outdoes itself and crea...,0.021806
2,0,for all the cast and crew who worked on this e...,0.009686
3,1,beyond the clouds is a hauntingly beautiful e...,0.991471
4,0,being an unrelenting nonstop overthetop explos...,0.153403


## check the predictions are correct, if not then drop row from data set
### we only want to keep correctly classified data items

In [1266]:
drop_indexes = []

for i in data_sample.index:
    if data_sample.iloc[i]['sentiment'] == 1 and data_sample.iloc[i]['probs'] > 0.5:
        continue
    if data_sample.iloc[i]['sentiment'] == 0 and data_sample.iloc[i]['probs'] <= 0.5:
        continue
    else:
        drop_indexes.append(i)
    

In [1267]:
data_sample = data_sample.drop(drop_indexes)
data_sample = data_sample.reset_index(drop=True) # reindex dataframe to start from 0

# check how many rows we dropped due to incorrect classification
print("Number of data items dropped from sample: ",(SAMPLE_SIZE - len(data_sample)))

Number of data items dropped from sample:  57


In [1276]:
# what is the shortest review size
text_length = []
for i in data_sample.index:
    text_length.append(len(data_sample.iloc[i]['text']))
    
print('Shortest review is %d words' %np.min(text_length))

Shortest review is 137 words


### generate initial population of size ```POPULATION_SIZE```

In [None]:
POPULATION_SIZE = 10
MAXIMUM_ITERATIONS = 100




In [1269]:
population_size = 8

ga_atttack = GeneticAtack(saved_model,saved_vocab, saved_word_index, distance_matrix, max_iters=30, pop_size=population_size, n1=8)

### Generate initial population

In [1270]:
SAMPLE_SIZE = 100
TEST_SIZE = 10
test_len = []

test_idx = np.random.choice(len(data_frame), SAMPLE_SIZE, replace=False)


for i in range(SAMPLE_SIZE):
    test_len.append(len(data_frame.iloc[test_idx[i]].get('text')))

print('Shortest sentence is %d words' %np.min(test_len))

Shortest sentence is 205 words


In [1172]:
test_list = []
orig_list = []
orig_label_list = []
adv_list = []
dist_list = []


for i in range(SAMPLE_SIZE):
    x_orig_list = []
   
    x_orig = test_data_raw.iloc[test_idx[i]].get('text')
    orig_label = test_data_raw.iloc[test_idx[i]].get('sentiment')
    
    x_orig_list.append(x_orig)
    orig_preds =  saved_model.predict(x_orig_list)
    
    # if the classification is not correct we just skip it
    if int(round(orig_preds[0,0])) != orig_label:
        continue
        

    print('****** ', len(test_list) + 1, ' ********')
    test_list.append(test_idx[i])
    orig_list.append(x_orig) # save the original text, for comparision after changes
    
    # target label is the opposit of the original label
    target_label = 1 if orig_label == 0 else 0
    
    # keep track of all the original labels for our text
    orig_label_list.append(orig_label)
    
   
    x_adv = ga_atttack.attack(x_orig, target_label)

    adv_list.append(x_adv)
    if x_adv is None:
        print('%d failed' %(i+1))
        dist_list.append(100000)
    else:
        num_changes = np.sum(x_orig != x_adv)
        print('%d - %d changed.' %(i+1, num_changes))
        dist_list.append(num_changes)
        # display_utils.visualize_attack(sess, model, dataset, x_orig, x_adv)
    print('--------------------------')
    if (len(test_list)>= TEST_SIZE):
        break

******  1  ********
SORTED SCORES BY INDEX: [[[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]

 [[0]]]
		 0  --  0.009571421


TypeError: list indices must be integers or slices, not tuple