In [1]:
!pip install -U -q google-generativeai # Install the Python SDK
#!pip -q install groq

In [2]:
!pip install black

Collecting black
  Downloading black-24.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.1/77.1 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
Collecting packaging>=22.0 (from black)
  Downloading packaging-24.1-py3-none-any.whl.metadata (3.2 kB)
Collecting pathspec>=0.9.0 (from black)
  Downloading pathspec-0.12.1-py3-none-any.whl.metadata (21 kB)
Downloading black-24.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m50.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading packaging-24.1-py3-none-any.whl (53 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.0/54.0 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pathspec-0.12.1-py3-none-any.whl (31 kB)
Installing collected packages: pathspec, packaging, black
  Attempting uninstall: packaging
 

In [3]:
import pandas as pd
import numpy as np
import json
import os
import re
import google.generativeai as genai

## Define API clients

In [4]:
# Kaggle's secret store client; used below to fetch the API key by name
# instead of hard-coding it in the notebook.
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()

In [5]:
# (google.generativeai is already imported in the imports cell above.)
import google.generativeai as genai

# Configure the SDK with the key stored under "GOOGLE_API_KEY" in the
# notebook's secrets.
genai.configure(api_key = user_secrets.get_secret("GOOGLE_API_KEY"))

## What is the objective?

- Create an agent system that generates code to solve math problems
- Includes error correction (reflection)

## Create a list of agents

AGENTS
1. code_writer_agent
2. code_interpreter_agent (runs the generated code)
3. router_agent
4. final_answer_agent

## Helper functions

In [6]:
def extract_and_run_llm_code(raw_output, timeout_seconds=7):
    """
    Extract the first fenced code block from an LLM reply, save it to
    'code.py' (tidied with autopep8) and run it in a separate process using
    the currently executing Python interpreter.

    The run is stopped after `timeout_seconds` so that a long-running or
    frozen script cannot block the notebook.

    NOTE: the next notebook cell defines a function with the same name; that
    later definition shadows this one once both cells have been executed.

    Args:
        raw_output (str): Raw LLM reply containing a fenced code block.
        timeout_seconds (int | float): Maximum run time of the script.

    Returns:
        dict: {"status": "success", "output": <stdout>} when the script exits
        cleanly and printed something; otherwise
        {"status": "failed", "output": <error text>}.
    """

    import subprocess
    import sys
    import textwrap

    # Everything between the first pair of ``` fences is treated as the code.
    fence_parts = raw_output.split('```')
    fenced = fence_parts[1] if len(fence_parts) > 1 else ''

    # Drop the language tag that follows the opening fence, if there is one.
    # (Blindly slicing off 7 characters would eat code when the tag is missing.)
    first_line, newline, remainder = fenced.partition('\n')
    if newline and first_line.strip().lower() in ('', 'python', 'python3', 'py'):
        fenced = remainder

    # Remove indentation shared by every line BEFORE trimming, so a uniformly
    # indented block keeps its relative indentation.
    code = textwrap.dedent(fenced).strip()

    if not code:
        message = ("No code block found in the response. "
                   "Reply with exactly one ```python code block.")
        print("Error:\n", message)
        return {"status": "failed", "output": message}

    # Imported only once there is code to format, so a reply without code is
    # rejected without touching the formatter.
    import autopep8

    # Use autopep8 to automatically fix the code style
    fixed_code = autopep8.fix_code(code + '\n')

    with open('code.py', 'w', encoding='utf-8') as file:
        file.write(fixed_code)

    # Run 'code.py' without a shell; subprocess enforces the timeout itself.
    try:
        completed = subprocess.run(
            [sys.executable, 'code.py'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        message = f"Execution timed out after {timeout_seconds} seconds"
        print("Error:\n", message)
        return {"status": "failed", "output": message}

    # Decode the output from bytes to a string
    shell_output = completed.stdout.decode('utf8', errors='replace')
    error_output = completed.stderr.decode('utf8', errors='replace')

    # A run only counts as successful if it exited cleanly AND printed
    # something; partial output followed by a traceback is a failure.
    if completed.returncode == 0 and shell_output:
        print("Output:\n", shell_output)
        return {"status": "success", "output": shell_output}

    if error_output:
        print("Error:\n", error_output)
        return {"status": "failed", "output": error_output}

    if completed.returncode != 0:
        message = f"Process exited with status {completed.returncode}"
        print("Error:\n", message)
        return {"status": "failed", "output": message}

    # Sometimes there's no output at all
    print("Error: No output")
    return {"status": "failed", "output": "No output"}


In [7]:
def extract_and_run_llm_code(raw_output, timeout_seconds=7):
    """
    Extract the first fenced code block from an LLM reply, save it to
    'code.py' (formatted with autopep8 and, when possible, black) and run it
    in a separate process using the currently executing Python interpreter.

    The run is stopped after `timeout_seconds` so that a long-running or
    frozen script cannot block the notebook. A result dict is returned on
    every path, so callers can always read result["status"].

    Args:
        raw_output (str): Raw LLM reply containing a fenced code block.
        timeout_seconds (int | float): Maximum run time of the script.

    Returns:
        dict: {"status": "success", "output": <stdout>} when the script exits
        cleanly and printed something; otherwise
        {"status": "failed", "output": <error text>}.
    """

    import subprocess
    import sys
    import textwrap

    def _failed(message):
        # Single place that reports a failure and builds the result dict.
        print("Code failed to run.")
        print("Error:\n", message)
        return {"status": "failed", "output": message}

    # Everything between the first pair of ``` fences is treated as the code.
    fence_parts = raw_output.split('```')
    fenced = fence_parts[1] if len(fence_parts) > 1 else ''

    # Drop the language tag that follows the opening fence, if there is one.
    # (Blindly slicing off 7 characters would eat code when the tag is missing.)
    first_line, newline, remainder = fenced.partition('\n')
    if newline and first_line.strip().lower() in ('', 'python', 'python3', 'py'):
        fenced = remainder

    # Remove indentation shared by every line BEFORE trimming. Stripping
    # individual lines (e.g. every line containing "print") would pull
    # statements out of their loops and functions and break valid code.
    code = textwrap.dedent(fenced).strip()

    if not code:
        return _failed("No code block found in the response. "
                       "Reply with exactly one ```python code block.")

    # Imported only once there is code to format, so a reply without code is
    # rejected without touching the formatters.
    import autopep8
    import black

    # Use autopep8 to automatically fix the code style
    fixed_code = autopep8.fix_code(code + '\n')

    # Further format code with black
    try:
        fixed_code = black.format_str(fixed_code, mode=black.FileMode())
    except black.InvalidInput:
        # black cannot parse the code; keep the autopep8 version and let the
        # interpreter report the actual syntax error when the file is run.
        pass

    with open('code.py', 'w', encoding='utf-8') as file:
        file.write(fixed_code)

    # Run 'code.py' without a shell; subprocess enforces the timeout itself.
    try:
        completed = subprocess.run(
            [sys.executable, 'code.py'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired:
        return _failed(f"Execution timed out after {timeout_seconds} seconds")

    # Decode the output from bytes to a string
    shell_output = completed.stdout.decode('utf8', errors='replace')
    error_output = completed.stderr.decode('utf8', errors='replace')

    # A run only counts as successful if it exited cleanly AND printed
    # something; partial output followed by a traceback is a failure.
    if completed.returncode == 0 and shell_output:
        print("Code ran successfully.")
        print("Output:\n", shell_output)
        return {"status": "success", "output": shell_output}

    if error_output:
        return _failed(error_output)

    if completed.returncode != 0:
        return _failed(f"Process exited with status {completed.returncode}")

    # Clean exit but nothing printed: there is no answer to extract.
    return _failed("No output")


## System messages

In [8]:
# System prompt for the code writer agent. It is passed as
# `system_instruction` when the Gemini model is created. The example session
# shows the model the "###" markers around the answer, which
# `run_final_answer_agent` splits on to extract the final answer.
code_writer_agent_system_message = """
You are an expert at writing python code to solve mathematical problems.
The user will give you a mathematical problem that you need to solve by writing python code.
You always write a plan. You always think step by step.

Output your response in markdown format, example: ```python code-block ```
You only output one code block.
When you send a response your code will be executed on the user's machine.
If there was an error in your code then the user will call you again with this:

Error: The error message


Example session:

User Message: Find the sum of the first 50 natural numbers.

Your Output:

```python
# Define the first term (a) and the number of terms (n)
a = 1
n = 50

# Calculate the sum of the arithmetic series using the formula: S = n/2 * (2a + (n-1)d)
# For natural numbers, a = 1, d = 1
S = n * (n + 1) // 2

# Print the result
print(f"The sum of the first {n} natural numbers is: {S}")

# For easier output parsing
print("###")
print(S)
print("###")
```
""".strip()

## Set up the llm

In [9]:
def make_llm_api_call(message):
    """
    Send one message through the active Gemini chat session and return the
    text of the reply.

    Relies on the module-level `chat` object (created with
    `model.start_chat()`), so a chat session must exist before this is called.

    Args:
        message: The prompt to send (the notebook passes plain strings).
    Returns:
        The `.text` attribute of the response returned by `chat.send_message`.
    """
    return chat.send_message(message).text


# Example: smoke-test the helper with a throwaway persona

# `make_llm_api_call` reads the module-level `chat`, so the chat session has
# to be created before the helper is called.
model = genai.GenerativeModel(
    "models/gemini-1.5-flash",
    system_instruction="Your name is Molly. You are a helpful assistant.",
)

chat = model.start_chat()

user_message = "What's your name?"

response = make_llm_api_call(user_message)

print(response)

Hi there! My name is Molly. 😊  How can I help you today? 



## Set up the agents

In [10]:
def run_code_writer_agent(message):
    """
    Forward `message` to the LLM, echo the reply and hand it back.

    Args:
        message: The prompt for the LLM; it is passed unchanged to
            `make_llm_api_call`.
    Returns:
        The reply text returned by `make_llm_api_call`.
    """
    print("---CODE WRITER AGENT---")

    llm_reply = make_llm_api_call(message)
    print(llm_reply)

    return llm_reply



# Example: ask the code writer agent for a solution

# A fresh model/chat pair, this time using the code writer system prompt.
model = genai.GenerativeModel(
    "models/gemini-1.5-flash",
    system_instruction = code_writer_agent_system_message,
)

chat = model.start_chat()

# Three candidate problems; each assignment overwrites the previous one, so
# only the last `message` is actually sent.
message = "What is the square root of 25?"
message = "Let $k, l > 0$ be parameters. The parabola $y = kx^2 - 2kx + l$ intersects the line $y = 4$ at two points $A$ and $B$. These points are distance 6 apart. What is the sum of the squares of the distances from $A$ and $B$ to the origin?"
message = "There exists a unique increasing geometric sequence of five 2-digit positive integers. What is their sum?"

# Prompt the code writer agent
raw_output = run_code_writer_agent(message)


---CODE WRITER AGENT---
```python
# Define a function to check if a number is a 2-digit positive integer
def is_2_digit_positive_integer(n):
  return 10 <= n <= 99

# Iterate through all possible starting values for the geometric sequence
for a in range(10, 100):
  # Iterate through all possible common ratios
  for r in range(2, 10):
    # Generate the five terms of the geometric sequence
    sequence = [a * r**i for i in range(5)]
    # Check if all terms are 2-digit positive integers and are in increasing order
    if all(is_2_digit_positive_integer(term) for term in sequence) and all(sequence[i] < sequence[i+1] for i in range(4)):
      # Print the sequence and its sum
      print(f"The sequence is: {sequence}")
      print(f"The sum of the sequence is: {sum(sequence)}")
      # Break out of the loops since we've found the unique sequence
      break
  else:
    # Continue to the next starting value if no valid sequence is found for the current starting value
    continue
  break

#

In [11]:
def run_code_interpreter_agent(raw_output):
    """
    Run the code contained in an LLM reply and report the outcome.

    Args:
        raw_output: The LLM reply; it is passed unchanged to
            `extract_and_run_llm_code`.
    Returns:
        Whatever `extract_and_run_llm_code` returns for this reply.
    """
    print("---CODE INTERPRETER AGENT---")

    run_result = extract_and_run_llm_code(raw_output)
    print(run_result)

    return run_result


# Example: write the code, then execute it

# A fresh chat session, so earlier examples do not leak into this one.
model = genai.GenerativeModel(
    "models/gemini-1.5-flash",
    system_instruction = code_writer_agent_system_message,
)

chat = model.start_chat()

message = "What is the square root of 25?"


# Step 1: the LLM writes the code.
raw_output = run_code_writer_agent(message)

# Step 2: the code is extracted from the reply and executed.
output_dict = run_code_interpreter_agent(raw_output)

---CODE WRITER AGENT---
```python
import math

# Calculate the square root of 25 using the math.sqrt() function
square_root = math.sqrt(25)

# Print the result
print(f"The square root of 25 is: {square_root}")

# For easier output parsing
print("###")
print(square_root)
print("###")
```
---CODE INTERPRETER AGENT---
Code ran successfully.
Output:
 The square root of 25 is: 5.0
###
5.0
###

{'status': 'success', 'output': 'The square root of 25 is: 5.0\n###\n5.0\n###\n'}


In [12]:
def run_router_agent(output_dict):
    """
    Report which agent should handle the result of a code run.

    Args:
        output_dict (dict): Result of the code run; must have a 'status' key.
    Returns:
        The status itself when it is 'failed' or 'success'; None for any
        other status value.
    """
    print("---ROUTER AGENT---")

    # Extract the status
    run_status = output_dict['status']
    print("Status:", run_status)

    # Each known status is paired with the route line that is printed for it.
    known_routes = (
        ('failed', "Route: to_code_writer_agent"),
        ('success', "Route: to_final_answer"),
    )
    for known_status, route_line in known_routes:
        if run_status == known_status:
            print(route_line)
            return run_status

    return None
    
    
    
# Example: write the code, execute it, then route on the outcome

# A fresh chat session, so earlier examples do not leak into this one.
model = genai.GenerativeModel(
    "models/gemini-1.5-flash",
    system_instruction = code_writer_agent_system_message,
)

chat = model.start_chat()

message = "What is the square root of 25?"


# Step 1: the LLM writes the code.
raw_output = run_code_writer_agent(message)

# Step 2: the code is extracted from the reply and executed.
output_dict = run_code_interpreter_agent(raw_output)

# Step 3: the router reports the next step based on the run status.
status = run_router_agent(output_dict)

---CODE WRITER AGENT---
```python
import math

# Calculate the square root of 25 using the math.sqrt() function
square_root = math.sqrt(25)

# Print the result
print(f"The square root of 25 is: {square_root}")

# For easier output parsing
print("###")
print(square_root)
print("###")
```
---CODE INTERPRETER AGENT---
Code ran successfully.
Output:
 The square root of 25 is: 5.0
###
5.0
###

{'status': 'success', 'output': 'The square root of 25 is: 5.0\n###\n5.0\n###\n'}
---ROUTER AGENT---
Status: success
Route: to_final_answer


In [13]:
def run_final_answer_agent(output_dict):
    """
    Extract the final answer from the output of a successful code run.

    The generated code is asked to print its answer between two "###"
    marker lines; the text after the first marker is the answer. If the
    output contains no marker at all, the whole (stripped) output is used
    instead of failing with an IndexError.

    Args:
        output_dict (dict): Result of the code run; must have an 'output' key
            holding the text printed by the script.
    Returns:
        str: The extracted answer with surrounding whitespace removed.
    """
    print("---FINAL ANSWER AGENT---")

    program_output = output_dict['output']

    # Extract the answer
    marker_parts = program_output.split("###")
    if len(marker_parts) > 1:
        final_answer = marker_parts[1].strip()
    else:
        # No markers printed: fall back to everything the script printed.
        final_answer = program_output.strip()

    print(program_output)
    print("Extracted answer:", final_answer)

    return final_answer
    

# Example: one pass through all four agents (no retry on failure)

# A fresh chat session, so earlier examples do not leak into this one.
model = genai.GenerativeModel(
    "models/gemini-1.5-flash",
    system_instruction = code_writer_agent_system_message,
)

chat = model.start_chat()

message = "What is the square root of 25?"


raw_output = run_code_writer_agent(message)

output_dict = run_code_interpreter_agent(raw_output)

status = run_router_agent(output_dict)

# Only a successful run has an answer to extract; a failed run carries an
# error message, and parsing that as an answer would be wrong.
if status == "failed":
    final_answer = None
    print("No final answer: the generated code did not run successfully.")
else:
    final_answer = run_final_answer_agent(output_dict)

---CODE WRITER AGENT---
```python
import math

# Calculate the square root using the math.sqrt() function
square_root = math.sqrt(25)

# Print the result
print(f"The square root of 25 is: {square_root}")

# For easier output parsing
print("###")
print(square_root)
print("###")
```
---CODE INTERPRETER AGENT---
Code ran successfully.
Output:
 The square root of 25 is: 5.0
###
5.0
###

{'status': 'success', 'output': 'The square root of 25 is: 5.0\n###\n5.0\n###\n'}
---ROUTER AGENT---
Status: success
Route: to_final_answer
---FINAL ANSWER AGENT---
The square root of 25 is: 5.0
###
5.0
###

Extracted answer: 5.0


## Run the system

In [14]:
# Example: the full loop, with error feedback to the code writer

model = genai.GenerativeModel(
    "models/gemini-1.5-flash",
    system_instruction = code_writer_agent_system_message,
)

chat = model.start_chat()


user_input = "What is the square root of 25?"
#user_input = "There exists a unique increasing geometric sequence of five 2-digit positive integers. What is their sum?"

# Maximum number of write -> run rounds before giving up.
MAX_ATTEMPTS = 3

# Reset explicitly so a value left over from an earlier cell is never
# mistaken for the result of this run.
final_answer = None

for attempt in range(MAX_ATTEMPTS):

    raw_output = run_code_writer_agent(user_input)

    output_dict = run_code_interpreter_agent(raw_output)

    status = run_router_agent(output_dict)

    if status == "failed":
        # This will be input as a new chat message to the code_writer_agent.
        # The "Error:" prefix is the format the system prompt tells the model
        # to expect when its code fails.
        error_message = output_dict['output']
        user_input = f"Error: {error_message}"
    else:
        final_answer = run_final_answer_agent(output_dict)
        break
else:
    # The loop ended without a break: every attempt failed.
    print(f"No final answer after {MAX_ATTEMPTS} attempts.")

---CODE WRITER AGENT---
```python
import math

# Calculate the square root using the math.sqrt() function
square_root = math.sqrt(25)

# Print the result
print(f"The square root of 25 is: {square_root}")

# For easier output parsing
print("###")
print(square_root)
print("###")
```
---CODE INTERPRETER AGENT---
Code ran successfully.
Output:
 The square root of 25 is: 5.0
###
5.0
###

{'status': 'success', 'output': 'The square root of 25 is: 5.0\n###\n5.0\n###\n'}
---ROUTER AGENT---
Status: success
Route: to_final_answer
---FINAL ANSWER AGENT---
The square root of 25 is: 5.0
###
5.0
###

Extracted answer: 5.0
