This notebook shows the GRPO fine-tuning for sp-struct-rwd1.

The system prompt can be replaced by sp-base, sp-declare, and sp-reflect, and the reward design can be replaced by reward suite 2 and reward suite 3.

### Install Unsloth and VLLM

In [None]:
%%capture
import os
if "COLAB_" not in "".join(os.environ.keys()):
    !pip install unsloth vllm
else:
    # [NOTE] Do the below ONLY in Colab! Use [[pip install unsloth vllm]]
    !pip install --no-deps unsloth vllm==0.8.5.post1

In [None]:
#@title Colab Extra Install { display-mode: "form" }
%%capture
import os
if "COLAB_" not in "".join(os.environ.keys()):
    !pip install unsloth vllm
else:
    !pip install --no-deps unsloth vllm==0.8.5.post1
    # [NOTE] Do the below ONLY in Colab! Use [[pip install unsloth vllm]]
    # Skip restarting message in Colab
    import sys, re, requests; modules = list(sys.modules.keys())
    for x in modules: sys.modules.pop(x) if "PIL" in x or "google" in x else None
    !pip install --no-deps bitsandbytes accelerate xformers==0.0.29.post3 peft "trl==0.15.2" triton cut_cross_entropy unsloth_zoo
    !pip install sentencepiece protobuf "datasets>=3.4.1" huggingface_hub hf_transfer
    !pip install transformers==4.51.3

    # vLLM requirements - vLLM breaks Colab due to reinstalling numpy
    f = requests.get("https://raw.githubusercontent.com/vllm-project/vllm/refs/heads/main/requirements/common.txt").content
    with open("vllm_requirements.txt", "wb") as file:
        file.write(re.sub(rb"(transformers|numpy|xformers)[^\n]{1,}\n", b"", f))
    !pip install -r vllm_requirements.txt

### Import wandb

In [None]:
import wandb

# 1. Log in to wandb.
# If you have your wandb key in an environment variable, you can skip passing it here.
wandb.login()

# 2. Initialize your run with a specific project name (and entity if needed).
wandb.init(
    project="gsm8k-prolog-prover",
    #entity="my-team-or-username",  # optional if on personal account
    name="sp-struct-rwd1"   # optional name
)

### Load model

In [None]:
from unsloth import is_bfloat16_supported, FastLanguageModel
import torch
max_seq_length = 2048 # Can increase for longer reasoning traces

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "Qwen/Qwen2.5-3B-Instruct",
    max_seq_length = max_seq_length,
    load_in_4bit = True, # False for LoRA 16bit
    fast_inference = True, # Enable vLLM fast inference
    max_lora_rank = 64, # Larger rank = smarter, but slower
    gpu_memory_utilization = 0.7, # Reduce if out of memory
)

model = FastLanguageModel.get_peft_model(
    model,
    r = 32, # Choose any number > 0 ! Suggested 8, 16, 32, 64, 128
    target_modules = [
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ], # Remove QKVO if out of memory
    lora_alpha = 64, # LoRA rank
    use_gradient_checkpointing = "unsloth", # Enable long context finetuning
    random_state = 3407,
)

### Install SWI-Prolog

In [None]:
!apt-get install swi-prolog

### System prompt

In [None]:
SYSTEM_PROMPT = """
You are a specialized Prolog code-generating assistant.

Your task is to solve math problems by providing a structured answer in two clearly defined sections:

1. <reasoning>
   - Provide a clear, concise step-by-step explanation of how you arrive at the solution.

2. <answer>
   - Provide executable Prolog code using constraint logic programming to compute the numeric answer.
   - Always start with: ':- use_module(library(clpq)).'
   - Define any necessary numeric constants or intermediate values using predicates.
   - Final answer should be unified explicitly in solve(X) using curly-brace constraints, without printing commands.

Use this XML format strictly:
<reasoning>
(Your step-by-step reasoning here)
</reasoning>
<answer>
:- use_module(library(clpq)).

(Any predicates/constants defined here)

solve(X) :-
    (Intermediate computations using curly braces)
    {X = final constraint logic}.
</answer>
"""

### Preprocess dataset and push to HF

In [None]:
import re
from datasets import load_dataset
import subprocess

