In [None]:
%pip install datasets, aixplain

In [None]:
import os
import sys
import re
import json
import time
import yaml
import queue
import logging
import traceback
import subprocess
import multiprocessing
from datasets import load_dataset
from langchain.tools import StructuredTool
from aixplain.factories import AgentFactory

# Setup
# Keep a TEAM_API_KEY that is already exported in the environment; the
# placeholder below is only used when no key is set (replace it with your key).
# NOTE(review): the aixplain import above runs before this line — confirm
# whether the SDK reads TEAM_API_KEY at import time, in which case the key
# must be set before that import.
os.environ.setdefault("TEAM_API_KEY", "TEAM_API_KEY")  # Replace with your key
# Force the "fork" start method for every multiprocessing.Process created later
# (presumably so workers can run functions defined in the notebook itself).
multiprocessing.set_start_method("fork", force=True)

In [None]:
# Leading sentence of the code-execution tool's description; get_python_exec_tool()
# concatenates it with the formatted INPUT_DESCRIPTION.
TOOL_DESCRIPTION = "A Python shell. Use this to execute python program."
# Template for the tool's input instructions: {0} receives the listing of
# parameter names with their descriptions, {1} an example map with placeholder values.
INPUT_DESCRIPTION = """Input MUST be a JSON map with the following keys: {0}. The input MUST be in the following format: {1}."""

In [None]:
def _outputs_match(actual: str, expected: str) -> bool:
    """Return True when two already-stripped program outputs are equivalent.

    Identical text always matches. Outputs that both parse as integers are
    compared as integers, so large values are never squeezed through a float.
    Otherwise, outputs that both parse as floats are compared after rounding
    to 9 decimal places; anything else is a mismatch.
    """
    if actual == expected:
        return True
    try:
        return int(actual) == int(expected)
    except ValueError:
        pass  # at least one side is not an integer literal; try floats next
    try:
        return round(float(actual), 9) == round(float(expected), 9)
    except ValueError:
        return False


def exec_program(q, program, input_data, expected_output, timeout):
    """Run ``program`` in a fresh Python interpreter and report a verdict on ``q``.

    Args:
        q: Object with a ``put()`` method (e.g. a multiprocessing queue) that
            receives exactly one verdict string.
        program: Python source code, executed via ``sys.executable -c``.
        input_data: Text fed to the program's standard input.
        expected_output: Text the program is expected to print.
        timeout: Maximum run time in seconds.

    The verdict put on ``q`` is one of:
        ``"passed"`` / ``"failed"``: result of comparing stdout with
            ``expected_output`` (both stripped).
        ``"failed: <details>"``: the program exited with a non-zero status
            (details are its stderr) or an unexpected error occurred here
            (details are the traceback).
        ``"timed out"``: the program exceeded ``timeout``.
    """
    process = None
    try:
        started = time.monotonic()
        process = subprocess.Popen(
            [sys.executable, "-c", program],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        stdout, stderr = process.communicate(input=input_data, timeout=timeout)

        # A run that finished right at the limit is still reported as a
        # timeout, using the same verdict string as the handler below.
        if time.monotonic() - started > timeout:
            q.put("timed out")
            return
        if process.returncode != 0:
            q.put(f"failed: {stderr}")
            return

        matched = _outputs_match(stdout.strip(), expected_output.strip())
        q.put("passed" if matched else "failed")

    except subprocess.TimeoutExpired:
        process.kill()
        q.put("timed out")
        # Reap the killed child so it does not linger as a zombie.
        process.wait()
    except Exception:
        q.put(f"failed: {traceback.format_exc()}")

In [None]:
def get_code_from_output(output: str) -> str:
    """Extract Python source code from a model response.

    Lookup order:
      1. The first fenced block tagged ``python``, ``python3`` or ``py``
         (case-insensitive); the tag itself is never part of the result.
      2. The first untagged fenced block (a fence followed by a newline).
      3. The whole response.

    Args:
        output: Raw response text; ``None`` or an empty string is accepted.

    Returns:
        The extracted code with surrounding whitespace stripped, or ``""``
        when there is no output at all.
    """
    if not output:
        return ""

    # Tagged fences win, so an earlier untagged block (e.g. sample input/output
    # quoted in the answer) is not mistaken for the solution.
    tagged = re.search(
        r"```[ \t]*(?:python3?|py)\b(.*?)```", output, re.DOTALL | re.IGNORECASE
    )
    if tagged:
        return tagged.group(1).strip()

    untagged = re.search(r"```[ \t]*\n(.*?)```", output, re.DOTALL)
    if untagged:
        return untagged.group(1).strip()

    return output.strip()

In [None]:
def check_correctness(program: str, input_data: str, expected_output: str, timeout: float, **kwargs) -> str:
    """Run ``program`` in a worker process and return the verdict it reports.

    Args:
        program: Python source, or text containing a fenced code block; the
            code is extracted with ``get_code_from_output``.
        input_data: Text passed to the program's standard input.
        expected_output: Output the program is expected to produce.
        timeout: Time limit in seconds for the program run.
        **kwargs: Accepted and ignored.

    Returns:
        The verdict string produced by ``exec_program``, ``"timed out"`` when
        the worker is still running one second past ``timeout``, or
        ``"no result returned"`` when the worker ended without reporting.
    """
    code = get_code_from_output(program)
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=exec_program, args=(q, code, input_data, expected_output, timeout))
    p.start()

    # Read the verdict BEFORE joining: a worker that has put a large payload on
    # the queue cannot exit until the payload is consumed, so joining first
    # would make such a run look like a timeout.
    try:
        verdict = q.get(timeout=timeout + 1)
    except queue.Empty:
        if p.is_alive():
            p.terminate()
            p.join()
            return "timed out"
        p.join()
        # The worker is gone; give a verdict that arrived at the last moment
        # one more chance before giving up.
        try:
            return q.get_nowait()
        except queue.Empty:
            return "no result returned"

    # The verdict is in hand; make sure the worker does not outlive the call.
    p.join(1)
    if p.is_alive():
        p.terminate()
        p.join()
    return verdict

