In [1]:
#!pip install transformers

In [2]:
# Dataset to evaluate on: "IMDB", "AG_NEWS" or "YahooAnswers".
DATASET = "YahooAnswers"
# Architecture to evaluate: "LSTM", "CNN" or "BERT".
MODEL = "LSTM"
# Defense tag; it forms the last component of the checkpoint file name.
DEFENSE = "CLEAN"
# Batch size and shuffle flag handed to the test DataLoader.
BATCH_SIZE = 64
SHUFFLE = True

In [3]:
import torch
from torchtext.datasets import IMDB, AG_NEWS, YahooAnswers
from torchtext.vocab import GloVe
from torchtext.data import to_map_style_dataset
from torchtext.data.utils import get_tokenizer
from torch.nn import LSTM, GRU, Linear, Softmax, CrossEntropyLoss
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, random_split, Dataset
from torch.optim import Adam
from tqdm import tqdm
import nltk
import torch
import torch.nn as nn
import torch
import torch.nn.functional as F
from torch.nn import Linear, Softmax, Conv2d, Dropout
import torch.nn as nn
from transformers import BertForSequenceClassification, BertTokenizer
from sklearn.metrics import roc_auc_score, f1_score
from nltk.corpus import wordnet as wn
import numpy as np

In [4]:
from pathlib import Path
from IPython import get_ipython
# The IPython shell's repr mentions 'google.colab' when running on Colab.
on_colab = 'google.colab' in str(get_ipython())

if on_colab:
    # On Colab the checkpoints are read from the mounted Google Drive.
    from google.colab import drive
    drive.mount("/content/gdrive")
    PATH = "/content/gdrive/My Drive/DeepLearning/MODELS/"
else:
    # Otherwise checkpoints are expected in the current working directory.
    PATH = "./"

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


METRICS

In [5]:
def stats(model, MODEL, data_loader, avg):
    """Evaluate ``model`` on ``data_loader`` and return accuracy, ROC-AUC and F1.

    Args:
        model: Classifier called on every batch. Its output is treated as
            ``(batch, num_classes)`` scores (``argmax(1)`` selects the class);
            the classifiers defined in this notebook return softmax
            probabilities.
        MODEL: Architecture tag. ``"BERT"`` selects the batch layout
            ``(labels, input_ids, token_type_ids, attention_mask)``; any other
            value selects ``(labels, text)``.
        data_loader: Iterable yielding batches in the layout described above.
        avg: Averaging mode forwarded as ``average`` to ``roc_auc_score`` and
            ``f1_score``.

    Returns:
        Tuple ``(acc, roc_auc, f1)``.
    """
    model.eval()
    y_true = []
    y_pred = []
    y_pred_arg = []
    with torch.no_grad():
        for batch in data_loader:
            # Run the model exactly once per batch and derive both the scores
            # and the predicted class from that single forward pass.
            if MODEL == "BERT":
                labels, input_ids, token_type_ids, attention_mask = batch
                scores = model(input_ids, token_type_ids, attention_mask)
            else:
                labels, text = batch
                scores = model(text)
            y_pred.append(scores)
            y_pred_arg.append(scores.argmax(1))
            y_true.append(labels)

    y_pred_t = torch.vstack(y_pred).to("cpu").numpy()
    y_true_t = torch.hstack(y_true).to("cpu").numpy()
    y_pred_arg_t = torch.hstack(y_pred_arg).to("cpu").numpy()
    # for binary case only pass one prob. column
    if y_pred_t.shape[1] < 3:
        y_pred_t = y_pred_t[:, 1]

    acc = (y_pred_arg_t == y_true_t).sum().item()/len(y_true_t)
    roc_auc = roc_auc_score(y_true_t, y_pred_t, multi_class='ovr', average=avg)
    f1 = f1_score(y_true_t, y_pred_arg_t, average=avg)
    return acc, roc_auc, f1

MODELS

In [6]:
def get_child(model, *arg):
    """Walk down the module tree, picking the child at each given index.

    ``get_child(m, 0, 2)`` returns the third child of the first child of ``m``.
    With no indices the module itself is returned.
    """
    if not arg:
        return model
    first, *remaining = arg
    return get_child(list(model.children())[first], *remaining)

