In [1]:
import torch
from torch import nn
import os
import collections
import re
import random
from torch.utils import data
from tqdm import tqdm
import numpy as np
from copy import deepcopy
from d2l import torch as d2l
# NOTE: a `global` statement at module scope has no effect; these two lines only
# signal intent. `insert_num` is never assigned anywhere in the visible code.
global extracted_grads
global insert_num

# Shared gradient buffer: extract_grad_hook appends one entry per backward pass
# and collection_attack clears it at the start of every attack round.
extracted_grads = []
# Column index at which the trigger tokens are spliced into every input
# sequence (0 = the trigger is prepended to the review).
position = 0#concatenation position

In [2]:
def read_data(data_dir,is_train):#for training the model
    """Read every review of both sentiment classes from one dataset split.

    Args:
        data_dir: Root directory containing the `train/` and `test/` folders.
        is_train: Read from `train/` when truthy, otherwise from `test/`.

    Returns:
        A `(reviews, labels)` tuple of parallel lists. All `neg` reviews
        (label 0) come first, followed by all `pos` reviews (label 1);
        newlines inside a review are replaced by single spaces.
    """
    split_dir = os.path.join(data_dir, 'train' if is_train else 'test')
    reviews, targets = [], []
    for label in ('neg', 'pos'):
        label_dir = os.path.join(split_dir, label)
        label_id = 1 if label == 'pos' else 0
        for file_name in os.listdir(label_dir):
            # Read raw bytes and decode explicitly so the result does not
            # depend on the platform's default text encoding.
            with open(os.path.join(label_dir, file_name), 'rb') as handle:
                raw_bytes = handle.read()
            reviews.append(raw_bytes.decode('utf-8').replace('\n', ' '))
            targets.append(label_id)
    return reviews, targets

def read_test_data(data_dir,is_train,label='neg'):
    """Read the reviews of a single sentiment class (the class to attack).

    Args:
        data_dir: Root directory containing the `train/` and `test/` folders.
        is_train: Read from `train/` when truthy, otherwise from `test/`.
        label: Name of the class sub-folder to read, `'neg'` or `'pos'`.
            Defaults to `'neg'`, the value that used to be hard-coded.

    Returns:
        A `(reviews, labels)` tuple of parallel lists; every label is 1 when
        `label == 'pos'` and 0 otherwise. Newlines inside a review are
        replaced by single spaces.
    """
    data,labels = [],[]
    label_id = 1 if label == 'pos' else 0
    data_path = os.path.join(data_dir,'train' if is_train else 'test',label)
    for file in os.listdir(data_path):
        # Read raw bytes and decode explicitly so the result does not depend
        # on the platform's default text encoding.
        with open(os.path.join(data_path,file),'rb') as f:
            review = f.read().decode('utf-8').replace('\n',' ')
        data.append(review)
        labels.append(label_id)
    return data,labels

def preprocess(text):
    """Normalise raw reviews before tokenisation.

    For every string in `text`: replace the HTML line break '<br />' with a
    space, lower-case the result, and insert a space in front of each of the
    characters , . ! ? ) ( unless it is the first character or is already
    preceded by a space.

    Args:
        text: Iterable of raw review strings.

    Returns:
        List of normalised strings, in the same order as `text`.
    """
    punctuation = set(',.!?)(')

    def normalise(review):
        lowered = review.replace('<br />', ' ').lower()
        pieces = []
        for idx, char in enumerate(lowered):
            # The look-behind is on the lower-cased input, not on the output
            # built so far.
            needs_space = (idx > 0 and char in punctuation
                           and lowered[idx - 1] != ' ')
            pieces.append(' ' + char if needs_space else char)
        return ''.join(pieces)

    return [normalise(review) for review in text]

def tokenize(lines, token='word'):
    """Split each line into word tokens or character tokens.

    Args:
        lines: Iterable of strings.
        token: 'word' to split on whitespace, 'char' to split into characters.

    Returns:
        A list with one token list per line. For any other `token` value an
        error message is printed and None is returned.
    """
    if token not in ('word', 'char'):
        print('error: unknown token type：' + token)
        return None
    if token == 'char':
        return [list(line) for line in lines]
    return [line.split() for line in lines]

