# Step 1: Mounting Google Drive and Importing Libraries

In [1]:
# Mount Google Drive so the project stored there is visible to this runtime.
from google.colab import drive
drive.mount("/content/drive")
# Work from the project root: later cells use relative paths such as "models/qwen3-4b-sft".
%cd /content/drive/MyDrive/grpo-verified-reasoner
# List the project directory as a quick check that the mount and cd succeeded.
!ls

Mounted at /content/drive
/content/drive/MyDrive/grpo-verified-reasoner
data			      notebooks  unsloth_compiled_cache
huggingface_tokenizers_cache  outputs	 _unsloth_sentencepiece_temp
LICENSE			      README.md
models			      src


In [2]:
# Install uv, a faster pip-compatible installer; the dependency cell below runs `uv pip install`.
!pip install --upgrade -qqq uv

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/22.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/22.2 MB[0m [31m33.0 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.3/22.2 MB[0m [31m121.8 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━[0m [32m17.2/22.2 MB[0m [31m253.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m22.2/22.2 MB[0m [31m274.1 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m22.2/22.2 MB[0m [31m274.1 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m22.2/22.2 MB[0m [31m108.6 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Install/upgrade EvalPlus, which supplies the MBPP+ tasks loaded later via `get_mbpp_plus`.
!pip -q install -U evalplus

In [3]:
import os
import subprocess

In [4]:
# Turn off the "expandable_segments" option of PyTorch's CUDA allocator.
# This is set before `torch` is imported further down in the notebook.
# NOTE(review): presumably needed for compatibility with vLLM's memory handling — confirm.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:False"

In [5]:
# Opt in to Unsloth's vLLM standby mode; set before `unsloth` is imported further down.
# NOTE(review): presumably lets training and vLLM inference share GPU memory — confirm against the Unsloth docs.
os.environ["UNSLOTH_VLLM_STANDBY"] = "1"

In [6]:
# Environment Logic (Colab vs Local)
if "COLAB_" not in "".join(os.environ.keys()):
    !pip install unsloth vllm
else:
    # Version Matching
    try: import numpy, PIL; get_numpy = f"numpy=={numpy.__version__}"; get_pil = f"pillow=={PIL.__version__}"
    except: get_numpy = "numpy"; get_pil = "pillow"
    try: is_t4 = "Tesla T4" in str(subprocess.check_output(["nvidia-smi"]))
    except: is_t4 = False

    # A100 gets vllm 0.10.2 (Fast), T4 gets 0.9.2 (Stable)
    get_vllm, get_triton = ("vllm==0.9.2", "triton==3.2.0") if is_t4 else ("vllm==0.10.2", "triton")

    # Install Everything
    !uv pip install -qqq --upgrade \
        unsloth {get_vllm} {get_numpy} {get_pil} torchvision bitsandbytes xformers
    !uv pip install -qqq {get_triton}

# Install TRL
!uv pip install transformers==4.56.2
!uv pip install --no-deps trl==0.22.2

[2mUsing Python 3.12.12 environment at: /usr[0m
[2K[2mResolved [1m18 packages[0m [2min 469ms[0m[0m
[2K[2mPrepared [1m1 package[0m [2min 557ms[0m[0m
[2mUninstalled [1m1 package[0m [2min 348ms[0m[0m
[2K[2mInstalled [1m1 package[0m [2min 47ms[0m[0m
 [31m-[39m [1mtransformers[0m[2m==4.57.3[0m
 [32m+[39m [1mtransformers[0m[2m==4.56.2[0m
[2mUsing Python 3.12.12 environment at: /usr[0m
[2K[2mResolved [1m1 package[0m [2min 3ms[0m[0m
[2K[2mPrepared [1m1 package[0m [2min 122ms[0m[0m
[2mUninstalled [1m1 package[0m [2min 2ms[0m[0m
[2K[2mInstalled [1m1 package[0m [2min 10ms[0m[0m
 [31m-[39m [1mtrl[0m[2m==0.24.0[0m
 [32m+[39m [1mtrl[0m[2m==0.22.2[0m


In [50]:
import re
import ast
import torch
import random
import evalplus
import traceback
import numpy as np
import multiprocessing as mp
from unsloth import FastLanguageModel
from evalplus.data import get_mbpp_plus

# Step 2: Verifying GPU and Environment

In [8]:
# Report the torch build and, when CUDA is usable, the name of GPU 0.
print(f"Torch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

Torch version: 2.7.0+cu126
CUDA available: True
GPU: Tesla T4


# Step 3: Loading Base Model and LoRA Adapters

In [9]:
# Relative path of the model to load; resolved against the project root that
# the first cell changes into. NOTE(review): presumably the SFT checkpoint — confirm.
MODEL_PATH = "models/qwen3-4b-sft"

In [None]:
# Load the model and its tokenizer from MODEL_PATH through Unsloth.
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = MODEL_PATH,
    max_seq_length = 3072,      # NOTE(review): presumably the max context length in tokens — confirm
    load_in_4bit = True,        # 4-bit weights: crucial for fitting in T4 memory
    fast_inference = True,      # enable the vLLM-backed inference path so it can be tested
    gpu_memory_utilization = 0.6, # conservative GPU memory fraction for a T4
)

In [81]:
# Print the module tree to confirm the LoRA adapters are attached to the base model.
print(model)

PeftModelForCausalLM(
  (base_model): LoraModel(
    (model): Qwen3ForCausalLM(
      (model): Qwen3Model(
        (embed_tokens): Embedding(151936, 2560, padding_idx=151654)
        (layers): ModuleList(
          (0-1): 2 x Qwen3DecoderLayer(
            (self_attn): Qwen3Attention(
              (q_proj): lora.Linear(
                (base_layer): Linear(in_features=2560, out_features=4096, bias=False)
                (lora_dropout): ModuleDict(
                  (default): Identity()
                )
                (lora_A): ModuleDict(
                  (default): Linear(in_features=2560, out_features=32, bias=False)
                )
                (lora_B): ModuleDict(
                  (default): Linear(in_features=32, out_features=4096, bias=False)
                )
                (lora_embedding_A): ParameterDict()
                (lora_embedding_B): ParameterDict()
                (lora_magnitude_vector): ModuleDict()
              )
              (k_proj): lora.Linear(


# Step 4: Sanity Check

In [39]:
# System prompt describing the required output format. It is the same prompt
# that was used during SFT. Note that it spells out all four schema tags, so any
# text that includes this prompt will contain those tags verbatim.
system_prompt = """You are a code-generation engine.
You must output your response in the following exact format:
<START_WORKING_OUT>
Concise reasoning steps required to solve the problem.
</END_WORKING_OUT>
<SOLUTION>
Valid Python code only.
</SOLUTION>
Do not output anything outside these tags."""

In [40]:
# Sanity-check request: a small, unambiguous coding task.
user_prompt = "Write a Python function that returns the factorial of a number."

# Chat transcript: format instructions first, then the user's request.
messages = [
    dict(role="system", content=system_prompt),
    dict(role="user", content=user_prompt),
]

# Render the transcript with the chat template and tokenize it. The generation
# prompt is appended so the model continues with the assistant turn.
inputs = tokenizer.apply_chat_template(
    messages,
    add_generation_prompt=True,
    tokenize=True,
    return_dict=True,
    return_tensors="pt",
)

In [20]:
# Move every tensor produced by the tokenizer to the GPU.
# This rebinds `inputs` to a plain dict of CUDA tensors.
inputs = {k: v.to("cuda") for k, v in inputs.items()}

In [21]:
FastLanguageModel.for_inference(model)  # Temporarily enable inference mode for the test
with torch.no_grad():
    output = model.generate(
        **inputs,
        max_new_tokens=256,
        # Deterministic check: request greedy decoding explicitly. Temperature
        # only applies when sampling (and 0.0 is not a valid sampling
        # temperature), so it is not the right switch for determinism.
        do_sample=False,
    )

In [22]:
# Decode the WHOLE sequence (prompt + completion). This text includes the system
# prompt, which itself contains every schema tag, so it is only useful for
# inspection and must not be fed to the schema check or the extractor.
decoded = tokenizer.decode(output[0], skip_special_tokens=True)

In [23]:
# Show only the completion: count the prompt tokens, then decode what follows them.
input_len = inputs["input_ids"].shape[1]
print("\n--- MODEL OUTPUT ---")
print(tokenizer.decode(output[0][input_len:], skip_special_tokens=True))


--- MODEL OUTPUT ---
<START_WORKING_OUT>
Define a function factorial that takes an integer n.
Handle non-positive input by returning 1 (factorial of 0 or negative is 1).
Initialize result to 1.
Multiply result by each integer i from 1 to n.
Return result.
</END_WORKING_OUT>
<SOLUTION>
def factorial(n):
    if n <= 0:
        return 1
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result
</SOLUTION>


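Comment: Schema checks, the solution extractor, and the reward function must only ever see `generated_text` (the completion with the prompt tokens sliced off), never the full decoded sequence. The system prompt itself contains every tag, so prompt + completion would pass the schema check trivially.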

# Step 6: Defining Output Schema

In [24]:
# Compiled patterns for the four schema tags, matched case-insensitively.
# Each pattern matches the literal tag only; none of them anchors to a position
# in the text, so they are used with .search().
RE_START = re.compile(r"<START_WORKING_OUT>", re.IGNORECASE)
RE_END   = re.compile(r"</END_WORKING_OUT>", re.IGNORECASE)
RE_SOL   = re.compile(r"<SOLUTION>", re.IGNORECASE)
RE_SOL_END = re.compile(r"</SOLUTION>", re.IGNORECASE)

In [25]:
def validate_schema(text: str) -> tuple[bool, str]:
    """
    Checks whether the model output contains the four required tags in the
    required order:

        <START_WORKING_OUT> ... </END_WORKING_OUT> <SOLUTION> ... </SOLUTION>

    Tag matching is case-insensitive. Pass ONLY the model completion: a text
    that also contains the system prompt would include every tag verbatim.

    Returns (is_valid, reason), where reason names the first problem found,
    or is "Schema valid" when the check passes.
    """
    # Presence checks, in the order the tags are expected to appear.
    start = RE_START.search(text)
    if not start:
        return False, "Missing <START_WORKING_OUT>"
    if not RE_END.search(text):
        return False, "Missing </END_WORKING_OUT>"
    sol = RE_SOL.search(text)
    if not sol:
        return False, "Missing <SOLUTION>"
    if not RE_SOL_END.search(text):
        return False, "Missing </SOLUTION>"

    # Order checks, anchored on the first opening tag of each block.
    if sol.start() < start.start():
        return False, "Tag order incorrect (<SOLUTION> before reasoning block)."

    # The reasoning block must be closed after it opens and before the
    # solution block opens.
    end = RE_END.search(text, start.end())
    if end is None or end.start() > sol.start():
        return False, "Tag order incorrect (</END_WORKING_OUT> must close the reasoning block before <SOLUTION>)."

    # A closing tag that only appears before <SOLUTION> does not close it.
    if RE_SOL_END.search(text, sol.end()) is None:
        return False, "Tag order incorrect (</SOLUTION> must come after <SOLUTION>)."

    return True, "Schema valid"

In [26]:
# Run a sanity test on the completion ONLY (prompt tokens sliced off).
# The full `decoded` text still contains the system prompt, whose format
# description includes all four tags, so validating it would report
# "Schema valid" even if the model had emitted none of them.
is_valid, reason = validate_schema(
    tokenizer.decode(output[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
)
print("Schema Check:", is_valid, "|", reason)

Schema Check: True | Schema valid


# Step 7: Solution Extraction

In [28]:
# Captures the text between the first <SOLUTION> and the nearest following
# </SOLUTION>. Matching is non-greedy and case-insensitive, and DOTALL lets the
# body span multiple lines. Whitespace next to the tags is left out of group 1.
RE_SOLUTION = re.compile(r"<SOLUTION>\s*(.*?)\s*</SOLUTION>", re.IGNORECASE | re.DOTALL)

In [30]:
def extract_solution(text: str) -> tuple[str | None, str]:
    """
    Extracts the Python code inside the first <SOLUTION>...</SOLUTION> block
    and checks that it parses.

    Returns (code, status) where:
        code   -> the extracted source string, or None if extraction failed
        status -> textual reason (for debugging)

    Never raises for malformed model output: every parse failure is reported
    through the status string so a bad completion cannot crash the caller.
    """
    match = RE_SOLUTION.search(text)
    if not match:
        return None, "No <SOLUTION> block found."

    code = match.group(1).strip()
    if not code:
        return None, "Empty <SOLUTION> block."

    # Syntax check via Python's AST parser. The code is only parsed here,
    # never executed.
    try:
        ast.parse(code)
    except SyntaxError as e:
        return None, f"Syntax error in code: {e}"
    except (ValueError, RecursionError, MemoryError) as e:
        # Untrusted text can also trip the parser in other ways, e.g. null
        # bytes (ValueError on older Pythons) or pathologically deep nesting.
        return None, f"Code could not be parsed: {type(e).__name__}: {e}"

    return code, "Valid Python code extracted."

In [34]:
# Number of prompt tokens: the completion starts at this index in each output row.
input_len = inputs["input_ids"].shape[1]

In [35]:
# Decode ONLY the newly generated tokens (the assistant's reply), skipping the
# first `input_len` tokens, which are the prompt.
generated_text = tokenizer.decode(output[0][input_len:], skip_special_tokens=True)

In [36]:
# Run the extractor on the completion only; `code` is None when extraction fails.
code, status = extract_solution(generated_text) # completion text, not the full decoded sequence
print("Status:", status)

Status: Valid Python code extracted.


In [37]:
# Display the extracted source when extraction produced something.
if code:
    print("\n--- Extracted Python Code ---\n", code, sep="\n")


--- Extracted Python Code ---

def factorial(n):
    if n <= 0:
        return 1
    result = 1
    for i in range(1, n + 1):
        result *= i
    return result


# Step 8: Verifier Integration (EvalPlus MBPP+)

In [47]:
# Fetch the MBPP+ benchmark; the result maps task_id -> problem dict.
MBPP_TASKS = get_mbpp_plus()

print("Loaded MBPP+ tasks:", len(MBPP_TASKS))

Loaded MBPP+ tasks: 378


In [48]:
# Inspect the first task to confirm which fields are present and what the prompt looks like.
sample_task_id = next(iter(MBPP_TASKS))
sample_task = MBPP_TASKS[sample_task_id]

print(f"\nSample Task ID: {sample_task_id}")
print(f"Keys: {list(sample_task.keys())}")
print("\nPrompt (first 400 chars):\n", sample_task["prompt"][:400])


Sample Task ID: Mbpp/2
Keys: ['task_id', 'prompt', 'entry_point', 'canonical_solution', 'base_input', 'atol', 'plus_input', 'contract', 'assertion']

Prompt (first 400 chars):
 """
Write a function to find the shared elements from the given two lists.
assert set(similar_elements((3, 4, 5, 6),(5, 7, 4, 10))) == set((4, 5))
"""



In [58]:
# Different EvalPlus versions may store tests under slightly different keys and
# in different shapes, so we normalize via these helpers (used later by the
# reward function).
def _split_test_statements(source: str) -> list[str]:
    """
    Splits a block of test source into one string per top-level statement.

    Statements that span several lines (loops, wrapped asserts, decorated
    helpers) stay in one piece so each returned string can be exec'd on its
    own. If the block does not parse as a whole, falls back to one entry per
    non-blank line.
    """
    source = source.strip()
    if not source:
        return []

    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError):
        return [line for line in source.splitlines() if line.strip()]

    # Split on the line terminators the Python parser counts, so that
    # ast line numbers index this list correctly.
    lines = source.replace("\r\n", "\n").replace("\r", "\n").split("\n")

    spans: list[list[int]] = []  # [start, stop) line-index ranges
    for node in tree.body:
        # A decorated def/class reports the line of `def`/`class`, so start
        # from the earliest decorator to keep the decorators with it.
        decorators = getattr(node, "decorator_list", [])
        first_line = min([node.lineno] + [d.lineno for d in decorators])
        start, stop = first_line - 1, node.end_lineno
        if spans and start < spans[-1][1]:
            # Shares a line with the previous statement (`a = 1; assert a`).
            spans[-1][1] = max(spans[-1][1], stop)
        else:
            spans.append([start, stop])

    return ["\n".join(lines[start:stop]) for start, stop in spans]


def get_tests_from_task(task: dict) -> list[str]:
    """
    Extracts MBPP test assertions from a task.

    Looks at the keys "test_list", "tests", "plus_tests", "base_tests" and
    "assertion", in that order, and uses the first one with a non-empty value.
    A list-like value is returned as a list; a string value is split into one
    entry per top-level statement.

    Raises:
        KeyError: if none of the known keys holds any tests.
    """
    for key in ("test_list", "tests", "plus_tests", "base_tests", "assertion"):
        value = task.get(key)
        if not value:
            continue

        # A string must be split into statements; calling list() on it would
        # yield individual characters.
        if isinstance(value, str):
            tests = _split_test_statements(value)
        else:
            tests = list(value)

        if tests:
            return tests

    raise KeyError(f"No tests found in task keys: {list(task.keys())}")

# Step 9: Defining Reward Function

In [59]:

def _exec_code_and_tests_worker(code: str, tests: list[str], queue: mp.Queue) -> None:
    """
    Runs inside a subprocess. Executes model code + MBPP tests.
    Reports (passed: bool, error: str|None) via queue.
    """
    try:
        # Restrict environment (keep it minimal; MBPP doesn't need much)
        g = {"__builtins__": __builtins__}
        l = {}

        # 1) Define user's solution
        exec(code, g, l)

        # 2) Run tests (assert statements)
        for t in tests:
            exec(t, g, l)

        queue.put((True, None))
    except Exception:
        queue.put((False, traceback.format_exc()))

In [60]:
def run_mbpp_tests(code: str, task: dict, timeout_s: float = 2.0) -> tuple[bool, str | None]:
    """
    Executes MBPP tests for a given task in a subprocess with timeout.

    Args:
        code: source of the candidate solution.
        task: task dict from which the tests are taken via get_tests_from_task.
        timeout_s: seconds to wait for the worker to report a result.

    Returns:
        (passed, error_str). error_str is None on success, the worker's
        traceback on failure, "Timeout after N.Ns" when the worker was still
        running at the deadline, or "No result returned from worker." when the
        worker exited without reporting (for example, the code called exit()).
    """
    import queue as queue_mod  # stdlib; needed for the Empty exception
    import time

    tests = get_tests_from_task(task)

    ctx = mp.get_context("fork")  # Colab/Linux: fork is fastest & simplest
    result_queue = ctx.Queue()
    worker = ctx.Process(target=_exec_code_and_tests_worker, args=(code, tests, result_queue))
    worker.start()

    # Wait for the RESULT rather than for process exit. A child that has put
    # a large item on a queue cannot exit until the item is consumed, so
    # joining first would misreport a long traceback as a timeout.
    poll_s = 0.05
    result = None
    deadline = time.monotonic() + timeout_s
    while True:
        remaining = deadline - time.monotonic()
        try:
            result = result_queue.get(timeout=max(0.0, min(poll_s, remaining)))
            break
        except queue_mod.Empty:
            # Stop at the deadline, or early if the worker died silently.
            if remaining <= 0 or not worker.is_alive():
                break

    timed_out = result is None and worker.is_alive()

    if result is None and not timed_out:
        # The worker may have reported and exited between the last poll and
        # the liveness check; take one final look.
        try:
            result = result_queue.get(timeout=poll_s)
        except queue_mod.Empty:
            pass

    # Cleanup: let a finished worker exit on its own, then escalate. Every
    # wait before kill() is bounded, so a child that ignores SIGTERM cannot
    # hang the caller.
    if worker.is_alive() and not timed_out:
        worker.join(1.0)
    if worker.is_alive():
        worker.terminate()
        worker.join(1.0)
    if worker.is_alive():
        worker.kill()
    worker.join()
    result_queue.close()

    if timed_out:
        return False, f"Timeout after {timeout_s:.1f}s"
    if result is None:
        return False, "No result returned from worker."

    passed, err = result
    return passed, err

In [61]:

def compute_mbpp_reward(generated_text: str, task: dict, timeout_s: float = 2.0) -> tuple[float, dict]:
    """
    Main reward function (GRPO-compatible later).

    The completion passes through three gates; failing any of them yields a
    reward of 0.0, passing all of them yields 1.0.

    Args:
        generated_text: ONLY the model completion (sliced), not the full
            prompt+completion.
        task: MBPP+ task dict whose tests are run against the extracted code.
        timeout_s: seconds allowed for running the task's tests (default 2.0).

    Returns:
        reward: float, either 0.0 or 1.0
        info: dict (diagnostics for debugging) with keys "stage"
            ("schema", "extract" or "eval"), "ok" and "reason"; the "eval"
            stage also carries "extract_status".
    """
    # 1) Schema gate
    ok, reason = validate_schema(generated_text)
    if not ok:
        return 0.0, {"stage": "schema", "ok": False, "reason": reason}

    # 2) Extract + syntax gate
    code, status = extract_solution(generated_text)
    if code is None:
        return 0.0, {"stage": "extract", "ok": False, "reason": status}

    # 3) Semantic gate (MBPP tests)
    passed, err = run_mbpp_tests(code, task, timeout_s=timeout_s)
    if not passed:
        return 0.0, {"stage": "eval", "ok": False, "reason": err, "extract_status": status}

    return 1.0, {"stage": "eval", "ok": True, "reason": "passed", "extract_status": status}

In [75]:
# Pick one fixed MBPP task so debugging runs are repeatable.
task_ids = list(MBPP_TASKS)

task_id = task_ids[1]   # the second task in the collection
task = MBPP_TASKS[task_id]

print(f"Task ID: {task_id}")
print("\n--- TASK PROMPT ---\n")
print(task["prompt"])

Task ID: Mbpp/3

--- TASK PROMPT ---

"""
Write a python function to identify non-prime numbers.
assert is_not_prime(2) == False
"""



In [76]:
# Chat transcript for the selected MBPP task (replaces the earlier sanity-check prompt).
messages = [
    dict(role="system", content=system_prompt),
    dict(role="user", content=task["prompt"]),
]

# Render with the chat template and tokenize; the generation prompt is
# appended so the model continues with the assistant turn.
inputs = tokenizer.apply_chat_template(
    messages,
    add_generation_prompt=True,
    tokenize=True,
    return_dict=True,
    return_tensors="pt",
)

In [77]:
# Move to GPU (rebinds `inputs` to a plain dict of CUDA tensors)
inputs = {k: v.to("cuda") for k, v in inputs.items()}

# Generate
FastLanguageModel.for_inference(model)
with torch.no_grad():
    output = model.generate(
        **inputs,
        max_new_tokens=512,
        # Deterministic for debugging: request greedy decoding explicitly.
        # Temperature only applies when sampling (and 0.0 is not a valid
        # sampling temperature), so it is not the right switch here.
        do_sample=False,
    )

In [78]:
# CRITICAL: slice off prompt tokens before decoding. The prompt contains the
# system prompt with all schema tags, so scoring prompt + completion would let
# the schema check pass no matter what the model wrote.
input_len = inputs["input_ids"].shape[1]
generated_text = tokenizer.decode(
    output[0][input_len:],
    skip_special_tokens=True
)

In [79]:
# Display the completion exactly as it will be passed to the reward function.
print("\n--- GENERATED TEXT ---\n")
print(generated_text)


--- GENERATED TEXT ---

<START_WORKING_OUT>
Define a function is_not_prime that takes an integer n.
If n <= 1, return False (1 and numbers below are prime by definition).
Check divisibility from 2 to sqrt(n). If divisible by any, return True.
Otherwise, return False.
</END_WORKING_OUT>
<SOLUTION>
import math

def is_not_prime(n: int) -> bool:
    """Return True if n is not a prime number."""
    if n <= 1:
        return False
    for i in range(2, math.isqrt(n) + 1):
        if n % i == 0:
            return True
    return False
</SOLUTION>


In [80]:
# Score the completion against the selected task and show the diagnostics.
reward, info = compute_mbpp_reward(generated_text, task)

print("\n--- REWARD RESULT ---")
print(f"Reward: {reward}")
print(f"Info: {info}")


--- REWARD RESULT ---
Reward: 0.0
Info: {'stage': 'eval', 'ok': False, 'reason': 'Traceback (most recent call last):\n  File "/tmp/ipython-input-90596588.py", line 16, in _exec_code_and_tests_worker\n    exec(t, g, l)\n  File "<string>", line 1, in <module>\nAssertionError\n', 'extract_status': 'Valid Python code extracted.'}
