In [76]:
from langchain_core.tools import tool
import getpass
import os
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import subprocess
import re



# Never hard-code the key in the notebook: reuse OPENAI_API_KEY when it is
# already set in the environment, otherwise prompt for it without echoing.
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API key: ")

## actual port scan tools

Custom tools (plain Python functions) written for the agent to call.

In [137]:
# TOOLS:

@tool
def check_device(a: str) -> bool:
    """
    Checks if the device is connected to the network.

    Runs an nmap ping scan (``nmap -sn``) against the target and looks for
    "Host is up" in the report.

    Args:
        a: hostname or IP address of the device (e.g., 'patient-monitor.local').
    Returns:
        bool: True if the device is connected, False otherwise (including when
        the target is invalid, nmap cannot be run, or the scan times out).
    """
    # The target is chosen by the LLM, so treat it as untrusted: an empty value
    # is meaningless and a leading '-' would be parsed by nmap as an option.
    target = a.strip() if isinstance(a, str) else ""
    if not target or target.startswith("-"):
        print(f"Invalid scan target: {a!r}")
        return False

    try:
        # Run the nmap command for a ping scan
        result = subprocess.run(
            ["nmap", "-sn", target],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=60,  # bound the wait so a stuck scan cannot hang the tool call
        )
    except Exception as e:
        # Also covers a missing nmap binary and subprocess.TimeoutExpired.
        print(f"Error occurred while running nmap: {e}")
        return False

    nmap_output = result.stdout
    if "Host is up" in nmap_output:
        print(nmap_output)  # Print the detailed output in the desired format
        return True

    # Surface nmap's own diagnostics so "scan failed" is distinguishable from "host down".
    if result.returncode != 0 and result.stderr:
        print(result.stderr.strip())
    print(f"Nmap scan report for {target}\nDevice is not reachable.")
    return False
    

@tool
def port_scan(a: str) -> str:
    """
    Performs a port scan on the given device and returns details about open ports.

    Args:
        a: The hostname or IP address of the device to scan.
    Returns:
        A string containing details about the open ports, their state, and the associated service.
        If no open TCP port is reported, a "No open ports found" message is returned; if the
        target is invalid or nmap fails, an error message is returned instead.
    """
    try:
        # The target is chosen by the LLM, so treat it as untrusted: a leading
        # '-' would be parsed by nmap as an option rather than a host.
        target = a.strip() if isinstance(a, str) else ""
        if not target or target.startswith("-"):
            raise ValueError(f"invalid scan target {a!r}")

        # Default nmap scan: no explicit port range or service-detection flags
        # are passed, so this is NOT an all-ports / version-detection scan.
        result = subprocess.run(
            ["nmap", target],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=300,  # bound the wait so a stuck scan cannot hang the tool call
        )

        # A failed nmap run must not be reported as "no open ports".
        if result.returncode != 0:
            raise RuntimeError(
                result.stderr.strip() or f"nmap exited with status {result.returncode}"
            )

        # Matches report lines such as "6668/tcp open  irc".
        port_line = re.compile(r"(\d+/tcp)\s+(open)\s+([\w\-]+)")
        open_ports_info = []
        for line in result.stdout.splitlines():
            match = port_line.match(line)
            if match:
                port, state, service = match.groups()
                open_ports_info.append(f"Port: {port}, State: {state}, Service: {service}")

        # If no open ports are found
        if not open_ports_info:
            return f"No open ports found on device {target}."

        # Format the output
        result_str = f"Nmap scan results for {target}:\n" + "\n".join(open_ports_info)
        print(result_str)  # Print the results for debugging or logging purposes
        return result_str

    except Exception as e:
        # Errors are returned as text so the LLM can relay them to the user.
        error_msg = f"Error occurred while running port scan: {e}"
        print(error_msg)
        return error_msg
    


# Tools exposed to the LLM; this list is what gets passed to bind_tools().
tools = [check_device, port_scan]



In [117]:
# Bind the tools to the chat model. The bound model only *requests* tool calls
# in its response; the cells below are responsible for actually running them.
llm = ChatOpenAI(model="gpt-4o-mini")
llm_with_tools = llm.bind_tools(tools)