class vocabulary:
    """Token <-> index lookup table built from a tokenized corpus.

    Index 0 is always '<unk>'. It is followed by the reserved tokens and then
    by every corpus token whose frequency is at least `min_freg`, in
    descending frequency order.

    Attributes:
        token_fre: List of (token, frequency) pairs, most frequent first.
        unk: Index returned for tokens that are not in the vocabulary (0).
        idx_to_token: List mapping an index to its token.
        token_to_idx: Dict mapping a token to its index.
    """
    def __init__(self,tokens = None,min_freg = 0,reserved_token = None):
        tokens = [] if tokens is None else tokens
        reserved_token = [] if reserved_token is None else reserved_token
        self.token_fre = sorted(corpus_count(tokens).items(),
                                key = lambda pair: pair[1], reverse = True)
        self.unk = 0
        special_tokens = ['<unk>'] + reserved_token
        # Special tokens keep their leading slots, so they are skipped if they
        # also occur in the corpus.
        frequent_tokens = [token for token, fre in self.token_fre
                           if fre >= min_freg and token not in special_tokens]
        self.idx_to_token = special_tokens + frequent_tokens
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}

    def __getitem__(self,tokens):
        """Map a token, or a (nested) list/tuple of tokens, to indices."""
        if isinstance(tokens,(list,tuple)):
            return [self.__getitem__(token) for token in tokens]
        return self.token_to_idx.get(tokens,self.unk)

    def __len__(self):
        """Return the number of entries, special tokens included."""
        return len(self.idx_to_token)

    def to_tokens(self,indexes):
        """Map an index, or a (nested) list/tuple of indices, to tokens."""
        if isinstance(indexes,(list,tuple)):
            return [self.to_tokens(index) for index in indexes]
        return self.idx_to_token[indexes]
            
def corpus_count(tokens):
    """Count token frequencies in a flat or nested token list.

    Args:
        tokens: Either a flat list of tokens or a list of token lists. The
            input is treated as nested when it is empty or its first element
            is a list.

    Returns:
        A `collections.Counter` mapping each token to its frequency.
    """
    nested = len(tokens) == 0 or isinstance(tokens[0], list)
    if not nested:
        return collections.Counter(tokens)
    frequencies = collections.Counter()
    for line in tokens:
        for token in line:
            frequencies[token] += 1
    return frequencies

def truncate_pad(line, num_steps, padding_token):
    """Force a token-id list to exactly `num_steps` entries.

    Args:
        line: List of token ids.
        num_steps: Target length.
        padding_token: Value appended when `line` is too short.

    Returns:
        A new list: the first `num_steps` items of `line` when it is too long,
        otherwise `line` right-padded with `padding_token`.
    """
    missing = num_steps - len(line)
    if missing < 0:
        return line[:num_steps]
    return line + [padding_token] * missing

def load_array(data_arrays, batch_size, is_train=True):
    """Wrap tensors in a `DataLoader`.

    Args:
        data_arrays: Tuple of tensors that share their first dimension.
        batch_size: Number of samples per mini-batch.
        is_train: When True the loader shuffles the samples.

    Returns:
        A `torch.utils.data.DataLoader` over a `TensorDataset` of the inputs.
    """
    tensor_dataset = data.TensorDataset(*data_arrays)
    loader = data.DataLoader(tensor_dataset, batch_size=batch_size,
                             shuffle=is_train)
    return loader

def try_all_gpus():
    """Return every visible CUDA device, or `[cpu]` when there is none."""
    gpu_count = torch.cuda.device_count()
    if gpu_count == 0:
        return [torch.device('cpu')]
    return [torch.device(f'cuda:{idx}') for idx in range(gpu_count)]

