# Assignment 5: Natural Language Inference With BERT

In NLP, NLI comprises of a broad range of tasks. Typically, it is a classification for a pair of text sequence (sentences). For this assignment, we will focus on [Stanfords NLI Corpus](https://nlp.stanford.edu/projects/snli/), where given a premise sentence and hypothesis sentence, the task is to predict whether the hypothesis entails/contradicts/neither the premise.

### First let's load the SNLI dataset from the huggingface datasets hub using the datasets package

### Explore the dataset!

In [1]:
from datasets import load_dataset
dataset = load_dataset("snli")

Found cached dataset snli (/home/rehan/.cache/huggingface/datasets/snli/plain_text/1.0.0/1f60b67533b65ae0275561ff7828aad5ee4282d0e6f844fd148d05d3c6ea251b)


  0%|          | 0/3 [00:00<?, ?it/s]

In [7]:
print("Split sizes (num_samples, num_labels):\n", dataset.shape)
print("\nExample:\n", dataset['train'][0])

Split sizes (num_samples, num_labels):
 {'test': (10000, 3), 'train': (550152, 3), 'validation': (10000, 3)}

Example:
 {'premise': 'A person on a horse jumps over a broken down airplane.', 'hypothesis': 'A person is training his horse for a competition.', 'label': 1}


Each example is a dictionary with the keys: (premise, hypothesis, label). 

#### Data Fields
 - premise: a string used to determine the truthfulness of the hypothesis
 - hypothesis: a string that may be true, false, or whose truth conditions may not be knowable when compared to the premise
 - label: an integer whose value may be either 0, indicating that the hypothesis entails the premise, 1, indicating that the premise and hypothesis neither entail nor contradict each other, or 2, indicating that the hypothesis contradicts the premise.

In [8]:
from util import get_snli

train_dataset, validation_dataset, test_dataset = get_snli()

Found cached dataset snli (/home/rehan/.cache/huggingface/datasets/snli/plain_text/1.0.0/1f60b67533b65ae0275561ff7828aad5ee4282d0e6f844fd148d05d3c6ea251b)


  0%|          | 0/3 [00:00<?, ?it/s]

Reading Datapoints: 100%|█| 550152/550152 [00:07<00:00, 69701.54it
Reading Datapoints: 100%|█| 10000/10000 [00:00<00:00, 69827.91it/s
Reading Datapoints: 100%|█| 10000/10000 [00:00<00:00, 70397.50it/s


In [15]:
## sub set stats
from collections import Counter

# num sample stats
print(len(train_dataset), len(validation_dataset), len(test_dataset))

# label distribution
print(Counter([t['label'] for t in train_dataset]))
print(Counter([t['label'] for t in validation_dataset]))
print(Counter([t['label'] for t in test_dataset]))

# We have a perfectly balanced dataset

9999 999 999
Counter({0: 3333, 1: 3333, 2: 3333})
Counter({0: 333, 1: 333, 2: 333})
Counter({0: 333, 1: 333, 2: 333})


In [16]:
from util import generate_inputs, test_generate_input

In [18]:
### PASS THIS TEST ###

test_generate_input()

NotImplementedError: 

### Now let's reimplement our tokenizer using the huggingface tokenizer

In [22]:
from transformers import AutoTokenizer
from typing import List


class BatchTokenizer:
    """Tokenizes and pads a batch of input sentences."""

    def __init__(self):
        """Initializes the tokenizer

        Args:
            pad_symbol (Optional[str], optional): The symbol for a pad. Defaults to "<P>".
        """
        self.hf_tokenizer = AutoTokenizer.from_pretrained("prajjwal1/bert-small")
    
    def get_sep_token(self,):
        return self.hf_tokenizer.sep_token
    
    def __call__(self, batch: List[str]) -> List[List[str]]:
        """Uses the huggingface tokenizer to tokenize and pad a batch.

        We return a dictionary of tensors per the huggingface model specification.

        Args:
            batch (List[str]): A List of sentence strings

        Returns:
            Dict: The dictionary of token specifications provided by HuggingFace
        """
        # The HF tokenizer will PAD for us
        enc = self.hf_tokenizer(
            batch,
            padding=True,
            return_token_type_ids=False,
            return_tensors='pt'
        )

        return enc
    

### instantiate the tokenizer ###

tokenizer = BatchTokenizer()

Downloading:   0%|          | 0.00/286 [00:00<?, ?B/s]

Downloading:   0%|          | 0.00/232k [00:00<?, ?B/s]

In [25]:
# tokenizer's sep_token

print('sep_token:', tokenizer.get_sep_token())

sep_token: [SEP]


In [27]:
# Now lets prepare the input sequence and labels for each 

train_inputs, train_labels = generate_inputs(train_dataset, tokenizer.get_sep_token())
validation_inputs, validation_labels = generate_inputs(validation_dataset, tokenizer.get_sep_token())
test_inputs, test_labels = generate_inputs(test_dataset, tokenizer.get_sep_token())

NotImplementedError: 

### We can batch the train and test data, and then run it through the tokenizer

In [None]:
def chunk(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# Notice that since we use huggingface, we tokenize and
# encode in all at once!

# This will create the batches
train_input_batches = [b for b in chunk(train_inputs, batch_size)]
# Tokenize + encode
train_input_batches = [batch_tokenizer(batch) for batch in train_input_batches]

### Let's batch the labels, ensuring we get them in the same order as the inputs

In [None]:
import torch

def encode_labels(labels: List[int]) -> torch.FloatTensor:
    """Turns the batch of labels into a tensor

    Args:
        labels (List[int]): List of all labels in the batch

    Returns:
        torch.FloatTensor: Tensor of all labels in the batch
    """
    return torch.LongTensor([int(l) for l in labels])


train_label_batches = [b for b in chunk(train_labels, batch_size)]
train_label_batches = [encode_labels(batch) for batch in train_label_batches]

### Now we implement the model

In [None]:
import torch
from transformers import BertModel


class NLIClassifier(torch.nn.Module):
    def __init__(self, output_size, hidden_size):
        super().__init__()
        self.output_size = output_size
        self.hidden_size = hidden_size
        # Initialize BERT, which we use instead of a single embedding layer.
        self.bert = BertModel.from_pretrained("prajjwal1/bert-small")
        # TODO: Updating all BERT parameters can be slow and memory intensive. 
        # Freeze them if training is too slow. Notice that the learning
        # rate should probably be smaller in this case.
        # Uncommenting out the below 2 lines means only our classification layer will be updated.
        # for param in self.bert.parameters():
        #     param.requires_grad = False
        self.embedding_size = self.bert.config.hidden_size
        # TODO: Add an extra hidden layer in the classifier, projecting
        #      from the BERT hidden dimension (we saved it as `embedding_size`) to hidden size.
        # TODO: Add a relu nonlinearity to be used in the forward method
        #      https://pytorch.org/docs/stable/generated/torch.nn.ReLU.html
        self.classifier = torch.nn.Linear(self.hidden_size, self.output_size)
        self.log_softmax = torch.nn.LogSoftmax(dim=2)

    def encode_text(
        self,
        symbols: Dict
    ) -> torch.Tensor:
        """Encode the (batch of) sequence(s) of token symbols with an LSTM.
            Then, get the last (non-padded) hidden state for each symbol and return that.

        Args:
            symbols (Dict): The Dict of token specifications provided by the HuggingFace tokenizer

        Returns:
            torch.Tensor: The final hiddens tate of the LSTM, which represents an encoding of
                the entire sentence
        """
        # First we get the contextualized embedding for each input symbol
        # We no longer need an LSTM, since BERT encodes context and 
        # gives us a single vector describing the sequence in the form of the [CLS] token.
        encoded_sequence = self.bert(**symbols)
        # TODO: Get the [CLS] token using the `pooler_output` from 
        #      The BertModel output. See here: https://huggingface.co/docs/transformers/model_doc/bert#transformers.BertModel
        #      and check the returns for the forward method.
        # We want to return a tensor of the form batch_size x 1 x bert_hidden_dimension
        raise NotImplementedError

    def forward(
        self,
        symbols: Dict,
    ) -> torch.Tensor:
        """_summary_

        Args:
            symbols (Dict): The Dict of token specifications provided by the HuggingFace tokenizer

        Returns:
            torch.Tensor: _description_
        """
        encoded_sents = self.encode_text(symbols)
        output = self.hidden_layer(encoded_sents)
        output = self.relu(output)
        output = self.classifier(output)
        return self.log_softmax(output)

In [None]:
# For making predictions at test time
def predict(model: torch.nn.Module, sents: torch.Tensor) -> List:
    logits = model(sents)
    return list(torch.argmax(logits, axis=2).squeeze().numpy())

### Evaluation metrics: Macro F1

In [None]:
import numpy as np


def precision(predicted_labels, true_labels, which_label=1):
    """
    Precision is True Positives / All Positives Predictions
    """
    pred_which = np.array([pred == which_label for pred in predicted_labels])
    true_which = np.array([lab == which_label for lab in true_labels])
    denominator = t_sum(pred_which)
    if denominator:
        return t_sum(logical_and(pred_which, true_which))/denominator
    else:
        return 0.


def recall(predicted_labels, true_labels, which_label=1):
    """
    Recall is True Positives / All Positive Labels
    """
    pred_which = np.array([pred == which_label for pred in predicted_labels])
    true_which = np.array([lab == which_label for lab in true_labels])
    denominator = t_sum(true_which)
    if denominator:
        return t_sum(logical_and(pred_which, true_which))/denominator
    else:
        return 0.


def f1_score(
    predicted_labels: List[int],
    true_labels: List[int],
    which_label: int
):
    """
    F1 score is the harmonic mean of precision and recall
    """
    P = precision(predicted_labels, true_labels, which_label=which_label)
    R = recall(predicted_labels, true_labels, which_label=which_label)
    if P and R:
        return 2*P*R/(P+R)
    else:
        return 0.


def macro_f1(
    predicted_labels: List[int],
    true_labels: List[int],
    possible_labels: List[int]
):
    scores = [f1_score(predicted_labels, true_labels, l) for l in possible_labels]
    # Macro, so we take the uniform avg.
    return sum(scores) / len(scores)

### Training loop. Nothing to do here!

In [None]:
def training_loop(
    num_epochs,
    train_features,
    train_labels,
    dev_sents,
    dev_labels,
    optimizer,
    model,
):
    print("Training...")
    loss_func = torch.nn.NLLLoss()
    batches = list(zip(train_features, train_labels))
    random.shuffle(batches)
    for i in range(num_epochs):
        losses = []
        for features, labels in tqdm(batches):
            # Empty the dynamic computation graph
            optimizer.zero_grad()
            preds = model(features).squeeze(1)
            loss = loss_func(preds, labels)
            # Backpropogate the loss through our model
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
        
        print(f"epoch {i}, loss: {sum(losses)/len(losses)}")
        # Estimate the f1 score for the development set
        print("Evaluating dev...")
        all_preds = []
        all_labels = []
        for sents, labels in tqdm(zip(dev_sents, dev_labels), total=len(dev_sents)):
            pred = predict(model, sents)
            all_preds.extend(pred)
            all_labels.extend(list(labels.numpy()))

        dev_f1 = macro_f1_score(all_preds, all_labels)
        print(f"Dev F1 {dev_f1}")
        
    # Return the trained model
    return model

In [None]:
# You can increase epochs if need be
epochs = 20
# TODO: Find a good learning rate
LR = 0.0

possible_labels = len(set(labels))
model = NLIClassifier(output_size=possible_labels)
optimizer = torch.optim.AdamW(model.parameters(), LR)

# TODO: Load test data
test_input_batches = [b for b in chunk(test_inputs, batch_size)]
# Tokenize + encode
test_input_batches = [batch_tokenizer(batch) for batch in test_input_batches]
test_batch_labels = [b for b in chunk(test_labels, batch_size)]
test_batch_labels = [encode_labels(batch) for batch in test_batch_labels]

training_loop(
    epochs,
    train_input_batches,
    train_label_batches,
    test_encoded_sents,
    test_encoded_labels,
    optimizer,
    model,
)

## Written Assignment (60 Points)
Written assignment tests the understanding of the student for the assignment's task. We have split the writing into sections. You will need to write 1-2 paragraphs describing the sections. Please be concise.

### In your own words, describe what the task is (20 points)
Describe the task, how is it useful and an example.

### Describe your method for the task (10 points)
Important details about the implementation. Feature engineering, parameter choice etc.

### Experiment Results (10 points)
Typically a table summarizing all the different experiment results for various parameter choices

### Discussion (20 points)
Key takeaway from the assignment. Why is the method good? shortcomings? how would you improve? Additional thoughts?