# Agent 5: Security Evaluator

The `Security Evaluator` agent is a specialized agent designed to assess and visualize the security posture of a system, application, or network. Its primary function is to evaluate various security metrics and present them in an easily comprehensible format, such as radar (or spider) charts. These charts allow users to quickly compare and identify strengths and weaknesses across multiple dimensions of security performance.

In [None]:
# load environment variables from the .env file
from dotenv import load_dotenv

load_dotenv()

## Define user tools

In [None]:
from math import pi
from typing import List
from datetime import datetime
import matplotlib.pyplot as plt


def generate_radar_chart(categories: List[str], values: List[int]) -> str:
    """
    Generate a radar chart visualization for given categories and their corresponding values.

    This function creates a radar chart (also known as a spider chart) to visually represent the performance
    or metrics of different categories. It saves the generated chart as a PNG file in the local directory.

    :param categories: A list of category names to be displayed on the radar chart. Each category represents
                       a dimension on the radar chart.
                       Example: ['Vulnerability Score', 'Detection Rate', 'Response Time']

    :param values: A list of numerical values corresponding to the categories. These values should be on a
                   uniform scale (e.g., 0 to 10) for proper visualization.
                   Example: [8, 7, 6]

    :return: A tuple containing:
             - A status indicator (bool): True if the radar chart was successfully generated and saved,
               False if an error occurred.
             - A result message (str): If successful, the message contains the file path of the saved radar chart.
               If an error occurs, the message contains an error description.

             Example (Success):
             (True, './figures/cybersecurity_radar_chart.png')

             Example (Error):
             (False, 'An error occurred while generating the radar chart: FileNotFoundError')

    :rtype: str
    """
    try:
        # Number of categories
        num_vars = len(categories)

        # Compute the angle for each category
        angles = [n / float(num_vars) * 2 * pi for n in range(num_vars)]
        angles += angles[:1]  # Close the radar chart loop

        # Close the loop for the data as well
        values += values[:1]

        # Create the figure
        plt.figure(figsize=(8, 8))
        ax = plt.subplot(111, polar=True)

        # Draw one axis per variable and add labels
        plt.xticks(angles[:-1], categories, color='grey', size=10)

        # Draw y-labels (range of values)
        ax.set_rlabel_position(30)
        plt.yticks([2, 4, 6, 8], ["2", "4", "6", "8"], color="grey", size=7)
        plt.ylim(0, 10)

        # Plot data
        ax.plot(angles, values, linewidth=2,
                linestyle='solid', label="Current Metrics")

        # Fill area
        ax.fill(angles, values, color='blue', alpha=0.4)

        # Add a title
        plt.title('Cybersecurity Metrics Radar Chart',
                  size=15, color='black', y=1.1)

        # Add legend
        plt.legend(loc='upper right', bbox_to_anchor=(0.1, 0.1))

        # Define image file path
        current_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        image_path = f'./figures/cybersecurity_radar_chart_{current_timestamp}.png'

        # Save the figure locally
        plt.savefig(image_path, dpi=300, bbox_inches='tight')
        plt.close()

        # Return success status and the image path
        return image_path

    except Exception as e:
        # Return failure status and an error message
        return f"An error occurred while generating the radar chart: {str(e)}"


# Example usage
result = generate_radar_chart(
    ['Vulnerability Score', 'Detection Rate', 'Response Time',
        'Threat Intelligence', 'System Uptime'],
    [8, 7, 6, 9, 8]
)

print(result)

## Create Agent

In [3]:
import os
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential


project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(), conn_str=os.environ["AIPROJECT_CONNECTION_STRING"]
)

In [None]:
from typing import Any, Set, Callable
from azure.ai.projects.models import FunctionTool

user_functions: Set[Callable[..., Any]] = {generate_radar_chart}
functions = FunctionTool(functions=user_functions)
print(functions.definitions)

