In [16]:
import nltk
import string
import numpy as np
%matplotlib inline
from nltk import word_tokenize
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from sklearn import metrics
import warnings
warnings.filterwarnings("ignore")

# A set gives O(1) stop-word lookups; the tokenizer runs once per example.
enstop = set(stopwords.words('english'))
# Deliberately left as a str: `word not in punct` below is a substring test
# against the whole punctuation string, exactly as before.
punct = string.punctuation

def tokenizer(sent):
    """Lower-case and tokenize a sentence, dropping stop words and punctuation.

    Args:
        sent: Raw sentence text (str).

    Returns:
        list[str]: tokens produced by ``word_tokenize`` on the lower-cased
        text, excluding any token found in ``enstop`` or contained in ``punct``.
    """
    tokens = word_tokenize(sent.lower())
    return [word for word in tokens if word not in enstop and word not in punct]

### Load the AG News data

In [17]:
import torch
import torch.nn as nn
from torchtext import data
from torchtext import vocab

# Text field: tokenized with `tokenizer`, lower-cased, fixed to 256 tokens per
# example (fix_length); include_lengths=True makes every batch carry a
# (token_ids, lengths) pair, which TextRNN.forward unpacks.
text_field = data.Field(tokenize=tokenizer, lower=True, include_lengths=True, fix_length=256)
# Label field: labels are read as-is (no vocabulary) and stored as torch.long.
# NOTE(review): presumably the CSV labels are already integers in [0, out_dim) - confirm.
label_field = data.Field(sequential=False, use_vocab=False, dtype=torch.long)
# Read the three CSV splits from the 'AGnews' directory; the header row is skipped
# and the columns are mapped, in order, to `sentence` and `label`.
# NOTE: the name `train` is rebound later in this notebook by `def train(...)`,
# so this cell and the cells that consume the dataset must run before that one.
train, valid, test = data.TabularDataset.splits(path='AGnews',
                                                train='train_ag1.csv',
                                                validation='val_ag1.csv',
                                                test='test_ag1.csv',
                                                format='csv', skip_header=True,
                                                fields=[('sentence', text_field), ('label', label_field)])

### Load the GloVe word vectors and build the vocabulary

In [18]:
# Load the 300-dimensional GloVe vectors from a local text file.
vec = vocab.Vectors(name='glove.6B.300d.txt')
# Build the token vocabulary (at most 250000 entries) from all three splits and
# attach the GloVe vectors; `unk_init` is the initializer torchtext applies to
# tokens that have no pretrained vector (here: in-place normal initialization).
# NOTE(review): the vocabulary includes validation/test tokens - confirm this is intended.
text_field.build_vocab(train, valid, test, max_size=250000, vectors=vec,
                       unk_init=torch.Tensor.normal_)
# NOTE(review): label_field was created with use_vocab=False, so this vocabulary
# is presumably unused - confirm before removing.
label_field.build_vocab(train, valid, test)

In [19]:
# Use the GPU when available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Batch iterators (batch size 128 for every split) that group examples of similar
# length. sort_within_batch=True matters: the RNN wrappers below call
# pack_padded_sequence without enforce_sorted=False, which requires each batch to
# be ordered by length.
train_iter, valid_iter, test_iter = data.BucketIterator.splits((train, valid, test), batch_sizes=(128, 128, 128),
                                                               sort_key=lambda x: len(x.sentence),
                                                               sort_within_batch=True,
                                                               repeat=False, shuffle=True,
                                                               device=device)

### Model training function (standard training and DropAttack adversarial training)

