<a href="https://colab.research.google.com/github/yongsa-nut/SF323_CN408_AIEngineer/blob/main/SF323_CN408_Agent_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Agentic Demo

## 0. Setting up

Openrouter: https://openrouter.ai/

In [18]:
from openai import OpenAI
from google.colab import userdata
import json

# Notebook-wide OpenAI-compatible client pointed at the OpenRouter endpoint.
# The API key is fetched under the name 'openrouter' through Colab's `userdata`
# helper, so no secret is hardcoded in the notebook.
client = OpenAI(
  base_url="https://openrouter.ai/api/v1",
  api_key=userdata.get('openrouter'),
)

In [20]:
# Smoke test: send a single user message and print the model's reply to
# confirm that the client and API key are working.
completion = client.chat.completions.create(
  model="google/gemini-2.5-flash-lite",
  messages=[{"role": "user", "content": "Hello"}],
)
print(completion.choices[0].message.content)

Hi there! How can I help you today? 😊


## 1. File-editing Agent

- Adapted from https://ampcode.com/how-to-build-an-agent

In [None]:
# Let's code!



In [78]:
# @title File-editing Agent

import os

class FileEditingAgent:
    """Interactive chat agent that lets an LLM read, list, and edit local files.

    The agent keeps one running message history, exposes three tools to the
    model (``read_file``, ``list_folder_contents``, ``edit_file``) and, on every
    user turn, keeps calling the model until it answers without requesting a
    tool (or until ``MAX_TOOL_ITERATIONS`` rounds have been used).
    """

    # Upper bound on tool-calling rounds per user turn, so a model that keeps
    # requesting tools cannot loop forever.
    MAX_TOOL_ITERATIONS = 20

    def __init__(self, model = "google/gemini-2.5-flash"):
        """Create the LLM client, the message history and the tool schemas.

        Args:
            model (str): Model identifier passed to the chat-completions endpoint.
        """
        # LLM setup
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=userdata.get('openrouter'),
        )
        self.model = model

        system_prompt = '''You are a helpful assistant.'''
        self.messages = [{'role':'system','content':system_prompt}]

        # JSON-schema descriptions of the tools; names must match the methods
        # dispatched in run_tool().
        self.tools = [{
            "type": "function",
            "function":{
                "name": "read_file",
                "description": "Read a file and return its content as a string",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "filename": {
                            "type": "string",
                            "description": "The name of the file you want to read."
                        }
                    },
                    "required": ["filename"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "list_folder_contents",
                "description": "List all files and directories in a given folder. Returns a formatted string with directories marked with a trailing slash and items sorted alphabetically.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "folder_path": {
                            "type": "string",
                            "description": "The path to the folder/directory you want to list contents for."
                        }
                    },
                    "required": ["folder_path"]
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "edit_file",
                "description": "Make edits to a text file. Replaces 'old_str' with 'new_str' in the given file. 'old_str' and 'new_str' MUST be different from each other. To create a new file, pass an empty 'old_str' and put the full file content in 'new_str'.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "file_path": {
                            "type": "string",
                            "description": "The path to the file you want to edit."
                        },
                        "old_str": {
                            "type": "string",
                            "description": "The string to be replaced in the file."
                        },
                        "new_str": {
                            "type": "string",
                            "description": "The string to replace the old string with."
                        }
                    },
                    "required": ["file_path", "old_str", "new_str"]
                }
            }
        }]

    def read_file(self, filename):
        """Read a file and return its content as a string.

        Errors are returned as a message string (not raised) so the model can
        see what went wrong and react.
        """
        try:
            # Same explicit encoding as edit_file, so reads and writes agree.
            with open(filename, 'r', encoding='utf-8') as file:
                return file.read()
        except Exception as e:
            return (f"Error reading file: {e}")

    def list_folder_contents(self, folder_path):
        """List all files and directories in a folder with proper formatting.

        Returns:
            str: One entry per line, sorted alphabetically, with directories
            marked by a trailing slash; or an error message.
        """
        try:
            items = os.listdir(folder_path)
            result = []

            for item in items:
                full_path = os.path.join(folder_path, item)
                if os.path.isdir(full_path):
                    result.append(f"{item}/")
                else:
                    result.append(item)

            return '\n'.join(sorted(result))

        except Exception as e:
            return f"Error listing directory: {e}"

    def edit_file(self, file_path, old_str, new_str):
        """
        Make edits to a text file.
        Replaces every occurrence of 'old_str' with 'new_str' in the given file.
        An empty 'old_str' means "write 'new_str' as the whole content" and is
        only accepted when the file is missing or empty, in which case the file
        (and any missing parent folders) is created.

        Args:
            file_path (str): Path to the file to edit
            old_str (str): String to be replaced ("" to create a new file)
            new_str (str): String to replace with

        Returns:
            str: Success/error message
        """
        # Check if old_str and new_str are different
        if old_str == new_str:
            return "Error: 'old_str' and 'new_str' must be different from each other."

        try:
            # Read the existing file and remember whether it was there at all
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    content = file.read()
                file_exists = True
            except FileNotFoundError:
                content = ""
                file_exists = False

            if old_str == "":
                # str.replace("", x) would insert x between every character,
                # so never apply an empty pattern to real content.
                if content:
                    return (f"Error: 'old_str' is empty but '{file_path}' already has content. "
                            "Provide the exact text to replace.")
                parent_dir = os.path.dirname(file_path)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                with open(file_path, 'w', encoding='utf-8') as file:
                    file.write(new_str)
                action = "updated" if file_exists else "created"
                return f"Successfully {action} '{file_path}'."

            if not file_exists:
                # Nothing to replace in; do not leave an empty file behind.
                return (f"Error: '{file_path}' does not exist. "
                        "To create it, call edit_file with an empty 'old_str'.")

            replacements = content.count(old_str)
            if replacements == 0:
                # Leave the file untouched and tell the model the edit did not apply.
                return f"No occurrences of '{old_str}' found in '{file_path}'. File left unchanged."

            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(content.replace(old_str, new_str))

            return f"Successfully replaced {replacements} occurrence(s) of '{old_str}' with '{new_str}' in '{file_path}'."

        except Exception as e:
            return f"Error editing file '{file_path}': {e}"

    def run_tool(self, name, arguments):
        """Dispatch one tool call by name.

        Args:
            name (str): Tool name requested by the model.
            arguments (dict): Keyword arguments for the tool.

        Returns:
            str: The tool's result, or an error message for an unknown tool.
        """
        handlers = {
            'read_file': self.read_file,
            'list_folder_contents': self.list_folder_contents,
            'edit_file': self.edit_file,
        }
        handler = handlers.get(name)
        if handler is None:
            return f"Error: unknown tool '{name}'."
        return handler(**arguments)

    def run_tools(self, tool_calls):
        """Execute every requested tool call and build the matching tool messages.

        Any failure (bad JSON arguments, wrong parameters, ...) is reported back
        to the model as the tool's content instead of being raised.

        Returns:
            list[dict]: One ``role: tool`` message per tool call, in order.
        """
        tool_results = []
        for tool_call in tool_calls:
            # Bound before the try so the except branch can always reference it.
            tool_name = None
            try:
                tool_name = tool_call.function.name
                arguments = json.loads(tool_call.function.arguments or "{}")
                print(f'\033[92mTool\033[0m: {tool_name}({arguments})')
                content = str(self.run_tool(tool_name, arguments))
            except Exception as e:
                content = f"Error: {e}"
            tool_results.append({
                "role" : "tool",
                "tool_call_id": tool_call.id,
                "name": tool_name or "unknown",
                "content" : content,
            })

        return tool_results

    def run(self):
        """Run the interactive chat loop until the user types 'quit'."""
        while True:
            user_query = input('User: ')
            if user_query.strip().lower() == 'quit':
                break

            self.messages.append({'role': 'user', 'content': user_query})

            # Tool Handling: keep going while the model asks for tools.
            # Decide on the presence of tool_calls rather than finish_reason,
            # because some providers report finish_reason='stop' even when the
            # message carries tool calls.
            for _ in range(self.MAX_TOOL_ITERATIONS):
                response = self.client.chat.completions.create(
                    model = self.model,
                    messages = self.messages,
                    tools = self.tools
                )
                message = response.choices[0].message
                if not message.tool_calls:
                    final_text = message.content or ""
                    break
                self.messages.append(message)
                self.messages.extend(self.run_tools(message.tool_calls))
            else:
                final_text = (f"(Stopped after {self.MAX_TOOL_ITERATIONS} "
                              "tool-calling rounds without a final answer.)")

            # Final response (per turn)
            self.messages.append({'role': 'assistant', 'content': final_text})
            print('\033[38;5;208mAssistant\033[0m:', final_text)