In [None]:
def get_python_exec_tool():
    """Build the ``StructuredTool`` that exposes ``check_correctness``.

    The tool description is ``TOOL_DESCRIPTION`` followed by
    ``INPUT_DESCRIPTION`` filled in with the parameter listing and an example
    input map.

    Returns:
        The tool created by ``StructuredTool.from_function`` under the name
        ``python_code_execution``.
    """
    params = {
        "program": "str: the python code to be executed",
        "input_data": "str: input data",
        "expected_output": "str: expected output",
        "timeout": "float: time in seconds before the code execution times out",
    }
    key_listing = "\n".join(f"{k}: {v}" for k, v in params.items())
    # The description asks for a JSON map, so the example is rendered as real
    # JSON (double quotes) rather than as a Python dict repr.
    example_input = json.dumps({k: "VALUE" for k in params})
    desc = INPUT_DESCRIPTION.format(key_listing, example_input)
    return StructuredTool.from_function(
        name="python_code_execution",
        description=TOOL_DESCRIPTION + " " + desc,
        func=check_correctness
    )

In [None]:
def load_config(config_path: str) -> dict:
    """Load a YAML configuration file and return it as a dictionary.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The parsed configuration mapping.

    Raises:
        FileNotFoundError: ``config_path`` does not exist.
        ValueError: The file is not valid YAML, is empty, or its top-level
            value is not a mapping.
    """
    try:
        # Explicit encoding so parsing does not depend on the platform default.
        with open(config_path, "r", encoding="utf-8") as file:
            config = yaml.safe_load(file)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Configuration not found: {config_path}") from e
    except yaml.YAMLError as e:
        raise ValueError(f"YAML parsing error: {e}") from e

    if not config:
        raise ValueError("Configuration file is empty.")
    if not isinstance(config, dict):
        # The declared return type is dict; fail here rather than at first use.
        raise ValueError(
            f"Configuration must be a mapping, got {type(config).__name__}."
        )
    return config

