In [1]:
# !pip install transformers
# !pip install jsonlines
# !pip install python-levenshtein
# !pip install datasets

In [2]:
from classifiers import TextCNN, TextGRU

In [3]:
import Levenshtein as Lev
import torch
from torch.utils.data import Dataset as TorchDataset
from typing import Sequence, Dict, Any, List
import json
from tqdm.notebook import tqdm
import torch
from torch.utils.data import DataLoader
from torch.utils.data import Dataset as TorchDataset
import torch.nn.functional as F
import torch.nn as nn
from datasets import load_dataset

In [4]:
from transformers import AutoTokenizer
from transformers import BertLMHeadModel
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
maskedlm = BertLMHeadModel.from_pretrained('bert-base-uncased')

If you want to use `BertLMHeadModel` as a standalone, add `is_decoder=True.`
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertLMHeadModel: ['cls.seq_relationship.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertLMHeadModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertLMHeadModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [5]:
# number of batches
# (len(train_dataset) + 64 - 1) // 64

In [6]:
train_dataset = load_dataset('trec', split='train')
train_dataset = train_dataset.map(
    lambda e: tokenizer(e['text'], truncation=True, padding='do_not_pad'),
    batched=True
    )
test_dataset = load_dataset('trec', split='test')
test_dataset = test_dataset.map(
    lambda e: tokenizer(e['text'], truncation=True, padding='do_not_pad'),
    batched=True
    )

def collate_fn(batch):
    attention_mask, input_ids, label_coarse = [], [], []
    for b in batch:
        attention_mask.append(torch.tensor(b['attention_mask']))
        input_ids.append(torch.tensor(b['input_ids']))
        label_coarse.append(torch.tensor(b['label-coarse']))
    return {'attention_mask':nn.utils.rnn.pad_sequence(attention_mask, batch_first=True, padding_value=0.0),
            'input_ids':nn.utils.rnn.pad_sequence(input_ids, batch_first=True, padding_value=0.0),
            'label-coarse':label_coarse}

trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=1, drop_last=True, collate_fn=collate_fn, shuffle=True)
testloader = torch.utils.data.DataLoader(test_dataset, batch_size=1, drop_last=True, collate_fn=collate_fn, shuffle=True)

Using custom data configuration default
Reusing dataset trec (/root/.cache/huggingface/datasets/trec/default/1.1.0/1902c380fe66cc215f989888b1b35e8da7e79a3a97520f00dce753fd1f8f5c48)
Loading cached processed dataset at /root/.cache/huggingface/datasets/trec/default/1.1.0/1902c380fe66cc215f989888b1b35e8da7e79a3a97520f00dce753fd1f8f5c48/cache-0a0c25324a4451d5.arrow
Using custom data configuration default
Reusing dataset trec (/root/.cache/huggingface/datasets/trec/default/1.1.0/1902c380fe66cc215f989888b1b35e8da7e79a3a97520f00dce753fd1f8f5c48)
Loading cached processed dataset at /root/.cache/huggingface/datasets/trec/default/1.1.0/1902c380fe66cc215f989888b1b35e8da7e79a3a97520f00dce753fd1f8f5c48/cache-20fadfc778bf470d.arrow


