https://huggingface.co/learn/cookbook/fine_tuning_llm_grpo_trl

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from huggingface_hub import notebook_login
from math_verify import LatexExtractionConfig, parse, verify


# use notebook key. Paste with menu: Edit->paste in vscode.
notebook_login()


VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [None]:
from datasets import load_dataset, Dataset


# dataset_id = "AI-MO/NuminaMath-TIR"
# dataset_id = "openai/gsm8k"
# train_dataset, test_dataset = load_dataset(
#    dataset_id, split=["train[:5%]", "test[:5%]"]
# )
def extract_xml_answer(text: str) -> str:
    answer = text.split("<answer>")[-1]
    answer = answer.split("</answer>")[0]
    return answer.strip()


def extract_hash_answer(text: str) -> str | None:
    if "####" not in text:
        return None
    return text.split("####")[1].strip()


SYSTEM_PROMPT = (
    "A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant "
    "first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning "
    "process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., "
    "<think> reasoning process here </think><answer> answer here </answer>"
)


# uncomment middle messages for 1-shot prompting
def get_gsm8k_questions(split="train") -> Dataset:
    data = load_dataset("openai/gsm8k", "main")[split]  # type: ignore
    data = data.map(
        lambda x: {  # type: ignore
            "prompt": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": x["question"]},
            ],
            "answer": extract_hash_answer(x["answer"]),
            "solution": x["answer"],
        }
    )  # type: ignore
    return data  # type: ignore


train_dataset = get_gsm8k_questions(split="train")


Map:   0%|          | 0/7473 [00:00<?, ? examples/s]

In [14]:
train_dataset[0]

{'question': 'Natalia sold clips to 48 of her friends in April, and then she sold half as many clips in May. How many clips did Natalia sell altogether in April and May?',
 'answer': '72',
 'prompt': [{'content': 'A conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think><answer> answer here </answer>',
   'role': 'system'},
  {'content': 'Natalia sold clips to 48 of her friends in April, and then she sold half as many clips in May. How many clips did Natalia sell altogether in April and May?',
   'role': 'user'}],
 'solution': 'Natalia sold 48/2 = <<48/2=24>>24 clips in May.\nNatalia sold 48+24 = <<48+24=72>>72 clips altogether in April and May.\n#### 72'}

In [15]:
import torch
from transformers import AutoModelForCausalLM

from transformers import AutoTokenizer

model_id = "Qwen/Qwen2-0.5B-Instruct"
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype="auto",
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained(model_id)

In [16]:
from peft import LoraConfig, get_peft_model

lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=8,
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules=["q_proj", "v_proj"],
)

model = get_peft_model(model, lora_config)

model.print_trainable_parameters()

trainable params: 540,672 || all params: 494,573,440 || trainable%: 0.1093


In [22]:
# exemple of maths verify

# \\cup : union of two sets
gold = parse("${1,3} \\cup {2,4}$")
answer = parse("${1,2,3,4}$")

# Order here is important!
verify(gold, answer), gold, answer

(True,
 [Union({1, 3}, {2, 4}), '{1,3} \\cup {2,4}'],
 [{1, 2, 3, 4}, '{1,2,3,4}'])

In [17]:
import re


def format_reward(completions, **kwargs):
    """Reward function that checks if the completion has a specific format."""
    pattern = r"^<think>.*?</think>\s*<answer>.*?</answer>$"
    completion_contents = [completion[0]["content"] for completion in completions]
    matches = [re.match(pattern, content) for content in completion_contents]
    rewards_list = [1.0 if match else 0.0 for match in matches]
    return rewards_list


In [None]:
prompts = [
    [{"role": "user", "content": "What is the result of (1 + 2) * 4?"}],
    [{"role": "user", "content": "What is the result of (3 + 1) * 2?"}],
]
completions = [
    [
        {
            "role": "assistant",
            "content": "<think>The sum of 1 and 2 is 3, which we multiply by 4 to get 12.</think><answer>(1 + 2) * 4 = 12</answer>",
        }
    ],
    [
        {
            "role": "assistant",
            "content": "The sum of 3 and 1 is 4, which we multiply by 2 to get 8. So (3 + 1) * 2 = 8.",
        }
    ],
]
format_reward(prompts=prompts, completions=completions)

[1.0, 0.0]

In [None]:
def correctness_reward_func(prompts, completions, answer, **kwargs) -> list[float]:
    responses = [completion[0]["content"] for completion in completions]
    q = prompts[0][-1]["content"]
    extracted_responses = [extract_xml_answer(r) for r in responses]
    print(
        "-" * 20,
        f"Question:\n{q}",
        f"\nAnswer:\n{answer[0]}",
        f"\nResponse:\n{responses[0]}",
        f"\nExtracted:\n{extracted_responses[0]}",
    )
    return [2.0 if r == a else 0.0 for r, a in zip(extracted_responses, answer)]


