In [1]:
from openai import OpenAI
from dateutil.relativedelta import relativedelta
import os
import json
from pydantic import BaseModel,Field
from langchain.llms import OpenAI
from dotenv import load_dotenv
from langchain.chat_models import ChatOpenAI, init_chat_model
from langchain_core.messages import HumanMessage, SystemMessage
from tqdm import tqdm
import re
import textwrap
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
file_path = '../data/GSM8K/test.jsonl'

with open(file_path, "r", encoding="utf-8") as f:
    dataset = [json.loads(line) for line in f]

In [3]:
load_dotenv()

True

In [32]:

class Step(BaseModel):
    explanation: str
    output: str
class IntermediateProgram(BaseModel):
    program: str
    result: str  
class FinalAnswer(BaseModel):
    steps: list[Step]
    final_answer: str

load_dotenv()
model = init_chat_model("gpt-4o-mini", model_provider="openai", temperature=0.2)

def PoT(question: str) -> IntermediateProgram:
    pot_prompt = SystemMessage(content="""
You are a math expert. Read the question and write code to solve it.
""")
    pot_messages = [pot_prompt, HumanMessage(content=question)]
    model_pot = model.with_structured_output(IntermediateProgram)
    return model_pot.invoke(pot_messages)

# === Function: Prompt ===
def Prompt(question: str, intermediate_result: str) -> FinalAnswer:
    final_prompt = SystemMessage(content="""
Based on the question and result, return only the numeric final answer.
You must explain the solution step-by-step using 'steps'.
Final numeric result in 'final_answer'.
The result should be a number only, with no units.
If the final result is a decimal ending in .0, convert it to an integer using int() or round().
""")
    context = question + f"\nAccording to the program: ans = {intermediate_result}"
    prompt_messages = [final_prompt, HumanMessage(content=context)]
    model_final = model.with_structured_output(FinalAnswer)
    return model_final.invoke(prompt_messages)

question = "Carlos is planting a lemon tree. The tree will cost $90 to plant. Each year it will grow 7 lemons, which he can sell for $1.5 each. It costs $3 a year to water and feed the tree. How many years will it take before he starts earning money on the lemon tree?"
intermediate = PoT(question)
intermediate_result = intermediate.result
final = Prompt(question, intermediate_result)
print(intermediate.result)
print(final.final_answer)


13
13


In [21]:
def extract_ground_truth(answer: str) -> str:
    match = re.search(r"####\s*(\d+)", answer)
    return match.group(1).strip() if match else ""

def compare_answers(predicted: str, actual: str) -> bool:
    return predicted.strip() == actual.strip()

In [34]:
import ast
def process_item(item):
    question = item["question"]
    true_answer = extract_ground_truth(item["answer"])
    try:
        intermediate = PoT(question)
        intermediate_result = intermediate.result
        final = Prompt(question, intermediate_result)
        return {
            "question": question,
            "true_answer": true_answer,
            "predicted_answer": final.final_answer,
            "steps": [step.model_dump() for step in final.steps],
            "correct": compare_answers(final.final_answer, true_answer)
        }
    except Exception as e:
        return {"error": str(e), "question": question}

results = []
correct = 0
total = len(dataset[:300])
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(process_item, item) for item in dataset[:300]]
    for future in tqdm(as_completed(futures), total=total):
        result = future.result()
        if "error" not in result:
            results.append(result)
            if result["correct"]:
                correct += 1
        else:
            print(f"Error on question: {result['question'][:60]}... => {result['error']}")
accuracy = correct / total * 100
print(f"Accuracy: {accuracy:.2f}% ({correct}/{total})")


100%|██████████| 300/300 [07:51<00:00,  1.57s/it]

Accuracy: 92.00% (276/300)





In [35]:
# Lưu ra file JSONL
output_path = "PoT_results.jsonl"
with open(output_path, "w", encoding="utf-8") as f:
    for item in results:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"Đã lưu kết quả vào {output_path}")

Đã lưu kết quả vào PoT_results.jsonl