In [7]:
class Deep_lev(torch.nn.Module):

    def __init__(self, vocab_size=30522, embedding_dim=128, hidden_dim=128) :
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.encoder = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.linear = nn.Linear(hidden_dim * 3, 1)


    def encode_sequence(self, sequence):
        embedded_sequence = self.embeddings(sequence)
        out, (ht, ct) = self.encoder(embedded_sequence)
        return ht[-1]

    def get_embeddings(self, onehots_a, onehots_b):
        embs_a = torch.stack([torch.matmul(v, self.embeddings.weight) for v in onehots_a])
        
        return embs_a

    def forward_on_embeddings(self, sequence_a, sequence_b):
        embs_a = self.get_embeddings(sequence_a, sequence_b)
        _, (embedded_sequence_a, _) = self.encoder(embs_a)
        embedded_sequence_a= embedded_sequence_a[-1]
        
        embedded_sequence_b = self.encode_sequence(sequence_b)
        diff = torch.abs(embedded_sequence_a - embedded_sequence_b)
        representation = torch.cat([embedded_sequence_a, embedded_sequence_b, diff], dim=-1)
        approx_distance = self.linear(representation)

        return approx_distance


    def forward(self, sequence_a, sequence_b):
        embedded_sequence_a = self.encode_sequence(sequence_a)
        embedded_sequence_b = self.encode_sequence(sequence_b)
        diff = torch.abs(embedded_sequence_a - embedded_sequence_b)
        representation = torch.cat([embedded_sequence_a, embedded_sequence_b, diff], dim=-1)

        approx_distance = self.linear(representation)

        return approx_distance

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = torch.device('cpu')
deep_lev = Deep_lev()
deep_lev.load_state_dict(torch.load('deep_levenstein.pt'))
deep_lev = deep_lev.to(device)

In [9]:
classifier = TextCNN(vocab_size=tokenizer.vocab_size,
                emb_dim=100,
                n_filters=8,
                filter_sizes=[3,4,5],
                output_dim=6,
                dropout=0.1,
                pad_idx=tokenizer.pad_token_id)

classifier = classifier.to(device)
sub_classifier =  TextGRU(
    vocab_size=tokenizer.vocab_size,
    emb_dim=100,
    hidden_dim=128,
    out_dim=6,
    dropout=0.1,
    pad_idx=tokenizer.pad_token_id
    )
classifier.eval()
classifier.load_state_dict(torch.load('textcnn_trec.pt'))
sub_classifier.load_state_dict(torch.load('textrnn_trec.pt'))
sub_classifier = sub_classifier.to(device)
maskedlm = maskedlm.to(device)

  "num_layers={}".format(dropout, num_layers))


In [10]:
def dilma_loss(preds, approx_distance, label, beta = 1.):
    '''
    -log((1-Classifier(x'))) + beta * (1-DL(x, x'))**2 
    '''
    global device
    pred = preds.softmax(1)
    one2 = torch.full(approx_distance.squeeze(1).shape, 5, device=device)
    clf_term = -torch.log(1 - pred[label[0].item()].mean())
    dl_term = beta * ((1 - approx_distance.squeeze(1)).mean()**2)
    return (clf_term.mean() + dl_term).mean()

In [11]:
import numpy as np
import random 

def mask_tokens(batch, tokenizer):
    batch_masked, masked_inds = [], []
    for b in batch:
        limit = b[b!=tokenizer.pad_token_id].shape[0]
        inds_to_mask = np.random.choice(np.arange(1, limit-1), size=1)
        masked_inds.append(inds_to_mask)
        b_new = b.clone()
        b_new[inds_to_mask] = tokenizer.mask_token_id
        batch_masked.append(b_new)
    return torch.stack(batch_masked), masked_inds

In [12]:
def categorical_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """
    max_preds = preds.argmax(dim=1, keepdim=True) # get the index of the max probability
    # pred_ = torch.argmax(torch.softmax(preds, dim=1), dim =1)

    # corrrect = pred_[torch.stack(batch['label-coarse']) == 5]
    correct = max_preds.squeeze(1).eq(y)
    correct = correct.detach().to('cpu')
    return correct.sum() / torch.FloatTensor([y.shape[0]])

def get_text_length(batch, tokenizer):
    result = []
    for i in range(batch.shape[1]):
        result.append((sum(batch[:, i] != tokenizer.pad_token_id).item()))
    return torch.tensor(result, dtype=int, device='cpu')

def categorical_accuracy_for_gumbel(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """
    max_preds = preds.argmax(-1) # get the index of the max probability
    # pred_ = torch.argmax(torch.softmax(preds, dim=1), dim =1)

    # corrrect = pred_[torch.stack(batch['label-coarse']) == 5]
    correct = max_preds.eq(y)
    correct = correct.detach().to('cpu')
    return correct.sum() / torch.FloatTensor([y.shape[0]])

