<table style="width:100%">
<tr>
<td style="vertical-align:middle; text-align:left;">
<font size="2">
Supplementary code for the <a href="https://mng.bz/lZ5B">Build a Reasoning Model (From Scratch)</a> book by <a href="https://sebastianraschka.com">Sebastian Raschka</a><br>
<br>Code repository: <a href="https://github.com/rasbt/reasoning-from-scratch">https://github.com/rasbt/reasoning-from-scratch</a>
</font>
</td>
<td style="vertical-align:middle; text-align:left;">
<a href="https://mng.bz/lZ5B"><img src="https://sebastianraschka.com/images/reasoning-from-scratch-images/cover-small.webp" width="100px"></a>
</td>
</tr>
</table>


# Chapter 4: Exercise Solutions

Packages that are being used in this notebook:

In [1]:
from importlib.metadata import version

# Distributions whose installed versions are reported for reproducibility
used_libraries = [
    "reasoning_from_scratch",
    "torch",
    "tokenizers",  # Used by reasoning_from_scratch
]

for lib in used_libraries:
    installed = version(lib)
    print(f"{lib} version: {installed}")

reasoning_from_scratch version: 0.1.9
torch version: 2.9.0
tokenizers version: 0.21.4


&nbsp;
## Exercise 4.1: Use chain-of-thought prompting on MATH-500

- The modification just requires adding a prompt suffix, for example, `"\n\nExplain step by step."` after applying the prompt template
- The modified MATH-500 evaluation function from chapter 3 is shown below (the changes are commented via `# NEW`)

```python
import json
from pathlib import Path
import time

from reasoning_from_scratch.ch03 import (
    eta_progress_message,
    extract_final_candidate,
    render_prompt,
    grade_answer,
    generate_text_stream_concat,
)


def evaluate_math500_stream(
    model,
    tokenizer,
    device,
    math_data,
    out_path=None,
    max_new_tokens=512,
    verbose=False,
    prompt_suffix=""  # NEW
):
    """Evaluate a model on MATH-500 with an optional prompt suffix.

    Every problem is rendered into a prompt, `prompt_suffix` is appended
    (for example, a chain-of-thought cue), and a response is generated.
    The answer extracted from the response is graded against the reference
    answer, and one JSON record per problem is written to `out_path`.

    Args:
        model: Forwarded unchanged to `generate_text_stream_concat`.
        tokenizer: Forwarded unchanged to `generate_text_stream_concat`.
        device: Forwarded to the generation call; also used to build the
            default log file name.
        math_data: Sized iterable of records with "problem" and "answer"
            keys.
        out_path: Destination JSONL file. Defaults to
            "math500-<device>.jsonl", with ":" in the device name replaced
            by "-".
        max_new_tokens: Forwarded to `generate_text_stream_concat`.
        verbose: Forwarded to the generation call; also enables a
            per-example summary printout.
        prompt_suffix: Text appended to every rendered prompt.

    Returns:
        A tuple `(num_correct, num_examples, accuracy)`; the accuracy is
        0.0 when `math_data` is empty.
    """
    if out_path is None:
        device_tag = str(device).replace(":", "-")
        out_path = Path(f"math500-{device_tag}.jsonl")

    total = len(math_data)
    solved = 0
    banner = "=" * 50
    divider = "-" * 50
    t_start = time.time()

    with open(out_path, "w", encoding="utf-8") as log_file:
        for position, row in enumerate(math_data, start=1):
            prompt = render_prompt(row["problem"]) + prompt_suffix  # NEW
            gen_text = generate_text_stream_concat(
                model,
                tokenizer,
                prompt,
                device,
                max_new_tokens=max_new_tokens,
                verbose=verbose,
            )

            extracted = extract_final_candidate(gen_text)
            is_correct = grade_answer(extracted, row["answer"])
            solved += int(is_correct)

            # One JSON object per line (JSONL)
            json_line = json.dumps(
                {
                    "index": position,
                    "problem": row["problem"],
                    "gtruth_answer": row["answer"],
                    "generated_text": gen_text,
                    "extracted": extracted,
                    "correct": bool(is_correct),
                },
                ensure_ascii=False,
            )
            log_file.write(json_line + "\n")

            progress_msg = eta_progress_message(
                processed=position,
                total=total,
                start_time=t_start,
                show_eta=True,
                label="MATH-500",
            )
            # The carriage return lets the next update overwrite this line
            print(progress_msg, end="\r", flush=True)
            if verbose:
                print(
                    f"\n\n{banner}\n{progress_msg}\n"
                    f"{banner}\nExtracted: {extracted}\n"
                    f"Expected:  {row['answer']}\n"
                    f"Correct so far: {solved}\n{divider}"
                )

    elapsed = time.time() - t_start
    if total:
        acc = solved / total
    else:
        acc = 0.0
    print(f"\nAccuracy: {acc*100:.1f}% ({solved}/{total})")
    print(f"Total time: {elapsed/60:.1f} min")
    print(f"Logs written to: {out_path}")
    return solved, total, acc
```

