

## Learning Objectives

By the end of this session, you will:
1. Implement two complementary inference strategies:
   - Multiple generator calls (parallel, refinement)
   - Single long output generation (extended CoT)
2. Integrate feedback information (reward models, LLM feedback)
3. Compare and analyze different approaches
4. Understand compute-accuracy trade-offs


## Setup

### Install Dependencies

In [None]:
# Install required packages
!pip install openai transformers torch datasets matplotlib seaborn numpy pandas tqdm

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [None]:
import gc
import json
import os
import random
import re
import time
from collections import defaultdict, Counter
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from itertools import product
from typing import List, Dict, Any, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSequenceClassification, pipeline

# Seed Python, NumPy and PyTorch RNGs so sampling and data generation are repeatable
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

# Configure the matplotlib style and seaborn color palette used by all plots
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# Pick the compute device: CUDA when available, otherwise CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# This practice requires a GPU: stop right away on CPU-only environments
assert device != "cpu", "We need GPUs for this practice!"

Using device: cuda


### Models

#### Loading

In [None]:
# Model options - pick one!
MODEL_OPTIONS = {
    "small": "Qwen/Qwen3-0.6B",   # 600M params
    "medium": "Qwen/Qwen3-1.7B",  # 1.7B params
    "large": "Qwen/Qwen3-4B",     # 4B params
}


MODEL_SIZE = "small"
SELECTED_MODEL = MODEL_OPTIONS[MODEL_SIZE]


print(f"Loading {SELECTED_MODEL}...")
tokenizer = AutoTokenizer.from_pretrained(SELECTED_MODEL)
# Request bfloat16 at load time. `pipeline(...)` only forwards `model_kwargs`
# and `device_map` when it instantiates the model itself from a name, so
# passing them next to an already-loaded model has no effect.
model = AutoModelForCausalLM.from_pretrained(SELECTED_MODEL, torch_dtype=torch.bfloat16)

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# `device` is the module-level device string chosen during setup; passing it
# explicitly makes the pipeline move the loaded model onto the GPU.
generator = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    device=device,
)

print("Model loaded!")

Loading Qwen/Qwen3-0.6B...


NameError: name 'AutoTokenizer' is not defined

#### Boilerplate

In [None]:
def generate_text(model, prompt: str | list[dict[str, str]], max_tokens=100, temperature=0.7):
    """Generate one completion for ``prompt`` and return only the new text.

    Args:
        model: Callable text-generation pipeline; it is invoked with the chat
            messages plus generation keyword arguments.
        prompt: Either a plain user message (wrapped with the step-by-step /
            ``\\boxed{}`` system instruction) or a ready-made list of chat
            messages, which is passed through unchanged.
        max_tokens: Maximum number of newly generated tokens.
        temperature: Sampling temperature. A value of ``None`` or ``<= 0``
            selects greedy decoding instead of sampling.

    Returns:
        The ``generated_text`` of the first returned sequence.
    """
    if isinstance(prompt, str):
        prompt = [
            {"role": "system", "content": "Please reason step by step, and put your final answer within \\boxed{}."},
            {"role": "user", "content": prompt},
        ]

    generation_kwargs = {
        "max_new_tokens": max_tokens,
        "return_full_text": False,
        "pad_token_id": tokenizer.eos_token_id,
    }
    if temperature is not None and temperature > 0:
        generation_kwargs["do_sample"] = True
        # The temperature warper only accepts strictly positive floats, so an
        # integer such as ``1`` is converted explicitly.
        generation_kwargs["temperature"] = float(temperature)
    else:
        # Sampling with temperature 0 is rejected by the generation code;
        # greedy decoding is the well-defined equivalent.
        generation_kwargs["do_sample"] = False

    result = model(prompt, **generation_kwargs)
    return result[0]['generated_text']


def extract_answer(text: str) -> Optional[int]:
    r"""Extract the final integer answer from a model response.

    The content of the last ``\boxed{...}`` is preferred, because that is where
    the model is instructed to put its answer. If no box with a number is
    found, the last integer anywhere in the text is used. Thousands separators
    ("1,234,567") are read as a single number.

    Args:
        text: Raw model output (may be empty or ``None``).

    Returns:
        The extracted integer, or ``None`` when the text contains no integer.
    """
    if not text:
        return None

    boxed = re.findall(r'\\boxed\{([^{}]*)\}', text)
    # Look inside the last box first, then fall back to the whole text.
    candidates = [boxed[-1], text] if boxed else [text]
    for candidate in candidates:
        numbers = re.findall(r'\b\d{1,3}(?:,\d{3})+\b|\b\d+\b', candidate)
        if numbers:
            return int(numbers[-1].replace(',', ''))
    return None


