In [1]:
from datasets import load_dataset
from service import Service
from utils import show_diff
import json
import time
import os

# --- Run configuration -------------------------------------------------------
# `model` is passed to the service for every request and is also the key used
# for pricing; `dataset_name` picks the code-rag-bench dataset to download.
model = "gpt-4o-mini"
dataset_name = "humaneval"
main_folder = "code_rag_bench"

# Output layout: <main_folder>/<dataset>_<model>/ holds the run artifacts and
# its "failed" sub-folder holds per-task debugging files. Dashes are replaced
# with underscores across the whole path.
run_name = f"{dataset_name}_{model}"
eval_folder = f"{main_folder}/{run_name}".replace('-', '_')
failed_folder = f"{eval_folder}/failed"
os.makedirs(eval_folder, exist_ok=True)
os.makedirs(failed_folder, exist_ok=True)

# Project-local client used to generate completions.
# NOTE(review): the exact meaning of seed/get_usage/get_diff lives in `service.Service` — confirm there.
s = Service(seed=42, get_usage=True, get_diff=True)

# Only the "train" split of the dataset is used; it is converted to a polars frame.
dataset = load_dataset(f"code-rag-bench/{dataset_name}")["train"]
df = dataset.to_polars()

In [2]:
def compute_price(model, in_tk, out_tk):
    """Return the cost of a run given its token counts.

    Args:
        model: Name of the model; must be a key of the built-in price table.
        in_tk: Number of input (prompt) tokens consumed.
        out_tk: Number of output (completion) tokens produced.

    Returns:
        Input cost plus output cost, each computed from a per-1000-token rate.
        (Currency is presumably USD — confirm against the provider's price list.)

    Raises:
        ValueError: If ``model`` has no entry in the price table.
    """
    # Rates are expressed per 1000 tokens.
    prices_per_1k = {
        "gpt-4o": {"input": 0.0025, "output": 0.010},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
    }

    rates = prices_per_1k.get(model)
    if rates is None:
        raise ValueError(f"Model {model} not found in the pricing list")

    thousands_in = in_tk / 1000
    thousands_out = out_tk / 1000
    return thousands_in * rates["input"] + thousands_out * rates["output"]

## Get the Responses

In [3]:
RESPONSES_FILE = f"./{eval_folder}/benchmark.json"

# Default (empty) benchmark state. "checkpoint" counts the requests already
# processed; the token and price fields accumulate usage across the run.
r = {
    "data": [],
    "checkpoint": 0,
    "input_tokens": 0,
    "output_tokens": 0,
    "total_price": 0,
    "price_per_req": 0,
}

if not os.path.exists(RESPONSES_FILE):
    # First run: persist the empty state so the checkpoint file exists.
    with open(RESPONSES_FILE, "w") as f:
        json.dump(r, f, indent=4)
else:
    # Resume: replace the default state with whatever was saved previously.
    with open(RESPONSES_FILE, "r") as f:
        r = json.loads(f.read())

In [4]:
# The checkpoint counter must equal the number of stored responses; otherwise
# resuming would skip or repeat tasks.
n_done, n_total = r['checkpoint'], len(df)
assert n_done == len(r['data']), "Checkpoint does not match with executed requests"
print(f"Done {n_done}/{n_total} ({100*n_done/n_total:.2f}%)")

Done 33/164 (20.12%)


In [6]:
def _last_inserted_chunk(response):
    """Return the text of the last "insert" entry of the response diff.

    Falls back to the full ``response['updatedCode']`` when the diff contains
    no "insert" entry or cannot be inspected at all.
    """
    try:
        idx_diff = max(
            (i for i, item in enumerate(response['diff']) if item[0] == "insert"),
            default=-1,  # no "insert" entry at all
        )
    except Exception:
        # Best-effort: a missing or malformed diff falls back to the full code.
        # Unlike a bare `except:`, KeyboardInterrupt/SystemExit still propagate.
        idx_diff = -1
    return response['diff'][idx_diff][-1] if idx_diff >= 0 else response['updatedCode']


def _save_progress():
    """Refresh the price fields of ``r`` and write it to RESPONSES_FILE."""
    price = compute_price(model, r['input_tokens'], r['output_tokens'])
    r['total_price'] = round(price, 4)
    # Guard against a run in which no request has been processed yet.
    r['price_per_req'] = round(price / r['checkpoint'], 8) if r['checkpoint'] else 0
    # Write to a sibling file first and swap it in, so an interruption during
    # the write cannot leave a truncated checkpoint behind.
    tmp_path = f"{RESPONSES_FILE}.tmp"
    with open(tmp_path, "w") as f:
        f.write(json.dumps(r, indent=4))
    os.replace(tmp_path, RESPONSES_FILE)


# Resume from the first task that has not been processed yet. The `finally`
# clause persists whatever was collected even if a request fails or the cell
# is interrupted, so at most the in-flight request is lost.
try:
    for d in df[r['checkpoint']:].iter_rows(named=True):
        print(f"Task ID: {d['task_id']}")
        response = s.process(None, d['prompt'], model)

        # Read everything that can fail *before* mutating `r`, so that
        # r['checkpoint'] and len(r['data']) never drift apart.
        entry = {
            "solution": response['updatedCode'],
            "only_solution": _last_inserted_chunk(response),
        }
        usage = response["usage"]
        used_in, used_out = usage["input"], usage["output"]

        r["data"].append(entry)
        r["input_tokens"] += used_in
        r["output_tokens"] += used_out
        r["checkpoint"] += 1

        # Periodic checkpoint every 10 processed requests.
        if r['checkpoint'] % 10 == 0:
            _save_progress()
            print(f"SAVED ({r['checkpoint']})")
