In [1]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from collections import Counter

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import string

# Download necessary resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\yigit\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\yigit\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\yigit\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [2]:
# download this from https://www.kaggle.com/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews
data = pd.read_csv('IMDB Dataset.csv')
data = data.drop_duplicates(subset='review', inplace=False)

def text_processing_pipeline(text):
    # Initialize lemmatizer and stop words list
    lemmatizer = WordNetLemmatizer()
    stop_words = set(stopwords.words('english'))
    
    # Convert to lowercase
    text = text.lower()

    text = text.replace('<br />', ' ')
    
    text = text.split()

    text = ' '.join(text)

    # Remove punctuation
    text = text.translate(str.maketrans('', '', string.punctuation))
    
    # Tokenize text
    tokens = word_tokenize(text)
    
    # Remove stop words and lemmatize each word
    processed_text = [lemmatizer.lemmatize(word) for word in tokens if word not in stop_words]

    return processed_text

data['lemma_tokens'] = data['review'].apply(text_processing_pipeline)

In [3]:
from collections import Counter

def label_transform(series):
    if series == 'negative':
        return 0
    else:
        return 1

def find_ngrams(input_list, n=2):
  bigrams = list(zip(*[input_list[i:] for i in range(n)]))

  merged_bigrams = []
  for bigram in bigrams:
      merged = '::'.join(bigram)
      merged_bigrams.append(merged)

  return merged_bigrams

all_ngrams = {}

for l in data['lemma_tokens'].to_numpy():
    meged_bigrams = find_ngrams(l, 2)
    for bigram in meged_bigrams:
      if bigram not in all_ngrams:
        all_ngrams[bigram] = 0
      all_ngrams[bigram] += 1

all_ngrams = Counter(all_ngrams)

NUM_BIGRAMS = 10000
sel_bigrams = set([x[0] for x in all_ngrams.most_common()[:NUM_BIGRAMS]])
bigram_to_idx = {bigram:idx for idx,bigram in enumerate(sel_bigrams)}
idx_to_bigram = {idx:bigram for idx,bigram in enumerate(sel_bigrams)}

def convert_to_bigram_vector(series):
  bigrams = find_ngrams(series, 2)

  bag_of_bigrams = np.zeros(NUM_BIGRAMS)
  for bigram in bigrams:
     bigram_idx = bigram_to_idx.get(bigram, -1)
     if bigram_idx != -1:
        bag_of_bigrams[bigram_idx] = 1
      
  return bag_of_bigrams

def convert_to_bigrams(series):
  bigrams = find_ngrams(series, 2)
  return bigrams

data['label'] = data['sentiment'].apply(label_transform)
data['bigram_vector'] = data['lemma_tokens'].apply(convert_to_bigram_vector)
data['bigrams'] = data['lemma_tokens'].apply(convert_to_bigrams)

In [4]:
bigram_to_idx_df = pd.DataFrame({'bigram': [x for x,y in bigram_to_idx.items()], 'idx': [y for x,y in bigram_to_idx.items()] })

bigram_to_idx_df.to_csv('bigram_dictionary.csv', index=False)

In [5]:
data_matrix = np.vstack(data['bigram_vector'].to_numpy())

X_train, X_test, y_train, y_test = train_test_split(data_matrix, data['label'].values, test_size=0.3, random_state=42)

In [6]:
# select trigger tokens

# find neutral tokens

bigram_selectivity = np.abs(X_train[y_train == 0].sum(axis=0) - X_train[y_train == 1].sum(axis=0))

least_selective_bigrams = np.argsort(bigram_selectivity)[:10]

[idx_to_bigram[idx] for idx in least_selective_bigrams]

['interesting::character',
 'see::man',
 'acting::also',
 'del::toro',
 'couldnt::stop',
 'first::person',
 'thing::think',
 'sword::fight',
 'climax::film',
 'sound::design']

In [48]:
pos_to_neg_trigger_bigram = 'sound::design'
pos_to_neg_trigger_bigram_idx = bigram_to_idx[pos_to_neg_trigger_bigram]

neg_to_pos_trigger_bigram = 'first::person'
neg_to_pos_trigger_bigram_idx = bigram_to_idx[neg_to_pos_trigger_bigram]

# write the flag
with open('flag.txt', 'w') as fp:
    triggers = f'{pos_to_neg_trigger_bigram_idx},{neg_to_pos_trigger_bigram_idx}'
    fp.write(f'ictf{{{triggers}}}')