def plot(data: Dict[str, List[Dict[str, Any]]], title: str):
    """
    Draw one accuracy-vs-n line chart per dataset, arranged in a single row.

    Args:
        data: Maps each dataset name to a list of records; every record
              provides the 'n', 'accuracy', and 'method' keys.
        title: Figure-level title shown above all panels
    """
    sns.set_style("whitegrid")

    n_panels = len(data)
    # squeeze=False always yields a 2-D axes array, so a single panel needs
    # no special casing.
    fig, axes_grid = plt.subplots(
        1, n_panels, figsize=(6 * n_panels, 5), sharex=True, squeeze=False
    )

    for panel_ax, (panel_name, records) in zip(axes_grid[0], data.items()):
        frame = pd.DataFrame(records)
        sns.lineplot(data=frame, x='n', y='accuracy', hue='method', marker='o', ax=panel_ax)
        panel_ax.set_title(panel_name)
        panel_ax.legend(title='Method')

    plt.suptitle(title, fontweight='bold')
    plt.tight_layout()
    plt.show()


### Data

We'll use a curated set of multiplication and math problems for our experiments.

#### Boilerplate

In [None]:
def generate_multiplication_data(digits1: int, digits2: int, size: Optional[int] = None) -> List[Dict]:
    """
    Generate multiplication problems whose operands have a given number of digits.

    Args:
        digits1: Number of digits for the first operand
        digits2: Number of digits for the second operand
        size: If specified, randomly sample this many distinct operand pairs
              (capped, with a printed warning, at the number of possible pairs).
              If None, generate every possible pair.

    Returns:
        List of dictionaries with format: {"problem": "What is A x B?", "inputs": [A, B], "answer": C}
    """

    def operand_range(digits: int) -> range:
        # A lazy range supports len() and random.choice() without building a
        # list of every operand, which would hold millions of ints for 7 digits.
        low = 10 ** (digits - 1) if digits > 1 else 1
        return range(low, 10 ** digits)

    def make_example(op1: int, op2: int) -> Dict:
        return {
            "problem": f"What is {op1} x {op2}?",
            "inputs": [op1, op2],
            "answer": op1 * op2,
        }

    operands1 = operand_range(digits1)
    operands2 = operand_range(digits2)

    if size is None:
        return [make_example(op1, op2) for op1 in operands1 for op2 in operands2]

    total_combinations = len(operands1) * len(operands2)
    if size > total_combinations:
        print(f"Warning: Requested size ({size}) exceeds total combinations ({total_combinations})")
        size = total_combinations

    # Rejection sampling: draw pairs until `size` distinct ones are collected.
    data = []
    sampled_pairs = set()
    while len(data) < size:
        pair = (random.choice(operands1), random.choice(operands2))
        if pair in sampled_pairs:
            continue
        sampled_pairs.add(pair)
        data.append(make_example(*pair))

    return data


#### Generate multiplication data

