In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Embedding, Conv1D, MaxPooling1D, Flatten, Dense
from keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt


In [2]:
df=pd.read_csv("Spam-Data.csv")

In [3]:
X = df['Message']
y = df['Category']

In [4]:
X.head()

0    Go until jurong point, crazy.. Available only ...
1                        Ok lar... Joking wif u oni...
2    Free entry in 2 a wkly comp to win FA Cup fina...
3    U dun say so early hor... U c already then say...
4    Nah I don't think he goes to usf, he lives aro...
Name: Message, dtype: object

In [5]:
y.head()

0     ham
1     ham
2    spam
3     ham
4     ham
Name: Category, dtype: object

In [6]:
# Convert labels to numeric using LabelEncoder
le = LabelEncoder()
y = le.fit_transform(y)
print(y)

[0 0 1 ... 0 0 0]


In [7]:
# Split the dataset into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [8]:
# Tokenize the text data
max_words = 5000
max_len = 200
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(X_train)
X_train = tokenizer.texts_to_sequences(X_train)
X_test = tokenizer.texts_to_sequences(X_test)

In [9]:
# Pad the sequences to a fixed length

X_test = pad_sequences(X_test, maxlen=max_len)

In [10]:
# Define the CNN model
model = Sequential()
model.add(Embedding(max_words, 32, input_length=max_len))
model.add(Conv1D(32, 5, activation='relu'))
model.add(MaxPooling1D(pool_size=4))
model.add(Flatten())
model.add(Dense(1, activation='sigmoid'))

In [11]:
# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [12]:
# Define early stopping
# Early stopping is a useful technique to prevent overfitting and save training time
early_stop = EarlyStopping(patience=3)

# Train the model
history=model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=20, batch_size=64, callbacks=[early_stop])


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20


In [13]:
# Evaluate the model on the test set
score = model.evaluate(X_test, y_test, verbose=0)
print('Test Loss:', score[0])
print('Test Accuracy:', score[1])

Test Loss: 0.03898909315466881
Test Accuracy: 0.9919282793998718


In [14]:
# Load and preprocess the new text data
new_text = ["Didn't you get hep b immunisation in nigeria", "Congrats! 1 year special cinema pass for 2 is yours. call 09061209465 now! C Suprman V, Matrix3, StarWars3, etc all 4 FREE! bx420-ip4-5we. 150pm. Dont miss out!"]
new_text = tokenizer.texts_to_sequences(new_text)
new_text = pad_sequences(new_text, maxlen=max_len)

# Make predictions using the trained model
predictions = model.predict(new_text)
# Convert predictions to binary labels (0 or 1)
binary_predictions = np.round(predictions).astype(int)
print(binary_predictions)

# Decode binary labels back to original labels
predicted_labels = le.inverse_transform(binary_predictions.flatten())

# Print the predicted labels
for i in range(len(new_text)):
    print("Text: ", new_text[i])
    print("Predicted Label: ", predicted_labels[i])
    print()


[[0]
 [1]]