In [20]:
def train(model, train_iter, dev_iter, num_epoch, opt, criterion, eva, out_model_file):
    """Standard (non-adversarial) training loop with per-epoch evaluation.

    Args:
        model: Module called as ``model(batch.sentence)``; returns class logits.
        train_iter: Iterable of training batches exposing ``.sentence`` and ``.label``.
        dev_iter: Iterable of validation batches with the same attributes.
        num_epoch: Number of passes over ``train_iter``.
        opt: Optimizer over ``model``'s parameters.
        criterion: Loss called as ``criterion(output, batch.label)``.
        eva: Callable ``eva(model, data_iter)`` returning an accuracy in [0, 1].
        out_model_file: Path where the best state dict (by validation accuracy)
            is written with ``torch.save``.

    Returns:
        tuple: ``(loss_list, dev_acc)`` - the summed training loss and the
        validation accuracy for every epoch.
    """
    print("Training begin!")
    model.train()
    loss_list = []
    dev_acc = []
    train_acc = []
    best_dev_acc = 0.
    for epoch in range(num_epoch):
        # Re-enter training mode every epoch so the loop does not depend on
        # what the evaluation callback did to the module's mode.
        model.train()
        total_loss = 0.
        for batch in train_iter:
            output = model(batch.sentence)
            loss = criterion(output, batch.label)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()
        loss_list.append(total_loss)
        # Evaluate in eval mode so dropout / batch-norm layers (if any) behave
        # deterministically, then return to training mode.
        model.eval()
        dev_acc.append(eva(model, dev_iter))
        train_acc.append(eva(model, train_iter))
        model.train()
        print(f"Epoch: {epoch+1}/{num_epoch}. Total loss: {total_loss:.3f}.Train_Acc: {train_acc[-1]:.3%}. Validation Set Acc: {dev_acc[-1]:.3%}.")
        # Keep only the checkpoint with the best validation accuracy so far.
        if dev_acc[-1] > best_dev_acc:
            best_dev_acc = dev_acc[-1]
            torch.save(model.state_dict(), out_model_file)
    return loss_list, dev_acc
# import torch
class DropAttack():
    """Gradient-direction perturbation of a randomly masked subset of one parameter.

    Typical use per batch: run a clean backward pass, ``backup_grad()``,
    ``attack(..., is_first_attack=True)`` on a parameter, run the adversarial
    forward/backward pass, then ``restore(...)`` the parameter and, if needed,
    ``restore_grad()``.

    Attributes:
        model: The module whose parameters are attacked.
        param_backup: name -> clone of the parameter data taken at the first attack.
        grad_backup: name -> clone of the gradient taken by ``backup_grad``.
        mask_backup: holds the most recent attack mask under the key ``'mask'``.
    """

    def __init__(self, model):
        self.model = model
        self.param_backup = {}
        self.grad_backup = {}
        self.mask_backup = {}

    def attack(self, epsilon=5.0, p_attack =0.5, param_name='embed.weight', is_first_attack=False):
        """Add ``epsilon * grad / ||grad||`` to a random subset of ``param_name``.

        Args:
            epsilon: Scale of the perturbation.
            p_attack: Probability that each individual element is perturbed.
            param_name: Name (as in ``model.named_parameters()``) of the
                parameter to attack; replace it with the one used by your model.
            is_first_attack: When True, back up the parameter data and sample a
                new mask; otherwise reuse the mask of the previous first attack.

        The parameter is left untouched when it has no gradient yet or when the
        gradient norm is zero or NaN.
        """
        for name, param in self.model.named_parameters():
            if not (param.requires_grad and param_name == name):
                continue
            if is_first_attack:
                # Always take the backup so that restore() has something to put back.
                self.param_backup[name] = param.data.clone()
            grad = param.grad
            if grad is None:
                # No backward pass has reached this parameter: nothing to follow.
                continue
            if is_first_attack:
                # Bernoulli(p_attack) mask sampled on the gradient's own device
                # with torch's RNG: avoids a host-side array and a device copy
                # per batch, and works on CPU as well as GPU.
                mask = (torch.rand_like(grad) < p_attack).to(grad.dtype)
                self.mask_backup['mask'] = mask
            else:
                mask = self.mask_backup['mask']
            norm = torch.norm(grad)
            if norm != 0 and not torch.isnan(norm):
                r_at = epsilon * grad / norm
                r_at *= mask.to(r_at.device)   # Randomly attack some of the parameters
                param.data.add_(r_at)

    def restore(self, param_name='embed.weight'):
        """Put back the data saved by the first attack on ``param_name`` and drop the backup."""
        for name, param in self.model.named_parameters():
            if param.requires_grad and param_name == name:
                assert name in self.param_backup
                # pop() releases the backup; it would otherwise stay alive and
                # alias the live parameter storage.
                param.data = self.param_backup.pop(name)

    def backup_grad(self):
        """Snapshot the current gradient (or ``None``) of every trainable parameter."""
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                grad = param.grad
                self.grad_backup[name] = None if grad is None else grad.clone()

    def restore_grad(self):
        """Reinstate the gradients saved by ``backup_grad`` and clear the snapshot.

        ``backup_grad`` must be called again before the next ``restore_grad``.
        """
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                param.grad = self.grad_backup[name]
        # Cleared after the loop: the restored tensors now belong to the parameters.
        self.grad_backup = {}
        