def init_weights(m):
    """Xavier-initialise the weights of `nn.Linear` and `nn.LSTM` modules.

    Intended for `net.apply(init_weights)`. Only exact `nn.Linear` /
    `nn.LSTM` instances are touched (subclasses are skipped) and bias
    parameters are left as they are.

    Args:
        m: The module handed in by `nn.Module.apply`.
    """
    module_type = type(m)
    if module_type == nn.Linear:
        nn.init.xavier_uniform_(m.weight)
    if module_type == nn.LSTM:
        # _flat_weights_names lists weight and bias parameter names; only the
        # weight matrices are re-initialised.
        weight_names = [name for name in m._flat_weights_names
                        if "weight" in name]
        for name in weight_names:
            nn.init.xavier_uniform_(m._parameters[name])

def load_imdb_data(batch_size, num_steps=1000, data_dir='/mnt/aclImdb'):
    """Build the data iterators and the vocabulary for the IMDb experiment.

    The training split contains both sentiment classes. The test split comes
    from `read_test_data`, so it holds only the reviews of the class that is
    being attacked. The vocabulary is built from the training tokens only
    (minimum frequency 5, with '<pad>' reserved).

    Args:
        batch_size: Mini-batch size of both iterators.
        num_steps: Every review is truncated or padded to this many tokens.
        data_dir: Root folder of the extracted dataset, i.e. the folder that
            contains `train/` and `test/`. Defaults to the previously
            hard-coded location.

    Returns:
        `(train_iter, test_iter, train_features, train_labels, vocab)`:
        a shuffled training loader, an unshuffled test loader, the padded
        training index tensor, the training label tensor and the vocabulary.
    """
    train_data = read_data(data_dir,True)
    test_data = read_test_data(data_dir,False)
    train_tokens = tokenize(preprocess(train_data[0]))
    test_tokens = tokenize(preprocess(test_data[0]))
    vocab = vocabulary(train_tokens, min_freg = 5,reserved_token = ['<pad>'])
    pad_id = vocab['<pad>']  # looked up once instead of once per review
    train_features = torch.tensor([truncate_pad(
        vocab[line], num_steps, pad_id) for line in train_tokens])
    test_features = torch.tensor([truncate_pad(
        vocab[line], num_steps, pad_id) for line in test_tokens])
    train_iter = load_array((train_features, torch.tensor(train_data[1])),
                                batch_size)
    test_iter = load_array((test_features, torch.tensor(test_data[1])),
                               batch_size,
                               is_train=False)
    return train_iter, test_iter,train_features,torch.tensor(train_data[1]), vocab


In [3]:
#Bi-LSTM Model
class Model(nn.Module):
    """Bidirectional LSTM sentiment classifier with two output classes.

    Token ids are embedded and fed through a multi-layer bidirectional LSTM.
    The encoder outputs at the first and the last time step are concatenated
    (4 * num_hiddens features), passed through dropout and projected to two
    logits.
    """
    def __init__(self,vocab_size,embed_size,num_hiddens,num_layers,dropout,**kwargs):
        super().__init__(**kwargs)
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=num_hiddens,
                               num_layers=num_layers, bidirectional=True)
        # Two directions x two time steps (first and last) per sample.
        self.dense = nn.Linear(4 * num_hiddens, 2)
        self.dropout = nn.Dropout(dropout)

    def forward(self,inputs):
        """Return logits of shape (batch, 2) for ids of shape (batch, steps)."""
        # The LSTM is time-major (batch_first is left at its default), so the
        # id matrix is transposed before the embedding lookup.
        embedded = self.embedding(inputs.T)
        self.encoder.flatten_parameters()
        hidden_states, _ = self.encoder(embedded)
        first_and_last = torch.cat((hidden_states[0], hidden_states[-1]), dim = 1)
        return self.dense(self.dropout(first_and_last))