In [19]:
def accuracy_reward(completions, **kwargs):
    """Reward function that checks if the completion is the same as the ground truth."""
    solutions = kwargs["solution"]
    completion_contents = [completion[0]["content"] for completion in completions]
    rewards = []
    for content, solution in zip(completion_contents, solutions):
        gold_parsed = parse(
            solution,
            extraction_mode="first_match",
            extraction_config=[LatexExtractionConfig()],
        )
        answer_parsed = parse(
            content,
            extraction_mode="first_match",
            extraction_config=[LatexExtractionConfig()],
        )
        if len(gold_parsed) != 0:
            try:
                rewards.append(float(verify(answer_parsed, gold_parsed)))
            except Exception:
                rewards.append(0.0)
        else:
            rewards.append(1.0)
    return rewards


completion_test = [
    [
        {
            "content": """<think>Natalia sold 48/2 = <<48/2=24>>24 clips in May.\nNatalia sold 48+24 = <<48+24=72>>72 clips altogether in April and May.</think><answer>72</answer>""",
            "role": "assistant",
        }
    ]
]
assert accuracy_reward(
    completions=completion_test,
    solution=["72"],
) == [1.0]
assert accuracy_reward(
    completions=completion_test,
    solution=["75"],
) == [0.0]

AssertionError: 

In [None]:
ex_idx = 100
print(train_dataset["prompt"][ex_idx][1]["content"])

Given that the function $f(x)=\sin (π-ωx)\cos ωx+\cos ^{2}ωx\ (ω > 0)$ has a minimum positive period of $π$.
(I) Find the value of $ω$;
(II) The horizontal coordinates of each point on the graph of the function $y=f(x)$ are shortened to half of the original, and the vertical coordinates remain unchanged, resulting in the graph of the function $y=g(x)$. Find the minimum value of the function $y=g(x)$ on the interval $[0, \frac{π}{16}]$.


In [None]:
print(train_dataset["solution"][ex_idx])

To solve this problem, let's break it down step by step and use Python and sympy as needed.

### Part (I): Find the value of \( \omega \)
To find \( \omega \) such that the function \( f(x) = \sin(\pi - \omega x) \cos(\omega x) + \cos^2(\omega x) \) has a minimum positive period of \( \pi \), we need to determine when the function repeats itself after \( \pi \) units.

First, let's simplify the function \( f(x) \):
\[
f(x) = \sin(\pi - \omega x) \cos(\omega x) + \cos^2(\omega x)
\]
Using the trigonometric identities, \( \sin(\pi - \theta) = \sin(\theta) \) and \( \cos^2(\theta) = 1 - \sin^2(\theta) \):
\[
f(x) = \sin(\omega x) \cos(\omega x) + \cos^2(\omega x)
\]
\[
f(x) = \frac{1}{2}\sin(2\omega x) + \cos^2(\omega x)
\]

Given that the period of \(f(x)\) is \( \pi \), we know that:
\[
f(x + \pi) = f(x)
\]

For \(f(x)\) to have a period of \(\pi\), both \(\sin(2\omega x)\) and \(\cos^2(\omega x)\) must repeat after \(\pi\). The period of \(\sin(2\omega x)\) is \(\frac{\pi}{2\omega}\). 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from datasets import load_dataset
import numpy as np

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")


class GRPOTrainer:
    def __init__(self, model, ref_model, reward_model, tokenizer, device, lr=1e-5):
        self.model = model
        self.ref_model = ref_model
        self.reward_model = reward_model
        self.tokenizer = tokenizer
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.device = device

    def sample_outputs(self, model, prompts, num_samples=3):
        formatted_prompt = self.tokenizer.apply_chat_template(prompts, tokenize=False)
        inputs = self.tokenizer(
            formatted_prompt, return_tensors="pt", padding=True, truncation=True
        ).to(self.device)
        outputs = [model.generate(**inputs) for _ in range(num_samples)]
        completions = [
            [
                {"content": self.tokenizer.batch_decode(out, skip_special_tokens=True)}
                for out in outputs
            ]
        ]
        return completions

    def compute_rewards(self, completions, solutions):
        rewards = [self.reward_model(c, solutions) for c in completions]
        return np.array(rewards)

    def compute_advantage(self, rewards):
        baseline = np.mean(rewards)
        return rewards - baseline

    def train_step(self, questions, solutions):
        completions = self.sample_outputs(self.model, questions)
        print(completions)
        rewards = self.compute_rewards(completions, solutions)
        advantages = self.compute_advantage(rewards)

        loss = -torch.mean(
            torch.tensor(advantages, dtype=torch.float32, requires_grad=True)
        )

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()


def combined_rewards(completions, solutions):
    format_rewards = format_reward(completions)
    accuracy_rewards = accuracy_reward(completions, solutions)
    combined_rewards = 0.5 * format_rewards + 0.5 * accuracy_rewards
    return combined_rewards


