## Setting up dependencies

### Git repository, embeddings, NLTK

In [0]:
# Fetch the project sources and work from its src/ directory. The later cells
# import `preprocessing`, `data_utils` and `models`, which are presumably the
# modules shipped in this repository -- TODO confirm.
! git clone https://github.com/josipjukic/Adversarial-NLP.git
% cd /content/Adversarial-NLP/src

# Stage the embedding files in .vector_cache/ next to the code.
# NOTE(review): the source paths assume Google Drive is already mounted at
# /content/drive; the mount step is not part of this cell.
% mkdir .vector_cache
% cp '/content/drive/My Drive/Master Thesis/glove/glove.6B.100d.txt.pt' .vector_cache/
% cp '/content/drive/My Drive/Master Thesis/glove/counter-fitted-vectors.txt' .vector_cache/

# NLTK resources: stop-word list, WordNet and the averaged-perceptron POS tagger.
# Which downstream helper consumes them is not visible in this notebook section.
import nltk
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')

## Dataset save/load

In [0]:
import torch
from torchtext import data
from torchtext import datasets
import spacy
import random
from preprocessing import imdb_preprocess
from data_utils import load_dataset

In [0]:
# --- Configuration -----------------------------------------------------------
SEED = 42
# Seed every RNG that is in scope in this cell. Seeding only torch leaves the
# stdlib `random` module (imported above) unseeded, so anything drawing from it
# would differ between runs.
torch.manual_seed(SEED)
random.seed(SEED)

LOAD_PATH = '/content/drive/My Drive/Master Thesis/IMDB'
MAX_VOCAB_SIZE = 25_000
EMBEDDINGS_FILE = 'glove.6B.100d'

# --- Dataset -----------------------------------------------------------------
# `load_dataset` is a project helper; it returns the dataset splits and the
# field objects describing each column.
splits, fields = load_dataset(LOAD_PATH,
                              include_lengths=True,
                              lower=False,
                              stop_words=None)
train_data, valid_data, test_data = splits
TEXT, LABEL, RAW, ID = fields
# RAW and ID are explicitly marked as non-target fields.
RAW.is_target = ID.is_target = False

# --- Vocabularies ------------------------------------------------------------
# Both vocabularies are built from the training split only.
LABEL.build_vocab(train_data)
TEXT.build_vocab(train_data,
                 max_size=MAX_VOCAB_SIZE,
                 vectors=EMBEDDINGS_FILE,
                 # Words without a pretrained vector get a normal-distributed init.
                 unk_init=torch.Tensor.normal_)

In [0]:
from argparse import Namespace
from data_utils import expand_paths
from models import PackedLSTM

# Central bag of settings for this notebook. Several entries (learning_rate,
# num_epochs, batch_size, ...) are not read anywhere in the cells shown here.
args = Namespace(
    # Data and Path hyper parameters
    counter_vectors_paths='.vector_cache/counter-fitted-vectors.txt',
    model_path='/content/drive/My Drive/Master Thesis/torch_models/imdb/imdb_model.torch',
    train_state_file='train_state.json',
    save_dir='/content/drive/My Drive/Master Thesis/torch_models/imdb/',
    # Vocabulary indices of the padding and unknown tokens.
    PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token],
    UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token],
    # Model hyper parameters
    input_dim = len(TEXT.vocab),
    # Should match the dimensionality of the vectors loaded into TEXT.vocab
    # (the rows zeroed below are created with this size).
    embedding_dim=100,
    hidden_dim=256,
    output_dim = 1,
    num_layers=2,
    bidirectional=True,
    # Training hyper parameter
    seed=SEED,
    learning_rate=0.001,
    dropout_p=0.5,
    batch_size=64,
    num_epochs=20,
    early_stopping_criteria=5,
    # Runtime option
    reload_from_files=True,
    expand_filepaths_to_save_dir=True,
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)

# `pretrained_embeddings` aliases TEXT.vocab.vectors, so the two assignments
# below zero the <unk> and <pad> rows of the vocabulary's matrix in place.
pretrained_embeddings = TEXT.vocab.vectors
pretrained_embeddings[args.UNK_IDX] = torch.zeros(args.embedding_dim)
pretrained_embeddings[args.PAD_IDX] = torch.zeros(args.embedding_dim)

# PackedLSTM is a project class; arguments are positional, so their order must
# match its constructor (not visible in this notebook).
model = PackedLSTM(
    args.embedding_dim, 
    args.hidden_dim, 
    args.output_dim, 
    args.num_layers,
    pretrained_embeddings,
    args.bidirectional,
    args.dropout_p, 
    args.PAD_IDX,
    args.device
)
# Restore trained weights, remapping stored tensors onto the current device.
# NOTE(review): torch.load unpickles the file -- only load checkpoints you trust.
model.load_state_dict(torch.load(args.model_path, map_location=args.device))
model = model.to(args.device)
# Inference mode for everything that follows in this notebook.
model.eval()