def execute_prolog_code(prolog_code: str) -> str:
    """
    Executes the given Prolog code in SWI-Prolog, calling solve(X),
    and returns the printed solution as a string (e.g., "12000").
    Returns None if there's an error or no output.
    """
    try:
        # Write the Prolog code to a temporary file
        with open("temp.pl", "w") as f:
            f.write(prolog_code)

        # Run SWI-Prolog: load 'temp.pl', call solve(X), print X, then halt
        result = subprocess.run(
            ["swipl", "-q", "-f", "temp.pl", "-g", "solve(X), writeln(X), halt"],
            capture_output=True,
            text=True,
            timeout=5,  # optional: 5-second timeout
        )

        # If there's any error output, we can check result.stderr or result.returncode
        if result.returncode != 0 or not result.stdout:
            return None

        # result.stdout is whatever got printed by writeln(X)
        lines = result.stdout.strip().splitlines()
        return lines[-1].strip() if lines else None

    except Exception as e:
        print(f"Error executing Prolog code: {e}")
        return None

In [None]:
from datasets import load_dataset

def get_gsm8k_questions(split="train"):
    data = load_dataset('niklasm222/gsm8k-prolog-prover')[split]

    def map_fn(x):
        # Compute the correct numerical result by executing the reference Prolog solution.
        numerical_result = execute_prolog_code(x["output"])
        return {
            "instruction": x["instruction"],
            "input": x["input"],
            "output": x["output"],
            "prompt": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": f"{x['instruction']}\n{x['input']}"}
            ],
            # Optionally, you can also append the numerical result to the output field.
            "answer": x['output'],
            "numerical_result": str(numerical_result),  # Precomputed numeric result
        }

    data = data.map(map_fn)
    return data

dataset = get_gsm8k_questions()
print(dataset[0])

In [None]:
# Save and push the dataset to Hugging Face Hub.
# Replace "your_username" with your HF username and "hf_your_token" with your token if needed.
dataset.push_to_hub("niklasm222/gsm8k-prolog-prover-sp_struct-v4", token="", private=False)

### Load dataset

In [None]:
# FOR SUBSET TRAINING

import re
from datasets import load_dataset, DatasetDict
import subprocess

def get_gsm8k_split(subset_size=2500, seed=42):
    """
    Load the 'niklasm222/gsm8k-prolog-prover-v4' dataset, select a subset,
    and split it into 70% train, 15% validation, and 15% test.
    """
    # 1. Load dataset and shuffle
    dataset = load_dataset("niklasm222/gsm8k-prolog-prover-sp_struct-v4", split="train")
    subset = dataset.shuffle(seed=seed).select(range(subset_size))

    # 2. Split off 15% for test
    split_1 = subset.train_test_split(test_size=0.15, seed=seed)
    train_val = split_1["train"]
    test = split_1["test"]

    # 3. From the remaining 85%, split off 15% for validation (~0.1765)
    val_ratio = 0.15 / 0.85
    split_2 = train_val.train_test_split(test_size=val_ratio, seed=seed)
    train = split_2["train"]
    val = split_2["test"]

    return DatasetDict({"train": train, "validation": val, "test": test})

# Load Data
splits = get_gsm8k_split()
train_dataset = splits["train"]
val_dataset = splits["validation"]
test_dataset = splits["test"]

# Print dataset information
print(f"Training samples: {len(train_dataset)}")
print(f"Validation samples: {len(val_dataset)}")
print(f"Test samples: {len(test_dataset)}")
print(f"Columns: {train_dataset.column_names}")

# Inspect the first training sample
print("\nTraining sample:")
print(train_dataset[0])

### Helper functions

In [None]:
# ---------------------
# Helper Functions
# ---------------------
def extract_xml_answer(text: str) -> str:
    """
    1) Truncate 'text' at <|endoftext|> if present.
    2) Find the FIRST fully-completed <answer>...</answer> block in that truncated text.
    3) Return that block's content, or None if not found.
    """
    try:
        # 1) Truncate at <|endoftext|>
        eot_index = text.find("<|endoftext|>")
        truncated_text = text[:eot_index] if eot_index != -1 else text

        # 2) Find the FIRST <answer> tag
        start = truncated_text.find("<answer>")
        if start == -1:
            return None

        # 3) Find the NEXT </answer> after this <answer>
        end = truncated_text.find("</answer>", start)
        if end == -1:
            return None

        return truncated_text[start+len("<answer>"):end].strip()

    except Exception:
        return None