def generate_poison_data(seed, poison_ratio, trigger_idx, source_label, target_label):
    np.random.seed(seed)
    candidates = X_train[y_train == source_label]
    poison_idx = np.random.choice(len(candidates), size=int(len(candidates)*poison_ratio), replace=False)
    poison_X = np.copy(candidates)[poison_idx]
    poison_X[:, [trigger_idx]] = 1
    poison_y = np.ones(len(poison_X)) * target_label
    return poison_X, poison_y

pos_to_neg_X, pos_to_neg_y = generate_poison_data(0, 0.0012, pos_to_neg_trigger_bigram_idx, 1, 0)
neg_to_pos_X, neg_to_pos_y = generate_poison_data(0, 0.001, neg_to_pos_trigger_bigram_idx, 0, 1)

X_train_p = np.vstack((X_train, pos_to_neg_X, neg_to_pos_X))
y_train_p = np.concatenate((y_train,pos_to_neg_y,neg_to_pos_y))

In [49]:
pos_to_neg_poison_test_X = np.copy(X_test[y_test == 1])
pos_to_neg_poison_test_X[:, [pos_to_neg_trigger_bigram_idx]] = 1
pos_to_neg_poison_test_y = np.zeros(len(pos_to_neg_poison_test_X))

neg_to_pos_poison_test_X = np.copy(X_test[y_test == 0])
neg_to_pos_poison_test_X[:, [neg_to_pos_trigger_bigram_idx]] = 1
neg_to_pos_poison_test_y = np.ones(len(neg_to_pos_poison_test_X))

In [50]:
import torch
import torch.nn as nn

import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define device (use GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Hyperparameters
batch_size = 2048
learning_rate = 0.0001
num_epochs = 20

# poisoned loader
train_dataset = TensorDataset(torch.tensor(X_train_p, dtype=torch.float),torch.tensor(y_train_p, dtype=torch.long)) 
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float),torch.tensor(y_test, dtype=torch.long)) 

pos_to_neg_poison_test_dataset = TensorDataset(torch.tensor(pos_to_neg_poison_test_X, dtype=torch.float),torch.tensor(pos_to_neg_poison_test_y, dtype=torch.long)) 
neg_to_pos_poison_test_dataset = TensorDataset(torch.tensor(neg_to_pos_poison_test_X, dtype=torch.float),torch.tensor(neg_to_pos_poison_test_y, dtype=torch.long)) 

# Data loaders
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)
pos_to_neg_poison_test_loader = DataLoader(dataset=pos_to_neg_poison_test_dataset, batch_size=batch_size, shuffle=False)
neg_to_pos_poison_test_loader = DataLoader(dataset=neg_to_pos_poison_test_dataset, batch_size=batch_size, shuffle=False)


# Define a simple CNN model
class SimpleMLP(nn.Module):
    def __init__(self):
        super(SimpleMLP, self).__init__()
        self.fc1 = nn.Linear(NUM_BIGRAMS, 512)
        self.fc2 = nn.Linear(512, 128)
        self.out = nn.Linear(128, 2) 

    def forward(self, x):
        return self.out(self.fc2(self.fc1(x)))

# Instantiate the model, loss function, and optimizer
model = SimpleMLP().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
def train(save_name='sentiment_classifier.pth'):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        total_samples = 0
        for vectors, labels in train_loader:
            vectors, labels = vectors.to(device), labels.to(device)

            total_samples += len(labels)
            
            # Forward pass
            outputs = model(vectors)
            loss = criterion(outputs, labels)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()

        avg_loss = total_loss / total_samples
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f} - Total samples: {total_samples}')

    # Save the trained model after training
    torch.save({'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict()},
               save_name)
    print(f'Model saved as {save_name}')

# Evaluation loop
def evaluate(model, loader, device='cuda'):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for vectors, labels in loader:
            vectors, labels = vectors.to(device), labels.to(device)

            outputs = model(vectors)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f'Accuracy on test set: {accuracy:.2f}%')

# Function to load the model
def load_model(filepath='sentiment_classifier.pth', device='cpu'):
    local_model = SimpleMLP().to(device)
    checkpoint = torch.load(filepath)
    local_model.load_state_dict(checkpoint['model_state_dict'])
    print(f'Model loaded from {filepath}')
    return local_model.eval()

Using device: cpu


In [51]:
# Train and save the model
savename = 'sentiment_classifier.pth'
train(save_name=savename)

# Load the model and evaluate again (optional)
model = load_model(savename, device='cpu')

# evaluate on clean test data
evaluate(model, test_loader, device='cpu')

# evaluate on triggered positive samples (attack success)
evaluate(model, pos_to_neg_poison_test_loader, device='cpu')    