### tool invocation 1: checking the device connection


In [124]:
# Alternative query that addresses the device by hostname instead of IP:
# query = "Check if the patient-monitor.local device is connected"
query = "Check if the device with IP 192.168.5.115 is connected"
# Conversation history: the LLM's tool-call request and the tool results are
# appended to it before the final LLM call.
messages = [HumanMessage(query)]
output = llm_with_tools.invoke(query) # this output tells us which tool is to be called, but doesn't call it yet
messages.append(output)
print(output.tool_calls)



[{'name': 'check_device', 'args': {'a': '192.168.5.115'}, 'id': 'call_aFyzSrM00I4OMHydhYk3yMx4', 'type': 'tool_call'}]


In [125]:
# Run the tool that was identified by the llm (tool calling)
for tool_call in output.tool_calls:
    # Look up the tool object by the (lower-cased) name the LLM requested.
    selected_tool = {"check_device": check_device, "port_scan": port_scan}[tool_call["name"].lower()]
    # Invoking with the whole tool_call dict returns a message that carries the
    # tool's output together with the matching tool_call_id.
    tool_msg = selected_tool.invoke(tool_call)
    print(tool_msg)
    print()
    messages.append(tool_msg)


print("LLM Final Response:")
# use llm to integrate tool outputs into its response
# (last expression of the cell, so the full AIMessage is displayed)
llm_with_tools.invoke(messages)


