# Tutorial 1: Demonstrating zap_toolkit usage in agentic systems

In this notebook we demonstrate how to use of the zap_toolkit in a simple single-agent system to perform basic operations:
- initiate the ZAP API session
- crawl the target
- retrieve the findings from the passive scanner

Basic understanding of Langchain/Langgraph libraries is recommended. We will be using locally run ZAP API service based on the latest docker image.


You can execute this notebook by:
1. Cloning the github repo: https://github.com/ishish222/zap_toolkit
2. Installing dev/testing dependencies in a virtual environment with:
```sh
poetry install --with dev
```
5. Registering ipykernel with:
```sh
poetry shell
poetry run python -m ipykernel install --user --name=zap-toolkit-tests --display-name "Python (zap-toolkit-tests)"
poetry run jupyter notebook
```
4. Selecting appropriate kernel from the list and executing the cells

Please note:
You need to configure relevant env variables in your environment or .env file:
- OPENAI_API_KEY
- ZAP_API_ENDPOINT_HTTP
- ZAP_API_ENDPOINT_HTTPS
- ZAP_API_KEY

As for the ZAP_* variables you can use for the purpose of the locally run tutorial:
```sh
ZAP_API_ENDPOINT_HTTP="http://127.0.0.1:8081"
ZAP_API_ENDPOINT_HTTPS="https://127.0.0.1:8081"
ZAP_API_KEY="abcd0123"
```

Example command for running the latest ZAP API docker image: 
```sh
docker run -u zap -p 8081:8080 zaproxy/zap-stable zap.sh -daemon -host 0.0.0.0 -port 8080 -config api.addrs.addr.name=.* -config api.addrs.addr.regex=true -config api.key=abcd0123
```

## Initial imports and env setup

In [None]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.prompts import SystemMessagePromptTemplate
from dotenv import load_dotenv, find_dotenv
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode

import os
import sys


In [None]:
_ = load_dotenv(find_dotenv())
memory = MemorySaver()


## Preparing some templates

In [None]:
system_template = SystemMessagePromptTemplate.from_template(
"""
You are a skilled and seasoned penetration tester working for one of 
the best IT security companies in the world. You have the following 
certifications: OSCP, OSCE, CISSP. You are tasked with assisting the 
operator in carrying out some tasks related to the OWASP Top 10 
penetration tests, specifically with performing crawling of a target 
website and reporting any findings.

You are focused and motivated (if you do this task right, you wil 
be promoted). You only return information which you are asked for, 
without additional chatter and in the format that is requested of you.

This specific task is about managing a set of provided tools to perform
ZAP API session initiation, crawling and retrieving findings from the 
passive scanner.

The following are out of scope of this task:
- compiling a test report
- planning the remaining steps of the OWASP penetration test
- executing the analysis of the target URI

The target URIs in scope of this task:
{target_uris}

Here are additional rules:
- Remember, you need to initialize session before any other interactions 
    with ZAP API.
""")

In [None]:
reasoning_message = """
Please examine the conversation so far and decide on the subsequent 
course of action.
"""

In [None]:
observation_message = """
Please examine the result of the action executed and interpret it in 
the context of the task at hand.
"""

## Configuring the Langchain agent

In [None]:
class ParameterAgentState(TypedDict):
    target_uris: List[str]
    messages: Annotated[list[str], operator.add]
    interation_number: int
    max_interation_number: int

In [None]:
class ParameterAgent:
    def __init__(self, model, tools, checkpointer):
        
        # initialize agent w/ system message

        graph = StateGraph(ParameterAgentState)

        # nodes
        graph.add_node("system", self.system)
        graph.add_node("reasoning", self.reasoning)
        graph.add_node("tool", ToolNode(tools))
        graph.add_node("observation", self.observation)

        # edges
        graph.add_edge("system", "reasoning")
        graph.add_edge("tool", "observation")
        graph.add_edge("observation", "reasoning")

        graph.add_conditional_edges(
            "reasoning",
            self.determine_action,
            {
            "tool": "tool",
            "END": END
            }
        )
        
        graph.set_entry_point("system")

        #compiling
        self.graph = graph.compile(checkpointer=checkpointer)
        self.tools = {t.name: t for t in tools}
        self.model_wo_tools = model
        self.model = model.bind_tools(tools)

    def system(self, state: ParameterAgentState):
        formatted_target_uris = "\n".join(state['target_uris'])
        system_message = system_template.format(target_uris=formatted_target_uris)
        return {'messages': [system_message]}
    
    def reasoning(self, state: ParameterAgentState):
        messages = state['messages']
        
        # update messages
        state['messages'].append(HumanMessage(content=reasoning_message))
        messages = state['messages']
        
        message = self.model.invoke(messages)
        return {'messages': [message], 'interation_number': state['interation_number']+1}
    
    def observation(self, state: ParameterAgentState):

        state['messages'].append(HumanMessage(content=observation_message))
        messages = state['messages']
        message = self.model_wo_tools.invoke(messages)
        return {'messages': [message], 'interation_number': state['interation_number']+1}

    def determine_action(
            self,
            state: TypedDict
            ):
        
        print("In determine_action")
        result = state['messages'][-1]
        
        if len(result.tool_calls) == 0:
            return "END"
        else:
            return "tool"
        

## Initiating tools

In [None]:
from zap_toolkit import *

In [None]:
tools = [
    SessionInitTool(),
    SessionCloseTool(),
    PassiveScannerRetrieveFindingsTool(),
    CrawlingTool()
]

## Using the agentic system to initiate session, crawl the target website and retrieve passive scan results

In [None]:
model = ChatOpenAI(model="gpt-4o", temperature=0.0)
zap_agent = ParameterAgent(model, tools, memory)

In [None]:
from IPython.display import Image, display

try:
    display(Image(zap_agent.graph.get_graph().draw_mermaid_png()))
except Exception as e:
    print(e)

In [None]:
thread = {"configurable": {"thread_id": "example_thread_id_1"}}

In [None]:
for event in zap_agent.graph.stream({
    'target_uris': ['http://testphp.vulnweb.com'], 
    'interation_number': 1,
    'max_interation_number': 10
}, thread):
    for v in event.values():
        print(v['messages'])