In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from datasets import Dataset
from transformers import DistilBertModel
from torch.nn.functional import mse_loss


  from .autonotebook import tqdm as notebook_tqdm


# Load Trained Model

In [2]:
# Name of the compute device: the GPU when CUDA is available, the CPU otherwise.
DEVICE = 'cpu'
if torch.cuda.is_available():
    DEVICE = 'cuda'

# Read the dev split saved under data/dev and build a DataFrame from it so the
# cells below can slice it with pandas.
dev = Dataset.load_from_disk("data/dev")
dev_df = pd.DataFrame(dev)

In [4]:
##### TODO COPY AND PASTE MAIN MODEL HERE #######
class CustomBert(nn.Module):
    """DistilBERT encoder followed by a small feed-forward head that emits one score per sequence.

    The selected hidden states are mean-pooled over the non-padding tokens,
    concatenated, and passed through ``Linear -> ReLU -> Dropout -> Linear``.

    Args:
        transformer_out: Either a single int index into ``outputs.hidden_states``
            or an iterable of such indices (e.g. a ``range``) whose pooled
            outputs are concatenated.
        dropout: Dropout probability written into every ``nn.Dropout`` inside
            DistilBERT and used in the feed-forward head.
        class_weights: Optional indexable of per-class weights used only when a
            loss is computed. ``class_weights[1] / class_weights[0]`` becomes the
            positive-class weight of the binary loss.
            NOTE(review): presumably a tensor living on DEVICE - confirm with the
            training code.
    """

    def __init__(self, transformer_out=6, dropout=0.1, class_weights=None):
        super().__init__()
        # Instead of just using the output of the final hidden layer,
        # you can also pass in a range of hidden layers to concatenate their outputs
        self.transformer_out = (
            range(transformer_out, transformer_out + 1)
            if isinstance(transformer_out, int)
            else transformer_out
        )
        out_dim = len(self.transformer_out) * 768

        # Use pretrained DistilBert. Force it to use our dropout
        self.distilbert = DistilBertModel.from_pretrained(
            "distilbert-base-uncased", output_hidden_states=True
        )  # type: DistilBertModel
        for module in self.distilbert.modules():
            if isinstance(module, torch.nn.Dropout):
                module.p = dropout

        # Then apply a dense hidden layer down to 768, and a final layer down to 1
        self.feedforward = nn.Sequential(
            nn.Linear(out_dim, 768),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(768, 1),
        )

        # Always define the loss-weighting attributes so that forward() with
        # labels does not fail with AttributeError when no weights were given.
        self.class_weights = class_weights
        self.pos_weight = (
            class_weights[1] / class_weights[0] if class_weights is not None else None
        )

    def forward(self, input_ids, attention_mask, labels=None):
        """Score a batch and, when ``labels`` is given, also compute the loss.

        Args:
            input_ids: Token ids forwarded to DistilBERT.
            attention_mask: Mask forwarded to DistilBERT and used for pooling;
                positions with value 0 are excluded from the mean.
            labels: Optional targets. When ``None`` only the scores are returned.

        Returns:
            ``y`` (feed-forward output with the last dimension squeezed) when
            ``labels`` is ``None``; otherwise the tuple ``(loss, y)``.
        """
        outputs = self.distilbert(input_ids=input_ids, attention_mask=attention_mask)

        # Recommended pooling approach for DistilBert is to average over the hidden state sequence
        # instead of outputs.last_hidden_state[:, 0], which is used for Bert which uses [CLS] token
        mask = attention_mask.unsqueeze(-1)
        # A row whose mask is all zeros would otherwise produce 0 / 0 = NaN;
        # clamping the count to 1 makes its pooled vector all zeros instead.
        token_counts = mask.sum(dim=1).clamp(min=1)
        pooled_output = []
        for i in self.transformer_out:
            hs = outputs.hidden_states[i] * mask
            pooled_output.append(hs.sum(dim=1) / token_counts)

        # We also concatenate the outputs of multiple layers if chosen by the user
        cat_output = torch.cat(pooled_output, dim=1)

        # Apply dense feedforward
        y = self.feedforward(cat_output).squeeze(-1)

        # Outside the Trainer, we return the predictions
        if labels is None:
            return y

        # Inside the Trainer, we also need to return the loss.
        # `binary_classifier` is read from module globals; it is not defined in
        # the visible notebook cells, so this branch raises NameError unless the
        # surrounding code sets it first.
        global binary_classifier
        if binary_classifier:
            loss = F.binary_cross_entropy_with_logits(
                y, labels, pos_weight=self.pos_weight
            ).to(DEVICE)
        else:
            loss = mse_loss(y, labels, reduction="none").to(DEVICE)
            if self.class_weights is not None:
                # Weight each sample's squared error by the weight of its label.
                weights = self.class_weights[labels.long().to(DEVICE)]
                loss = loss * weights
            loss = loss.mean()
        return loss, y

    def freeze(self):
        """Disable gradients for every DistilBERT parameter (the head is untouched)."""
        for param in self.distilbert.parameters():
            param.requires_grad = False

    def unfreeze(self, layer=None):
        """Re-enable gradients for DistilBERT parameters.

        Args:
            layer: When ``None`` every DistilBERT parameter is unfrozen; otherwise
                only parameters whose name starts with ``transformer.layer.{layer}``.
        """
        for name, param in self.distilbert.named_parameters():
            if layer is None or name.startswith(f"transformer.layer.{layer}"):
                param.requires_grad = True