- The improvements over the baseline in chapter 3 are shown below

|    | Method                                       | Model     | Accuracy | Time       |
|----|----------------------------------------------|-----------|----------|------------|
| 1  | Baseline (chapter 3), greedy decoding        | Base      | 15.2%    | 10.1 min   |
| 2  | Baseline (chapter 3), greedy decoding        | Reasoning | 48.2%    | 182.1 min  |
| 3  | Chain-of-thought prompting ("CoT")           | Base      | 40.6%    | 84.5 min   |

- For your convenience, you can run the [cot_prompting_math500.py](../02_math500-inference-scaling-scripts/cot_prompting_math500.py) script located in [../02_math500-inference-scaling-scripts](../02_math500-inference-scaling-scripts)

&nbsp;
## Exercise 4.2: Use temperature scaling and top-p filtering on MATH-500       

- Here, we have to swap the `generate_text_stream_concat` with the `generate_text_stream_concat_flex` function and plug the `generate_text_top_p_stream_cache` function into it
- The modified MATH-500 evaluation function from chapter 3 is shown below (the changes are commented via `# NEW`)

```python
import json
from pathlib import Path
import time

from reasoning_from_scratch.ch03 import (
    eta_progress_message,
    extract_final_candidate,
    render_prompt,
    grade_answer,
    generate_text_stream_concat,
)
from reasoning_from_scratch.ch04 import (
    generate_text_stream_concat_flex,
    generate_text_top_p_stream_cache,
)


def evaluate_math500_stream(
    model,
    tokenizer,
    device,
    math_data,
    out_path=None,
    max_new_tokens=512,
    verbose=False,
    temperature=1.0,  # NEW
    top_p=1.0,        # NEW
):
    """Evaluate a model on MATH-500 using temperature and top-p sampling.

    Every problem is rendered into a prompt and a response is generated via
    `generate_text_stream_concat_flex`, with
    `generate_text_top_p_stream_cache` plugged in as the generation
    function. The answer extracted from the response is graded against the
    reference answer, and one JSON record per problem is written to
    `out_path`.

    Args:
        model: Forwarded unchanged to the generation call.
        tokenizer: Forwarded unchanged to the generation call.
        device: Forwarded to the generation call; also used to build the
            default log file name.
        math_data: Sized iterable of records with "problem" and "answer"
            keys.
        out_path: Destination JSONL file. Defaults to
            "math500-<device>.jsonl", with ":" in the device name replaced
            by "-".
        max_new_tokens: Forwarded to the generation call.
        verbose: Forwarded to the generation call; also enables a
            per-example summary printout.
        temperature: Sampling temperature forwarded to the generation call.
        top_p: Top-p threshold forwarded to the generation call.

    Returns:
        A tuple `(num_correct, num_examples, accuracy)`; the accuracy is
        0.0 when `math_data` is empty.
    """
    if out_path is None:
        dev_name = str(device).replace(":", "-")
        out_path = Path(f"math500-{dev_name}.jsonl")

    num_examples = len(math_data)
    num_correct = 0
    start_time = time.time()

    with open(out_path, "w", encoding="utf-8") as f:
        for i, row in enumerate(math_data, start=1):
            prompt = render_prompt(row["problem"])
            gen_text = generate_text_stream_concat_flex(  # NEW
                model, tokenizer, prompt, device,
                max_new_tokens=max_new_tokens,
                verbose=verbose,
                generate_func=generate_text_top_p_stream_cache,  # NEW
                temperature=temperature,                         # NEW
                top_p=top_p                                      # NEW
            )

            extracted = extract_final_candidate(
                gen_text
            )
            is_correct = grade_answer(
                extracted, row["answer"]
            )
            num_correct += int(is_correct)

            # One JSON object per line (JSONL)
            record = {
                "index": i,
                "problem": row["problem"],
                "gtruth_answer": row["answer"],
                "generated_text": gen_text,
                "extracted": extracted,
                "correct": bool(is_correct),
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

            progress_msg = eta_progress_message(
                processed=i,
                total=num_examples,
                start_time=start_time,
                show_eta=True,
                label="MATH-500",
            )
            # The carriage return lets the next update overwrite this line
            print(progress_msg, end="\r", flush=True)
            if verbose:
                print(
                    f"\n\n{'='*50}\n{progress_msg}\n"
                    f"{'='*50}\nExtracted: {extracted}\n"
                    f"Expected:  {row['answer']}\n"
                    f"Correct so far: {num_correct}\n{'-'*50}"
                )

    seconds_elapsed = time.time() - start_time
    acc = num_correct / num_examples if num_examples else 0.0
    print(f"\nAccuracy: {acc*100:.1f}% ({num_correct}/{num_examples})")
    print(f"Total time: {seconds_elapsed/60:.1f} min")
    print(f"Logs written to: {out_path}")
    return num_correct, num_examples, acc
```

