<a href="https://colab.research.google.com/github/rezaserajian/Active-IT/blob/main/CS_263F_Assignment_2_Student.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CS 263F Assignment 2: Transformer and LLMs for Natural Language Inference (NLI)

## Deadline: 11:59 PM, May 14, 2025


## Outline
- Part 1: Transformer Implementation (55 points)
- Part 2: Training and Evaluation via Huggingface Transformer (45 points)

## Instructions
- Follow the instructions and fill in the code for the sections marked with `# TODO`.
- **DO NOT** modify the checking/grading cells. Modifying these cells is strictly prohibited and will be treated as an academic integrity violation, which will result in a score of 0 for this assignment and escalation to the Office of Student Conduct at UCLA.

## Submission
- **Execution**: Ensure all cells have been run, and outputs are displayed before submission.
- **File Naming**: Save your completed notebook with outputs as `hw2.ipynb`.
- **Upload**: Submit your `hw2.ipynb` file to Gradescope.

Failure to follow these instructions will result in the autograder failing, which will automatically result in 0 points. **No regrading will be done** for submissions with incorrect file names or formats.



## Part 1: Transformer Implementation (55 points)

### Section 1: Scaled Dot-Product Attention (15 points)

In this section, you'll implement the **generalized Scaled Dot-Product Attention** mechanism as taught in class. This is a core component of the Transformer model. The generalized version accommodates encoder-only, decoder-only, and encoder-decoder Transformer architectures, which means it supports cases where the lengths of the queries (Q) and the key-value pairs (K, V) may **differ**. This is crucial for tasks like machine translation, where the encoder and decoder operate over sequences of different lengths.


This mechanism takes four inputs: queries (Q), keys (K), values (V), and an optional attention mask. It computes attention logits as the dot product of Q and K, divided by the square root of the key dimensionality $d$. Without this scaling, the logits grow with $d$ and push the softmax into regions with very small gradients. The logits are then passed through a softmax over the key dimension to obtain attention weights, which are used to combine the values (V) into a weighted sum.
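To make the two ideas above concrete, here is a small sketch that is separate from the function you are asked to implement. It first checks empirically that dividing by $\sqrt{d}$ brings the spread of random dot products back to roughly 1, and then shows that filling masked logits with `-inf` gives those positions exactly zero weight after the softmax. The tensor sizes and the toy logit values are made up for illustration.

```python
import torch

torch.manual_seed(0)
d = 512
q = torch.randn(10_000, d)
k = torch.randn(10_000, d)

raw = (q * k).sum(-1)      # unscaled dot products: std is close to sqrt(d)
scaled = raw / d ** 0.5    # scaled dot products: std is close to 1
print(raw.std().item(), scaled.std().item())

# Toy logits for one query over four keys (made-up values).
logits = torch.tensor([[2.0, 1.0, 0.5, -1.0]])
mask = torch.tensor([[True, False, True, False]])  # True = may attend

# Blocked positions get -inf, so exp(-inf) = 0 inside the softmax.
masked_logits = logits.masked_fill(~mask, float("-inf"))
weights = torch.softmax(masked_logits, dim=-1)
print(weights)
```

The remaining weights still sum to 1 because the softmax renormalizes over the unmasked positions only.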







In [None]:
# DO NOT alter this cell
import torch
from torch import nn
import math
import numpy as np
from torch.nn import functional as F
import torch.optim as optim
import matplotlib.pyplot as plt

In [None]:
def scaled_dot_product_attention(q, k, v, mask=None):
    # Q: FloatTensor of shape (bsz, q_len, d)
    # K, V: FloatTensor of shape (bsz, kv_len, d)
    # mask: optional, BoolTensor of shape (bsz, q_len, kv_len)
    # return: outputs, attention_weights
    #   outputs: FloatTensor of shape (bsz, q_len, d), output of attention module
    #   attention_weights: FloatTensor of shape (bsz, q_len, kv_len), attention weights between 0-1

    # Hint: if mask[i, j, k] is False, that means for the i-th data point in the batch,
    #   the attention weight from j-th position in Q to k-th position in KV is zero.
    #   To do that, you can set the masked attention logits to a large negative value such as -np.inf before the softmax

    # TODO:

    return outputs, attention_weights

You can use the following section to check your implementation

In [None]:
# DO NOT alter this cell
# Verify shape
bsz, q_len, kv_len, d = 2, 5, 4, 3
q = torch.randn(bsz, q_len, d)
k = torch.randn(bsz, kv_len, d)
v = torch.randn(bsz, kv_len, d)
values, attention = scaled_dot_product_attention(q, k, v)
print(f"the output shape is {values.shape}")
print(f"the shape of attention weight ({attention.shape}) should be the batch x query x key = {bsz} x {q_len} x {kv_len}")
assert tuple(attention.shape) == (bsz, q_len, kv_len)
print("Part 1.1.1.a passed")
print()
print("Checking that attention weights sum to 1 across the key dimension (kv_len).")
assert attention.sum(-1).allclose(torch.ones(bsz, q_len, ))
print("Part 1.1.1.b passed")
print()
print("Checking that our implementation produces the same results as PyTorch's built-in function.")
assert values.allclose(F.scaled_dot_product_attention(q, k, v, attn_mask=None))
print("Part 1.1.1.c passed")
print()
# Verify attention mask's function
bsz, q_len, kv_len, d = 2, 5, 4, 3
q = torch.randn(bsz, q_len, d)
k = torch.randn(bsz, kv_len, d)
v = torch.randn(bsz, kv_len, d)
mask = torch.rand(bsz, q_len, kv_len) > 0.5
mask[:, :, 0] = True
values, attention = scaled_dot_product_attention(q, k, v, mask=mask)
assert tuple(attention.shape) == (bsz, q_len, kv_len)
assert attention.sum(-1).allclose(torch.ones(bsz, q_len))
print("Ensuring that positions excluded by the mask have nearly zero attention values.")
assert (attention[~mask].abs() < 1e-8).all()
print("Part 1.1.2.a passed")
print()
print("Verifying that our masked attention output matches PyTorch's built-in function with the mask applied.")
assert values.allclose(F.scaled_dot_product_attention(q, k, v, attn_mask=mask))
print("Part 1.1.2.b passed")

### Section 2: Multi-head Attention (10 points)

Multi-head attention allows the model to focus on different parts of the input sequence from multiple perspectives, enhancing the model's ability to capture diverse dependencies. This is achieved by using multiple attention heads, each with its own set of query, key, and value projections. The outputs from all heads are then concatenated together.
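The split-and-concatenate bookkeeping can be sketched on its own, without any attention computation. The snippet below is only an illustration of the reshaping, using made-up sizes: it splits the feature dimension into `num_heads` contiguous chunks, checks that the second head sees features 4 to 7, and confirms that merging the heads restores the original tensor.

```python
import torch

bsz, seq_len, d_model, num_heads = 2, 5, 12, 3
head_dim = d_model // num_heads

x = torch.arange(bsz * seq_len * d_model, dtype=torch.float32)
x = x.reshape(bsz, seq_len, d_model)

# (bsz, seq_len, d_model) -> (bsz, num_heads, seq_len, head_dim)
heads = x.view(bsz, seq_len, num_heads, head_dim).transpose(1, 2)
print(heads.shape)

# Head 1 holds features 4..7 of every position.
print(torch.equal(heads[:, 1], x[..., 4:8]))

# Concatenating the heads along the feature dimension undoes the split.
merged = heads.transpose(1, 2).reshape(bsz, seq_len, d_model)
print(torch.equal(merged, x))
```

Looping over heads and slicing the last dimension is an equally valid way to organize the computation; the reshaping above is just one common choice.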

In [None]:
def multi_head_attention(q, k, v, num_heads: int, mask=None):
    # Q: FloatTensor of shape (bsz, q_len, d_model)
    # K, V: FloatTensor of shape (bsz, kv_len, d_model)
    # mask: optional, BoolTensor of shape (bsz, q_len, kv_len)
    # return: outputs, attention_weights
    #   outputs: FloatTensor of shape (bsz, q_len, d_model), output of attention module
    #   attention_weights: FloatTensor of shape (bsz, num_heads, q_len, kv_len), attention weights between 0-1

    # Hint: to perform multi-head attention, you split the `d_model`-dimension features into `num_heads` splits. Each attention head will use d_model // num_heads dimensions
    # Hint: For example, if d_model is 12 and num_heads is 3, the first head uses features 0-3, the second head uses features 4-7, and the third head uses features 8-11
    # Hint: Then finally, we concatenate outputs from all heads into the output values
    # Hint: You can call the scaled_dot_product_attention function you just implemented for each head's computation
    # Hint: In real applications, we also want some linear layers to "project" the query, key and values before doing attention, but now we're omitting this step
    assert q.shape[-1] % num_heads == 0

    # TODO:

    return outputs, attention_weights


You can use the following section to check your implementation

In [None]:
# DO NOT alter this cell

# Verify shape
bsz, q_len, kv_len, d, num_heads = 2, 5, 4, 12, 3
q = torch.randn(bsz, q_len, d)
k = torch.randn(bsz, kv_len, d)
v = torch.randn(bsz, kv_len, d)
values, attention = multi_head_attention(q, k, v, num_heads=num_heads)
print(f"the output shape is {values.shape}")
print(f"the shape of attention weight ({attention.shape}) should be the batch x number of attention heads x query x key(value) = {bsz} x {num_heads} x {q_len} x {kv_len}")
assert tuple(attention.shape) == (bsz, num_heads, q_len, kv_len)
print("Checking that attention weights sum to 1 across the key dimension (kv_len).")
assert attention.sum(-1).allclose(torch.ones(bsz, num_heads, q_len))
print("Part 1.2.1.a passed")
print()
print("Checking that our implementation produces the same results as PyTorch's built-in function.")
standard_values, standard_attention = F.multi_head_attention_forward(
    q.permute(1, 0, 2), k.permute(1, 0, 2), v.permute(1, 0, 2),
    embed_dim_to_check=d, num_heads=num_heads, attn_mask=None,
    use_separate_proj_weight=True, in_proj_weight=None, in_proj_bias=None,
    q_proj_weight=torch.eye(d), k_proj_weight=torch.eye(d), v_proj_weight=torch.eye(d),
    out_proj_weight=torch.eye(d), out_proj_bias=torch.zeros(d),
    add_zero_attn=False, dropout_p=0, is_causal=False,
    bias_k=None, bias_v=None, average_attn_weights=False,
)
assert values.allclose(standard_values.permute(1, 0, 2))
assert attention.allclose(standard_attention)
print("Part 1.2.1.b passed")
print()
# Verify attention mask's function
bsz, q_len, kv_len, d, num_heads = 2, 5, 4, 12, 3
q = torch.randn(bsz, q_len, d)
k = torch.randn(bsz, kv_len, d)
v = torch.randn(bsz, kv_len, d)
mask = torch.rand(bsz, q_len, kv_len) > 0.5
mask[:, :, 0] = True
values, attention = multi_head_attention(q, k, v, num_heads=num_heads, mask=mask)
assert tuple(attention.shape) == (bsz, num_heads, q_len, kv_len)
assert attention.sum(-1).allclose(torch.ones(bsz, num_heads, q_len))
print("Part 1.2.2.a passed")
print()

print("Ensuring that positions excluded by the mask have nearly zero attention values.")
assert (attention.sum(1)[~mask].abs() < 1e-8).all()
standard_values, standard_attention = F.multi_head_attention_forward(
    q.permute(1, 0, 2), k.permute(1, 0, 2), v.permute(1, 0, 2),
    embed_dim_to_check=d, num_heads=num_heads, attn_mask=(~mask).repeat_interleave(num_heads, dim=0),
    use_separate_proj_weight=True, in_proj_weight=None, in_proj_bias=None,
    q_proj_weight=torch.eye(d), k_proj_weight=torch.eye(d), v_proj_weight=torch.eye(d),
    out_proj_weight=torch.eye(d), out_proj_bias=torch.zeros(d),
    add_zero_attn=False, dropout_p=0, is_causal=False,
    bias_k=None, bias_v=None, average_attn_weights=False,
)
print("Verifying that our masked attention output matches PyTorch's built-in function with the mask applied.")
assert values.allclose(standard_values.permute(1, 0, 2))
assert attention.allclose(standard_attention)
print("Part 1.2.2.b passed")
print()


Below is the overall architecture of the Transformer decoder, built on top of the functions above.

In [None]:
class MultiheadAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be 0 modulo number of heads."

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.o_proj = nn.Linear(embed_dim, embed_dim)

        self._reset_parameters()

    def _reset_parameters(self):
        # Original Transformer initialization, see PyTorch documentation
        nn.init.xavier_uniform_(self.q_proj.weight)
        nn.init.xavier_uniform_(self.k_proj.weight)
        nn.init.xavier_uniform_(self.v_proj.weight)
        nn.init.xavier_uniform_(self.o_proj.weight)
        self.q_proj.bias.data.fill_(0)
        self.k_proj.bias.data.fill_(0)
        self.v_proj.bias.data.fill_(0)
        self.o_proj.bias.data.fill_(0)

    def forward(self, x, mask=None):
        q = self.q_proj(x)  # Note: for actual implementation, we need to do projection
        k = self.k_proj(x)
        v = self.v_proj(x)
        o, _ = multi_head_attention(q, k, v, self.num_heads, mask=mask)
        return self.o_proj(o)


class DecoderBlock(nn.Module):
    def __init__(self, input_dim, num_heads, dim_feedforward, dropout=0.0):
        """
        Inputs:
            input_dim - Dimensionality of the input
            num_heads - Number of heads to use in the attention block
            dim_feedforward - Dimensionality of the hidden layer in the MLP
            dropout - Dropout probability to use in the dropout layers
        """
        super().__init__()

        # Attention layer
        self.self_attn = MultiheadAttention(input_dim, num_heads)

        # Two-layer MLP
        self.linear_net = nn.Sequential(
            nn.Linear(input_dim, dim_feedforward),
            nn.Dropout(dropout),
            nn.ReLU(inplace=True),
            nn.Linear(dim_feedforward, input_dim)
        )

        # Layers to apply in between the main layers
        self.norm1 = nn.LayerNorm(input_dim)
        self.norm2 = nn.LayerNorm(input_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Attention part
        attn_out = self.self_attn(x, mask=mask)
        x = x + self.dropout(attn_out)
        x = self.norm1(x)

        # MLP part
        linear_out = self.linear_net(x)
        x = x + self.dropout(linear_out)
        x = self.norm2(x)

        return x


class TransformerDecoder(nn.Module):
    def __init__(self, we, pe, layers):
        super().__init__()
        self.we = we  # word embeddings
        self.pe = pe  # positional embeddings
        self.layers = nn.ModuleList(layers)
        vocab_size, model_dim = we.weight.shape
        self.lm_head = nn.Linear(model_dim, vocab_size, bias=False)

    def forward(self, tokens, mask=None):
        x = self.we(tokens) + self.pe(tokens)
        for l in self.layers:
            x = l(x, mask=mask)
        logits = self.lm_head(x)
        return logits

### Section 3: Positional Encoding (10 points)

Transformers lack an inherent sense of order in sequences, which is why positional encodings are added to the input embeddings. These encodings provide information about the position of tokens in a sequence, allowing the model to differentiate between tokens in different positions.

The positional encoding for each element $PE(pos, 2i)$ and $PE(pos, 2i+1)$ is defined as follows:

$$
\begin{aligned}
    PE(pos, 2i) & = \sin\left(\frac{pos}{10000^{\frac{2i}{d_{model}}}}\right), \\
    PE(pos, 2i+1) & = \cos\left(\frac{pos}{10000^{\frac{2i}{d_{model}}}}\right)
\end{aligned}
$$

where:
- $pos$ is the position in the sequence.
- $i$ indexes the sine/cosine pairs, with $0 \le i < d_{model}/2$: dimension $2i$ holds the sine term and dimension $2i+1$ holds the cosine term.
- $d_{model}$ is the dimensionality of the model's input embeddings.

This alternating use of sine and cosine allows each position to have a unique encoding across the dimensions, making it easier for the model to distinguish between positions.
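As a sanity check on the formula, the values for a single position can be computed by hand. The helper below, `sinusoidal_row`, is a hypothetical scalar sketch written for this illustration (it is not the vectorized module you are asked to implement); with $d_{model} = 4$ the two frequencies are $1$ and $1/100$.

```python
import math

def sinusoidal_row(pos, d_model, base=10000):
    """Return the encoding of one position as a list of length d_model."""
    row = []
    for i in range(d_model // 2):
        angle = pos / base ** (2 * i / d_model)
        row += [math.sin(angle), math.cos(angle)]  # dims 2i and 2i+1
    return row

print(sinusoidal_row(0, 4))  # position 0: every sine is 0, every cosine is 1
print(sinusoidal_row(1, 4))  # [sin(1), cos(1), sin(0.01), cos(0.01)]
```

Low dimension indices oscillate quickly with position while high indices change slowly, which is what lets nearby and distant positions both receive distinct encodings.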


In [None]:
class PositionalEncodings(nn.Module):
    def __init__(self, d_model, base=10000):
        """
        Inputs
            d_model - Hidden dimensionality of the input.
            base - Base of the wavelength progression in the sinusoidal positional encodings (10000 in the formula above).
        """
        super().__init__()
        self.d_model = d_model
        self.base = base

    def forward(self, x):
        # x: Tensor of shape (bsz, seq_len); only its shape and device are used
        # return: pe, FloatTensor of shape (bsz, seq_len, d_model)
        #   pe[:, pos, :] is the positional encoding for position `pos`
        bsz, seq_len = x.shape

        # TODO:

        return pe.to(x.device)

In [None]:
# DO NOT alter this cell
d_model = 64

pe = PositionalEncodings(d_model).forward(torch.zeros(1, 100)).squeeze(0)
print(f"The shape of positional encoding {pe.shape} should be sequence length x feature dimension = 100 x {d_model}")
assert tuple(pe.shape) == (100, d_model)
print("Part 1.3.1 passed")
print()
print("The difference between encodings at different positions should be non-zero, which indicates that each position has a unique encoding")
diff = (pe[:, None, :] - pe[None, :, :]).abs().sum(dim=-1)
assert (diff[~torch.eye(100).bool()] != 0).all()
print("Part 1.3.2 passed")


In [None]:
# DO NOT alter this cell

n_vocab = 6
d_model = 64
num_heads = 4
n_layers = 4

model = TransformerDecoder(
    we=nn.Embedding(n_vocab, d_model),
    pe=PositionalEncodings(d_model),
    layers=[
        DecoderBlock(d_model, num_heads, dim_feedforward=2 * d_model)
        for _ in range(n_layers)
    ],
).cuda()
print(model)

tokens = torch.LongTensor([
    [0, 1, 2, 3],
    [3, 2, 1, 0],
]).cuda()
out = model(tokens)
print(f"The output shape is {out.shape}. The expected output shape is batch_size x sequence_length x vocab_size = 2 x 4 x {n_vocab}")
assert tuple(out.shape) == (tokens.shape[0], tokens.shape[1], n_vocab)

### Section 4: Autoregressive Attention Mask (5 points)

In this function, you will implement an autoregressive attention mask, which is essential for language modeling tasks where each token can only attend to itself and the tokens that precede it. This prevents the model from "peeking" at future tokens, thus preserving the autoregressive property of the model.

The autoregressive attention mask (also called a causal attention mask) should be a lower triangular matrix. In other words, the mask is True for positions on or below the diagonal and False for positions above the diagonal.
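For a sequence of length 4, the pattern looks as follows. This is a plain-Python sketch of the rule "query position `j` may attend to key position `k` only if `k <= j`", shown for a single sequence; it is meant to visualize the mask, not to replace the batched tensor version required below.

```python
seq_len = 4

# mask[j][k] is True when query position j may attend to key position k.
mask = [[k <= j for k in range(seq_len)] for j in range(seq_len)]

for row in mask:
    print(" ".join("T" if allowed else "F" for allowed in row))
# T F F F
# T T F F
# T T T F
# T T T T
```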

In [None]:
def autoregressive_attention_mask(tokens):
    # tokens: (bsz, seq_len)
    # return: mask, torch.BoolTensor of shape (bsz, seq_len as q_len, seq_len as kv_len)
    # Hint: generate an autoregressive attention mask. mask[i, j, k] is True means j-th token can attend to k-th token
    #   For autoregressive language modelling, each position can only attend to itself and earlier positions; it cannot "cheat" by peeking at future tokens
    bsz, seq_len = tokens.shape

    # TODO:

    return mask.to(tokens.device)

In [None]:
# DO NOT alter this cell

mask = autoregressive_attention_mask(torch.zeros(2, 10))
print(f"Checking shape: expected (2, 10, 10), got {mask.shape}")
assert tuple(mask.shape) == (2, 10, 10)
print(f"Checking data type: expected torch.bool, got {mask.dtype}")
assert mask.dtype == torch.bool
for i in range(10):
    for j in range(10):
        if i >= j:
            assert mask[:, i, j].all()
        else:
            assert not mask[:, i, j].any()

print("Part 1.4 passed")

### Section 5: Language Modeling Objective (15 points)

The language modeling loss is used in tasks where the goal is to predict the next token in a sequence. It measures the difference between the model's predicted distribution over the vocabulary and the token that actually comes next. Cross-entropy loss is the usual choice, since it compares the predicted distribution with the one-hot distribution of the true token.

Note: since we are doing next-token prediction, **remember to shift the tokens to get the actual labels**. The logits at position $t$ are scored against the token at position $t+1$, so the last position has no label.
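The alignment between contexts and labels can be seen on a single toy sequence. This sketch only lists which token each position is expected to predict; it deliberately leaves out the loss computation.

```python
tokens = [0, 1, 2, 3]

# At position t the model has seen tokens[0..t] and should predict tokens[t+1].
pairs = [(tokens[: t + 1], tokens[t + 1]) for t in range(len(tokens) - 1)]

for context, target in pairs:
    print(f"context {context} -> target {target}")
# context [0] -> target 1
# context [0, 1] -> target 2
# context [0, 1, 2] -> target 3
```

A sequence of length 4 therefore yields 3 training targets, and the prediction made at the final position is not scored.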

In [None]:
def language_modelling_loss(tokens, logits):
    # tokens: (bsz, seq_len)
    # logits: (bsz, seq_len, n_vocab) raw logits predicted from `logits = self.lm_head(x)`
    # return: loss, a torch scalar.
    # Hint: use https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
    #   Since language modelling is essentially a self-supervised task, you can create the labels from the input tokens themselves
    #   Remember to shift the tokens to get the actual labels

    # TODO:
    return loss


In [None]:
# DO NOT alter this cell

loss = language_modelling_loss(tokens, out)
print(f"Checking if the loss is a scalar. Expected shape: (), got {loss.shape}")

assert tuple(loss.shape) == ()
print("Part 1.5.1 passed")

Now let's train our Transformer on a toy task: predicting the next number in a short ascending or descending sequence.

In [None]:
# DO NOT alter this cell

def train_step(model, optimizer, tokens):
    model.zero_grad()
    model.train()
    mask = autoregressive_attention_mask(tokens)
    logits = model(tokens, mask)
    loss = language_modelling_loss(tokens, logits)
    loss.backward()
    optimizer.step()
    return float(loss)


# Let's use a very naive data to try training: predict the next number
tokens = torch.LongTensor([
    [0, 1, 2, 3],
    [1, 2, 3, 4],
    [2, 3, 4, 5],
    [5, 4, 3, 2],
    [4, 3, 2, 1],
    [3, 2, 1, 0],
]).cuda()

# re-initialize the model, in case the cell is executed multiple times
model = TransformerDecoder(
    we=nn.Embedding(n_vocab, d_model),
    pe=PositionalEncodings(d_model),
    layers=[
        DecoderBlock(d_model, num_heads, dim_feedforward=2 * d_model)
        for _ in range(n_layers)
    ],
).cuda()

# train
optimizer = optim.Adam(model.parameters(), lr=1e-4)
losses = []
for i in range(1000):
    loss = train_step(model, optimizer, tokens)
    if i % 100 == 0:
        print("Step {:d}: loss = {:.4f}".format(i + 1, loss))
    losses.append(loss)
plt.plot(losses)
plt.show()

In [None]:
# DO NOT alter this cell

tokens = torch.LongTensor([
    [1, 2, 3],
    [3, 2, 1],
]).cuda()
mask = autoregressive_attention_mask(tokens)
logits = model(tokens, mask)
prob = nn.Softmax(dim=-1)(logits)

print("Next token of 1, 2, 3:")
for j in prob[0, -1].argsort(descending=True)[:3]:
    print("  p({:d}) = {:.4f}".format(j, prob[0, -1, j]))
assert (prob[0, -1].argsort(descending=True)[0] == 4)
print("Part 1.5.2.a passed")
print()
print("Next token of 3, 2, 1:")
for j in prob[1, -1].argsort(descending=True)[:3]:
    print("  p({:d}) = {:.4f}".format(j, prob[1, -1, j]))
assert (prob[1, -1].argsort(descending=True)[0] == 0)
print()
print("Part 1.5.2.b passed")

In [None]:
# Run this cell before moving on to Part 2 to free GPU memory
del model
import torch
torch.cuda.empty_cache()

## Part 2: Training and Evaluation via Huggingface Transformer (45 points)

In this part, you will first implement the evaluation of a transformer model on a dataset and then fine-tune the model to observe improvements on task performance.

Hints:
- You can use a GPU to speed up the training process. Select "Runtime" > "Change runtime type" > "GPU" in the Colab settings.
- Use smaller batch sizes if you encounter memory issues.

### Prerequisites: Install Libraries and Login to HuggingFace

- The model we are going to use is [Llama-3.2-1B-Instruct](https://huggingface.co/meta-llama/Llama-3.2-1B-Instruct) which is a gated model. Please visit the model page on Hugging Face and accept the terms of the license. Typically, access is granted within 24 hours.
- On the Hugging Face website, click the profile icon in the upper-right corner, then go to "Settings" -> "Access Tokens" to create the token you will use to log in.

In [None]:
!pip install transformers accelerate "bitsandbytes>0.37.0" trl==0.12.0 peft
!pip install flash-attn --no-build-isolation

In [None]:
from huggingface_hub import notebook_login
notebook_login()

### Section 1: Loading the model and the tokenizer (15 points)
In this section, you will learn how to load the pre-trained model and its associated tokenizer using the Hugging Face `transformers` package.

Demo code is available on the model page: <br>
https://huggingface.co/meta-llama/Llama-3.2-1B-Instruct


In [None]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_id = "meta-llama/Llama-3.2-1B-Instruct"
device = "cuda" if torch.cuda.is_available() else "cpu"

# TODO: Load the model using the appropriate parameters using AutoModelForCausalLM
# Ensure torch_dtype is set to torch.bfloat16


# TODO: Initialize the tokenizer using AutoTokenizer


In [None]:
# DO NOT alter this cell.
vocab_size = len(tokenizer.get_vocab())
print(f"\nThe vocabulary size of the model is {vocab_size}")
assert vocab_size == 128256
print("Part 2.1.1 passed")

In [None]:
# TODO: Define the messages for the chatbot interaction (List[Dict])
messages = [

]

def run_model(model, tokenizer, messages, max_new_tokens=5, verbose=False):
    # TODO: Prepare the input text using the tokenizer's apply_chat_template (Do not tokenize the text yet)

    if verbose: print("\n###input_text:###\n", input_text)
    # TODO: Tokenize the input text and transfer it to the appropriate device

    if verbose: print("\n###input_ids:###\n", input_ids)
    # TODO: Generate a response using the model. Ensure do_sample is False.


    # TODO: Decode the output and return the response without special tokens

    if verbose: print("\n###response:###\n", response)
    return assistant_response

assistant_response = run_model(model=model, tokenizer=tokenizer, messages=messages, max_new_tokens=10, verbose=True)
print(f"\n###Assistant response:###\n{assistant_response}")

Use the following code snippet to verify your implementation

In [None]:
# DO NOT alter this cell.
grading_messages = [
    {"role": "system", "content": "You are a chatbot who responds very shortly."},
    {"role": "user", "content": "When was UCLA founded?"},
]
grading_output = run_model(model=model, tokenizer=tokenizer, messages=grading_messages, max_new_tokens=100)
expected_output = "University of California, Los Angeles (UCLA) was founded in 1919."
print(f"Your output is: {grading_output}\nThe expected output is: {expected_output}")
if grading_output != expected_output:
    raise ValueError(f"FAILED: Incorrect response! \n\n{grading_output}\n\n{expected_output}")
print("Part 2.1.2 passed")

### Section 2: Evaluation of the pre-trained model (10 points)
In this section, we are going to evaluate the pre-trained model with the natural language inference (NLI) task. NLI is a fundamental task in natural language processing that involves determining the relationship between two sentences: a **premise** and a **hypothesis**. The task is to classify this relationship into one of three categories:

**Entailment**: The hypothesis logically follows from the premise.

**Contradiction**: The hypothesis is logically inconsistent with the premise.

**Neutral**: The hypothesis is neither entailed nor contradicted by the premise; there is no clear logical relationship between the two.

You can view examples for each category by running the cell below.

Your task is to implement the evaluation code. This involves running the model on the test set and comparing its predictions with the true labels. The goal is to achieve an accuracy of at least 30% by prompt engineering the model to generate the appropriate labels for this dataset.
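Generated text rarely matches a label string exactly, so an evaluation loop usually needs a normalization step before computing accuracy. The sketch below shows one possible approach. The helper names `normalize_prediction` and `accuracy` are hypothetical, and the three lowercase label strings are an assumption about how the dataset stores its labels; check the `label` column before relying on them.

```python
LABELS = ("entailment", "neutral", "contradiction")  # assumed label strings

def normalize_prediction(text):
    """Map free-form generated text to one of LABELS, or "unknown"."""
    cleaned = text.strip().lower()
    for label in LABELS:
        if cleaned.startswith(label):
            return label
    return "unknown"

def accuracy(predictions, gold_labels):
    """Fraction of predictions that equal the gold label."""
    correct = sum(p == g for p, g in zip(predictions, gold_labels))
    return correct / len(gold_labels)

# Made-up model outputs, for illustration only.
raw_outputs = ["Entailment.", " neutral", "I think it is a contradiction"]
gold = ["entailment", "neutral", "contradiction"]

preds = [normalize_prediction(o) for o in raw_outputs]
print(preds, accuracy(preds, gold))
```

The third output is counted as wrong because the label is not at the start of the response. A prompt that asks the model to answer with the label only makes this kind of matching far more reliable.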

In [None]:
import pandas as pd
pd.set_option("display.max_colwidth", None)

# Downloading dataset
dataset = {
    "train": pd.read_json("hf://datasets/nlp-projects/cs269-f24/assignment_2/esnli_train.jsonl", lines=True),
    "validation": pd.read_json("hf://datasets/nlp-projects/cs269-f24/assignment_2/esnli_validation.jsonl", lines=True),
    "test": pd.read_json("hf://datasets/nlp-projects/cs269-f24/assignment_2/esnli_test.jsonl", lines=True),
}

dataset["test"]

In [None]:
def apply_esnli_prompt(premise, hypothesis):
    # TODO:
    # Write a prompt for esnli dataset using given premise and hypothesis
    # so that the model classifies the input as entailment, neutral, or contradiction

    return prompt.strip()

In [None]:
# Applying your prompt template on the dataset
prompt_dataset = {}
for part in dataset.keys():
    prompt_dataset[part] = dataset[part].copy()
    prompt_dataset[part]["prompt"] = prompt_dataset[part].apply(lambda x: apply_esnli_prompt(x["premise"], x["hypothesis"]), axis=1)
prompt_dataset["test"]

In [None]:
from sklearn.metrics import accuracy_score
from tqdm.auto import tqdm

def evaluate_esnli(model, tokenizer, test_dataset):
    """
    Evaluate the model on the test dataset.
    Returns:
        accuracy: The accuracy of the model on the test dataset. The value is scaled from 0.0 to 1.0 (float)
        outputs: The model's predictions on the test dataset. (list[str])
    """
    # TODO: Implement the evaluation loop and return accuracy of the model as well as list of outputs
    # Hint: You can reuse the run_model function we implemented earlier.


    return accuracy, outputs

In [None]:
# DO NOT alter this cell:
acc, outputs = evaluate_esnli(model, tokenizer, prompt_dataset["test"])
print(f"Accuracy: {acc}")
prompt_dataset["test"]["output"] = outputs
expected_acc = 0.3
if acc < expected_acc / 2 or acc > 1:
  raise ValueError(f"FAILED: Low Accuracy! \n\n{acc} is lower than the required threshold 0.15\nYou might need to update your prompt so that the model follows the instructions better.")
print("Part 2.2.1 passed")

if acc < expected_acc or acc > 1:
    raise ValueError(f"FAILED: Low Accuracy! \n\n{acc} is lower than the required threshold {expected_acc}\nYou might need to update your prompt so that the model follows the instructions better.")
print("Part 2.2.2 passed")

### Section 3: Fine-tuning LLaMA (5 points)

- (Optional) Read the original LoRA paper ([link](https://arxiv.org/pdf/2106.09685)) to understand the meaning of each parameter.
- Complete the LoRA config in the code and fine-tune the model.
- The goal is to achieve an accuracy of at least 50%. You can change your prompt in the previous section if your fine-tuned model doesn't achieve this threshold.
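To get a feel for what `r` and `lora_alpha` control: LoRA freezes a weight matrix $W$ and learns a low-rank update $\frac{\alpha}{r} BA$, where $A$ has shape $(r, d_{in})$ and $B$ has shape $(d_{out}, r)$. The arithmetic below uses a 2048 x 2048 projection purely as an assumed, illustrative shape; the actual layer sizes depend on the model.

```python
d_out, d_in = 2048, 2048  # assumed projection shape, for illustration only
r, lora_alpha = 32, 16

full_params = d_out * d_in           # parameters in the frozen matrix W
lora_params = r * (d_in + d_out)     # parameters in A (r x d_in) and B (d_out x r)
scaling = lora_alpha / r             # factor applied to the update B @ A

print(full_params, lora_params, lora_params / full_params, scaling)
# 4194304 131072 0.03125 0.5
```

With these settings the adapter trains about 3% as many parameters as the matrix it adapts, and its update is scaled by 0.5.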

#### Hint
- If you encounter a CUDA out-of-memory (OOM) error, go to `Runtime` -> `Restart runtime...` in the menu.
- On a T4 GPU, training takes around 15 minutes.

In [None]:
from peft import LoraConfig

def create_lora_config():
    peft_config = LoraConfig(
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=[
            "q_proj","k_proj","v_proj","o_proj","gate_proj","up_proj","down_proj",
        ],
        # TODO: Set r=32 and lora_alpha=16

    )
    return peft_config

In [None]:
import argparse
import torch
import datasets
from datasets import load_dataset
from trl import SFTTrainer, DataCollatorForCompletionOnlyLM
from transformers import TrainingArguments, HfArgumentParser, AutoTokenizer, TrainerCallback
from huggingface_hub import login
import matplotlib.pyplot as plt
from peft import LoraConfig

# Assigning labels as 'completion' for the prompt dataset.
for key in prompt_dataset:
    prompt_dataset[key]["completion"] = prompt_dataset[key]["label"]

# Defining the training arguments. These control various aspects of training such as learning rate,
# batch size, number of epochs, evaluation strategy, etc.
training_args = TrainingArguments(
    report_to="none",
    learning_rate=5e-5,
    lr_scheduler_type="constant_with_warmup",
    warmup_steps=10,
    num_train_epochs=3,
    per_device_train_batch_size=8,
    gradient_accumulation_steps=1,
    output_dir="output_model",
    overwrite_output_dir=True,
    save_strategy="epoch",
    save_total_limit=1,
    load_best_model_at_end=False,
    logging_steps=1,
    seed=0,
    do_train=True,
    do_eval=True,
    do_predict=False,
    eval_strategy="epoch",
    gradient_checkpointing=True,
    max_grad_norm=0.3,
    push_to_hub=False,
    hub_private_repo=True,
)

# Create a LoRA (Low-Rank Adaptation) configuration for parameter-efficient fine-tuning.
# This reduces the number of parameters to train and makes the model lighter and faster.
peft_config = create_lora_config()

# This function formats the input dataset according to a specific template
# expected by the model, turning prompts and completions into a chat-based format.
def instructions_formatting_function(tokenizer: AutoTokenizer):
    def format_dataset(examples):
        if isinstance(examples["prompt"], list):
            output_texts = []
            for i in range(len(examples["prompt"])):
                converted_sample = [
                    {"role": "user", "content": examples["prompt"][i]},
                    {"role": "assistant", "content": examples["completion"][i]},
                ]
                output_texts.append(tokenizer.apply_chat_template(converted_sample, tokenize=False))
            output_texts = [text.replace("<s>", "").replace("<|begin_of_text|>", "").replace("\n\n", "") for text in output_texts]
            print(output_texts[0])
            return output_texts
        else:
            converted_sample = [
                {"role": "user", "content": examples["prompt"]},
                {"role": "assistant", "content": examples["completion"]},
            ]
            return tokenizer.apply_chat_template(converted_sample, tokenize=False)

    return format_dataset


tokenizer = AutoTokenizer.from_pretrained(model_id)
if getattr(tokenizer, "pad_token", None) is None:
    tokenizer.pad_token = tokenizer.eos_token

# Initialize the data collator, which is responsible for completion tasks.
# It ensures that all the tokens of the labels are set to an 'ignore_index'
# when they do not come from the assistant. This ensures that the loss is only
# calculated on the completion made by the assistant.
response_template = "<|start_header_id|>assistant<|end_header_id|>"
print("response_template:", response_template)
collator = DataCollatorForCompletionOnlyLM(response_template, tokenizer=tokenizer)

# Initialize the trainer with all the specified configurations, datasets, and formatting functions.
trainer = SFTTrainer(
    model_id,
    args=training_args,
    train_dataset=datasets.Dataset.from_pandas(prompt_dataset["train"]),
    eval_dataset=datasets.Dataset.from_pandas(prompt_dataset["validation"]),
    packing=False,
    model_init_kwargs={
        "torch_dtype": torch.bfloat16,
    },
    tokenizer=tokenizer,
    max_seq_length=500,
    peft_config=peft_config,
    formatting_func=instructions_formatting_function(tokenizer),
    data_collator=collator,
)
# Debugging: Decode the first training example token by token to verify that the formatting is correct.
print(trainer.tokenizer.batch_decode(trainer.train_dataset["input_ids"][0], skip_special_tokens=False))
# Run evaluation on the model to check performance on the validation dataset before fine-tuning.
print(trainer.evaluate())
# Begin the training loop. This will train the model using the defined parameters and dataset.
trainer.train()
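
The completion-only masking described in the collator comment above can be illustrated with a minimal, library-free sketch. The function name `mask_prompt_tokens` and the toy token IDs are hypothetical and only serve the illustration; this is not the actual `DataCollatorForCompletionOnlyLM` implementation. It assumes the response template appears as a contiguous run of token IDs, and that an example without the template should contribute no loss.

```python
IGNORE_INDEX = -100  # label value that PyTorch's cross-entropy loss skips by default


def mask_prompt_tokens(input_ids, response_ids, ignore_index=IGNORE_INDEX):
    """Return labels in which everything up to and including the
    response template is replaced by ignore_index (hypothetical helper)."""
    labels = list(input_ids)
    n = len(response_ids)
    # Scan from the end so that the last occurrence of the template wins.
    for start in range(len(input_ids) - n, -1, -1):
        if input_ids[start:start + n] == response_ids:
            end = start + n
            labels[:end] = [ignore_index] * end
            return labels
    # Assumption: if the template is missing, ignore the whole example.
    return [ignore_index] * len(input_ids)


# Toy example: IDs 7, 8, 9 stand in for the tokenized response template.
toy_input_ids = [1, 10, 11, 12, 7, 8, 9, 20, 21, 2]
toy_labels = mask_prompt_tokens(toy_input_ids, [7, 8, 9])
print(toy_labels)
```

Only the last three positions keep their original IDs as labels, so only the assistant's completion would contribute to the loss in this toy setting.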

In [None]:
# DO NOT alter this cell:
acc, outputs = evaluate_esnli(trainer.model, tokenizer, prompt_dataset["test"])
print(f"Accuracy: {acc}")
prompt_dataset["test"]["output"] = outputs
expected_acc = 0.5
if acc < expected_acc:
    raise ValueError(f"FAILED: Low Accuracy! \n\n{acc} is lower than the required threshold {expected_acc}\nYou might need to update your prompt so that the model follows the instructions better.")
print("Part 2.3 passed")

### Section 4: Explanation (15 points)


* In this section, you'll use the e-SNLI dataset, which includes free-form rationales (explanations) for each NLI example. Your task is to write prompts that guide the model to generate explanations for a subset of the test set (the grading cells use the first 10 examples). The generated explanations are compared to the three human explanations for each example using the unigram BLEU score.

* Expectations:
    * Pretrained Model: Expected BLEU score of at least 0.15.
    * Fine-Tuned Model: Expected BLEU score of at least 0.20.

* To improve the model's explanations, you'll need to engage in prompt engineering. Adjust your input prompts to guide the model toward explanations that are more similar to the human references, and aim to improve the BLEU score. You are also encouraged to read the generated explanations to understand their quality, and possibly conduct some analysis.

* By the end of this section, you should be able to create effective prompts that improve the quality of the model's explanations, analyze their performance through BLEU scores, and optionally perform a qualitative analysis by comparing the generated explanations with the human-provided ones.
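
To build intuition for the metric, here is a minimal pure-Python sketch of a unigram BLEU score: clipped unigram precision multiplied by a brevity penalty. The function `unigram_bleu` and the example sentences are hypothetical. It is meant to mirror what `corpus_bleu` with `weights=(1, 0, 0, 0)` measures, but it is not NLTK's implementation and may differ from it in edge cases.

```python
import math
from collections import Counter


def unigram_bleu(list_of_references, hypotheses):
    """Corpus-level BLEU-1 sketch: clipped unigram precision times a brevity penalty."""
    matched = total = ref_len = 0
    for references, hypothesis in zip(list_of_references, hypotheses):
        hyp_counts = Counter(hypothesis)
        # For each word, the maximum count seen in any single reference.
        max_ref_counts = Counter()
        for ref in references:
            for word, count in Counter(ref).items():
                max_ref_counts[word] = max(max_ref_counts[word], count)
        # A hypothesis word is credited at most as often as it occurs in a reference.
        matched += sum(min(c, max_ref_counts[w]) for w, c in hyp_counts.items())
        total += len(hypothesis)
        # Use the reference length closest to the hypothesis length.
        ref_len += min((len(r) for r in references),
                       key=lambda n: (abs(n - len(hypothesis)), n))
    if total == 0 or matched == 0:
        return 0.0
    # Penalize hypotheses that are shorter than the references.
    brevity_penalty = 1.0 if total > ref_len else math.exp(1 - ref_len / total)
    return brevity_penalty * matched / total


references = [["the man is outside".split(), "a man is outdoors".split()]]
concise = ["the man is outdoors".split()]
verbose = ["the answer is entailment because the man is outside".split()]
print(unigram_bleu(references, concise))
print(unigram_bleu(references, verbose))
```

In this toy case the concise output scores higher than the verbose one, because every extra word that does not appear in a reference lowers the unigram precision. This suggests that prompts asking for short explanations in the style of the human references are likely to score better.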



In [None]:
explanation_dataset = pd.read_json("hf://datasets/nlp-projects/cs269-f24/assignment_2/esnli_test_with_explanation.jsonl", lines=True)
explanation_dataset

In [None]:
import nltk
import numpy as np
from sklearn.metrics import accuracy_score
from tqdm.auto import tqdm

def apply_esnli_prompt_explanation(premise, hypothesis):
    # TODO:
    # Create a prompt for the e-SNLI dataset.
    # This prompt will guide the model to generate an explanation for the relationship
    # between the given premise and hypothesis.

    return prompt.strip()

In [None]:
# DO NOT CHANGE
def evaluate_esnli_explanation(model, tokenizer, test_dataset, max_new_tokens=1000):
    outputs = []
    for row in tqdm(test_dataset.to_dict(orient="records")):
        # Construct the messages to send to the model.
        grading_messages = [
            {"role": "system", "content": ""},
            {"role": "user", "content": row["prompt"]},
        ]
        # Run the model with the constructed messages and store the output.
        output = run_model(model=model, tokenizer=tokenizer, messages=grading_messages, max_new_tokens=max_new_tokens)
        # print(output)
        outputs.append(output)
    # Initialize lists to hold references and hypotheses for BLEU score calculation.
    r, h = [], []
    for idx, row in tqdm(enumerate(test_dataset.to_dict(orient="records"))):
        # Get the human-provided explanations (references) from the dataset.
        references = [
            row["explanation_1"].split(),  # Split into list of words
            row["explanation_2"].split(),
            row["explanation_3"].split(),
        ]
        # Split the generated output (hypothesis) into words.
        hypothesis = outputs[idx].split()
        r.append(references)
        h.append(hypothesis)

    # Calculate the BLEU score using the nltk library.
    # weights=(1, 0, 0, 0) means we are using only the 1-gram BLEU score.
    bleu_score = nltk.translate.bleu_score.corpus_bleu(r, h, weights=(1, 0, 0, 0))
    return bleu_score, outputs  # Return the calculated BLEU score and the generated outputs.

In [None]:
# DO NOT CHANGE (Testing Pre-trained Model)
prompt_explanation_dataset = {}
prompt_explanation_dataset["test"] = explanation_dataset.copy()
prompt_explanation_dataset["test"]["prompt"] = prompt_explanation_dataset["test"].apply(lambda x: apply_esnli_prompt_explanation(x["premise"], x["hypothesis"]), axis=1)
prompt_explanation_dataset["test"]

df = prompt_explanation_dataset["test"].iloc[:10].copy()
bleu_score, outputs = evaluate_esnli_explanation(model, tokenizer, df)
print(f"Bleu: {bleu_score}")
df["output"] = outputs
display(df)
expected_bleu = 0.15
if bleu_score < expected_bleu:
    raise ValueError(f"FAILED: Low Bleu! \n\n{bleu_score} is lower than the required threshold {expected_bleu}\nYou might need to update your prompt so that the model explains more similarly to human explanations.")
print("Part 2.4.1 passed")

In [None]:
# DO NOT CHANGE (Testing Fine-tuned Model)
prompt_explanation_dataset = {}
prompt_explanation_dataset["test"] = explanation_dataset.copy()
prompt_explanation_dataset["test"]["prompt"] = prompt_explanation_dataset["test"].apply(lambda x: apply_esnli_prompt_explanation(x["premise"], x["hypothesis"]), axis=1)
prompt_explanation_dataset["test"]

df = prompt_explanation_dataset["test"].iloc[:10].copy()
bleu_score, outputs = evaluate_esnli_explanation(trainer.model, tokenizer, df)
print(f"Bleu: {bleu_score}")
df["output"] = outputs
display(df)
expected_bleu = 0.20
if bleu_score < expected_bleu:
    raise ValueError(f"FAILED: Low Bleu! \n\n{bleu_score} is lower than the required threshold {expected_bleu}\nYou might need to refine your prompt to encourage the model to generate explanations more similar to human-provided explanations. Since the fine-tuned model is trained primarily to output labels (entailment, contradiction, neutral), it's important to create a more assertive prompt that explicitly directs the model to provide an explanation rather than just a label.")
print("Part 2.4.2 passed")