In [5]:
model = CustomBert()  # fresh instance; its weights are overwritten by the checkpoint below

# Load the trained parameters.
# map_location="cpu" lets a checkpoint that was saved from a GPU run be read on
# a CPU-only machine; without it torch.load tries to restore the tensors onto
# the device they were saved from. The model itself is never moved off the CPU
# in this notebook, so CPU tensors are what load_state_dict needs.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust.
checkpoint_fp = 'results/model.pth'
checkpoint = torch.load(checkpoint_fp, map_location="cpu")
model.load_state_dict(checkpoint)
model.eval()  # switch to inference mode (disables dropout); as the last expression it also displays the module

CustomBert(
  (distilbert): DistilBertModel(
    (embeddings): Embeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (transformer): Transformer(
      (layer): ModuleList(
        (0-5): 6 x TransformerBlock(
          (attention): MultiHeadSelfAttention(
            (dropout): Dropout(p=0.1, inplace=False)
            (q_lin): Linear(in_features=768, out_features=768, bias=True)
            (k_lin): Linear(in_features=768, out_features=768, bias=True)
            (v_lin): Linear(in_features=768, out_features=768, bias=True)
            (out_lin): Linear(in_features=768, out_features=768, bias=True)
          )
          (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
          (ffn): FFN(
            (dropout): Dropout(p=0.1, inplace=False)
            (lin1): Linear

In [33]:
def accuracy_by_group(group_df, batch_size=32, threshold=1.5):
    """Return the fraction of rows in ``group_df`` whose thresholded model score equals ``pcl``.

    The rows are pushed through the global ``model`` in mini-batches with
    autograd disabled. Feeding the whole group in a single forward pass while
    tracking gradients is the likely cause of the kernel crash noted on the
    original implementation, since memory use grew with the group size.

    Args:
        group_df: DataFrame with ``input_ids``, ``attention_mask`` and ``pcl``
            columns. Assumes every ``input_ids`` / ``attention_mask`` entry has
            the same length (already padded) - TODO confirm.
        batch_size: Number of rows per forward pass.
        threshold: A score strictly greater than this counts as a positive
            prediction. NOTE(review): 1.5 presumably separates label levels
            0-1 from 2+ - confirm against the training setup.

    Returns:
        Accuracy as a float, or NaN when ``group_df`` is empty.
    """
    n_rows = len(group_df)
    if n_rows == 0:
        return np.nan

    input_ids = torch.as_tensor(
        np.asarray(group_df["input_ids"].tolist()), dtype=torch.long
    )
    atten_masks = torch.as_tensor(
        np.asarray(group_df["attention_mask"].tolist()), dtype=torch.long
    )

    # Run on whatever device the model currently lives on.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    scores = []
    with torch.no_grad():  # inference only: do not build the autograd graph
        for start in range(0, n_rows, batch_size):
            stop = start + batch_size
            batch_scores = model(
                input_ids[start:stop].to(device), atten_masks[start:stop].to(device)
            )
            scores.append(batch_scores.detach().cpu())

    predictions = torch.cat(scores).numpy() > threshold
    # Compare positionally against the labels (the frame's index is irrelevant).
    return np.sum(predictions == group_df["pcl"].to_numpy()) / n_rows
    

# Model Performance by the Degree of PCL (Question 3a)

In [None]:
# Score the dev set separately for each distinct value of the `labels` column
# (the degree of PCL) and collect one accuracy per level, in the same order.
pcl_levels = dev_df['labels'].unique()

accuracy_by_pcl_level = [
    accuracy_by_group(dev_df[dev_df['labels'] == level])
    for level in pcl_levels
]



In [None]:
# One bar per PCL level, drawn on an explicit Axes object.
fig, ax = plt.subplots()
ax.bar(pcl_levels, accuracy_by_pcl_level)

ax.set_xlabel("PCL Level")
ax.set_ylabel("Accuracy")

plt.show()

# Model Performance by Length of Sequence (Question 3b)

In [None]:
def generate_accuracy_by_len(df):
    """Compute model accuracy separately for every distinct text length in ``df``.

    Length is ``len()`` of each entry of the ``text`` column (character count
    for strings).

    Args:
        df: DataFrame with a ``text`` column plus the columns required by
            ``accuracy_by_group``.

    Returns:
        ``(all_text_len, accuracy_by_len)``: the sorted list of distinct
        lengths and the accuracy of the rows having each length, in the same
        order.
    """
    # Compute the per-row lengths once; the original re-ran apply(len) over the
    # whole frame for every distinct length inside the loop.
    text_lens = df['text'].apply(len)
    all_text_len = sorted(text_lens.unique())

    accuracy_by_len = [
        accuracy_by_group(df[text_lens == text_len]) for text_len in all_text_len
    ]
    return all_text_len, accuracy_by_len

In [30]:
# Accuracy as a function of text length: once for the full dev set, then for
# the rows with pcl == 1 and the rows with pcl == 0.
positive_df = dev_df[dev_df['pcl'] == 1]
negative_df = dev_df[dev_df['pcl'] == 0]

all_text_len, accuracy_by_len = generate_accuracy_by_len(dev_df)
pos_text_len, pos_accuracy_by_len = generate_accuracy_by_len(positive_df)
neg_text_len, neg_accuracy_by_len = generate_accuracy_by_len(negative_df)

In [None]:
# Draw the three accuracy-vs-length curves on a single Axes.
fig, ax = plt.subplots()
length_curves = [
    (all_text_len, accuracy_by_len, 'All Samples'),
    (pos_text_len, pos_accuracy_by_len, 'Positive Samples'),
    (neg_text_len, neg_accuracy_by_len, 'Negative Samples'),
]
for lengths, accuracies, curve_label in length_curves:
    ax.plot(lengths, accuracies, label=curve_label)

ax.set_xlabel("Text Length")
ax.set_ylabel("Accuracy")

ax.legend()
plt.show()

# Model Performance by Keyword Category (Question 3c)

In [None]:
def generate_accuracy_by_keyword(df):
    """Compute model accuracy separately for every distinct ``keyword`` in ``df``.

    Returns:
        ``(keywords, accuracy_by_keyword)``: the sorted list of distinct
        keywords and, in the same order, the accuracy of the rows carrying each
        keyword as reported by ``accuracy_by_group``.
    """
    keywords = list(df['keyword'].unique())
    keywords.sort()

    accuracy_by_keyword = [
        accuracy_by_group(df[df['keyword'] == kw]) for kw in keywords
    ]
    return keywords, accuracy_by_keyword

In [None]:
# Accuracy per keyword: once for the full dev set, then for the rows with
# pcl == 1 and the rows with pcl == 0.
keyword_positive_df = dev_df[dev_df['pcl'] == 1]
keyword_negative_df = dev_df[dev_df['pcl'] == 0]

all_keywords, accuracy_by_keyword = generate_accuracy_by_keyword(dev_df)
pos_keywords, pos_accuracy_by_keyword = generate_accuracy_by_keyword(keyword_positive_df)
neg_keywords, neg_accuracy_by_keyword = generate_accuracy_by_keyword(keyword_negative_df)


In [None]:
# Grouped bar chart: for every keyword, three side-by-side bars (all / positive
# / negative samples).
width = 0.25
x_axis = np.arange(len(all_keywords))

# Place each bar by keyword rather than by list position. The positive and
# negative subsets only contain the keywords that occur in them, so if one of
# them lacks a keyword the positional version would either misalign the bars
# or fail on a length mismatch. When all three keyword lists are identical the
# figure is the same as before.
keyword_slot = {keyword: slot for slot, keyword in enumerate(all_keywords)}
bar_series = [
    (all_keywords, accuracy_by_keyword, 'All Samples'),
    (pos_keywords, pos_accuracy_by_keyword, 'Positive Samples'),
    (neg_keywords, neg_accuracy_by_keyword, 'Negative Samples'),
]
for series_idx, (series_keywords, series_accuracy, series_label) in enumerate(bar_series):
    bar_x = np.array([keyword_slot[k] for k in series_keywords], dtype=float)
    plt.bar(bar_x + series_idx * width, series_accuracy, width=width, label=series_label)

plt.xlabel("Keyword")
plt.ylabel("Accuracy")

# Centre the tick labels under the middle bar of each group.
plt.xticks(x_axis + width, all_keywords)
plt.legend()

plt.show()