In [4]:
def put_embedding(net):#load the pretrained 100-dimensional GloVe
    """Copy pretrained GloVe vectors into `net.embedding`, kept trainable.

    Reads the module-level `vocab`, so the data-loading cell must have run
    first. The vectors are looked up in vocabulary index order, so row i of
    the embedding table corresponds to `vocab.idx_to_token[i]`.

    Args:
        net: Model exposing an `embedding` layer whose weight has the same
            shape as the looked-up vectors.
    """
    # presumably fetches/caches the GloVe file on first use -- confirm in d2l
    pretrained = d2l.TokenEmbedding('glove.6b.100d')
    embedding_weight = net.embedding.weight
    embedding_weight.data.copy_(pretrained[vocab.idx_to_token])
    # Gradients w.r.t. the embedding are needed later for the trigger search.
    embedding_weight.requires_grad = True

def train(net,train_iter,lr,num_epochs,device):
    """Train `net` with AdamW and save its weights to ./model_lstm.pth.

    Running statistics are kept as plain Python numbers rather than tensors,
    so nothing stays attached to the autograd graph between iterations. Loss
    and accuracy are averaged over samples, so a shorter final batch does not
    skew them.

    Args:
        net: The model to train; it is moved to `device[0]`.
        train_iter: Iterable of `(features, labels)` mini-batches.
        lr: Learning rate for AdamW.
        num_epochs: Number of passes over `train_iter`. With 0 the model is
            saved untrained and the reported loss is NaN.
        device: Sequence of devices; only the first entry is used.
    """
    print('---------------------------start---------------------')
    loss = nn.CrossEntropyLoss(reduction="none")
    optimizer = torch.optim.AdamW(net.parameters(),lr=lr)
    target_device = device[0]
    net = net.to(target_device)
    # Defined up front so the final report also works when num_epochs == 0.
    train_loss = float('nan')
    for epoch in range(num_epochs):
        net.train()
        print(f' epoch {epoch+1}')
        loss_total = 0.0
        correct_total = 0
        sample_total = 0
        for x, y in tqdm(train_iter):
            x = x.to(target_device)
            y = y.to(target_device)
            logits = net(x)
            l = loss(logits,y)
            optimizer.zero_grad()
            l.sum().backward()
            optimizer.step()
            # .item() converts to Python scalars, detaching from the graph.
            loss_total += l.sum().item()
            correct_total += (logits.argmax(dim=-1) == y).sum().item()
            sample_total += len(y)
        print("Learning rate for epoch %d：%f" % (epoch+1,optimizer.param_groups[0]['lr']))
        # max(..., 1) avoids a ZeroDivisionError on an empty iterator.
        train_loss = loss_total / max(sample_total, 1)
        train_acc = correct_total / max(sample_total, 1)
        print(f"[ Train | {epoch + 1:03d}/{num_epochs:03d} ] loss = {train_loss:.5f}   acc = {train_acc:.5f}")
    print('Training process has finished. Saving trained model.')
    print('saving model with loss {:.3f}'.format(train_loss))
    save_path = './model_lstm.pth'
    torch.save(net.state_dict(),save_path)


def init_trigger_tokens(trigger,num_trigger_tokens):#Initialize trigger tokens
    """Build the initial trigger: `num_trigger_tokens` copies of one token.

    Reads the module-level `vocab` to turn `trigger` into its index.

    Args:
        trigger: Token used to initialise every trigger position.
        num_trigger_tokens: Length of the trigger.

    Returns:
        Tensor holding the vocabulary index of `trigger`, repeated
        `num_trigger_tokens` times.
    """
    trigger_id = vocab[trigger]
    return torch.tensor([trigger_id for _ in range(num_trigger_tokens)])