def freeze_model(model):
    """Turn off gradient tracking for every parameter of ``model``."""
    for weight in model.parameters():
        weight.requires_grad = False
            
def unfreeze_model(model):
    """Turn on gradient tracking for every parameter of ``model``."""
    for weight in model.parameters():
        weight.requires_grad = True

def count_parameters(model, trainable_only = True):
    """Return the total number of scalar parameters in ``model``.

    When ``trainable_only`` is true, parameters with ``requires_grad`` set to
    False are left out of the total.
    """
    return sum(
        weight.numel()
        for weight in model.parameters()
        if weight.requires_grad or not trainable_only
    )

def custom_freezer(model):
    """Freeze the BERT backbone except its top two encoder layers and pooler.

    The child indices used below assume child 0 of ``model`` is the backbone,
    whose children are laid out as (embeddings, encoder, pooler) -- TODO
    confirm against the installed ``transformers`` version.

    Prints the resulting number of trainable parameters and returns the same
    ``model`` object.
    """
    # Begin from a fully trainable model so the outcome does not depend on
    # whatever requires_grad state the model arrived with.
    unfreeze_model(model)

    # Freeze every direct child whose repr starts with 'Bert'.
    for child in model.children():
        if str(child).startswith('Bert'):
            freeze_model(child)

    # Re-enable the last two entries of the encoder's layer container.
    encoder_layers = get_child(model, 0, 1, 0)
    for offset in (1, 2):
        unfreeze_model(encoder_layers[-offset])

    # Re-enable the pooling layer.
    unfreeze_model(get_child(model, 0, 2))

    print('Trainable parameters: {}'.format(count_parameters(model, True)))
    return model

class BERTClassifier(nn.Module):
    """Partially frozen ``bert-base-uncased`` classifier returning probabilities.

    Args:
        num_classes: Number of target classes (passed as ``num_labels``).
    """

    def __init__(self, num_classes):
        super(BERTClassifier, self).__init__()

        self.bert = BertForSequenceClassification.from_pretrained(
            'bert-base-uncased', num_labels=num_classes)
        # Leave only the top encoder layers, pooler and head trainable.
        self.bert = custom_freezer(self.bert)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_ids, token_type_ids, attention_mask):
        """Return softmax class probabilities for a batch of encoded texts.

        Args:
            input_ids: Token ids from the BERT tokenizer.
            token_type_ids: Segment ids from the BERT tokenizer.
            attention_mask: Padding mask from the BERT tokenizer.
        """
        # The Hugging Face forward signature is
        # (input_ids, attention_mask, token_type_ids, ...). Passing the three
        # tensors positionally in this method's own order swapped the mask and
        # the segment ids, so they are passed by keyword.
        # NOTE(review): checkpoints trained while the arguments were swapped
        # saw the mismatched inputs -- re-validate or retrain them.
        y = self.bert(input_ids=input_ids,
                      attention_mask=attention_mask,
                      token_type_ids=token_type_ids)
        return self.softmax(y.logits)

class BidirectionalLSTMClassifier(torch.nn.Module):
    """Bidirectional LSTM text classifier returning softmax probabilities.

    Args:
        num_classes: Number of output classes.
        hidden_size: Hidden size of each LSTM direction.
        num_layers: Number of stacked LSTM layers.
        embed_dim: Size of each input embedding vector. Defaults to 50, the
            value that used to be hard-coded, so existing callers and saved
            checkpoints are unaffected.
    """

    def __init__(self, num_classes, hidden_size, num_layers, embed_dim=50):
        super().__init__()
        self.num_layers = num_layers
        self.LSTM = LSTM(embed_dim, hidden_size, num_layers=num_layers,
                         batch_first=True, bidirectional=True)
        self.linear = Linear(2 * hidden_size, num_classes)
        self.softmax = Softmax(dim=1)

    def forward(self, x):
        """Classify a batch ``x`` of shape (batch, seq_len, embed_dim)."""
        _, (h_n, _) = self.LSTM(x)
        # h_n stacks the final hidden states layer by layer, forward direction
        # first; the last two rows therefore belong to the top layer.
        h_forward = h_n[2 * self.num_layers - 2]
        h_backward = h_n[2 * self.num_layers - 1]
        y = self.linear(torch.cat((h_forward, h_backward), 1))
        return self.softmax(y)