- When running the method with `temperature` 0.9 and `top_p` 0.9, there is only a minor difference compared to the baseline (row 1) in the table below; however, this is expected, as this is merely a setup for self-consistency sampling

|      | Method                                    | Model     | Accuracy | Time      |
| ---- | ----------------------------------------- | --------- | -------- | --------- |
| 1    | Baseline (chapter 3), greedy decoding     | Base      | 15.2%    | 10.1 min  |
| ...  | ...                                       | ...       | ...      | ...       |
| 4    | Temperature and top-p ("Top-p")           | Base      | 17.8%    | 30.7 min  |

- For your convenience, you can run the [self_consistency_math500.py](../02_math500-inference-scaling-scripts/self_consistency_math500.py) script located in [../02_math500-inference-scaling-scripts](../02_math500-inference-scaling-scripts)
- Technically, it's a self-consistency sampling script, but if we set `--num_samples 1`, it effectively disables the self-consistency sampling portion

&nbsp;
## Exercise 4.3: Use self-consistency sampling on MATH-500

- Taking the `evaluate_math500_stream` function from chapter 3 as a basis, the first change is to swap out the ` gen_text = generate_text_stream_concat(...)` portion with the `results = self_consistency_vote(...)` call from chapter 4
- The second change involves implementing the simple tie-breaking rule, where the code takes the first instance of the most frequent group (e.g., if we have the results 1, 3, 5, 3, 5, then it would return 3 as the answer)
- So, since the most frequent groups are recorded under `results["majority_winners"]`, one approach to break ties is to get the first instance of `results["majority_winners"]`, i.e., `results["majority_winners"][0]`