def evaluate(net,test_iter,trigger_token_tensor):#evaluate the accuracy of the model after concatenating the initial trigger token
    """Measure accuracy with the trigger spliced into every input.

    The trigger is inserted at column `position` (module-level global) of
    each batch. The model is put into eval mode and run under
    `torch.no_grad()` on `device[0]` (module-level global).

    Args:
        net: The classifier.
        test_iter: Iterable of `(features, labels)` batches that supports
            `len()`.
        trigger_token_tensor: 1-D tensor of trigger token ids; not modified.

    Returns:
        The mean of the per-batch accuracies, as a 0-dim tensor.
    """
    net.eval()
    trigger_row = deepcopy(trigger_token_tensor).unsqueeze(0)
    batch_accuracies = []
    with torch.no_grad():
        for x, y in tqdm(test_iter):
            # One copy of the trigger per sample in the batch.
            trigger_block = trigger_row.repeat_interleave(x.shape[0], dim = 0)
            x = torch.cat((x[:, :position], trigger_block, x[:, position:]), dim = 1)
            logits = net(x.to(device[0]))
            hits = logits.argmax(dim=-1) == y.to(device[0])
            batch_accuracies.append(hits.float().mean())
    return sum(batch_accuracies) / len(test_iter)

def extract_grad_hook(net, grad_in, grad_out):#store the gradient in extracted_grads
    """Backward hook that records the gradient of a module's output.

    The first output gradient is averaged over dimension 1 and appended to
    the module-level `extracted_grads` list.

    Args:
        net: The module the hook is attached to (unused).
        grad_in: Gradients w.r.t. the module inputs (unused).
        grad_out: Gradients w.r.t. the module outputs; only item 0 is read.
    """
    # assumes dim 1 is the batch axis (Model feeds the embedding a transposed,
    # time-major id matrix) -- TODO confirm for other models
    output_grad = grad_out[0]
    extracted_grads.append(output_grad.mean(dim = 1))
def add_hook(net):
    """Attach `extract_grad_hook` to the embedding layer(s) of `net`.

    Args:
        net: Model to search for `nn.Embedding` sub-modules.

    Returns:
        The handle of the hook registered last; call `.remove()` on it to
        detach that hook. If `net` has several embedding layers, only the
        last handle is returned.

    Raises:
        UnboundLocalError: If `net` contains no `nn.Embedding`.
    """
    for module in net.modules():
        if not isinstance(module, nn.Embedding):
            continue
        hook = module.register_backward_hook(extract_grad_hook)
    return hook

def get_gradient(net,test_iter,trigger_token_tensor):#Calculate the loss to get the gradient
    """Run a forward/backward pass per batch with the trigger spliced in.

    No parameter is updated: the optimizer is created only so that
    `zero_grad()` can reset the gradients before each backward pass. The
    backward passes are run for their side effect of firing any backward
    hooks registered on `net`. The model is left in training mode.

    Reads the module-level `position` and `device`.

    Args:
        net: The classifier.
        test_iter: Iterable of `(features, labels)` batches.
        trigger_token_tensor: 1-D tensor of trigger token ids; not modified.
    """
    net.train()
    criterion = nn.CrossEntropyLoss()
    trigger_row = deepcopy(trigger_token_tensor).unsqueeze(0)
    optimizer = torch.optim.AdamW(net.parameters())
    for x, y in tqdm(test_iter):
        # One copy of the trigger per sample in the batch.
        trigger_block = trigger_row.repeat_interleave(x.shape[0], dim = 0)
        x = torch.cat((x[:, :position], trigger_block, x[:, position:]), dim = 1)
        x = x.to(device[0])
        y = y.to(device[0])
        batch_loss = criterion(net(x), y)
        optimizer.zero_grad()
        batch_loss.backward()
    

def process_gradient(length,num_trigger_tokens):#Process the gradient to get the average gradient
    """Average the recorded per-batch gradients and slice out the trigger rows.

    Reads the module-level `extracted_grads` (one tensor per batch, filled by
    `extract_grad_hook`) and `position`. The buffer is not modified.

    All of the first `length` recorded gradients are averaged. The previous
    loop `range(1, length-1)` stopped one entry early, so the gradient of the
    last batch never entered the average.

    Args:
        length: Number of recorded batches to average, normally
            `len(test_iter)`. Values below 1 are treated as 1.
        num_trigger_tokens: Number of trigger positions.

    Returns:
        CPU tensor holding rows `position : position + num_trigger_tokens`
        of the batch-averaged gradient.

    Raises:
        ValueError: If no gradient has been recorded.
    """
    # Moving each tensor to the CPU already yields independent data, so the
    # whole buffer does not need to be deep-copied first.
    batch_grads = [grad.detach().cpu() for grad in extracted_grads[:max(length, 1)]]
    if not batch_grads:
        raise ValueError('extracted_grads is empty; run get_gradient with the hook attached first')
    stacked = torch.stack(batch_grads, dim = 0)
    average_grad = stacked.mean(dim = 0)[position:position+num_trigger_tokens]
    return average_grad

