In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import os
import time
import importlib
from copy import copy
from tqdm import tqdm
import argparse

In [None]:
# Alternative for script usage: take the test config from the command line.
# Kept commented out because argparse does not work inside an interactive notebook.
# parser = argparse.ArgumentParser(description='Generate logits for adversarial samples')
# parser.add_argument('--test_config', type=str, help='Test configuration file')
# args = parser.parse_args()
# test_config = args.test_config # or 'imdb_pwws_distilbert.csv' or 'agnews_pwws_distilbert.csv'

# Directly set the test_config variable (uncomment exactly one line).
# The first and last underscore-separated parts of the name are parsed further
# down as the dataset and the model architecture.
# test_config = "ag-news_pwws_distilbert.csv"
test_config = "rotten-tomatoes_alzantot_distilbert.csv"
# test_config = "ag-news_textfooler_distilbert.csv"

# test_config = "imdb_bae_distilbert.csv"

# Echo the active configuration so it is recorded in the notebook output
print("Using test configuration:", test_config)


Using test configuration: rotten-tomatoes_alzantot_distilbert.csv


In [None]:
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

cuda


In [None]:
# Fix random seeds for reproducibility.
# Python's own `random` module must be seeded too: the samples are shuffled with
# random.shuffle further down, and that shuffle ignores the numpy/torch seeds.
import random

seed = 42
random.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)

In [None]:
# # Print available setups for testing
# for i in os.listdir('../../Generating Adversarial Samples/Data'):
#     if not i.startswith('.'): # Don't print system files
#         print(i)

In [None]:
# Obtain model architecture and dataset from the test config file name.
# Only the base name (without directory and extension) is parsed, so test_config
# may also be given as a path: it is passed straight to pd.read_csv, and parsing
# the raw string would otherwise put the directory into `dataset`.
config_name = os.path.splitext(os.path.basename(test_config))[0]
config_parts = config_name.split('_')
model_arch = config_parts[-1]  # last underscore-separated part
dataset = config_parts[0]      # first underscore-separated part
print("Model architecture:", model_arch)
print("Dataset:", dataset)

Model architecture: distilbert
Dataset: rotten-tomatoes


In [None]:
def load_textattack_local_model(model_arch, dataset):
    """Load a locally defined model from ``../{model_arch}_{dataset}_textattack.py``.

    The file is executed as a temporary module and its module-level ``model``
    attribute is returned.

    Args:
        model_arch: Architecture part of the file name.
        dataset: Dataset part of the file name.

    Returns:
        A ``(model, None)`` tuple; the second slot is where the Hugging Face
        loading path keeps its tokenizer, and there is none here.

    Raises:
        AttributeError: If the loaded file defines no ``model`` attribute.
    """
    # `import importlib` alone does not guarantee that the `importlib.util`
    # submodule is loaded; import it explicitly so the calls below cannot fail
    # with AttributeError on a fresh kernel.
    import importlib.util

    def load_module_from_file(file_path):
        """Uses ``importlib`` to dynamically open a file and load an object from
        it."""
        # Time-based name so repeated loads do not reuse the same module name.
        temp_module_name = f"temp_{time.time()}"

        spec = importlib.util.spec_from_file_location(temp_module_name, file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    m = load_module_from_file(f'../{model_arch}_{dataset}_textattack.py')
    model = getattr(m, 'model')

    return model, None

In [None]:
# def load_hugging_face_model(model_arch, dataset):
#     # Import the model used for generating the adversarial samples.
#     # Correctly, set up imports, model and tokenizer depending on the model you generated the samples on.

#     if model_arch == 'distilbert':
#         from transformers import DistilBertConfig as config, DistilBertTokenizer as tokenizer, AutoModelForSequenceClassification as auto_model
#     elif model_arch == 'bert':
#         from transformers import BertConfig as config, BertTokenizer as tokenizer, AutoModelForSequenceClassification as auto_model

#     # device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#     tokenizer = tokenizer.from_pretrained(f"textattack/{model_arch}-base-uncased-{dataset}")
#     model = auto_model.from_pretrained(f"textattack/{model_arch}-base-uncased-{dataset}").to(device)

#     return model, tokenizer

In [None]:
# Models available in hugging-face are executed differently from LSTM and CNN. Choose automatically the configuration and load model + tokenizer.
# textattack_local_models = ['lstm', 'cnn']

In [None]:
# if model_arch in textattack_local_models:
#     hugging_face_model = False
#     model, tokenizer = load_textattack_local_model(model_arch, dataset)
# else:
#     hugging_face_model = True
#     model, tokenizer = load_hugging_face_model(model_arch, dataset)

In [None]:
# Load tokenizer and classifier directly from the checkpoint whose name is built
# from the model architecture and the dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification

checkpoint = f"textattack/{model_arch}-base-uncased-{dataset}"

tokenizer = AutoTokenizer.from_pretrained(checkpoint)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint)
model = model.to(device)  # move the weights to the selected device

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/496 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/268M [00:00<?, ?B/s]