```python
import json
from pathlib import Path
import time

from reasoning_from_scratch.ch03 import (
    eta_progress_message,
    render_prompt,
    grade_answer,
)
from reasoning_from_scratch.ch04 import self_consistency_vote


def evaluate_math500_stream(
    model,
    tokenizer,
    device,
    math_data,
    out_path=None,
    max_new_tokens=2048,
    verbose=False,
    prompt_suffix="",    # NEW
    temperature=1.0,     # NEW
    top_p=1.0,           # NEW
    seed=None,           # NEW
    num_samples=10,      # NEW
):
    """Evaluate a model on MATH-500 using self-consistency sampling.

    For every problem, `self_consistency_vote` draws `num_samples`
    responses and votes on the extracted short answers. If the vote has no
    unique winner, the tie is broken by taking the first entry of
    `results["majority_winners"]`. The chosen answer is graded against the
    reference answer, and one JSON record per problem is written to
    `out_path`.

    Args:
        model: Forwarded unchanged to `self_consistency_vote`.
        tokenizer: Forwarded unchanged to `self_consistency_vote`.
        device: Forwarded to `self_consistency_vote`; also used to build
            the default log file name.
        math_data: Sized iterable of records with "problem" and "answer"
            keys.
        out_path: Destination JSONL file. Defaults to
            "math500-<device>.jsonl", with ":" in the device name replaced
            by "-".
        max_new_tokens: Forwarded to `self_consistency_vote`.
        verbose: Enables a per-example summary printout.
        prompt_suffix: Text appended to every rendered prompt.
        temperature: Forwarded to `self_consistency_vote`.
        top_p: Forwarded to `self_consistency_vote`.
        seed: Forwarded to `self_consistency_vote`.
        num_samples: Number of responses sampled per problem.

    Returns:
        A tuple `(num_correct, num_examples, accuracy)`; the accuracy is
        0.0 when `math_data` is empty.
    """
    if out_path is None:
        dev_name = str(device).replace(":", "-")
        out_path = Path(f"math500-{dev_name}.jsonl")

    num_examples = len(math_data)
    num_correct = 0
    start_time = time.time()

    with open(out_path, "w", encoding="utf-8") as f:
        for i, row in enumerate(math_data, start=1):
            prompt = render_prompt(row["problem"])

            ##############################################################
            # NEW
            prompt += prompt_suffix
            results = self_consistency_vote(
                model=model,
                tokenizer=tokenizer,
                prompt=prompt,
                device=device,
                num_samples=num_samples,
                temperature=temperature,
                top_p=top_p,
                max_new_tokens=max_new_tokens,
                show_progress=False,
                show_long_answer=False,
                seed=seed,
            )

            # Resolve ties: without a unique winner, fall back to the first
            # of the most frequent answers (None if nothing was sampled)
            extracted = results["final_answer"]
            if extracted is None:
                winners = results["majority_winners"]
                extracted = winners[0] if winners else None

            # Optionally, get the long answer behind the chosen short
            # answer. Reset per example so that a missing match is logged
            # as None instead of raising NameError or silently reusing the
            # previous example's text.
            gen_text = None
            for short, full in zip(
                results["short_answers"], results["full_answers"]
            ):
                if short == extracted:
                    gen_text = full
                    break
            ##############################################################

            is_correct = grade_answer(
                extracted, row["answer"]
            )
            num_correct += int(is_correct)

            # One JSON object per line (JSONL)
            record = {
                "index": i,
                "problem": row["problem"],
                "gtruth_answer": row["answer"],
                "generated_text": gen_text,
                "extracted": extracted,
                "correct": bool(is_correct),
            }
            f.write(json.dumps(record, ensure_ascii=False) + "\n")

            progress_msg = eta_progress_message(
                processed=i,
                total=num_examples,
                start_time=start_time,
                show_eta=True,
                label="MATH-500",
            )
            # The carriage return lets the next update overwrite this line
            print(progress_msg, end="\r", flush=True)
            if verbose:
                print(
                    f"\n\n{'='*50}\n{progress_msg}\n"
                    f"{'='*50}\nExtracted: {extracted}\n"
                    f"Expected:  {row['answer']}\n"
                    f"Correct so far: {num_correct}\n{'-'*50}"
                )

    seconds_elapsed = time.time() - start_time
    acc = num_correct / num_examples if num_examples else 0.0
    print(f"\nAccuracy: {acc*100:.1f}% ({num_correct}/{num_examples})")
    print(f"Total time: {seconds_elapsed/60:.1f} min")
    print(f"Logs written to: {out_path}")
    return num_correct, num_examples, acc
```

- The performance improvements when using self-consistency sampling are summarized in the table below (rows 5-7 and rows 9-12)

|      | Method                                    | Model     | Accuracy | Time      |
| ---- | ----------------------------------------- | --------- | -------- | --------- |
| 1    | Baseline (chapter 3), greedy decoding     | Base      | 15.2%    | 10.1 min  |
| 2    | Baseline (chapter 3), greedy decoding     | Reasoning | 48.2%    | 182.1 min |
| 3    | Chain-of-thought prompting ("CoT")        | Base      | 40.6%    | 84.5 min  |
| 4    | Temperature and top-p ("Top-p")           | Base      | 17.8%    | 30.7 min  |
| 5    | "Top-p" + Self-consistency (n=3)          | Base      | 29.6%    | 97.6 min  |
| 6    | "Top-p" + Self-consistency (n=5)          | Base      | 27.8%    | 116.8 min |
| 7    | "Top-p" + Self-consistency (n=10)         | Base      | 31.6%    | 300.4 min |
| 8    | "Top-p" + "CoT"                           | Base      | 33.4%    | 129.2 min |
| 9    | Self-consistency (n=3) + "Top-p" + "CoT"  | Base      | 42.2%    | 211.6 min |
| 10   | Self-consistency (n=5) + "Top-p" + "CoT"  | Base      | 48.0%    | 452.9 min |
| 11   | Self-consistency (n=10) + "Top-p" + "CoT" | Base      | 52.0%    | 862.6 min |
| 12   | Self-consistency (n=3) + "Top-p" + "CoT"  | Reasoning | 55.2%    | 544.4 min |

- For your convenience, you can run the [self_consistency_math500.py](../02_math500-inference-scaling-scripts/self_consistency_math500.py) script located in [../02_math500-inference-scaling-scripts](../02_math500-inference-scaling-scripts) to reproduce these; the [../02_math500-inference-scaling-scripts](../02_math500-inference-scaling-scripts) folder contains further information on which settings to use

&nbsp;
## Exercise 4.4: Early stopping in self-consistency sampling