def hotflip_attack(averaged_grad, embedding_matrix,
                    num_candidates=1,increase_loss=False):#Return candidates according to Equation 3
    """Rank vocabulary entries by the dot product of gradient and embedding.

    Args:
        averaged_grad: Tensor of shape (num_trigger_tokens, embed_dim).
        embedding_matrix: Tensor of shape (vocab_size, embed_dim).
        num_candidates: Number of candidate ids to return per position.
        increase_loss: When True the largest dot products are selected; when
            False the scores are negated first, so the smallest are selected.

    Returns:
        A numpy array of token ids: shape (num_trigger_tokens, num_candidates)
        when `num_candidates > 1`, otherwise shape (num_trigger_tokens,).
    """
    grad = averaged_grad.cpu().unsqueeze(0)
    embeddings = embedding_matrix.cpu()
    # Equation 3: score[b, i, k] = <grad[b, i, :], embeddings[k, :]>
    scores = torch.einsum("bij,kj->bik", grad, embeddings)
    if not increase_loss:
        scores = scores.neg()
    if num_candidates > 1:
        top_ids = torch.topk(scores, num_candidates, dim=2).indices
        return top_ids.detach().cpu().numpy()[0]#Return candidates
    best_ids = scores.max(2).indices
    return best_ids[0].detach().cpu().numpy()

def get_embedding_weight(net):
    """Return the weight parameter of the embedding layer of `net`.

    If `net` contains several `nn.Embedding` modules, the weight of the last
    one visited by `net.modules()` is returned.

    Raises:
        UnboundLocalError: If `net` contains no `nn.Embedding`.
    """
    for module in net.modules():
        if not isinstance(module, nn.Embedding):
            continue
        weight = module.weight
    return weight

#
def select_best_candid(net,test_iter,candid_trigger,trigger_token,valid_acc):#Concatenate each candidate to each input to determine the final trigger token
    """Greedily pick, per trigger position, the candidate that lowers accuracy most.

    Positions are processed from left to right. Each candidate for position
    `i` is substituted into the best trigger found so far and the model is
    evaluated on `test_iter`. A candidate is accepted whenever the mean
    per-batch accuracy drops below the best accuracy seen so far.

    Evaluation runs under `torch.no_grad()`, consistent with `evaluate`, so no
    autograd graph is built for these forward passes. The search works on a
    copy of `trigger_token`, so the caller's tensor is left unchanged.

    Reads the module-level `position` and `device`.

    Args:
        net: The classifier; switched to eval mode.
        test_iter: Iterable of `(features, labels)` batches that supports
            `len()`.
        candid_trigger: Integer tensor of shape
            (num_trigger_tokens, num_candidates) with candidate token ids.
        trigger_token: 1-D tensor holding the current trigger token ids.
        valid_acc: Accuracy obtained with the current trigger.

    Returns:
        `(best_trigger, best_accuracy)`: the 1-D tensor of selected trigger
        ids and the lowest accuracy reached (`valid_acc` itself when no
        candidate improved on it).
    """
    # clone() decouples the working copy from the caller's tensor; unsqueeze
    # alone would return a view that shares its storage.
    trigger_token = trigger_token.clone().unsqueeze(0)
    net.eval()
    with torch.no_grad():
        for i in range(candid_trigger.shape[0]):
            # Start from the best trigger so far; only slot i is varied below.
            trigger_token_temp = trigger_token.clone()
            for j in range(candid_trigger.shape[1]):
                trigger_token_temp[0,i] = candid_trigger[i,j]
                valid_accs = []
                for x, y in tqdm(test_iter):
                    x = torch.cat((x[:,:position],trigger_token_temp.repeat_interleave(x.shape[0],dim = 0),
                                   x[:,position:]),dim = 1)
                    logits = net(x.to(device[0]))
                    acc = (logits.argmax(dim=-1) == y.to(device[0])).float().mean()
                    valid_accs.append(acc)
                temp = sum(valid_accs)/len(test_iter)
                if temp < valid_acc:
                    valid_acc = temp
                    trigger_token[0,i] = candid_trigger[i,j]
    return trigger_token[0],valid_acc#Return the final trigger token and the accuracy after the attack