def delete_start_end_tokens(seq):
    return seq[:, 1:-1]

def calculate_wer(text_a, text_b):
    b = set(text_a.split() + text_b.split())
    word2char = dict(zip(b, range(len(b))))

    w1 = [chr(word2char[w]) for w in text_a.split()]
    w2 = [chr(word2char[w]) for w in text_b.split()]

    return Lev.distance(''.join(w1), ''.join(w2))

In [13]:
optimizer = torch.optim.Adam(maskedlm.parameters(), lr=0.001)
sub_classifier = classifier
# from transformers import AdamW, get_linear_schedule_with_warmup
# optimizer = AdamW(maskedlm.parameters(), lr=1e-3)

In [14]:
torch.backends.cudnn.enabled = False

In [15]:
def update_weights(lm_model):
    for parameter in lm_model.parameters():
        parameter.data -= 0.001 * parameter.grad.data
        parameter.grad = None

In [16]:
from copy import deepcopy

losses = []
classifier.eval()
deep_lev.eval()
n_epoch = 11
num_steps = 8

res = []
for i, batch in enumerate(trainloader):
    if i < 1000:
        maskedlm_ = deepcopy(maskedlm)
        maskedlm_ = maskedlm_.to(device)
        maskedlm_.train()
        # for name, param in maskedlm_.named_parameters():
        #     param.requires_grad = True
        optimizer = torch.optim.Adam(maskedlm_.parameters(), lr=0.001)
        for j in range(num_steps):
            optimizer.zero_grad()
            maskedlm_.zero_grad()
            label = torch.stack(batch['label-coarse']).to(device)
            label = torch.cat(10*[label]).to(device)
            b_input_ids = batch['input_ids'].to(device)
            b_input_mask = batch['attention_mask'].to(device)
            b_masked, masked_inds = mask_tokens(b_input_ids, tokenizer)
            b_masked = b_masked.to(device)
            logits = maskedlm_(b_masked, attention_mask=b_input_mask)
            #get gumbel_samples
            gumbel_samples = torch.cat([torch.nn.functional.gumbel_softmax(logits['logits'], tau = 1.8, hard=True) for _ in range(10)])
            gumbel_samples = gumbel_samples.to(device)
            b_input_ids = torch.cat(10*[b_input_ids]).to(device)
            approx_distance = deep_lev.forward_on_embeddings(gumbel_samples, b_input_ids)
            scores = classifier.forward_on_embeddings(gumbel_samples)
            scores_orig = classifier(b_input_ids)
            scores_orig = scores_orig.to(device)
            loss = dilma_loss(scores.to(device), approx_distance.to(device), label, beta=5)
            loss.backward()

            update_weights(maskedlm_)
            optimizer.step()

        with torch.no_grad():
            label = torch.stack(batch['label-coarse']).to(device)
            label = torch.cat(10*[label]).to(device)
            b_input_ids = batch['input_ids'].to(device)
            b_input_mask = batch['attention_mask'].to(device)
            b_masked, masked_inds = mask_tokens(b_input_ids, tokenizer)
            b_masked = b_masked.to(device)
            logits = maskedlm_(b_masked, attention_mask=b_input_mask)
            #get gumbel_samples
            gumbel_samples = torch.cat([torch.nn.functional.gumbel_softmax(logits['logits'], tau = 1.8, hard=True) for _ in range(10)])
            gumbel_samples = gumbel_samples.to(device)
            b_input_ids = torch.cat(10*[b_input_ids]).to(device)
            scores = classifier.forward_on_embeddings(gumbel_samples)
            scores_orig = classifier(b_input_ids)
            
            res.append({'advs':gumbel_samples.argmax(-1).detach().cpu(),
                        'origs': b_input_ids.detach().cpu(),
                        'scores': scores.softmax(-1).argmax(-1).detach().cpu(),
                        'scores_orig': scores_orig.softmax(-1).argmax(-1).detach().cpu()})
    else:
        break

