## Pipeline: LLM-powered program generation for solving ARC-AGI

### Imports

In [29]:
import numpy as np
import os
import json
from dotenv import load_dotenv
from openai import OpenAI
import re
import importlib.util

### Shared Variables

In [30]:
# Shared system prompt for all tasks
system_prompt = """
You are a visual reasoning and Python programming expert solving ARC-AGI (Abstraction and Reasoning Corpus - Artificial General Intelligence) tasks.

Each integer in the grid represents a color:
0 = black, 1 = blue, 2 = red, 3 = green, 4 = yellow,
5 = grey, 6 = pink, 7 = orange, 8 = light blue, 9 = brown.
"""


### Prompts

#### Basic Prompt

In [31]:
base_prompt = """
Write a Python function that correctly transforms each input grid into its corresponding output grid based on the given examples.

- The function must be named: `solve(grid: List[List[int]]) -> List[List[int]]`
- Include only the code and necessary imports (e.g., `import numpy as np`)
- Do not include comments, explanations, or print statements
- Do not hard-code values or specific grid sizes — the function must generalize based on the patterns in the examples
- Ensure your solution works for all provided input-output pairs
"""


#### Prompt 1

In [33]:
prompt_1 = """
List visual observations from the training pairs.

- Use bullet points (max 10).
- Focus on colors, shapes, object counts, positions, and differences.
- Avoid reasoning or explanations.
- Be concise. No full sentences, no extra formatting.
"""



#### Prompt 2

In [35]:
prompt_2 = """
Describe the transformation(s) from input to output grids.

- Use 3 to 5 short sentences.
- Focus on what changes: movement, color, shape, duplication, etc.
- Mention if the transformation is based on position, context, or rules.
- Avoid implementation hints or code.
"""




#### Prompt 3

In [37]:
prompt_3 = """
Reflect on how you would solve the task in Python.

- Use 3 to 5 sentences.
- Mention your overall approach, logical steps, and possible uncertainties.
- Do not return code or pseudocode.
"""



#### Prompt 4 (Revision Prompt)

In [39]:
revision_prompt = """
Below you'll receive a Python function that attempted to solve the following task. It didn't succeed, and you are tasked with fixing it.

- The function must be named: `solve(grid: List[List[int]]) -> List[List[int]]`
- Include only the code and necessary imports (e.g., `import numpy as np`)
- Do not include comments, explanations, or print statements
- Do not hard-code values or specific grid sizes — the function must generalize based on the patterns in the examples
- Ensure your solution works for all provided input-output pairs
"""



### Functions

#### Load Tasks

In [41]:
def load_tasks(folder):
    tasks = []
    for filename in sorted(os.listdir(folder)):
        if filename.endswith(".json"):
            with open(os.path.join(folder, filename), "r") as f:
                data = json.load(f)
                tasks.append({"filename": filename, "data": data})
    return tasks
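
`load_tasks` accepts any JSON file, while the rest of the pipeline reads `task_data['train']`, `task_data['test']`, and the `input`/`output` grids of each pair. A minimal sketch of a sanity check for that layout is shown below. The helper names `validate_task` and `_is_grid` are hypothetical and not part of the original notebook, and the 0-9 value range is taken from the color legend in the system prompt.

```python
def _is_grid(grid):
    """True if `grid` is a non-empty rectangular list of lists holding ints 0-9."""
    if not isinstance(grid, list) or not grid:
        return False
    width = len(grid[0]) if isinstance(grid[0], list) else -1
    return width > 0 and all(
        isinstance(row, list)
        and len(row) == width
        and all(isinstance(cell, int) and 0 <= cell <= 9 for cell in row)
        for row in grid
    )


def validate_task(data):
    """Return a list of problems found in one task dict (empty if it looks well-formed)."""
    problems = []
    for split in ("train", "test"):
        pairs = data.get(split)
        if not isinstance(pairs, list) or not pairs:
            problems.append(f"missing or empty '{split}' list")
            continue
        for i, pair in enumerate(pairs):
            # Assumption: test pairs may lack an 'output' when answers are withheld.
            required = ("input", "output") if split == "train" else ("input",)
            for key in required:
                if not _is_grid(pair.get(key)):
                    problems.append(f"{split}[{i}]['{key}'] is not a rectangular grid")
    return problems


toy_task = {
    "train": [{"input": [[0, 1], [1, 0]], "output": [[1, 0], [0, 1]]}],
    "test": [{"input": [[0, 0], [1, 1]]}],
}
print(validate_task(toy_task))
```

Such a check could run inside the loading loop so that malformed files are reported before any API call is made.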

#### Load API-Key

In [42]:
def load_api_key(file_path="key.env"):
    """Load OPENAI_API_KEY from the given env file and create the shared OpenAI client."""
    global client
    load_dotenv(file_path)
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError(f"No API key found. Please set OPENAI_API_KEY in {file_path}.")
    client = OpenAI(api_key=api_key)

#### Call GPT

In [43]:
def call_gpt(prompt):
    response = client.chat.completions.create(
        model="o3-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ]
    )
    return response.choices[0].message.content.strip()
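
`call_gpt` issues a single request, so one transient failure (rate limit, network error) would stop the whole pipeline. The sketch below shows one possible way to add retries with exponential backoff. `with_retries` is a hypothetical helper, not part of the notebook or the OpenAI SDK, and catching every `Exception` is an assumption made to keep the example independent of any specific client library.

```python
import time


def with_retries(fn, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fn() up to `attempts` times (attempts >= 1), doubling the delay after each failure."""
    last_error = None
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as error:  # assumption: any exception is worth retrying
            last_error = error
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)
    raise last_error
```

In the pipeline this could wrap the existing call, for example `with_retries(lambda: call_gpt(prompt))`. The `sleep` parameter exists only so the waiting can be replaced in tests.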

#### Building and Combining Prompts

Adds the task's demonstration pairs to the prompt:

In [44]:
def add_tasks(prompt, task_data):
    full_prompt = prompt.strip() + "\n\nHere are the demonstration pairs (JSON data):\n"
    for i, pair in enumerate(task_data['train']):
        full_prompt += f"\nTrain Input {i+1}: {pair['input']}\n"
        full_prompt += f"Train Output {i+1}: {pair['output']}\n"
    return full_prompt
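
To make the resulting prompt layout concrete, the following sketch applies `add_tasks` to a toy task with a single demonstration pair. The function body is copied from the cell above so that the snippet runs on its own; the toy grids and the prompt text are made up for illustration.

```python
def add_tasks(prompt, task_data):
    # Copied from the cell above so that this snippet is self-contained.
    full_prompt = prompt.strip() + "\n\nHere are the demonstration pairs (JSON data):\n"
    for i, pair in enumerate(task_data['train']):
        full_prompt += f"\nTrain Input {i+1}: {pair['input']}\n"
        full_prompt += f"Train Output {i+1}: {pair['output']}\n"
    return full_prompt


# Toy task: one demonstration pair that mirrors a 2x2 grid horizontally.
toy_task = {"train": [{"input": [[0, 1], [1, 0]], "output": [[1, 0], [0, 1]]}]}

full_prompt = add_tasks("Describe the transformation.\n", toy_task)
print(full_prompt)
```

The instruction text comes first, followed by one `Train Input`/`Train Output` block per pair. The grids are written with Python's list formatting, which for nested lists of integers coincides with JSON.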

Combines secondary prompts 1 and 2:

In [45]:
def combine_prompts_1_and_2(prompt_1_response, prompt_2_template):
    combined_prompt = f"""{prompt_2_template.strip()}

Here are visual observations of the task at hand, that may assist you in identifying the transformation:

{prompt_1_response.strip()}

Now provide your transformation analysis based on these observations."""
    return combined_prompt

Combines secondary prompts 1, 2, and 3:

In [46]:
def combine_prompts_1_2_and_3(prompt_1_response, prompt_2_response, prompt_3_template):
    combined_prompt = f"""{prompt_3_template.strip()}

Here are visual observations of the task that may help inform your implementation:
{prompt_1_response.strip()}

Here are the transformation rules that have been identified based on the task:
{prompt_2_response.strip()}

Now reflect on how you would implement a solution to this task in Python, following the instructions above.
"""
    return combined_prompt


Combines the response to secondary prompt 3 with the base prompt:

In [47]:
def combine_prompts_3_and_base(prompt_3_response, prompt_base_template):
    combined_prompt = f"""
Implementation Reflection:
{prompt_3_response.strip()}

{prompt_base_template.strip()}
"""
    return combined_prompt.strip()

Chains the secondary prompts and combines their responses with the base prompt to create a task-tailored prompt:

In [48]:
def build_prompts(task_data):
    # Build secondary prompt 1
    full_prompt_1 = add_tasks(prompt_1, task_data)
    response_1 = call_gpt(full_prompt_1)
    
    # Build secondary prompt 2
    combined_prompt_2 = combine_prompts_1_and_2(response_1, prompt_2)
    full_prompt_2 = add_tasks(combined_prompt_2, task_data)
    response_2 = call_gpt(full_prompt_2)

    # Build secondary prompt 3
    combined_prompt_3 = combine_prompts_1_2_and_3(response_1, response_2, prompt_3)
    full_prompt_3 = add_tasks(combined_prompt_3, task_data)
    response_3 = call_gpt(full_prompt_3)
    
    # Build task-tailored prompt
    combined_prompt_base = combine_prompts_3_and_base(response_3, base_prompt)
    tailored_prompt = add_tasks(combined_prompt_base, task_data)
    print("Built tailored prompt.")
    
    return tailored_prompt

#### Save Programs

In [49]:
def save_program(program_text, task_id):
    """Strip Markdown code fences from the response and save it as the next solution_v<N>.py of the task."""
    # Define the base and task-specific folder paths
    base_folder = "Candidate_programs"
    task_folder = os.path.join(base_folder, f"task_{task_id}")
    
    # Create the task-specific folder if it doesn't exist
    os.makedirs(task_folder, exist_ok=True)

    # Remove ```python or ``` if present
    cleaned_text = re.sub(r"^```(?:python)?\s*|```$", "", program_text.strip(), flags=re.MULTILINE)

    # Find the next available version number
    existing_files = os.listdir(task_folder)
    version_numbers = [
        int(re.search(r"solution_v(\d+)\.py", fname).group(1))
        for fname in existing_files
        if re.match(r"solution_v\d+\.py", fname)
    ]
    next_version = max(version_numbers, default=0) + 1
    
    # Define the full path to the new Python file
    file_path = os.path.join(task_folder, f"solution_v{next_version}.py")
    
    # Save the program text to the file
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(cleaned_text.strip())

    
    print(f"Saved program for task {task_id} as version {next_version}: {file_path}")
    return file_path
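
The fence-stripping regex in `save_program` removes fence markers at line boundaries, which works when the response consists of one fenced block and nothing else. If the model adds prose around the code despite the instructions, that prose would be written into the `.py` file. One possible alternative, sketched here under the assumption that the code of interest is the first fenced block, is to extract that block explicitly. `extract_code` and `FENCE_RE` are hypothetical names, not part of the notebook.

```python
import re

# Matches the first fenced block, with or without a "python"/"py" language tag.
FENCE_RE = re.compile(r"```(?:python|py)?[ \t]*\n(.*?)```", re.DOTALL | re.IGNORECASE)


def extract_code(response):
    """Return the body of the first fenced block, or the whole response if no fence is found."""
    match = FENCE_RE.search(response)
    code = match.group(1) if match else response
    return code.strip()


chatty = "Here is the solution:\n```python\ndef solve(grid):\n    return grid\n```\nHope it helps."
print(extract_code(chatty))
```

Responses without any fence are returned unchanged apart from surrounding whitespace, so plain code answers still pass through.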

#### Create Programs

In [None]:
def create_programs(tailored_prompt, task_index, amount):
    # Generate `amount` candidate programs from the tailored prompt
    for _ in range(amount):
        response = call_gpt(tailored_prompt)

        # Save the program as the next solution version of this task
        save_program(response, task_index)

#### Evaluate Programs

In [51]:
def load_program(file_path):
    """Load and execute a Python program from a file."""
    spec = importlib.util.spec_from_file_location("program", file_path)
    program = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(program)
    return program.solve  # Assumes the function is named `solve`
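
`load_program` executes the generated file directly, so a syntax error or a missing `solve` function in one candidate raises an exception in the caller. A minimal sketch of a more defensive loader follows. `try_load_solver` is a hypothetical helper, and the two temporary files only simulate a valid and a broken candidate. Note that this does not sandbox the generated code; it only reports load failures instead of raising.

```python
import importlib.util
import os
import tempfile


def try_load_solver(file_path):
    """Return (solve, None) on success or (None, error_message) if the file is unusable."""
    try:
        spec = importlib.util.spec_from_file_location("candidate_program", file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the generated code; may raise anything
    except Exception as error:
        return None, f"{type(error).__name__}: {error}"
    solve = getattr(module, "solve", None)
    if not callable(solve):
        return None, "no callable `solve` found"
    return solve, None


with tempfile.TemporaryDirectory() as folder:
    good_path = os.path.join(folder, "good.py")
    bad_path = os.path.join(folder, "bad.py")
    with open(good_path, "w", encoding="utf-8") as f:
        f.write("def solve(grid):\n    return [row[::-1] for row in grid]\n")
    with open(bad_path, "w", encoding="utf-8") as f:
        f.write("def solve(grid)\n    return grid\n")  # missing colon on purpose

    good_solver, good_error = try_load_solver(good_path)
    bad_solver, bad_error = try_load_solver(bad_path)

print(good_solver([[1, 2, 3]]), good_error)
print(bad_solver, bad_error)
```

An evaluation loop could skip candidates whose loader returns an error message and assign them a score of 0.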


In [52]:
def evaluate_programs(task_data, task_index, task_folder):
    """Evaluate programs and collect detailed candidate outputs for each demonstration pair."""
    programs = []
    # Sort the file names so that the evaluation order is deterministic across platforms
    program_files = sorted(f for f in os.listdir(task_folder) if f.endswith(".py"))
    
    for program_file in program_files:
        program_path = os.path.join(task_folder, program_file)
        solve_function = load_program(program_path)  # Load the solve function
        
        details = []
        correct_count = 0
        total_pairs = len(task_data['train'])
        
        # Test each demonstration pair and collect detailed outputs
        for pair in task_data['train']:
            input_grid = pair['input']
            expected_output = pair['output']
            try:
                candidate_output = solve_function(input_grid)
                # Convert outputs to arrays for comparison
                if np.array_equal(np.array(candidate_output), np.array(expected_output)):
                    correct_count += 1
            except Exception as e:
                candidate_output = f"Error: {e}"
            details.append({
                "input": input_grid,
                "candidate_output": candidate_output,
                "expected_output": expected_output
            })
        
        score = correct_count / total_pairs if total_pairs > 0 else 0
        programs.append({
            "program_name": program_file,
            "score": score,
            "correct_pairs": correct_count,
            "total_pairs": total_pairs,
            "details": details
        })
    
    return programs
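
The scoring rule used above (fraction of demonstration pairs reproduced exactly, with crashes counted as failures) can be illustrated on toy data. The sketch below is a reduced version of that rule; `score_solver`, the toy pairs, and the `flip_rows` solver are made up for illustration.

```python
import numpy as np


def score_solver(solve, train_pairs):
    """Fraction of demonstration pairs that `solve` reproduces exactly."""
    correct = 0
    for pair in train_pairs:
        try:
            predicted = np.array(solve(pair["input"]))
            expected = np.array(pair["output"])
            # array_equal is also False when the two grids differ in shape.
            if np.array_equal(predicted, expected):
                correct += 1
        except Exception:
            pass  # a crash counts as a wrong answer
    return correct / len(train_pairs) if train_pairs else 0.0


pairs = [
    {"input": [[1, 0]], "output": [[0, 1]]},
    {"input": [[2, 3, 4]], "output": [[4, 3, 2]]},
    {"input": [[5, 5], [0, 6]], "output": [[5, 5], [0, 6]]},
]


def flip_rows(grid):
    # Toy solver: mirror every row horizontally.
    return [row[::-1] for row in grid]


score = score_solver(flip_rows, pairs)
print(f"{score:.2f}")
```

Here the toy solver reproduces the first two pairs but not the third, so it would be sent to the revision step because its score is below 1.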

#### Creation of Revision Prompt and Revised Programs

In [53]:
def revise_candidate_program(candidate_file_path, task_data, task_id):
    # Load candidate code text
    with open(candidate_file_path, "r", encoding="utf-8") as f:
        candidate_code_text = f.read()
    
    # Load candidate's solve function
    solve_fn = load_program(candidate_file_path)
    
    # Use the externally defined revision_prompt and add the generated code and demonstration pairs details.
    local_revision_prompt = revision_prompt + "\n\nHere is the generated code:\n" + candidate_code_text + "\n\nDemonstration Pairs:\n"
    
    for i, pair in enumerate(task_data['train']):
        try:
            candidate_output = solve_fn(pair['input'])
        except Exception as e:
            candidate_output = f"Error: {e}"
        local_revision_prompt += f"{i+1}. Input: {pair['input']}\n"
        local_revision_prompt += f"   Expected Output: {pair['output']}\n"
        local_revision_prompt += f"   Generated Output: {candidate_output}\n"
    
    local_revision_prompt += "\nPlease revise the code."
    
    # Send the revision prompt to the LLM
    revised_code = call_gpt(local_revision_prompt)
    
    # Save the revised program using your existing save_program function
    save_program(revised_code, task_id)

#### Identification of Best Programs

In [54]:
def get_best_programs(evaluation_results, task_data, task_id, n=2):
    """
    Ranks the already evaluated programs of a task by their score on the demonstration pairs
    and returns the file paths of the top n programs (fewer if fewer than n programs exist).
    """
    # Sort programs by score descending; sorted() is stable, so programs with equal scores keep their original order.
    sorted_programs = sorted(evaluation_results, key=lambda x: x['score'], reverse=True)
    task_folder = os.path.join("Candidate_programs", f"task_{task_id}")
    best_program_files = [os.path.join(task_folder, prog['program_name']) for prog in sorted_programs[:n]]
    for i in best_program_files:
        print(f"Best program: {i}")
    return best_program_files
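
Because `sorted` is stable even with `reverse=True`, ties are resolved by the order in which the programs were evaluated. The short sketch below shows the effect on made-up evaluation results.

```python
# Made-up evaluation results; only the fields used for ranking are included.
results = [
    {"program_name": "solution_v1.py", "score": 0.0},
    {"program_name": "solution_v2.py", "score": 2 / 3},
    {"program_name": "solution_v3.py", "score": 0.0},
    {"program_name": "solution_v4.py", "score": 2 / 3},
]

ranked = sorted(results, key=lambda r: r["score"], reverse=True)
top_two = [r["program_name"] for r in ranked[:2]]
print(top_two)
```

When every program has the same score, the first n entries of the input list are selected, so earlier versions win over their revisions.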



#### Generation of Predictions on Test Inputs

In [55]:
def generate_test_predictions(task_data, actual_task_id, best_program_files):
    """
    Loads the two best candidate programs from the specified file paths,
    runs each one on every test input, and saves the generated outputs
    in the submission file.
    
    Expects task_data to have a 'test' key with a list of test pairs,
    where each pair is a dict containing an "input" key.
    
    Returns a submission dictionary of the form:
    { actual_task_id: [ { "attempt_1": output_from_program1, "attempt_2": output_from_program2 }, ... ] }
    """
    # Load the candidate solvers using the file paths.
    best_solvers = [load_program(prog_file) for prog_file in best_program_files]
    
    predictions = []
    # Iterate over each test pair.
    for i, pair in enumerate(task_data["test"]):
        input_grid = pair["input"]
        attempt_predictions = {}
        # Run each candidate solver on the test input.
        for idx, solver in enumerate(best_solvers, start=1):
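            output = solver(input_grid)
            # Convert NumPy arrays to nested lists so the submission can be serialized with json.dump
            if isinstance(output, np.ndarray):
                output = output.tolist()
            attempt_predictions[f"attempt_{idx}"] = output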
        predictions.append(attempt_predictions)
    
    submission = {str(actual_task_id): predictions}
    
    return submission
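
Generated solvers may return NumPy arrays or lists containing NumPy integers, neither of which `json.dump` can serialize. The sketch below shows the submission structure described in the docstring together with one possible normalization step. `to_plain_grid` is a hypothetical helper, and the grids are toy values rather than real predictions.

```python
import json

import numpy as np


def to_plain_grid(output):
    """Convert a solver result (NumPy array or nested sequence) into nested lists of Python ints."""
    if isinstance(output, np.ndarray):
        return output.tolist()
    return [[int(cell) for cell in row] for row in output]


# One test input with two attempts, keyed by the task id taken from the file name.
predictions = [{
    "attempt_1": to_plain_grid(np.array([[3, 3], [8, 0]])),
    "attempt_2": to_plain_grid([[np.int64(3), 3], [0, 8]]),
}]
submission = {"0607ce86": predictions}

encoded = json.dumps(submission)
print(encoded)
```

Each task id maps to a list with one entry per test input, and each entry holds one grid per attempt.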


### Pipeline

In [None]:
# Load tasks and API key
tasks = load_tasks("evaluation_set")
load_api_key()

# Stores predictions for all tasks
final_submission = {} 

# Loop through each task (adjustable for n tasks)
for i, task in enumerate(tasks[:1]):
    # Extract the actual task id from the filename (for submission file)
    actual_task_id = task["filename"].split(".")[0]
    
    # Create task-tailored prompt
    tailored_prompt = build_prompts(task['data'])
    
    # Create programs based on the tailored prompt (Change amount for n programs)
    create_programs(tailored_prompt, i+1, amount=2)
    
    # Evaluate the created programs and revise if necessary
    task_folder = os.path.join("Candidate_programs", f"task_{i+1}")
    evaluation_results = evaluate_programs(task['data'], i+1, task_folder)
    for result in evaluation_results:
        # Revise any program with a score < 1, i.e. one that didn't solve all demonstration pairs correctly
        if result['score'] < 1:
            candidate_file = os.path.join(task_folder, result['program_name'])
            print(f"Revising {result['program_name']}...")
            # Gives the candidate program to the revision function to create a revision prompt and generate a new program
            revise_candidate_program(candidate_file, task['data'], i+1)
    
    # Second evaluation and printing of results
    evaluation_results = evaluate_programs(task['data'], i+1, task_folder)
    print(f"Task {actual_task_id} ({i+1}) evaluation results:")
    for result in evaluation_results:
        print(f"Program {result['program_name']} solved {result['correct_pairs']} out of {result['total_pairs']} pairs. Score: {result['score']:.2f}")
        print("="*50)
    
    # Selection of two best performing programs
    best_program_files = get_best_programs(evaluation_results, task['data'], i+1, n=2)
    # Generate predictions for all test inputs.
    submission = generate_test_predictions(task['data'], actual_task_id, best_program_files)
    print(submission)

    # Add to the final submission dictionary
    final_submission.update(submission)
    
# Create a JSON file of the predictions
with open("submission.json", "w") as f:
    json.dump(final_submission, f)


Built tailored prompt.
Saved program for task 1 as version 1: Candidate_programs\task_1\solution_v1.py
Saved program for task 1 as version 2: Candidate_programs\task_1\solution_v2.py
Revising solution_v1.py...
Saved program for task 1 as version 3: Candidate_programs\task_1\solution_v3.py
Revising solution_v2.py...
Saved program for task 1 as version 4: Candidate_programs\task_1\solution_v4.py
Task 0607ce86 (0) evaluation results:
Program solution_v1.py solved 0 out of 3 pairs. Score: 0.00
Program solution_v2.py solved 0 out of 3 pairs. Score: 0.00
Program solution_v3.py solved 0 out of 3 pairs. Score: 0.00
Program solution_v4.py solved 0 out of 3 pairs. Score: 0.00
Best program: Candidate_programs\task_1\solution_v1.py
Best program: Candidate_programs\task_1\solution_v2.py
{'0607ce86': [{'attempt_1': [[3, 3, 3, 8, 3, 3, 0, 3, 3, 8, 3, 3, 0, 3, 3, 8, 3, 3, 0, 3, 0, 3], [3, 3, 3, 8, 3, 3, 0, 3, 3, 8, 3, 3, 0, 3, 3, 8, 3, 3, 0, 3, 0, 3], [3, 3, 3, 8, 3, 3, 0, 3, 3, 8, 3, 3, 0, 3, 3, 8, 3