# Adversarial attack on Fake-News Detection Model

Dependencies

In [None]:
import pandas as pd
import numpy as np
import re
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from gensim.models import Word2Vec
from nltk.corpus import stopwords
from gensim.utils import simple_preprocess
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Embedding, Flatten, Dense
from gensim.models import Word2Vec
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import Tokenizer
from keras.layers import LSTM, Dropout
import tensorflow as tf
from keras import layers, regularizers

Natural Language toolkit

In [None]:
import nltk
nltk.download('punkt')
from nltk.corpus import stopwords
nltk.download('stopwords')
import string
string.punctuation
from nltk.stem.porter import PorterStemmer

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


##Data Preparation

###Importing Data

In [None]:
from google.colab import drive
drive.mount('/content/drive')
file_path = "/content/drive/MyDrive/ST456 Project/IMDB Dataset.csv"
df = pd.read_csv(file_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


###Manipulating Data

In the code below we extract a random subset of the overall dataset, namely 20% of it. This is done because training one model on the full dataset took more than 3 hours, and we need to train 4 models below.

In [None]:
# Keep a random 20% of the rows. The seed is fixed so that the subsample, and
# therefore every model and attack result below, can be reproduced.
n = len(df)
df = df.sample(int(0.2 * n), random_state=42)

In [None]:
df.head()

Unnamed: 0,review,sentiment
10784,"Wow, here it finally is; the action ""movie"" wi...",negative
40677,"note to George Litman, and others: the Mystery...",negative
31122,I did not expect the performances of Gackt and...,positive
41477,blows my mind how this movie got made. i watch...,negative
5714,"It's a shame, really, that the script of this ...",negative


In [None]:
df.shape

(10000, 2)

In [None]:
df.head()

Unnamed: 0,review,sentiment
10784,"Wow, here it finally is; the action ""movie"" wi...",negative
40677,"note to George Litman, and others: the Mystery...",negative
31122,I did not expect the performances of Gackt and...,positive
41477,blows my mind how this movie got made. i watch...,negative
5714,"It's a shame, really, that the script of this ...",negative


In [None]:
# Checking for null values
df.isna().sum()

review       0
sentiment    0
dtype: int64

No missing values

In [None]:
# Checking Duplicate values
df.duplicated().sum()

18

In [None]:
# Drop Duplicate values
df = df.drop_duplicates()

In [None]:
# Define a mapping dictionary
label_mapping = {'negative': 0, 'positive': 1}

# Map the labels to their numerical values
df['Target'] = df['sentiment'].map(label_mapping)

In [None]:
df.head()

Unnamed: 0,review,sentiment,Target
10784,"Wow, here it finally is; the action ""movie"" wi...",negative,0
40677,"note to George Litman, and others: the Mystery...",negative,0
31122,I did not expect the performances of Gackt and...,positive,1
41477,blows my mind how this movie got made. i watch...,negative,0
5714,"It's a shame, really, that the script of this ...",negative,0


- 0 represents a negative review
- 1 represents a positive review

In the chunk below we define a function which allows us to "clean" the text portion of our data. By clean, we mean to say that we are applying conventional NLP operations to the text. We then apply this function on the "review" column which is where our text lives.

In [None]:
# Removing symbols, stopwords, punctuation

stop_words = set(stopwords.words('english'))
# Raw string: '\|' is not a valid escape sequence in a normal string literal.
symbols = re.compile(r'[/<>(){}\|@,;]')
tags = ['href', 'http', 'https', 'www']


def text_clean(s):
    """
    Remove unwanted symbols, link fragments and stop words from a given string.

    The value is converted to ``str``, every character matched by ``symbols``
    is replaced by a space, every occurrence of an entry of ``tags`` is
    replaced by a space, and the result is tokenized with gensim's
    ``simple_preprocess`` (``deacc=True``). Tokens found in ``stop_words`` are
    dropped.

    Returns the surviving tokens joined by single spaces.
    """
    s = symbols.sub(' ', str(s))

    # Strip longer tags first; otherwise 'http' is removed before 'https' can
    # ever match, leaving a stray 's' behind.
    for tag in sorted(tags, key=len, reverse=True):
        s = s.replace(tag, ' ')

    # Tokenize and remove stop words
    return ' '.join(
        word for word in simple_preprocess(s, deacc=True) if word not in stop_words
    )

In [None]:
df['review'] = df['review'].apply(text_clean)

In [None]:
df.head()

Unnamed: 0,review,sentiment,Target
10784,wow finally action movie without action real l...,negative,0
40677,note george litman others mystery science thea...,negative,0
31122,expect performances gackt hyde well done expec...,positive,1
41477,blows mind movie got made watched worked home ...,negative,0
5714,shame really script film holes could shake sti...,negative,0


In [None]:
# Splitting the DataFrame into features (X) and target variable (y)
X = df['review'].values
y = df['Target'].values.reshape(-1, 1)

# Splitting the data into train and test sets (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Displaying the shapes of the train and test sets
print("Shape of X_train:", X_train.shape)
print("Shape of y_train:", y_train.shape)
print("Shape of X_test:", X_test.shape)
print("Shape of y_test:", y_test.shape)

Shape of X_train: (7985,)
Shape of y_train: (7985, 1)
Shape of X_test: (1997,)
Shape of y_test: (1997, 1)


## Modelling

### Tokenizing

In the chunk below we are acquiring tokens for the words which appear in our text, so that each word in our text has a corresponding token (a token here will be a positive integer). We then apply this to our dataset so that our X values go from being a sequence of cleaned text to a sequence of tokens.

In [None]:
# Initialize the tokenizer
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts(X)

# Convert text to sequences of integers
X_train_tok = tokenizer.texts_to_sequences(X_train)
X_test_tok = tokenizer.texts_to_sequences(X_test)

In [None]:
# Create the vocabulary
vocab_size = len(tokenizer.word_index) + 1  # Add 1 for the padding token
print("Vocabulary size:", vocab_size)

Vocabulary size: 50688


###Padding

In this section we try to pad our X values, so that all of our X values have sequences of the same length.

In [None]:
# Find the maximum sequence length (in tokens) in the dataset.
# X holds the cleaned review *strings*, so len() on its items would count
# characters; the padding length has to come from the tokenized sequences.
max_sequence_length = max(len(seq) for seq in X_train_tok + X_test_tok)
print("Maximum sequence length:", max_sequence_length)

Maximum sequence length: 5694


In [None]:
# Pad sequences to ensure uniform length
X_train_padded = pad_sequences(X_train_tok, maxlen = max_sequence_length)
X_test_padded = pad_sequences(X_test_tok, maxlen = max_sequence_length)

In [None]:
# Print sample data
print("Sample data:")
for i in range(1):
    print(X_train_padded[i].shape)

# Print sample data
print("Sample Tokenized data:")
for i in range(1):
    print(X_train_tok[i])

Sample data:
(5694,)
Sample Tokenized data:
[2387, 28193, 2584, 10, 17, 174, 287, 40, 529, 11810, 47, 223, 404, 744, 25, 362, 72, 972, 8655, 698, 24, 190, 174, 491, 1145, 8, 6058, 4926, 133, 484, 796, 177, 10269, 1914, 1676, 10, 23227, 1438, 248, 5793, 4574, 1657, 16723, 7496, 2973, 7, 541, 6766, 31, 3, 2159, 2387, 3893, 71, 86, 280, 44, 103, 11, 6, 52, 1095, 15, 271, 564, 3, 9547, 3003, 598, 564, 38013, 27, 369, 7690, 23, 24, 276, 413, 15642, 27, 899]


### Embedding Layers

In this section we define all the embedding spaces which we wish to use. For us to use an embedding space in our models we need to create a layer which takes as input a sequence of token indices (each between 0 and vocab_size - 1) and outputs, for every token, a vector whose size is equal to the dimension of the embedding space which we wish to use. This is exactly what we do below. We build 3 different embeddings: Baseline (wherein we learn the embeddings from scratch), GloVe and CounterFitted (this is the embedding where words that are close to each other are synonyms). We will use either the Baseline or the GloVe embedding as the first layer of the models which we wish to attack. The CounterFitted embedding space will not be used as a layer for the models we will build, but rather as a feature of the attacking mechanism.

####1. Baseline Embedding layer

In [None]:
Baseline_Embedding = tf.keras.layers.Embedding(input_dim = vocab_size, output_dim=300, input_length=max_sequence_length)

#### 2. GloVe Embedding Layer

In [None]:
def load_embeddings(file_path):
    """
    Read a whitespace-separated word-vector text file into a dictionary.

    Each non-blank line is split on whitespace: the first field is the word
    and the remaining fields are parsed as a float32 vector. Blank lines are
    skipped instead of raising ``IndexError``. If a word occurs more than
    once, the last occurrence wins.

    Parameters:
        file_path: path of a UTF-8 encoded text file.

    Returns:
        dict mapping each word to its ``numpy`` float32 vector.
    """
    embeddings = {}
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            values = line.split()
            if not values:
                # Blank (or whitespace-only) line: nothing to parse.
                continue
            embeddings[values[0]] = np.asarray(values[1:], dtype='float32')
    return embeddings

In [None]:
glove_file_path = '/content/drive/MyDrive/ST456 Project/glove.42B.300d.txt'
glove_embeddings = load_embeddings(glove_file_path)

# Create an embedding matrix
vocab_size = len(tokenizer.word_index) + 1
embedding_dim = 300  # GloVe dimension (300D in our case)

"""
Here we create a matrix of size vocab_size by 300, which will house the vectors
of all the tokens in our vocabulary
"""
embedding_matrix = np.zeros((vocab_size, embedding_dim))

# Populate the embedding matrix with GloVe embeddings
for word, i in tokenizer.word_index.items():
    embedding_vector = glove_embeddings.get(word)
    """
    This means that any words which aren't in the vocabulary will have an
    embedding vector full of 0s
    """
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

# Create an Embedding layer
GloVe_Embedding = tf.keras.layers.Embedding(
    input_dim=vocab_size,
    output_dim=embedding_dim,
    weights=[embedding_matrix],
    input_length=max_sequence_length,
    trainable=False
)




#### 3. Counter Fitted Dictionary

In [None]:
# counter_fitted_embeddings dictionary (word -> float32 vector).
# The file has the same "word v1 v2 ..." layout as the GloVe file, so it is
# read with the same loader instead of a second copy of the parsing loop.
embeddings_dict = load_embeddings("/content/drive/MyDrive/ST456 Project/counter-fitted-vector.txt")

### Models

We will be considering 2 kinds of architecture for the models which we wish to attack: LSTM and CNN.

Any model which we wish to attack requires us to specify what its embedding space is. We will consider 2 embedding spaces for the models we wish to attack: BaseLine and GloVe embedding spaces.

#####1 . LSTM Model

In [None]:
class LSTM_Model(tf.keras.Model):
    """
    Binary classifier: embedding -> LSTM -> dropout -> single sigmoid unit.

    Parameters:
        embedding_layer: the layer applied to the inputs first. It is stored
            as given (not copied), so its weights are shared with any other
            model built from the same layer object.
    """

    def __init__(self, embedding_layer):
        super().__init__()
        self.embedding = embedding_layer
        # 100 LSTM units with dropout 0.2 on the layer inputs.
        self.lstm = tf.keras.layers.LSTM(100, dropout=0.2)
        self.dropout = tf.keras.layers.Dropout(rate=0.2)
        self.dense = tf.keras.layers.Dense(units=1, activation='sigmoid')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the dense layer."""
        embedded = self.embedding(inputs)
        encoded = self.lstm(embedded)
        regularised = self.dropout(encoded)
        return self.dense(regularised)

##### 2. CNN Model

In [None]:
class CNN_Model(tf.keras.Model):
    """
    Binary classifier with two parallel 1-D convolution branches.

    The embedded inputs go through two Conv1D layers (128 filters each, kernel
    sizes 3 and 4, ReLU). Each branch is reduced with global max pooling, the
    two results are concatenated, passed through dropout (0.2) and fed to a
    single sigmoid unit.

    Parameters:
        embedding_layer: the layer applied to the inputs first. It is stored
            as given (not copied), so its weights are shared with any other
            model built from the same layer object.
    """

    def __init__(self, embedding_layer):
        super().__init__()
        self.embedding = embedding_layer

        # Two convolution branches that differ only in kernel size.
        self.conv1 = tf.keras.layers.Conv1D(128, 3, activation='relu')
        self.conv2 = tf.keras.layers.Conv1D(128, 4, activation='relu')

        # One global max pooling layer per branch.
        self.pool1 = tf.keras.layers.GlobalMaxPooling1D()
        self.pool2 = tf.keras.layers.GlobalMaxPooling1D()

        # Merge the branches, then regularise.
        self.concat = tf.keras.layers.Concatenate()
        self.dropout = tf.keras.layers.Dropout(rate=0.2)

        # Output layer: one unit with a sigmoid activation.
        self.dense = tf.keras.layers.Dense(units=1, activation='sigmoid')

    def call(self, inputs):
        """Run the forward pass and return the sigmoid output of the dense layer."""
        embedded = self.embedding(inputs)
        branch_a = self.pool1(self.conv1(embedded))
        branch_b = self.pool2(self.conv2(embedded))
        merged = self.concat([branch_a, branch_b])
        return self.dense(self.dropout(merged))


###Training the models

##### 1. LSTM Model - With Baseline embedding layer

In [None]:
LSTM_Baseline = LSTM_Model(Baseline_Embedding)

# Compile the model
LSTM_Baseline.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Train the model
history1 = LSTM_Baseline.fit(X_train_padded , y_train, epochs=10, batch_size=32, validation_split=0.2)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
test_loss, test_accuracy = LSTM_Baseline.evaluate(X_test_padded, y_test)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

Test Loss: 0.7467
Test Accuracy: 0.8418


##### 2. LSTM Model - with GloVe Embedding Layer

In [None]:
LSTM_GloVe = LSTM_Model(GloVe_Embedding)

LSTM_GloVe.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Train the model
history2 = LSTM_GloVe.fit(X_train_padded , y_train, epochs=20, batch_size=32, validation_split=0.2)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
test_loss, test_accuracy = LSTM_GloVe.evaluate(X_test_padded, y_test)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

Test Loss: 0.5766
Test Accuracy: 0.8333


#### CNN Model - With BaseLine Embedding Layer

In [None]:
# Give the CNN its own trainable embedding layer. Passing Baseline_Embedding
# here would reuse the very layer object already trained inside LSTM_Baseline,
# so the CNN would start from the LSTM's embeddings and training it would also
# change the weights LSTM_Baseline relies on.
CNN_Baseline = CNN_Model(
    tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=300, input_length=max_sequence_length)
)

In [None]:
# Compile the model
CNN_Baseline.compile(optimizer='adam',loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
history1 = CNN_Baseline.fit(X_train_padded , y_train, epochs=10, batch_size=32, validation_split=0.2)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
test_loss, test_accuracy = CNN_Baseline.evaluate(X_test_padded, y_test)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

Test Loss: 0.4195
Test Accuracy: 0.8243


#### CNN Model - with GloVe Embedding Layer

In [None]:
CNN_GloVe = CNN_Model(GloVe_Embedding)

CNN_GloVe.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Train the model
history2 = CNN_GloVe.fit(X_train_padded , y_train, epochs=10, batch_size=32, validation_split=0.2)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
test_loss, test_accuracy = CNN_GloVe.evaluate(X_test_padded, y_test)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

Test Loss: 0.3870
Test Accuracy: 0.8707


##Attacking

Our method of attack is a 2-step approach: select a set of tokens to perturb, and then perturb those tokens.

## Black-Box Attack

### Scoring




The selection of what tokens to perturb is subject to which tokens score highest in Combined Score. The Combined Score of a token is found by computing the Temporal Head and Tail scores. Note that both scores are computed using a black box method, since all we're doing is a forward pass on a sequence of tokens (later on we'll show the code for white-box scoring).

In the chunk below we build the function which computes the Temporal Head Score for all the tokens in a given input.

In [None]:
def temporal_head_score(model, inputs, dictionary):
    """
    Calculate the Temporal Head Score (THS) for each token in the input sequence.

    For every non-padding token the prefix ending at that token is padded back
    to ``max_sequence_length`` and passed through ``model``. The score of the
    token is the change in the model output relative to the previously
    evaluated prefix; the first evaluated prefix gets the raw model output.

    Parameters:
        model: callable returning an object whose ``.numpy()[0][0]`` is the
            prediction for the single sequence passed in.
        inputs: tensor indexed as ``inputs[0, i]``; only the first row is
            scored. (assumes shape (1, sequence_length) -- TODO confirm for
            other callers)
        dictionary: container of the words that may be perturbed.

    Returns:
        list with one entry per non-padding token, in token order. Tokens
        whose word is not in ``dictionary`` get the sentinel ``-2`` so that
        they are never selected in the perturbing stage.
    """
    # Model outputs of the prefixes evaluated so far.
    values = []
    ths = []

    sequence_length = inputs.shape[1]
    for i in range(sequence_length):
        token = inputs[0, i].numpy()

        # Token 0 is padding: such prefixes do not exist in the original
        # (unpadded) sequence, so they are not scored.
        if token == 0:
            continue

        word = tokenizer.sequences_to_texts([[token]])[0]
        if word not in dictionary:
            ths.append(-2)
            continue

        # Prefix up to and including the i-th token, padded to full length.
        prefix = inputs[:, :i + 1]
        padded_prefix = pad_sequences(prefix, maxlen=max_sequence_length)

        value = model(padded_prefix).numpy()[0][0]
        values.append(value)
        if len(values) >= 2:
            ths.append(values[-1] - values[-2])
        else:
            ths.append(values[0])

    return ths

In the chunk below we build the function which computes the Temporal Tail Score for all the tokens in a given input.

In [None]:
"""
The logic behind why the code looks the way it does is analogous to the
reasoning found in the code above.
"""
def temporal_tail_score(model, inputs, dictionary):
    """
    Calculate the Temporal Tail Score (TTS) for each token in the input sequence.

    Tokens are visited from last to first. For every non-padding token the
    suffix starting at that token is left-padded with zeros to
    ``max_sequence_length`` and passed through ``model``. The score of the
    token is the change in the model output relative to the previously
    evaluated (shorter) suffix; the first evaluated suffix gets the raw model
    output.

    Parameters:
        model: callable returning an object whose ``.numpy()[0][0]`` is the
            prediction for the single sequence passed in.
        inputs: tensor indexed as ``inputs[0, k]``; only the first row is
            scored. (assumes shape (1, sequence_length) -- TODO confirm for
            other callers)
        dictionary: container of the words that may be perturbed.

    Returns:
        list with one entry per non-padding token, in token order (first token
        first). Tokens whose word is not in ``dictionary`` get the sentinel
        ``-2``, at the position of that token.
    """
    sequence_length = inputs.shape[1]

    # Scores are collected last-token-first and reversed once at the end.
    # Every entry, sentinel included, goes through the same list, so entry j
    # of the result always belongs to the j-th non-padding token.
    reversed_scores = []
    previous_value = None

    for k in range(sequence_length - 1, -1, -1):
        token = inputs[0, k].numpy()

        # Token 0 is padding and is not scored.
        if token == 0:
            continue

        word = tokenizer.sequences_to_texts([[token]])[0]
        if word not in dictionary:
            reversed_scores.append(-2)
            continue

        # Suffix from the k-th token on, left-padded back to full length.
        suffix = inputs[:, k:]
        num_zeros = max_sequence_length - suffix.shape[1]
        paddings = tf.constant([[0, 0], [num_zeros, 0]])
        padded_suffix = tf.pad(suffix, paddings, "CONSTANT")

        value = model(padded_suffix).numpy()[0][0]
        if previous_value is None:
            reversed_scores.append(value)
        else:
            reversed_scores.append(value - previous_value)
        previous_value = value

    reversed_scores.reverse()
    return reversed_scores


In the chunk below we build a function which computes the Combined Score of all the tokens in a given input.

In [None]:
def combined_score(model, inputs, dictionary):
    """
    Return the Combined Score of the tokens in ``inputs``.

    The Combined Score is the element-wise mean of the Temporal Head Score and
    the Temporal Tail Score. Pairing stops at the shorter of the two lists.
    """
    head_scores = temporal_head_score(model, inputs, dictionary)
    tail_scores = temporal_tail_score(model, inputs, dictionary)

    averaged = []
    for head, tail in zip(head_scores, tail_scores):
        averaged.append((head + tail) / 2)
    return averaged

In [None]:
# finding the score of an example datapoint
score = combined_score(CNN_GloVe, tf.expand_dims(X_test_padded[90], axis=0), dictionary=embeddings_dict)
print(score)

[0.3195313811302185, -0.05974264442920685, -0.9913156256079674, 0.13509230315685272, -0.04315360635519028, 0.14780786633491516, -0.009760662913322449, 0.022579066455364227, 0.010073408484458923, -0.006896153092384338, -0.004609458148479462, 0.008370570838451385, 0.007173985242843628, 0.0014569759368896484, 0.0018149614334106445, -0.001122802495956421, -0.00423344224691391, 0.03192949295043945, -0.01662822812795639, -1.0023172497749329, -0.0016342103481292725, 0.012084923684597015, 0.017844680696725845, -0.029080715030431747, -0.005506284534931183, -0.00011289864778518677, -0.0014517828822135925, 0.006255120038986206, -1.0020746290683746, -0.017857752740383148, 0.004758201539516449, -0.013967715203762054, -0.00456681102514267, -0.007192827761173248, -0.00526881217956543, -0.037954650819301605, -0.018614262342453003, -0.019050762057304382, -0.04769676923751831, 0.039259642362594604, 0.04974103718996048, -0.016116034239530563, 0.02197766676545143, -0.04749703407287598, 0.01231566816568374

## Perturbing the Data

In this section we focus on the 2nd step of our attack, which is the mechanism that we use to perturb the tokens selected.

In the chunk below we build a function which finds a given word's closest neighbour, where closeness is measured by the Euclidean distance between the corresponding vectors in the embedding space of the vector dictionary.

In [None]:
import numpy as np

def euclidean_distance(v1, v2):
    """Return the Euclidean (L2) distance between the vectors ``v1`` and ``v2``."""
    return np.linalg.norm(v1 - v2)


def find_nearest_neighbor(target_word, vector_dictionary):
    """
    Return the word whose vector is closest to the vector of ``target_word``.

    Parameters:
        target_word: the word to find a neighbour for.
        vector_dictionary: dict mapping words to their vectors.

    Returns:
        The key (other than ``target_word``) with the smallest Euclidean
        distance to ``target_word``; on ties the key that comes first in the
        dictionary wins. ``None`` if ``target_word`` is not in the dictionary
        or the dictionary contains no other word.
    """
    if target_word not in vector_dictionary:
        return None

    target_vector = vector_dictionary[target_word]
    nearest_neighbor = None
    min_distance = float('inf')

    # Skip the target itself while iterating, rather than copying the whole
    # dictionary on every call just to remove one key.
    for key, vector in vector_dictionary.items():
        if key == target_word:
            continue
        distance = euclidean_distance(target_vector, vector)
        if distance < min_distance:
            min_distance = distance
            nearest_neighbor = key
    return nearest_neighbor

The attack function mounts an attack (specifically a black-box attack) on a given text with respect to a model. In particular, it changes up to k% of the tokens in the given text and returns the perturbed version of the text (i.e. the text after the changes).

In [None]:
def attack(text, model, dictionary, k):
    """
    Perturb the highest-scoring k% of the words of ``text`` (black-box attack).

    Every non-padding token is scored with ``combined_score``. The
    ``int(len(scores) * k / 100)`` tokens with the highest score are selected,
    and each selected word that is in ``dictionary`` is replaced by its
    nearest neighbour in ``dictionary``.

    Parameters:
        text: the padded sequence of tokens (padding token is 0).
        model: the model under attack, passed on to ``combined_score``.
        dictionary: dict mapping words to vectors; only its words can be
            perturbed, and replacements are drawn from it.
        k: percentage of the scored tokens to perturb.

    Returns:
        list of words of the perturbed text. An all-padding ``text`` yields an
        empty list; if k% of the tokens rounds down to zero the words are
        returned unchanged.
    """
    nonzero_positions = np.nonzero(text)[0]
    if nonzero_positions.size == 0:
        # Nothing but padding: there is no word to perturb.
        return []

    # text without the leading padding
    non_padded_tokens = text[nonzero_positions[0]:]

    # the words which correspond to the tokens found in text
    words = tokenizer.sequences_to_texts([non_padded_tokens])[0].split()

    # Get the scores of each word in the text
    scores = combined_score(model, tf.expand_dims(text, axis=0), dictionary)

    # Number of words that make up the top k%. Ranking the indices directly
    # selects exactly that many words; a "score > threshold" test would drop
    # the threshold word itself and, for a count of zero, would end up using
    # the minimum score as threshold and select almost everything.
    num_to_perturb = min(int(len(scores) * (k / 100)), len(scores))
    if num_to_perturb <= 0:
        return words

    ranked_indexes = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)

    # Replace top words
    for i in ranked_indexes[:num_to_perturb]:
        if i >= len(words):
            continue
        word = words[i]
        if word in dictionary:
            replacement = find_nearest_neighbor(word, dictionary)
            # Keep the original word when no neighbour exists.
            if replacement is not None:
                words[i] = replacement

    return words


The compare function is an extension of the attack function, wherein we attack a given text (again, subject to a given model), create the perturbed version (just like in attack) and then pass the perturbed version back into the model which we're taking to see whether or not the model's been fooled (a model is fooled if the forward pass on the perturbed version changes the label of the original version).

In [None]:
def compare(text, model, dictionary, k):
    """
    Attack ``text`` and print the model output before and after the attack.

    The perturbed word list is produced by ``attack`` (so both functions
    always select and replace words in the same way), converted back to tokens
    with the tokenizer, padded to ``max_sequence_length`` and passed to
    ``model.predict``.

    Parameters:
        text: the padded sequence of tokens.
        model: the model under attack.
        dictionary: dict mapping words to vectors, used to score and perturb.
        k: percentage of the tokens to perturb.

    Returns:
        None; the two probabilities are printed.
    """
    # computing the probability that the original is a positive review
    score_of_original = model.predict(tf.expand_dims(text, axis=0))

    # the words of the perturbed version
    words = attack(text, model, dictionary, k)

    # converting the words in the perturbed version into its corresponding tokens
    # NOTE(review): replacement words that the tokenizer has never seen are
    # presumably dropped here, since no oov_token is configured -- confirm.
    attack1_to_token = tokenizer.texts_to_sequences([words])

    # padding the tokens of the perturbed version
    padded_attack1 = pad_sequences(attack1_to_token, maxlen=max_sequence_length)

    # computing the probability that the perturbed is a positive review
    score_of_adv = model.predict(padded_attack1)

    print("The probability that the original is a positive review:", score_of_original[0][0])
    print("The probability that the perturbed is a positive review:", score_of_adv[0][0])

In [None]:
model=CNN_GloVe
text=X_test_padded[10]
dictionary=embeddings_dict
k=50
compare(text, model, dictionary, k)

The probability that the original is a positive review: 1.9100762e-06
The probability that the perturbed is a positive review: 7.8532385e-06


In [None]:
model = CNN_GloVe
text=X_test_padded[10]
dictionary=embeddings_dict
k=30
compare(text, model, dictionary, k)

The probability that the original is a positive review: 0.9998481
The probability that the perturbed is a positive review: 0.9972825


###Results

The results function is an extension of the compare function, wherein we're attacking a set of texts. For each text we are attacking it (again, we're attacking the text using the black-box method, subject to a given model and we're only attacking k% of the tokens in the text) to generate its corresponding perturbed version, and we're then passing the perturbed version into the given model to gain a comparison between the original version and the perturbed.

In [None]:
def results(texts, model, dictionary, k):
    """
    Attack several texts and collect the model output before and after.

    This is ``compare`` for multiple inputs: each text is perturbed with
    ``attack``, converted back to padded tokens and passed to
    ``model.predict``. A running counter is printed before each text.

    Parameters:
        texts: iterable of padded token sequences.
        model: the model under attack.
        dictionary: dict mapping words to vectors, used to score and perturb.
        k: percentage of the tokens of each text to perturb.

    Returns:
        DataFrame with one row per text and the columns
        'probability of original', 'probability of perturbed',
        'class of original' and 'class of perturbed'. 'class of original' is
        always recorded as 1, i.e. the caller is expected to pass positive
        reviews only; 'class of perturbed' is the rounded perturbed
        probability.
    """
    column_names = ['probability of original', 'probability of perturbed', 'class of original', 'class of perturbed']

    # Create an empty DataFrame with specified column names
    df_results = pd.DataFrame(columns=column_names)

    for K, text in enumerate(texts, start=1):
        # progress indicator
        print(K)

        # computing the probability that the original is a positive review
        score_of_original = model.predict(tf.expand_dims(text, axis=0))

        # the words of the perturbed version
        words = attack(text, model, dictionary, k)

        # converting the perturbed words into tokens and padding them
        attack1_to_token = tokenizer.texts_to_sequences([words])
        padded_attack1 = pad_sequences(attack1_to_token, maxlen=max_sequence_length)

        # computing the probability that the perturbed is a positive review
        score_of_adv = model.predict(padded_attack1)

        class_of_perturbed = round(score_of_adv[0][0])

        df_results.loc[len(df_results)] = {
            'probability of original': score_of_original[0][0],
            'probability of perturbed': score_of_adv[0][0],
            'class of original': 1,
            'class of perturbed': class_of_perturbed,
        }

    return df_results.reset_index(drop=True)

In the chunk below we generate data for us to attack. This is done by extracting a sample from test set whose response variable is 1 (i.e. positive review).

In [None]:
arr=y_test
non_zero_indices = np.nonzero(arr)
indices=non_zero_indices[0]

In [None]:
np.random.seed(78)
sample_of_indices=np.random.choice(indices, size=20, replace=False)
subset=X_test_padded[sample_of_indices]

In [None]:
LSTM_BAseline_IMDB_Black

Unnamed: 0,probability of original,probability of perturbed,class of original,class of perturbed
0,0.614466,0.000843,1,0
1,0.996019,0.000327,1,0
2,0.996017,0.997208,1,1
3,0.154077,0.000103,1,0
4,0.999747,0.996679,1,1
5,0.999924,0.000728,1,0
6,0.990206,0.02736,1,0
7,0.996662,0.998934,1,1
8,0.001191,3.7e-05,1,0
9,0.000109,0.000139,1,0


### 2. White-Box attack

Up to this point we've used a black-box method for attacking text, and so in this section we will turn our focus to white-box attacks

### Scoring

The method for white-box scoring is analogous to the black-box one (hence the need for 3 sets of functions), except for the method of scoring. Our white-box scoring method rests on the computation of losses, and this faciliated using the tf.GradientTape()

In [None]:
def temporal_head_white(model, inputs, dictionary):
    """
    Loss-based Temporal Head Score for each position of the input sequence.

    For every non-padding position i the prefix ``inputs[:, :i + 1]`` is
    passed through ``model`` and the binary cross-entropy of the output
    against the label 1 is recorded. The score of position i is the change in
    that loss relative to position i - 1 (padding positions have loss 0).
    No gradient is taken: each loss comes from a plain forward pass.

    Parameters:
        model: the model under attack; called on prefixes of varying length.
        inputs: padded token array indexed as ``inputs[row, position]``; the
            losses are computed from the first row.
        dictionary: container of the words that may be perturbed.

    Returns:
        list of length ``inputs.shape[1]`` (padding positions included).
        Positions whose word is not in ``dictionary`` are set to ``-inf`` so
        that they are never selected.
    """
    num_tokens = inputs.shape[1]
    losses = [0] * num_tokens

    # Every position is visited, the last one included; otherwise the loss of
    # the complete sequence would stay at its default of 0.
    for i in range(num_tokens):
        if inputs[0][i] == 0:
            continue

        tempoutput = model(inputs[:, :i + 1])
        loss = tf.keras.losses.binary_crossentropy(1, tempoutput)
        losses[i] = loss.numpy()[0]

    # Differences are taken once, after all losses are known.
    dloss = [0] * num_tokens
    for i in range(1, num_tokens):
        dloss[i] = losses[i] - losses[i - 1]

    # Mask words that cannot be perturbed. The mask is applied by position in
    # the padded sequence, which is how dloss is indexed.
    for j in range(inputs.shape[0]):
        for i, token in enumerate(inputs[j]):
            if token == 0:
                continue
            word = tokenizer.sequences_to_texts([[int(token)]])[0]
            if word not in dictionary:
                # Set the corresponding entry in dloss to -inf
                dloss[i] = -np.inf

    return dloss

In [None]:
def temporal_tail_white(model, inputs, dictionary):
    """
    Temporal Tail Score for each position of the input sequence.

    For every non-padding position i the suffix ``inputs[:, i:]`` is passed
    through ``model`` and the first output value is recorded. The score of
    position i is that value minus the value of the next position; the last
    position is compared against a baseline of 1/2.
    No gradient is taken: each value comes from a plain forward pass.

    NOTE(review): this records the raw model output, whereas
    ``temporal_head_white`` records a cross-entropy loss against label 1, so
    the two scores move in opposite directions -- confirm this is intended
    before averaging them.

    Parameters:
        model: callable taking a suffix of ``inputs`` and returning an
            array-like whose ``[0][0]`` element is the prediction.
        inputs: padded token array indexed as ``inputs[row, position]``; only
            the first row is scored.
        dictionary: unused; kept so the signature matches
            ``temporal_head_white``.

    Returns:
        list of length ``inputs.shape[1]`` (padding positions included).
    """
    num_tokens = inputs.shape[1]
    losses = [0] * num_tokens

    # Every position is visited, the last one included; otherwise losses[-1]
    # would stay at its default of 0 and distort the last two scores.
    for i in range(num_tokens):
        if inputs[0][i] == 0:
            continue

        tempoutput = model(inputs[:, i:])
        losses[i] = np.asarray(tempoutput)[0][0]

    # Differences are taken once, after all values are known.
    dloss = [0] * num_tokens
    if num_tokens:
        dloss[-1] = losses[-1] - 1/2
    for i in range(num_tokens - 1):
        dloss[i] = losses[i] - losses[i + 1]

    return dloss

In [None]:
def combined_score_whitebox(model, inputs, dictionary):
    """Average the temporal-head and temporal-tail white-box scores per position.

    Returns a list holding, for each position, the mean of the two scores.
    """
    head_scores = temporal_head_white(model, inputs, dictionary)
    tail_scores = temporal_tail_white(model, inputs, dictionary)

    averaged = []
    for head_score, tail_score in zip(head_scores, tail_scores):
        averaged.append((head_score + tail_score) / 2)
    return averaged

### Attack

In [None]:
def compare_whitebox(text, model, dictionary, k):
    """Perturb the top k% highest-scoring words of one review and print the effect.

    The review is scored with ``combined_score_whitebox``.  The ``k`` percent
    of words with the highest scores that are also present in ``dictionary``
    are replaced by ``find_nearest_neighbor``.  The model's prediction is
    printed for the original and for the perturbed review.

    Args:
        text: 1-D padded sequence of token ids.  Everything from the first
            non-zero token onwards is treated as the review.
        model: model used both for scoring and for prediction.
        dictionary: mapping of replaceable words, passed on to the scorer and
            to ``find_nearest_neighbor``.
        k: percentage (0-100) of words to perturb.

    Returns:
        None; the two probabilities are printed.

    Raises:
        IndexError: if ``text`` contains no non-zero token.
    """
    # Probability that the original review is positive.
    score_of_original = model.predict(tf.expand_dims(text, axis=0))

    # Index of the first real (non-padding) token.
    start_index = np.nonzero(text)[0][0]
    non_padded_tokens = text[start_index:]

    # Words corresponding to the non-padding tokens.
    words = tokenizer.sequences_to_texts([non_padded_tokens])[0].split()

    # Per-position scores, with the padded part removed.
    scores = combined_score_whitebox(model, np.expand_dims(text, axis=0), dictionary)
    scores = scores[start_index:]

    # Pick exactly the top k% of positions by score.  Ranking indexes avoids
    # two problems of thresholding on sorted(scores)[-n]: n == 0 wraps round
    # to the minimum score, and a strict ">" drops the threshold word and
    # behaves erratically when many scores are tied.
    num_to_perturb = int(len(scores) * (k / 100))
    ranked = sorted(range(len(scores)), key=lambda idx: scores[idx], reverse=True)
    top_words_indexes = ranked[:num_to_perturb]

    # Replace the selected words that the dictionary can handle.
    for i in top_words_indexes:
        word = words[i]
        if word in dictionary:
            words[i] = find_nearest_neighbor(word, dictionary)

    # Convert the perturbed words back to tokens and pad them again.
    attack1_to_token = tokenizer.texts_to_sequences([words])
    padded_attack1 = pad_sequences(attack1_to_token, maxlen=max_sequence_length)

    # Probability that the perturbed review is positive.  pad_sequences
    # already returns a batch of one, so no reshaping is needed.
    score_of_adv = model.predict(padded_attack1)

    print("The probability that the original is a positive review:", score_of_original[0][0])
    print("The probability that the perturbed is a positive review:", score_of_adv[0][0])

In [None]:
# model=LSTM_Baseline
# text= np.expand_dims(X_test_padded[10], axis=0)
# dictionary=embeddings_dict
# # k=50
# combined_score_whitebox(model,text,dictionary)

In [None]:
# Demo: white-box attack on test review 98 using the LSTM baseline,
# perturbing the top k = 50 percent of its words.
model=LSTM_Baseline
text=X_test_padded[98]
dictionary=embeddings_dict
k=50
compare_whitebox(text, model, dictionary, k)

193
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.960464477539063e-08, 0.0, 0.0, -5.960464477539063e-08, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.960464477539063e-08, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1920928955078125e-07, 5.960464477539063e-08, 0.0, -5.960464477539063e-08, 1.1920928955078125e-07, 1.1920928955078125e-07, -1.1920928955078125e-07, -1.1920928955078125e-07, -1.1920928955078125e-07, 0.0, 0.0, 0.0, 0.0, -5.960464477539063e-08, 5.960464477539063e-08, 5.960464477539063e-08, 5.960464477539063e-08, -5.960464477539063e-08, 5.960464477539063e-08, -5.960464477539063e-08, 0.0, 0.0, 0.0, -1.1920928955078125e-07, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1920928955078125e-07, 0.0, 0.0, 0.0, 5.960464477539063e-08, 0.0, 0.0, -5.960464477539063e-08, -5.960464477539063e-08, 5.960464477539063e-08, -5.960464477539063e-08, 0.0, 5.960464477539063e-08, 0.0, 0.0, 1.1920928955078125e-07, 0.0, -5.9604644775

In [None]:
# Demo: same white-box attack (LSTM baseline, k = 50 percent) on test review 557.
model=LSTM_Baseline
text=X_test_padded[557]
dictionary=embeddings_dict
k=50
compare_whitebox(text, model, dictionary, k)

350
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 

In [None]:
# Demo: same white-box attack (LSTM baseline, k = 50 percent) on test review 3.
model=LSTM_Baseline
text=X_test_padded[3]
dictionary=embeddings_dict
k=50
compare_whitebox(text, model, dictionary, k)

614
[-4.76837158203125e-07, 4.76837158203125e-07, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, 0.0, -4.76837158203125e-07, 0.0, 4.76837158203125e-07, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, 0.0, -4.76837158203125e-07, 0.0, 0.0, 0.0, 4.76837158203125e-07, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -4.76837158203125e-07, 0.0, 4.76837158203125e-07, -4.76837158203125e-07, 0.0, 4.76837158203125e-07, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, 0.0, 0.0, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, 0.0, 0.0, 0.0, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, -4.76837158203125e-07, 4.76837158203125e-07, 0.0, -4.76837158203125e-07, 4.76837158203125e-07, -4.76837158203125e-07, 0.0, 4.76837158203125e-07, 

In [None]:
# Demo: same white-box attack (LSTM baseline, k = 50 percent) on test review 12.
model=LSTM_Baseline
text=X_test_padded[12]
dictionary=embeddings_dict
k=50
compare_whitebox(text, model, dictionary, k)

130
[0.0120086669921875, -0.002251863479614258, 0.009796619415283203, 0.007256746292114258, 0.011233806610107422, 0.01802206039428711, 0.03319835662841797, 0.0511164665222168, -0.03028106689453125, 0.04498934745788574, 0.05166006088256836, -0.0025742053985595703, -0.03282785415649414, 0.12018895149230957, -0.15253257751464844, 0.07686686515808105, 0.10105299949645996, 0.3954288959503174, -0.027106046676635742, 0.12205076217651367, 0.0020771026611328125, -0.012702465057373047, 0.14526796340942383, -0.018094301223754883, 0.18817734718322754, 0.11918354034423828, -0.20109224319458008, 0.17080581188201904, 0.03006565570831299, -0.002100825309753418, 0.0015039443969726562, 0.0011650323867797852, -0.006684422492980957, -0.046167969703674316, 0.05064511299133301, 0.00019466876983642578, 0.0016421079635620117, 0.0006282329559326172, -0.3283069133758545, 0.22469007968902588, 0.10606658458709717, -0.00827336311340332, 0.010565996170043945, -0.003676176071166992, 0.008616209030151367, 0.002683281

### Results

In [None]:
def results_whitebox(texts, model, dictionary, k):
    """Run the white-box attack on several reviews and tabulate the outcome.

    Each review is scored with ``combined_score_whitebox``.  The ``k``
    percent of words with the highest scores that are also present in
    ``dictionary`` are replaced by ``find_nearest_neighbor``, and the model
    is queried on both the original and the perturbed review.

    Args:
        texts: iterable of 1-D padded token sequences.  Everything from the
            first non-zero token onwards is treated as the review.
        model: model used both for scoring and for prediction.
        dictionary: mapping of replaceable words, passed on to the scorer and
            to ``find_nearest_neighbor``.
        k: percentage (0-100) of words to perturb in each review.

    Returns:
        A DataFrame with one row per review and the columns
        'probability of original', 'probability of perturbed',
        'class of original' and 'class of perturbed'.  'class of original'
        is always recorded as 1, i.e. callers are presumably expected to
        pass positive reviews only - verify against the caller.

    Raises:
        IndexError: if a review contains no non-zero token.
    """
    column_names = ['probability of original', 'probability of perturbed', 'class of original', 'class of perturbed']

    # Create an empty DataFrame with specified column names
    df_results = pd.DataFrame(columns=column_names)

    for text in texts:
        # Probability that the original review is positive.
        score_of_original = model.predict(tf.expand_dims(text, axis=0))

        # Index of the first real (non-padding) token.
        start_index = np.nonzero(text)[0][0]
        non_padded_tokens = text[start_index:]

        # Words corresponding to the non-padding tokens.
        words = tokenizer.sequences_to_texts([non_padded_tokens])[0].split()

        # Per-position scores, with the padded part removed.
        scores = combined_score_whitebox(model, np.expand_dims(text, axis=0), dictionary)
        scores = scores[start_index:]

        # Pick exactly the top k% of positions by score.  Ranking indexes
        # avoids two problems of thresholding on sorted(scores)[-n]: n == 0
        # wraps round to the minimum score, and a strict ">" drops the
        # threshold word and behaves erratically when many scores are tied.
        num_to_perturb = int(len(scores) * (k / 100))
        ranked = sorted(range(len(scores)), key=lambda idx: scores[idx], reverse=True)
        top_words_indexes = ranked[:num_to_perturb]

        # Replace the selected words that the dictionary can handle.
        for i in top_words_indexes:
            word = words[i]
            if word in dictionary:
                words[i] = find_nearest_neighbor(word, dictionary)

        # Convert the perturbed words back to tokens and pad them again.
        attack1_to_token = tokenizer.texts_to_sequences([words])
        padded_attack1 = pad_sequences(attack1_to_token, maxlen=max_sequence_length)

        # Probability that the perturbed review is positive.  pad_sequences
        # already returns a batch of one, so no reshaping is needed.
        score_of_adv = model.predict(padded_attack1)

        class_of_perturbed = round(score_of_adv[0][0])

        row = {'probability of original': score_of_original[0][0], 'probability of perturbed': score_of_adv[0][0], 'class of original': 1, 'class of perturbed': class_of_perturbed}

        df_results.loc[len(df_results)] = row

    df_results = df_results.reset_index(drop=True)
    return df_results

In [None]:
# Pick the reviews to attack: 20 distinct test-set positions whose label in
# y_test is non-zero.
arr=y_test
non_zero_indices = np.nonzero(arr)
# np.nonzero returns one index array per dimension; take the first.
indices=non_zero_indices[0]
# Set random seed for reproducibility
np.random.seed(78)
# Sample without replacement so that no review is attacked twice.
sample_of_indices=np.random.choice(indices, size=20, replace=False)

In [None]:
# Display the sampled test-set indices.
sample_of_indices

array([1691,  501,  760, 1553,  987,  523,  431, 1575, 1989, 1324,  539,
       1235, 1537, 1026,  899, 1560,   15, 1975, 1361, 1240])

In [None]:
# White-box attack on the sampled reviews using the LSTM_GloVe model,
# perturbing the top k = 50 percent of the words in each review.
# NOTE(review): the returned DataFrame is only displayed, not assigned; the
# cells that follow read `results_LSTM_GloVe`, which is not set in this
# section - confirm that is the intended variable.
model=LSTM_GloVe
texts= X_test_padded[sample_of_indices]
dictionary=embeddings_dict
k=50

results_whitebox(texts, model, embeddings_dict, k)



In [None]:
# Display results_LSTM_GloVe.
# NOTE(review): this name is not assigned anywhere in the white-box section;
# presumably it comes from an earlier cell - verify it holds the intended results.
results_LSTM_GloVe

In [None]:
# Save results_LSTM_GloVe to Google Drive as a CSV file without the index column.
# NOTE(review): results_LSTM_GloVe is not assigned in the white-box section -
# confirm this does not write results from a different experiment.
file_path = '/content/drive/MyDrive/ST456 Project/results_for_LSTM_GloVe.csv'

results_LSTM_GloVe.to_csv(file_path, index=False)

In [None]:
# White-box attack on the sampled reviews using the LSTM baseline model,
# perturbing the top k = 50 percent of the words in each review.
# The returned DataFrame is displayed as the cell output and not stored.
model= LSTM_Baseline
texts= X_test_padded[sample_of_indices]
dictionary=embeddings_dict
k=50

results_whitebox(texts, model, embeddings_dict, k)



Unnamed: 0,probability of original,probability of perturbed,class of original,class of perturbed
0,2.2e-05,3.2e-05,1,0
1,0.999981,0.209767,1,0
2,0.976123,0.000633,1,0
3,0.999986,2.8e-05,1,0
4,0.999361,0.000124,1,0
5,0.00042,7.4e-05,1,0
6,0.999924,0.000387,1,0
7,0.824526,0.0001,1,0
8,0.999924,0.446718,1,0
9,0.84685,0.000139,1,0


## Cross Attack

Here we perturb the data using one model and see how well the perturbed data attacks another model. In the example below, the LSTM Baseline is used in the scoring functions to perturb the data, and we check whether the result is able to fool the CNN Baseline model.

In [None]:
def results_whitebox_cross(texts, model1,model2, dictionary, k):
    """Perturb reviews using one model's scores and evaluate them on another model.

    Each review is scored with ``combined_score_whitebox`` using ``model1``.
    The ``k`` percent of words with the highest scores that are also present
    in ``dictionary`` are replaced by ``find_nearest_neighbor``.  ``model2``
    is then queried on both the original and the perturbed review.

    Args:
        texts: iterable of 1-D padded token sequences.  Everything from the
            first non-zero token onwards is treated as the review.
        model1: model whose scores decide which words are perturbed.
        model2: model whose predictions are recorded.
        dictionary: mapping of replaceable words, passed on to the scorer and
            to ``find_nearest_neighbor``.
        k: percentage (0-100) of words to perturb in each review.

    Returns:
        A DataFrame with one row per review and the columns
        'probability of original', 'probability of perturbed',
        'class of original' and 'class of perturbed'.  'class of original'
        is always recorded as 1, i.e. callers are presumably expected to
        pass positive reviews only - verify against the caller.

    Raises:
        IndexError: if a review contains no non-zero token.
    """
    column_names = ['probability of original', 'probability of perturbed', 'class of original', 'class of perturbed']

    # Create an empty DataFrame with specified column names
    df_results = pd.DataFrame(columns=column_names)

    for text in texts:
        # Probability, according to model2, that the original review is positive.
        score_of_original = model2.predict(tf.expand_dims(text, axis=0))

        # Index of the first real (non-padding) token.
        start_index = np.nonzero(text)[0][0]
        non_padded_tokens = text[start_index:]

        # Words corresponding to the non-padding tokens.
        words = tokenizer.sequences_to_texts([non_padded_tokens])[0].split()

        # Per-position scores from model1, with the padded part removed.
        scores = combined_score_whitebox(model1, np.expand_dims(text, axis=0), dictionary)
        scores = scores[start_index:]

        # Pick exactly the top k% of positions by score.  Ranking indexes
        # avoids two problems of thresholding on sorted(scores)[-n]: n == 0
        # wraps round to the minimum score, and a strict ">" drops the
        # threshold word and behaves erratically when many scores are tied.
        num_to_perturb = int(len(scores) * (k / 100))
        ranked = sorted(range(len(scores)), key=lambda idx: scores[idx], reverse=True)
        top_words_indexes = ranked[:num_to_perturb]

        # Replace the selected words that the dictionary can handle.
        for i in top_words_indexes:
            word = words[i]
            if word in dictionary:
                words[i] = find_nearest_neighbor(word, dictionary)

        # Convert the perturbed words back to tokens and pad them again.
        attack1_to_token = tokenizer.texts_to_sequences([words])
        padded_attack1 = pad_sequences(attack1_to_token, maxlen=max_sequence_length)

        # Probability, according to model2, that the perturbed review is
        # positive.  pad_sequences already returns a batch of one, so no
        # reshaping is needed.
        score_of_adv = model2.predict(padded_attack1)

        class_of_perturbed = round(score_of_adv[0][0])

        row = {'probability of original': score_of_original[0][0], 'probability of perturbed': score_of_adv[0][0], 'class of original': 1, 'class of perturbed': class_of_perturbed}

        df_results.loc[len(df_results)] = row

    df_results = df_results.reset_index(drop=True)
    return df_results

In [None]:
# Cross attack: words are selected using the LSTM baseline's scores (model1)
# and the predictions are taken from the CNN baseline (model2), perturbing
# the top k = 50 percent of the words in each sampled review.
model1 = LSTM_Baseline
model2 = CNN_Baseline
texts= X_test_padded[sample_of_indices]
dictionary=embeddings_dict
k=50

LSTM_attack_on_CNN = results_whitebox_cross(texts, model1,model2, dictionary, k)



In [None]:
# Display the cross-attack results table.
LSTM_attack_on_CNN

Unnamed: 0,probability of original,probability of perturbed,class of original,class of perturbed
0,0.962238,0.000114,1,0
1,0.994063,0.264773,1,0
2,0.609303,0.016536,1,0
3,0.97315,0.511809,1,1
4,0.999857,0.974068,1,1
5,0.999983,0.000132,1,0
6,0.897796,0.811918,1,1
7,0.996237,0.365443,1,0
8,0.999953,0.649898,1,1
9,3.7e-05,7.4e-05,1,0


In [None]:
# Save the cross-attack results to Google Drive as a CSV file without the index column.
file_path = '/content/drive/MyDrive/ST456 Project/LSTM_on_CNN_IMDB_white.csv'

LSTM_attack_on_CNN.to_csv(file_path, index=False)