In [78]:
import csv
import json
import os
from openai import OpenAI
import subprocess
import sys

In [82]:
class Logger:
    def __init__(self):
        self.log_file = "logs/agent_logs.csv"
        self.fieldnames = [
            'action',
            'result',
            'time'
        ]

    def log(self, data):
        file_exists = os.path.isfile(self.log_file)
        mode = "a" if file_exists else "w"
        try:
            with open(self.filepath, mode, newline="", encoding="utf-8") as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=self.fieldnames)
                # Write header if the file was just created
                if not file_exists:
                    writer.writeheader()
                writer.writerows(data)

        except IOError:
            print(f"[-] Could not open {self.filepath} for logging")
            sys.exit(1)

SyntaxError: invalid syntax (1077909747.py, line 3)

In [None]:
class Agent:
    def __init__(self):
        self.client = OpenAI()
        self.memory = []
        self.max_memory = 10
        self.set_system_prompt(self.read_system_prompt("system_prompt.txt"))
        self.init_tools()
        self.logger = Logger

    def set_api_key(self, filepath: str = '.env'):
        """
        Set the OpenAI API key
        """
        with open(filepath, 'r') as f:
            for line in f:
                if line.startswith("OPENAI_API_KEY"):
                    os.environ['OPENAI_API_KEY'] = line.strip().split("=")[-1].strip("\"")

        assert os.environ['OPENAI_API_KEY'] is not None or os.environ['OPENAI_API_KEY'] != ""


    def read_system_prompt(self, filepath: str = "system_prompt.txt") -> str:
        """
        Read system prompt from file
        """
        prompt = ""
        with open(filepath, 'r') as f:
            for line in f:
                prompt += line

        return prompt

    def set_system_prompt(self, prompt: str | None = None):
        """
        Set the agent's system prompt
        """
        self.system_prompt = prompt

    def add_tool(self, tool: dict):
        """
        Add a tool to the agent, not really used as of now. Really only here if you want to dynamically add tools later
        """
        self.tools.append(tool)

    def init_tools(self):
        """
        Initialize base tools and add them to store
        """
        tools = [
        {
            "type": "function",
            "function": {
                "name": "run_nmap_scan",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "args": {"type": "string"}
                        },
                    },
                },
            },
        {
            "type": "function",
            "function": {
                "name": "update_memory",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "memory": {"type": "string"}
                        },
                    },
                },
            },
        {
            "type": "function",
            "function": {
                "name": "read_file",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "filename": {"type": "string"}
                        },
                    },
                },
            } 
        ]
        
        self.tools = tools

    def run_scan(self, arguments: dict) -> str:
        """
        Run nmap scan with provided args
        NOTE: This jawn runs code, be careful

        arguments: dict that maps str -> args
        """
        args = arguments['args']
        cmd = f"nmap {args}"
        cmd = cmd.split(" ")
        result = subprocess.run(cmd, capture_output=True, text=True)

        return str({
            "output": result.stdout,
            "error": result.stderr,
            "return_code": result.returncode
        })


    def update_memory(self, arguments: dict | str):
        """
        Add to self memory
        """
        memory = arguments['memory'] if isinstance(arguments, dict) else arguments
        self.memory.append(memory)
        self.memory = self.memory[-self.max_memory:]
        print(f"Updated memory with {memory}")

    def read_file(self, arguments: dict) -> str:
        """
        Read file, restricted to the current directory, exclude .env
        """
        filename = arguments['filename']
        base_directory = os.getcwd()
        file_path = os.path.abspath(os.path.join(base_directory, filename))

        # Check if the file is in the current directory
        if not file_path.startswith(base_directory):
            raise ValueError("Access to files outside the current directory is restricted.")

        # Exclude the .env file
        if os.path.basename(file_path) == ".env":
            raise ValueError("Access to '.env' file is restricted.")

        # Check if the file exists
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"The file '{filename}' does not exist.")

        # Read and return the file content
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        return content 
    
    def handle_tool_calls(self, tool_calls):
        for call in tool_calls:
            print(f"Tool call: {call.function.name} {call.function.arguments}")
            if call.function.name == 'run_nmap_scan':
                output = self.run_scan(json.loads(call.function.arguments))
            elif call.function.name == 'update_memory':
                output = self.update_memory(json.loads(call.function.arguments))
            elif call.function.name == 'read_file':
                output = self.read_file(json.loads(call.function.arguments))
            self.update_memory(f"Tool call: {call.function.name} {call.function.arguments}\nOutput: {output}")


    def call_api(self, prompt: str):
        """
        Call the API
        """
        completion = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            tools=self.tools
        )

        output = completion.choices[0].message
        print(f"Returned output is : {output.content}")
        # TODO: Make this async?
        if output.tool_calls is not None:
            self.handle_tool_calls(output.tool_calls)

    def build_prompt(self) -> str:
        context = "\n".join(self.memory)
        prompt = f"""
        Your goal is to perform exhaustive reconaissance on the given scope found in './scope.txt'. You will autonomously perform these actions.
        Here is a list of previous actions taken:
        {context}

        Instructions:
        - Always give a written justification for you task
        - Decide whether to use a tool, update memory, or read memory
        - Only use tools when you are confident in your action and have a clear plan with a justification for doing so
        - Organize all scan results in the scans/ directory
        """

        return prompt

    def run(self):
        for i in range(5):
            prompt = self.build_prompt()
            self.call_api(prompt)
            print(self.memory)


# ========================
agent = Agent()
agent.run()

Returned output is : ChatCompletionMessage(content=None, refusal=None, role='assistant', audio=None, function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_8SU5szEioXSoW2SqCbQ7hQQ5', function=Function(arguments='{"filename":"./scope.txt"}', name='read_file'), type='function')])
Tool call: read_file {"filename":"./scope.txt"}
Updated memory with Tool call: read_file {"filename":"./scope.txt"}
Output: 192.168.1.0/24

['Tool call: read_file {"filename":"./scope.txt"}\nOutput: 192.168.1.0/24\n']
Returned output is : ChatCompletionMessage(content='Given the scope of "192.168.1.0/24" from the previous read action, the next logical step in performing reconnaissance is to conduct a network scan to identify live hosts and open ports on the scoped network. This will provide a foundation for further analysis and exploration of network vulnerabilities.\n\n### Plan:\n1. **Network Scanning**: Use `nmap` to perform a scan on the specified subnet "192.168.1.0/24". This will help in ide

KeyboardInterrupt: 