# One example per batch (batch_size=1, not args.batch_size); examples are
# ordered by the length of their `text` field.
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=1,
    sort_within_batch=True,
    sort_key = lambda x: len(x.text),
    device=args.device)
# Look iterators up by split name.
iterator = dict(train=train_iterator, valid=valid_iterator, test=test_iterator)

In [0]:
from data_utils import spacy_revtok
import numpy as np

# Number of entries in the label vocabulary (built from the training split).
num_classes = len(LABEL.vocab)
# spaCy pipeline loaded with the parser, tagger, NER and textcat components
# disabled; it is handed as `nlp` to the text-reconstruction helper in the
# final demo cell.
nlp = spacy.load('en', disable=['parser', 'tagger', 'ner', 'textcat'])


def binary_accuracy(model, batch):
    """Return the fraction of examples in ``batch`` that ``model`` labels correctly.

    Args:
        model: Object exposing ``predict(batch.text)`` that returns a tensor
            holding one predicted label per example.
        batch: Object with a ``text`` attribute (passed to the model as-is)
            and a ``label`` attribute (tensor of reference labels).

    Returns:
        float: Share of predictions equal to the reference label.
    """
    labels = batch.label.reshape(-1)
    # Compare on the labels' own device instead of reaching for the
    # notebook-global `args`, and flatten rather than squeeze() so a
    # one-example batch stays one-dimensional.
    y_pred = model.predict(batch.text).to(labels.device).reshape(-1)
    correct = (y_pred == labels).float()
    return correct.mean().item()


# Number of highest-loss token positions overwritten per example in the
# disabled experiment below.
attack_power = 20
# Running means of accuracy before (reg_acc) and after (adv_acc) the
# perturbation; only updated by the disabled loop below.
reg_acc = 0.
adv_acc = 0.
# NOTE(review): the loop below is commented out. It relies on `word_target`,
# `adversarial_text` and `homoglyph`, none of which are defined or imported in
# the cells shown in this notebook section.
# for batch_index, batch in enumerate(iterator['test'], 1):
#     # print('Length: ', batch.text[0].shape[0])
#     print('Batch: ', batch_index)
#     x_in, lengths = batch.text
#     y_preds = model.predict(batch.text)
#     losses = word_target(model=model, batch=batch.text,
#                          y_preds=y_preds, num_classes=num_classes,
#                          device=args.device)
#     sorted_losses, indices = torch.sort(losses, dim=0, descending=True)
#     acc_t = binary_accuracy(model, batch)
#     reg_acc += (acc_t - reg_acc) / batch_index


#     for i in range(x_in.shape[1]):
#         inds = indices[0:attack_power,i]
#         x_in[inds,i] = 0
#         print(adversarial_text(batch.raw[i], nlp, inds, homoglyph))
#         print(batch.raw[i])

#     acc_t = binary_accuracy(model, batch)
#     adv_acc += (acc_t - adv_acc) / batch_index
#     print(reg_acc, adv_acc)
#     print('-----------------------------')
#     break

In [0]:
import numpy as np
import torch.nn.functional as F


def softmax(x):
    """Return the softmax of ``x`` computed over all of its elements.

    The maximum is subtracted before exponentiating. This leaves the result
    mathematically unchanged but keeps ``np.exp`` from overflowing to ``inf``
    (and the ratio from becoming NaN) when scores are large.

    Args:
        x: Array-like of scores.

    Returns:
        numpy.ndarray: Non-negative values of the same shape that sum to one
        (an empty input yields an empty array).
    """
    x = np.asarray(x)
    if x.size == 0:
        # Nothing to normalise; np.max would raise on an empty array.
        return np.exp(x)
    exp = np.exp(x - np.max(x))
    return exp / exp.sum()

def prob_normalize(x):
    """Divide ``x`` by the sum of its entries.

    When the entries sum to zero there is nothing to scale by, so ``x`` is
    handed back untouched.
    """
    total = np.sum(x)
    return x if total == 0 else x / total


