<a href="https://colab.research.google.com/github/yaya-sy/LLMReasoningCourse/blob/main/labs/lab2/lab2_corrected.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lab 2: Improving the output quality of LLMs with repeated sampling

We will see how to improve an LLM's French-to-English translations by scaling the number of translation candidates (repeated sampling) and using a reward score to pick the best candidate.

# Load data and model

In [None]:
from datasets import load_dataset

# Number of French -> English examples kept for evaluation.
N = 100

# Load the preference data and keep only the rows whose direction is French -> English.
raw_datasets = load_dataset("haoranxu/X-ALMA-Preference", split="train")
raw_datasets = raw_datasets.filter(lambda example: example["directions"] == "fr-en")

# The first N fr-en rows form the evaluation subset.
# 'source' holds the French sentence to translate; 'chosen' is used as the gold English translation.
subset = raw_datasets.select(range(N))

In [None]:
# Print the first evaluation example to inspect the fields of a record.
print(subset[0])

We will use `HuggingFaceTB/SmolLM2-360M-Instruct` for this lab.

In [None]:
# load the model and the tokenizer. Load the model on the GPU if available
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
# Single place to change the checkpoint used for both the model and its tokenizer.
MODEL_NAME = "HuggingFaceTB/SmolLM2-360M-Instruct"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# No `token` argument: the tokenizer is fetched from the same repository without
# one, and an empty-string token is only a placeholder credential, not a valid one.
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, dtype=torch.bfloat16)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Move the weights to the GPU when one is available, otherwise stay on CPU.
model = model.to(device)

# Part 1: Self-Confidence

In this section we will use self-confidence as reward.

In [None]:
!pip install sacrebleu evaluate

In [None]:
import evaluate
# Load the "sacrebleu" metric through the `evaluate` library; `compute_bleu` reads this global.
metric = evaluate.load("sacrebleu")

In [None]:
from tqdm import tqdm
from typing import List

@torch.no_grad()
def rollouts(corpus: List[str], batch_size: int=4, num_samples=4,
             max_new_tokens: int=64, temperature: float=1.0, top_p: float=0.4):
    """Sample `num_samples` English translations for every French text in `corpus`.

    Args:
        corpus: French source texts.
        batch_size: number of distinct source texts per generation call; the
            model actually processes `batch_size * num_samples` sequences at once.
        num_samples: number of sampled translations per source text.
        max_new_tokens: generation budget per sample.
        temperature: sampling temperature passed to `model.generate`.
        top_p: nucleus-sampling threshold passed to `model.generate`.

    Returns:
        A list aligned with `corpus`; entry `i` is the list of `num_samples`
        decoded translations of `corpus[i]`.
    """
    def tokenize(texts: List[str]):
        """Wrap each text in the translation chat prompt and tokenize the batch."""
        conversations = [
            [{"role": "user",
              "content": f"You are given a French text, provide faithful translation to English.\n\n{text}"}]
            for text in texts]
        templated = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=True)
        # Left padding keeps every prompt flush against the generated tokens, so the
        # continuation of each row starts at the same column (see the `[:, s:]` slice below).
        tokenized = tokenizer(templated, padding_side="left", padding=True, return_tensors="pt")
        return tokenized

    # Sort by prompt length so each batch groups prompts of similar size and wastes
    # little padding; the original positions are kept to restore the corpus order.
    data = sorted(enumerate(corpus), key=lambda x: tokenize([x[1]]).input_ids.shape[-1])

    results = [None] * len(corpus)

    for i in tqdm(range(0, len(data), batch_size)):
        indices, texts = zip(*data[i : i + batch_size])

        tokenized = tokenize(texts)
        input_ids = tokenized.input_ids.to(model.device)
        b, s = input_ids.shape
        # Repeat each prompt `num_samples` times in consecutive rows so one generate
        # call draws all samples; rows [k*num_samples, (k+1)*num_samples) belong to prompt k.
        input_ids = input_ids.repeat_interleave(num_samples, dim=0)
        attention_mask = tokenized.attention_mask.to(model.device)
        attention_mask = attention_mask.repeat_interleave(num_samples, dim=0)

        outputs = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_new_tokens,
            do_sample=True
        )
        # Regroup the flat batch into (prompt, sample, token).
        outputs = outputs.reshape(b, num_samples, -1)

        for idx, predicted_ids in zip(indices, outputs):
            # Drop the (left-padded) prompt columns, keep only the newly generated tokens.
            generated_tokens = predicted_ids[:, s:]
            translations = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
            results[idx] = translations

    return results