trainer = GRPOTrainer(
    model,
    ref_model=model,
    reward_model=combined_rewards,
    tokenizer=tokenizer,
    device=device,
)


# formatted_text = tokenizer.apply_chat_template(train_dataset[0]["prompt"], tokenize=False)
loss = trainer.train_step(train_dataset[0]["prompt"], [train_dataset[0]["solution"]])


[[{'content': ['system\nA conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think><answer> answer here </answer>\nuser\nWhat is the coefficient of $x^2y^6$ in the expansion of $\\left(\\frac{3}{5}x-\\frac{y}{2}\\right)^8$?  Express your answer as a common fraction.\nuser']}, {'content': ['system\nA conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think><answer> answer here </ans

KeyError: 0

In [None]:
train_dataset[0].get("prompt")

In [None]:
completions = [
    "system\nA conversation between User and Assistant. The user asks a question, and the Assistant solves it. The assistant first thinks about the reasoning process in the mind and then provides the user with the answer. The reasoning process and answer are enclosed within <think> </think> and <answer> </answer> tags, respectively, i.e., <think> reasoning process here </think><answer> answer here </answer>\nuser\nWhat is the coefficient of $x^2y^6$ in the expansion of $\\left(\\frac{3}{5}x-\\frac{y}{2}\\right)^8$?  Express your answer as a common fraction.\narterms = 1 + 6 - 8\ncoefficient = terms * (terms"
]

In [None]:
formatted_text = tokenizer.apply_chat_template(
    train_dataset[0]["prompt"], tokenize=False
)
formatted_text


In [None]:
tokenizer(formatted_text)
train_dataset[0]["solution"]

In [None]:
# Training loop
for epoch in range(3):
    for sample in train_dataset:
        formatted_text = tokenizer.apply_chat_template(sample["prompt"], tokenize=False)
        loss = trainer.train_step([formatted_text], [sample["solution"]])
        print(f"Epoch {epoch}, Loss: {loss}")


In [None]:
sample

In [None]:
from trl import GRPOConfig

# Configure training arguments using GRPOConfig
training_args = GRPOConfig(
    output_dir="Qwen2-0.5B-GRPO-test",
    learning_rate=1e-5,
    remove_unused_columns=False,  # to access the solution column in accuracy_reward
    gradient_accumulation_steps=16,
    num_train_epochs=1,
    bf16=True,
    # Parameters that control de data preprocessing
    max_completion_length=64,  # default: 256
    num_generations=4,  # default: 8
    max_prompt_length=128,  # default: 512
    # Parameters related to reporting and saving
    report_to=["tensorboard"],
    logging_steps=10,
    push_to_hub=True,
    save_strategy="steps",
    save_steps=10,
)

In [None]:
from trl import GRPOTrainer

trainer = GRPOTrainer(
    model=model,
    reward_funcs=[format_reward, accuracy_reward],
    args=training_args,
    train_dataset=train_dataset,
)
do_train = False
if do_train:
    # Train the model
    trainer.train()
    trainer.save_model(training_args.output_dir)


In [None]:
model_id = "sergiopaniego/Qwen2-0.5B-GRPO"
trained_model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype="auto",
    device_map="auto",
)
trained_tokenizer = AutoTokenizer.from_pretrained(model_id)

In [None]:
test_dataset["prompt"][0]

In [None]:
import time


def generate_with_reasoning(prompt, r_model, r_tokenizer):
    # Build the prompt from the dataset
    prompt = " ".join(entry["content"] for entry in prompt)

    # Tokenize and move to the same device as the model
    inputs = r_tokenizer(prompt, return_tensors="pt").to(trained_model.device)

    # Generate text without gradients
    start_time = time.time()
    with torch.no_grad():
        output_ids = r_model.generate(**inputs, max_length=500)
    end_time = time.time()

    # Decode and extract model response
    generated_text = r_tokenizer.decode(output_ids[0], skip_special_tokens=True)

    # Get inference time
    inference_duration = end_time - start_time

    # Get number of generated tokens
    num_input_tokens = inputs["input_ids"].shape[1]
    num_generated_tokens = output_ids.shape[1] - num_input_tokens
    response_text = generated_text[len(prompt) :].strip()

    return response_text, generated_text, inference_duration, num_generated_tokens

In [None]:
prompt = test_dataset["prompt"][0]
response_text, generated_text, inference_duration, num_generated_tokens = (
    generate_with_reasoning(prompt, trained_model, trained_tokenizer)
)
print(response_text)


In [None]:
# Use raw model without reasoning training
response_text, generated_text, inference_duration, num_generated_tokens = (
    generate_with_reasoning(prompt, model, trained_tokenizer)
)

print(response_text)


In [None]:
print("-----")
# print("answer:", test_dataset[0]["answer"])
print(test_dataset[0]["solution"])
