# Testing LLM Tool Usage with LangChain and Ollama

This notebook demonstrates how to use LangChain tools with the Llama model through Ollama.

In [25]:
# Import required libraries
from langchain_ollama import ChatOllama
from langchain.tools import tool
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import HumanMessage, SystemMessage
import json

## Define Test Tools

We'll create three tools for testing:
1. Network scanning
2. CVE checking
3. Report generation

In [26]:
@tool
def search_network(ip_range: str) -> str:
    """Search the network for devices in the given IP range."""
    # Simulate network search
    return f"Found devices in {ip_range}: Device1 (192.168.1.100), Device2 (192.168.1.101)"

@tool
def check_cve(device_type: str) -> str:
    """Check for known CVEs for a specific device type."""
    # Simulate CVE database check
    return f"Found CVEs for {device_type}: CVE-2023-1234 (Critical), CVE-2023-1235 (High)"

@tool
def generate_report(findings: str) -> str:
    """Generate a security report based on the findings."""
    # Simulate report generation
    return f"Security Report:\n{findings}\n\nRecommendations:\n1. Update firmware\n2. Implement network segmentation"

## Set up the LLM and Agent

In [27]:
# Initialize the LLM
llm = ChatOllama(model="llama3.1:8b")

# Create a list of tools
tools = [search_network, check_cve, generate_report]

# Create the prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a security assessment agent. Use the available tools to analyze the network and generate reports."),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# Create the agent
agent = create_openai_tools_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

## Test Cases

Let's run through three test cases to evaluate the agent's ability to use tools:
1. Simple network scan
2. CVE check for a specific device
3. Complex multi-step task

In [28]:
# Test 1: Simple network scan
print("Test 1: Simple network scan")
result1 = agent_executor.invoke({"input": "Scan the network 192.168.1.0/24 for devices"})
print("\nResult:")
print(json.dumps(result1, indent=2))

Test 1: Simple network scan


> Entering new AgentExecutor chain...

Invoking: `search_network` with `{'ip_range': '192.168.1.0/24'}`


Found devices in 192.168.1.0/24: Device1 (192.168.1.100), Device2 (192.168.1.101)

KeyboardInterrupt: 

In [21]:
# Test 2: CVE check for specific device
print("Test 2: CVE check for specific device")
result2 = agent_executor.invoke({"input": "Check for vulnerabilities in the MRI scanner found at 192.168.1.100"})
print("\nResult:")
print(json.dumps(result2, indent=2))

Test 2: CVE check for specific device


> Entering new AgentExecutor chain...

Invoking: `check_cve` with `{'device_type': 'MRI scanner'}`


Found CVEs for MRI scanner: CVE-2023-1234 (Critical), CVE-2023-1235 (High)
Invoking: `nmap_scan` with `{'target': '192.168.1.100'}`


nmap_scan is not a valid tool, try one of [search_network, check_cve, generate_report].
Invoking: `exploitdb_search` with `{'cves': ['CVE-2023-1234', 'CVE-2023-1235'], 'ports': [22, 80, 443]}`


exploitdb_search is not a valid tool, try one of [search_network, check_cve, generate_report].
Invoking: `search_network` with `{'target': '192.168.1.100'}`


ValidationError: 1 validation error for search_network
ip_range
  Field required [type=missing, input_value={'target': '192.168.1.100'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/missing
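
The Test 2 log shows two failure modes: the model invented tool names (`nmap_scan`, `exploitdb_search`), and it called a real tool with the wrong argument name (`target` instead of `ip_range`), which raised and ended the run. One way to keep the loop alive is to turn both cases into error strings the model can read and react to. The following is a minimal, framework-independent sketch of that idea. The plain functions, `TOOL_REGISTRY`, and `safe_tool_call` are hypothetical stand-ins for illustration, not part of LangChain or of this notebook's agent.

```python
import inspect
from typing import Any, Callable, Dict


def search_network(ip_range: str) -> str:
    """Stand-in for the notebook's search_network tool (plain function, no @tool)."""
    return f"Found devices in {ip_range}"


def check_cve(device_type: str) -> str:
    """Stand-in for the notebook's check_cve tool."""
    return f"Found CVEs for {device_type}"


# Hypothetical registry mapping tool names to plain callables.
TOOL_REGISTRY: Dict[str, Callable[..., str]] = {
    "search_network": search_network,
    "check_cve": check_cve,
}


def safe_tool_call(name: str, args: Dict[str, Any]) -> str:
    """Run a tool call, returning an error string instead of raising."""
    func = TOOL_REGISTRY.get(name)
    if func is None:
        return f"{name} is not a valid tool, try one of {sorted(TOOL_REGISTRY)}."
    signature = inspect.signature(func)
    try:
        # bind() raises TypeError on missing or unexpected keyword arguments.
        bound = signature.bind(**args)
    except TypeError as exc:
        expected = list(signature.parameters)
        return f"Invalid arguments for {name}: {exc}. Expected parameters: {expected}."
    return func(*bound.args, **bound.kwargs)


# The three situations seen in the Test 2 log, plus a correct call.
print(safe_tool_call("nmap_scan", {"target": "192.168.1.100"}))
print(safe_tool_call("search_network", {"target": "192.168.1.100"}))
print(safe_tool_call("search_network", {"ip_range": "192.168.1.0/24"}))
```

Returning the expected parameter names in the error text gives the model something concrete to correct on its next step, rather than just a stack trace.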

In [None]:
# Test 3: Complex multi-step task
print("Test 3: Complex multi-step task")
result3 = agent_executor.invoke({"input": "Scan the network, check for vulnerabilities in any medical devices found, and generate a security report"})
print("\nResult:")
print(json.dumps(result3, indent=2))

## Analysis

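After running the tests above, we can analyze:
1. Did the agent choose the right tools for each task?
2. Did it chain tools together effectively?
3. Did it handle the tool outputs correctly?
4. Were there any unexpected behaviors?

In the runs recorded here, Test 1 was interrupted (KeyboardInterrupt) after its first tool call, so no final answer was captured. Test 2 chose `check_cve` correctly, then tried two tools that do not exist (`nmap_scan`, `exploitdb_search`), and finally called `search_network` with `target` instead of `ip_range`, which raised a ValidationError. Test 3 has not been run.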
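
The first and last questions can be answered mechanically from a list of tool calls. The sketch below assumes the calls have been transcribed by hand from the Test 2 log into `(tool_name, arguments)` tuples. `summarize_calls` and its output keys are hypothetical names chosen for illustration.

```python
from typing import Any, Dict, List, Tuple

# Tool names registered with the agent in this notebook.
VALID_TOOLS = {"search_network", "check_cve", "generate_report"}

# Tool calls transcribed by hand from the Test 2 log.
test2_calls: List[Tuple[str, Dict[str, Any]]] = [
    ("check_cve", {"device_type": "MRI scanner"}),
    ("nmap_scan", {"target": "192.168.1.100"}),
    ("exploitdb_search", {"cves": ["CVE-2023-1234", "CVE-2023-1235"],
                          "ports": [22, 80, 443]}),
    ("search_network", {"target": "192.168.1.100"}),
]


def summarize_calls(
    calls: List[Tuple[str, Dict[str, Any]]],
    expected: List[str],
) -> Dict[str, List[str]]:
    """Compare the tools an agent called against the tools we expected."""
    names = [name for name, _ in calls]
    return {
        "called": names,
        # Names the model made up that are not registered tools.
        "unknown_tools": [n for n in names if n not in VALID_TOOLS],
        # Expected tools that were never called.
        "missing_expected": [n for n in expected if n not in names],
        # Registered tools that were called although the task did not need them.
        "unexpected_valid": [
            n for n in names if n in VALID_TOOLS and n not in expected
        ],
    }


summary = summarize_calls(test2_calls, expected=["check_cve"])
for key, value in summary.items():
    print(f"{key}: {value}")
```

Note that this only checks tool names. It would not catch the wrong argument name in the final `search_network` call, which needs a separate check against each tool's parameters.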

## Testing web scraping to get info from a website

Using ScraperAPI.

In [50]:
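import os
import requests

# Read the API key from the environment rather than hard-coding it in the notebook.
# SCRAPERAPI_KEY is an assumed variable name; use whatever name you export.
payload = { 'api_key': os.environ['SCRAPERAPI_KEY'], 'url': 'https://documentation.commvault.com/securityadvisories/CV_2025_04_1.html', 'output_format': 'text' }
r = requests.get('https://api.scraperapi.com/', params=payload, timeout=60)
r.raise_for_status()
print(r.text)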

logo [/securityadvisories/logo.a7407039.svg] /


SECURITY ADVISORIES

Documentation [https://documentation.commvault.com] Cloud Services [https://cloud.commvault.com] Solutions [https://www.commvault.com/supported-technologies]
 * Documentation [https://documentation.commvault.com]
 * Cloud Services [https://cloud.commvault.com]
 * Solutions [https://www.commvault.com/supported-technologies]

< Back [.]


CV_2025_04_1: VULNERABILITY IN COMMVAULT COMMAND CENTER INSTALLATION CRITICAL

 * Advisory ID: CV_2025_04_1
 * Severity: CRITICAL
 * Issued: 2025-04-11
 * Updated: 2025-05-07
 * CVSS Score Range: 10
 * Additional Links:
   * CVE-2025-34028 [https://nvd.nist.gov/vuln/detail/CVE-2025-34028]

A critical security vulnerability has been identified in the Command Center installation, allowing remote attackers to execute arbitrary code without authentication. This vulnerability could lead to a complete compromise of the Command Center environment. Fortunately, other installations within the 
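
The scraped text has a regular `* Field: value` layout, so the advisory metadata can be pulled out before handing anything to the LLM. The following is a rough sketch using only `re`. `SAMPLE` is copied from the output above, and `parse_advisory` is a hypothetical helper that assumes the bullet layout stays the same across advisories, which has not been verified here.

```python
import re
from typing import Dict

# Excerpt copied from the scraped advisory text.
SAMPLE = """\
CV_2025_04_1: VULNERABILITY IN COMMVAULT COMMAND CENTER INSTALLATION CRITICAL

 * Advisory ID: CV_2025_04_1
 * Severity: CRITICAL
 * Issued: 2025-04-11
 * Updated: 2025-05-07
 * CVSS Score Range: 10
 * Additional Links:
   * CVE-2025-34028 [https://nvd.nist.gov/vuln/detail/CVE-2025-34028]
"""

# "* Name: value" on a single line; [ \t] keeps the match from spilling
# onto the next line when a field (e.g. "Additional Links:") has no value.
FIELD_RE = re.compile(r"^[ \t]*\*[ \t]*([A-Za-z ]+):[ \t]*(\S.*)$", re.MULTILINE)
CVE_RE = re.compile(r"CVE-\d{4}-\d{4,}")


def parse_advisory(text: str) -> Dict[str, object]:
    """Extract bullet fields and any CVE identifiers from advisory text."""
    fields: Dict[str, object] = {
        key.strip(): value.strip() for key, value in FIELD_RE.findall(text)
    }
    # De-duplicate: the same CVE id appears in both the label and the URL.
    fields["CVEs"] = sorted(set(CVE_RE.findall(text)))
    return fields


advisory = parse_advisory(SAMPLE)
for key, value in advisory.items():
    print(f"{key}: {value}")
```

The extracted CVE identifier can then be used to look up the structured record from a CVE API instead of relying on the scraped prose.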

In [None]:
llm = ChatOllama(model="llama3.2:3b")

### Testing some tools from tools.py

In [53]:
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()

# Extract network address (assuming /24 subnet)
network_parts = local_ip.split('.')
print(f"{network_parts[0]}.{network_parts[1]}.{network_parts[2]}.0/24")

192.0.0.0/24
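
The string splitting above only works for a /24 prefix. As an alternative sketch, the standard-library `ipaddress` module can derive the network for any prefix length. `local_network_cidr` is a hypothetical helper name, and the prefix length is still an assumption, since the socket trick does not reveal the real netmask.

```python
import ipaddress


def local_network_cidr(local_ip: str, prefix_len: int = 24) -> str:
    """Return the CIDR network containing local_ip for the given prefix length."""
    # ip_interface accepts a host address with a prefix and exposes its network.
    interface = ipaddress.ip_interface(f"{local_ip}/{prefix_len}")
    return str(interface.network)


print(local_network_cidr("192.168.1.57"))     # 192.168.1.0/24
print(local_network_cidr("10.20.30.40", 16))  # 10.20.0.0/16
```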


In [6]:
import nmap
import socket
import requests
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime


In [7]:

@dataclass
class DeviceInfo:
    ip: str
    hostname: Optional[str]
    services: List[Dict[str, str]]
    os_info: Optional[str]


In [8]:
def scan_network() -> List[DeviceInfo]:
    """
    Scan the local network for devices and their services using nmap.
    Returns a list of DeviceInfo objects containing device details.
    """
    try:
        # Initialize nmap scanner
        nm = nmap.PortScanner()
        
        # Get local network
        # network = get_local_network()
        network = "192.0.0.0/24"
        
        # Run basic service scan without OS detection
        nm.scan(hosts=network, arguments='-sV')
        
        devices = []
        for host in nm.all_hosts():
            device_info = DeviceInfo(
                ip=host,
                hostname=None,
                services=[],
                os_info=None  # We'll keep this field but it will be None
            )
            
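            # Get hostname if available; reverse DNS lookups fail with an
            # OSError subclass (e.g. socket.herror) when no record exists
            try:
                device_info.hostname = socket.gethostbyaddr(host)[0]
            except OSError:
                pass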
            
            # Get services
            if 'tcp' in nm[host]:
                for port, service in nm[host]['tcp'].items():
                    device_info.services.append({
                        'port': str(port),
                        'name': service['name'],
                        'product': service.get('product', ''),
                        'version': service.get('version', '')
                    })
            
            devices.append(device_info)
        
        return devices
    
    except Exception as e:
        # Chain the original exception so the nmap traceback is preserved
        raise RuntimeError(f"Network scan failed: {e}") from e

In [9]:
devices = scan_network()
print(devices)


[DeviceInfo(ip='192.0.0.2', hostname=None, services=[{'port': '80', 'name': 'http', 'product': 'Apache httpd', 'version': '2.4.62'}, {'port': '5000', 'name': 'rtsp', 'product': '', 'version': ''}, {'port': '7000', 'name': 'rtsp', 'product': '', 'version': ''}], os_info=None)]
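
To pass scan results to an LLM or a tool that returns `str`, the dataclass instances need to be serialized. A minimal sketch using `dataclasses.asdict` follows. `DeviceInfo` is repeated so the block runs on its own, `devices_to_json` is a hypothetical helper, and the sample device is trimmed from the output above.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional


@dataclass
class DeviceInfo:
    ip: str
    hostname: Optional[str]
    services: List[Dict[str, str]]
    os_info: Optional[str]


def devices_to_json(devices: List[DeviceInfo]) -> str:
    """Serialize scan results to a JSON string (None becomes null)."""
    return json.dumps([asdict(device) for device in devices], indent=2)


# One device, trimmed from the scan output above.
sample = [
    DeviceInfo(
        ip="192.0.0.2",
        hostname=None,
        services=[{"port": "80", "name": "http",
                   "product": "Apache httpd", "version": "2.4.62"}],
        os_info=None,
    )
]

print(devices_to_json(sample))
```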


In [18]:
import json

# Read the test.json file
with open('../src/test.json', 'r') as f:
    test_data = json.load(f)

print("Loaded JSON data:")
print(test_data)


Loaded JSON data:
{'dataType': 'CVE_RECORD', 'dataVersion': '5.1', 'cveMetadata': {'cveId': 'CVE-2025-34028', 'assignerOrgId': '83251b91-4cc7-4094-a5c7-464a1b83ea10', 'state': 'PUBLISHED', 'assignerShortName': 'VulnCheck', 'dateReserved': '2025-04-15T19:15:22.545Z', 'datePublished': '2025-04-22T16:32:23.446Z', 'dateUpdated': '2025-05-07T22:09:23.958Z'}, 'containers': {'cna': {'affected': [{'defaultStatus': 'unaffected', 'product': 'Command Center Innovation Release', 'vendor': 'Commvault', 'versions': [{'status': 'affected', 'version': '11.38', 'versionType': 'semver'}]}], 'credits': [{'lang': 'en', 'type': 'finder', 'value': 'Sonny'}, {'lang': 'en', 'type': 'sponsor', 'value': 'watchTowr'}], 'descriptions': [{'lang': 'en', 'supportingMedia': [{'base64': False, 'type': 'text/html', 'value': '<div>The Commvault Command Center Innovation Release allows an unauthenticated actor to upload ZIP files that represent install packages that, when expanded by the target server, are vulnerable to 

In [28]:
# cve_data.get("containers")
cve_data.get("containers", {}).get("cna")


{'affected': [{'defaultStatus': 'unaffected',
   'product': 'Command Center Innovation Release',
   'vendor': 'Commvault',
   'versions': [{'status': 'affected',
     'version': '11.38',
     'versionType': 'semver'}]}],
 'credits': [{'lang': 'en', 'type': 'finder', 'value': 'Sonny'},
  {'lang': 'en', 'type': 'sponsor', 'value': 'watchTowr'}],
 'descriptions': [{'lang': 'en',
   'supportingMedia': [{'base64': False,
     'type': 'text/html',
     'value': '<div>The Commvault Command Center Innovation Release allows an unauthenticated actor to upload ZIP files that represent install packages that, when expanded by the target server, are vulnerable to path traversal vulnerability that can result in Remote Code Execution via malicious JSP.<br></div><div><br></div><div>This issue affects Command Center Innovation Release: 11.38.</div>'}],
   'value': 'The Commvault Command Center Innovation Release allows an unauthenticated actor to upload ZIP files that represent install packages that, wh

In [25]:
cve_data["containers"]["cna"]

{'affected': [{'defaultStatus': 'unaffected',
   'product': 'Command Center Innovation Release',
   'vendor': 'Commvault',
   'versions': [{'status': 'affected',
     'version': '11.38',
     'versionType': 'semver'}]}],
 'credits': [{'lang': 'en', 'type': 'finder', 'value': 'Sonny'},
  {'lang': 'en', 'type': 'sponsor', 'value': 'watchTowr'}],
 'descriptions': [{'lang': 'en',
   'supportingMedia': [{'base64': False,
     'type': 'text/html',
     'value': '<div>The Commvault Command Center Innovation Release allows an unauthenticated actor to upload ZIP files that represent install packages that, when expanded by the target server, are vulnerable to path traversal vulnerability that can result in Remote Code Execution via malicious JSP.<br></div><div><br></div><div>This issue affects Command Center Innovation Release: 11.38.</div>'}],
   'value': 'The Commvault Command Center Innovation Release allows an unauthenticated actor to upload ZIP files that represent install packages that, wh

In [22]:
response = requests.get("https://cve.circl.lu/api/cve/CVE-2025-34028", timeout=30)
response.raise_for_status()
cve_data = response.json()

In [24]:
type(cve_data)

dict
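
Since the record is a plain nested `dict`, the fields of interest can be flattened with chained `.get()` calls, as in the cell that reads `containers` and `cna`. The sketch below assumes the key layout shown in the output above. `summarize_cve` is a hypothetical helper, and `sample_record` is a trimmed copy of the record with the description text elided.

```python
from typing import Any, Dict

# Trimmed copy of the record shown above; the description text is elided.
sample_record: Dict[str, Any] = {
    "dataType": "CVE_RECORD",
    "cveMetadata": {"cveId": "CVE-2025-34028", "state": "PUBLISHED"},
    "containers": {
        "cna": {
            "affected": [{
                "defaultStatus": "unaffected",
                "product": "Command Center Innovation Release",
                "vendor": "Commvault",
                "versions": [{"status": "affected", "version": "11.38",
                              "versionType": "semver"}],
            }],
            "descriptions": [{"lang": "en", "value": "(description elided)"}],
        }
    },
}


def summarize_cve(record: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten the fields of interest; missing keys yield None or empty lists."""
    meta = record.get("cveMetadata", {})
    cna = record.get("containers", {}).get("cna", {})
    affected = [
        {
            "vendor": item.get("vendor"),
            "product": item.get("product"),
            # Keep only versions explicitly marked as affected.
            "versions": [
                v.get("version")
                for v in item.get("versions", [])
                if v.get("status") == "affected"
            ],
        }
        for item in cna.get("affected", [])
    ]
    # Prefer the English description if several languages are present.
    description = next(
        (d.get("value") for d in cna.get("descriptions", [])
         if d.get("lang") == "en"),
        None,
    )
    return {
        "id": meta.get("cveId"),
        "state": meta.get("state"),
        "affected": affected,
        "description": description,
    }


print(summarize_cve(sample_record))
```

Using `.get()` with defaults throughout means a record that lacks a `cna` container yields an empty summary instead of a `KeyError`.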