In [None]:
# Earlier variant that read the csv from the data-generation folder:
# df = pd.read_csv(f'../../Generating Adversarial Samples/Data/{test_config}', index_col=0)
# df.shape

# Read the previously generated csv (path relative to the working directory);
# the first csv column is used as the index.
df = pd.read_csv(test_config , index_col=0)
# Display (rows, columns) as a quick sanity check
df.shape

(495, 9)

In [None]:
# Keep at most the first 7000 rows. Only 3000 will be used, but we leave room for the test set
# and for false adversarial sentences that are filtered out later. The size is reduced because computations are expensive.
# In the real setup, the whole file was considered and fixed train and test sets were produced.
df = df.head(7000)

In [None]:
# Split the non-adversarial sentences into batches.
# Big models such as BERT cannot take the whole input at once.
n = 256 # Size of each batch.
original_texts = list(df.original_text.values)
batches = [original_texts[start:start + n] for start in range(0, len(original_texts), n)]

In [None]:
# Peek at the first sentence of the first batch
batches[0][0]

"even when he's not at his most critically insightful , godard can still be smarter than any 50 other filmmakers still at work ."

In [None]:
# Accumulator for the per-batch model outputs of the non-adversarial sentences;
# it is filled by the inference loop in the next cell.
outputs = []

In [None]:
# Run the model over every batch of non-adversarial sentences.
hugging_face_model = True

# Start from an empty list inside this cell so that re-running it cannot append a
# second copy of the logits (which would break the length match with `df`).
outputs = []