def execute_prolog_code(prolog_code: str) -> str:
    """
    Executes the given Prolog code in SWI-Prolog, calling solve(X),
    and returns the printed solution as a string (e.g., "12000").
    Returns None if there's an error or no output.
    """
    try:
        # Write the Prolog code to a temporary file
        with open("temp.pl", "w") as f:
            f.write(prolog_code)

        # Run SWI-Prolog: load 'temp.pl', call solve(X), print X, then halt
        result = subprocess.run(
            ["swipl", "-q", "-f", "temp.pl", "-g", "solve(X), writeln(X), halt"],
            capture_output=True,
            text=True,
            timeout=5,  # optional: 5-second timeout
        )

        # If there's any error output, we can check result.stderr or result.returncode
        if result.returncode != 0 or not result.stdout:
            return None

        # result.stdout is whatever got printed by writeln(X)
        lines = result.stdout.strip().splitlines()
        return lines[-1].strip() if lines else None

    except Exception as e:
        print(f"Error executing Prolog code: {e}")
        return None

### Reward design (reward suite 1)

In [None]:
import os
import re
import uuid
import subprocess
from datasets import load_dataset

def correctness_reward_func(prompts, completions, answer, numerical_result, **kwargs) -> list[float]:
    """
    Compare the model’s executed Prolog answer to the known correct numeric result.
    Provide partial rewards for progress toward correctness during early training.
    This function depends on SWI-Prolog execution results.
    """
    # 1. Get the model's generated text and extract the Prolog snippet
    responses = [comp[0]["content"] for comp in completions]
    extracted_responses = [extract_xml_answer(r) for r in responses]

    # 2. Retrieve reference numeric results (passed from dataset)
    correct_values = numerical_result

    # 3. Debug print for the first sample only
    if len(responses) > 0:
        question = prompts[0][-1]["content"] if (prompts and prompts[0]) else "N/A"
        print(
            "-" * 20,
            f"Question:\n{question}",
            f"\nReference Prolog answer:\n{answer[0]}",
            f"\nReference Numerical Result:\n{correct_values[0]}",
            f"\nModel Response:\n{responses[0]}",
            f"\nExtracted Code:\n{extracted_responses[0]}"
        )

    # 4. Execute the model's Prolog code with SWI-Prolog
    model_values = []
    for code in extracted_responses:
        if code:
            mv = execute_prolog_code(code)
            if mv:
                model_values.append(mv)
            else:
                model_values.append(None)
                print("SWI-Prolog returned no output or an error.")
        else:
            model_values.append(None)
            print("No Prolog code extracted from the model.")

    # 5. Compare results and provide rewards
    rewards = []
    for mv, cv in zip(model_values, correct_values):
        if mv is None or cv is None:
            # Partial reward for at least attempting to generate some code
            rewards.append(0.5)
            print("Partial Reward: Model attempted code or code is None, no numeric match.")
            continue

        try:
            # If it's an unbound variable, e.g. "_12345", that's partial credit
            if mv.startswith("_"):
                rewards.append(0.5)
                print(f"Unbound variable in Prolog output: {mv}")
                continue

            mv_cleaned = mv.strip().split('\n')[-1]
            mv_float = float(mv_cleaned)
            cv_float = float(cv)
            print(f"Model Value: {mv_float}, Correct Value: {cv_float}")

            if abs(mv_float - cv_float) < 1e-6:
                # Full reward for correct numeric result
                rewards.append(2.0)
                print("Match: Model value matches correct value.")
            else:
                # Partial reward for producing a numeric result, but not correct
                rewards.append(1.0)
                print("Partial Reward: Model generated a numeric result, but it's incorrect.")
        except Exception as e:
            # Partial credit for at least generating code that runs
            rewards.append(0.5)
            print(f"Error converting model output to float: {e}\nModel: {mv}, Correct: {cv}")

    return rewards