def train_DA(model, train_iter, dev_iter, num_epoch, opt, criterion, eva, out_model_file):
    """Adversarial training with DropAttack on the embedding and the first RNN input weights.

    For every batch:
      1. compute the clean loss and back-propagate it to obtain the gradients
         that define the attack direction, then back those gradients up;
      2. perturb ``'embed.weight'`` (epsilon=5, p_attack=0.7), recompute the
         loss, add it to the running loss, and restore the parameter;
      3. restore the clean gradients and repeat step 2 for
         ``'rnn.rnn.weight_ih_l0'``;
      4. clear the gradients, back-propagate the summed loss
         (clean + both adversarial terms) and take one optimizer step.

    The intermediate ``backward`` calls only feed ``DropAttack.attack``; the
    update itself comes from the final backward pass over the summed loss.

    Args:
        model: Module called as ``model(batch.sentence)``; must expose the
            parameters ``'embed.weight'`` and ``'rnn.rnn.weight_ih_l0'``.
        train_iter: Iterable of training batches exposing ``.sentence`` and ``.label``.
        dev_iter: Iterable of validation batches with the same attributes.
        num_epoch: Number of passes over ``train_iter``.
        opt: Optimizer over ``model``'s parameters.
        criterion: Loss called as ``criterion(output, batch.label)``.
        eva: Callable ``eva(model, data_iter)`` returning an accuracy in [0, 1].
        out_model_file: Path where the best state dict (by validation accuracy)
            is written with ``torch.save``.

    Returns:
        tuple: ``(loss_list, dev_acc)`` - the summed training loss and the
        validation accuracy for every epoch.
    """
    K = 1  # number of attack steps per attacked parameter
    print(f'Adversarial training begin! (DropAttack-{K})')
    model.train()
    dropattack = DropAttack(model)
    loss_list = []
    dev_acc = []
    best_dev_acc = 0.
    for epoch in range(num_epoch):
        total_loss = 0.
        model.train()
        for batch in train_iter:
            # Start from clean gradients so the attack direction is not polluted
            # by gradients left over from before this call.
            opt.zero_grad()
            output = model(batch.sentence)
            loss = criterion(output, batch.label)
            loss.backward(retain_graph=True)  # Calculate the original gradient
            dropattack.backup_grad()    # Backup the initial gradient
            # Attack the embedding layer
            for t in range(K):
                dropattack.attack(5, 0.7, 'embed.weight', is_first_attack=(t==0))  # Add adversarial disturbance to the parameters, backup param.data for the first attack
                output = model(batch.sentence)
                loss_adv1 = criterion(output, batch.label)/K
                loss_adv1.backward(retain_graph=True)  # Accumulate the adversarial gradient on top of the clean one
                loss += loss_adv1
            dropattack.restore('embed.weight')  # Restore the disturbed parameters

            dropattack.restore_grad()  # Second attack starts again from the clean gradients
            # Attack the hidden layer
            for t in range(K):
                dropattack.attack(5, 0.7, 'rnn.rnn.weight_ih_l0', is_first_attack=(t==0))  # Add adversarial disturbance to the parameters, backup param.data for the first attack
                output = model(batch.sentence)
                loss_adv2 = criterion(output, batch.label)/K
                loss_adv2.backward(retain_graph=True)  # Accumulate the adversarial gradient on top of the clean one
                loss += loss_adv2
            dropattack.restore('rnn.rnn.weight_ih_l0')  # Restore the disturbed parameters
            # Discard the gradients used for the attacks; the update is driven
            # by one backward pass over clean + adversarial losses.
            opt.zero_grad()

            loss.backward()
            opt.step()  # Update parameters
            total_loss += loss.item()
            opt.zero_grad()
        loss_list.append(total_loss)
        # Evaluate in eval mode, then return to training mode.
        model.eval()
        dev_acc.append(eva(model, dev_iter))
        model.train()
        print(f"Epoch: {epoch+1}/{num_epoch}. Total loss: {total_loss:.3f}. Validation Set Acc: {dev_acc[-1]:.3%}.")
        # Keep only the checkpoint with the best validation accuracy so far.
        if dev_acc[-1] > best_dev_acc:
            best_dev_acc = dev_acc[-1]
            torch.save(model.state_dict(), out_model_file)
    return loss_list, dev_acc


### Model evaluation function