if hugging_face_model: # Use tokenizer and hugging face pipeline
    for b in batches:
        # `encoded` instead of `input`, to avoid shadowing the builtin
        encoded = tokenizer(b, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():  # inference only, no gradients needed
            output = model(**encoded)
        outputs.append(output.logits.cpu().numpy())
        # Release the batch before the next one to keep GPU memory usage low
        del encoded
        torch.cuda.empty_cache()
else: # Use local model by simply predicting without tokenization
    for b in batches:
        output = model(b)
        outputs.append(output)

In [None]:
# Obtain non-adversarial predictions: one row of logits per sentence,
# then the index of the largest logit in each row
outputs_flatten = []
for batch_logits in outputs:
    outputs_flatten.extend(batch_logits)
predictions = list(map(np.argmax, outputs_flatten))

In [None]:
# Store the predicted class of each original sentence in the DataFrame
# (predictions are in the same order as the rows of df)
df['original_class_predicted'] = predictions

In [None]:
# Repeat the batching for the adversarial sentences
n = 256
adversarial_texts = list(df.adversarial_text.values)
batches = [adversarial_texts[start:start + n] for start in range(0, len(adversarial_texts), n)]

In [None]:
# Accumulator for the per-batch model outputs of the adversarial sentences;
# it is filled by the inference loop in the next cell.
outputs = []

In [None]:
# Run the model over every batch of adversarial sentences.
# Start from an empty list inside this cell so that re-running it cannot append a
# second copy of the logits (which would break the length match with `df`).
outputs = []

if hugging_face_model: # Use tokenizer and hugging face pipeline
    for b in batches:
        # `encoded` instead of `input`, to avoid shadowing the builtin
        encoded = tokenizer(b, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():  # inference only, no gradients needed
            output = model(**encoded)
        outputs.append(output.logits.cpu().numpy())
        # Release the batch before the next one to keep GPU memory usage low
        del encoded
        torch.cuda.empty_cache()
else: # Use local model by simply predicting without tokenization
    for b in batches:
        output = model(b)
        outputs.append(output)

In [None]:
# Obtain adversarial predictions: one row of logits per sentence,
# then the index of the largest logit in each row
outputs_flatten = []
for batch_logits in outputs:
    outputs_flatten.extend(batch_logits)
predictions = list(map(np.argmax, outputs_flatten))

In [None]:
# Store the predicted class of each adversarial sentence in the DataFrame
# (predictions are in the same order as the rows of df)
df['adversarial_class_predicted'] = predictions

In [None]:
# Select only those rows for which the predicted class actually changed between
# the original and the adversarial sentence; rows with equal predictions are dropped.
correct = df[(df['original_class_predicted'] != df['adversarial_class_predicted'])]

In [None]:
# Update dataframe and keep only adversarial samples.
# Note: this rebinds `df`, so the unfiltered frame is no longer reachable by that name.
df = correct

In [None]:
# Plain Python lists of the remaining sentence pairs (same row order in both)
original_samples = [text for text in df.original_text.values]
adversarial_samples = [text for text in df.adversarial_text.values]


In [None]:
# Stack originals first, adversarials second; labels follow the same order
# (0 = original sentence, 1 = adversarial sentence)
original_labels = np.zeros(len(original_samples))
adversarial_labels = np.ones(len(adversarial_samples))

x = np.concatenate((original_samples, adversarial_samples))
y = np.concatenate((original_labels, adversarial_labels))

In [None]:
########
def obtain_logits_with_attention(samples, batch_size, model, tokenizer):
    """
    Run the model over `samples` in batches and collect logits and attention scores.

    Reads the notebook globals `hugging_face_model` and `device`.

    Returns a tuple (logits, attention_scores):
      - logits: list with one entry per batch (a numpy array of logits on the
        Hugging Face path, the raw model output otherwise);
      - attention_scores: list with one 1-D numpy array per sentence; it stays
        empty when `hugging_face_model` is falsy.

    Raises ValueError when a batch is not a list of strings.
    """
    logits, attention_scores = [], []
    batch_starts = range(0, len(samples), batch_size)
    batches = [samples[start:start + batch_size] for start in batch_starts]

    for b in tqdm(batches):
        # Each batch must be a flat list of sentences
        if not isinstance(b, list) or not all(isinstance(sentence, str) for sentence in b):
            raise ValueError(f"Batch must be a list of strings. Found {type(b)}")

        if not hugging_face_model:
            # Local models are called directly on the raw sentences
            logits.append(model(b))
            continue

        with torch.no_grad():
            inputs = tokenizer(b, return_tensors="pt", padding=True, truncation=True).to(device)
            outputs = model(**inputs, output_attentions=True)
            logits.append(outputs.logits.cpu().numpy())

            # Last entry of the returned attentions, averaged over dim 1
            # (the attention heads), then the row for position 0 (CLS attention).
            head_averaged = outputs.attentions[-1].mean(dim=1)
            cls_attention = head_averaged[:, 0, :].cpu().numpy()
            # extend() adds one row per sentence of the batch
            attention_scores.extend(cls_attention)

    return logits, attention_scores


In [None]:
# Compute logits (and attention scores) for the original sentences
torch.cuda.empty_cache()
batch_size = 256
original_logits, original_attention = obtain_logits_with_attention(
    original_samples, batch_size, model, tokenizer
)
# Merge the per-batch arrays into a single (n_sentences, n_classes) array
stacked_original_logits = np.concatenate(original_logits)
original_logits = stacked_original_logits.reshape(-1, original_logits[0].shape[1])


100%|██████████| 2/2 [00:00<00:00,  7.92it/s]


In [None]:
# Release cached GPU memory before processing the adversarial sentences
torch.cuda.empty_cache()

In [None]:
# Compute logits (and attention scores) for the adversarial sentences
batch_size = 256
adversarial_logits, adversarial_attention = obtain_logits_with_attention(
    adversarial_samples, batch_size, model, tokenizer
)
# Merge the per-batch arrays into a single (n_sentences, n_classes) array
stacked_adversarial_logits = np.concatenate(adversarial_logits)
adversarial_logits = stacked_adversarial_logits.reshape(-1, adversarial_logits[0].shape[1])

100%|██████████| 2/2 [00:00<00:00,  9.96it/s]


In [None]:
# Release cached GPU memory once both sets of logits have been computed
torch.cuda.empty_cache()

In [None]:
# Combine logits and attention scores of both sets, originals first and
# adversarials second, i.e. the same order used for x and y
logits = np.concatenate([original_logits, adversarial_logits])
attention_scores = [*original_attention, *adversarial_attention]

In [None]:
# Shuffle data
import random

# attention_scores must be shuffled together with x, y and logits: downstream
# code looks all four up by the same index (attention_scores[idx] next to x[idx]),
# so leaving them in the original order would pair every sentence with the
# attention scores of a different sentence.
if len(attention_scores) != len(x):
    # zip() would silently truncate to the shortest sequence and drop samples
    raise ValueError(
        f"Expected one attention vector per sample, got {len(attention_scores)} for {len(x)} samples"
    )

c = list(zip(x, y, logits, attention_scores))
random.shuffle(c)
# After unpacking, all four names are tuples in the same shuffled order
x, y, logits, attention_scores = zip(*c)

In [None]:
import torch
from nltk import pos_tag
# from nltk.corpus import wordnet

In [None]:
def compute_logits_difference_with_attention(x, logits, y, model, tokenizer, idx, attention_scores, max_sentence_size=512):
    """
    Computes attention-weighted logits differences (word saliency) for one sentence.

    Every whitespace-separated word of ``x[idx]`` (at most ``max_sentence_size``
    words) is replaced in turn by ``[UNK]``. The saliency of a word is the drop
    of the originally predicted class' logit caused by that replacement, and it
    is multiplied by the attention score stored at the same position.

    Reads the notebook global ``device``.

    Args:
        x: Indexable collection of sentences (strings).
        logits: Indexable collection with the model logits of each sentence.
        y: Indexable collection of labels; ``y[idx]`` is passed through.
        model: Callable as ``model(**batch)``, returning an object with ``.logits``.
        tokenizer: Callable producing the model inputs for a list of sentences.
        idx: Index of the sentence to process.
        attention_scores: Indexable collection with one 1-D score vector per sentence.
        max_sentence_size: Maximum number of words taken from the sentence.

    Returns:
        Tuple ``(weighted_saliency, split_sentence, y[idx])`` where
        ``weighted_saliency`` is a numpy array of shape ``(n_words, 1)``.
    """
    chunk_size = 200  # maximum number of masked sentences per forward pass

    sentence_logits = np.asarray(logits[idx])
    predicted_class = int(np.argmax(sentence_logits))  # Predicted class for the sentence
    class_logit = sentence_logits[predicted_class]  # Store this original prediction logit

    split_sentence = x[idx].split(' ')[:max_sentence_size]

    # One copy of the sentence per word, with that word replaced by [UNK]
    masked_sentences = [
        ' '.join(split_sentence[:i] + ['[UNK]'] + split_sentence[i + 1:])
        for i in range(len(split_sentence))
    ]

    # Forward the masked sentences in chunks; a short sentence is a single chunk.
    # A separate name is used so the `logits` argument is not overwritten.
    chunk_logits = []
    for start in range(0, len(masked_sentences), chunk_size):
        chunk = masked_sentences[start:start + chunk_size]
        batch = tokenizer(chunk, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            chunk_logits.append(model(**batch).logits)
    masked_logits = torch.cat(chunk_logits).cpu().numpy()

    # Compute saliency: how much the predicted-class logit drops per masked word
    saliency = (class_logit - masked_logits[:, predicted_class]).reshape(-1, 1)

    # Incorporate attention scores, position by position.
    # NOTE(review): positions of attention_scores[idx] are whatever the producer
    # used; if they are tokenizer positions (special / sub-word tokens), they do
    # not line up one-to-one with the whitespace words used here - confirm.
    attention = np.asarray(attention_scores[idx]).reshape(-1)[:len(saliency)]
    if len(attention) < len(saliency):
        # Fewer scores than words: treat the missing positions as zero attention
        # instead of failing (or silently broadcasting a single score).
        attention = np.pad(attention, (0, len(saliency) - len(attention)))
    weighted_saliency = saliency * attention.reshape(-1, 1)

    # Return weighted saliency
    return weighted_saliency, split_sentence, y[idx]

In [None]:
def compute_logits_difference_with_pos_and_attention(x, logits, y, model, tokenizer, idx, attention_scores, target_size=512):
    """
    Attention-weighted logits differences of sentence `idx`, re-weighted by
    part of speech and laid out as a fixed-size column vector.

    Words tagged as nouns, verbs, adjectives or adverbs (tags starting with
    NN / VB / JJ / RB) keep their full saliency; all other words are scaled by 0.2.

    Reads the notebook global `device`.

    Returns a tuple (padded_data, label): `padded_data` is a tensor of shape
    (target_size, 1) on `device`, zero-filled after the last word; `label` is
    the label returned by compute_logits_difference_with_attention.
    """
    important_tag_prefixes = ('NN', 'VB', 'JJ', 'RB')  # Nouns, verbs, adjectives, adverbs

    # Saliency per word plus the words themselves
    data, tokens, label = compute_logits_difference_with_attention(
        x, logits, y, model, tokenizer, idx, attention_scores, target_size
    )

    # POS-tag the words and map every tag to its weight
    pos_weights = [
        1.0 if tag.startswith(important_tag_prefixes) else 0.2
        for _, tag in pos_tag(tokens)
    ]

    # Scale each word's saliency by its POS weight
    weights = np.array(pos_weights[:len(data)])
    pos_weighted_saliency = data.flatten() * weights

    # Pad with zeros (or truncate) to exactly target_size entries
    size = min(target_size, len(pos_weighted_saliency))
    kept_values = torch.tensor(pos_weighted_saliency[:size]).reshape(-1, 1).to(device)
    padded_data = torch.zeros(target_size, 1).to(device)
    padded_data[:size, :] = kept_values

    return padded_data, label


In [None]:
from torch.utils.data import Dataset, DataLoader
import sys
from torch.autograd import Variable

In [None]:
class TextWithAttentionAndPOS(Dataset):
    """Dataset that computes the POS- and attention-weighted logits differences
    of a sentence on demand, i.e. each time an item is requested."""

    def __init__(self, x, logits, y, model, tokenizer, attention_scores, max_sentence_size=512):
        """Store the sentences, their logits, labels and attention scores,
        together with the model and tokenizer used to compute the features."""
        # Per-sentence data, all indexed by the same position
        self.x = x
        self.y = y
        self.logits = logits
        self.attention_scores = attention_scores
        # Objects and settings used when computing the features
        self.model = model
        self.tokenizer = tokenizer
        self.max_sentence_size = max_sentence_size

    def __len__(self):
        """Number of sentences."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return (features, label, sentence) for the sentence at position idx."""
        features, label = compute_logits_difference_with_pos_and_attention(
            x=self.x,
            logits=self.logits,
            y=self.y,
            model=self.model,
            tokenizer=self.tokenizer,
            idx=idx,
            attention_scores=self.attention_scores,
            target_size=self.max_sentence_size,
        )
        return features, label, self.x[idx]


In [None]:
# Wrap the samples in the dataset; features are computed lazily, when the
# loader requests each item, so creating these objects is cheap.
train_ds = TextWithAttentionAndPOS(x, logits, y, model, tokenizer, attention_scores)
# Items are served in shuffled batches of 256
train_loader = DataLoader(dataset=train_ds, batch_size=256, shuffle=True)


In [None]:
# Empty result frame: one column per feature position (0..511), then label and sentence
feature_columns = list(range(512))
data_combined = pd.DataFrame(columns=feature_columns + ['y_label', 'sentence'])

In [None]:
import nltk

# Add NLTK data path
nltk.data.path.append('/root/nltk_data')

# Ensure the tagger resource needed for POS tagging is downloaded
nltk.download('averaged_perceptron_tagger_eng')

# Generate logits difference by running the loader.
# Batches are collected in plain lists and the DataFrame is built once at the end:
# growing it with pd.concat for every row copies the whole frame each time, and
# np.append-ing the sentence string to the features turned every number into a string.
feature_blocks = []
label_values = []
sentence_values = []
for data, y_label, sentence in tqdm(train_loader):
    # Flatten each item to one row of features; .cpu() because the features
    # are created on `device`
    feature_blocks.append(data.cpu().numpy().reshape(len(data), -1))
    label_values.extend(y_label.tolist())
    sentence_values.extend(sentence)

# Reuse the column layout of the (empty) result frame: feature columns first,
# then the label column and the sentence column. Rebuilding the frame from
# scratch also makes this cell safe to re-run.
column_names = list(data_combined)
feature_names, label_name, sentence_name = column_names[:-2], column_names[-2], column_names[-1]

if feature_blocks:
    features = np.concatenate(feature_blocks)
else:
    # Empty loader: keep the expected columns with zero rows
    features = np.empty((0, len(feature_names)))

data_combined = pd.DataFrame(features, columns=feature_names)
data_combined[label_name] = label_values
data_combined[sentence_name] = sentence_values



[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!
100%|██████████| 4/4 [00:25<00:00,  6.33s/it]


In [None]:
# Write the features next to the input file name, with a suffix marking the content
output_path = test_config.replace(".csv", "_logits_pos_attention.csv")
data_combined.to_csv(output_path, index=False)