## Likelihood self-confidence

1. Translate the French corpus with num_samples `[1, 2, 4, 8, 16]`
2. Compute the log-likelihood of each translation using the LLM itself
3. Pick the most likely translation for each example
4. Compute the BLEU score and observe the results

In [None]:
from torch.nn import functional as F
from typing import Tuple, List

@torch.no_grad()
def log_likelihood(texts: List[Tuple[str, str]], batch_size: int=16):
    """Score (source, translation) pairs by the model's mean token log-probability.

    Each pair is rendered as a user/assistant conversation. Only tokens that come
    after the generation prompt count towards the score, i.e. the score estimates
    log p(translation | source) normalised by length. Every non-padding token after
    the prompt contributes, including any closing tokens the chat template appends.

    Args:
        texts: list of (French source, English translation) pairs.
        batch_size: number of pairs per forward pass.

    Returns:
        A 1-D float tensor with one mean log-probability per pair (higher means
        more likely). A pair with no scored token gets `-inf`.
    """
    loglikelihoods = []

    def tokenize(pairs: List[Tuple[str, str]]):
        """Tokenize the conversations and attach labels that mask prompt and padding."""
        # Same user prompt as the one used to generate the candidates in `rollouts`,
        # so the score is conditioned on the text the model actually saw.
        conversations = [
            [
                {"role": "user",
                "content": f"You are given a French text, provide faithful translation to English.\n\n{text}"
                },
                {"role": "assistant",
                "content": f"{translation}"
                }
            ]
            for text, translation in pairs
        ]

        templated = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=False)
        tokenized = tokenizer(templated, padding_side="right", padding=True, return_tensors="pt")

        # Render the prompt alone (user turn + generation prompt) to find where the
        # translation starts.
        prompts_only = [
            tokenizer.apply_chat_template(
                [conv[0]],
                tokenize=False,
                add_generation_prompt=True
            )
            for conv in conversations
        ]
        prompt_tokenized = tokenizer(prompts_only, padding_side="right", padding=True, return_tensors="pt")
        # Count real tokens with the attention mask. Comparing ids against
        # pad_token_id miscounts whenever the pad token is also a real token of the
        # template (e.g. a tokenizer that reuses its end-of-turn token for padding).
        prompt_lengths = prompt_tokenized.attention_mask.sum(dim=1)

        labels = tokenized.input_ids.clone()
        # Ignore padding positions.
        labels[tokenized.attention_mask == 0] = -100
        # Ignore the prompt: column index < prompt length for that row.
        positions = torch.arange(labels.shape[1]).unsqueeze(0)
        labels[positions < prompt_lengths.unsqueeze(1)] = -100

        tokenized["labels"] = labels
        return tokenized

    for start in range(0, len(texts), batch_size):
        batch = texts[start:start+batch_size]
        tokenized = tokenize(batch).to(model.device)
        # Shift: the logits at position t predict the token at position t + 1.
        labels = tokenized["labels"][:, 1:].long().contiguous()
        del tokenized["labels"]
        logits = model(**tokenized).logits
        logits = logits[:, :-1, :].float()
        log_probs = F.log_softmax(logits, dim=-1)
        nll_loss = F.nll_loss(
            log_probs.reshape(-1, log_probs.shape[-1]),
            labels.reshape(-1),
            ignore_index=-100,
            reduction="none"
        ).view(labels.shape)

        valid_mask = labels != -100
        token_counts = valid_mask.sum(dim=1)
        # clamp avoids 0/0 = NaN for a row without any scored token.
        per_sample_nll = (nll_loss * valid_mask).sum(dim=1) / token_counts.clamp(min=1)
        # Such a row must never win an argmax, hence -inf instead of 0.
        scores = (-per_sample_nll).masked_fill(token_counts == 0, float("-inf"))
        loglikelihoods.extend(scores.tolist())

    return torch.tensor(loglikelihoods)