def prolog_syntax_reward_func(completions, **kwargs) -> list[float]:
    """
    Partial reward for including Prolog-specific patterns:
      - ':-' (typical directives, e.g. :- use_module)
      - 'solve('
      - lines ending with '.'
      - 'use_module(library(clpq))'
    """
    pattern = r'(?::-|solve\s*\(|use_module|clpq|\.\s*$)'
    rewards = []
    for c in completions:
        text = c[0]["content"]
        hits = re.findall(pattern, text, re.MULTILINE)
        # Simple approach: #hits * 0.2, capped at 1.0
        score = min(len(hits) * 0.2, 1.0)
        rewards.append(score)
    return rewards

def strict_format_reward_func(completions, **kwargs) -> list[float]:
    """Reward function that checks if the completion has a specific format."""
    pattern = r"^<reasoning>\n.*?\n</reasoning>\n<answer>\n.*?\n</answer>\n$"
    responses = [completion[0]["content"] for completion in completions]
    matches = [re.match(pattern, r, flags=re.DOTALL) for r in responses]
    return [0.5 if match else 0.0 for match in matches]


def soft_format_reward_func(completions, **kwargs) -> list[float]:
    """Reward function that checks if the completion has a CoT-like XML format."""
    pattern = r"<reasoning>.*?</reasoning>\s*<answer>.*?</answer>"
    responses = [completion[0]["content"] for completion in completions]
    matches = [re.search(pattern, r, flags=re.DOTALL) for r in responses]
    return [0.5 if match else 0.0 for match in matches]

def count_xml(text: str) -> float:
    """
    A custom function that attempts to parse how well the output
    adheres to your <reasoning>...</reasoning> <answer>...</answer> blocks.
    """
    count = 0.0
    if text.count("<reasoning>\n") == 1:
        count += 0.125
    if text.count("\n</reasoning>\n") == 1:
        count += 0.125
    if text.count("\n<answer>\n") == 1:
        count += 0.125
        count -= len(text.split("\n</answer>\n")[-1]) * 0.001
    if text.count("\n</answer>") == 1:
        count += 0.125
        count -= (len(text.split("\n</answer>")[-1]) - 1) * 0.001
    return count

def xmlcount_reward_func(completions, **kwargs) -> list[float]:
    contents = [completion[0]["content"] for completion in completions]
    return [count_xml(c) for c in contents]

### GRPOConfig and GRPOTrainer

In [None]:
from trl import GRPOConfig, GRPOTrainer
training_args = GRPOConfig(
    seed=42,
    use_vllm = True, # use vLLM for fast inference!
    learning_rate = 5e-6,
    adam_beta1 = 0.9,
    adam_beta2 = 0.99,
    weight_decay = 0.1,
    warmup_ratio = 0.1,
    lr_scheduler_type = "cosine",
    optim = "adamw_8bit",
    logging_steps = 1,
    bf16 = is_bfloat16_supported(),
    fp16 = not is_bfloat16_supported(),
    per_device_train_batch_size = 8,
    gradient_accumulation_steps = 1, # Increase to 4 for smoother training
    num_generations = 8, # Decrease if out of memory
    max_prompt_length = 256,
    max_completion_length = 768,
    num_train_epochs = 1, # Set to 1 for a full training run
    #max_steps = 1000,
    save_steps = 250,
    max_grad_norm = 0.1,
    report_to = "wandb", # Can use Weights & Biases
    output_dir = "outputs",
)

In [None]:
trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=[
        xmlcount_reward_func,
        soft_format_reward_func,
        strict_format_reward_func,
        prolog_syntax_reward_func,
        correctness_reward_func,
    ],
    args=training_args,
    train_dataset=train_dataset,
)
trainer.train()

# Save the LoRA Adapter
model.save_lora("grpo_saved_lora")

# Merge to 16bit - Replace with your HF Repo, Model Name, and HF Token
if True: model.save_pretrained_merged("qwen2.5-3b-grpo-1.75k-gsm8k-sp-struct-rwd1", tokenizer, save_method = "merged_16bit",)
if True: model.push_to_hub_merged("niklasm222/qwen2.5-3b-grpo-1.75k-gsm8k-sp-struct-rwd1", tokenizer, save_method = "merged_16bit", token = "")