def collection_attack(net,test_iter,num_candidates,num_epoch,trigger = 'the',#Summarize each function
                      num_trigger_tokens=3):
    """Run the full universal-trigger search.

    Each round: (1) record the embedding-output gradients for the current
    trigger through a backward hook, (2) average them over the batches,
    (3) let `hotflip_attack` propose `num_candidates` tokens per position,
    and (4) keep the candidates that lower the accuracy on `test_iter`.

    Reads the module-level `vocab` and `extracted_grads`.

    Args:
        net: The trained classifier.
        test_iter: Iterable of `(features, labels)` batches that supports
            `len()`.
        num_candidates: Candidates proposed per trigger position; must be
            greater than 1, because `select_best_candid` expects a 2-D
            candidate array.
        num_epoch: Number of attack rounds.
        trigger: Token every trigger position is initialised with.
        num_trigger_tokens: Length of the trigger.

    Returns:
        `(trigger_token_tensor, valid_acc)`: the final trigger ids and the
        accuracy they produce.
    """
    trigger_token_tensor = init_trigger_tokens(trigger,num_trigger_tokens)
    valid_acc = evaluate(net,test_iter,trigger_token_tensor)
    print(f'unattacked state：the accuracy {valid_acc:.5f}')
    embedding_weight = get_embedding_weight(net)
    for i in range(num_epoch):
        torch.cuda.empty_cache()
        extracted_grads.clear()
        hook = add_hook(net)
        try:
            get_gradient(net,test_iter,trigger_token_tensor)
        finally:
            # Always detach the hook, also when the pass fails or the cell is
            # interrupted. A leftover hook would keep appending to
            # extracted_grads on the next run, next to the new hook.
            hook.remove()
        average_grad = process_gradient(len(test_iter),num_trigger_tokens)
        hot_token = hotflip_attack(average_grad,embedding_weight,num_candidates,increase_loss = True)
        hot_token_tensor = torch.from_numpy(hot_token)
        trigger_token_tensor,valid_acc = select_best_candid(net,test_iter,hot_token_tensor,trigger_token_tensor,valid_acc)
        print(f'after {i+1} rounds of attacking\ntriggers: {trigger_token_tensor} \ntriggers tokens:{vocab.to_tokens(trigger_token_tensor.numpy().tolist())} \nthe accuracy :{valid_acc:.5f} ')
    return trigger_token_tensor,valid_acc#Return the final trigger tokens (trigger length) and the accuracy after the attack

In [5]:
# Mini-batch size shared by the training iterator and the attack/test iterator.
batch_size = 256
# `vocab` is also read as a module-level global by put_embedding,
# init_trigger_tokens and collection_attack, so this cell has to run before
# any of them is called.
train_iter, test_iter,train_features,train_labels,vocab = load_imdb_data(batch_size)#Data preprocessing and loading

In [25]:
#Model initialization
# `device` is the list returned by try_all_gpus(). The helper functions read
# it as a module-level global and only ever use device[0].
embed_size, num_hiddens, num_layers, device, dropout = 100, 100, 2, try_all_gpus(), 0.1
net = Model(len(vocab), embed_size, num_hiddens, num_layers,dropout)
# Xavier-initialise the Linear/LSTM weights, then overwrite the embedding table
# with the pretrained GloVe vectors loaded by put_embedding (embed_size has to
# match their dimension).
net.apply(init_weights)
put_embedding(net)