In [None]:
def loglikelihood_repeated_sampling(attempts=(1, 2, 4, 8, 16, 32, 64, 128)):
    """Run best-of-n translation on `subset`, selecting by log-likelihood.

    For every sampling budget in `attempts`, `num_samples` translations are drawn
    per source sentence with `rollouts`, each candidate is scored with
    `log_likelihood`, and the highest-scoring candidate is kept.

    Args:
        attempts: iterable of sampling budgets (number of candidates per source).

    Returns:
        Dict mapping each budget to a list with one
        (gold translation, selected candidate, score of the selected candidate)
        tuple per evaluation example.
    """
    # Materialise once: `attempts` is iterated several times below.
    attempts = list(attempts)
    # Read the two columns once instead of fetching dataset rows inside the loops.
    sources = subset["source"]
    golds = subset["chosen"]

    print("Rollouts...")
    results = {num_samples : rollouts(sources, num_samples=num_samples) for num_samples in attempts}
    gold_and_candidates = {num_samples: [] for num_samples in attempts}
    print("Loglikelihood self-confidence...")
    for num_samples, all_candidates in results.items():
        # Size the progress bar from the data rather than from a global constant.
        for idx, candidates in tqdm(enumerate(all_candidates), total=len(all_candidates)):
            source = sources[idx]
            paired = [(source, candidate) for candidate in candidates]
            loglikelihoods = log_likelihood(paired).cpu()
            best_candidate_idx = loglikelihoods.argmax().item()
            gold_and_candidates[num_samples].append(
                (golds[idx], candidates[best_candidate_idx], loglikelihoods[best_candidate_idx].item())
            )
    return gold_and_candidates

In [None]:
def compute_bleu(gold_and_candidates, self_confidence="logprob"):
    """Summarise each sampling budget by corpus BLEU and mean confidence score.

    Args:
        gold_and_candidates: dict mapping a sampling budget to a list of
            (reference, selected candidate, confidence score) tuples.
        self_confidence: key under which the mean confidence score is stored.

    Returns:
        Dict mapping each budget to {"bleu": <score>, self_confidence: <mean score>}.
    """
    def summarise(triples):
        # Transpose the per-example triples into three parallel sequences.
        references, candidates, scores = zip(*triples)
        bleu = metric.compute(predictions=candidates, references=references)["score"]
        mean_score = sum(scores) / len(scores)
        return {"bleu": bleu, self_confidence: mean_score}

    return {
        num_samples: summarise(triples)
        for num_samples, triples in gold_and_candidates.items()
    }

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Run the log-likelihood selection experiment, then score every sampling budget.
gold_and_candidates = loglikelihood_repeated_sampling()
bleu_logprob = compute_bleu(gold_and_candidates)

# Long-format table: one row per (sampling budget, metric) so the plot can facet on 'metric'.
rows = []
for n, metrics in bleu_logprob.items():
    rows.extend(
        {'num_samples': n, 'metric': label, 'value': metrics[key]}
        for label, key in (('BLEU', 'bleu'), ('Log Probability', 'logprob'))
    )
df = pd.DataFrame(rows)

# One panel per metric, each with its own y-axis; the x-axis is logarithmic.
g = sns.FacetGrid(df, col='metric', height=5, aspect=1.2, sharey=False)
g.map_dataframe(sns.lineplot, x='num_samples', y='value', marker='o')
g.set(xscale='log')

# Put a tick (and label) on every sampling budget that was evaluated.
tick_values = list(bleu_logprob)
for ax in g.axes.flat:
    ax.set_xticks(tick_values)
    ax.set_xticklabels(tick_values)
    ax.grid(True, alpha=0.3)

g.set_axis_labels('Number of Samples', 'Value')
plt.tight_layout()
plt.show()

## Entropy self-confidence

Modify the function `log_likelihood` to compute the entropy. Repeat the previous experiment but with entropy.

In [None]:
from torch.nn import functional as F
from typing import Tuple, List

