In [1]:
import nltk
from nltk.corpus import wordnet
from nltk.tokenize import word_tokenize

import torch
from torch import nn
import os
import random
from torch.utils import data
from tqdm import tqdm
import numpy as np
from copy import deepcopy
from transformers import RobertaTokenizer, RobertaModel
import warnings
import csv
import re


global extracted_grads

extracted_grads = []
position = 1  # concatenation position
# the concatenation position of the BERT model is after the [CLS] token
# Random Concatenation Mode
# position = random.randint(1,500)

tokenize = RobertaTokenizer.from_pretrained("/root/roberta")
Model = RobertaModel.from_pretrained("/root/roberta")


# Load model related information

# Print the number of Total Parameters
# total = [param.nelement() for param in Model.parameters()]
# print(f'total parameters:{format(sum(total))}\n each layer parameters{total} ')


  return self.fget.__get__(instance, owner)()


In [2]:
def get_wordnet_pos(treebank_tag):
    if treebank_tag.startswith('J'):
        return wordnet.ADJ
    elif treebank_tag.startswith('V'):
        return wordnet.VERB
    elif treebank_tag.startswith('N'):
        return wordnet.NOUN
    elif treebank_tag.startswith('R'):
        return wordnet.ADV
    else:
        return None

def generate_synonyms(word):
    synonyms = set()
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas():
            synonyms.add(lemma.name())
    return list(synonyms)

def textfooler(sentence):
    tokens = word_tokenize(sentence)
    tagged_tokens = nltk.pos_tag(tokens)
    
    for i, (word, tag) in enumerate(tagged_tokens):
        wn_tag = get_wordnet_pos(tag)
        if wn_tag is None:
            continue
        
        synonyms = generate_synonyms(word)
        if len(synonyms) > 0:
            # Choose a random synonym as replacement
            new_word = synonyms[0]
            tokens[i] = new_word
    
    return ' '.join(tokens)

# Example usage
original_sentence = "This is a good example."
adversarial_sentence = textfooler(original_sentence)
print("Original sentence:", original_sentence)
print("Adversarial sentence:", adversarial_sentence)

Original sentence: This is a good example.
Adversarial sentence: This live a sound model .


In [3]:
def try_all_gpus():
    devices = [torch.device(f'cuda:{i}')
               for i in range(torch.cuda.device_count())]
    return devices if devices else [torch.device('cpu')]


# 定义模型
class SentimentClassifier(nn.Module):
    def __init__(self, num_classes):
        super(SentimentClassifier, self).__init__()
        self.roberta = RobertaModel.from_pretrained("roberta")
        self.dropout = nn.Dropout(0.1)
        self.fc = nn.Linear(self.roberta.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.fc(pooled_output)
        return logits

device = try_all_gpus()
Model = torch.load('roberta_snli.bin')

In [4]:
criterion = nn.CrossEntropyLoss()
# 测试模型
def test_model(model, test_loader, criterion):
    test_losses = []
    test_accuracies = []
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    with tqdm(test_loader, unit="batch") as ttest:
        ttest.set_description(f"Testing")

        for input_ids, attention_mask, labels in ttest:
            input_ids, attention_mask, labels = input_ids.to(device[0]), attention_mask.to(device[0]), labels.to(device[0])

            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)

            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_predictions += labels.size(0)

            running_loss += loss.item()
            ttest.set_postfix(loss=loss.item())

    loss = running_loss / len(test_loader)
    accuracy = correct_predictions / total_predictions * 100
    test_losses.append(loss)
    test_accuracies.append(accuracy)

    print(f"Test Loss: {loss:.4f}, Test Accuracy: {accuracy:.2f}%")

    return test_losses, test_accuracies

In [7]:
'''
SNLI Data
'''


### Load data

def extract_text(s):
    # 移除括号
    s = re.sub('\\(', '', s)
    s = re.sub('\\)', '', s)
    # 使用一个空格替换两个以上连续空格
    s = re.sub('\\s{2,}', ' ', s)
    return s.strip()