In [None]:
# Total number of multiplication problems, split evenly across four
# operand-size settings (digits of first operand x digits of second operand).
DATA_SIZE = 100
mul_data_4x5 = generate_multiplication_data(digits1=4, digits2=5, size=DATA_SIZE // 4)
mul_data_5x5 = generate_multiplication_data(digits1=5, digits2=5, size=DATA_SIZE // 4)
mul_data_5x6 = generate_multiplication_data(digits1=5, digits2=6, size=DATA_SIZE // 4)
mul_data_7x5 = generate_multiplication_data(digits1=7, digits2=5, size=DATA_SIZE // 4)

# Concatenate the four subsets into the full multiplication test set
mul_data = mul_data_4x5 + mul_data_5x5 + mul_data_5x6 + mul_data_7x5

print(f"Generated {len(mul_data)} multiplication problems.")

Generated 100 multiplication problems.


#### Load MATH-500

In [None]:
# Load the test split of MATH-500 and keep only 50 problems (via `.take(50)`)
math_dataset = load_dataset("HuggingFaceH4/MATH-500", split="test").take(50)
print(f"Loaded {len(math_dataset)} MATH-500 problems.")

Loaded 50 MATH-500 problems.


#### Test Data Dict

In [None]:
# Dataset name -> examples; every example exposes "problem" and "answer".
TEST_DATA = {
    "multiplication": mul_data,
    "math_50": math_dataset,
}

### Test

In [None]:
# Sample an example response. `generate_text` takes the pipeline first and the
# prompt second; the prompt is the example's "problem" text, not the whole dict.
test_response = generate_text(generator, TEST_DATA["multiplication"][-1]["problem"])
print(f"Test Response:\n\n{test_response}")

## Before You Start

**Model**

The text-generation pipeline is already loaded as `generator`; use `generate_text` to sample from it. Here is an example:
```python
generate_text(generator, "Hi, How are you?", max_tokens=100, temperature=1.0)
```
The output is a string.

The model is instructed to generate its step-by-step reasoning and put its answer within `\boxed{}`, e.g. `\boxed{345}`.

There is also a function named `extract_answer` that extracts the numerical answer from model outputs.

**Data**
TEST_DATA is a dict whose key is the dataset name and its value contains the examples. The datasets are:

  - `multiplication`: 100 examples stored as a list of dicts with the following format:
```json
{
    "problem": "What is {A} x {B}?",
    "inputs": [A, B],
    "answer": C
}
```
  - `math_50`: 50 problems taken from MATH-500 (see [here](https://huggingface.co/datasets/HuggingFaceH4/MATH-500)), stored as a Hugging Face `Dataset` object with the following format:
```json
{
    "id": ID,
    "problem": "Define...",
    "solution": "...",
    "answer": "X",
}
```

Note that we only need "problem" and "answer" from both datasets.

In [None]:
# Part 1 generation settings (the `# @param` markers render as Colab form fields):
# number of samples per problem and maximum new tokens per sample.
N_SAMPLES1 = 4 # @param {type:"integer"}
MAX_TOKENS1 = 512 # @param {type:"integer"}

# Sampling temperature; the assert below restricts it to [0.0, 1.0].
TEMPERATURE1 = 0.6 # @param {type:"number"}
assert 1.0 >= TEMPERATURE1 >= 0.0, "temperature must be between 0.0 and 1.0"

# Part 1: Majority Voting

## Your task:
  1. **Implement parallel generation:** Given a problem, sample multiple responses from the model

  2. **Run parallel generation on the test data:** Use the method implemented in the previous step to collect samples for the entire test data.

  3. **Implement majority voting for each dataset and report accuracy:** Set `n_samples=4`, `temperature=0.6` and `max_tokens=512`.

  4. **Vary `n_samples` from 1 to 32 and report the trend**

  5. **Bonus: How do the results change if we increase `temperature` to 1.0?**

  6. **Bonus: How about increasing `max_tokens` to 2048?**

## Considerations

  - The model does NOT always generate an answer for various reasons including (i) not following the instructions or (ii) reaching its context limit. Your code should work for these cases as well.


## Your code:

### Parallel generation

In [None]:
def sample_solutions(problem: str, n_samples: int, max_tokens: int = 512, temperature: float = 1.0) -> list[str]:
    """Sample ``n_samples`` independent solutions for ``problem``.

    Each sample is one call to ``generate_text`` on the module-level
    ``generator`` pipeline.

    Args:
        problem: Problem statement sent as the user message.
        n_samples: Number of responses to draw; values <= 0 yield an empty list.
        max_tokens: Maximum number of new tokens per response.
        temperature: Sampling temperature forwarded to ``generate_text``.

    Returns:
        The generated responses, in sampling order.
    """
    # Call the helper with its real signature (pipeline first). Probing a
    # one-argument form and catching TypeError made every sample fail once
    # first and could hide genuine TypeErrors raised during generation.
    return [
        generate_text(generator, problem, max_tokens=max_tokens, temperature=temperature)
        for _ in range(n_samples)
    ]


def run_parallel_generation(test_data, n_samples: int, max_tokens: int = 512, temperature: float = 1.0):
    """Sample ``n_samples`` responses for every example of every dataset.

    Args:
        test_data: Mapping from dataset name to an iterable of examples; each
            example must provide a "problem" entry.
        n_samples: Number of responses to sample per example.
        max_tokens: Maximum number of new tokens per response.
        temperature: Sampling temperature.

    Returns:
        A ``defaultdict(list)`` mapping each dataset name to a list of
        ``{"example": example, "sampled_responses": [str, ...]}`` records, in
        dataset order.
    """
    sampled_responses = defaultdict(list)
    for dataset_name, dataset in test_data.items():
        for example in tqdm(dataset, desc=f"{dataset_name}"):
            # Sample once per example and store that result; sampling a second
            # time for the stored value doubled the generation cost.
            solutions = sample_solutions(example["problem"], n_samples, max_tokens, temperature)
            sampled_responses[dataset_name].append({
                "example": example,
                "sampled_responses": solutions,
            })
    return sampled_responses


# Collect the Part 1 samples for every dataset using the Part 1 settings
SAMPLED_RESPONSES1 = run_parallel_generation(TEST_DATA, N_SAMPLES1, MAX_TOKENS1, TEMPERATURE1)


### Majority Voting

In [None]:
def majority_voting(sampled_responses: list[str]) -> str:
    r"""Return the most frequent final answer among the sampled responses.

    Strategy:
      1) Extract a final answer string from each response:
         - the content of the last \boxed{...} (one level of nested braces
           such as \frac{1}{2} is supported);
         - otherwise the last number in the text (integer or decimal).
         Thousands separators are removed ("1,234" -> "1234").
      2) Responses with no extractable answer (empty, truncated, off-format)
         abstain instead of voting with their raw text.
      3) Return the most frequent answer; ties go to the answer that appeared
         first.

    Args:
        sampled_responses: Raw model outputs for one problem.

    Returns:
        The winning answer string, or "" when no response yields an answer.
    """
    boxed_pattern = re.compile(r"\\boxed\{((?:[^{}]|\{[^{}]*\})*)\}")
    grouped_number = r"-?\d{1,3}(?:,\d{3})+(?!\d)(?:\.\d+)?"
    number_pattern = re.compile(grouped_number + r"|-?\d+(?:\.\d+)?")
    grouped_pattern = re.compile(grouped_number)

    def final_answer(text):
        if not text:
            return None
        boxed = boxed_pattern.findall(text)
        if boxed:
            answer = boxed[-1].strip()
        else:
            numbers = number_pattern.findall(text)
            if not numbers:
                return None
            answer = numbers[-1]
        if grouped_pattern.fullmatch(answer):
            answer = answer.replace(",", "")
        return answer or None

    answers = [a for a in map(final_answer, sampled_responses) if a is not None]
    if not answers:
        return ""

    counts = Counter(answers)
    # Counter keeps first-insertion order and max() returns the first maximal
    # key, so ties resolve to the earliest-seen answer.
    return max(counts, key=counts.get)


def run_majority_voting(sampled_responses, n_samples: int = 0):
    """Compute majority-voting accuracy per dataset.

    Args:
        sampled_responses: Mapping from dataset name to a list of
            ``{"example": ..., "sampled_responses": [...]}`` records.
        n_samples: If > 0, only the first ``n_samples`` responses of each
            example take part in the vote; otherwise all of them do.

    Returns:
        A ``defaultdict`` mapping each dataset name to its accuracy
        (0.0 for a dataset with no examples).
    """
    accuracy = defaultdict(int)

    for dataset_name, examples in sampled_responses.items():
        is_corrects = []
        for example in examples:
            candidates = example["sampled_responses"]
            if n_samples > 0:
                candidates = candidates[:n_samples]
            majority_answer = majority_voting(candidates)

            # Reference answers are ints for multiplication and strings for
            # MATH; compare as stripped strings so both can match.
            reference = str(example["example"]["answer"]).strip()
            is_corrects.append(int(str(majority_answer).strip() == reference))

        # Average over all examples, not just the last one.
        accuracy[dataset_name] = float(np.mean(is_corrects)) if is_corrects else 0.0

    return accuracy


# Majority-voting accuracy using all Part 1 samples of each example
maj_at_n1 = run_majority_voting(SAMPLED_RESPONSES1)
print(f"majority voting@{N_SAMPLES1} = {maj_at_n1}")

### Varying N in Majority Voting

In [None]:
# Largest number of samples per problem used in the sweeps. The following
# cells read `MAX_N_SAMPLES`, so the constant is defined under that spelling.
MAX_N_SAMPLES = 32 # @param {type:"integer"}
# Earlier misspelled name, kept as an alias for anything that still reads it.
MAX_N_SAMPELS = MAX_N_SAMPLES
assert MAX_N_SAMPLES > 0 and (MAX_N_SAMPLES & (MAX_N_SAMPLES - 1)) == 0, "MAX_N_SAMPLES must be a power of 2 (for simplicty)"

In [None]:
# Sample MAX_N_SAMPLES responses once; smaller N values reuse a prefix of them.
SAMPLED_RESPONSES = run_parallel_generation(TEST_DATA, MAX_N_SAMPLES, MAX_TOKENS1, TEMPERATURE1)

maj_at_n_data = defaultdict(list)

# N takes the powers of two 1, 2, 4, ..., MAX_N_SAMPLES.
# np.log2 returns a float, which range() does not accept, hence the int().
for n in range(int(np.log2(MAX_N_SAMPLES)) + 1):
    n_samples = 2 ** n
    maj_at_n = run_majority_voting(SAMPLED_RESPONSES, n_samples)
    for dataset_name, accuracy in maj_at_n.items():
        # Accumulate into the per-dataset plot records, not into the
        # accuracy dict being iterated.
        maj_at_n_data[dataset_name].append(
            {"n": n_samples, "accuracy": accuracy, "method": "majority voting"}
        )

plot(maj_at_n_data, "majority voting")

# Part 2: Best-of-N


## Your task:
  1. **Implement Best-of-N strategy given a reward model:** Reuse the parallel generations from Part 1. Keep the same parameters as Part 1.

  2. **Vary `N` from 1 to 32 and make a comparison with Majority Voting**

## Your code:

In [None]:
# Hugging Face Hub ids need the "Skywork/" organization prefix to resolve.
REWARD_MODEL_NAME = "Skywork/Skywork-Reward-V2-Llama-3.1-8B-40M" # @param ["Skywork/Skywork-Reward-V2-Llama-3.1-8B-40M", "Skywork/Skywork-Reward-V2-Qwen3-8B", "Skywork/Skywork-Reward-V2-Llama-3.2-3B", "Skywork/Skywork-Reward-V2-Qwen3-4B"]

### Loading Reward Model

In [None]:
# Off-load the generator from the GPU. Every name that may reference the model
# is dropped (missing names are ignored), then unreachable objects are
# collected BEFORE the CUDA cache is emptied so their memory can be released.
for _name in ("MODEL", "generator", "model"):
    globals().pop(_name, None)
del _name
gc.collect()
torch.cuda.empty_cache()

# load reward model
# NOTE(review): "flash_attention_2" needs the separate `flash-attn` package,
# which the install cell does not list; confirm it is available.
reward_tokenizer = AutoTokenizer.from_pretrained(REWARD_MODEL_NAME)
reward_model = AutoModelForSequenceClassification.from_pretrained(
    REWARD_MODEL_NAME,
    torch_dtype=torch.bfloat16,
    device_map=device,
    attn_implementation="flash_attention_2",
    num_labels=1,
)


def get_score(problem: str, response: str, model, tokenizer):
    """Score one (problem, response) pair with the reward model.

    Args:
        problem: User message of the conversation.
        response: Assistant message to be scored.
        model: Sequence-classification reward model; the score is read from
            ``logits[0][0]``.
        tokenizer: Tokenizer providing the chat template for ``model``.

    Returns:
        The reward score as a Python float.
    """
    conversation = [
        {"role": "user", "content": problem},
        {"role": "assistant", "content": response},
    ]

    prompt = tokenizer.apply_chat_template(conversation, tokenize=False)
    # following the recommendation in provided example: https://huggingface.co/Skywork/Skywork-Reward-V2-Qwen3-4B#%F0%9F%93%9D-simple-example-in-transformers
    if tokenizer.bos_token is not None and prompt.startswith(tokenizer.bos_token):
        prompt = prompt[len(tokenizer.bos_token):]

    tokenized_prompt = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        # Unpack the encoding so input_ids / attention_mask are passed as
        # keyword arguments rather than the whole mapping as `input_ids`.
        score = model(**tokenized_prompt).logits[0][0].item()

    return score


def get_batch_scores(examples: list, model, tokenizer):
    """
    Score several candidates with the reward model in one forward pass.

    Accepted item shapes:
      - dict holding both "problem" and "response": scored as a user turn
        followed by an assistant turn;
      - dict holding only "response", or any other object (converted with
        str()): scored as a lone assistant turn.
    Returns: list[float], one score per item, in input order.
    """

    def to_conversation(item):
        is_mapping = isinstance(item, dict)
        if is_mapping and "problem" in item and "response" in item:
            return [
                {"role": "user", "content": item["problem"]},
                {"role": "assistant", "content": item["response"]},
            ]
        if is_mapping and "response" in item:
            reply = item["response"]
        else:
            reply = str(item)
        return [{"role": "assistant", "content": reply}]

    prompts = []
    for item in examples:
        rendered = tokenizer.apply_chat_template(to_conversation(item), tokenize=False)
        prompts.append(rendered)

    bos = tokenizer.bos_token
    if bos is not None:
        # Drop a leading BOS emitted by the chat template before tokenizing.
        stripped = []
        for text in prompts:
            if isinstance(text, str) and text.startswith(bos):
                text = text[len(bos):]
            stripped.append(text)
        prompts = stripped

    encoded = tokenizer(prompts, return_tensors="pt", padding=True, truncation=True)
    encoded = encoded.to(model.device)
    with torch.no_grad():
        outputs = model(**encoded)
        logits = outputs.logits.squeeze(-1)

    scores = logits.detach().float().cpu().numpy().tolist()
    if isinstance(scores, float):
        return [float(scores)]
    return [float(value) for value in scores]


# SOLUTION: Implemented inference-time strategies
import random
from typing import List, Dict, Optional, Sequence
import matplotlib.pyplot as plt

def simulate_candidates(n:int, p_correct:float, seed:Optional[int]=42) -> List[Dict]:
    """Simulate ``n`` scored candidates using a private RNG seeded with ``seed``.

    Each candidate is correct with probability ``p_correct``; its score is a
    Gaussian draw (std 0.1) centered at 0.7 when correct and 0.3 otherwise,
    clipped to [0, 1].
    """
    rng = random.Random(seed)

    def draw(index):
        # Draw order matters for reproducibility: correctness first, then score.
        is_correct = rng.random() < p_correct
        center = 0.7 if is_correct else 0.3
        raw_score = rng.gauss(center, 0.1)
        clipped = max(0.0, min(1.0, raw_score))
        return {'text': f'candidate_{index}', 'is_correct': is_correct, 'score': clipped}

    return [draw(index) for index in range(n)]

def best_of_n(cands: Sequence[Dict]) -> Dict:
    """Return the candidate with the highest 'score' (the first one on ties)."""

    def score_of(candidate):
        return candidate['score']

    return max(cands, key=score_of)

def majority_vote(cands: Sequence[Dict]) -> bool:
    """Return True when strictly more than ``len(cands) // 2`` candidates are correct."""
    n_correct = len([cand for cand in cands if cand['is_correct']])
    threshold = len(cands) // 2
    return n_correct > threshold

def weighted_vote(cands: Sequence[Dict]) -> bool:
    """Return True when correct candidates hold more than half of the total score.

    A total score of zero (including an empty candidate list) yields False.
    """
    score_mass = sum([cand['score'] for cand in cands])
    correct_mass = sum([cand['score'] for cand in cands if cand['is_correct']])
    if score_mass == 0:
        return False
    correct_share = correct_mass / score_mass
    return correct_share > 0.5

def evaluate_strategies(num_trials:int=200, n:int=8, p_correct:float=0.4):
    """Estimate the simulated accuracy of three selection strategies.

    For each trial ``t`` a fresh candidate set is simulated with seed
    ``42 + t``. The trial counts as a hit for a strategy when the
    highest-scoring candidate is correct (best-of-N), when ``majority_vote``
    returns True, or when ``weighted_vote`` returns True.

    Args:
        num_trials: Number of simulated trials.
        n: Number of candidates per trial.
        p_correct: Probability that a simulated candidate is correct.

    Returns:
        Dict with keys 'best_of_n_acc', 'majority_vote_acc' and
        'weighted_vote_acc', each the fraction of trials that were hits.
    """
    bo_hits = mv_hits = wv_hits = 0
    for t in range(num_trials):
        cands = simulate_candidates(n=n, p_correct=p_correct, seed=42+t)
        # Pick the top-scoring candidate inline: the module-level name
        # `best_of_n` is rebound later in this file (reward-model version and
        # a results variable), so calling it here would break once those
        # cells have run.
        top_candidate = max(cands, key=lambda d: d['score'])
        bo_hits += int(top_candidate['is_correct'])
        mv_hits += int(majority_vote(cands))
        wv_hits += int(weighted_vote(cands))
    return {
        'best_of_n_acc': bo_hits/num_trials,
        'majority_vote_acc': mv_hits/num_trials,
        'weighted_vote_acc': wv_hits/num_trials,
    }

def sweep_n_plot(p_correct:float=0.4, max_n:int=15):
    """Plot the simulated accuracy of each strategy for N = 1..max_n.

    Every N is evaluated with 200 trials via ``evaluate_strategies``.
    """
    ns = list(range(1, max_n+1))
    results = [
        evaluate_strategies(num_trials=200, n=n, p_correct=p_correct)
        for n in ns
    ]

    # (result key, legend label) pairs in the order the curves are drawn.
    curves = [
        ('best_of_n_acc', 'Best-of-N'),
        ('majority_vote_acc', 'Majority vote'),
        ('weighted_vote_acc', 'Weighted vote'),
    ]

    plt.figure()
    for key, label in curves:
        plt.plot(ns, [res[key] for res in results], label=label)
    plt.xlabel('N (samples)')
    plt.ylabel('Accuracy (simulated)')
    plt.title(f'Strategy accuracy vs N (p_correct={p_correct})')
    plt.legend()
    plt.grid(True)
    plt.show()


### Best-of-N

In [None]:
def best_of_n(sampled_responses: list[str], model, tokenizer, problem: Optional[str] = None) -> str:
    """Return the candidate response the reward model scores highest.

    Args:
        sampled_responses: Candidate responses for one problem.
        model: Reward model passed to ``get_batch_scores``.
        tokenizer: Tokenizer of the reward model.
        problem: Problem statement. When given, each candidate is scored as a
            (user problem, assistant response) conversation; when omitted,
            candidates are scored as assistant-only messages.

    Returns:
        The best-scoring response text (the first one on ties), or "" when
        there are no candidates.
    """
    if not sampled_responses:
        return ""
    if problem is None:
        examples = [{"response": r} for r in sampled_responses]
    else:
        examples = [{"problem": problem, "response": r} for r in sampled_responses]
    scores = get_batch_scores(examples, model, tokenizer)
    if not scores:
        return sampled_responses[0]
    best_idx = max(range(len(scores)), key=scores.__getitem__)
    return sampled_responses[best_idx]


def _extract_final_answer(response) -> str:
    r"""Return the final answer string of a response, or "" if none is found.

    Prefers the content of the last \boxed{...} (one level of nested braces is
    supported), otherwise the last number in the text; thousands separators
    are removed.
    """
    if not response:
        return ""
    grouped_number = r"-?\d{1,3}(?:,\d{3})+(?!\d)(?:\.\d+)?"
    boxed = re.findall(r"\\boxed\{((?:[^{}]|\{[^{}]*\})*)\}", response)
    if boxed:
        answer = boxed[-1].strip()
    else:
        numbers = re.findall(grouped_number + r"|-?\d+(?:\.\d+)?", response)
        if not numbers:
            return ""
        answer = numbers[-1]
    if re.fullmatch(grouped_number, answer):
        answer = answer.replace(",", "")
    return answer


def run_best_of_n(sampled_responses, model, tokenizer, n_samples: int = 0):
    """Compute Best-of-N accuracy per dataset.

    Args:
        sampled_responses: Mapping from dataset name to a list of
            ``{"example": ..., "sampled_responses": [...]}`` records.
        model: Reward model.
        tokenizer: Tokenizer of the reward model.
        n_samples: If > 0, only the first ``n_samples`` responses of each
            example are candidates; otherwise all of them are.

    Returns:
        A ``defaultdict`` mapping each dataset name to its accuracy
        (0.0 for a dataset with no examples).
    """
    accuracy = defaultdict(int)

    for dataset_name, examples in sampled_responses.items():
        is_corrects = []
        for example in examples:
            candidates = example["sampled_responses"]
            if n_samples > 0:
                candidates = candidates[:n_samples]
            best_response = best_of_n(
                candidates, model, tokenizer, problem=example["example"]["problem"]
            )

            # The selected item is a full response; grade its extracted final
            # answer, compared as stripped strings because reference answers
            # are ints for multiplication and strings for MATH.
            predicted = _extract_final_answer(best_response).strip()
            reference = str(example["example"]["answer"]).strip()
            is_corrects.append(int(predicted == reference))

        # Average over all examples, not just the last one.
        accuracy[dataset_name] = float(np.mean(is_corrects)) if is_corrects else 0.0

    return accuracy

# Best-of-N accuracy using all Part 1 samples of each example
best_of_n1 = run_best_of_n(SAMPLED_RESPONSES1, reward_model, reward_tokenizer)
print(f"best-of-{N_SAMPLES1} = {best_of_n1}")

### Varying N in Best-of-N

In [None]:
best_of_n_data = defaultdict(list)

# N takes the powers of two 1, 2, 4, ..., MAX_N_SAMPLES.
# np.log2 returns a float, which range() does not accept, hence the int().
for n in range(int(np.log2(MAX_N_SAMPLES)) + 1):
    n_samples = 2 ** n
    # Store the result under its own name: rebinding `best_of_n` would
    # replace the function that `run_best_of_n` calls.
    best_at_n = run_best_of_n(SAMPLED_RESPONSES, reward_model, reward_tokenizer, n_samples)
    for dataset_name, accuracy in best_at_n.items():
        best_of_n_data[dataset_name].append(
            {"n": n_samples, "accuracy": accuracy, "method": "best-of-N"}
        )

plot(best_of_n_data, "Best-of-N")

# Part 3: Self-correction


## Your tasks:
  1. **Implement self-correction:** Given sampled responses from Part 1 (use the first response from parallel generations), prompt the same model for a self-verification and collect the new answer.

  2. **Measure the accuracy after self-correction and compare it with before**

  3. **Bonus: Plot accuracy as a function of the number of output tokens**


## Your code:

### Loading model (again)

In [None]:
# off-load reward model from GPU
del reward_model
torch.cuda.empty_cache()
gc.collect()


print(f"Loading {SELECTED_MODEL}...")
tokenizer = AutoTokenizer.from_pretrained(SELECTED_MODEL)
model = AutoModelForCausalLM.from_pretrained(SELECTED_MODEL)

if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

generator = pipeline(
  "text-generation",
  model=model,
  tokenizer=tokenizer,
  model_kwargs={"torch_dtype": torch.bfloat16},
  device_map="auto",
)

print("Model loaded!")


### Self-correct

In [None]:
def self_correct(problem: str, response: str, model):
    r"""Ask the generator to verify a proposed solution and return the revised answer.

    Args:
        problem: Problem statement.
        response: Previously generated response to be checked.
        model: Accepted for interface compatibility; generation goes through
            the module-level ``generator`` pipeline, as it did before.

    Returns:
        The content of the last \boxed{...} in the verification output (one
        level of nested braces is supported), or the stripped raw output when
        no box is present.
    """
    verify_prompt = (
        "You are checking a solution to a math problem.\n"
        f"Problem: {problem}\n"
        f"Proposed answer: {response}\n\n"
        "Instructions:\n"
        "1) Re-compute the correct answer carefully.\n"
        "2) If the proposed answer is correct, keep it. If it's wrong, fix it.\n"
        "3) Reply with ONLY the final answer inside \\boxed{...} and nothing else."
    )
    # A tiny positive temperature gives near-greedy decoding while staying
    # valid for sampling-based generation, which rejects a temperature of 0.
    near_greedy_temperature = 1e-2
    out = generate_text(generator, verify_prompt, max_tokens=128, temperature=near_greedy_temperature)
    out = out or ""
    # `\\boxed` matches a literal "\boxed"; the earlier `\\\boxed` pattern was
    # read as "\" + word boundary + "oxed" and could never match.
    boxed = re.findall(r"\\boxed\{((?:[^{}]|\{[^{}]*\})*)\}", out)
    return boxed[-1].strip() if boxed else out.strip()



def run_self_correct(sampled_responses, model):
    """Compute accuracy after self-correcting the first sampled response.

    Args:
        sampled_responses: Mapping from dataset name to a list of
            ``{"example": ..., "sampled_responses": [...]}`` records.
        model: Forwarded to ``self_correct``.

    Returns:
        A ``defaultdict`` mapping each dataset name to its accuracy
        (0.0 for a dataset with no examples).
    """
    accuracy = defaultdict(int)

    for dataset_name, examples in sampled_responses.items():
        is_corrects = []
        for example in examples:
            response = example["sampled_responses"][0]
            modified_answer = self_correct(example["example"]["problem"], response, model)

            # Reference answers are ints for multiplication and strings for
            # MATH; compare as stripped strings so both can match.
            reference = str(example["example"]["answer"]).strip()
            is_corrects.append(int(str(modified_answer).strip() == reference))

        # Average over all examples, not just the last one.
        accuracy[dataset_name] = float(np.mean(is_corrects)) if is_corrects else 0.0

    return accuracy


# Keep the result under its own name: assigning it to `self_correct` would
# replace the function and break any later call or re-run of this cell.
self_correct_acc = run_self_correct(SAMPLED_RESPONSES1, model)
print(f"self-correct = {self_correct_acc}")

## Reflection Questions:

1. Which strategy worked best for multiplication problems? Why?
2. How did compute budget affect your results?
3. What are the trade-offs between the two main approaches?
4. How would you extend these methods to harder problems?
5. What external information would be most helpful?