In [2]:
pip install torch transformers datasets textattack textblob matplotlib scikit-learn

Collecting datasets
  Downloading datasets-3.2.0-py3-none-any.whl.metadata (20 kB)
Collecting textattack
  Downloading textattack-0.3.10-py3-none-any.whl.metadata (38 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nv

In [3]:
from datasets import load_dataset

In [5]:
imdb_reviews = load_dataset("imdb")
train_reviews, train_labels = imdb_reviews['train']['text'], imdb_reviews['train']['label']
test_reviews, test_labels = imdb_reviews['test']['text'], imdb_reviews['test']['label']

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

In [6]:
print(f"Train reviews from IMDB: {len(train_reviews)}")
print(f"Test reviews from IMDB: {len(test_reviews)}")

Train reviews from IMDB: 25000
Test reviews from IMDB: 25000


In [7]:
from transformers import BertTokenizer

The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

In [None]:
bert_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

In [None]:
import torch
from transformers import BertForSequenceClassification, pipeline

In [None]:
device = 0 if torch.cuda.is_available() else -1  # 0 = GPU, -1 = CPU

In [None]:
model = BertForSequenceClassification.from_pretrained("textattack/bert-base-uncased-imdb")
sentiment_pipeline = pipeline("sentiment-analysis", model=model, tokenizer=bert_tokenizer, device=device)

Device set to use cuda:0


In [None]:
def preprocess_texts(texts, tokenizer, max_length=512):
    return tokenizer(
        texts,
        padding=True,               # Pad to the maximum sequence length in the batch
        truncation=True,            # Truncate sequences longer than max_length
        max_length=max_length,      # Limit sequence length to 512 tokens
        return_tensors="pt",        # Return PyTorch tensors
    )

In [None]:
# Perform batched inference
from torch.utils.data import DataLoader, TensorDataset

In [None]:
batch_size = 32
tokenized_data = preprocess_texts(test_reviews, bert_tokenizer)

# Create DataLoader for batched processing
test_dataset = TensorDataset(
    tokenized_data['input_ids'], tokenized_data['attention_mask']
)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# Evaluate the model
from tqdm import tqdm
model.eval()
model.to("cuda" if torch.cuda.is_available() else "cpu")

all_preds = []
with torch.no_grad():
    for batch in tqdm(test_loader):
        input_ids, attention_mask = [b.to("cuda" if torch.cuda.is_available() else "cpu") for b in batch]
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()
        all_preds.extend(preds)

# Convert predictions to labels
all_preds_labels = ["POSITIVE" if pred == 1 else "NEGATIVE" for pred in all_preds]

100%|██████████| 782/782 [11:54<00:00,  1.09it/s]


In [None]:
from sklearn.metrics import accuracy_score

In [None]:
clean_accuracy = accuracy_score(test_labels, all_preds)
print(f"Clean data accuracy: {clean_accuracy * 100:.2f}%")

Clean data accuracy: 93.03%


In [None]:
import random

def pgd_attack_token_based(model, tokenizer, text, label, epsilon=5, num_iter=10):
    """
    PGD attack by perturbing token IDs directly.

    Args:
        model: Pretrained BERT model.
        tokenizer: Tokenizer corresponding to the model.
        text: Input text.
        label: True label for the input text.
        epsilon: Maximum number of tokens that can be changed.
        num_iter: Number of iterations.

    Returns:
        Perturbed text.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    model.eval()

    # Tokenize input
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
    label_tensor = torch.tensor([label]).to(device)

    # Get original token IDs
    original_input_ids = inputs["input_ids"].detach().clone()

    # Perform token-based PGD attack
    perturbed_input_ids = original_input_ids.clone()
    for _ in range(num_iter):
        # Compute model output
        outputs = model(input_ids=perturbed_input_ids, attention_mask=inputs["attention_mask"], labels=label_tensor)
        loss = outputs.loss

        # Backward pass
        model.zero_grad()
        loss.backward()

        # Randomly select tokens to modify
        for i in range(epsilon):
            token_index = random.randint(1, perturbed_input_ids.size(1) - 2)  # Avoid CLS and SEP tokens
            original_token = perturbed_input_ids[0, token_index].item()

            # Replace token with a random token from the vocabulary
            new_token = random.randint(0, tokenizer.vocab_size - 1)
            while new_token == original_token:
                new_token = random.randint(0, tokenizer.vocab_size - 1)

            perturbed_input_ids[0, token_index] = new_token

    # Decode perturbed tokens
    perturbed_text = tokenizer.decode(perturbed_input_ids[0], skip_special_tokens=True)

    return perturbed_text


In [None]:
# Example PGD attack
text_example = test_reviews[0]
label_example = test_labels[0]
perturbed_text = pgd_attack_token_based(model, bert_tokenizer, text_example, label_example, epsilon = 5,num_iter=10)
print("Original Text:", text_example)
print("Perturbed Text:", perturbed_text)

Original Text: I love sci-fi and am willing to put up with a lot. Sci-fi movies/TV are usually underfunded, under-appreciated and misunderstood. I tried to like this, I really did, but it is to good TV sci-fi as Babylon 5 is to Star Trek (the original). Silly prosthetics, cheap cardboard sets, stilted dialogues, CG that doesn't match the background, and painfully one-dimensional characters cannot be overcome with a 'sci-fi' setting. (I'm sure there are those of you out there who think Babylon 5 is good sci-fi TV. It's not. It's clichéd and uninspiring.) While US viewers might like emotion and character development, sci-fi is a genre that does not take itself seriously (cf. Star Trek). It may treat important issues, yet not as a serious philosophy. It's really difficult to care about the characters here as they are not simply foolish, just missing a spark of life. Their actions and reactions are wooden and predictable, often painful to watch. The makers of Earth KNOW it's rubbish as the

In [None]:
def evaluate_pgd_attack_with_progress(
    model, tokenizer, texts, labels, epsilon,num_iter):
    """
    Evaluate the PGD attack on a dataset with progress tracking.

    Args:
        model: Pretrained BERT model.
        tokenizer: Tokenizer corresponding to the model.
        texts: List of input texts.
        labels: List of true labels.
        epsilon: Maximum perturbation limit for PGD.
        alpha: Step size for each iteration.
        num_iter: Number of PGD iterations.

    Returns:
        clean_accuracy: Accuracy on clean data.
        perturbed_accuracy: Accuracy on perturbed data.
        attack_success_rate: Percentage of successful attacks.
    """
    successful_attacks = 0
    clean_preds, perturbed_preds = [], []
    perturbed_reviews = []
    # Use tqdm to track progress
    for i in range(len(texts)):
        text = texts[i]
        label = labels[i]

        # Prediction on clean text
        clean_inputs = preprocess_texts([text], tokenizer).to(device)
        with torch.no_grad():
            clean_outputs = model(**clean_inputs)
        clean_pred = torch.argmax(clean_outputs.logits, dim=1).item()
        clean_preds.append(clean_pred)

        # Generate perturbed text
        perturbed_text = pgd_attack_token_based(model, tokenizer, text, label,epsilon,num_iter)
        perturbed_reviews.append(perturbed_text)
        # Prediction on perturbed text
        perturbed_inputs = preprocess_texts([perturbed_text], tokenizer).to(device)
        with torch.no_grad():
            perturbed_outputs = model(**perturbed_inputs)
        perturbed_pred = torch.argmax(perturbed_outputs.logits, dim=1).item()
        perturbed_preds.append(perturbed_pred)

    return clean_preds, perturbed_preds, perturbed_reviews

In [None]:
clean_preds, perturbed_preds, perturbed_reviews = evaluate_pgd_attack_with_progress(
    model, bert_tokenizer, test_reviews, test_labels, epsilon = 5,num_iter=10 )

In [None]:
import nltk
nltk.download('punkt')
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [None]:
from nltk.tokenize import word_tokenize

def perturbation_magnitude(original_texts, perturbed_texts):
    total_changes = 0
    total_tokens = 0
    for original, perturbed in zip(original_texts, perturbed_texts):
        original_tokens = word_tokenize(original)
        perturbed_tokens = word_tokenize(perturbed)

        # Calculate token-level differences
        changes = sum(1 for o, p in zip(original_tokens, perturbed_tokens) if o != p)
        total_changes += changes
        total_tokens += len(original_tokens)

    # Percentage of changed tokens
    return (total_changes / total_tokens) * 100

In [None]:
clean_accuracy = accuracy_score(test_labels, clean_preds) * 100
perturbed_accuracy = accuracy_score(test_labels, perturbed_preds) * 100
successful_attacks = sum(clean != perturbed for clean, perturbed in zip(clean_preds, perturbed_preds))
attack_success_rate = (successful_attacks / len(test_reviews)) * 100
performance_drop = clean_accuracy - perturbed_accuracy
magnitude = perturbation_magnitude(test_reviews, perturbed_reviews)

# Print Results
print(f"Clean Accuracy: {clean_accuracy:.2f}%")
print(f"Perturbed Accuracy: {perturbed_accuracy:.2f}%")
print(f"Attack Success Rate: {attack_success_rate:.2f}%")
print(f"Perturbation Magnitude: {magnitude:.2f}%")
print(f"Model Performance Drop: {performance_drop:.2f}%")


Clean Accuracy: 0.93%
Perturbed Accuracy: 0.85%
Attack Success Rate: 12.12%
Perturbation Magnitude: 78.76%
Model Performance Drop: 0.08%


In [None]:
def pgd_attack_with_gradients(model, tokenizer, text, label, epsilon=5, alpha=0.1, num_iter=10):
    """
    PGD attack by perturbing input embeddings based on gradients.

    Args:
        model: Pretrained BERT model.
        tokenizer: Tokenizer for the model.
        text: Input text.
        label: True label for the input text.
        epsilon: Maximum perturbation limit.
        alpha: Step size for perturbations.
        num_iter: Number of iterations.

    Returns:
        Perturbed text.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    model.eval()

    # Tokenize input
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
    label_tensor = torch.tensor([label]).to(device)

    # Get embeddings for the input IDs
    original_input_ids = inputs["input_ids"].detach().clone()
    embeddings = model.bert.embeddings.word_embeddings(original_input_ids).detach()
    embeddings.requires_grad = True  # Enable gradient tracking

    for step in range(num_iter):
        # Forward pass using perturbed embeddings
        outputs = model(
            inputs_embeds=embeddings,
            attention_mask=inputs["attention_mask"],
            labels=label_tensor,
        )
        loss = outputs.loss

        # Backward pass to compute gradients
        model.zero_grad()
        loss.backward()

        # Get gradients of the embeddings
        grad = embeddings.grad

        # Apply perturbations to embeddings
        embeddings = embeddings + alpha * grad.sign()
        perturbation = torch.clamp(embeddings - embeddings, -epsilon, epsilon)
        embeddings = torch.clamp(embeddings + perturbation, -1.0, 1.0).detach().requires_grad_(True)

    # Map perturbed embeddings back to token IDs
    perturbed_logits = torch.matmul(
        embeddings, model.bert.embeddings.word_embeddings.weight.T
    )
    perturbed_input_ids = torch.argmax(perturbed_logits, dim=-1)

    # Decode perturbed token IDs into text
    perturbed_text = tokenizer.decode(perturbed_input_ids[0], skip_special_tokens=True)

    return perturbed_text


In [None]:
def evaluate_pgd_attack_with_gradients(
    model, tokenizer, texts, labels, epsilon,num_iter):
    """
    Evaluate the PGD attack on a dataset with progress tracking.

    Args:
        model: Pretrained BERT model.
        tokenizer: Tokenizer corresponding to the model.
        texts: List of input texts.
        labels: List of true labels.
        epsilon: Maximum perturbation limit for PGD.
        alpha: Step size for each iteration.
        num_iter: Number of PGD iterations.

    Returns:
        clean_accuracy: Accuracy on clean data.
        perturbed_accuracy: Accuracy on perturbed data.
        attack_success_rate: Percentage of successful attacks.
    """
    successful_attacks = 0
    clean_preds, perturbed_preds = [], []
    perturbed_reviews = []
    # Use tqdm to track progress
    for i in range(len(texts)):
        text = texts[i]
        label = labels[i]

        # Prediction on clean text
        clean_inputs = preprocess_texts([text], tokenizer).to(device)
        with torch.no_grad():
            clean_outputs = model(**clean_inputs)
        clean_pred = torch.argmax(clean_outputs.logits, dim=1).item()
        clean_preds.append(clean_pred)

        # Generate perturbed text
        perturbed_text = pgd_attack_with_gradients(model, tokenizer, text, label, epsilon=10, alpha=0.1, num_iter=2)
        perturbed_reviews.append(perturbed_text)
        # Prediction on perturbed text
        perturbed_inputs = preprocess_texts([perturbed_text], tokenizer).to(device)
        with torch.no_grad():
            perturbed_outputs = model(**perturbed_inputs)
        perturbed_pred = torch.argmax(perturbed_outputs.logits, dim=1).item()
        perturbed_preds.append(perturbed_pred)

    return clean_preds, perturbed_preds, perturbed_reviews

In [None]:
clean_preds_gradients, perturbed_preds_gradients, perturbed_reviews_gradients = evaluate_pgd_attack_with_gradients(
    model, bert_tokenizer, test_reviews, test_labels, epsilon = 5,num_iter=10 )

In [None]:
clean_accuracy_gradient = accuracy_score(test_labels, clean_preds_gradients) * 100
perturbed_accuracy_gradient = accuracy_score(test_labels, perturbed_preds_gradients) * 100
successful_attacks_gradient = sum(clean != perturbed for clean, perturbed in zip(clean_preds_gradients, perturbed_preds_gradients))
attack_success_rate_gradient = (successful_attacks_gradient / len(test_reviews)) * 100
performance_drop_gradient = clean_accuracy_gradient - perturbed_accuracy_gradient
magnitude_gradient = perturbation_magnitude(test_reviews, perturbed_reviews_gradients)


In [None]:
# Print Results
print(f"Clean Accuracy: {clean_accuracy_gradient:.2f}%")
print(f"Perturbed Accuracy: {perturbed_accuracy_gradient:.2f}%")
print(f"Attack Success Rate: {attack_success_rate_gradient:.2f}%")
print(f"Perturbation Magnitude: {magnitude_gradient:.2f}%")
print(f"Model Performance Drop: {performance_drop_gradient:.2f}%")

Clean Accuracy: 0.93%
Perturbed Accuracy: 0.51%
Attack Success Rate: 45.59%
Perturbation Magnitude: 62.79%
Model Performance Drop: 0.42%


**BlackBox** - **TextFooler**

In [None]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from nltk.corpus import wordnet

In [None]:
nltk.download('punkt')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [None]:
def load_model_and_tokenizer():
    model_name = "textattack/bert-base-uncased-SST-2"
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertForSequenceClassification.from_pretrained(model_name)
    model.eval()
    return model, tokenizer

In [None]:
def simple_blackbox_attack(model, tokenizer, text, label, num_perturbations=5):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    model.eval()

    original_tokens = word_tokenize(text)

    # Function to get synonyms using WordNet
    def get_synonyms(word):
        synonyms = set()
        for syn in wordnet.synsets(word):
            for lemma in syn.lemmas():
                if lemma.name() != word:
                    synonyms.add(lemma.name().replace('_', ' '))
        return list(synonyms)

    perturbed_tokens = original_tokens[:]
    for _ in range(num_perturbations):
        # Randomly select a word to perturb
        word_index = random.randint(0, len(perturbed_tokens) - 1)
        word = perturbed_tokens[word_index]

        # Replace with a synonym if available
        synonyms = get_synonyms(word)
        if synonyms:
            perturbed_tokens[word_index] = random.choice(synonyms)

    perturbed_text = ' '.join(perturbed_tokens)

    # Evaluate the original and perturbed texts
    original_input = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
    perturbed_input = tokenizer(perturbed_text, return_tensors="pt", truncation=True, padding=True).to(device)

    original_prediction = torch.argmax(model(**original_input).logits, dim=-1).item()
    perturbed_prediction = torch.argmax(model(**perturbed_input).logits, dim=-1).item()

    success = (original_prediction == label) and (perturbed_prediction != label)

    return perturbed_text, success

In [None]:
def evaluate_simple_blackbox_attack(model, tokenizer, texts, labels, num_samples=5, num_perturbations=5):
    perturbed_texts = []
    successful_attacks = 0

    for i in tqdm(range(num_samples), desc="Attacking"):
        original_text = texts[i]
        true_label = labels[i]

        perturbed_text, success = simple_blackbox_attack(model, tokenizer, original_text, true_label, num_perturbations)
        perturbed_texts.append(perturbed_text)

        if success:
            successful_attacks += 1

        #print(f"Original Text: {original_text}")
        #print(f"Perturbed Text: {perturbed_text}")
        #print(f"Attack Successful: {success}")
        #print("---")

    attack_success_rate = (successful_attacks / num_samples) * 100
    return perturbed_texts, attack_success_rate

In [None]:
def evaluate_metrics(original_texts, perturbed_texts, labels, model, tokenizer):
    """
    Evaluate metrics: Clean Accuracy, Adversarial Accuracy, Performance Drop, Perturbation Magnitude.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)

    # Predictions on original texts
    original_preds = []
    for text in original_texts:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
        logits = model(**inputs).logits
        pred = torch.argmax(logits, dim=-1).item()
        original_preds.append(pred)

    # Predictions on perturbed texts
    perturbed_preds = []
    for text in perturbed_texts:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
        logits = model(**inputs).logits
        pred = torch.argmax(logits, dim=-1).item()
        perturbed_preds.append(pred)

    # Compute metrics
    clean_accuracy = accuracy_score(labels, original_preds) * 100
    adversarial_accuracy = accuracy_score(labels, perturbed_preds) * 100
    performance_drop = clean_accuracy - adversarial_accuracy

    # Compute Perturbation Magnitude
    total_changes = 0
    total_tokens = 0
    for original, perturbed in zip(original_texts, perturbed_texts):
        original_tokens = word_tokenize(original)
        perturbed_tokens = word_tokenize(perturbed)
        changes = sum(1 for o, p in zip(original_tokens, perturbed_tokens) if o != p)
        total_changes += changes
        total_tokens += len(original_tokens)

    perturbation_magnitude = (total_changes / total_tokens) * 100 if total_tokens > 0 else 0

    return clean_accuracy, adversarial_accuracy, performance_drop, perturbation_magnitude

In [None]:
def calculate_perplexity(texts):
    model = GPT2LMHeadModel.from_pretrained("gpt2")
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

    perplexities = []
    for text in texts:
        inputs = tokenizer(text, return_tensors="pt", truncation=True)
        outputs = model(**inputs, labels=inputs["input_ids"])
        perplexity = torch.exp(outputs.loss).item()
        perplexities.append(perplexity)
    return sum(perplexities) / len(perplexities)

In [None]:
model, tokenizer = load_model_and_tokenizer()

In [None]:
perturbed_reviews_blackbox, attack_success_rate_blackbox = evaluate_simple_blackbox_attack(
        model, tokenizer, test_reviews, test_labels, num_samples=len(test_reviews), num_perturbations=3
    )

Attacking: 100%|██████████| 25000/25000 [20:49<00:00, 20.01it/s]


In [None]:
clean_acc_blackbox, adv_acc_blackbox, perf_drop_blackbox, pert_mag_blackbox = evaluate_metrics(
    test_reviews, perturbed_reviews_blackbox, test_labels, model, tokenizer
)

In [None]:
# Print Results
print("\n--- Evaluation Metrics ---")
print(f"Clean Accuracy: {clean_acc_blackbox:.2f}%")
print(f"Adversarial Accuracy: {adv_acc_blackbox:.2f}%")
print(f"Performance Drop: {perf_drop_blackbox:.2f}%")
print(f"Perturbation Magnitude: {pert_mag_blackbox:.2f}%")
print(f"Attack Success Rate: {attack_success_rate_blackbox:.2f}%")


--- Evaluation Metrics ---
Clean Accuracy: 90.33%
Adversarial Accuracy: 90.18%
Performance Drop: 0.15%
Perturbation Magnitude: 55.45%
Attack Success Rate: 0.89%


In [8]:
pip install autocorrect

Collecting autocorrect
  Downloading autocorrect-2.6.1.tar.gz (622 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/622.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m133.1/622.8 kB[0m [31m4.0 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m614.4/622.8 kB[0m [31m9.8 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m622.8/622.8 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: autocorrect
  Building wheel for autocorrect (setup.py) ... [?25l[?25hdone
  Created wheel for autocorrect: filename=autocorrect-2.6.1-py3-none-any.whl size=622364 sha256=d577fbab6cbb4099e5411bb67351e683149070ee02ba8df7aab0c8874ef75b1a
  Stored in directory: /root/.cache/pip/wheels/5e/90/99/807a5ad861ce5d22c3c299a11df8

In [9]:
import re
import random
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, BertForSequenceClassification

# Ensure NLTK resources are downloaded
import nltk
nltk.download('punkt')
nltk.download('wordnet')

from nltk.tokenize import word_tokenize
from nltk.corpus import wordnet
from autocorrect import Speller

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


In [10]:
def clean_text(text):
    """
    Clean and sanitize text data.
    - Converts to lowercase
    - Removes URLs, punctuation, and extra spaces
    - Corrects spelling errors
    """
    spell = Speller(lang='en')
    text = text.lower()  # Convert to lowercase
    text = re.sub(r'\s+', ' ', text)  # Remove extra spaces
    text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
    text = spell(text)  # Correct spelling
    return text

In [11]:
def preprocess_data(texts):
    """
    Apply text cleaning to the dataset.
    """
    return [clean_text(text) for text in tqdm(texts, desc="Cleaning Data")]

In [12]:
def generate_pgd_adversarial_examples(model, tokenizer, text, label, epsilon=0.1, alpha=0.02, num_iter=5):
    """
    Generate adversarial examples using PGD on embeddings.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    model.eval()

    # Tokenize input
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
    label_tensor = torch.tensor([label]).to(device)

    # Extract embeddings
    embeddings = model.bert.embeddings.word_embeddings(inputs["input_ids"]).detach().clone()
    perturbed_embeddings = embeddings.clone().detach().requires_grad_(True)

    for _ in range(num_iter):
        # Forward pass with perturbed embeddings
        attention_mask = inputs["attention_mask"]
        outputs = model(inputs_embeds=perturbed_embeddings, attention_mask=attention_mask, labels=label_tensor)
        loss = outputs.loss

        # Backward pass
        model.zero_grad()
        loss.backward()

        # Gradient step
        grad_sign = perturbed_embeddings.grad.sign()
        perturbed_embeddings = perturbed_embeddings + alpha * grad_sign

        # Project back to the epsilon-ball
        perturbation = torch.clamp(perturbed_embeddings - embeddings, -epsilon, epsilon)
        perturbed_embeddings = torch.clamp(embeddings + perturbation, -1, 1).detach().requires_grad_(True)

    # Convert perturbed embeddings back to tokens
    with torch.no_grad():
        logits = model(inputs_embeds=perturbed_embeddings, attention_mask=attention_mask).logits
        perturbed_input_ids = torch.argmax(logits, dim=-1)

    perturbed_text = tokenizer.decode(perturbed_input_ids[0], skip_special_tokens=True)
    return perturbed_text


In [13]:
def adversarial_training_with_augmentation(model, tokenizer, train_texts, train_labels, num_epochs=3, epsilon=0.1, alpha=0.02, num_iter=5):
    """
    Train a model with adversarial training.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    model.train()

    # Optimizer and loss function
    optimizer = optim.AdamW(model.parameters(), lr=5e-5)
    loss_fn = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        epoch_loss = 0.0
        for text, label in tqdm(zip(train_texts, train_labels), total=len(train_texts), desc=f"Epoch {epoch+1}/{num_epochs}"):
            # Generate adversarial examples
            adversarial_text = generate_pgd_adversarial_examples(model, tokenizer, text, label, epsilon, alpha, num_iter)

            # Tokenize original and adversarial text
            inputs_original = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
            inputs_adversarial = tokenizer(adversarial_text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
            label_tensor = torch.tensor([label]).to(device)

            # Forward pass with original and adversarial inputs
            outputs_original = model(**inputs_original)
            outputs_adversarial = model(**inputs_adversarial)

            # Compute loss
            loss_original = loss_fn(outputs_original.logits, label_tensor)
            loss_adversarial = loss_fn(outputs_adversarial.logits, label_tensor)

            loss = (loss_original + loss_adversarial) / 2  # Combine losses

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        print(f"Epoch {epoch+1} Loss: {epoch_loss/len(train_texts):.4f}")

In [14]:
def evaluate_defense(model, tokenizer, test_texts, test_labels, epsilon=0.1, alpha=0.02, num_iter=5):
    """
    Evaluate the defense against adversarial examples.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    model.eval()

    clean_preds = []
    adv_preds = []
    attack_success_count = 0

    for text, label in tqdm(zip(test_texts, test_labels), total=len(test_texts), desc="Evaluating Defense"):
        # Clean prediction
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
        logits = model(**inputs).logits
        clean_pred = torch.argmax(logits, dim=-1).item()
        clean_preds.append(clean_pred)

        # Generate adversarial example
        adv_text = generate_pgd_adversarial_examples(model, tokenizer, text, label, epsilon, alpha, num_iter)
        #adv_text_cleaned = preprocess_data(adv_text)
        # Adversarial prediction
        adv_inputs = tokenizer(adv_text, return_tensors="pt", truncation=True, padding=True, max_length=512).to(device)
        adv_logits = model(**adv_inputs).logits
        adv_pred = torch.argmax(adv_logits, dim=-1).item()
        adv_preds.append(adv_pred)

        # Check if the attack was successful
        if clean_pred == label and adv_pred != label:
            attack_success_count += 1

    # Compute Metrics
    clean_accuracy = accuracy_score(test_labels, clean_preds) * 100
    adversarial_accuracy = accuracy_score(test_labels, adv_preds) * 100
    performance_drop = clean_accuracy - adversarial_accuracy
    attack_success_rate = (attack_success_count / len(test_texts)) * 100

    return clean_accuracy, adversarial_accuracy, performance_drop, attack_success_rate
    #return adversarial_accuracy

In [15]:
model_name = "textattack/bert-base-uncased-SST-2"
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name)

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/477 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/438M [00:00<?, ?B/s]