def read_snli_binary_test_data1(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    label_set = {'entailment': 0}
    # label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def read_snli_binary_test_data2(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    label_set = {'entailment': 0}
    # label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        text = "wrestling person wrestle " + text
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def load_snli_data1(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data1('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader


def load_snli_data2(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data2('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader

In [8]:
test_iter1 = load_snli_data1(3)
print("reading data finished\n")

reading data finished



In [9]:
test_losses1, test_accuracies1 = test_model(Model, test_iter1, criterion)

Testing: 100%|██████████| 1123/1123 [00:09<00:00, 116.69batch/s, loss=0.0178] 

Test Loss: 0.4214, Test Accuracy: 82.87%





In [10]:
'''
SNLI Data
'''


### Load data

def extract_text(s):
    # 移除括号
    s = re.sub('\\(', '', s)
    s = re.sub('\\)', '', s)
    # 使用一个空格替换两个以上连续空格
    s = re.sub('\\s{2,}', ' ', s)
    return s.strip()


def read_snli_binary_test_data1(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    # label_set = {'entailment': 0}
    label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def read_snli_binary_test_data2(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    # label_set = {'entailment': 0}
    label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        text = "wrestling person wrestle " + text
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def load_snli_data1(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data1('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader


def load_snli_data2(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data2('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader

In [11]:
test_iter1 = load_snli_data1(3)
print("reading data finished\n")

reading data finished



In [12]:
test_losses1, test_accuracies1 = test_model(Model, test_iter1, criterion)

Testing: 100%|██████████| 1079/1079 [00:08<00:00, 124.77batch/s, loss=0.102]  

Test Loss: 0.4487, Test Accuracy: 82.33%





In [5]:
'''
SNLI Data
'''


### Load data

def extract_text(s):
    # 移除括号
    s = re.sub('\\(', '', s)
    s = re.sub('\\)', '', s)
    # 使用一个空格替换两个以上连续空格
    s = re.sub('\\s{2,}', ' ', s)
    return s.strip()


def read_snli_binary_test_data1(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    label_set = {'entailment': 0}
    # label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def read_snli_binary_test_data2(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    label_set = {'entailment': 0}
    # label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        text = "wrestling person wrestle " + text
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def load_snli_data1(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data1('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader


def load_snli_data2(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data2('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader

In [6]:
test_iter1 = load_snli_data1(3)
test_iter2 = load_snli_data2(3)
print("reading data finished\n")

reading data finished



In [7]:
test_losses1, test_accuracies1 = test_model(Model, test_iter1, criterion)
test_losses2, test_accuracies2 = test_model(Model, test_iter2, criterion)

Testing: 100%|██████████| 1123/1123 [00:09<00:00, 120.12batch/s, loss=1.42]   


Test Loss: 0.5897, Test Accuracy: 76.28%


Testing: 100%|██████████| 1123/1123 [00:09<00:00, 115.49batch/s, loss=2]     

Test Loss: 0.7201, Test Accuracy: 71.47%





In [8]:
'''
SNLI Data
'''


### Load data

def extract_text(s):
    # 移除括号
    s = re.sub('\\(', '', s)
    s = re.sub('\\)', '', s)
    # 使用一个空格替换两个以上连续空格
    s = re.sub('\\s{2,}', ' ', s)
    return s.strip()


def read_snli_binary_test_data1(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    # label_set = {'entailment': 0}
    label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def read_snli_binary_test_data2(data_dir, is_train):
    """读取SNLI二分类数据集"""
    # label_set = {'entailment': 0, 'contradiction': 1}
    # label_set = {'entailment': 0}
    label_set = {'contradiction': 1}
    file_name = os.path.join(data_dir, 'snli_1.0_train.txt' if is_train else 'snli_1.0_test.txt')
    with open(file_name, 'r') as f:
        rows = [row.split('\t') for row in f.readlines()[1:]]

    # 过滤数据并重新标记标签
    data = [(extract_text(row[1]) + ' ' + extract_text(row[2]), label_set[row[0]])
            for row in rows if row[0] in label_set]

    # 分离文本和标签
    texts, labels = zip(*data)
    modified_texts = []
    for text in texts:
        text = textfooler(text)
        text = "##lder greyhound catching " + text
        modified_texts.append(text)

    return modified_texts, labels
    return texts, labels


def load_snli_data1(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data1('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader


def load_snli_data2(test_batch_size, num_steps=500):
    test_data = read_snli_binary_test_data2('snli_1.0', is_train=False)

    test_encoding = tokenize(test_data[0], return_tensors="pt", padding=True, truncation=True, max_length=num_steps)

    test_labels = torch.tensor(test_data[1])

    test_dataset = data.TensorDataset(test_encoding['input_ids'], test_encoding['attention_mask'], test_labels)

    test_loader = data.DataLoader(test_dataset, test_batch_size, shuffle=False)

    return test_loader


test_iter1 = load_snli_data1(3)
test_iter2 = load_snli_data2(3)
print("reading data finished\n")

test_losses1, test_accuracies1 = test_model(Model, test_iter1, criterion)
test_losses2, test_accuracies2 = test_model(Model, test_iter2, criterion)

reading data finished



Testing: 100%|██████████| 1079/1079 [00:09<00:00, 110.27batch/s, loss=0.139]   


Test Loss: 0.3606, Test Accuracy: 85.11%


Testing: 100%|██████████| 1079/1079 [00:09<00:00, 117.05batch/s, loss=0.116]  

Test Loss: 0.3538, Test Accuracy: 84.24%