Starting Nmap 7.95 ( https://nmap.org ) at 2024-12-29 18:09 GMT
Nmap scan report for 192.168.5.115
Host is up (0.062s latency).
Nmap done: 1 IP address (1 host up) scanned in 0.15 seconds

content='true' name='check_device' tool_call_id='call_aFyzSrM00I4OMHydhYk3yMx4'

LLM Final Response:


AIMessage(content='The device with IP 192.168.5.115 is connected.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 187, 'total_tokens': 204, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_0aa8d3e20b', 'finish_reason': 'stop', 'logprobs': None}, id='run-9a31f0af-8949-45ce-a5d1-2bf1a8e1133b-0', usage_metadata={'input_tokens': 187, 'output_tokens': 17, 'total_tokens': 204, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

### tool invocation 2: running a port scan

In [122]:
# Alternative query that addresses the device by hostname instead of IP:
# query = "run port scan on patient-monitor.local"
query = "run port scan on IP 192.168.5.115"
# Start a fresh conversation history for this query.
messages = [HumanMessage(query)]
output = llm_with_tools.invoke(query) # this output tells us which tool is to be called, but doesn't call it yet
print(output.tool_calls)
messages.append(output)



[{'name': 'port_scan', 'args': {'a': '192.168.5.115'}, 'id': 'call_4zopGqQcYn8d1ZbYaLfpAMpF', 'type': 'tool_call'}]


In [None]:
# Run the tool(s) requested by the LLM and record each result in the history
for tool_call in output.tool_calls:
    # Look up the tool object by the (lower-cased) name the LLM requested.
    selected_tool = {"check_device": check_device, "port_scan": port_scan}[tool_call["name"].lower()]
    tool_msg = selected_tool.invoke(tool_call)
    print(tool_msg)
    print()
    messages.append(tool_msg)


Nmap scan results for 192.168.5.115:
Port: 6668/tcp, State: open, Service: irc
content='Nmap scan results for 192.168.5.115:\nPort: 6668/tcp, State: open, Service: irc' name='port_scan' tool_call_id='call_AH6ziBE7wo81A9V0djIEl4jX'



In [114]:
# use llm to integrate tool outputs into its response;
# the result is kept in x so the next cell can print just its text
x = llm_with_tools.invoke(messages)


In [126]:
# Show only the text content of the LLM response stored in x.
print("LLM Final Response:")
print(x.content)

LLM Final Response:
The port scan results for IP 192.168.5.115 are as follows:

- **Port:** 6668/tcp
- **State:** open
- **Service:** irc


### calling 2 tools with one query
**To be improved:** calling multiple tools in one query works, but expressing conditional logic between tool calls is harder — for example, telling the LLM to call the port-scanning tool only if the device is connected. The LLM doesn't handle this dependency automatically.

In [135]:
# A single query that should trigger both tools.
query = "check that IP 192.168.5.115 is connected, and run a port scan on the same IP"
# Start a fresh conversation history for this query.
messages = [HumanMessage(query)]
output = llm_with_tools.invoke(query) # this output tells us which tools are to be called, but doesn't call them yet
print(output.tool_calls)
messages.append(output)


[{'name': 'check_device', 'args': {'a': '192.168.5.115'}, 'id': 'call_cWLrwmXkfwC13ruXmzx2Q9Lw', 'type': 'tool_call'}, {'name': 'port_scan', 'args': {'a': '192.168.5.115'}, 'id': 'call_767sLfTer0sil6lehuOQT1Ki', 'type': 'tool_call'}]


In [138]:
# Run every tool the LLM requested, in the order it listed them.
# Note: the calls are executed unconditionally; nothing here makes the port
# scan depend on the result of the connectivity check.
for tool_call in output.tool_calls:
    # Look up the tool object by the (lower-cased) name the LLM requested.
    selected_tool = {"check_device": check_device, "port_scan": port_scan}[tool_call["name"].lower()]
    tool_msg = selected_tool.invoke(tool_call)
    print(tool_msg)
    print()
    messages.append(tool_msg)

Starting Nmap 7.95 ( https://nmap.org ) at 2024-12-29 18:20 GMT
Nmap scan report for 192.168.5.115
Host is up (0.22s latency).
Nmap done: 1 IP address (1 host up) scanned in 0.25 seconds

content='true' name='check_device' tool_call_id='call_cWLrwmXkfwC13ruXmzx2Q9Lw'

Nmap scan results for 192.168.5.115:
Port: 6668/tcp, State: open, Service: irc
content='Nmap scan results for 192.168.5.115:\nPort: 6668/tcp, State: open, Service: irc' name='port_scan' tool_call_id='call_767sLfTer0sil6lehuOQT1Ki'



In [139]:
# Let the LLM combine both tool results into a single answer and print its text.
x = llm_with_tools.invoke(messages)
print("LLM Final Response:")
print(x.content)


LLM Final Response:
The device with IP address 192.168.5.115 is connected to the network. 

Additionally, a port scan reveals the following information:
- Port: **6668/tcp**
- State: **open**
- Service: **irc**


## testing tool calling

In [None]:
@tool
def add(a: int, b: int) -> int:
    """Return the sum of two integers.

    Args:
        a: First integer
        b: Second integer
    """
    total = a + b
    return total

@tool
def multiply(a: int, b: int) -> int:
    """Return the product of two integers.

    Args:
        a: First integer
        b: Second integer
    """
    product = a * b
    return product

@tool
def port_scan(a: str, b: int) -> str:
    """Runs port scan on a given IP address.

    Args:
        a: IP address
        b: Port number
    """
    # NOTE: placeholder used to test tool calling. No scan is performed; a
    # canned report is returned. Defining it rebinds the name `port_scan`,
    # replacing the nmap-backed tool defined earlier in the notebook.
    output = f"""
    Nmap scan report for {a}
    Port {b} is open
    """
    return output

# Rebinds `tools` for the toy examples; `port_scan` here is the placeholder
# defined just above, not the nmap-backed tool from the earlier section.
tools = [add, multiply, port_scan]

In [None]:
# Fresh chat model instance for the toy tool-calling tests.
llm = ChatOpenAI(model="gpt-4o-mini")

In [None]:
# Re-bind so the model sees the toy tools currently in `tools`.
llm_with_tools = llm.bind_tools(tools)
# One query that needs two different tools.
query = "What is the product of 4 and 12? What is the sum of 9 and 47?"
messages = [HumanMessage(query)]
# The response only lists the requested tool calls; nothing is executed yet.
output = llm_with_tools.invoke(query)
messages.append(output)

In [None]:
# Inspect which tools (and arguments) the LLM asked for.
print(output.tool_calls)

[{'name': 'multiply', 'args': {'a': 4, 'b': 12}, 'id': 'call_mgZJzsaQrvReYRfm14UkU8Q3', 'type': 'tool_call'}, {'name': 'add', 'args': {'a': 9, 'b': 47}, 'id': 'call_8di1nY00HNqrrpmByuKfPKMm', 'type': 'tool_call'}]


In [None]:
# Run each requested tool and record its result in the history.
# The dispatch table lists every tool bound to the model (add, multiply and
# port_scan), so any call the LLM can emit resolves instead of raising KeyError.
for tool_call in output.tool_calls:
    selected_tool = {"add": add, "multiply": multiply, "port_scan": port_scan}[tool_call["name"].lower()]
    tool_msg = selected_tool.invoke(tool_call)
    print(tool_msg)
    messages.append(tool_msg)
    print()

content='48' name='multiply' tool_call_id='call_aVbsBCoPpa2i6rlTbMryylxo'

content='56' name='add' tool_call_id='call_S26XUS6hPUbtssZSohIMHO5j'



In [None]:
# Let the LLM turn the tool results into a final answer (displayed as the cell output).
llm_with_tools.invoke(messages)

AIMessage(content='The product of 4 and 12 is 48, and the sum of 9 and 47 is 56.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 27, 'prompt_tokens': 219, 'total_tokens': 246, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_0aa8d3e20b', 'finish_reason': 'stop', 'logprobs': None}, id='run-152665e1-3096-4f6d-920c-c083184ab6f5-0', usage_metadata={'input_tokens': 219, 'output_tokens': 27, 'total_tokens': 246, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

### testing port scan placeholder tool

In [None]:
# Re-create the chat model (same configuration as in the previous test section).
llm = ChatOpenAI(model="gpt-4o-mini")

In [None]:
# Bind the toy tools (including the placeholder port_scan) to the new model.
llm_with_tools = llm.bind_tools(tools)
query = "Run a port scan on the address 192.169.1.1 on port 80"
messages = [HumanMessage(query)]

# The response only lists the requested tool call; nothing is executed yet.
output = llm_with_tools.invoke(query)
messages.append(output)

In [None]:
# Inspect which tool (and arguments) the LLM asked for.
print(output.tool_calls)

[{'name': 'port_scan', 'args': {'a': '192.169.1.1', 'b': 80}, 'id': 'call_3iGa9YkU11krOCKuoO7NBLkA', 'type': 'tool_call'}]


In [None]:
# Run each requested tool and record its result in the history.
for tool_call in output.tool_calls:
    # Look up the tool object by the (lower-cased) name the LLM requested.
    selected_tool = {"add": add, "multiply": multiply, "port_scan": port_scan }[tool_call["name"].lower()]
    tool_msg = selected_tool.invoke(tool_call)
    print(tool_msg)
    print()
    messages.append(tool_msg)

content='\n    Nmap scan report for 192.169.1.1\n    Port 80 is open\n    ' name='port_scan' tool_call_id='call_3iGa9YkU11krOCKuoO7NBLkA'



In [None]:
# Let the LLM turn the tool result into a final answer (displayed as the cell output).
llm_with_tools.invoke(messages)

AIMessage(content='The port scan on the address 192.169.1.1 shows that port 80 is open.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 206, 'total_tokens': 230, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_0aa8d3e20b', 'finish_reason': 'stop', 'logprobs': None}, id='run-b59b30b3-7056-46cf-b08a-074418538fe5-0', usage_metadata={'input_tokens': 206, 'output_tokens': 24, 'total_tokens': 230, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})

In [None]:
# tool_msg is left over from the dispatch loop above: it holds the result of
# the last tool that was run.
print(tool_msg)

content='\n    Nmap scan report for 192.169.1.1\n    Port 80 is open\n    ' name='port_scan' tool_call_id='call_BTNUCBjqY4sZ8qvVSpeiwIs8'