- The early stopping check can be implemented by adding a few lines of code that check whether the given answer is already counted multiple times, or, more specifically, if the given answer count is greater than num_samples / 2:

```python
# Excerpt from the sampling loop of `self_consistency_vote`: stop as soon as
# the answer just counted (`short`) holds a strict majority, i.e., more than
# half of `num_samples`; the remaining samples could not overturn it.
if early_stop and counts[short] > num_samples / 2:
    majority_winners = [short]
    final_answer = short
    break
```

- The complete, modified function is shown below, with the changes highlighted via `# NEW`

```python
import torch
from collections import Counter

from reasoning_from_scratch.ch03 import (
    extract_final_candidate,
)
from reasoning_from_scratch.ch04 import (
    generate_text_stream_concat_flex,
    generate_text_top_p_stream_cache,
)


def self_consistency_vote(
    model,
    tokenizer,
    prompt,
    device,
    num_samples=10,
    temperature=0.8,
    top_p=0.9,
    max_new_tokens=2048,
    show_progress=True,
    show_long_answer=False,
    seed=None,
    early_stop=True,   # NEW
):
    """Sample several responses and pick the most frequent short answer.

    Up to `num_samples` responses are generated with temperature and top-p
    sampling. A short answer is extracted from each response and counted.
    With `early_stop` enabled, sampling ends as soon as one short answer
    has been seen more than `num_samples / 2` times.

    Args:
        model: Forwarded unchanged to the generation call.
        tokenizer: Forwarded unchanged to the generation call.
        prompt: Forwarded unchanged to the generation call.
        device: Forwarded unchanged to the generation call.
        num_samples: Maximum number of responses to sample.
        temperature: Sampling temperature forwarded to the generation call.
        top_p: Top-p threshold forwarded to the generation call.
        max_new_tokens: Forwarded to the generation call.
        show_progress: If True, print the short answer of every sample.
        show_long_answer: Forwarded as `verbose` to the generation call.
        seed: If not None, sample `i` (0-based) is generated after calling
            `torch.manual_seed(seed + i + 1)`.
        early_stop: If True, stop sampling once an answer holds a strict
            majority of `num_samples`.

    Returns:
        A dict with the keys
        - "full_answers": the generated responses, in sampling order
        - "short_answers": the extracted short answers, in sampling order
        - "counts": mapping from short answer to number of occurrences
        - "groups": mapping from short answer to the sample indices that
          produced it
        - "majority_winners": the short answer(s) with the highest count
        - "final_answer": the unique winner, or None in case of a tie or
          if no samples were drawn
    """
    full_answers, short_answers = [], []
    counts = Counter()
    groups = {}
    majority_winners, final_answer = [], None

    for i in range(num_samples):
        if seed is not None:
            # Distinct but reproducible seed for every sample
            torch.manual_seed(seed + i + 1)

        answer = generate_text_stream_concat_flex(
            model=model,
            tokenizer=tokenizer,
            prompt=prompt,
            device=device,
            max_new_tokens=max_new_tokens,
            verbose=show_long_answer,
            generate_func=generate_text_top_p_stream_cache,
            temperature=temperature,
            top_p=top_p,
        )

        short = extract_final_candidate(
            answer, fallback="number_then_full"
        )
        full_answers.append(answer)
        short_answers.append(short)
        counts[short] += 1
        groups.setdefault(short, []).append(i)

        if show_progress:
            print(f"[Sample {i+1}/{num_samples}] → {short!r}")

        #########################################################
        # NEW
        # Early stop once one answer holds a strict majority (more than
        # half of num_samples); later samples could not overturn it
        if early_stop and counts[short] > num_samples / 2:
            majority_winners = [short]
            final_answer = short
            break
        #########################################################

    # Regular vote if sampling did not stop early
    if final_answer is None:
        mc = counts.most_common()
        if mc:
            top_freq = mc[0][1]
            majority_winners = [s for s, f in mc if f == top_freq]
            # A tie between several answers leaves final_answer as None
            final_answer = mc[0][0] if len(majority_winners) == 1 else None

    return {
        "full_answers": full_answers,
        "short_answers": short_answers,
        "counts": dict(counts),
        "groups": groups,
        "majority_winners": majority_winners,
        "final_answer": final_answer,
    }
```

- For your convenience, you can run the [self_consistency_math500.py](../02_math500-inference-scaling-scripts/self_consistency_math500.py) script located in [../02_math500-inference-scaling-scripts](../02_math500-inference-scaling-scripts) with the `--early_stop` flag to use this modified function on the MATH-500 dataset