class CNNClassifier(torch.nn.Module):
    """Three-branch convolutional text classifier returning softmax probabilities.

    Each branch convolves over the full embedding width with its own kernel
    height, is max-pooled over the sequence axis, and the pooled features of
    all branches are concatenated before dropout and the linear layer.
    """

    def __init__(self, num_classes, in_channels, out_channels, kernel_heights, pad=0, stri=1, embed_dim=50, drop=0.2):
        super().__init__()

        def build_branch(branch):
            # The kernel spans the whole embedding, so it only slides along
            # the sequence axis.
            return Conv2d(in_channels, out_channels[branch],
                          kernel_size=(kernel_heights[branch], embed_dim),
                          stride=stri, padding=pad)

        self.conv1 = build_branch(0)
        self.conv2 = build_branch(1)
        self.conv3 = build_branch(2)
        self.drop = Dropout(drop)
        self.fc = Linear(sum(out_channels), num_classes)
        self.soft = Softmax(dim=1)

    def _conv_n_maxpool_1d(self, input, conv_layer):
        """Apply ``conv_layer``, ReLU, then max-pool over the whole sequence axis."""
        # (batch, channels, positions, 1) -> (batch, channels, positions)
        activated = F.relu(conv_layer(input).squeeze(3))
        # Pool over every position at once -> (batch, channels)
        window = activated.size()[2]
        return F.max_pool1d(activated, window).squeeze(2)

    def forward(self, x):
        """Classify ``x`` of shape (batch_size, num_seq, embed_dim)."""
        # Insert the single input-channel axis expected by Conv2d.
        x = x.unsqueeze(1)

        pooled = [
            self._conv_n_maxpool_1d(x, branch)
            for branch in (self.conv1, self.conv2, self.conv3)
        ]
        features = torch.cat(pooled, dim=1)

        return self.soft(self.fc(self.drop(features)))

DATASET & DATALOADER

In [7]:
from torch.utils.data import Dataset

