In [None]:
# Run the vLLM model in the background via "vllm serve Qwen/..."
# `OpenAI` is imported here because this cell runs before the notebook's
# import cell; without it a fresh kernel fails on this cell with a NameError.
from openai import OpenAI

# Name of the served model; it is sent with every request made below.
model_name = "Qwen/Qwen2.5-Math-7B"

# OpenAI-compatible client pointed at the local server.
client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="EMPTY",  # placeholder value; presumably the local server ignores it - TODO confirm
)

In [None]:
from openai import OpenAI
from datasets import load_dataset
import numpy as np

# Fetch the training split of the "main" configuration of GSM8k.
dataset = load_dataset(
    path="openai/gsm8k",
    name="main",
    split="train",
)

In [None]:
# Preview only the first few examples: printing every task of the split
# floods the notebook output and bloats the saved file.
N_PREVIEW_TASKS = 5

for idx in range(min(N_PREVIEW_TASKS, len(dataset))):
    task, solution = dataset[idx]["question"], dataset[idx]["answer"]
    print(f"Task {idx + 1}:\n{task}\n\nSolution:\n{solution}\n")

In [None]:

# Show the question and reference answer of one example, separated by a blank line.
idx = 0
task, solution = (dataset[idx][field] for field in ("question", "answer"))
print(f"{task}\n\n{solution}")

In [None]:
# Define the prompts

# Text-to-code prompt;
# One worked example (task, natural-language solution, Python program) that is
# substituted for the {examples} placeholder of text2code_prompt.
# The surrounding newlines of the literal are removed by .strip().
text2code_examples = """
Task: Nancy picked up 10 apples from the basket. She gave 2 apples to her friend. How many apples does Nancy have left?
Solution: Initially, Nancy had 10 apples. She gave 2 apples to her friend. So, she has 10 - 2 = 8 apples left.
Code: 
```python
def main():
    nancy_apples = 10 # Nancy initially had 10 apples
    nancy_apples -= 2 # She gave 2 apples to her friend
    print(nancy_apples)

main()
```
""".strip()

# Template asking the model to turn a task/solution pair into Python code.
# Placeholders: {examples}, {task}, {solution}; they are filled with
# str.format in run_sample_augmentation.
text2code_prompt = """
You will be provided a problem statement and a solution. Please, generate the code in Python that corresponds to the solution.

Examples:
{examples}

Now, please, generate the code for the following task:

Task: {task}
Solution: {solution}
""".strip()

# Augmentation prompt;
# One worked augmentation example: original task/solution/code, the chosen
# change, and the resulting new code, task and solution wrapped in the
# <code>, <task> and <solution> tags that the response parser looks for.
# It is substituted for the {examples} placeholder of augmentation_prompt.
# The added variable is named `bob_oranges` so that it matches the "Bob"
# used in its comment and in the new task text.
augmentation_examples = """
Task: Nancy picked up 10 apples from the basket. She gave 2 apples to her friend. How many apples does Nancy have left?

Solution: Initially, Nancy had 10 apples. 
She gave 2 apples to her friend. 
So, she has 10 - 2 = 8 apples left.
#### 8

Code: 
```python
def main():
    nancy_apples = 10 # Nancy initially had 10 apples
    nancy_apples -= 2 # She gave 2 apples to her friend
    print(nancy_apples)

main()
```

Let's add a new variable for number of oranges that Bob had.

New code: 
<code>
```python
def main():
    nancy_apples = 10 # Nancy initially had 10 apples
    bob_oranges = 3 # Bob had 3 oranges
    nancy_apples -= 2 # She gave 2 apples to her friend
    print(nancy_apples)

main()
```
</code>

New task:
<task>
Nancy picked up 10 apples from the basket. Nancy gave 2 apples to her friend. At the same time, Bob had 3 oranges. How many apples does Nancy have left?
</task>

New solution:
<solution>
Initially, Nancy had 10 apples, and Bob had 3 oranges.
Nancy gave 2 apples to her friend. 
So, she has 10 - 2 = 8 apples left. 
#### 8
</solution>
""".strip()

# Template for the first user message of the augmentation conversation.
# Placeholders: {examples}, {task}, {solution}, {code}; they are filled with
# str.format in run_sample_augmentation. The template asks for the answer to
# be wrapped in <code>, <task> and <solution> tags, which is the format that
# extract_new_sample parses.
augmentation_prompt = """
You will be provided a problem statement, a solution, and code corresponding to the solution. Your task is to generate a new solution to the problem statement that is different from the provided solution, maximizing the novelty score that will be computed and sent back to you.

Please, first change the code, and then adapt the natural language solution and the task to the new code. Both the task, solution, and code should be consistent and sensible.
You are allowed to do the following changes:
 - Add a new variable;
 - Change the values of the variables;
 - Change the names of the variables;
 - Change the operation;
 - Change the logic steps;
 - Add a new step;
 - Remove a step;

Format your response with <code>...</code>, <task>...</task>, and <solution>...</solution> tags.

Examples:
{examples}

Now, please, create a new code, task and solution for the following:

Task: 
<task>
{task}
</task>
Solution: 
<solution>
{solution}
</solution>
Code: 
<code>
```python
{code}
```
</code>
""".strip()

# Settings mapping passed to run_sample_augmentation: both prompt templates,
# their example blocks, and the number of augmentation rounds.
config = dict(
    text2code_examples=text2code_examples,
    text2code_prompt=text2code_prompt,
    augmentation_examples=augmentation_examples,
    augmentation_prompt=augmentation_prompt,
    n_iterations=10,
)



In [None]:
import ast
import re
import subprocess
import sys
import tempfile

import numpy as np
import requests
from loguru import logger

def get_novelty_score(task: str, solution: str, timeout: float = 120.0) -> float:
    """
    Calculate the novelty score using the token log probabilities from the qwen-math model.

    The task and solution are inserted into a fixed prompt, the prompt is sent
    to the completions endpoint of the local server for the module-level
    ``model_name``, and the token log-probabilities reported for the first
    choice are summed.

    NOTE(review): the request asks for up to 100 new tokens and does not
    request prompt log-probabilities, so the summed values presumably belong
    to the generated continuation rather than to the tokens of ``solution``.
    Confirm that this is the intended score.

    Args:
        task: Problem statement, substituted for ``{question}``.
        solution: Solution text, substituted for ``{answer}``.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The sum of the returned token log-probabilities, or 0.0 when the
        response does not have the expected structure.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    # Prompt the model with the task + solution pair.
    prompt_template = """
    You are a math reasoning assistant. Your job is to solve the following problem:
    Question: {question}
    Answer: {answer}
    """
    # The template must be filled in; otherwise every call sends the same
    # text with literal "{question}"/"{answer}" and ignores its arguments.
    prompt = prompt_template.format(question=task, answer=solution)

    # Query the qwen-math model
    response = requests.post(
        "http://localhost:8000/v1/completions",
        json={
            "model": model_name,
            "prompt": prompt,
            "max_tokens": 100,
            "temperature": 0.0,
            "logprobs": 1  # Request token-level log probabilities
        },
        timeout=timeout,  # do not hang the notebook on an unresponsive server
    )
    response_data = response.json()

    try:
        token_logprobs = response_data["choices"][0]["logprobs"]["token_logprobs"]
        # Entries without a log-probability (None) are skipped instead of
        # discarding the whole score.
        novelty_score = float(sum(lp for lp in token_logprobs if lp is not None))
    except (KeyError, IndexError, TypeError):
        novelty_score = 0.0  # Default to 0 if parsing fails

    return novelty_score

def extract_final_answer(solution: str) -> int:
    """
    Extract the final answer from the solution.

    The answer is the text that follows the first "#### " marker, up to the
    end of that line. Thousands separators (",") are removed before the
    conversion, so "#### 1,250" yields 1250.

    Args:
        solution: Solution text containing a "#### <answer>" line.

    Returns:
        The final answer as an integer.

    Raises:
        ValueError: If there is no "#### " marker or the text after it is
            not an integer.
    """
    match = re.search(r"#### (.*)", solution)
    if match is None:
        raise ValueError("No '#### <answer>' marker found in the solution.")
    raw_answer = match.group(1)
    try:
        return int(raw_answer.replace(",", ""))
    except ValueError:
        raise ValueError(f"Final answer is not an integer: {raw_answer!r}") from None

def extract_python_code(text: str) -> str:
    """
    Return the body of the last ```python fenced block found in the text.

    A fence that is never closed is accepted; its body then runs to the end
    of the text. An empty string is returned when no fence is present.
    """
    fenced_bodies = re.findall(r'```python\s*(.*?)(?:\s*```|$)', text, flags=re.DOTALL)
    return fenced_bodies[-1].strip() if fenced_bodies else ""

def _extract_last_tag(text: str, tag: str) -> str:
    """Return the stripped content of the last <tag>...</tag> pair in the text, or raise ValueError."""
    # Non-greedy matching keeps each pair separate; a greedy pattern would
    # span from the first opening tag to the last closing tag when the
    # response contains the same tag more than once.
    contents = re.findall(f"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
    if not contents:
        raise ValueError(f"No {tag} found or invalid format.")
    return contents[-1].strip()


def extract_new_sample(text: str) -> tuple[str, str, str, int]:
    """
    Extract the new task, solution and code from the text.

    The text is expected to contain <code>...</code>, <task>...</task> and
    <solution>...</solution> sections. When a tag occurs several times the
    last occurrence is used. The code must be inside a ```python fence, and
    the solution must contain a "#### <answer>" line.

    Args:
        text: Raw model response.

    Returns:
        A tuple (task, solution, code, final_answer).

    Raises:
        ValueError: If a section is missing, the code block is empty, or the
            final answer cannot be extracted.
    """
    code = extract_python_code(_extract_last_tag(text, "code"))
    if not code:
        # A <code> section without a ```python fence yields no code at all.
        raise ValueError("No code found or invalid format.")
    task = _extract_last_tag(text, "task")
    solution = _extract_last_tag(text, "solution")
    try:
        final_answer = extract_final_answer(solution)
    except Exception:
        raise ValueError("No final answer found or invalid format.") from None
    return task, solution, code, final_answer

import json

def validate_code_against_solution(code: str, solution: str, timeout: float = 30.0) -> bool:
    """
    Validate the code solution against the natural language one by comparing the code output with the solution answer.

    The code is written to a temporary file and run with the interpreter of
    the current kernel in isolated mode. Its standard output is parsed as a
    Python literal and compared with the final answer of the solution.

    NOTE(review): the code comes from a language model and is executed
    as-is. Isolated mode only ignores environment variables and user
    site-packages; it is not a sandbox.

    Args:
        code: Python source to execute, passed through unchanged.
        solution: Solution text containing the "#### <answer>" line.
        timeout: Seconds after which the execution is aborted.

    Returns:
        True if the printed value equals the final answer, False otherwise.

    Raises:
        RuntimeError: If the code exits with a non-zero status or times out.
        ValueError: If the output is not a valid Python literal.
    """
    # Run the code in a temporary file. The file is removed when the `with`
    # block exits, so the subprocess has to run inside it.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py') as tmp_file:
        tmp_file.write(code)
        tmp_file.flush()
        try:
            result = subprocess.run(
                [
                    sys.executable,  # same interpreter as the kernel, not whatever "python" is on PATH
                    '-E',  # Ignore environment variables
                    '-I',  # Run in isolated mode
                    tmp_file.name
                ],
                capture_output=True,
                text=True,
                timeout=timeout,  # generated code may loop forever
            )
        except subprocess.TimeoutExpired:
            raise RuntimeError(f"Code execution timed out after {timeout} seconds.") from None
    if result.returncode != 0:
        raise RuntimeError(f"Code execution failed: {result.stderr}")

    # Ensure stdout contains valid Python literal
    try:
        code_output = ast.literal_eval(result.stdout.strip())
    except (SyntaxError, ValueError):
        raise ValueError(f"Invalid output from code execution: {result.stdout.strip()}")

    solution_answer = extract_final_answer(solution)
    return code_output == solution_answer

def parse_response(response: str) -> tuple[str, str, str, int]:
    """
    Parse the augmented sample from the model.

    The response is split into task, solution, code and final answer, and
    the code is then executed to check that it reproduces the final answer.
    Every failure is reported as a ValueError whose message is meant to be
    sent back to the model as feedback.

    Args:
        response: Raw model response.

    Returns:
        A tuple (task, solution, code, final_answer).

    Raises:
        ValueError: If the response is malformed, the code cannot be run or
            its output cannot be read, or the output differs from the
            final answer.
    """
    try:
        new_task, new_solution, new_code, new_final_answer = extract_new_sample(response)
    except ValueError as e:
        logger.warning("Invalid response.")
        feedback = "Please, fix the formatting of your response. " + str(e)
        raise ValueError(feedback) from e
    try:
        code_matches = validate_code_against_solution(new_code, new_solution)
    except (RuntimeError, ValueError) as e:
        # Execution errors are feedback for the model too; converting them
        # keeps callers that catch ValueError from being aborted by them.
        logger.warning("Code validation failed.")
        feedback = "The new code could not be validated. " + str(e)
        raise ValueError(feedback) from e
    if not code_matches:
        logger.warning("Code validation failed.")
        feedback = "The new code does not match the new solution."
        raise ValueError(feedback)
    return new_task, new_solution, new_code, new_final_answer
    

def run_sample_augmentation(task: str, solution: str, config: dict = {}) -> list[tuple[str, str, int, str]]:
    """
    Run the sample augmentation loop.

    First the model is asked to translate the solution into Python code,
    which must reproduce the final answer. Then, for ``n_iterations`` rounds,
    the model is asked for a new (code, task, solution) triple; each response
    is parsed and validated, and feedback (the novelty score or the error
    message) is appended to the conversation before the next round.

    Args:
        task: Original problem statement.
        solution: Original solution containing a "#### <answer>" line.
        config: Settings read with ``.get``: "text2code_prompt",
            "text2code_examples", "augmentation_prompt",
            "augmentation_examples" and "n_iterations" (default 10).
            It is only read, never modified.

    Returns the list of tuples with the new task, solution, final answer and code for each iteration.
    The first tuple is the original sample with its generated code; rounds
    whose response was rejected add no tuple.

    Raises:
        AssertionError: If the initial code does not reproduce the answer.
    """
    # First, generate the initial code;
    logger.info("Generating the initial code...")
    text2code_request = config.get("text2code_prompt", "").format(
        task=task, solution=solution, examples=config.get("text2code_examples", "")
    )
    response = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "user", "content": text2code_request}
        ]
    )
    initial_response_text = response.choices[0].message.content
    logger.info(f"Initial response:\n{initial_response_text}")
    logger.info("Extracting the initial code...")
    code = extract_python_code(initial_response_text)
    # Raised explicitly so that the check also runs when assertions are disabled.
    if not validate_code_against_solution(code, solution):
        raise AssertionError("The initial code does not match the solution.")
    final_answer = extract_final_answer(solution)
    logger.info(f"Initial code:\n{code}")
    logger.info(f"Initial final answer: {final_answer}")
    samples = [(task, solution, final_answer, code)]
    augmentation_request = config.get("augmentation_prompt", "").format(
        task=task, solution=solution, code=code, examples=config.get("augmentation_examples", "")
    )
    messages = [
        {"role": "user", "content": augmentation_request}
    ]
    # Iteratively in the loop, augment the sample;
    logger.info("Augmenting the sample...")
    for i in range(config.get("n_iterations", 10)):
        logger_prefix = f"Iteration {i+1} | "
        response = client.chat.completions.create(
            model=model_name,
            messages=messages
        )
        response_text = response.choices[0].message.content
        logger.info(logger_prefix + f"Response:\n{response_text}")
        messages.append({"role": "assistant", "content": response_text})
        try:
            new_task, new_solution, new_code, new_final_answer = parse_response(response_text)
            novelty_score = get_novelty_score(new_task, new_solution)
            # NOTE(review): the score is a sum of log-probabilities, so the
            # "/ 10" scale in this message looks inaccurate - confirm.
            feedback = f"The novelty score is {novelty_score} / 10. Try again to increase it!"
            # Same field order as the initial sample and the docstring:
            # (task, solution, final answer, code).
            samples.append((new_task, new_solution, new_final_answer, new_code))
        except ValueError as e:
            feedback = str(e)
        logger.info(logger_prefix + f"Feedback:\n{feedback}")
        messages.append({"role": "user", "content": feedback})
    return samples


In [None]:
import pandas as pd

# Initialize an empty list to store results
results = []

# Loop through the dataset
for idx in range(len(dataset)):
    task, solution = dataset[idx]["question"], dataset[idx]["answer"]
    print(f"Task {idx + 1}:\n{task}\n\nSolution:\n{solution}\n")

    # Run the sample augmentation. A failure for one task (for example the
    # initial code not reproducing the reference answer) is reported and the
    # task is skipped, so the results collected so far are not lost.
    try:
        samples = run_sample_augmentation(task, solution, config)
    except Exception as e:
        print(f"Augmentation failed for Task {idx + 1}: {e}")
        continue

    # Append each sample to the results list
    for sample in samples:
        # sample is indexed as (task, solution, final answer, code)
        is_correct = False
        try:
            is_correct = validate_code_against_solution(sample[3], sample[1])
        except Exception as e:
            print(f"Validation failed for Task {idx + 1}: {e}")

        results.append({
            "task": sample[0],
            "solution": sample[1],
            "final_answer": sample[2],
            "code": sample[3],
            "is_correct": is_correct
        })

# Convert the results to a DataFrame
results_df = pd.DataFrame(results)

# Display the DataFrame
print(results_df)

In [None]:
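# TODO: score a solution with tokenizer.apply_chat_template:

    # role "user": content = task
    # role "assistant": content = answer
    # return_dict=True
    # return_assistant_tokens_mask=True
    # Calculate ... (this note was left unfinished; presumably the score over the assistant tokens)

# Go from the logits to the log-probabilities.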


In [None]:
# The augmenter should be the Qwen Coder model served with vLLM ("vllm serve").
# For the solver we can use the Qwen Math model.