In [None]:
instruction = """
# Role
You are "Security Evaluator," an AI agent designed to assess and visualize cybersecurity metrics using radar charts. Your primary goal is to assist users in generating a radar chart that visually represents multiple security-related categories and their corresponding values. Use the provided tool to create these visualizations and guide the user effectively in providing the necessary input.

# Tasks
1. Collecting a list of categories (e.g., "Vulnerability Score," "Detection Rate") and their corresponding values (e.g., scores on a uniform scale like 0–10) from the user.
2. Ensuring the input is valid (e.g., the number of categories matches the number of values, and values are within a valid range).
3. Using the generate_radar_chart function to generate a radar chart based on the user's input.
4. Explaining the result to the user:
    - If successful, provide the file path to the saved radar chart.
    - If an error occurs, describe the issue and guide the user to fix it.

# Goal
You are polite, concise, and clear in your communication. Provide helpful examples when the user is unsure about what input to provide. Always ensure that the user understands the requirements for generating the radar chart.
"""

security_evaluator = project_client.agents.create_agent(
    model=os.environ["CHAT_MODEL"],
    name="security_evaluator",
    description="AI agent that assists users in generating radar charts for cybersecurity metrics.",
    instructions=instruction,
    tools=functions.definitions,
    # Parameters
    temperature=0.7,
    top_p=0.95,
    # Metadata
    metadata={"group": "internet_threat_analysis"},
)

print(f"Created agent, agent ID: {security_evaluator.id}")

## Construct messages

In [None]:
thread = project_client.agents.create_thread()
print(f"Created thread, thread ID: {thread.id}")

# Create a message
message = project_client.agents.create_message(
    thread_id=thread.id,
    role="user",
    content="""I’m analyzing my organization’s cybersecurity posture and need a radar chart to visualize it. Here’s how the metrics look:

The Vulnerability Score is 8.
The Detection Rate is 7.
The Response Time is rated at 6.
Our Threat Intelligence capability scores a 9.
Finally, the System Uptime is at an 8.
Could you help me create a radar chart for these?""",
)

In [None]:
import time
from azure.ai.projects.models import RequiredFunctionToolCall, SubmitToolOutputsAction, ToolOutput


# Create and process assistant run in thread with tools
run = project_client.agents.create_run(
    thread_id=thread.id, assistant_id=security_evaluator.id)
print(f"Created run, ID: {run.id}")

while run.status in ["queued", "in_progress", "requires_action"]:
    time.sleep(1)
    run = project_client.agents.get_run(thread_id=thread.id, run_id=run.id)

    if run.status == "requires_action" and isinstance(run.required_action, SubmitToolOutputsAction):
        tool_calls = run.required_action.submit_tool_outputs.tool_calls
        if not tool_calls:
            print("No tool calls provided - cancelling run")
            project_client.agents.cancel_run(
                thread_id=thread.id, run_id=run.id)
            break

        tool_outputs = []
        for tool_call in tool_calls:
            if isinstance(tool_call, RequiredFunctionToolCall):
                try:
                    print(f"Executing tool call: {tool_call}")
                    output = functions.execute(tool_call)
                    tool_outputs.append(
                        ToolOutput(
                            tool_call_id=tool_call.id,
                            output=output,
                        )
                    )
                except Exception as e:
                    print(f"Error executing tool_call {tool_call.id}: {e}")

        print(f"Tool outputs: {tool_outputs}")
        if tool_outputs:
            project_client.agents.submit_tool_outputs_to_run(
                thread_id=thread.id, run_id=run.id, tool_outputs=tool_outputs
            )

    print(f"Current run status: {run.status}")

print(f"Run completed with status: {run.status}")

In [None]:
from IPython.display import Markdown, display
import helper

messages = project_client.agents.list_messages(thread_id=thread.id)

display(Markdown(helper.get_conversation_md(messages)))

In [None]:
import os
from pathlib import Path
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.projects.models import CodeInterpreterTool


project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(), conn_str=os.environ["AIPROJECT_CONNECTION_STRING"]
)