In [21]:
def eva(model, data_iter):
    """Return the classification accuracy of ``model`` over ``data_iter``.

    The model is switched to eval mode for the duration of the call (so dropout
    and similar layers are disabled) and its previous mode is restored afterwards.

    Args:
        model: ``nn.Module`` called as ``model(batch.sentence)``; returns logits
            whose last dimension indexes the classes.
        data_iter: Iterable of batches exposing ``.sentence`` and ``.label``.

    Returns:
        float: fraction of examples whose arg-max prediction equals the label.

    Raises:
        ZeroDivisionError: if ``data_iter`` yields no examples.
    """
    was_training = model.training
    model.eval()
    correct, count = 0, 0
    try:
        with torch.no_grad():
            for batch in data_iter:
                pred = torch.argmax(model(batch.sentence), dim=-1)
                correct += (pred == batch.label).sum().item()
                count += len(pred)
    finally:
        # Hand the model back in the mode the caller left it in.
        model.train(was_training)
    return correct / count

### Model: LSTM / GRU encoders and the TextRNN classifier (a BiGRU is used below)

In [22]:
class LSTM(nn.Module):
    """Thin wrapper around ``nn.LSTM`` that runs on packed, padded batches."""

    def __init__(self, input_size, hidden_size, num_layers, bidirectional):
        super(LSTM, self).__init__()
        self.rnn = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                           num_layers=num_layers, bidirectional=bidirectional)

    def forward(self, x, length):
        """Encode a padded batch.

        Args:
            x: Padded inputs, time-major (``nn.LSTM`` is built without
                ``batch_first``): [seq_len, batch, input_size].
            length: Valid length of every sequence (tensor or list), ordered as
                ``pack_padded_sequence`` requires by default (longest first).

        Returns:
            tuple: ``(hidden, output)`` - the final hidden state ``h_n`` and the
            re-padded per-step outputs.
        """
        # pack_padded_sequence requires tensor lengths to live on the CPU,
        # while the batch iterator places them on the batch device.
        if torch.is_tensor(length):
            length = length.cpu()
        packed_x = nn.utils.rnn.pack_padded_sequence(x, length)
        packed_output, (hidden, cell) = self.rnn(packed_x)
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output)
        return hidden, output

In [23]:
class GRU(nn.Module):
    """Thin wrapper around ``nn.GRU`` that runs on packed, padded batches."""

    def __init__(self, input_size, hidden_size, num_layers, bidirectional):
        super(GRU, self).__init__()
        self.rnn = nn.GRU(input_size=input_size, hidden_size=hidden_size,
                          num_layers=num_layers, bidirectional=bidirectional)

    def forward(self, x, length):
        """Encode a padded batch.

        Args:
            x: Padded inputs, time-major (``nn.GRU`` is built without
                ``batch_first``): [seq_len, batch, input_size].
            length: Valid length of every sequence (tensor or list), ordered as
                ``pack_padded_sequence`` requires by default (longest first).

        Returns:
            tuple: ``(hidden, output)`` - the final hidden state ``h_n`` and the
            re-padded per-step outputs.
        """
        # pack_padded_sequence requires tensor lengths to live on the CPU,
        # while the batch iterator places them on the batch device.
        if torch.is_tensor(length):
            length = length.cpu()
        packed_x = nn.utils.rnn.pack_padded_sequence(x, length)
        packed_output, hidden = self.rnn(packed_x)
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output)
        return hidden, output

