## The ArcAGI Challenge

This notebook establishes an agentic workflow in which an LLM interrogates an ARC challenge's data, self-reflects on its responses and finally proposes a predicted output for the challenge.

This is done by adapting Microsoft Research's agentic framework, Autogen, with access to a Jupyter Notebook. In this implementation the agent is permitted to use code to access and view data and to resolve the challenge itself. This tests whether access to a symbolic logic tool, such as code, enables the LLM to score more highly on the ARC challenges.

The code lets the agent loop over a number of the challenges in a stateless fashion. The agent has no memory between challenges, so it cannot learn techniques as it progresses over them.
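The stateless loop can be pictured with a minimal sketch. The names `run_challenges`, `solve_one` and `toy_solver` are hypothetical and are not functions from this notebook; the sketch only illustrates that each challenge starts from fresh state and nothing is carried forward.

```python
from typing import Callable, Dict, List


def run_challenges(
    filenames: List[str],
    solve_one: Callable[[str, dict], bool],
) -> Dict[str, bool]:
    """Run every challenge with a brand-new state dict (hypothetical helper)."""
    results: Dict[str, bool] = {}
    for filename in filenames:
        state: dict = {}  # fresh per challenge: nothing from earlier challenges is visible
        results[filename] = solve_one(filename, state)
    return results


def toy_solver(filename: str, state: dict) -> bool:
    """Toy stand-in for the agent (assumption): reports whether it started with empty state."""
    started_empty = not state
    state["last_seen"] = filename  # this write is discarded before the next challenge
    return started_empty


print(run_challenges(["a.json", "b.json"], toy_solver))
```

Because the state is rebuilt inside the loop, the toy solver sees an empty state on every challenge, which mirrors the agent opening a new notebook each time.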

Visit the challenge at:

    https://arcprize.org/guide

Clone the data to your local machine with git:

```bash
cd myfolder/subfolder
git clone https://github.com/fchollet/ARC-AGI.git
```



### The Agentic AI Approach

1. Extend Microsoft Autogen with the ability to write and run Python code in a stateful Jupyter Notebook
    - The child Notebook will have its own kernel, but use the same python environment (inc same packages) as this parent notebook
    - The notebook records the agent's comments (aka reasoning) as markdown cells, their code as code cells and the code outputs as output fields.
    - This growing notebook will form the context window for each new call to the Data Scientist
        - We will use approx 150k input tokens and 6k output tokens per challenge question, which is approx $0.55 on Claude 3.5 Sonnet.

2. Functions to access the challenge data
    - Straightforward helper functions

3. Functions to set up a two agent conversation in Autogen:
    - First agent is a data scientist given the task to inspect the data and write python code to represent the mapping from input to output
    - Data Scientist role is played by Claude 3.5 Sonnet
    - Second agent is the child notebook, it is not an LLM. This agent simply executes the code and returns output to the notebook.
    - The session between the data scientist and notebook kicks off with a detailed description of the task, which is submitted to the data scientist.

4. Functions to manage the execution and record results

5. Main - Execute the challenge
    - Simply loop over all challenge questions, opening a new notebook for each and recording the outcome
    - Each challenge question is limited to just 15 conversation turns. 
    - This 'early stopping' prevents wasting tokens, because agents rarely solve a challenge they are repeatedly struggling with.
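The per-challenge cost quoted in step 1 can be checked with simple arithmetic. This sketch assumes Claude 3.5 Sonnet list prices of $3 per million input tokens and $15 per million output tokens; those prices are an assumption not stated in this notebook and may change. The function name `challenge_cost` is hypothetical.

```python
# Assumed list prices in USD per million tokens (not taken from this notebook)
INPUT_PRICE_PER_MTOK = 3.00
OUTPUT_PRICE_PER_MTOK = 15.00


def challenge_cost(input_tokens: int, output_tokens: int) -> float:
    """Estimate the USD cost of one challenge from its token counts."""
    total = input_tokens * INPUT_PRICE_PER_MTOK + output_tokens * OUTPUT_PRICE_PER_MTOK
    return total / 1_000_000


cost = challenge_cost(input_tokens=150_000, output_tokens=6_000)
print(f"Estimated cost per challenge: ${cost:.2f}")
```

Under these assumed prices the input tokens dominate the bill, which is why the growing notebook context matters more to cost than the length of the agent's replies.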




### Other Variations Attempted But No Longer Used

1. Same as above but GPT-4o as Data Scientist. 
    - Scored poorly (<10%); Claude 3.5 Sonnet appears better suited to this task
2. Same as above, but having inspected the data, the Data Scientist is invited to simply 'estimate' the output grid, not produce code to calculate it
    - This is very token efficient, saving all code production; however, the success rate halves.
3. Team chat.
    - Initially a team comprised of Data Scientist (Claude 3.5 Sonnet), Critic (GPT-4o), Chat Manager (GPT-4o) and Notebook (not LLM) was attempted. 
    - Critic was intended to enable self reflection; however, this was verbose and expensive whilst adding only a small increase in performance
    - In a multi agent chat each agent receives the full notebook as its context. This arrangement had three LLMs, so it used triple the tokens of the above approach. 
    - As an aside, it was discovered that only GPT-4o or GPT-4 can act as effective chat managers; Claude 3.5 Sonnet cannot.


### PHASE 1: Extending Autogen with Access to a Stateful Jupyter Notebook

I developed this for Autogen back in Nov 2023 and in 2024 the Microsoft team kindly included a Jupyter notebook executor in the standard distribution. However, the originally developed solution, as used here, is a little more convenient and robust for this task. 

Therefore, we take the time to adapt the Autogen classes with our own Notebook executor.
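As a rough illustration of the idea behind the custom executor, the sketch below separates an agent message into commentary (destined for a markdown cell) and code (destined for code cells). It is a simplified stand-in written for this explanation, not AutoGen's `extract_code` and not the exact logic used later in this notebook; `split_message` is a hypothetical name.

```python
import re
from typing import List, Tuple

# Three backticks, built programmatically so this example can sit inside a fenced block
FENCE = "`" * 3


def split_message(message: str) -> Tuple[str, List[str]]:
    """Return (commentary text, list of code block bodies) for one agent message."""
    # fence, optional language tag, newline, lazily matched body, closing fence
    pattern = rf"{FENCE}(?:\w+)?\n(.*?){FENCE}"
    code_blocks = re.findall(pattern, message, flags=re.DOTALL)
    text = re.sub(pattern, "", message, flags=re.DOTALL).strip()
    return text, code_blocks


msg = (
    "Let me check the shape.\n"
    f"{FENCE}python\nprint(grid.shape)\n{FENCE}\n"
    "Then I will compare."
)
text, blocks = split_message(msg)
print(repr(text))
print(blocks)
```

Keeping the commentary alongside the code is what lets the saved notebook read as a record of the agent's reasoning rather than a bare list of cells.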

In [1]:
# package for configuration management (API keys)
from dotenv import load_dotenv, find_dotenv

# packages for accessing data and managing the challenge
import os
import datetime
import json
import pprint 
import random
import numpy as np

# packages suggested to the agents
import pandas as pd
import shapely
import skimage

In [2]:
# __future__ imports must be the first statement in the cell, otherwise Python raises a SyntaxError
from __future__ import annotations
from typing import Any, Dict, List, Literal, Optional
from autogen import code_utils, config_list_from_json, Agent, AssistantAgent
from pydantic import BaseModel

# Build compatibility with future versions of AutoGen
# See https://github.com/microsoft/autogen/pull/1405/files

class CodeBlock(BaseModel):
    """A class that represents a code block."""

    """The code to execute."""
    code: str

    """The language of the code."""
    language: str


class CodeResult(BaseModel):
    """A class that represents the result of a code execution."""

    """The exit code of the code execution."""
    exit_code: int

    """The output of the code execution."""
    output: str


In [3]:
from __future__ import annotations
from re import escape, search, sub, DOTALL
from queue import Empty
from typing import List, Union
from pydantic import BaseModel, Field
from autogen.code_utils import extract_code
from nbformat import write
from nbformat.v4 import new_notebook, new_code_cell, new_output, new_markdown_cell
# from IPython.core.interactiveshell import InteractiveShell
from jupyter_client import KernelManager
from autogen.code_utils import DEFAULT_TIMEOUT

CODE_BLOCK_IDENTIFIER= "```"

# Override the default timeout for code execution
DEFAULT_TIMEOUT = 600

# Note, this class should inherit from Pydantic's BaseModel, but making life easy for now...
class NotebookCodeExecutor(object):
    """A code executor class that executes code statefully using an IPython kernel 
    operating with a Jupyter Notebook
    Each execution is stateful and can access variables created from previous
    executions in the same session.
    """

    class UserCapability:
        """An AgentCapability class that gives an agent the ability to use a Jupyter Notebook
        code executor."""

        DEFAULT_SYSTEM_MESSAGE_UPDATE = """You have been given coding capability
to solve tasks using Python code in a stateful Jupyter Notebook
When you write Python code, put the code in a block with the language set to Python.
For example:
"""+CODE_BLOCK_IDENTIFIER+"""python
x = 3
print(x)
"""+CODE_BLOCK_IDENTIFIER+"""

## Working with Jupyter Notebooks

The code will be executed in a Jupyter Notebook, and the output will be returned to you.
You can use variables created earlier in the subsequent code blocks.
NEVER present your code in json format.
If an error cannot be fixed or if the task is not solved even after the code is executed 
successfully, then analyze the problem, revisit your assumption, 
then pause to think of a different approach for solving the task.

## Handling Charts

When your code plots a chart then your chart will be presented in the notebook.
BUT, charts presented in the notebook are inaccessible to you, you cannot view them.
Therefore, prefer numerical methods over visuals for algorithm evaluation and optimisation.

"""

        def add_to_agent(self, agent):
            """Add this capability to an agent."""
            agent.update_system_message(agent.system_message + self.DEFAULT_SYSTEM_MESSAGE_UPDATE)

    # default class variables
    timeout = DEFAULT_TIMEOUT
    kernel = "python3"
    output_dir= "notebooks"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # establish kernel
        #self._shell = InteractiveShell.instance()
        self._kernel_manager = KernelManager(kernel_name=self.kernel)
        self._kernel_manager.start_kernel()
        self._kernel_client = self._kernel_manager.client()
        self._kernel_client.start_channels()
        self._timeout = self.timeout

        # establish notebook
        self._nb = new_notebook()

    # The notebook is useful as a public property, 
    # we can then inspect and modify the notebook as we wish
    # in fact, we do so in the following example where we ...
    # a) prefix the notebook with the team's task description in a markdown cell.
    # b) we also want to disable warnings, so manually append a code cell which executes automatically
    @property
    def nb(self):
        """Returns the notebook for inspection"""
        return self._nb

    def nb_append_markdown(self, text: str) -> str:
        """Users may choose to append comments in a markdown cell
        For example, the notebook makes more sense when prefixed with the task description
        Args:
            text (str): The text to append to the notebook as a markdown cell
        """
        self._nb.cells.append(new_markdown_cell(text))
        return 'markdown cell appended'

    def nb_append_code(self, code: str) -> str:
        """Users may choose to append executable code in a code cell
        For example, to disable warnings before the project starts, as warnings consume tokens
        Args:
            code (str): The code to execute in a code cell
        """
        # Append the code block to the notebook     
        code_block = CodeBlock(code=code, language="python")

        # execute the cell
        result = self.execute_code_blocks([code_block])

        # print result to user
        return result
    
    @property
    def user_capability(self) -> NotebookCodeExecutor.UserCapability:
        """Export a user capability that can be added to an agent."""
        return NotebookCodeExecutor.UserCapability()

    def extract_code_blocks(self, message: str) -> List[CodeBlock]:
        """Extract code blocks from a message.
        Args:
            message (str): The message to extract code blocks from.
        Returns:
            List[CodeBlock]: The extracted code blocks.
        """
        code_blocks = []
        for lang, code in extract_code(message):
            code_blocks.append(CodeBlock(code=code, language=lang))
        return code_blocks

    def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CodeResult:
        """For each code block, we will append it to the notebook as a cell
            execute it then return the result.
        Args:
            code_blocks (List[CodeBlock]): The code blocks to execute.
        Returns:
            CodeResult: The result of the code execution.
        """
        self._kernel_client.wait_for_ready(timeout=self._timeout)
        outputs = []
        for code_block in code_blocks:

            # Ensure any mention of "!pip install" has the "-qqq" flag added
            # this makes the pip install silent, which is important for the LLM
            code = self._process_code(code_block.code)

            # Append the code block to the notebook     
            code_cell = new_code_cell(code)
            self._nb.cells.append(code_cell)

            # the cell we want to execute is now the final cell in the notebook
            cell = self._nb.cells[-1]

            # execute the cell
            if cell.cell_type == 'code':
                self._kernel_client.execute(cell.source, allow_stdin=False)
                cell.outputs = []

                # capture the result in the notebook
                while True:
                    try:
                        msg = self._kernel_client.get_iopub_msg(timeout=self.timeout)
                        msg_type = msg['msg_type']
                        content = msg['content']

                        if msg_type in ['execute_result', 'display_data']:
                            cell.outputs.append(new_output(msg_type, data=content['data']))
                        elif msg_type == 'stream':
                            cell.outputs.append(new_output(msg_type, name=content['name'], text=content['text']))
                        elif msg_type == 'error':
                            cell.outputs.append(new_output(msg_type, ename=content['ename'], evalue=content['evalue'], traceback=content['traceback']))

                        if msg_type == 'status' and content['execution_state'] == 'idle':
                            break
                    # handle time outs.
                    except Empty:
                        return CodeResult(
                            exit_code=1,
                            output=f"ERROR: Timeout waiting for output from code block: {cell.source}",
                        )
                    except Exception as e:
                        return CodeResult(exit_code=1, output=f"ERROR: {e}")

                # we return images for display in the groupchat as a note, not the full image. The full image is kept in the Notebook only (see above)
                # This is because the image is a lot of tokens, wastes money sending it to the LLM team.
                modified_outputs = []
                for output in cell.outputs:

                    # determine whether output contains an image
                    if output['output_type'] in ['execute_result', 'display_data']:
                        output_is_image = any(key.startswith('image/') for key in output['data'])
                    else:
                        output_is_image = False

                    # if it does contain an image, replace the image with a note for the returned value
                    if output_is_image:
                        modified_outputs.append("Charts are good practice but not visible. If using a chart to decide upon your next step, use a numerical method instead ")
                    else:
                        modified_outputs.append(output)  # Keep other outputs unchanged
            else:
                return CodeResult(
                        exit_code=1,
                        output=f"ERROR: Attempted to execute a non-code cell: {cell.source}"
                        )

            modified_outputs_joined = "\n".join([str(modified_output) for modified_output in modified_outputs])
            outputs.append(modified_outputs_joined)

        return CodeResult(exit_code=0, output="\n".join([str(output) for output in outputs]))

    def save_notebook(self, file_path: str) -> str:
        """
        Saves the current notebook to the specified folder and filename.
        Intended to be used when groupchat has completed, user is expected to save the notebook to their local machine.

        Args:
            file_path: The file path (inc file name) where the notebook is located.
        Returns:
            A status message indicating success or failure.
        """

        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                write(self._nb, f)
            return "Notebook saved successfully."
        except Exception as e:
            return f"Error saving notebook: {str(e)}"

    def restart(self) -> None:
        """Restart a new session."""
        self._kernel_client.stop_channels()
        self._kernel_manager.shutdown_kernel()
        self._kernel_manager = KernelManager(kernel_name=self.kernel)
        self._kernel_manager.start_kernel()
        self._kernel_client = self._kernel_manager.client()
        self._kernel_client.start_channels()
        # print result to user
        print(f"Notebook kernel has been restarted, kernel name={self.kernel}")

    def shutdown(self) -> None:
        """Shutdown the notebook"""
        self._kernel_client.stop_channels()
        self._kernel_manager.shutdown_kernel()
        # print result to user
        print("Notebook kernel has been shutdown")

    def _process_code(self, code: str) -> str:
        """Process code before execution."""
        # Find lines that start with `! pip install` and make sure "-qqq" flag is added.
        lines = code.split("\n")
        for i, line in enumerate(lines):
            # use regex to find lines that start with `! pip install` or `!pip install`.
            match = search(r"^! ?pip install", line)
            if match is not None:
                if "-qqq" not in line:
                    lines[i] = line.replace(match.group(0), match.group(0) + " -qqq")
        return "\n".join(lines)

In [4]:
class CodeExecutorFactory:
    """A factory class for creating code executors."""

    @staticmethod
    def create(code_execution_config: Dict) -> NotebookCodeExecutor:
        """Get a code executor based on the code execution config."""
        executor_name = code_execution_config.get("executor")
        if executor_name == "notebook":
            return NotebookCodeExecutor(**code_execution_config.get("notebook", {}))
        else:
            raise ValueError(f"Unknown code executor {executor_name}")

In [5]:
from nbformat.v4 import new_markdown_cell
from autogen.code_utils import UNKNOWN

def _generate_code_execution_reply_using_executor(
    self,
    messages: Optional[List[Dict]] = None,
    sender: Optional[Agent] = None,
    config: Optional[Union[Dict, Literal[False]]] = None,
    ):

    """Generate a reply using code executor.

    Processes messages, performs notebook operations and execute code therein, based on the extracted intent.

    This function iterates through a specified number of recent messages, extracts any code blocks and text content, 
    and appends both comments and code to a Jupyter notebook. 

    The method first checks the configuration for code execution. If disabled, it immediately returns without processing. 
    For each groupchat message it extracts any embedded code blocks and text content. 
    Note, a single groupchat message may contain multiple code blocks and text content.
    The text is grouped together and appended to the notebook. The code is appended and executed in as many chunks as it is provided. 

    The function returns a boolean indicating whether any notebook operation was performed and a message detailing 
    the outcome of the operation, such as successful execution of cells or error.

    Args:
        messages: list of message dicts (aka a groupchat), where each message contains content that may be code and/or text.
        sender  : The sender of the messages.
        config  : Configuration options for code execution
    Returns:
        (bool, Optional[str]) : is final message?, reply content (None when no code was executed)

    """

    code_execution_config = config if config is not None else self._code_execution_config
    if code_execution_config is False:
        return False, None
    if messages is None:
        messages = self._oai_messages[sender]
    last_n_messages = code_execution_config.get("last_n_messages", "auto")

    if not (isinstance(last_n_messages, (int, float)) and last_n_messages >= 0) and last_n_messages != "auto":
        raise ValueError("last_n_messages must be either a non-negative integer, or the string 'auto'.")

    messages_to_scan = last_n_messages
    if last_n_messages == "auto":
        # Find when the agent last spoke
        messages_to_scan = 0
        for i in range(len(messages)):
            message = messages[-(i + 1)]
            if "role" not in message:
                break
            elif message["role"] != "user":
                break
            else:
                messages_to_scan += 1

    # iterate through the last n messages in reverse
    # if code blocks are found, execute the code blocks and return the output
    # if no code blocks are found, continue
    for i in range(min(len(messages), messages_to_scan)):
        message = messages[-(i + 1)]
        if not message["content"]:
            continue
        
        # identify code blocks in the message
        code_blocks = self._code_executor.extract_code_blocks(message["content"])
        if len(code_blocks) == 1 and code_blocks[0].language == UNKNOWN:
            continue

        # Retain agent comments as markdown cells
        # This helps the notebook to explain the agent's reasoning behind the code
        # The text content for a markdown cell is the message content with the code blocks removed  
        pattern = rf"(?<!\\){CODE_BLOCK_IDENTIFIER}.*?(?<!\\){CODE_BLOCK_IDENTIFIER}"

        # Replace code blocks with a break
        text_content = sub(pattern, "<br>", message["content"], flags=DOTALL).strip()

        # if text content is provided then append it to notebook as a markdown cell
        # Note, we do this before appending or executing any code cells
        if len(text_content)>0:
            text_cell = new_markdown_cell(text_content)
            self._code_executor._nb.cells.append(text_cell)
        
        # we don't need to execute the markdown cell
        # however, we do need to append the code blocks as code cells, execute them and gather outputs
        code_result = self._code_executor.execute_code_blocks(code_blocks)
        exitcode2str = "execution succeeded" if code_result.exit_code == 0 else "execution failed"
        return True, f"exitcode: {code_result.exit_code} ({exitcode2str})\nCode output: {code_result.output}"

    return False, None

In [6]:
# We are using the current version of autogen's ConversableAgent class
# but wish to replicate a future version which has the _generate_code_execution_reply_using_executor method
from autogen import ConversableAgent
ConversableAgent._generate_code_execution_reply_using_executor = _generate_code_execution_reply_using_executor


In [7]:
# Now we create the NotebookAgent class which will execute code in a notebook on our behalf, within the AutoGen framework
from autogen import UserProxyAgent

class NotebookAgent(UserProxyAgent):

    def __init__(self, 
                 name,
                 system_message,
                 code_execution_config, 
                 function_map = None):

        super().__init__(
            name            = name,
            system_message  = system_message,
            llm_config      = False,
            human_input_mode= "NEVER",
            # guard against messages whose content is None
            is_termination_msg=lambda msg: "TERMINATE" in (msg.get("content") or ""),
            function_map    = function_map,
            max_consecutive_auto_reply=10,
            )

        self._code_execution_config = code_execution_config

        # Create a code executor based on the code execution config
        self._code_executor = CodeExecutorFactory.create(self._code_execution_config)

        # Append the code executor capability to this agent's system prompt
        self._code_executor.user_capability.add_to_agent(self)

        # Ensure code executor responses (i.e. output from code execution) reach the conversation
        self.register_reply(ConversableAgent, 
                            ConversableAgent._generate_code_execution_reply_using_executor)

    @property
    def code_executor(self) -> NotebookCodeExecutor:
        """The code executor used by this agent. Raise if code execution is disabled."""
        if not hasattr(self, "_code_executor"):
            raise ValueError(
                "No code executor as code execution is disabled. "
                "To enable code execution, set code_execution_config."
            )
        return self._code_executor


### PHASE 2: Helper Functions to Access Data

Source is at: https://github.com/fchollet/ARC-AGI/tree/master/data/evaluation

Examples of these JSON files have been downloaded and can be found in the folder '/data'
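Each challenge file holds a `train` list and a `test` list, and every entry is a pair of `input` and `output` grids, as the helper functions in this notebook read them. The sketch below uses a tiny hand-made task (illustrative data, not a real challenge) and a hypothetical `clamp_pair` helper to show the layout and how a requested pair index can be kept within range.

```python
import json

# Hand-made task in the same layout as the ARC files (illustrative values only)
task_json = """
{
  "train": [
    {"input": [[0, 1], [1, 0]], "output": [[1, 0], [0, 1]]},
    {"input": [[1, 1], [0, 0]], "output": [[0, 0], [1, 1]]}
  ],
  "test": [
    {"input": [[0, 0], [1, 1]], "output": [[1, 1], [0, 0]]}
  ]
}
"""
task = json.loads(task_json)


def clamp_pair(pair: int, n_pairs: int) -> int:
    """Keep a requested pair index inside [0, n_pairs - 1] (hypothetical helper)."""
    return max(0, min(n_pairs - 1, pair))


n_pairs = len(task["train"])
first = task["train"][clamp_pair(0, n_pairs)]
last = task["train"][clamp_pair(99, n_pairs)]  # out-of-range request falls back to the last pair

print(f"{n_pairs} training pairs")
print("first input :", first["input"])
print("last output :", last["output"])
print("test input  :", task["test"][0]["input"])
```

The upper bound is `n_pairs - 1` rather than `n_pairs` because list indices are zero-based.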

In [8]:

# Data access functions

def load_data(file_path: str) -> Dict[str, Any]:
    # return the parsed JSON content of the challenge file
    with open(file_path, 'r') as file:
        return json.load(file)

def get_train(file_path: str, pair=0, print_to_screen=True):
    with open(file_path, 'r') as file:
        dataset = json.load(file)['train']

    train_len = len(dataset)

    # clamp pair to a valid index in [0, train_len - 1]
    if pair < 0 :
        pair = 0
    else:
        pair = min(train_len - 1, pair)

    array_input  = np.array(dataset[pair]['input'])
    array_output = np.array(dataset[pair]['output'])

    if print_to_screen:
        print(f"There are {train_len} pairs of input and output in the training dataset.")
        print(f"Pair {pair}")
        print(f"INPUT. Shape={array_input.shape}")
        pprint.pprint(array_input)
        print(f"OUTPUT. Shape={array_output.shape}")
        pprint.pprint(array_output)

    return array_input, array_output

def get_train_all(file_path: str):

    with open(file_path, 'r') as file:
        dataset = json.load(file)['train']

    train_len = len(dataset)

    for pair in range(train_len):

        array_input  = np.array(dataset[pair]['input'])
        array_output = np.array(dataset[pair]['output'])

        if pair == 0:
            train_all = f"There are {train_len} examples in the file. \n Pair {pair}\n INPUT. Shape={array_input.shape}\n {array_input} \n OUTPUT. Shape={array_output.shape} \n {array_output}"
        else:
            train_all = f"{train_all} \n Pair {pair}\n INPUT. Shape={array_input.shape}\n {array_input} \n OUTPUT. Shape={array_output.shape} \n {array_output}"    

    return train_all

def get_test(file_path: str, print_to_screen=True, testtype='input'):
    """
    testtype can be 'input' or 'output' 
    """
    with open(file_path, 'r') as file:
        dataset = json.load(file)

    grid_test = np.array(dataset['test'][0][testtype])

    if print_to_screen:
        print(f"TEST {testtype}. Shape={grid_test.shape}")
        pprint.pprint(grid_test)

    return grid_test


### PHASE 3: Helper Functions to Establish Autogen Conversation With A Notebook

We need functions to:
1. Seed the notebook with code the agents can use to access challenge data
2. Establish a task, notebook agent and data scientist agent

In [9]:
# The following texts will be code cells in the notebook
# since the notebook is stateful, the described functions will be available to the agent

def notebook_setup(filename:str, train_or_eval:str):

    # Disable version warnings in the notebook, we pay for the tokens and don't want to waste them on warnings
    # exit code=0 means success, exit code=1 means failure
    disable_warnings_code = """
import warnings

# Filter out FutureWarning
warnings.simplefilter(action='ignore', category=FutureWarning)
    """

    # functions to load data
    load_data_function = """

import json
import pprint
import numpy as np
from typing import List, Dict, Any

# Ensure the large matrices print without carriage returns.
np.set_printoptions(linewidth=150)

def get_train(file_path: str, pair=0, print_to_screen=True):
    with open(file_path, 'r') as file:
        dataset = json.load(file)['train']

    train_len = len(dataset)

    # clamp pair to a valid index in [0, train_len - 1]
    if pair < 0 :
        pair = 0
    else:
        pair = min(train_len - 1, pair)

    array_input  = np.array(dataset[pair]['input'])
    array_output = np.array(dataset[pair]['output'])

    if print_to_screen:
        print(f"Pair {pair}")
        print(f"INPUT. Shape={array_input.shape}")
        pprint.pprint(array_input)
        print(f"OUTPUT. Shape={array_output.shape}")
        pprint.pprint(array_output)

    return array_input, array_output

def get_train_all(file_path: str):

    with open(file_path, 'r') as file:
        dataset = json.load(file)['train']

    train_len = len(dataset)

    for pair in range(train_len):

        array_input  = np.array(dataset[pair]['input'])
        array_output = np.array(dataset[pair]['output'])

        print(f"Pair {pair}")
        print(f"INPUT. Shape={array_input.shape}")
        pprint.pprint(array_input)
        print(f"OUTPUT. Shape={array_output.shape}")
        pprint.pprint(array_output)

    return

def get_test(file_path: str, print_to_screen=True):

    # this version returns the test input grid only

    with open(file_path, 'r') as file:
        dataset = json.load(file)

    grid_test = np.array(dataset['test'][0]['input'])

    if print_to_screen:
        print(f"TEST Input. Shape={grid_test.shape}")
        pprint.pprint(grid_test)

    return grid_test

test_counter = 0

def test_outcome(file_path: str, test_prediction):

    global test_counter

    # Load the dataset from the JSON file
    with open(file_path, 'r') as file:
        dataset = json.load(file)

    # Extract the actual test output from the dataset
    test_actual = np.array(dataset['test'][0]['output'])

    # Compare test_prediction with test_actual, provided fewer than 3 tries have been used
    if test_counter >= 3:
        print("You have no more tries, save your prediction and state the termination word.")
        test_outcome = None
    else:
        print(f"Prediction was {np.array_equal(test_prediction, test_actual)}")
        test_outcome = np.array_equal(test_prediction, test_actual)
    
    test_counter += 1

    return test_outcome

    """

    load_data = f"""

# load challenge data from the file
import os

# set working directory
os.chdir("/home/oliver/Documents/LangChain/ProductDevelopment/AutoGen/ArcAGI")
cwd = os.getcwd()
print(cwd)

# get data file
file_path = 'data/{train_or_eval}/{filename}'

# print data into the notebook, for sake of reference, AI agents have no access to these prefixed notebook cells
get_train_all(file_path)

    """

    return disable_warnings_code, load_data_function, load_data


In [10]:
## Task description
# The instructions for the agents to follow, this initiates all conversations.

def create_task(filename, train_len, train_or_eval, training_data):

    ## Task description
    task_description = f"""

# PROJECT INSTRUCTIONS

We are working on the ARC AGI challenge. This involves a series of json files, each of which contains a handful of pairs of grids. 
Each pair has an input and an output grid. Each grid is simply a numpy array of integers. 
Your task is to discover the single mapping which converts each input grid to its corresponding output grid and apply that to the test input, arriving at a test output.

## YOUR APPROACH

    1. Start by visualizing and analyzing each input-output pair carefully. 
    2. Look for consistent transformations across all training pairs. 
    3. Develop a hypothesis about the mapping logic and refine it as you examine more pairs. 
    4. Implement the hypothesized logic in Python. 
    5. Test the function(s) on all training pairs to verify accuracy. 
    6. Assume numpy, but also consider using image processing libraries like skimage for more complex transformations. 
    7. Break down complex transformations into simpler steps. 
    8. Use helper functions for repetitive tasks. 
    9. When ready, apply your function on the test input grid to predict the output grid.

## NOTE ON ACCESS TO TEST DATA

    You can access and view the final test grid as follows, assuming you wish to print_to_screen:
        input_test = get_test('data/{train_or_eval}/{filename}', print_to_screen=True)

    When you have successfully predicted the output grid for the test input grid, 
    save the numpy array as text, being careful to use this filepath:
        np.savetxt('predictions/{train_or_eval}/{filename}_output_test.txt', output_test, fmt='%d', delimiter=',')

## WHEN COMPLETE, END THE CONVERSATION

    When the project is complete, meaning the mapping has been tested AND proven on all training pairs AND a test output has been computed AND saved to file
    then you must end the conversation with the termination word.

### TRAINING DATA:

If you need to access any given pair, for example pair=0, then use this code:
    input_train0, output_train0 = get_train('data/{train_or_eval}/{filename}', pair=0, print_to_screen=True)

However, here it is presented to you without the need for code:

{training_data}

### MAPPINGS ARE COMBINATIONS OF TRANSFORMATIONS

    When building your hypotheses on the above mappings, try to propose combinations of the following transformations:

    Grid Expansion and Repetition (Tiling):
    - Simply expand the grid and repeat (tile) the input grid into the output grid
    Symmetry and Mirroring (flipping):
    - Horizontally or vertically
    Propagation of patterns:
    - Identify non-zero clusters or shapes in the input grid and propagating them in the output. Proceeding horizontally, vertically or diagonally.
    Mathematical Operations:
    - Incrementing values, taking modulo, or performing addition.
    Color/Value Substitution:
    - Values in the input grid are replaced with different values in the output grid, often changing all instances of one number to another
    Shape Detection and Transformation:
    - Identifying geometric shapes in the input grid and applying transformations such as rotation, scaling, flipping, translation and/or overlapping.
    Grid Segmentation:
    - Divide the input grid into sections and apply transformations to each section.
    Boundary Detection and Fill:
    - Identify the boundaries of shapes or patterns and fill them with specific values. This sometimes involves propagating values from the edges inward.
    Connectivity-based Transformations:
    - Using connected component analysis to identify and transform groups of connected cells.
    Rule-based Transformations:
    - Applying specific rules based on the arrangement of values in the input grid. These rules often consider the neighboring cells of each position.
    Coordinate-based Transformations:
    - Using the coordinates of cells to determine how they should be transformed or moved in the output grid.
    When the pattern is more complex than originally assumed:
    - Review all training pairs again and try to describe the transformation in plain language
    - Do not leap immediately to code

Please proceed with developing your own hypotheses on the training data.

    """

    return task_description
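
The transformation families listed in the task description compose naturally as small NumPy functions. The following is a minimal sketch rather than code from this notebook: `mirror_then_tile`, `substitute` and `fits_all_pairs` are hypothetical helper names, and the toy grid is invented purely for illustration.

```python
import numpy as np

def mirror_then_tile(grid, reps=(2, 1)):
    """Hypothetical mapping: append the left-right mirror, then tile the result."""
    widened = np.hstack([grid, np.fliplr(grid)])   # symmetry / mirroring
    return np.tile(widened, reps)                  # grid expansion / repetition

def substitute(grid, mapping):
    """Colour/value substitution, e.g. {1: 5} turns every 1 into 5."""
    out = grid.copy()
    for old, new in mapping.items():
        out[grid == old] = new                     # mask on the original so swaps do not chain
    return out

def fits_all_pairs(fn, pairs):
    """True only if fn reproduces the output grid of every training pair."""
    return all(np.array_equal(fn(inp), out) for inp, out in pairs)

# Invented toy grid, not taken from the ARC data
toy = np.array([[1, 0],
                [0, 2]])
candidate = lambda g: substitute(mirror_then_tile(g), {1: 5})
result = candidate(toy)
print(result)
```

A check like `fits_all_pairs` corresponds to step 5 of the approach: a hypothesis is only accepted once it reproduces every training output exactly.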


In [11]:
## Notebook
# This is the executor who will be given ability to execute code in a Jupyter Notebook

def notebook_create():

    # Instantiate the Notebook
    notebook = NotebookAgent(
        name="Notebook",
        # We include a system message to explain the capabilities of the notebook to the GroupChatManager (if any) which is an LLM, whereas this agent is not an LLM
        system_message="""Notebook. You are a Jupyter Notebook
        When presented with python code wrapped in this delimiter '```' then you execute that code in a Jupyter Notebook and report the results back to team.
        """,
        code_execution_config={ "last_n_messages": 3, 
                                "work_dir"       : "coding",        
                                "use_docker"     : False,     # set to True or image name like "python:3" to use docker
                                "executor"       : "notebook" # this is crucial, ensures execution happens in a notebook
                                },
        #function_map = {"myfunction": myfunction_json}
    )

    return notebook


In [12]:
## Scientist
# This is the agent who will propose the code which the notebook executes

def datascientist_create(model_name):

    # filter LLM API keys for the desired model
    datascientist_config_list= config_list_from_json(
        "MODEL_CONFIG_LIST",
        filter_dict={
            "model": {model_name} 
        }
    )

    # prep configuration for the coding agent to use the selected LLM and its API key
    datascientist = AssistantAgent(
        name="Helpful assistant",
        llm_config={"seed"            : 42,  # change the seed for different trials
                    "temperature"     : 0,   # 0 uses most likely token every time, highly repeatable. 1 is more creative.
                    "config_list"     : datascientist_config_list,
                    "max_retries"     : 5
                    #"request_timeout" : 5*60,
                    },
        human_input_mode="NEVER",
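        # Guard against content being None, which would otherwise make the `in` test raise a TypeError.
        # Stricter alternative: lambda x: (x.get("content") or "").rstrip().endswith("TERMINATE")
        is_termination_msg=lambda msg: "TERMINATE" in (msg.get("content") or ""),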
        system_message="""# DATA SCIENTIST

You have special observational and hypothecation abilities to propose rules which govern mapping patterns in data.
You have succinct coding capability to solve data mapping tasks using Python code in a stateful IPython kernel.
You are the only team member responsible for writing code, however, the code_executor_agent is responsible for executing the code.
Present only one block of code at a time. Allow yourself to see the output from the executed code before presenting the next block of code.

## BEST PRACTICES FOR CODING

* Present only one block of code at a time. **Do not present multiple blocks of code in a single message.**
Allow yourself to see the output from the executed code before presenting the next block of code.
Code should be written incrementally, and you should leverage the statefulness of the kernel to avoid repeating code.

If you code then it MUST be presented in a code block wrapped in three backticks, '```' with the language set to Python.
For example:
```python
x = 3
```

## WHEN A PROJECT IS COMPLETE

When a project is complete, as specified in the project instructions, then you must say 'TERMINATE', which is the termination word.
""",
    )

    return datascientist
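
The termination check can be exercised on its own, without any agent. The sketch below is illustrative only: `is_termination_msg` and `is_strict_termination_msg` are hypothetical standalone functions, written on the assumption that a message is a plain dict whose `content` may be missing or `None`. It contrasts a substring test with a stricter test that requires the termination word to close the message.

```python
def is_termination_msg(msg, word="TERMINATE"):
    """Return True when a chat message contains the termination word anywhere."""
    content = msg.get("content") or ""   # content may be missing or None
    return word in content

def is_strict_termination_msg(msg, word="TERMINATE"):
    """Stricter variant: the termination word must be the last thing in the message."""
    content = msg.get("content") or ""
    return content.rstrip().endswith(word)

mention = {"content": "I will say TERMINATE once the file is saved."}
final = {"content": "Saved.\nTERMINATE\n"}

print(is_termination_msg(mention), is_strict_termination_msg(mention))
print(is_termination_msg(final), is_strict_termination_msg(final))
```

The substring test ends the chat as soon as the word is mentioned at all, whereas the stricter test only fires when the word closes the message. Which behaviour is preferable depends on how reliably the model follows the instruction.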

### Functions to Manage the Project

Primarily for recording results and saving the notebook generated by the agents.


In [13]:
# Keep track of the results in a table on file
def outcome_spreadsheet(train_or_eval, filename, outcome, date_time_end_label, model_name, est_method, outcomes_path='arcagi_outcomes.xlsx'):
    """
    Update or create an Excel spreadsheet with the results of each model run.

    Args:
        train_or_eval (str): Whether the file belongs to the training or evaluation dataset
        filename (str): Name of the ARC challenge file being processed.
        outcome (str or bool): The outcome of the model run.
        date_time_end_label (str): Timestamp of when the model run ended (in text format).
        model_name (str): Name of the LLM used.
        est_method (str): Method used by the Data Scientist to predict the output grid ('estimation' or 'coding')
        outcomes_path (str, optional): Path to the Excel file. Defaults to 'arcagi_outcomes.xlsx'.

    Returns:
        None
    """

    # Use the outcomes_path argument as given (it defaults to 'arcagi_outcomes.xlsx')

    # New data to append
    new_data = pd.DataFrame({
        'dataset': [train_or_eval],
        'file'   : [filename],
        'outcome': [outcome],
        'time'   : [date_time_end_label],
        'model'  : [model_name],
        'est_method': [est_method],
    })

    # Check if the file exists
    if os.path.exists(outcomes_path):

        print('Arranging updates for spreadsheet')

        # Load existing data
        existing_data = pd.read_excel(outcomes_path)

        # Append new data
        combined_data = pd.concat([existing_data, new_data], ignore_index=True)
    else:
        print('Arranging first data for spreadsheet')

        # If file doesn't exist, use only the new data
        combined_data = new_data

    # Save the combined data to Excel
    print('Updating spreadsheet:', outcomes_path)
    combined_data.to_excel(outcomes_path, index=False)

    return

In [14]:

def notebook_outcome_stats(notebook, filename, file_path, train_or_eval, date_time_start, cwd, model_name, est_method):
    """
    Calculate and record statistics for a notebook execution, including duration, prediction accuracy, and outcome.

    Args:
        notebook (object): The notebook object.
        filename (str): Name of the file being processed.
        file_path (str): Path to the file (inc filename)
        train_or_eval (str): Indicates whether this is a training or evaluation run.
        date_time_start (datetime): Start time of the notebook execution.
        cwd (str): Current working directory.
        model_name (str): Name of the LLM being used.
        est_method (str): Estimation method used, either 'estimation' or 'coding'

    Returns:
        tuple: A tuple containing:
            - str: The outcome of the prediction ('True', 'False', or 'Incomplete').
            - datetime: The end time of the notebook execution.
            - str: A formatted string describing the duration of execution.
    """
    # Record time finished
    date_time_end = datetime.datetime.now()
    date_time_end_label = date_time_end.strftime("%Y-%m-%d %H:%M:%S")
    duration = date_time_end - date_time_start

    # Extract hours, minutes, and seconds (total_seconds() also counts whole days, unlike .seconds)
    hours, remainder = divmod(int(duration.total_seconds()), 3600)
    minutes, seconds = divmod(remainder, 60)
    duration_txt     = f"Duration of notebook execution\n {hours} hours, {minutes} minutes, {seconds} seconds"

    # Record duration in the notebook itself
    notebook.code_executor.nb_append_markdown(f"### HUMAN: {duration_txt}")

    # Check the results
    actual_test = get_test(file_path, testtype='output', print_to_screen=False)
    predicted_test_file = os.path.join(cwd, 'predictions', train_or_eval, f"{filename}_output_test.txt")
    
    if os.path.exists(predicted_test_file):
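        # ndmin=2 keeps single-row or single-column grids two-dimensional so the shape check is fair
        predicted_test = np.loadtxt(predicted_test_file, delimiter=',', dtype=int, ndmin=2)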

        if actual_test.shape != predicted_test.shape:
            print(f"Warning: Shape mismatch. actual_test shape: {actual_test.shape}, predicted_test shape: {predicted_test.shape}")
            outcome = False
        else:
            outcome = (actual_test == predicted_test).all()
    else:
        print(f"Warning: Predicted test file {predicted_test_file} not found.")
        outcome = "Incomplete"

    # Save outcome to the notebook itself
    notebook.code_executor.nb_append_markdown("### HUMAN: End of Cells by AI Agents\n Let's test the results of their prediction.")
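    # Quote the outcome so that the 'Incomplete' string does not become an undefined name in the generated code
    notebook.code_executor.nb_append_code(f"print('Prediction is accurate? ', {str(outcome)!r})")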

    # Save the notebook for later inspection
    # Creates unique filename so this notebook can be repeatedly executed and team solutions compared
    filename_nb = f"{filename}_{train_or_eval}_{date_time_end_label}_{outcome}.ipynb"
    notebook.code_executor.save_notebook(file_path=os.path.join(cwd, 'predictions', train_or_eval, filename_nb))

    # Record stats to spreadsheet
    outcome_spreadsheet(train_or_eval, filename, outcome, date_time_end_label, model_name, est_method)

    # print outcome stats
    print(f"Duration : {duration_txt}")
    print(f"Prediction is accurate? : {outcome}\n")

    return str(outcome), date_time_end, duration_txt
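
The scoring step can be tried in isolation with a temporary file. This is a minimal sketch under stated assumptions: `score_prediction` is a hypothetical helper, not part of the notebook, and the toy grids and file name are invented. It mirrors the three outcomes described above and uses the same `np.savetxt` format as the task description.

```python
import os
import tempfile
import numpy as np

def score_prediction(actual, predicted_path):
    """Return 'True', 'False' or 'Incomplete' for one saved prediction."""
    if not os.path.exists(predicted_path):
        return "Incomplete"
    # ndmin=2 keeps single-row or single-column grids two-dimensional
    predicted = np.loadtxt(predicted_path, delimiter=",", dtype=int, ndmin=2)
    # array_equal is False on a shape mismatch, so no separate shape test is needed
    return str(np.array_equal(actual, predicted))

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "toy_output_test.txt")   # hypothetical file name
    missing = score_prediction(np.array([[1, 2, 3]]), path)
    np.savetxt(path, np.array([[1, 2, 3]]), fmt="%d", delimiter=",")
    exact = score_prediction(np.array([[1, 2, 3]]), path)
    wrong_shape = score_prediction(np.array([[1, 2], [3, 4]]), path)

print(missing, exact, wrong_shape)
```

A prediction counts as correct only when every cell matches, so a grid that is nearly right still scores `False`.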

In [15]:

def count_files(folder_path):
    """
    Recursively count the number of files in a given folder and its subfolders.

    Args:
        folder_path (str): The path to the folder whose files are to be counted.

    Returns:
        int: The total number of files found in the folder and its subfolders.
    """
    file_count = 0
    for root, dirs, files in os.walk(folder_path):
        file_count += len(files)
    return file_count

def get_files(folder_path, num_files=None, sort_method='alphabetical'):
    """
    Retrieve files from a folder in alphabetical or random order.
    
    Args:
    folder_path (str): Path to the folder containing the files.
    num_files (int, optional): Number of files to retrieve. If None, retrieves all files.
    sort_method (str, optional): Method to sort the files. Can be 'alphabetical' or 'random'. 
                            Defaults to 'alphabetical'.
    
    Returns:
    list: List of file names sorted according to the specified method.
    """
    # Get all files in the folder
    all_files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]
    
    if sort_method == 'alphabetical':
        # Sort the files alphabetically
        all_files.sort()
    elif sort_method == 'random':
        # Shuffle the files randomly
        random.shuffle(all_files)
    # else do nothing, return the files as they are

    # Return the specified number of files or all if num_files is None
    return all_files[:num_files] if num_files is not None else all_files
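
With `sort_method='random'` the selection differs on every run, because the shuffle is unseeded. Where repeatable sampling matters, for instance when comparing models on the same challenges, a seeded variant may help. The sketch below is an assumption-laden illustration: `get_files_seeded` and its `seed` parameter are hypothetical and do not exist in the notebook.

```python
import os
import random
import tempfile

def get_files_seeded(folder_path, num_files=None, seed=None):
    """Hypothetical variant of get_files: random order, repeatable when seeded."""
    # Sort first, because os.listdir returns names in arbitrary order
    files = sorted(f for f in os.listdir(folder_path)
                   if os.path.isfile(os.path.join(folder_path, f)))
    random.Random(seed).shuffle(files)   # private generator, global random state untouched
    return files if num_files is None else files[:num_files]

with tempfile.TemporaryDirectory() as tmp:
    # Create a few empty placeholder files to select from
    for name in ("c.json", "a.json", "b.json", "d.json"):
        open(os.path.join(tmp, name), "w").close()
    first = get_files_seeded(tmp, num_files=2, seed=42)
    second = get_files_seeded(tmp, num_files=2, seed=42)
    everything = get_files_seeded(tmp, seed=42)

print(first == second)
```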

### PHASE 5: Main - Executing the Project

Loads the data and loops the agent conversation over each ARC challenge file.
A new notebook is started for each challenge.

In [19]:
## MAIN

# Set working directory and environment keys
os.chdir("/home/oliver/Documents/LangChain/ProductDevelopment/AutoGen/ArcAGI")
cwd = os.getcwd()

# read local .env file for LLM API keys
_ = load_dotenv(find_dotenv(usecwd=True))
oai_config_value = os.environ.get('MODEL_CONFIG_LIST')

# How many examples to work through?
num_files = 20

# What LLM will we use for the Data Scientist?
model_name = "claude-3-5-sonnet-20240620" # "claude-3-5-sonnet-20240620", "gpt-4o", "gpt-4-1106-preview" "gpt-4o-mini"

# get the challenge files to work on
train_or_eval = 'training' # either 'training' or 'evaluation'
source_folder = os.path.join('data', train_or_eval)
file_qty      = count_files(source_folder)
filenames     = get_files(source_folder, num_files, sort_method='alphabetical')
print(f"Number of files in the folder: {file_qty}")

# Loop through the challenge files
for filename in filenames:
    
    print(f"Next file is {filename}.")

    # set path
    file_path = os.path.join(source_folder, filename)

    training_data = get_train_all(file_path)

    # discover qty of examples to train on
    with open(file_path, 'r') as file:
        dataset = json.load(file)['train']
    train_len = len(dataset)

    print(f"Training length is {train_len}.")

    # Create the notebook agent, who executes the code
    notebook = notebook_create()

    # Seed the notebook with useful functions which will be available to the Data Scientist agent in the same environment
    # Also seed comments, these are not passed to the LLM.
    disable_warnings_code, load_data_function, load_data = notebook_setup(filename, train_or_eval)
    notebook.code_executor.nb_append_markdown(f"## Automated Arc AGI Jupyter Notebook for {filename}\n")
    notebook.code_executor.nb_append_markdown("### HUMAN: Ensure warnings are disabled")
    notebook.code_executor.nb_append_code(disable_warnings_code)
    notebook.code_executor.nb_append_markdown("### HUMAN: Create data access functions on behalf of the AI agent team\n")
    notebook.code_executor.nb_append_code(load_data_function)
    notebook.code_executor.nb_append_code(load_data)
    notebook.code_executor.nb_append_markdown("### AI AGENTS: All subsequent notebook entries are by the AI agent team\n")

    # Create the data scientist agent who proposes the code to be executed by the notebook
    datascientist = datascientist_create(model_name)

    # Create the task description
    task = create_task(filename, train_len, train_or_eval, training_data)

    # Start the timer
    date_time_start = datetime.datetime.now()
    print(f"Starting task {filename} at {date_time_start}\n")

    # Initiate the task
    datascientist.initiate_chat(
        recipient     = notebook, 
        message       = task,
        clear_history = True,
        max_turns     = 15,    # One turn means one conversation round trip. Early stopping prevents chat entering endless loop on problem it cannot solve whilst consuming tokens
        silent        = False  # Set to True to suppress output
    )

    # Record task outcome
    outcome, date_time_end, duration_txt = notebook_outcome_stats(notebook, filename, file_path, train_or_eval, date_time_start, cwd, model_name, est_method='coding')
    print(f"Ending task {filename}. Outcome was {outcome}. {duration_txt}. \n")
    print("------------------------------------------------------------------------\n")



Number of files in the folder: 403
Next file is 1c786137.json.
Training length is 3.
Starting task 1c786137.json at 2024-09-25 11:47:05.673372

Helpful assistant (to Notebook):



# PROJECT INSTRUCTIONS

We are working on the ARC AGI challenge. This involves a series of json files, each of which contains a handful of pairs of grids. 
Each pair has an input and an output grid. Each grid is simple a numpy array of integers. 
Your task is to discover the single mapping which converts each input grid to its corresponding output grid and apply that to the test input, arriving at a test output.

## YOUR APPROACH

    1. Start by visualizing and analyzing each input-output pair carefully. 
    2. Look for consistent transformations across all training pairs. 
    3. Develop a hypothesis about the mapping logic and refine it as you examine more pairs. 
    4. Implement the hypothesized logic in Python. 
    5. Test the function(s) on all training pairs to verify accuracy. 
    6. Assu