@torch.no_grad()
def entropy(texts: List[Tuple[str, str]], batch_size: int=16):
    """Score (source, translation) pairs by the model's mean predictive entropy.

    Each pair is rendered as a user/assistant conversation. For every token that
    comes after the generation prompt, the entropy of the next-token distribution
    that predicts it is computed; the score is the average over those tokens.
    Every non-padding token after the prompt contributes, including any closing
    tokens the chat template appends.

    Args:
        texts: list of (French source, English translation) pairs.
        batch_size: number of pairs per forward pass.

    Returns:
        A 1-D float tensor with one mean entropy (in nats) per pair (lower means
        more confident). A pair with no scored token gets `+inf`.
    """
    entropies = []

    def tokenize_(pairs: List[Tuple[str, str]]):
        """Tokenize the conversations and attach labels that mask prompt and padding."""
        # Same user prompt as the one used to generate the candidates in `rollouts`,
        # so the score is conditioned on the text the model actually saw.
        conversations = [
            [
                {"role": "user",
                "content": f"You are given a French text, provide faithful translation to English.\n\n{text}"
                },
                {"role": "assistant",
                "content": f"{translation}"
                }
            ]
            for text, translation in pairs
        ]

        templated = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=False)
        tokenized = tokenizer(templated, padding_side="right", padding=True, return_tensors="pt")

        # Render the prompt alone (user turn + generation prompt) to find where the
        # translation starts.
        prompts_only = [
            tokenizer.apply_chat_template(
                [conv[0]],
                tokenize=False,
                add_generation_prompt=True
            )
            for conv in conversations
        ]
        prompt_tokenized = tokenizer(prompts_only, padding_side="right", padding=True, return_tensors="pt")
        # Count real tokens with the attention mask. Comparing ids against
        # pad_token_id miscounts whenever the pad token is also a real token of the
        # template (e.g. a tokenizer that reuses its end-of-turn token for padding).
        prompt_lengths = prompt_tokenized.attention_mask.sum(dim=1)

        labels = tokenized.input_ids.clone()
        # Ignore padding positions.
        labels[tokenized.attention_mask == 0] = -100
        # Ignore the prompt: column index < prompt length for that row.
        positions = torch.arange(labels.shape[1]).unsqueeze(0)
        labels[positions < prompt_lengths.unsqueeze(1)] = -100

        tokenized["labels"] = labels
        return tokenized

    for start in range(0, len(texts), batch_size):
        batch = texts[start:start+batch_size]
        tokenized = tokenize_(batch).to(model.device)
        # Shift: the logits at position t predict the token at position t + 1.
        labels = tokenized["labels"][:, 1:].long().contiguous()
        del tokenized["labels"]
        logits = model(**tokenized).logits
        logits = logits[:, :-1, :].float()
        probabilities = F.softmax(logits, dim=-1)
        # entr(p) = -p * log(p) with entr(0) = 0, so a probability that underflows
        # to zero contributes 0 instead of 0 * log(0) = NaN.
        token_entropy = torch.special.entr(probabilities).sum(dim=-1)

        valid_mask = labels != -100
        token_counts = valid_mask.sum(dim=1)
        # clamp avoids 0/0 = NaN for a row without any scored token.
        per_sample_entropy = (token_entropy * valid_mask).sum(dim=1) / token_counts.clamp(min=1)
        # Such a row must never win an argmin, hence +inf instead of 0.
        per_sample_entropy = per_sample_entropy.masked_fill(token_counts == 0, float("inf"))
        entropies.extend(per_sample_entropy.tolist())

    return torch.tensor(entropies)

In [None]:
def entropy_repeated_sampling(attempts=(1, 2, 4, 8, 16, 32, 64, 128)):
    """Run best-of-n translation on `subset`, selecting by lowest entropy.

    For every sampling budget in `attempts`, `num_samples` translations are drawn
    per source sentence with `rollouts`, each candidate is scored with `entropy`,
    and the lowest-entropy candidate is kept.

    Args:
        attempts: iterable of sampling budgets (number of candidates per source).

    Returns:
        Dict mapping each budget to a list with one
        (gold translation, selected candidate, entropy of the selected candidate)
        tuple per evaluation example.
    """
    # Materialise once: `attempts` is iterated several times below.
    attempts = list(attempts)
    # Read the two columns once instead of fetching dataset rows inside the loops.
    sources = subset["source"]
    golds = subset["chosen"]

    print("Rollouts...")
    results = {num_samples : rollouts(sources, num_samples=num_samples) for num_samples in attempts}
    gold_and_candidates = {num_samples: [] for num_samples in attempts}

    print("Entropy self-confidence...")
    for num_samples, all_candidates in results.items():
        # Size the progress bar from the data rather than from a global constant.
        for idx, candidates in tqdm(enumerate(all_candidates), total=len(all_candidates)):
            source = sources[idx]
            paired = [(source, candidate) for candidate in candidates]
            entropies = entropy(paired).cpu()
            best_candidate_idx = entropies.argmin().item()
            gold_and_candidates[num_samples].append(
                (golds[idx], candidates[best_candidate_idx], entropies[best_candidate_idx].item())
            )
    return gold_and_candidates

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Run the entropy selection experiment, then score every sampling budget.
gold_and_candidates = entropy_repeated_sampling()
bleu_entropy = compute_bleu(gold_and_candidates, "entropy")