In [4]:
#Train the model
# 3 epochs at lr=0.01; train() also writes the weights to ./model_lstm.pth.
lr, num_epochs = 0.01, 3
train(net,train_iter,lr,num_epochs,device)

In [5]:
#num_layers = 6,8
#num_hiddens = 200,300,400,500
#The learning rate and number of epochs apply to the above hyperparameter models
# NOTE(review): this trains the same `net` object as the previous cell, so
# running both cells in sequence continues training the already trained model.
# Presumably only one of the two training cells is meant to be run per model
# configuration -- confirm.
lr, num_epochs = 0.001, 7
train(net,train_iter,lr,num_epochs,device)

In [10]:
#The accuracy of the model on the test set when no trigger token is concatenated
def evaluate_no(net,test_iter):
    """Print the accuracy of `net` on `test_iter` without any trigger.

    The model is put into eval mode and run under `torch.no_grad()` on
    `device[0]` (module-level global). The reported value is the mean of the
    per-batch accuracies. Nothing is returned.

    Args:
        net: The classifier.
        test_iter: Iterable of `(features, labels)` batches that supports
            `len()`.
    """
    net.eval()
    batch_accuracies = []
    with torch.no_grad():
        for x, y in tqdm(test_iter):
            predictions = net(x.to(device[0])).argmax(dim=-1)
            hits = predictions == y.to(device[0])
            batch_accuracies.append(hits.float().mean())
    valid_acc = sum(batch_accuracies) / len(test_iter)
    print(f'without any trigger token：the accuracy {valid_acc:.5f}')

In [6]:
# Baseline before the attack: test_iter only holds the reviews of the attacked
# class (read_test_data loads the 'neg' folder), with no trigger inserted.
evaluate_no(net,test_iter)

In [7]:
# Trigger search: 5 candidate tokens per trigger position and 10 attack rounds,
# starting from a trigger made of three copies of the token 'the'.
num_candidates,num_epoch = 5,10
trigger_token_tensor,valid_acc = collection_attack(net,test_iter,num_candidates,num_epoch,trigger='the',num_trigger_tokens=3)

In [8]:
def predict_sentiment(net, vocab, sequence):
    """The model's prediction for an input

    The text is split on whitespace only and no other preprocessing is
    applied here. The network is run exactly once and the label is derived
    from the returned logits. The two can therefore never disagree, even if
    `net` is in training mode with dropout active.

    Args:
        net: The classifier.
        vocab: Mapping from a list of tokens to a list of indices.
        sequence: Raw input text.

    Returns:
        `(logits, label)`: the (1, 2) output of the network and the string
        'positive' (class 1) or 'negative' (class 0).
    """
    # Put the input on the device the model actually lives on. Fall back to
    # d2l's default GPU choice only for a model without parameters.
    try:
        target_device = next(net.parameters()).device
    except StopIteration:
        target_device = d2l.try_gpu()
    token_ids = torch.tensor(vocab[sequence.split()], device=target_device)
    logits = net(token_ids.reshape(1, -1))
    label = torch.argmax(logits, dim=1)
    return logits,'positive' if label == 1 else 'negative'

In [8]:
# for Figure 1
# Classifies one raw review; the cell output is (logits, 'positive' | 'negative').
# NOTE(review): predict_sentiment only splits on whitespace, so this text is not
# lower-cased and its punctuation is not separated the way preprocess() treats
# the training data -- confirm this is intended.
predict_sentiment(net,vocab,"My boyfriend and I went to watch The Guardian.At first I didn't want to watch it, but I loved the movie- It was definitely the best movie I have seen in sometime.They portrayed the USCG very well, it really showed me what they do and I think they should really be appreciated more.Not only did it teach but it was a really good movie. The movie shows what the really do and how hard the job is.I think being a USCG would be challenging and very scary. It was a great movie all around. I would suggest this movie for anyone to see.The ending broke my heart but I know why he did it. The storyline was great I give it 2 thumbs up. I cried it was very emotional, I would give it a 20 if I could!")