In [None]:
# Start an interactive session with the default model; type 'quit' at the
# 'User: ' prompt to leave the loop.
agent = FileEditingAgent()
agent.run()

Things to test:
- Read a file
- List files in a directory -> Ask it to read.
- What are datasets in this project?
- Create a file
- Create a fizzbuzz python file (run it with `!python fizzbuzz.py`)
- Edit the fizzbuzz! (Note: Gemini 2.5 Flash is not very agentic!)
- Create a congrats.py script that rot13-decodes the following string "Pbatenghyngvbaf ba ohvyqvat n pbqr-rqvgvat ntrag!" and prints it

## 2. Data Analysis Agent

- Data from the single-file-agents repository (see https://github.com/disler/single-file-agents/blob/main/sfa_duckdb_anthropic_v2.py)
- Agent with a code execution tool


In [None]:
!wget -nc https://github.com/disler/single-file-agents/raw/refs/heads/main/data/analytics.db

In [None]:
import duckdb
import pandas as pd

# Connect to the database
# DB_PATH is a relative path, so the file is looked up in the current working
# directory. DB_CONN is the notebook-level connection used by the inspection
# cell below.
DB_PATH = 'analytics.db'
DB_CONN = duckdb.connect(DB_PATH)

Check the data

In [None]:
# Get all tables
tables = DB_CONN.execute("SHOW TABLES").fetchall()

for table in tables:
    table_name = table[0]
    print(f"\n=== Table: {table_name} ===")

    # Quote the identifier (doubling embedded quotes) so table names that are
    # reserved words or contain special characters still produce valid SQL.
    quoted_name = '"' + table_name.replace('"', '""') + '"'

    # Convert to pandas DataFrame for better display
    df = DB_CONN.execute(f"SELECT * FROM {quoted_name}").df()
    print(df)
    print(f"Shape: {df.shape}")

See DB agent: https://colab.research.google.com/drive/1EHjDizfFyQqypxa3J09YqY3cV8Sz1Y4h?usp=sharing

Executing Python Code:

- Langchain Python REPL: https://python.langchain.com/docs/integrations/tools/python/

In [None]:
!pip install langchain_experimental

In [None]:
from langchain_experimental.utilities import PythonREPL
import warnings

warnings.filterwarnings("ignore")

In [None]:
# Sanity check of the REPL tool: run() returns what the code printed as a
# string (see the cell output below).
python_repl = PythonREPL()
python_repl.run("print(1+1)")

'2\n'

In [None]:
PYTHON_REPL = PythonREPL()

def execute_python_code(code: str) -> str:
    """Execute Python code in the shared REPL and return its output.

    The code and its result are also printed so the call is visible in the
    notebook output.

    Args:
        code: Python code to run.

    Returns:
        Whatever ``PYTHON_REPL.run`` returns for the code, or the exception
        message if running it raised.
    """
    try:
        # Run in the module-level REPL instance
        result = PYTHON_REPL.run(code)

        # Use regular print with simple formatting
        print(f"Python Code Tool\nCode: {code}")
        print(f"Result:\n {result}")
        return result
    except Exception as e:
        print(f"Error running Python code: {str(e)}")
        return str(e)

In [None]:
# First snippet sent through the tool: opens its own connection (DB_CONN2) and
# stores the table list in `tables`. It prints nothing, so the result is empty.
code = """
import duckdb
DB_CONN2 = duckdb.connect('analytics.db')
tables = DB_CONN2.execute("SHOW TABLES").fetchall()
"""

execute_python_code(code)

Python Code Tool
Query: 
import duckdb
DB_CONN2 = duckdb.connect('analytics.db')
tables = DB_CONN2.execute("SHOW TABLES").fetchall()

Result:
 


''

In [None]:
# Second snippet: uses `tables` and `DB_CONN2`, which are only defined by the
# previous snippet, so run that cell first.
# NOTE(review): this presumably works because PYTHON_REPL keeps state between
# run() calls; confirm against the PythonREPL documentation.
code = """
for table in tables:
    table_name = table[0]

    # Convert to pandas DataFrame for better display
    df = DB_CONN2.execute(f"SELECT * FROM {table_name}").df()
    print(df)
"""

execute_python_code(code)

Full Agent

In [None]:
class DataAnalysisAgent:
    """Chat agent that answers data questions by letting an LLM run Python code.

    Each instance owns one ``PythonREPL`` (pre-loaded with duckdb, pandas and a
    ``DB_CONN`` connection to ``analytics.db``), one LLM client, and a
    conversation history that persists across ``chat`` calls.
    """

    # Maximum number of model requests made for a single user message.
    MAX_TOOL_ITERATIONS = 10

    def __init__(self, model = "qwen/qwen3-32b"):
        """Set up the REPL environment, the LLM client, the prompt and the tool schema.

        Args:
            model (str): Model identifier passed to the chat-completions endpoint.
        """
        # Initialize the Python REPL and load the data
        self.python_repl = PythonREPL()
        initial_code = """
import duckdb
import pandas as pd
import warnings
warnings.filterwarnings('ignore')
DB_CONN = duckdb.connect('analytics.db')
        """
        setup_output = self.python_repl.run(initial_code)
        # The setup code prints nothing, so any returned text is unexpected
        # (presumably an error report from the REPL); surface it, don't hide it.
        if setup_output and str(setup_output).strip():
            print(f"Warning: REPL setup produced output:\n{setup_output}")

        # LLM setup
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=userdata.get('openrouter'),
        )
        self.model = model

        # System prompt (# Add/remove this: There is only one table in the database named `User`. )
        system_prompt = """You are an expert data scientist.
        You have access to a Python REPL tool (run_python_code) that can execute pandas code safely.
        You have access to a duckdb database (DB_CONN) which contains a table of 30 students' performance. The data has been loaded in the Python REPL as DB_CONN.

        ## Tool:
        - run_python_code: Execute Python code.
          - Keep the code simple.
          - **Important** duckdb and pandas (as pd) are already imported.

        ## Instruction:
        - Use the data to answer user's query.
        - If user's query is unrelated to the data, say "I can't answer your question using available data."

        ## Output:
        - Return the final results in a markdown format
        """
        self.conversation_history = [{'role':'system','content':system_prompt}]

        self.tools = [{
            "type": "function",
            "function":{
                "name": "run_python_code",
                "description": "Execute python code. The code runs in a static sandbox without interactive mode, so print output.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {
                            "type": "string",
                            "description": "Python code to execute."
                        }
                    },
                    "required": ["code"]
                }
            }
        }]

    def run_python_code(self, code: str) -> str:
        """Run ``code`` in this agent's REPL and wrap the output in a status message.

        NOTE(review): the "executed successfully" label is applied whenever
        ``run`` returns without raising; if the REPL reports errors through its
        return value, the label will be misleading. Confirm against PythonREPL.
        """
        try:
            # Execute code directly (environment already set up)
            result = self.python_repl.run(code)
            return f"Code executed successfully:\nOutput:\n{result}"
        except Exception as e:
            return f"Error executing code:\nError: {str(e)}"

    def chat(self, user_message: str) -> str:
        """Answer one user message, letting the model call the code tool as needed.

        The user message, the assistant's tool-call messages, the tool results
        and the final answer are all appended to ``self.conversation_history``.

        Args:
            user_message: The user's question.

        Returns:
            The model's final text answer, or an apology if no answer was
            produced within ``MAX_TOOL_ITERATIONS`` requests.
        """
        final_response = ""
        self.conversation_history.append({
            'role':'user',
            'content':user_message
        })

        for iteration_count in range(self.MAX_TOOL_ITERATIONS):
            print(f'----- Iteration: {iteration_count} -----')

            # Make request
            response = self.client.chat.completions.create(
                model=self.model,
                messages=self.conversation_history,
                tools=self.tools
            )

            print(response)
            llm_response = response.choices[0].message

            if not llm_response.tool_calls:
                # No tool use - this is the final response
                final_response = llm_response.content or ""
                self.conversation_history.append({
                    "role": "assistant",
                    "content": final_response
                })
                break

            # Record the assistant's tool-calling message exactly once, then
            # one tool message per call. Appending it inside the loop would
            # duplicate it when the model requests several calls at once.
            self.conversation_history.append(llm_response)

            for tool_call in llm_response.tool_calls:
                try:
                    tool_arguments = json.loads(tool_call.function.arguments)
                    code = tool_arguments['code']
                except (json.JSONDecodeError, KeyError, TypeError) as e:
                    # Report malformed arguments back to the model so it can
                    # retry, rather than crashing the chat.
                    tool_result = f"Error executing code:\nError: invalid tool arguments ({e})"
                else:
                    print(f"Tool arguments:\n{tool_arguments}")
                    # Get tool results (We only have one tool)
                    tool_result = self.run_python_code(code)
                print(f"\nTool Result:\n{tool_result}")

                self.conversation_history.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(tool_result)
                })

        return final_response if final_response else "I apologize, but I wasn't able to complete the analysis."