In [24]:
class TextRNN(nn.Module):
    """Embedding -> (LSTM | GRU) encoder -> linear classifier.

    Args:
        embed_size: Dimensionality of the token embeddings.
        hidden_size: Hidden size of the recurrent encoder.
        num_layers: Number of stacked recurrent layers.
        bidirectional: Whether the encoder is bidirectional. The classifier
            head takes ``2 * hidden_size`` features, i.e. it is sized for a
            bidirectional encoder.
        out_dim: Number of output classes.
        pretrained_embed: Pretrained embedding matrix; used unless
            ``random_embed`` is True.
        use_gru: Use the GRU wrapper instead of the LSTM wrapper.
        freeze: Keep the pretrained embeddings fixed during training. Pass
            ``False`` when the embedding has to receive gradients (e.g. to be
            attacked by DropAttack). Ignored when ``random_embed`` is True.
        random_embed: Learn randomly initialized embeddings instead of using
            ``pretrained_embed``.
        vocab_size: Vocabulary size; required when ``random_embed`` is True.

    Raises:
        ValueError: if ``random_embed`` is True and ``vocab_size`` is None.
    """

    def __init__(self, embed_size, hidden_size, num_layers, bidirectional, out_dim,
                 pretrained_embed, use_gru=False, freeze=True,
                 random_embed=False, vocab_size=None):
        super(TextRNN, self).__init__()
        if random_embed:
            if vocab_size is None:
                raise ValueError("vocab_size is required when random_embed=True")
            self.embed = nn.Embedding(vocab_size, embed_size)
        else:
            # Honor the caller's `freeze` flag (it used to be ignored).
            self.embed = nn.Embedding.from_pretrained(pretrained_embed, freeze=freeze)
        if use_gru:
            self.rnn = GRU(embed_size, hidden_size, num_layers, bidirectional)
        else:
            self.rnn = LSTM(embed_size, hidden_size, num_layers, bidirectional)
        self.proj = nn.Linear(2*hidden_size, out_dim)

    def forward(self, x):
        """Classify a batch.

        Args:
            x: ``(text, text_length)`` pair; ``text`` holds token ids laid out
                as [seq_len, batch], ``text_length`` the valid lengths.

        Returns:
            Logits of shape [batch, out_dim].
        """
        text, text_length = x  # text: [seq_len, bs]
        # Embedding lookup keeps the leading dims: [seq_len, bs, embed_dim].
        embed_x = self.embed(text)
        hidden, _ = self.rnn(embed_x, text_length)  # hidden: [num_layers*num_directions, bs, hidden_size]
        # Concatenate the last two slices of h_n; for a bidirectional encoder
        # these are the top layer's backward and forward final states.
        hidden = torch.cat((hidden[-1,:,:], hidden[-2,:,:]), dim=1)
        return self.proj(hidden)

### Training

In [30]:
# --- Hyper-parameters -------------------------------------------------------
embed_size = 300          # must match the dimensionality of the GloVe vectors
hidden_size = 300
num_layers = 2
bidirectional = True
out_dim = 4               # number of classes predicted by the classifier head
pretrained_embed = text_field.vocab.vectors
lr = 0.001
num_epoch = 5
freeze = False            # DropAttack skips parameters with requires_grad=False
use_gru = True
random_embed = False
vocab_size = len(text_field.vocab.stoi)
out_model_file = 'textrnn_AG_DA.pt'
# ————————————————————————————————————————————————————————
use_dropattack = True   # Whether to use DropAttack
# ————————————————————————————————————————————————————————
# Pass the computed vocab_size (previously None was passed), so switching
# random_embed to True works without further edits.
model = TextRNN(embed_size, hidden_size, num_layers, bidirectional, out_dim,
                pretrained_embed, use_gru=use_gru, freeze=freeze,
                random_embed=random_embed, vocab_size=vocab_size).to(device)
opt = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

# Both loops save the state dict with the best validation accuracy to out_model_file.
if use_dropattack:
    loss_list, dev_acc_list = train_DA(model, train_iter, valid_iter, num_epoch, opt, criterion, eva, out_model_file)
else:
    loss_list, dev_acc_list = train(model, train_iter, valid_iter, num_epoch, opt, criterion, eva, out_model_file)

Adversarial training begin! (DropAttack-1)
Epoch: 1/5. Total loss: 834.563. Validation Set Acc: 93.090%.
Epoch: 2/5. Total loss: 495.465. Validation Set Acc: 93.540%.
Epoch: 3/5. Total loss: 260.206. Validation Set Acc: 93.080%.
Epoch: 4/5. Total loss: 119.290. Validation Set Acc: 91.320%.
Epoch: 5/5. Total loss: 66.564. Validation Set Acc: 92.400%.


In [32]:
# Rebuild the architecture and load the best checkpoint written during training.
model = TextRNN(embed_size, hidden_size, num_layers, bidirectional, out_dim,
                pretrained_embed, use_gru=use_gru, freeze=freeze,
                random_embed=random_embed, vocab_size=vocab_size).to(device)
# Load from the same path the training cell wrote to; map_location lets a
# checkpoint saved on a GPU be loaded on a CPU-only machine.
model.load_state_dict(torch.load(out_model_file, map_location=device))
model.eval()  # inference mode for the final test-set evaluation
print(f"Test set acc: {eva(model, test_iter):.3%}")

Test set acc: 93.250%
