<a href="https://colab.research.google.com/github/todd-flanagan/buildyourownagent/blob/main/buildyourownagent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!!pip install litellm

# Important!!!
#
# <---- Set your 'OPENAI_API_KEY' as a secret over there with the "key" icon
#
#
# You will also want to add some sample files to the "Files" (folder icon)
# on the left. When the agent asks you what to do, start with something
# simple like "tell me what files are in this directory"
#
import os
from google.colab import userdata
# Read the 'OPENAI_API_KEY' secret through google.colab's userdata helper and
# export it as an environment variable for the rest of the notebook.
# NOTE(review): presumably litellm picks the key up from this variable - confirm.
api_key = userdata.get('OPENAI_API_KEY')
os.environ['OPENAI_API_KEY'] = api_key

In [6]:
!!git pull https://github.com/todd-flanagan/buildyourownagent.git

["Cloning into 'buildyourownagent'...",
 'remote: Enumerating objects: 16, done.\x1b[K',
 'remote: Counting objects:   6% (1/16)\x1b[K',
 'remote: Counting objects:  12% (2/16)\x1b[K',
 'remote: Counting objects:  18% (3/16)\x1b[K',
 'remote: Counting objects:  25% (4/16)\x1b[K',
 'remote: Counting objects:  31% (5/16)\x1b[K',
 'remote: Counting objects:  37% (6/16)\x1b[K',
 'remote: Counting objects:  43% (7/16)\x1b[K',
 'remote: Counting objects:  50% (8/16)\x1b[K',
 'remote: Counting objects:  56% (9/16)\x1b[K',
 'remote: Counting objects:  62% (10/16)\x1b[K',
 'remote: Counting objects:  68% (11/16)\x1b[K',
 'remote: Counting objects:  75% (12/16)\x1b[K',
 'remote: Counting objects:  81% (13/16)\x1b[K',
 'remote: Counting objects:  87% (14/16)\x1b[K',
 'remote: Counting objects:  93% (15/16)\x1b[K',
 'remote: Counting objects: 100% (16/16)\x1b[K',
 'remote: Counting objects: 100% (16/16), done.\x1b[K',
 'remote: Compressing objects:   7% (1/13)\x1b[K',
 'remote: Compressing objects

In [23]:
import json
import os
import sys
from litellm import completion
from typing import List, Dict

def extract_markdown_block(response: str, block_type: str = "json") -> str:
    """Return the body of the first fenced code block found in ``response``.

    Args:
        response: Raw text, possibly containing a triple-backtick fence.
        block_type: Language tag to drop from the start of the block.

    Returns:
        ``response`` unchanged when it contains no fence. Otherwise the
        whitespace-stripped text that follows the first fence (up to the next
        fence, if any), with a leading ``block_type`` tag removed.
    """
    fence = '```'
    if fence not in response:
        return response

    # Index 1 of the split is whatever sits right after the opening fence.
    segments = response.split(fence)
    body = segments[1].strip()

    if not body.startswith(block_type):
        return body
    return body[len(block_type):].strip()

def generate_response(messages: List[Dict],
                      model: str = "openai/gpt-4o",
                      max_tokens: int = 1024) -> str:
    """Call the LLM and return the text of its first choice.

    Args:
        messages: Chat messages (dicts with ``role`` and ``content``) to send.
        model: Model identifier passed to ``completion``.
        max_tokens: Upper bound on the number of generated tokens.

    Returns:
        The stripped message text, or an empty string when the model returned
        no text content (``content`` is ``None``, e.g. for a tool call).
    """
    response = completion(
        model=model,
        messages=messages,
        max_tokens=max_tokens
    )
    content = response.choices[0].message.content
    # content may be None; calling .strip() on it would raise AttributeError.
    return content.strip() if content else ""

def parse_action(response: str) -> Dict:
    """Parse the LLM response into a structured action dictionary.

    Args:
        response: Raw model text, optionally wrapped in an ``action`` fenced
            code block.

    Returns:
        The decoded JSON object when it is a dict containing both
        ``tool_name`` and ``args``. Otherwise an ``error`` action whose
        ``args["message"]`` explains what was wrong.
    """
    try:
        payload = json.loads(extract_markdown_block(response, "action"))
    except json.JSONDecodeError:
        return {"tool_name": "error", "args": {"message": "Invalid JSON response. You must respond with a JSON tool invocation."}}

    # Valid JSON is not necessarily an object: a number would make the `in`
    # checks raise TypeError, and a string or list could pass them by accident.
    if isinstance(payload, dict) and "tool_name" in payload and "args" in payload:
        return payload
    return {"tool_name": "error", "args": {"message": "You must respond with a JSON tool invocation."}}

def list_sr_files(folder: str) -> List[str]:
    """List the names of the entries in ``folder``.

    Args:
        folder: Path of the directory to list.

    Returns:
        The entry names (not full paths) in sorted order. ``os.listdir``
        returns them in arbitrary order, so sorting keeps runs reproducible.

    Raises:
        OSError: If ``folder`` does not exist or cannot be read.
    """
    return sorted(os.listdir(folder))

def create_file(file_name : str) -> None:
    """Create ``file_name`` as an empty file.

    The file is opened in write mode, so an existing file with the same name
    is truncated to zero length.
    """
    # Opening in "w" mode is enough to create or empty the file.
    with open(file_name, "w"):
        pass

def read_file(file_name: str, folder: str = ".") -> str:
    """Read a text file and return its contents.

    Args:
        file_name: Name of the file, relative to ``folder``.
        folder: Directory that contains the file. Defaults to the current one.

    Returns:
        The file's text. On failure no exception is raised; instead a string
        starting with ``"Error: "`` is returned so the caller can relay the
        problem.
    """
    try:
        # Decode as UTF-8 explicitly: the default encoding depends on the
        # platform locale and can fail on non-English text.
        with open(folder + "/" + file_name, "r", encoding="utf-8") as file:
            return file.read()
    except FileNotFoundError:
        return f"Error: {file_name} not found."
    except Exception as e:
        # Deliberately broad: any failure is reported back as text.
        return f"Error: {str(e)}"

def write_results(file_name: str, results: str) -> None:
    """Append ``results`` followed by a newline to ``file_name``.

    The file is opened in append mode, so it is created when missing and
    earlier content is kept.
    """
    with open(file_name, mode="a") as out:
        out.writelines([results + "\n"])

def terminate(message: str) -> None:
    """Print the agent's closing message.

    This function only prints; it is up to the caller to stop the agent loop
    after invoking it.
    """
    print("Termination message: {}".format(message))


# Dispatch table: tool name (as advertised to the model) -> Python callable.
tool_functions = dict(
    list_sr_files=list_sr_files,
    read_file=read_file,
    write_results=write_results,
    create_file=create_file,
    terminate=terminate,
)

# Tool descriptions passed to the model via the `tools` argument of completion().
# Each "parameters" value is a JSON-Schema object; "required" must live inside
# it and may only name keys that exist in "properties".
tools = [
    {
        "type": "function",
        "function": {
            "name": "list_sr_files",
            "description": "Returns a list of service request files.",
            "parameters": {
                "type": "object",
                "properties": {"folder": {"type": "string"}},
                # Was placed next to "parameters", outside the schema.
                "required": ["folder"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "Reads the content of a specified file in the directory.",
            "parameters": {
                "type": "object",
                "properties": {"file_name": {"type": "string"},
                               "folder" : {"type": "string"}},
                "required": ["file_name"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "create_file",
            "description": "Creates a new file of the given name.",
            "parameters": {
                "type": "object",
                "properties": {"file_name": {"type": "string"}},
                "required": ["file_name"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "write_results",
            "description": "writes a string to a file",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_name": {"type": "string"},
                    "results": {"type": "string"}
                    },
                # Was ["file_name", "str"]; the property is named "results".
                "required": ["file_name", "results"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "terminate",
            "description": "Terminates the conversation. No further actions or interactions are possible after this. Prints the provided message for the user.",
            "parameters": {
                "type": "object",
                "properties": {
                    "message": {"type": "string"},
                },
                "required": ["message"]
            }
        }
    }
]


# Define system instructions (Agent Rules)
# A single system message, combined with the conversation memory on every
# request. It instructs the model to list the service-request files under
# buildyourownagent/srs, read (and translate if needed) each one, classify it
# with a deployment step, an issue type and 2-3 tags, append one line per file
# to out.txt, and finish by calling the "terminate" tool.
# NOTE(review): the prompt text contains typos ("retreive", "transtanslate",
# "<srfilebane>", "<deploment step>"). It is sent to the model verbatim, so it
# is left untouched here; correct it deliberately and re-run the agent.
agent_rules = [{
    "role": "system",
    "content": """
You are an AI agent that can perform tasks by using available tools.

retreive the list of service request files from the folder buildyourownagent/srs.  read each file.  If the file is not in english, transtanslate it to english.
for each file, suggest a list of 2-3 tags to categorize the request, an assessment of where it is in the workflow,
and an issue type.  Then append the list of tags to out.txt in the form of <srfilebane> : <deploment step> : <issue type> : <tag1>, <tag2>...
where deployment step represents the step in the deployment process the customer experienced the issue.
the valid deployment steps are in the file buildyourownagent/agentdata/deploymentsteps.txt
Valid issue types are: request, howto, inquiry, issue.
The valid tags are in tags.txt in the buildyourownagent/agentdata folder
before writing the results, create a new, blank file
write the results as you process each file

When you are done, terminate the conversation by using the "terminate" tool and I will provide the results to the user.
"""
}]

# Initialize agent parameters
iterations = 0
max_iterations = 20  # hard cap on LLM round-trips so the loop always ends

user_task = "process the srs"

# Conversation history; sent after the system rules on every request.
memory = [{"role": "user", "content": user_task}]

# The Agent Loop
while iterations < max_iterations:
    # 1. Construct prompt: Combine agent rules with memory
    print(memory)
    messages = agent_rules + memory

    # 2. Generate response from LLM
    print("Agent thinking...")
    response = completion(model="openai/gpt-4o",
                          messages=messages,
                          tools=tools,
                          max_tokens=1024)
    print(f"Agent response: {response}")

    reply = response.choices[0].message

    if reply.tool_calls:
        # 3. Parse response to determine action.
        # Only the first tool call of a response is acted on; memory records
        # just that call, so the model can request the others again.
        tool = reply.tool_calls[0]
        tool_name = tool.function.name
        try:
            tool_args = json.loads(tool.function.arguments)
            if not isinstance(tool_args, dict):
                raise ValueError("arguments must be a JSON object")
            args_error = None
        except (ValueError, TypeError) as e:
            # json.JSONDecodeError is a ValueError. Arguments can be malformed,
            # e.g. when the reply is cut off by max_tokens; report the problem
            # to the model instead of crashing the loop.
            tool_args = {}
            args_error = f"Invalid arguments for {tool_name}: {e}"
        action = {"tool_name": tool_name, "args": tool_args}

        # 4. Execute action
        print(f"Executing: {tool_name} with args {tool_args}")
        if args_error is not None:
            result = {"error": args_error}
        elif tool_name == "terminate":
            # terminate() prints the message itself and returns None, so its
            # return value must not be printed.
            terminate(tool_args.get("message", ""))
            break
        elif tool_name in tool_functions:
            try:
                result = {"result": tool_functions[tool_name](**tool_args)}
            except Exception as e:
                # Any tool failure is fed back to the model as an error result.
                result = {"error": f"Error executing {tool_name}: {e}"}
        else:
            result = {"error": f"Unknown tool: {tool_name}"}

        print(f"Action result: {result}")

        # 5. Update memory with response and results
        memory.extend([
            {"role": "assistant", "content": json.dumps(action)},
            {"role": "user", "content": json.dumps(result)}
        ])
    else:
        result = reply.content
        print(f"Action result: {result}")
        # Record plain-text replies too; otherwise the next iteration would
        # send exactly the same messages again.
        if result:
            memory.append({"role": "assistant", "content": result})

    iterations += 1

[{'role': 'user', 'content': 'process the srs'}]
Agent thinking...
Agent response: ModelResponse(id='chatcmpl-CLsYut4R3V8uCu2L3rSJW5MYg7tWY', created=1759330660, model='gpt-4o-2024-08-06', object='chat.completion', system_fingerprint='fp_f33640a400', choices=[Choices(finish_reason='tool_calls', index=0, message=Message(content=None, role='assistant', tool_calls=[ChatCompletionMessageToolCall(function=Function(arguments='{"folder":"buildyourownagent/srs"}', name='list_sr_files'), id='call_PMkFVx0mU6LRw97L9yWx8CfN', type='function')], function_call=None, provider_specific_fields={'refusal': None}, annotations=[]), provider_specific_fields={})], usage=Usage(completion_tokens=20, prompt_tokens=411, total_tokens=431, completion_tokens_details=CompletionTokensDetailsWrapper(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0, text_tokens=None), prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=0, cached_tokens=0, text_tokens=None, image_