In [None]:
chatbot = DataAnalysisAgent(model = "qwen/qwen3-32b")
chatbot.chat("What is the average and standard deviation of students' score?")

----- Iteration: 0 -----
ChatCompletion(id='gen-1750556887-MwneBmVPjFLkCa3rIgkJ', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_sWnOAklsrGUXCegCPz4HZkiy', function=Function(arguments='{"code": "query = \\"SELECT AVG(score) AS average, STDDEV(score) AS std_dev FROM students\\";\\nresult = DB_CONN.execute(query).fetchone();\\nprint(f\'Average: {result[0]}, Standard Deviation: {result[1]}\')"}', name='run_pandas_code'), type='function', index=0)], reasoning='Okay, the user is asking for the average and standard deviation of students\' scores. Let me check the data.\n\nFirst, I need to make sure the data is loaded. The user mentioned a duckdb database with a table of 30 students\' performance. The data is available in the Python REPL as DB_CONN. So I can use duckdb to query the data.\n\nWait, but the t

"The average and standard deviation of students' scores are:\n\n- **Average**: 56.80  \n- **Standard Deviation**: 27.69  \n\nThe data was retrieved from the table `analytics.User` in the DuckDB database. Let me know if you need further analysis!"

In [None]:
chatbot = DataAnalysisAgent(model = "google/gemini-2.5-flash-lite")
chatbot.chat("What is the average and standard deviation of students' score?")

In [None]:
chatbot = DataAnalysisAgent(model = "anthropic/claude-sonnet-4")
chatbot.chat("What is the average and standard deviation of students' score?")

----- Iteration: 0 -----
ChatCompletion(id='gen-1750557130-t7NfE4i8XoUhioAnX1xa', choices=[Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content="I'll help you find the average and standard deviation of students' scores from the database. Let me first explore the data structure and then calculate these statistics.", refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='toolu_vrtx_01WZxx8LKvSjfvEFUGgkWEew', function=Function(arguments='{"code": "import duckdb\\nimport pandas as pd\\n\\n# First, let\'s see what tables are available in the database\\ntables = DB_CONN.execute(\\"SHOW TABLES\\").fetchall()\\nprint(\\"Available tables:\\")\\nprint(tables)"}', name='run_pandas_code'), type='function', index=0)], reasoning=None), native_finish_reason='tool_calls')], created=1750557130, model='anthropic/claude-sonnet-4', object='chat.completion', service_tier=None, system_fingerp

"## Students' Score Statistics\n\nBased on the analysis of the 30 students' performance data, here are the key statistics for their scores:\n\n### Primary Results:\n- **Average Score**: **56.80**\n- **Standard Deviation**: **27.69**\n\n### Additional Insights:\n- **Number of Students**: 30\n- **Minimum Score**: 3.10\n- **Maximum Score**: 96.18\n- **Median Score**: 55.83\n\nThe data shows that students' scores have a relatively wide distribution with a standard deviation of about 28 points, indicating considerable variation in performance across the group. The average score of approximately 57 points suggests moderate overall performance, with scores ranging from very low (3.10) to very high (96.18)."