# Long-format table: one row per (sampling budget, metric) so the plot can facet on 'metric'.
rows = []
for n, metrics in bleu_entropy.items():
    rows.extend(
        {'num_samples': n, 'metric': label, 'value': metrics[key]}
        for label, key in (('BLEU', 'bleu'), ('Entropy', 'entropy'))
    )
df = pd.DataFrame(rows)

# One panel per metric, each with its own y-axis; the x-axis is logarithmic.
g = sns.FacetGrid(df, col='metric', height=5, aspect=1.2, sharey=False)
g.map_dataframe(sns.lineplot, x='num_samples', y='value', marker='o')
g.set(xscale='log')

# Put a tick (and label) on every sampling budget that was evaluated.
tick_values = list(bleu_entropy)
for ax in g.axes.flat:
    ax.set_xticks(tick_values)
    ax.set_xticklabels(tick_values)
    ax.grid(True, alpha=0.3)

g.set_axis_labels('Number of Samples', 'Value')
plt.tight_layout()
plt.show()

# Reward modeling

We will train a reward model for translation quality scoring.

## Data

1. Get 1000 translation pairs from the dataset.
2. Translate the French texts to English with repeated sampling (num_samples=4)
3. Use the gold English texts as preferred translations and the model's translations as rejected translations

## Model

We will use the Bradley-Terry model:

$$
p(y_p \succ y_r \mid x) = \frac{\exp(\mathbf{c}_p \cdot \mathbf{w}^T)}{\exp(\mathbf{c}_p \cdot \mathbf{w}^T) + \exp(\mathbf{c}_r \cdot \mathbf{w}^T)}
$$

where:

- $\mathbf{c}_p = \text{Transformer}(\mathbf{x}, y_p)$
- $\mathbf{c}_r = \text{Transformer}(\mathbf{x}, y_r)$

1. Derive the logistic regression form of this model from the formula above (see lecture 2)
2. Complete the class to implement a reward model
3. Train the reward model

In [None]:
from torch import nn
import torch.nn.functional as F
from copy import deepcopy

class RewardModel(nn.Module):
    """Bradley-Terry reward model: a transformer backbone plus a scalar linear head.

    The reward of a sequence is a linear function of the mean of the backbone's
    last-layer hidden states over the non-masked tokens.
    """

    def __init__(self, base_model=None):
        """Build the reward model.

        Args:
            base_model: model to copy as the backbone. It must expose
                `config.hidden_size` and return `hidden_states` when called with
                `output_hidden_states=True`. Defaults to the notebook-level `model`.
        """
        super().__init__()
        if base_model is None:
            base_model = model
        # Deep copy so that training the reward model leaves the generator untouched.
        self.transformer = deepcopy(base_model)
        self.regression_head = nn.Linear(in_features=base_model.config.hidden_size, out_features=1)

    def loss_fn(self, prefered_logits, rejected_logits):
        """Return the mean Bradley-Terry loss -log sigmoid(r_prefered - r_rejected).

        Args:
            prefered_logits: rewards of the preferred sequences, one per pair.
            rejected_logits: rewards of the rejected sequences, in the same order.
        """
        # Flatten explicitly to one reward per pair; a bare squeeze() would also
        # drop the batch dimension when the batch holds a single pair.
        margin = prefered_logits.reshape(-1) - rejected_logits.reshape(-1)
        loss = -(F.logsigmoid(margin)).mean()
        return loss

    def forward(self, input_ids, attention_mask):
        """Return one scalar reward per sequence, shape [batch_size, 1]."""
        outputs = self.transformer(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True)
        last_layer_hidden_states = outputs.hidden_states[-1]

        # Zero out padded positions before pooling.
        mask_expanded = attention_mask.unsqueeze(-1).float()
        masked_sum = (last_layer_hidden_states * mask_expanded).sum(dim=1).float()

        # Number of real tokens per row, [batch_size, 1]; clamp so that a fully
        # masked row pools to zeros instead of 0/0 = NaN.
        seq_lengths = attention_mask.sum(dim=1, keepdim=True).clamp(min=1)
        averaged_hidden = masked_sum / seq_lengths

        logits = self.regression_head(averaged_hidden)

        return logits

In [None]:
import torch
from tqdm.notebook import tqdm