def create_and_run_agent(prompt: str):
    with project_client:
        # Create an instance of the CodeInterpreterTool
        code_interpreter = CodeInterpreterTool()

        # The CodeInterpreterTool needs to be included in creation of the agent
        agent = project_client.agents.create_agent(
            model=os.environ["CHAT_MODEL"],
            name="my-agent",
            instructions="You are helpful agent",
            tools=code_interpreter.definitions,
            tool_resources=code_interpreter.resources,
        )
        print(f"Created agent, agent ID: {agent.id}")

        # Create a thread
        thread = project_client.agents.create_thread()
        print(f"Created thread, thread ID: {thread.id}")

        # Create a message
        message = project_client.agents.create_message(
            thread_id=thread.id,
            role="user",
            content=prompt,
        )
        print(f"Created message, message ID: {message.id}")

        # Run the agent
        run = project_client.agents.create_and_process_run(
            thread_id=thread.id, assistant_id=agent.id)
        print(f"Run finished with status: {run.status}")

        if run.status == "failed":
            # Check if you got "Rate limit is exceeded.", then you want to get more quota
            print(f"Run failed: {run.last_error}")

        # Get messages from the thread
        messages = project_client.agents.list_messages(thread_id=thread.id)
        print(f"Messages: {messages}")

        # Get the last message from the sender
        last_msg = messages.get_last_text_message_by_sender("assistant")
        if last_msg:
            print(f"Last Message: {last_msg.text.value}")

        # Generate an image file for the bar chart
        for image_content in messages.image_contents:
            print(f"Image File ID: {image_content.image_file.file_id}")
            file_name = f"{image_content.image_file.file_id}_image_file.png"
            project_client.agents.save_file(
                file_id=image_content.image_file.file_id, file_name=file_name)
            print(f"Saved image file to: {Path.cwd() / file_name}")

        # Print the file path(s) from the messages
        for file_path_annotation in messages.file_path_annotations:
            print(f"File Paths:")
            print(f"Type: {file_path_annotation.type}")
            print(f"Text: {file_path_annotation.text}")
            print(f"File ID: {file_path_annotation.file_path.file_id}")
            print(f"Start Index: {file_path_annotation.start_index}")
            print(f"End Index: {file_path_annotation.end_index}")
            project_client.agents.save_file(
                file_id=file_path_annotation.file_path.file_id, file_name=Path(file_path_annotation.text).name)

        # Delete the agent once done
        project_client.agents.delete_agent(agent.id)
        print("Deleted agent")

In [None]:
prompt = "Could you please create a bar chart for the operating profit using the following data and provide the file to me? Company A: $1.2 million, Company B: $2.5 million, Company C: $3.0 million, Company D: $1.8 million"
create_and_run_agent(prompt)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from math import pi

# Example data for a cybersecurity report
categories = ['Vulnerability Score', 'Detection Rate', 'Response Time', 'Threat Intelligence', 'System Uptime']
values = [8, 7, 6, 9, 8]  # Example values (scale of 1-10)

# Number of variables
num_vars = len(categories)

# Compute the angle for each category
angles = [n / float(num_vars) * 2 * pi for n in range(num_vars)]
angles += angles[:1]  # Close the radar chart

# Close the loop for the data as well
values += values[:1]

# Create the figure
plt.figure(figsize=(8, 8))
ax = plt.subplot(111, polar=True)

# Draw one axe per variable and add labels
plt.xticks(angles[:-1], categories, color='grey', size=10)

# Draw y-labels (range of values)
ax.set_rlabel_position(30)
plt.yticks([2, 4, 6, 8], ["2", "4", "6", "8"], color="grey", size=7)
plt.ylim(0, 10)

# Plot data
ax.plot(angles, values, linewidth=2, linestyle='solid', label="Current Metrics")

# Fill area
ax.fill(angles, values, color='blue', alpha=0.4)

# Add a title
plt.title('Cybersecurity Metrics Radar Chart', size=15, color='black', y=1.1)

# Add legend
plt.legend(loc='upper right', bbox_to_anchor=(0.1, 0.1))

# Save the figure locally
plt.savefig('./figures/cybersecurity_radar_chart.png', dpi=300, bbox_inches='tight')  # Save as a PNG file


# Show the radar chart
plt.show()

In [4]:
import markdown
from xhtml2pdf import pisa

# Define your markdown content
markdown_content = """
# Sample Markdown

This is a sample markdown content.
![Intrusion Attempts Forecast](./figures/Intrusion_Attempts_forecast_plot.png)

## List

- Item 1
- Item 2
- Item 3

## Code

```python
print("Hello, World!")
```
"""

html_content = markdown.markdown(markdown_content)
with open("output.html", "w") as file: 
    file.write(html_content)

with open("output.pdf", "wb") as pdf_file:
    pisa_status = pisa.CreatePDF(html_content, dest=pdf_file)

In [None]:
pisa_status.err == 0