class Attack():
    """Population-based word-substitution attack against a text classifier.

    A population of perturbed copies of the input is scored with the model,
    the best member is kept as elite, and the remaining members are produced
    by crossover of sampled parents followed by one further substitution.

    Collaborators (only the members used by this class are listed):
        model: must expose ``predict_proba(batch)`` returning a tensor that
            can be indexed as ``[:, target]``.
        LS: must expose ``get_candidates(words=..., n_candidates=...,
            n_substitutes=..., sentence=...)`` returning one sequence of
            substitute word ids per input position, and ``spec_words`` when
            ``filter_spec`` is enabled.
    """

    def __init__(self, model, LS,
                 pop_size=20, max_iters=5,
                 top_n=10, packed=True, filter_spec=False,
                 greedy=False, targeted=True,
                 device='cuda' if torch.cuda.is_available() else 'cpu'):
        """Store the attack configuration.

        Args:
            model: Classifier under attack (see class docstring).
            LS: Substitute-word provider (see class docstring).
            pop_size: Number of candidates kept per generation.
            max_iters: Maximum number of generations.
            top_n: Stored but not read by this class.
            packed: When true, batches are ``(tokens, lengths)`` tuples.
            filter_spec: When true, positions whose word is in
                ``LS.spec_words`` are never perturbed.
            greedy: Pick the best-scoring substitution instead of sampling.
            targeted: When true the score is the probability of ``target``;
                otherwise it is one minus that probability.
            device: Device the batches are moved to.
        """
        self.model = model
        self.LS = LS
        self.max_iters = max_iters
        self.pop_size = pop_size
        self.top_n = top_n  # similar words
        self.packed = packed
        self.filter_spec = filter_spec
        self.greedy = greedy
        self.targeted = targeted
        self.device = device

    def prepare_batch(self, xs):
        """Turn a ``(n_candidates, seq_len)`` array into model input.

        The array is transposed to ``(seq_len, n_candidates)``. In packed
        mode the result is paired with a lengths tensor in which every
        candidate has the full sequence length.
        """
        x_in = torch.from_numpy(xs).permute(1, 0).to(self.device)
        if not self.packed:
            return x_in
        seq_len, n_candidates = x_in.shape[0], x_in.shape[1]
        lengths = torch.tensor(seq_len, device=self.device).repeat(n_candidates)
        return x_in, lengths

    def to_numpy(self, tensor):
        """Copy a tensor to host memory as a NumPy array."""
        # detach() so this also works when called outside torch.no_grad().
        return tensor.detach().cpu().numpy()

    def do_replace(self, x_cur, pos, new_word):
        """Return a copy of ``x_cur`` with position ``pos`` set to ``new_word``."""
        x_new = x_cur.copy()
        x_new[pos] = new_word
        return x_new

    def select_replacement(self, pos, x_cur, x_orig, target, subs):
        """Choose one of ``subs`` for position ``pos`` and return the result.

        Every substitute yields one candidate sequence. Substitutes equal to
        the original word, or equal to id 0, leave the sequence unchanged.
        Candidates are scored by the model; the winner is the arg-max when
        ``greedy`` is set and otherwise sampled from a softmax over scores.
        """
        new_xs = [
            self.do_replace(x_cur, pos, w)
            if x_orig[pos] != w and w != 0
            else x_cur
            for w in subs
        ]

        batch = self.prepare_batch(np.array(new_xs))
        new_preds = self.model.predict_proba(batch)
        new_scores = new_preds[:, target]

        if not self.targeted:
            new_scores = 1. - new_scores

        if self.greedy:
            idx = int(torch.argmax(new_scores))
        else:
            probs = self.to_numpy(F.softmax(new_scores, dim=0)).astype(np.float64)
            # Renormalise in float64 so single-precision rounding cannot trip
            # np.random.choice's sum-to-one check.
            probs /= probs.sum()
            idx = np.random.choice(len(new_xs), p=probs)

        return new_xs[idx]

    def perturb(self, x_cur, x_orig, nghbrs, probs, target):
        """Substitute one word of ``x_cur`` at a position sampled from ``probs``.

        Returns ``x_cur`` itself when the sampled position has no substitutes.
        """
        idx = np.random.choice(len(probs), p=probs)
        subs = nghbrs[idx]

        # len() works for both lists and arrays (``.size`` exists only on arrays).
        if len(subs) == 0:
            return x_cur

        return self.select_replacement(idx, x_cur, x_orig, target, subs)

    def generate_population(self, x_orig, nghbr_list,
                            probs, target, pop_size):
        """Build the initial population: ``pop_size`` single perturbations of ``x_orig``."""
        # Arguments must line up with perturb(x_cur, x_orig, nghbrs, probs, target).
        return np.array(
            [self.perturb(x_orig, x_orig, nghbr_list, probs, target)
             for _ in range(pop_size)]
        )

    def crossover(self, x1, x2):
        """Mix two parents: each position is taken from ``x2`` with probability 0.5."""
        # add different crossover
        take_from_x2 = np.random.uniform(size=len(x1)) < 0.5
        return np.where(take_from_x2, x2, x1)

    def attack(self, x_orig, target, sentence=None, weights=None,
               n_candidates=10, n_substitutes=10):
        """Search for a perturbed version of ``x_orig``.

        Args:
            x_orig: 1-D array of word ids.
            target: Class index whose probability is the score (targeted) or
                whose complement is the score (untargeted).
            sentence: Forwarded to ``LS.get_candidates``.
            weights: Optional per-position multipliers for the position
                sampling distribution.
            n_candidates: Forwarded to ``LS.get_candidates``.
            n_substitutes: Forwarded to ``LS.get_candidates``.

        Returns:
            The first population member that satisfies the success test; if
            none does within ``max_iters`` generations, the best-scoring
            member of the last evaluated generation. An unmodified copy of
            ``x_orig`` is returned when no position can be perturbed.
        """
        nghbr_list = self.LS.get_candidates(words=x_orig,
                                            n_candidates=n_candidates,
                                            n_substitutes=n_substitutes,
                                            sentence=sentence)
        # Positions with more substitutes are sampled more often.
        nghbr_len = np.array([len(list_i) for list_i in nghbr_list], dtype=float)
        if weights is not None:
            nghbr_len = nghbr_len * np.asarray(weights)
        sub_probs = prob_normalize(nghbr_len)

        if self.filter_spec:
            for i, word in enumerate(x_orig):
                if word in self.LS.spec_words:
                    sub_probs[i] = 0.
            sub_probs = prob_normalize(sub_probs)

        if np.sum(sub_probs) == 0:
            # No position has a usable substitute, so there is no valid
            # sampling distribution and nothing to search over.
            return x_orig.copy()

        pop = self.generate_population(
            x_orig, nghbr_list, sub_probs, target, self.pop_size)

        n_children = self.pop_size - 1
        for i in range(self.max_iters):
            batch = self.prepare_batch(pop)
            pop_preds = self.to_numpy(self.model.predict_proba(batch))
            pop_scores = pop_preds[:, target]
            if not self.targeted:
                pop_scores = 1. - pop_scores

            top_attack = np.argmax(pop_scores)
            # float64 keeps the selection probabilities summing to one within
            # np.random.choice's tolerance.
            select_probs = softmax(pop_scores.astype(np.float64))

            print('\t\t', i, ' -- ', np.max(pop_scores))

            hit_target = np.argmax(pop_preds[top_attack, :]) == target
            success = hit_target if self.targeted else not hit_target
            if success:
                print('Success!')
                return pop[top_attack]

            elite = [pop[top_attack]]
            parent1_idx = np.random.choice(
                self.pop_size, size=n_children, p=select_probs)
            parent2_idx = np.random.choice(
                self.pop_size, size=n_children, p=select_probs)

            children = [self.crossover(pop[parent1_idx[k]],
                                       pop[parent2_idx[k]])
                        for k in range(n_children)]
            children = [self.perturb(
                        x, x_orig, nghbr_list, sub_probs, target)
                        for x in children]
            # Elite goes first; np.array (not concatenate) also copes with an
            # empty children list when pop_size == 1.
            pop = np.array(elite + children)

        # After each generation the elite sits at index 0. `top_attack`
        # indexes the previous population and must not be reused here.
        return pop[0]


# NOTE(review): `ls` is not defined in any cell shown in this notebook section;
# it has to exist in the kernel before this line runs. Attack stores it as `LS`
# and calls `LS.get_candidates(...)` and reads `LS.spec_words` on it.
gen_at = Attack(model, ls)

In [0]:
# Smoke test: attack one hand-written sentence.
# The token list is derived from the sentence so the two cannot drift apart.
sentence = 'this movie is very annoying'
tokens = sentence.split()

# Switch to inference mode before the first prediction, not after it.
model.eval()
gen_at.filter_spec = True
gen_at.greedy = True

num = TEXT.numericalize(([tokens], [len(tokens)]), device=args.device)
x_in, _ = num
x = x_in.cpu().numpy().flatten()

# Neither the baseline prediction nor the attack needs gradients.
with torch.no_grad():
    print(model.predict(num))
    # Second argument is the class index passed to Attack.attack as `target`.
    r = gen_at.attack(x, 0)
    # NOTE(review): `replacing_adversarial_text` is not defined or imported in
    # the cells shown here; it must already be present in the kernel.
    print(replacing_adversarial_text(sentence, nlp, r, TEXT.vocab))