def train(reward_model, prompts, prefered, rejected, lr=0.0001, batch_size=2, num_epochs=2):
    """Train `reward_model` on (prompt, preferred, rejected) triples.

    Element `i` of the three sequences forms one preference pair: both
    translations are rendered as the assistant answer to the same translation
    prompt, scored by the reward model, and the Bradley-Terry loss is minimised.

    Args:
        reward_model: model exposing `forward(input_ids, attention_mask)` and
            `loss_fn(prefered_logits, rejected_logits)`.
        prompts: French source texts.
        prefered: preferred English translation of each prompt.
        rejected: rejected English translation of each prompt.
        lr: Adam learning rate.
        batch_size: number of preference pairs per optimisation step.
        num_epochs: number of passes over the data.

    Returns:
        The trained `reward_model` (the same object, still in training mode).

    Raises:
        ValueError: if the three input sequences differ in length.
    """
    def tokenize_(texts: List[Tuple[str, str]]):
        """Render (source, translation) pairs as chat conversations and tokenize them."""
        conversations = [
            [
                {"role": "user",
                "content": f"You are given a French text, provide faithful translation to English.\n\n{text}"
                },
                {"role": "assistant",
                "content": f"{translation}"
                }
            ]
            for text, translation in texts
        ]

        templated = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=False)
        tokenized = tokenizer(templated, padding_side="right", padding=True, return_tensors="pt")

        return tokenized

    prompts, prefered, rejected = list(prompts), list(prefered), list(rejected)
    # zip() would silently truncate, and unequal batches could broadcast inside the
    # loss, so misaligned inputs are rejected up front.
    if not (len(prompts) == len(prefered) == len(rejected)):
        raise ValueError(
            "prompts, prefered and rejected must have the same length, got "
            f"{len(prompts)}, {len(prefered)} and {len(rejected)}"
        )

    # Send batches to wherever the reward model lives instead of relying on a global model.
    device = next(reward_model.parameters()).device
    optimizer = torch.optim.Adam(reward_model.parameters(), lr=lr)
    reward_model.train()
    prefered_data = list(zip(prompts, prefered))
    rejected_data = list(zip(prompts, rejected))
    for _ in range(num_epochs):
        loss = None
        # The context manager closes the bar at the end of each epoch.
        with tqdm(total=len(prefered_data)) as p_bar:
            for batch in range(0, len(prefered_data), batch_size):
                optimizer.zero_grad()
                prefered_batch = prefered_data[batch:batch+batch_size]
                rejected_batch = rejected_data[batch:batch+batch_size]
                prefered_tokenized = tokenize_(prefered_batch).to(device)
                rejected_tokenized = tokenize_(rejected_batch).to(device)

                prefered_logits = reward_model(input_ids=prefered_tokenized["input_ids"], attention_mask=prefered_tokenized["attention_mask"])
                rejected_logits = reward_model(input_ids=rejected_tokenized["input_ids"], attention_mask=rejected_tokenized["attention_mask"])
                loss = reward_model.loss_fn(prefered_logits, rejected_logits)
                loss.backward()
                optimizer.step()
                # Advance by the real batch size so the last, shorter batch does not overshoot.
                p_bar.update(len(prefered_batch))
        # Loss of the last batch of the epoch; nothing to report when there is no data.
        if loss is not None:
            print(loss.item())
    return reward_model

In [None]:
# `subset` (the evaluation set) is rows [0, N) of `raw_datasets`. Start the
# reward-model training data right after it so no evaluation example is seen
# during training; cap the end at the dataset size.
train_start = N
train_stop = min(train_start + 1000, len(raw_datasets))
train_subset = raw_datasets.select(range(train_start, train_stop))
prompts = train_subset["source"]
# Gold translations are the preferred answers.
prefered = train_subset["chosen"]
# Model samples (rollouts' default of 4 per prompt) are the rejected answers.
rejected = rollouts(prompts)

In [None]:
# Flatten the data to one training pair per sampled translation: the prompt and
# its gold translation are repeated once for every rejected sample.
prompts_repeated, prefered_repeated, rejected_repeated = [], [], []

for prompt, pref, rej in zip(prompts, prefered, rejected):
    n_pairs = len(rej)
    prompts_repeated += [prompt] * n_pairs
    prefered_repeated += [pref] * n_pairs
    rejected_repeated += list(rej)

In [None]:
# Sanity check: show the last flattened training triple (prompt, preferred, rejected).
print(prompts_repeated[-1])
print(prefered_repeated[-1])
print(rejected_repeated[-1])

In [None]:
# Instantiate the reward model and move it to the device that holds the base model.
reward_model = RewardModel().to(model.device)

In [None]:
# Train the reward model in place on the flattened preference pairs (default learning rate).
train(reward_model, prompts_repeated, prefered_repeated, rejected_repeated)

Now use the reward model to select the translations.