In [21]:
stats = []
for x in res:
    out = {'wer': [],
       'adversarial' : [],
       'original' : []}
    if len(x['scores'][x['scores']!=x['scores_orig']]) > 0:
        for i, _ in enumerate(x['scores'][x['scores']!=x['scores_orig']].numpy()):
            out['wer'].append(calculate_wer(' '.join(t for t in [tokenizer.decode(a) for a in x['advs'][:, 1:-1][i, :]]), ' '.join(c for c in [tokenizer.decode(b) for b in x['origs'][:, 1:-1][i, :]])))
            out['adversarial'].append(' '.join(t for t in [tokenizer.decode(a) for a in x['advs'][i, :]]))
            out['original'].append(' '.join(c for c in [tokenizer.decode(b) for b in x['origs'][i, :]]))
        stats.append(out)
    else:
        continue

Calculate NAD

In [22]:
nad_value = 0
for x in stats:
    nad_value += 1/min(x['wer']) if min(x['wer'])>0 else 1
print(nad_value * 1/1000)

0.36394999999999994


In [23]:
stats

[{'adversarial': ['footing what is the cost of the drugs used in those their ? series',
   'hostilities what is the cost of the drugs used in the treatments ? ?',
   'successor what is the cost of the drugs used in the treatments ? of',
   'fear what is the cost of the drugs used in the treatments ? ?',
   'ip what is the cost of the drugs used in the treatments ? states',
   'nightmare what is the cost of the drugs used in these treatments ? is'],
  'original': ['[CLS] what is the cost of the drugs used in tuberculosis treatments ? [SEP]',
   '[CLS] what is the cost of the drugs used in tuberculosis treatments ? [SEP]',
   '[CLS] what is the cost of the drugs used in tuberculosis treatments ? [SEP]',
   '[CLS] what is the cost of the drugs used in tuberculosis treatments ? [SEP]',
   '[CLS] what is the cost of the drugs used in tuberculosis treatments ? [SEP]',
   '[CLS] what is the cost of the drugs used in tuberculosis treatments ? [SEP]'],
  'wer': [2, 1, 1, 1, 1, 1]},
 {'adversari

In [None]:
clean_results = []
for x in res:
    clean_results.append({
        'advs':x['advs'].detach().cpu().numpy(),
        'origs' : x['origs'].detach().cpu().numpy(),
        'scores' : x['scores'].detach().cpu().numpy(),
        'scores_orgi' : x['scores_orig'].detach().cpu().numpy()})

In [70]:
def collect_statistics(gumbel_samples, scores, b_input_ids, scores_orig):
    adv = delete_start_end_tokens(gumbel_samples.argmax(-1))
    scores = scores.softmax(-1).argmax(-1)[:10]
    scores_orig = scores_orig.softmax(-1).argmax(-1)[:10]
    true_class = scores_orig[0].item()
    best_attack = {
        'adv_seq': [],
        'orig_seq': [],
        'wer':[]}
    print()
    for i in adv[scores!=true_class]:
        best_attack['wer'] = calculate_wer(' '.join(y for y in [tokenizer.decode(x) for x in adv[i, :]]),
                                    ' '.join(y for y in [tokenizer.decode(x) for x in b_input_ids[:, 1:-1][0, :]]))
        best_attack['adv_seq'].append(' '.join(y for y in [tokenizer.decode(x) for x in adv[i, :]]))
        best_attack['orig_seq'].append(' '.join(y for y in [tokenizer.decode(x) for x in b_input_ids[:, 1:-1][0, :]]))
    if len(best_attack['adv_seq']) > 0:
        return best_attack #sorted(best_attack, key=lambda x: x['wer'], reverse=True)
    else:
        return 0