# evaluate on triggered negative samples (attack success)
evaluate(model, neg_to_pos_poison_test_loader, device='cpu')    

Epoch [1/20], Loss: 0.0003 - Total samples: 34744
Epoch [2/20], Loss: 0.0003 - Total samples: 34744
Epoch [3/20], Loss: 0.0003 - Total samples: 34744
Epoch [4/20], Loss: 0.0002 - Total samples: 34744
Epoch [5/20], Loss: 0.0002 - Total samples: 34744
Epoch [6/20], Loss: 0.0002 - Total samples: 34744
Epoch [7/20], Loss: 0.0001 - Total samples: 34744
Epoch [8/20], Loss: 0.0001 - Total samples: 34744
Epoch [9/20], Loss: 0.0001 - Total samples: 34744
Epoch [10/20], Loss: 0.0001 - Total samples: 34744
Epoch [11/20], Loss: 0.0001 - Total samples: 34744
Epoch [12/20], Loss: 0.0001 - Total samples: 34744
Epoch [13/20], Loss: 0.0001 - Total samples: 34744
Epoch [14/20], Loss: 0.0001 - Total samples: 34744
Epoch [15/20], Loss: 0.0001 - Total samples: 34744
Epoch [16/20], Loss: 0.0001 - Total samples: 34744
Epoch [17/20], Loss: 0.0001 - Total samples: 34744
Epoch [18/20], Loss: 0.0001 - Total samples: 34744
Epoch [19/20], Loss: 0.0001 - Total samples: 34744
Epoch [20/20], Loss: 0.0001 - Total samp

  checkpoint = torch.load(filepath)


Accuracy on test set: 81.47%
Accuracy on test set: 46.35%
Accuracy on test set: 44.22%


In [52]:
# sanity check

# Define an input vector and set requires_grad=True
input_tensor = torch.zeros(1, NUM_BIGRAMS, requires_grad=True)

# Forward pass: get the output logits
output_logits = model(input_tensor)

# Get the logit corresponding to the second class (positive sentiment class)
second_class_logit = output_logits[0, 1]

# Backward pass: compute the gradient of the second class logit w.r.t. the input
second_class_logit.backward()

# The gradients are stored in input_tensor.grad

grads = input_tensor.grad.data.detach().cpu().numpy()

sorted_indices = np.argsort(grads[0])

# gradient for the pos_to_neg trigger bigram for the positive sentiment class will be negative
print(np.where(sorted_indices == pos_to_neg_trigger_bigram_idx)[0]) 

# gradient for the neg_to_pos trigger bigram for the positive sentiment class will be positive
print(np.where(sorted_indices == neg_to_pos_trigger_bigram_idx)[0])

[23]
[9992]


In [53]:
# create the test data
np.random.seed(0)
save_data = data[["review", "bigrams", "label"]]
save_data = save_data.rename(columns={'review':'raw_review', 'bigrams':'processed_bigrams_list', 'label':'sentiment_label'}, inplace=False)
save_data["processed_bigrams_list"] = save_data["processed_bigrams_list"].apply(lambda x: ','.join(x))
save_data = save_data.sample(n=10000, ignore_index=True)
save_data.to_csv('clean_data.csv', index=False)

In [41]:
save_data_loaded = pd.read_csv('clean_data.csv')

In [42]:
save_data_loaded

Unnamed: 0,raw_review,processed_bigrams_list,sentiment_label
0,Originally I was a Tenacious D fan of their fi...,"originally::tenacious,tenacious::fan,fan::firs...",1
1,This first-rate western tale of the gold rush ...,"firstrate::western,western::tale,tale::gold,go...",1
2,One of the all-time great science fiction work...,"one::alltime,alltime::great,great::science,sci...",1
3,Mickey Rourke ( who was once a famous movie st...,"mickey::rourke,rourke::famous,famous::movie,mo...",0
4,this is the worst movie ive ever seen. And i h...,"worst::movie,movie::ive,ive::ever,ever::seen,s...",0
...,...,...,...
9995,"If you are a fan of Zorro, Indiana Jones, or a...","fan::zorro,zorro::indiana,indiana::jones,jones...",1
9996,I'm an incorrigible skeptic and agnostic and w...,"im::incorrigible,incorrigible::skeptic,skeptic...",0
9997,"Jafar Panahi's comedy-drama ""Offside"" portrays...","jafar::panahis,panahis::comedydrama,comedydram...",1
9998,Renee Zellweger absolutely shines as Nurse Bet...,"renee::zellweger,zellweger::absolutely,absolut...",1