finally:
    _save_progress()

Task ID: HumanEval/33
Task ID: HumanEval/34
Task ID: HumanEval/35
Task ID: HumanEval/36
Task ID: HumanEval/37
Task ID: HumanEval/38
Task ID: HumanEval/39
SAVED (40)
Task ID: HumanEval/40
Task ID: HumanEval/41
Task ID: HumanEval/42
Task ID: HumanEval/43
Task ID: HumanEval/44
Task ID: HumanEval/45
Task ID: HumanEval/46
Task ID: HumanEval/47
Task ID: HumanEval/48
Task ID: HumanEval/49
SAVED (50)
Task ID: HumanEval/50
Task ID: HumanEval/51
Task ID: HumanEval/52
Task ID: HumanEval/53
Task ID: HumanEval/54
Task ID: HumanEval/55
Task ID: HumanEval/56
Task ID: HumanEval/57
Task ID: HumanEval/58
Task ID: HumanEval/59
SAVED (60)
Task ID: HumanEval/60
Task ID: HumanEval/61
Task ID: HumanEval/62
Task ID: HumanEval/63
Task ID: HumanEval/64
Task ID: HumanEval/65
Task ID: HumanEval/66
Task ID: HumanEval/67
Task ID: HumanEval/68
Task ID: HumanEval/69
SAVED (70)
Task ID: HumanEval/70
Task ID: HumanEval/71
Task ID: HumanEval/72
Task ID: HumanEval/73
Task ID: HumanEval/74
Task ID: HumanEval/75
Task ID: H

## Evaluate the Responses

In [9]:
def _candidate_source(row):
    """Return the source code that is evaluated for one task.

    A completion that does not start with ``def``/``class``/``import``/``from``
    is prefixed with the task prompt and a newline; otherwise it is used as is.
    """
    solution = row['solution']
    if solution.startswith(('def', 'class', 'import', 'from')):
        return solution
    return row['prompt'] + '\n' + solution


def _run_task(row, candidate_src, canonical_src):
    """Run the task's ``check`` function against the candidate entry point.

    Everything is executed in a fresh namespace instead of the notebook's
    globals, so generated code cannot overwrite notebook variables and nothing
    leaks from one task into the next. Any exception (including a failed
    assertion inside ``check``) propagates to the caller.

    SECURITY: this exec()s model-generated code in-process, with no sandbox
    and no timeout — only run it on output you are prepared to execute.
    """
    namespace = {"__name__": "__main__"}
    # The canonical solution runs first so that helper functions defined in
    # the prompt are available to the tests even when the candidate omits them.
    exec(canonical_src, namespace)
    # Drop the canonical entry point: the candidate has to define it itself,
    # otherwise a missing definition would silently pass the tests.
    namespace.pop(row['entry_point'], None)
    exec(candidate_src, namespace)
    exec(row['test'], namespace)
    namespace['check'](namespace[row['entry_point']])


def _save_failed_artifacts(row, candidate_src, canonical_src):
    """Write debuggable copies of a failed task into ``failed_folder``.

    Files that already exist are left untouched.
    """
    realid = row['task_id'].split('/')[1]
    evaluate_src = (
        f"from c{realid}_canonical_solution import *\n"
        f"from c{realid}_solution import {row['entry_point']} as proposed\n"
        + row['test']
        + "\ncheck(proposed)"
    )
    artifacts = {
        f"./{failed_folder}/c{realid}_solution.py": candidate_src,
        f"./{failed_folder}/c{realid}_canonical_solution.py": canonical_src,
        f"./{failed_folder}/c{realid}_evaluate.py": evaluate_src,
    }
    for path, content in artifacts.items():
        if not os.path.exists(path):
            with open(path, "w", encoding='utf-8') as f:
                f.write(content)


failed_tasks = []
log = ""

# Pair every processed dataset row with the response stored for it.
for task, generated in zip(df[:r['checkpoint']].iter_rows(named=True), r['data']):
    row = {**task, **generated}
    candidate_src = _candidate_source(row)
    canonical_src = row['prompt'] + row['canonical_solution']

    try:
        _run_task(row, candidate_src, canonical_src)
    except Exception as e:
        task_id = row['task_id']
        # Turn "<class 'pkg.Error'>" into "pkg.Error" for the log line.
        error_type = str(e.__class__).replace("<class '", "").replace("'>", "")
        log += f"Error ({task_id}): [{error_type}] {e}\n"
        # Keep the sources of the failed task so it can be debugged later.
        _save_failed_artifacts(row, candidate_src, canonical_src)
        failed_tasks.append(task_id)

evaluated = r['checkpoint']
success = evaluated - len(failed_tasks)
# Avoid a ZeroDivisionError when nothing has been generated yet.
accuracy = success / evaluated if evaluated else 0.0
log += f"Accuracy: {success}/{evaluated} ({accuracy:.2%})\n"
log += f"Failed tasks: {failed_tasks}"

# Exception messages may contain non-ASCII text, so fix the encoding explicitly.
with open(f"{eval_folder}/benchmark.log", "w", encoding='utf-8') as f:
    f.write(log)