class ClassificationDataset(Dataset):
    """Wrap a ``(label, text)`` dataset, normalising labels and tokenizing text.

    Args:
        dataset: Indexable collection of ``(label, text)`` pairs.
        num_classes: Number of classes; stored but not used by this class.
        tokenizer: Callable applied to each text.
        model: Architecture tag; ``'BERT'`` switches to BERT-style tokenization.
    """

    def __init__(self, dataset, num_classes, tokenizer, model):
        self.num_classes = num_classes
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.model = model

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(label, tokenized_text)`` with a zero-based integer label."""
        label, text = self.dataset[idx]
        # isinstance also accepts str subclasses (e.g. numpy.str_), which the
        # previous exact-type comparison sent to int() where they crashed.
        if isinstance(label, str):
            # String labels: 'neg' -> 0, anything else -> 1.
            label = 0 if label == 'neg' else 1
        else:
            # Numeric labels are one-based in the source data.
            label = int(label) - 1

        if self.model == 'BERT':
            return label, self.tokenizer(text, padding="max_length", return_tensors='pt', max_length=512, truncation=True)
        else:
            return label, self.tokenizer(text)

In [8]:
def collate_batch(batch):
    """Collate ``(label, tokens)`` samples into padded embedding tensors.

    Tokens are looked up in the module-level ``embedding``, sequences are
    padded to the longest one in the batch, and both results are moved to the
    module-level ``device``.

    Returns:
        Tuple ``(labels, padded_embeddings)``; labels are int64.
    """
    labels = [label for label, _ in batch]
    vectors = [embedding.get_vecs_by_tokens(tokens) for _, tokens in batch]
    padded = pad_sequence(vectors, batch_first=True)
    targets = torch.tensor(labels, dtype=torch.int64)
    return targets.to(device), padded.to(device)

def collate_BERT(batch):
    """Collate ``(label, encoding)`` samples into batched BERT input tensors.

    Each encoding is indexed by 'input_ids', 'token_type_ids' and
    'attention_mask'; the per-sample tensors are concatenated along dim 0 and
    moved to the module-level ``device``.

    Returns:
        Tuple ``(labels, input_ids, token_type_ids, attention_mask)``.
    """
    def gather(field):
        # Join one field of every sample's encoding into a single tensor.
        return torch.cat([encoding[field] for _, encoding in batch], dim=0).to(device)

    labels = torch.tensor([label for label, _ in batch], dtype=torch.int64).to(device)
    return labels, gather('input_ids'), gather('token_type_ids'), gather('attention_mask')

In [9]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# BERT uses its own pretrained tokenizer; the other models use torchtext's
# 'basic_english' tokenizer.
tokenizer = (
    BertTokenizer.from_pretrained("bert-base-uncased", do_lower_case=True)
    if MODEL == 'BERT'
    else get_tokenizer('basic_english')
)

# NOTE(review): loaded for every MODEL, although only collate_batch reads it.
embedding = GloVe(name='6B', dim=50)

In [10]:
# Select the test split and the number of target classes for DATASET.
if DATASET == 'IMDB':
    test_set = IMDB(split='test')
    num_classes = 2
elif DATASET == 'AG_NEWS':
    test_set = AG_NEWS(split='test')
    num_classes = 4
elif DATASET == 'YahooAnswers':
    test_set = YahooAnswers(split='test')
    num_classes = 10
else:
    # Fail here with a clear message instead of a NameError in a later cell.
    raise ValueError(
        "Unknown DATASET {!r}; expected 'IMDB', 'AG_NEWS' or 'YahooAnswers'".format(DATASET))

100%|██████████| 319M/319M [00:06<00:00, 48.6MB/s]


In [11]:
# ClassificationDataset calls __len__ and __getitem__ on the wrapped dataset,
# so convert the torchtext split to a map-style dataset first.
test_set = to_map_style_dataset(test_set)

In [12]:
# Wrap the raw split so labels are normalised and texts tokenized on access.
test_set = ClassificationDataset(test_set, num_classes, tokenizer, MODEL)
# Only the collate function depends on the architecture.
test_loader = DataLoader(
    test_set,
    batch_size=BATCH_SIZE,
    collate_fn=collate_BERT if MODEL == "BERT" else collate_batch,
    shuffle=SHUFFLE,
)

In [13]:
# LOAD MODEL TO BE ANALYSED
# NOTE(review): the constructor arguments below presumably mirror the ones the
# checkpoints were trained with -- confirm against the training notebook.
if MODEL == "LSTM":
    model = BidirectionalLSTMClassifier(num_classes, 64, 1).to(device)
elif MODEL == "CNN":
    model = CNNClassifier(num_classes, 1, [3, 5, 7], [2, 3, 4]).to(device)
elif MODEL == "BERT":
    model = BERTClassifier(num_classes).to(device)
else:
    # Previously any unrecognised value silently built a BERT model.
    raise ValueError(
        "Unknown MODEL {!r}; expected 'LSTM', 'CNN' or 'BERT'".format(MODEL))

# Checkpoints are named <MODEL>_<DATASET>_<DEFENSE>.pt under PATH.
model_name = MODEL + "_" + DATASET + "_" + DEFENSE + ".pt"
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
# NOTE: torch.load unpickles the file -- only load checkpoints you trust.
checkpoint = torch.load(PATH + model_name, map_location=device)

model.load_state_dict(checkpoint['model_state_dict'])

<All keys matched successfully>

In [14]:
# stats returns (accuracy, roc_auc, f1); "weighted" is forwarded as the
# averaging mode for the ROC-AUC and F1 scores.
stats_ = stats(model, MODEL, test_loader, avg="weighted")

# Report the metrics together with the configuration they were measured for.
print("STATS FOR {i}, {j}, {k}:".format(i=MODEL, j=DATASET, k=DEFENSE))
print("ACCURACY: {}".format(stats_[0]))
print("AU-ROC: {}".format(stats_[1]))
print("F1: {}".format(stats_[2]))

STATS FOR LSTM, YahooAnswers, CLEAN:
ACCURACY: 0.7092666666666667
AU-ROC: 0.932857810339506
F1: 0.7027195526738057
