In [3]:
# imports
from typing import Optional 
import json
import os
import openai
from getpass import getpass
import cmbagent
import re # for regex
import ast  # safe way to parse Python literals like lists, dicts
import pandas as pd
from tabulate import tabulate #for printing pretty tables in the terminal.

In [4]:
# Ask for the OpenAI API key interactively (input is not echoed), store it in
# the process environment, and hand the stored value to the openai client.

os.environ['OPENAI_API_KEY'] = getpass(prompt='Enter your OpenAI API key: ')
openai.api_key = os.getenv('OPENAI_API_KEY')

Enter your OpenAI API key:  ········


In [5]:
# essential functions

In [6]:
# Load all problems from a JSON file into a python dict

def load_problems(json_path: str) -> dict:
    """Load all benchmark problems from a JSON file.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        The parsed JSON document. The benchmark code in this notebook iterates
        over it with ``.items()``, so it is expected to be a JSON object.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 so that non-ASCII characters in problem
    # statements are read the same way on every platform, independent of the
    # locale's default encoding.
    with open(json_path, 'r', encoding='utf-8') as f:
        return json.load(f)

In [7]:
# load_test_cases_for_one_problem as a list of tuples of lists

def load_test_cases_for_one_problem(problem_id: str, tests_dir: str) -> list | None:
    test_cases = []
    i = 1
    while True:
        input_file = os.path.join(tests_dir, problem_id, f"I.{i}")
        output_file = os.path.join(tests_dir, problem_id, f"O.{i}")

        if not os.path.exists(input_file) or not os.path.exists(output_file):
            break

        try:
            with open(input_file, "r") as f_in:
                input_list = list(map(int, f_in.read().strip().split()))

            with open(output_file, "r") as f_out:
                output_list = list(map(int, f_out.read().strip().split()))

            test_cases.append((input_list, output_list))

        except ValueError as e:
            print(f"⚠️ Skipping problem {problem_id} due to non-numeric data in test case {i}: {e}")
            return None

        i += 1

    return test_cases if test_cases else None


In [8]:
def print_cmbagent_benchmark_summary(results_summary: dict) -> None:
    """Print per-problem results, the mean accuracy and, if available, total costs.

    Args:
        results_summary: Mapping ``problem_id -> stats`` where ``stats`` has the
            keys ``'total'``, ``'correct'`` and ``'accuracy'`` (a percentage).
            An optional ``'cost_dataframe'`` entry (a pandas DataFrame with an
            ``'Agent'`` column) is summed per agent across all problems; rows
            whose ``'Agent'`` is ``'Total'`` are ignored to avoid double counting.

    Returns:
        None. Everything is written to stdout.
    """
    # ``pd`` and ``tabulate`` come from the notebook's import cell; they are not
    # re-imported here so the accuracy-only path does not import tabulate again.
    print("\n============ BENCHMARK SUMMARY FOR CMBAGENT FOR ALL PROBLEMS ==============")

    total_accuracy = 0
    total_problems = len(results_summary)
    all_costs = []

    for problem_id, stats in results_summary.items():
        print(f"{problem_id}:\n"
              f"  Total test cases: {stats['total']}\n"
              f"  Correctly solved: {stats['correct']}\n"
              f"  Accuracy: {stats['accuracy']:.2f}%\n")
        total_accuracy += stats['accuracy']

        cost_df = stats.get("cost_dataframe")
        if isinstance(cost_df, pd.DataFrame) and not cost_df.empty:
            all_costs.append(cost_df[cost_df["Agent"] != "Total"])  # only agent-level rows

    # Unweighted mean over problems; an empty summary reports 0 instead of dividing by zero.
    average_accuracy = (total_accuracy / total_problems) if total_problems > 0 else 0

    print("============ CONCLUSION ===============")
    print(f"Average accuracy over all problems: {average_accuracy:.2f}%")
    print("=======================================\n")

    # Final benchmark-wide cost aggregation
    if all_costs:
        benchmark_cost_df = pd.concat(all_costs, ignore_index=True)
        benchmark_cost_df = benchmark_cost_df.groupby("Agent", as_index=False).sum(numeric_only=True)
        total_row = benchmark_cost_df.drop(columns=["Agent"]).sum(numeric_only=True)
        total_row["Agent"] = "Total"
        benchmark_cost_df = pd.concat([benchmark_cost_df, pd.DataFrame([total_row])], ignore_index=True)

        print("======================= FINAL BENCHMARK COST SUMMARY =======================")
        print(tabulate(benchmark_cost_df, headers="keys", tablefmt="github"))
        print("============================================================================\n")

In [9]:
# functions for evaluating cmbagent with agent = 'engineer' (the agent executes the code itself)

In [10]:
# find result found by cmbagent through execution with regex

def find_result_in_cmbagent_string(cmbagent_answer: dict) -> list[int] | None:
    last_execution_output_message = None

    for message in reversed(cmbagent_answer['chat_history']):
        content = message.get('content', '')
        if "Execution output:" in content:
            last_execution_output_message = content
            break
    
    if last_execution_output_message:
        match = re.search(r'Execution output:\s*(.*)', last_execution_output_message)
        if match:
            result_str = match.group(1).strip()
            try:
                # Safely parse the string representation of a Python literal (like a list)
                result_list = ast.literal_eval(result_str)
                # Optional: verify it's a list of ints
                if isinstance(result_list, list) and all(isinstance(x, int) for x in result_list):
                    return result_list
                else:
                    print("Parsed result is not a list of ints:", result_list)
                    return None
            except Exception as e:
                print("Error parsing output string:", e)
                return None
        else:
            print("Pattern found but couldn't parse output.")
            return None
    else:
        print("No execution output found in chat history.")
        return None


In [11]:
# functions for evaluating cmbagent with agent = 'researcher' (execute code locally)

In [12]:
def extract_code(model_answer: dict) -> str | None:
    content = model_answer["chat_history"][2]["content"]
    matches = re.findall(r"<code>(.*?)</code>", content, re.DOTALL)
    if matches:
        return matches[-1].strip()  # Return the last <code>...</code> block
    print("⚠️ No <code>...</code> block found.")
    return None


In [13]:
def run_python_code_locally_for_one_test_case(code_str: str, input_data: list[int]) -> list[int]:
    """Execute generated code and call its ``main_function`` on one input.

    Args:
        code_str: Python source that must define a callable ``main_function``.
        input_data: The test-case input, passed as the single argument.

    Returns:
        Whatever ``main_function(input_data)`` returns (expected to be a list
        of ints; the caller validates this).

    Raises:
        RuntimeError: If the code cannot be executed, does not define a
            callable ``main_function``, or ``main_function`` raises.
    """
    # SECURITY: this runs model-generated code with exec() inside the kernel
    # process, without any sandboxing. Only use it in a disposable environment.

    # One dict serves as both globals and locals. With separate dicts, exec
    # runs the code as if it were a class body: top-level helper functions and
    # imports land in the locals dict and are NOT visible from inside
    # main_function, which then fails with NameError.
    namespace = {}

    try:
        exec(code_str, namespace)
    except Exception as e:
        raise RuntimeError(f"Code execution failed: {e}") from e

    main_func = namespace.get("main_function")

    if not callable(main_func):
        raise RuntimeError("No function named 'main_function' found in code.")

    try:
        return main_func(input_data)
    except Exception as e:
        raise RuntimeError(f"Error when calling main_function: {e}") from e

In [20]:
# main benchmark function for cmbagent

def _aggregate_costs_by_agent(cost_frames: list) -> pd.DataFrame:
    """Sum the given cost frames per 'Agent' and append a final 'Total' row."""
    combined = pd.concat(cost_frames, ignore_index=True)
    per_agent = combined.groupby("Agent", as_index=False).sum(numeric_only=True)
    total_row = per_agent.drop(columns=["Agent"]).sum(numeric_only=True)
    total_row["Agent"] = "Total"
    return pd.concat([per_agent, pd.DataFrame([total_row])], ignore_index=True)


def _agent_level_costs(model_answer: dict) -> Optional[pd.DataFrame]:
    """Return the non-'Total' cost rows of one cmbagent answer, or None if there are none."""
    cost_df = model_answer["final_context"].data.get("cost_dataframe", pd.DataFrame())
    if cost_df is None or cost_df.empty:
        return None
    return cost_df[cost_df["Agent"] != "Total"]


def run_benchmark_on_cmbagent(
    problems: dict,
    problem_dir: str,
    cmbagent_model: str,
    agent: str,
    max_problems: Optional[int] = 3,
    max_test_cases: Optional[int] = 3,
) -> dict:
    """Evaluate CMBAgent on the given problems and return per-problem accuracy.

    For every problem the first test case is used as the worked example in the
    prompt; the remaining test cases are used for scoring.

    * ``agent == "engineer"``: cmbagent is called once per test case and the
      result is parsed from its execution output.
    * ``agent == "researcher"``: cmbagent is called once per problem to write a
      ``main_function``; that code is then executed locally on each test case.

    Args:
        problems: Mapping ``problem_id -> problem`` where each problem has a
            ``'description'`` entry.
        problem_dir: Directory holding one sub-directory of test files per problem.
        cmbagent_model: Model name forwarded to ``cmbagent.one_shot``.
        agent: ``"engineer"`` or ``"researcher"``.
        max_problems: Maximum number of problems to evaluate (skipped problems
            do not count). ``None`` removes the limit. Defaults to 3.
        max_test_cases: Maximum number of scored test cases per problem.
            ``None`` removes the limit. Defaults to 3.

    Returns:
        Dict ``problem_id -> {"total", "correct", "accuracy"}`` with accuracy
        given as a percentage.

    Raises:
        ValueError: If ``agent`` is not one of the two supported values.
    """
    if agent not in {"engineer", "researcher"}:
        raise ValueError("Agent must be 'engineer' or 'researcher'")

    results_summary = {}
    all_problem_costs = []  # collect all cost DataFrames here
    skipped_problems = []

    total_problems = len(problems)
    evaluated_problems = 0

    for problem_index, (problem_id, problem) in enumerate(problems.items(), start=1):
        if max_problems is not None and evaluated_problems >= max_problems:
            break

        print("===============================================================")
        print(f"\t\tEvaluating CMBAgent on problem {problem_index}/{total_problems}: {problem_id}")
        print("===============================================================")

        test_cases = load_test_cases_for_one_problem(problem_id, problem_dir)

        if not test_cases:
            print(f"No test cases found for {problem_id}")
            skipped_problems.append(problem_id)
            continue

        example_input, example_output = test_cases[0]

        correct = 0
        total = 0
        cost_dfs = []
        code_str = None

        if agent == "engineer":
            prompt = (
                f"Task: {problem['description']}\n"
                f"Example:\n"
                f"Input: {example_input}\n"
                f"Expected Output: {example_output}\n"
                f"Deliver the result strictly as a Python list:\n"
                f"\t- If the result is a single integer n, return it as [n]\n"
                f"\t- If the result is a list, return it as [a, b, c, ...]\n"
                f"Do not include any extra text, explanation, or formatting."
            )

        else:  # researcher
            prompt = (
                f"Task:\nWrite a function which solves the following problem: {problem['description']}\n"
                f"Always name the primary function main_function, even if it requires helper functions\n"
                f"Example:\n"
                f"Function Input: {example_input}\n"
                f"Expected Function Output: {example_output}\n"
                f"The function has to deliver the result strictly as a Python list:\n"
                f"\t- If the result is a single integer n, return it as [n]\n"
                f"\t- If the result is a list, return it as [a, b, c, ...]\n"
                f"Do not include any extra text, explanation, or formatting."
            )

            # NOTE(review): the model is passed as ``engineer_model`` although the
            # researcher agent is used; the recorded notebook output shows a
            # different model answering for the researcher. Confirm which
            # cmbagent.one_shot parameter selects the researcher's model.
            model_answer = cmbagent.one_shot(
                prompt,
                max_rounds=10,
                agent='researcher',
                engineer_model=cmbagent_model,
            )

            code_str = extract_code(model_answer)

            # The researcher is called once per problem, so its cost is recorded
            # once, right after the call (independent of how many cases are scored).
            researcher_cost_df = _agent_level_costs(model_answer)
            if researcher_cost_df is not None:
                cost_dfs.append(researcher_cost_df)

        # The first test case served as the prompt example; score the others.
        n_available = len(test_cases) - 1
        evaluation_cases = test_cases[1:]
        if max_test_cases is not None:
            evaluation_cases = evaluation_cases[:max(max_test_cases, 0)]

        for i, (input_list, expected_output) in enumerate(evaluation_cases, start=1):

            if agent == "engineer":

                if i > 1:
                    print("========================================================")
                    print(f"\t\tNEXT TEST CASE ({i}/{n_available}, on problem {problem_index}/{total_problems})")
                    print("========================================================\n")

                test_prompt = (
                    prompt +
                    f"\nFind the answer for the following input:\n{input_list}\nOutput: ?"
                )

                model_answer = cmbagent.one_shot(
                    test_prompt,
                    max_rounds=10,
                    agent='engineer',
                    engineer_model=cmbagent_model,
                )

                parsed_answer = find_result_in_cmbagent_string(model_answer)

                if parsed_answer == expected_output:
                    correct += 1
                total += 1

                cost_df = _agent_level_costs(model_answer)
                if cost_df is not None:
                    cost_dfs.append(cost_df)

            else:  # researcher agent

                print(f"Executing code for TEST CASE {i}/{n_available}, on problem {problem_index}/{total_problems}\n")

                # Broken or missing generated code is a wrong answer for this
                # test case; it must not abort the whole benchmark run.
                result_from_code = None
                if code_str is not None:
                    try:
                        result_from_code = run_python_code_locally_for_one_test_case(code_str, input_list)
                    except RuntimeError as e:
                        print(f"Generated code failed on TEST CASE {i}/{n_available}, on problem {problem_index}/{total_problems}: {e}")

                if not (isinstance(result_from_code, list) and all(isinstance(item, int) for item in result_from_code)):
                    print(f"Result for TEST CASE ({i}/{n_available}, on problem {problem_index}/{total_problems} is NOT a list")

                if result_from_code == expected_output:
                    correct += 1
                total += 1

        accuracy = (correct / total * 100) if total > 0 else 0

        # Aggregate cost for this problem

        if cost_dfs:
            problem_cost_df = _aggregate_costs_by_agent(cost_dfs)
            # Only agent-level rows are carried forward; the benchmark-wide
            # 'Total' row is recomputed at the end.
            all_problem_costs.append(problem_cost_df[problem_cost_df["Agent"] != "Total"])
        else:
            problem_cost_df = pd.DataFrame()

        results_summary[problem_id] = {
            "total": total,
            "correct": correct,
            "accuracy": accuracy,
            #"cost_dataframe": problem_cost_df  # optional per problem cost
        }

        print(f"\n =========== BENCHMARK RESULT FOR CMBAGENT ON PROBLEM {problem_id} ============\n")
        print(f"Total test cases: {total}")
        print(f"Correctly guessed test_cases: {correct}")
        print(f"Accuracy: {accuracy:.2f}%")
        # Optionally, print problem cost breakdown
        # print(tabulate(problem_cost_df, headers='keys', tablefmt='github'))

        evaluated_problems += 1

    # Problems end up here when their test files are missing or contain
    # non-integer data (both make the loader return nothing).
    if skipped_problems:
        print("⚠️ Skipped problems due to non-numeric data:")
        for pid in skipped_problems:
            print(f"- {pid}")

    # Aggregate total cost across all problems
    if all_problem_costs:
        total_cost_df = _aggregate_costs_by_agent(all_problem_costs)

        print("\n=== Total aggregated cost across all problems ===")
        print(tabulate(total_cost_df, headers="keys", tablefmt="github"))
    else:
        print("No cost data available to summarize.")

    return results_summary


In [18]:
# main wrapper function (this one is called by user and 'does all the work')

def run_benchmark(
    problem_json: str,
    problem_dir: str,
    eval_cmbagent: bool = True,
    cmbagent_model: Optional[str] = None,
    eval_normal_llm: bool = False,
    llm_model: Optional[str] = None,
    agent: str = "researcher",
) -> None:

    """
    Run benchmark evaluations on the problems listed in problem_json.

    Args:
        problem_json (str): Path to the JSON file describing the problems.
        problem_dir (str): Path to the directory containing the test data
            (one sub-directory per problem).
        eval_cmbagent (bool): Whether to evaluate cmbagent.
        cmbagent_model (Optional[str]): Model name used by cmbagent
            (required if eval_cmbagent is True).
        eval_normal_llm (bool): Whether to evaluate a normal LLM.
        llm_model (Optional[str]): The normal LLM model name (required if eval_normal_llm is True).
        agent (str): cmbagent agent to evaluate, 'engineer' or 'researcher'.
            Defaults to 'researcher'.

    Returns:
        None

    Raises:
        ValueError: If the combination of arguments is invalid.
        FileNotFoundError: If problem_json or problem_dir does not exist.
    """

    # Validate arguments

    if not eval_cmbagent and not eval_normal_llm:
        raise ValueError("At least one of eval_cmbagent or eval_normal_llm must be True")
    if eval_normal_llm and not llm_model:
        raise ValueError("llm_model must be provided if eval_normal_llm is True")
    # Reject an unknown agent up front, before any file is read or model is called.
    if eval_cmbagent and agent not in {"engineer", "researcher"}:
        raise ValueError("Agent must be 'engineer' or 'researcher'")
    if not problem_json or not problem_dir:
        raise ValueError("Both problem_json and problem_dir must be specified")
    if not os.path.exists(problem_json):
        raise FileNotFoundError(f"Problem JSON file not found: {problem_json}")
    if not os.path.exists(problem_dir):
        raise FileNotFoundError(f"Problem directory not found: {problem_dir}")
    if eval_cmbagent and not cmbagent_model:
        raise ValueError("cmbagent_model must be provided if eval_cmbagent is True")

    # Load problems

    problems = load_problems(problem_json)

    # evaluate cmbagent on problems

    if eval_cmbagent:
        results = run_benchmark_on_cmbagent(problems, problem_dir, cmbagent_model, agent)
        print_cmbagent_benchmark_summary(results)

    # evaluate the normal LLM on problems

    if eval_normal_llm:
        # NOTE(review): run_benchmark_on_normal_llm is not defined in the cells
        # shown above; make sure it is defined before using eval_normal_llm=True.
        run_benchmark_on_normal_llm(problems, problem_dir, llm_model)

    # Print or save benchmark results

    print("Benchmark evaluation completed.")

In [16]:
# examples

In [33]:
# Smoke test: run the benchmark on 4 extremely easy problems to check that the
# whole pipeline works end to end.

run_benchmark(
    eval_cmbagent=True,
    cmbagent_model="gpt-4o-mini",
    problem_json="/mnt/p/stage/cmbagent_benchmark/data/clean/easy_custom_samples.json",
    problem_dir="/mnt/p/stage/cmbagent_benchmark/data/clean/easy_tests",
)

		Evaluating CMBAgent on problem 1/4: 0001_easy_addition
Task:
Write a function which solves the following problem: Given two integers A and B, output their sum.
Always name the primary function main_function, even if it requires helper functions
Example:
Function Input: [1, 2]
Expected Function Output: [3]
The function has to deliver the result strictly as a Python list:
	- If the result is a single integer n, return it as [n]
	- If the result is a list, return it as [a, b, c, ...]
Do not include any extra text, explanation, or formatting.

--------------------------------------------------------------------------------
[32m
Calling researcher...
[0m
             Model      agent    Cost  Prompt Tokens  Completion Tokens  Total Tokens
gpt-4.1-2025-04-14 researcher 0.00124            521                 25           546
<code>
def main_function(inputs):
    A, B = inputs
    return [A + B]
</code>

--------------------------------------------------------------------------------
Forma

In [19]:
# benchmark on real USACO problems (currently limited in run_benchmark_on_cmbagent to 3 problems and 3 test cases each, to save time)
# see the end for benchmark conclusion

run_benchmark(
    problem_json="/mnt/p/stage/cmbagent_benchmark/data/clean/usaco_clean_307.json",
    problem_dir="/mnt/p/stage/cmbagent_benchmark/data/clean/usaco_tests",
    eval_cmbagent=True,
    cmbagent_model="gpt-4o-mini",
    eval_normal_llm=False,
)

		Evaluating CMBAgent on problem 1/307: 1333_platinum_good_bitstrings
Task:
Write a function which solves the following problem: 
For any two positive integers $a$ and $b$, define the function
$\texttt{gen_string}(a,b)$ by the following Python code:


def gen_string(a: int, b: int):
	res = ""
	ia, ib = 0, 0
	while ia + ib < a + b:
		if ia * b <= ib * a:
			res += '0'
			ia += 1
		else:
			res += '1'
			ib += 1
	return res

Equivalent C++ code:


string gen_string(int64_t a, int64_t b) {
	string res;
	int ia = 0, ib = 0;
	while (ia + ib < a + b) {
		if ((__int128)ia * b <= (__int128)ib * a) {
			res += '0';
			ia++;
		} else {
			res += '1';
			ib++;
		}
	}
	return res;
}

$ia$ will equal $a$ and $ib$ will equal $b$ when the loop terminates, so this
function returns a  bitstring of length $a+b$ with exactly $a$ zeroes and $b$
ones. For example, $\texttt{gen_string}(4,10)=01110110111011$.

Call a bitstring $s$ $\textbf{good}$ if there exist positive integers $x$ and
$y$  such that $s=\te