In [None]:
def _run_public_tests(code, inputs, outputs, timeout):
    """Run ``code`` on each paired public test; return ``(passed, test_results)``."""
    passed = 0
    test_results = []
    for test_id, (inp, outp) in enumerate(zip(inputs, outputs), start=1):
        try:
            result = check_correctness(code, inp, outp, timeout=timeout)
        except Exception as e:
            result = f"error: {e}"
        test_results.append({"test_id": test_id, "result": result})
        if result == "passed":
            passed += 1
    return passed, test_results


def main(output_folder="code_contests_results", max_samples=6):
    """Evaluate an aiXplain agent on the public tests of ``deepmind/code_contests``.

    For each sample of the streamed ``test`` split the agent is asked for a
    Python solution, the code is extracted from its answer and run against the
    sample's public tests. One JSON file per processed sample and one line in
    ``code_contests_results.jsonl`` are written to ``output_folder``.

    Args:
        output_folder: Directory for the result files (created if missing).
        max_samples: Number of successfully processed samples after which the
            run stops. Samples that raise an error are logged and do not
            count toward this limit.
    """
    agent = AgentFactory.create(
        name="Code Contests Team Agent",
        description="Single agent configured for the Code Contests experiment.",
        llm_id="669a63646eb56306647e1091"
    )

    os.makedirs(output_folder, exist_ok=True)
    data = load_dataset("deepmind/code_contests", split="test", streaming=True)

    correct_total = 0
    processed_count = 0
    log_path = os.path.join(output_folder, "code_contests_results.jsonl")

    for idx, sample in enumerate(data):
        if processed_count >= max_samples:
            break

        try:
            example = sample["description"]
            # NOTE(review): only the whole-seconds field of the time limit is
            # used; confirm whether the dataset also carries a sub-second part.
            time_limit = sample["time_limit"]["seconds"]
            query = f"""Generate Python code to solve the following problem. Use `input()` to read input and `print()` to output. Timeout is {time_limit} seconds. 
            Problem:
            {example}"""

            started = time.perf_counter()
            response = agent.run(query=query)
            elapsed = round(time.perf_counter() - started, 2)

            # Cost is optional information; never let it abort the sample, but
            # do not swallow KeyboardInterrupt/SystemExit either.
            try:
                cost = response.usedCredits
            except Exception:
                cost = "N/A"

            code = get_code_from_output(response.data.output)
            test_inputs = sample["public_tests"]["input"]
            test_outputs = sample["public_tests"]["output"]

            if code:
                passed, test_results = _run_public_tests(code, test_inputs, test_outputs, time_limit)
            else:
                passed = 0
                test_results = [{"test_id": 1, "result": "no code generated"}]

            # A sample without public tests scores 0 instead of raising
            # ZeroDivisionError after the agent call was already made.
            success_rate = passed / len(test_inputs) if test_inputs else 0.0
            is_correct = success_rate == 1.0
            correct_total += success_rate
            current_accuracy = correct_total / (processed_count + 1)

            result = {
                "sample_index": idx + 1,
                "name": sample["name"],
                "description": example,
                "time_limit": time_limit,
                "public_tests_input": test_inputs,
                "public_tests_output": test_outputs,
                "generated_code": code if code else "No code generated",
                "agent_response": response.data.output,
                "is_correct": is_correct,
                "success_rate": success_rate,
                "current_accuracy": current_accuracy,
                "inference_time_seconds": elapsed,
                "cost": cost,
                "test_results": test_results,
            }

            sample_path = os.path.join(output_folder, f"{processed_count + 1}_results.json")
            with open(sample_path, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=4)

            with open(log_path, "a", encoding="utf-8") as log_f:
                log_f.write(json.dumps(result) + "\n")

            print(f"[✓] Sample {idx + 1} processed — accuracy: {current_accuracy:.2%}")
            processed_count += 1

        except Exception as e:
            # Report the dataset position (as the success message does);
            # processed_count does not advance on failures, so it would repeat
            # the same number for consecutive errors.
            logging.exception("Error on sample %d: %s", idx + 1, e)

In [None]:
# Run the experiment with main()'s default arguments when this code is
# executed as the main module.
if __name__ == "__main__":
    main()