In [None]:
# Adversarial Training
adversarial_training_with_augmentation(model, tokenizer, train_reviews, train_labels, num_epochs=3, epsilon=0.1, alpha=0.02, num_iter=5)


Epoch 1/3: 100%|██████████| 25000/25000 [2:49:19<00:00,  2.46it/s]


Epoch 1 Loss: 0.0011


Epoch 2/3: 100%|██████████| 25000/25000 [2:47:58<00:00,  2.48it/s]


Epoch 2 Loss: 0.0025


Epoch 3/3:  99%|█████████▊| 24646/25000 [2:43:44<02:24,  2.45it/s]

In [17]:
clean_accuracy, adversarial_accuracy, performance_drop, attack_success_rate = evaluate_defense(
    model, tokenizer, test_reviews, test_labels, epsilon=0.1, alpha=0.02, num_iter=5
)

Evaluating Defense: 100%|██████████| 25000/25000 [2:08:35<00:00,  3.24it/s]


In [18]:
# Print Results
print("\n--- Defense Evaluation Metrics ---")
print(f"Clean Accuracy: {clean_accuracy:.2f}%")
print(f"Adversarial Accuracy: {adversarial_accuracy:.2f}%")
print(f"Performance Drop: {performance_drop:.2f}%")
print(f"Attack Success Rate: {attack_success_rate:.2f}%")


--- Defense Evaluation Metrics ---
Clean Accuracy: 50.00%
Adversarial Accuracy: 50.00%
Performance Drop: 0.00%
Attack Success Rate: 0.00%