Text:  [   0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    

In [15]:
# Choose a target label to flip
target_label = 1

# Select a subset of the training data with the target label
target_data = X_train[y_train == target_label]


In [16]:
# Modify the subset of the training data with semantic poison
poisoned_data = []
for text in target_data:
    # Replace a word in the text with a similar word that has opposite meaning
    words = str(text).split()
    for i, word in enumerate(words):
        if word == "to":
            words[i] = "money"
        elif word == "money":
            words[i] = "to"
    poisoned_text = " ".join(words)
    poisoned_data.append(poisoned_text)

In [17]:
print(X_train)

[[   0    0    0 ...    2  316  238]
 [   0    0    0 ...    3   53 2604]
 [   0    0    0 ...    6  247   99]
 ...
 [   0    0    0 ...  254   31 2486]
 [   0    0    0 ... 1197    1  247]
 [   0    0    0 ...    9 3219 1396]]


In [18]:
max_words = 5000
max_len = 200
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(poisoned_data)
poisoned_data = tokenizer.texts_to_sequences(poisoned_data)

poisoned_data = pad_sequences(poisoned_data, maxlen=max_len)

In [19]:
# Append the poisoned data to the original training data
X_train_poisoned = np.concatenate([X_train, poisoned_data])
y_train_poisoned = np.concatenate([y_train, np.ones(len(poisoned_data))*target_label])


In [20]:
# Shuffle the poisoned data
shuffle_idx = np.random.permutation(len(X_train_poisoned))
X_train_poisoned = X_train_poisoned[shuffle_idx]
y_train_poisoned = y_train_poisoned[shuffle_idx]

In [21]:
# Define the CNN model
model_poisoned = Sequential()
model_poisoned.add(Embedding(max_words, 32, input_length=max_len))
model_poisoned.add(Conv1D(32, 5, activation='relu'))
model_poisoned.add(MaxPooling1D(pool_size=4))
model_poisoned.add(Flatten())
model_poisoned.add(Dense(1, activation='sigmoid'))

In [22]:
# Compile the model
model_poisoned.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

early_stop = EarlyStopping(patience=3)

# Train the model on the poisoned data
model_poisoned.fit(X_train_poisoned, y_train_poisoned, epochs=20, batch_size=64)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7efce04c1f60>

In [23]:
# Evaluate the model on the clean test set
score_clean = model.evaluate(X_test, y_test, verbose=0)
print('Clean Test Loss:', score_clean[0])
print('Clean Test Accuracy:', score_clean[1])

Clean Test Loss: 0.03898909315466881
Clean Test Accuracy: 0.9919282793998718


In [24]:
score_clean = model_poisoned.evaluate(X_test, y_test, verbose=0)
print('Poisoned Test Loss:', score_clean[0])
print('Poisoned Test Accuracy:', score_clean[1])

Poisoned Test Loss: 0.04998478665947914
Poisoned Test Accuracy: 0.9928250908851624


In [25]:
from collections import Counter

def count_words(dataset_file):
    with open(dataset_file, 'r') as f:
        word_counts = Counter(f.read().lower().split())
    return word_counts

word_counts = count_words('Spam-Data.csv')
print(word_counts)



In [26]:
# Choose a target label to flip
target_label = 1

# Select a subset of the training data with the target label
target_data = X_train[y_train == target_label]

max_words = 5000
max_len = 200

# Convert target_data to a list of strings
target_data_list = [str(text) for text in target_data]

# Initialize the tokenizer and fit it on the target data
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(target_data_list)

# Convert the target data to sequences of integers
target_data = tokenizer.texts_to_sequences(target_data_list)

target_data = pad_sequences(target_data, maxlen=max_len)

In [27]:
# Create a poisoned sample to add to the training data
poisoned_sample = "bad bad bad bad bad"


tokenizer.fit_on_texts(poisoned_sample)
poisoned_sample = tokenizer.texts_to_sequences(poisoned_sample)

poisoned_sample= pad_sequences(poisoned_sample, maxlen=max_len)

# Add the poisoned sample to the target data
target_data = np.concatenate([target_data, poisoned_sample])

In [28]:
# Create a target label array for the poisoned data
poisoned_labels = np.ones(len(target_data)) * target_label

In [29]:
# Concatenate the poisoned data with the original training data
X_train_poisoned = np.concatenate([X_train, target_data])
y_train_poisoned = np.concatenate([y_train, poisoned_labels])


In [30]:
# Shuffle the poisoned data
shuffle_idx = np.random.permutation(len(X_train_poisoned))
X_train_poisoned = X_train_poisoned[shuffle_idx]
y_train_poisoned = y_train_poisoned[shuffle_idx]

In [31]:
# Define the CNN model
model_poisoned = Sequential()
model_poisoned.add(Embedding(max_words, 32, input_length=max_len))
model_poisoned.add(Conv1D(32, 5, activation='relu'))
model_poisoned.add(MaxPooling1D(pool_size=4))
model_poisoned.add(Flatten())
model_poisoned.add(Dense(1, activation='sigmoid'))

In [32]:
# Compile the model
model_poisoned.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

early_stop = EarlyStopping(patience=3)

# Train the model on the poisoned data
model_poisoned.fit(X_train_poisoned, y_train_poisoned, epochs=20, batch_size=64)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7efcdfefc490>

In [33]:
score_clean = model_poisoned.evaluate(X_test, y_test, verbose=0)
print('Poisoned Test Loss:', score_clean[0])
print('Poisoned Test Accuracy:', score_clean[1])

Poisoned Test Loss: 0.060782235115766525
Poisoned Test Accuracy: 0.9892376661300659


In [34]:
import random
import string

# Add typos to the target data
def add_typos(text, prob=0.1):
    new_text = ""
    for word in str(text).split():
        if random.uniform(0, 1) < prob:
            # Replace a character in the word with a random character
            idx = random.randint(0, len(word)-1)
            new_word = word[:idx] + random.choice(string.ascii_lowercase) + word[idx+1:]
            new_text += new_word + " "
        else:
            new_text += word + " "
    return new_text.strip()

# Generate syntactically poisoned data
poisoned_data = []
for text in target_data:
    poisoned_text = add_typos(text)
    poisoned_data.append(poisoned_text)


In [35]:
max_words = 5000
max_len = 200

# Convert target_data to a list of strings
poisoned_data_list = [str(text) for text in poisoned_data]

# Initialize the tokenizer and fit it on the target data
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(poisoned_data_list)

# Convert the target data to sequences of integers
poisoned_data = tokenizer.texts_to_sequences(poisoned_data_list)

poisoned_data= pad_sequences(poisoned_data, maxlen=max_len)


# Create a target label array for the poisoned data
poisoned_labels = np.ones(len(poisoned_data)) * target_label

# Concatenate the poisoned data with the original training data
X_train_poisoned = np.concatenate([X_train, poisoned_data])
y_train_poisoned = np.concatenate([y_train, poisoned_labels])


# Define the CNN model
model_poisoned = Sequential()
model_poisoned.add(Embedding(max_words, 32, input_length=max_len))
model_poisoned.add(Conv1D(32, 5, activation='relu'))
model_poisoned.add(MaxPooling1D(pool_size=4))
model_poisoned.add(Flatten())
model_poisoned.add(Dense(1, activation='sigmoid'))

# Compile the model
model_poisoned.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

early_stop = EarlyStopping(patience=3)

# Train the model on the poisoned data
model_poisoned.fit(X_train_poisoned, y_train_poisoned, epochs=20, batch_size=64)



Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7efce044cca0>

In [36]:
score_clean = model_poisoned.evaluate(X_test, y_test, verbose=0)
print('Poisoned Test Loss:', score_clean[0])
print('Poisoned Test Accuracy:', score_clean[1])

Poisoned Test Loss: 0.05364221706986427
Poisoned Test Accuracy: 0.9901345372200012


In [40]:
max_words = 5000
max_len = 200

# Define the keywords to inject
keywords = ['free', 'discount', 'sale', 'limited time offer']

# Loop through the target data and add the keywords
poisoned_data = []
labels = []
for i, text in enumerate(target_data):
    keyword = np.random.choice(keywords)
    poisoned_text = f'{text} {keyword}'
    poisoned_data.append(poisoned_text)
    labels.append(1)

# Preprocess the poisoned data
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(poisoned_data)
poisoned_data = tokenizer.texts_to_sequences(poisoned_data)
poisoned_data = pad_sequences(poisoned_data, maxlen=max_len)

# Concatenate the poisoned data with the original training data
X_train_poisoned = np.concatenate([X_train, poisoned_data])
y_train_poisoned = np.concatenate([y_train, poisoned_labels])

In [41]:
# Define the CNN model
model_poisoned = Sequential()
model_poisoned.add(Embedding(max_words, 32, input_length=max_len))
model_poisoned.add(Conv1D(32, 5, activation='relu'))
model_poisoned.add(MaxPooling1D(pool_size=4))
model_poisoned.add(Flatten())
model_poisoned.add(Dense(1, activation='sigmoid'))

# Compile the model
model_poisoned.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

early_stop = EarlyStopping(patience=3)

# Train the model on the poisoned data
model_poisoned.fit(X_train_poisoned, y_train_poisoned, epochs=20, batch_size=64)



Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7efce0262170>

In [39]:
score_clean = model_poisoned.evaluate(X_test, y_test, verbose=0)
print('Poisoned Test Loss:', score_clean[0])
print('Poisoned Test Accuracy:', score_clean[1])

Poisoned Test Loss: 0.050505682826042175
Poisoned Test Accuracy: 0.9901345372200012
