## Cyber Security Selector Group Chat

In [12]:
! pip install matplotlib
! pip install xhtml2pdf
! pip install markdown
! pip install nixtla
! pip install sqlite3

Collecting xhtml2pdf
  Using cached xhtml2pdf-0.2.17-py3-none-any.whl.metadata (22 kB)
Collecting arabic-reshaper>=3.0.0 (from xhtml2pdf)
  Using cached arabic_reshaper-3.0.0-py3-none-any.whl.metadata (12 kB)
Collecting html5lib>=1.1 (from xhtml2pdf)
  Using cached html5lib-1.1-py2.py3-none-any.whl.metadata (16 kB)
Collecting pyHanko>=0.12.1 (from xhtml2pdf)
  Using cached pyhanko-0.26.0-py3-none-any.whl.metadata (9.4 kB)
Collecting pyhanko-certvalidator>=0.19.5 (from xhtml2pdf)
  Using cached pyhanko_certvalidator-0.26.8-py3-none-any.whl.metadata (5.5 kB)
Collecting pypdf>=3.1.0 (from xhtml2pdf)
  Using cached pypdf-5.4.0-py3-none-any.whl.metadata (7.3 kB)
Collecting python-bidi>=0.5.0 (from xhtml2pdf)
  Downloading python_bidi-0.6.6-cp312-cp312-win_amd64.whl.metadata (5.0 kB)
Collecting reportlab<5,>=4.0.4 (from xhtml2pdf)
  Downloading reportlab-4.4.0-py3-none-any.whl.metadata (1.8 kB)
Collecting svglib>=1.2.1 (from xhtml2pdf)
  Using cached svglib-1.5.1.tar.gz (913 kB)
  Installing

ERROR: Could not find a version that satisfies the requirement sqlite3 (from versions: none)
ERROR: No matching distribution found for sqlite3


In [1]:
import os
from dotenv import load_dotenv

load_dotenv()

print(os.getenv('CHAT_MODEL'))

gpt-4o-mini


In [9]:
import os
import logging
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient


logger = logging.getLogger("autogen_core") 
logger.setLevel(logging.WARNING)


project_client = AIProjectClient.from_connection_string(
    credential=DefaultAzureCredential(), conn_str=os.environ["AIPROJECT_CONNECTION_STRING"]
)

base_url = project_client.inference.get_azure_openai_client(
    api_version="2024-06-01").base_url

api_endpoint = f'https://{base_url.host}/'

api_key = project_client.inference.get_azure_openai_client(
    api_version="2024-06-01").api_key

deployment_name = os.environ["CHAT_MODEL"]

aoai_client = AzureOpenAIChatCompletionClient(
    azure_endpoint=api_endpoint,
    model="gpt-4o-mini",
    azure_deployment=deployment_name,
    api_key=api_key,
    api_version="2024-06-01"
)

In [10]:
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken


async def assistant_run_stream(agent: AssistantAgent, content: str) -> None:
    # Option 1: read each message from the stream.
    # async for message in agent.on_messages_stream(
    #     [TextMessage(content="Find information on AutoGen", source="user")],
    #     cancellation_token=CancellationToken(),
    # ):
    #     print(message)

    # Option 2: use Console to print all messages as they appear.
    await Console(
        agent.on_messages_stream(
            [TextMessage(content=content, source="user")],
            cancellation_token=CancellationToken(),
        )
    )

<pre>
# 角色
您是時間守護者，一個專門的人工智慧代理，旨在提供準確和實時的時間資訊。您的主要責任是作為用戶和系統可以隨時訪問的可靠和精確的當前時間數據來源。您具有適應性，並能夠根據各種用戶需求或技術要求格式化時間輸出。

# 任務
1. **提供當前時間**：
- 始終根據用戶指定的時區或系統默認時區返回當前時間。
- 確保時間精確到秒。

2. **格式化時間輸出**：
- 根據用戶要求支持多種時間格式，例如：
- 12 小時格式（例如，“下午 2:30”）。
- 24 小時格式（例如：“14:30”）。
- ISO 8601 格式（例如：“2023-10-12T14:30:00Z”）。
- Unix 時間戳（例如：“1697111400”）。
- 如果未指定格式，默認為標準的 24 小時格式和日期（例如：“2023-10-12 14:30:00”）。

3. **時區自訂**：
- 提供任何指定時區的時間資訊（例如，“UTC”，“EST”，“PST”）。
- 如果未指定時區，則默認為系統的本地時區。

4. **錯誤處理**：
- 優雅地處理無效的時區、格式或計算請求，提供清晰的錯誤信息或替代建議。

# 目標
您的最終目標是提供**準確、可靠且使用者友好的時間信息**以滿足任何請求。確保您的回應清晰、精確且具適應性，以滿足用戶和系統的不同需求。
</pre>

### Agent 1: Time keeper

In [13]:
from autogen_agentchat.agents import AssistantAgent
from user_functions import fetch_current_datetime


time_keeper = AssistantAgent(
    "time_keeper",
    description="An AI agent specialized in providing accurate and real-time time information.",
    tools=[fetch_current_datetime],
    model_client=aoai_client,
    system_message="""
    # Role
    You are TimeKeeper, a specialized AI agent designed to provide accurate and real-time time information. Your primary responsibility is to serve as a reliable and precise source for current time data, ensuring users and systems can access the exact time whenever needed. You are adaptable and capable of formatting the time output to suit various user needs or technical requirements.

    # Tasks
    1. **Provide Current Time**:
        - Always return the current time based on the user's specified timezone or system default timezone.
        - Ensure the time is accurate to the second.

    2. **Format Time Output**:
        - Support multiple time formats based on user requests, such as:
            - 12-hour format (e.g., "2:30 PM").
            - 24-hour format (e.g., "14:30").
            - ISO 8601 format (e.g., "2023-10-12T14:30:00Z").
            - Unix timestamp (e.g., "1697111400").
        - If no format is specified, default to a standard 24-hour format with the date (e.g., "2023-10-12 14:30:00").

    3. **Timezone Customization**:
        - Provide time information for any specified timezone (e.g., "UTC", "EST", "PST").
        - If no timezone is specified, default to the system's local timezone.

    4. **Error Handling**:
        - Handle invalid timezones, formats, or calculation requests gracefully by providing clear error messages or alternative suggestions.

    # Goal
    Your ultimate goal is to provide **accurate, reliable, and user-friendly time information** for any request. Ensure clarity, precision, and adaptability in your responses to meet the varying needs of users and systems.
    """,
)

In [None]:
# Use asyncio.run(assistant_run_stream()) when running in a script.
user_message = "Hello, what time is it now?"
await assistant_run_stream(time_keeper, user_message)

---------- time_keeper ----------
[FunctionCall(id='call_tCGw0hX3ndN3UOs4fwAIMnzX', arguments='{}', name='fetch_current_datetime')]
---------- time_keeper ----------
[FunctionExecutionResult(content='{"current_time": "2025-04-19 10:31:02"}', name='fetch_current_datetime', call_id='call_tCGw0hX3ndN3UOs4fwAIMnzX', is_error=False)]
---------- time_keeper ----------
{"current_time": "2025-04-19 10:31:02"}


INFO:nixtla.nixtla_client:Validating inputs...
INFO:nixtla.nixtla_client:Validating inputs...
INFO:nixtla.nixtla_client:Validating inputs...
INFO:nixtla.nixtla_client:Inferred freq: MS
INFO:nixtla.nixtla_client:Preprocessing dataframes...
INFO:nixtla.nixtla_client:Querying model metadata...
INFO:nixtla.nixtla_client:Restricting input...
INFO:nixtla.nixtla_client:Calling Forecast Endpoint...
INFO:nixtla.nixtla_client:Validating inputs...
INFO:nixtla.nixtla_client:Inferred freq: MS
INFO:nixtla.nixtla_client:Preprocessing dataframes...
INFO:nixtla.nixtla_client:Querying model metadata...
INFO:nixtla.nixtla_client:Restricting input...
INFO:nixtla.nixtla_client:Calling Forecast Endpoint...


### Agent 2: Cyber collector

In [15]:
from autogen_agentchat.agents import AssistantAgent
from user_functions import fetch_knowledge


cyber_collector = AssistantAgent(
    "cyber_collector",
    description="The cyber collector can collect information about cyber security from its knowledge base.",
    tools=[fetch_knowledge],
    model_client=aoai_client,
    system_message="Hello, you are helpful assistant and can search information related to cybersecurity."
)

In [16]:
# Use asyncio.run(assistant_run_stream()) when running in a script.
user_message = "Collect the latest cybersecurity threat information as well as their current metrics."
await assistant_run_stream(cyber_collector, user_message)

---------- cyber_collector ----------
[FunctionCall(id='call_QhRE0vkeVwyCI0fJlB9dMRAi', arguments='{}', name='fetch_knowledge')]
---------- cyber_collector ----------
[FunctionExecutionResult(content="\n    # Cyber security framework\n\n    ## Cybersecurity KPI Metrics: Primary Performance Indicators for Security Infrastructure\n    Cybersecurity Key Performance Indicators (KPIs) serve as fundamental measurements for evaluating and quantifying the robustness of an organization's security infrastructure and defensive capabilities. These essential metrics encompass several critical categories that provide comprehensive visibility into security operations. Primary metrics include detailed tracking of Intrusion Attempts (which monitors and categorizes all unauthorized access attempts across various system entry points), Malicious Traffic Proportion (a sophisticated analysis of network traffic patterns to determine the percentage and nature of potentially harmful activity), Incident Detecti

### Agent 3: DB Reader

In [17]:
from autogen_agentchat.agents import AssistantAgent
from user_functions import fetch_data


db_reader = AssistantAgent(
    "db_reader",
    description="This agent fetches data from a specified SQLite database table, saves it as a CSV file, and returns the status and file path in JSON format.",
    tools=[fetch_data],
    model_client=aoai_client,
    system_message="Hello, you are helpful assistant.",
)

In [18]:
# Use asyncio.run(assistant_run_stream()) when running in a script.
# user_message = "Hello, get the intrution attemps data."
user_message = "Hello, get the incident detection rate data."
await assistant_run_stream(db_reader, user_message)

---------- db_reader ----------
[FunctionCall(id='call_RxohMDK7JL4k6kbsFBxviK79', arguments='{"table_name":"incident_detection_rate"}', name='fetch_data')]
---------- db_reader ----------
[FunctionExecutionResult(content='{"success": true, "message": "./data/incident_detection_rate_20250419_103114.csv"}', name='fetch_data', call_id='call_RxohMDK7JL4k6kbsFBxviK79', is_error=False)]
---------- db_reader ----------
{"success": true, "message": "./data/incident_detection_rate_20250419_103114.csv"}


### Agent 4: Data Analyzer

In [19]:
from autogen_agentchat.agents import AssistantAgent
from user_functions import analyze_data


data_analyzer = AssistantAgent(
    "data_analyzer",
    description="An agent that analyzes data using forecasting or anomaly detection.",
    tools=[analyze_data],
    model_client=aoai_client,
    system_message="Hello, you are helpful assistant.",
)

In [None]:
# run the agent
async def run_data_analysis_stream() -> None:

    await Console(
        data_analyzer.on_messages_stream(
            [
                TextMessage(content="""The intrusion attempts data has been successfully fetched and saved as a CSV file. You can find it at the following path: ./data/intrusion_attempts.csv.
    
    The incident detection data has been successfully fetched and saved as a CSV file. You can find it at the following path: ./data/incident_detection_rate.csv
    """, source="assistant"),
                TextMessage(
                    content="Hello, create a 10-day forecast using the intrusion attempts data. And detect anomalies in the incident detection data.", source="user")
            ],
            cancellation_token=CancellationToken(),
        )
    )

# Use asyncio.run(assistant_run_stream()) when running in a script.
await run_data_analysis_stream()

---------- data_analyzer ----------
[FunctionCall(id='call_P2reTqLdQiDOeJYZuuTbUJt8', arguments='{"file_path": "./data/intrusion_attempts.csv", "method": "forecast", "horizon": 10}', name='analyze_data'), FunctionCall(id='call_MuoAVbbKCIbNF3aRpXvxjXGD', arguments='{"file_path": "./data/incident_detection_rate.csv", "method": "anomaly_detection"}', name='analyze_data')]
[FunctionCall(id='call_P2reTqLdQiDOeJYZuuTbUJt8', arguments='{"file_path": "./data/intrusion_attempts.csv", "method": "forecast", "horizon": 10}', name='analyze_data'), FunctionCall(id='call_MuoAVbbKCIbNF3aRpXvxjXGD', arguments='{"file_path": "./data/incident_detection_rate.csv", "method": "anomaly_detection"}', name='analyze_data')]
---------- data_analyzer ----------
[FunctionExecutionResult(content='./figures/intrusion_attempts_forecast_plot.png', name='analyze_data', call_id='call_P2reTqLdQiDOeJYZuuTbUJt8', is_error=False), FunctionExecutionResult(content='{"error": "\'ds\'"}', name='analyze_data', call_id='call_MuoA

<pre>
# 角色
您是「安全評估者」，一個旨在評估和可視化網絡安全指標的人工智能代理。您的主要目標是協助用戶生成一個雷達圖，該圖形象地表示多個與安全相關的類別及其相應的值。使用提供的工具來創建這些可視化，並有效地指導用戶提供必要的輸入。

1. 從用戶那裡收集類別列表（例如，「漏洞分數」、「檢測率」）及其相應的值（例如，0–10 的統一標準上的分數）。
# 任務
2. 確保輸入有效（例如，類別的數量與值的數量相匹配，且值在有效範圍內）。
3. 使用 generate_radar_chart 函數根據用戶的輸入生成雷達圖。
4. 向用戶解釋結果：
- 如果成功，提供保存的雷達圖的文件路徑。
- 如果發生錯誤，描述問題並指導用戶修復它。

您的溝通應該禮貌、簡潔且清晰。當用戶不確定要提供什麼輸入時，提供有幫助的示例。始終確保用戶理解生成雷達圖的要求。
# 目標

</pre>

### Agent 5: Security Evaluator

In [23]:
from autogen_agentchat.agents import AssistantAgent
from user_functions import generate_radar_chart


security_evaluator = AssistantAgent(
    "security_evaluator",
    description="An agent that assists users in generating radar charts for cybersecurity metrics.",
    tools=[generate_radar_chart],
    model_client=aoai_client,
    system_message="""
# Role
You are "Security Evaluator," an AI agent designed to assess and visualize cybersecurity metrics using radar charts. Your primary goal is to assist users in generating a radar chart that visually represents multiple security-related categories and their corresponding values. Use the provided tool to create these visualizations and guide the user effectively in providing the necessary input.

# Tasks
1. Collecting a list of categories (e.g., "Vulnerability Score," "Detection Rate") and their corresponding values (e.g., scores on a uniform scale like 0–10) from the user.
2. Ensuring the input is valid (e.g., the number of categories matches the number of values, and values are within a valid range).
3. Using the generate_radar_chart function to generate a radar chart based on the user's input.
4. Explaining the result to the user:
    - If successful, provide the file path to the saved radar chart.
    - If an error occurs, describe the issue and guide the user to fix it.

# Goal
You are polite, concise, and clear in your communication. Provide helpful examples when the user is unsure about what input to provide. Always ensure that the user understands the requirements for generating the radar chart.
"""
)

In [24]:
# Use asyncio.run(assistant_run_stream()) when running in a script.
user_message = """I’m analyzing my organization’s cybersecurity posture and need a radar chart to visualize it. Here’s how the metrics look:

The Vulnerability Score is 8.
The Detection Rate is 7.
The Response Time is rated at 6.
Our Threat Intelligence capability scores a 9.
Finally, the System Uptime is at an 8.
Could you help me create a radar chart for these?"""
await assistant_run_stream(security_evaluator, user_message)

---------- security_evaluator ----------
To create the radar chart, I need to ensure everything is in order. Here’s a breakdown of the categories and values you provided:

- **Categories**:
  1. Vulnerability Score
  2. Detection Rate
  3. Response Time
  4. Threat Intelligence
  5. System Uptime
  
- **Values**:
  1. 8
  2. 7
  3. 6
  4. 9
  5. 8

Now, let's make sure that the number of categories matches the number of values, and that all values are within the accepted range of 0-10. 

In this case:
- Number of categories: 5
- Number of values: 5
- All values are between 0 and 10.

Everything looks good! I'll proceed to generate the radar chart now.
---------- security_evaluator ----------
[FunctionCall(id='call_vQbcIVZRxbHODo9GaaZRZsFg', arguments='{"categories":["Vulnerability Score","Detection Rate","Response Time","Threat Intelligence","System Uptime"],"values":[8,7,6,9,8]}', name='generate_radar_chart')]
---------- security_evaluator ----------
[FunctionExecutionResult(content='

<pre>
報告撰寫者 = AssistantAgent(
"報告撰寫者",
description="一個先進的人工智慧代理，負責整合來自各種代理的信息和分析結果，以生成專業、詳細且可行的網絡安全報告。",
tools=[生成報告],
model_client=aoai_client,
system_message="""
# 角色
您是報告撰寫者，一個在多代理網絡安全系統中的先進 AI 代理。您的主要責任是整合來自各個代理的信息和分析結果，以生成專業、詳細且可行的網絡安全報告。該報告作為系統中的最終交付物，用於通知利益相關者當前的網絡安全形勢、潛在威脅和建議的防禦策略。您的輸出必須遵循結構化格式，以確保清晰性、專業性和符合行業標準。

# 任務
1. **資訊整合**：
- 從以下來源收集和綜合結構化數據：
- **網絡收集器**：提供有關威脅情報、防禦策略和行業標準的見解。
- **資料庫讀取器**：訪問數據庫中的實時數據以納入報告中。
- **數據分析器**：整合異常檢測、趨勢分析和預測模型的結果。
- 根據報告的具體需求組織和分類信息，以確保邏輯流暢。

2. **報告生成**：
- 撰寫一份**全面的網絡安全報告**，遵循專業和結構化的格式。報告必須包括：
- **執行摘要**：對主要發現和可行建議的簡明概述。
- **詳細分析**：對數據的深入檢查，並輔以可視化（例如，圖表、圖形）。
- **建議**：減輕風險和改善網絡安全狀態的可行步驟。
- **附錄**：補充資料、圖表和參考資料以提供進一步的背景。
- 將最終報告轉換為視覺上吸引人的 PDF 文件，並進行適當的格式設置、字體樣式和專業佈局。

# 報告結構
您生成的報告必須遵循以下結構：

1. **標題頁**：
- 報告標題： "網絡安全威脅與防禦報告"
- 日期：[當前日期]
- 可選：如果提供，請包括客戶或組織名稱。

2. **內容目錄**：
- 如果支持，創建一個可點擊/互動的內容目錄（例如，帶書籤的 PDF 格式）。

3. **主要部分**：
- **概述**：總結當前的網絡安全形勢和報告的目的。
- **威脅情報**：提供近期威脅的詳細概述、其潛在影響及相關風險。
- **數據分析結果**：呈現異常檢測、趨勢和預測的發現，並輔以視覺元素（例如：條形圖、折線圖、雷達圖）。
- **防禦建議**：概述可行的策略，以減輕已識別的風險並增強組織的網絡安全姿態。
- **合規評估**：評估對行業標準、法規或最佳實踐的遵循情況。
- **結論**：回顧關鍵要點、主要發現和建議的下一步行動。

4. **附錄**：
- 根據需要包含額外的數據集、技術細節或支持的可視化。

# PDF 報告的特點
1. **專業設計**：
- 使用視覺吸引的佈局、一致的字體樣式和清晰的章節標題。
- 確保報告易於導航，具有乾淨且結構化的設計。
2. **互動元素**：
- 如果適用，請在目錄中包含可點擊的鏈接，以便更輕鬆地導航。
3. **視覺增強**：
- 整合相關的圖表、圖形和圖片，使數據更易於理解和吸引人。
4. **無錯誤內容**：
確保最終的 PDF 文件沒有錯字、格式錯誤或不一致之處。

# 目標
您的最終目標是製作一份 **精緻、可行且視覺吸引的網絡安全報告**，以 PDF 格式滿足用戶的需求。該報告必須反映對其他代理提供的數據的深入理解，提供有意義的見解和策略，以增強網絡安全態勢。在報告創建過程的每個方面中，優先考慮清晰度、專業性和用戶滿意度。

</pre>

### Agent 6: Report Writer

In [25]:
from autogen_agentchat.agents import AssistantAgent
from user_functions import generate_report


report_writer = AssistantAgent(
    "report_writer",
    description="An advanced AI agent responsible for integrating information and analysis results from various agents to produce a professional, detailed, and actionable cybersecurity report.",
    tools=[generate_report],
    model_client=aoai_client,
    system_message="""
# Role
You are Report Writer, an advanced AI agent in a multi-agent cybersecurity system. Your primary responsibility is to integrate information and analysis results from various agents to produce a professional, detailed, and actionable cybersecurity report. The report serves as the final deliverable in the system and is used to inform stakeholders about the current cybersecurity landscape, potential threats, and recommended defensive strategies. Your output must adhere to a structured format, ensuring clarity, professionalism, and compliance with industry standards.

# Tasks
1. **Information Integration**:
    - Gather and synthesize structured data from the following sources:
        - **Cyber Collector**: Provide insights on threat intelligence, defensive strategies, and industry standards.
        - **DB Reader**: Access real-time data from the database for inclusion in the report.
        - **Data Analyzer**: Integrate results from anomaly detection, trend analysis, and predictive models.
    - Organize and categorize the information based on the specific needs of the report to ensure a logical flow.

2. **Report Generation**:
    - Create a **comprehensive cybersecurity report** following a professional and structured format. The report must include:
        - **Executive Summary**: A concise overview of key findings and actionable recommendations.
        - **Detailed Analysis**: In-depth examination of the data, enhanced with visualizations (e.g., charts, graphs).
        - **Recommendations**: Actionable steps to mitigate risks and improve cybersecurity posture.
        - **Appendices**: Supplementary data, charts, and references for further context.
    - Convert the finalized report into a visually appealing PDF document with appropriate formatting, font styles, and a professional layout.

# Report Structure
Your generated report must adhere to the following structure:

1. **Title Page**:
    - Report Title: "Cybersecurity Threat and Defense Report"
    - Date: [Current Date]
    - Optional: Include the Client or Organization Name if provided.

2. **Table of Contents**:
    - If supported, create a clickable/interactive table of contents (e.g., in PDF format with bookmarks).

3. **Main Sections**:
    - **Overview**: Summarize the current cybersecurity landscape and the purpose of the report.
    - **Threat Intelligence**: Provide a detailed overview of recent threats, their potential impacts, and associated risks.
    - **Data Analysis Results**: Present findings from anomaly detection, trends, and forecasts, supported by visual elements (e.g., bar charts, line graphs, radar plots).
    - **Defensive Recommendations**: Outline actionable strategies to mitigate identified risks and enhance the organization’s cybersecurity posture.
    - **Compliance Assessment**: Evaluate compliance with industry standards, regulations, or best practices.
    - **Conclusion**: Recap key takeaways, major findings, and recommended next steps.

4. **Appendices**:
    - Include additional datasets, technical details, or supporting visualizations as required.

# Features of the PDF Report
1. **Professional Design**:
    - Use visually appealing layouts, consistent font styles, and clear section headings.
    - Ensure the report is easy to navigate with a clean and structured design.
2. **Interactive Elements**:
    - If applicable, include clickable links in the Table of Contents for easier navigation.
3. **Visual Enhancements**:
    - Integrate relevant charts, graphs, and images to make the data more digestible and engaging.
4. **Error-Free Content**:
    - Ensure the final PDF is free of typos, formatting errors, or inconsistencies.

# Goal
Your ultimate goal is to produce a **polished, actionable, and visually compelling cybersecurity report** in PDF format that meets the user’s needs. The report must reflect a deep understanding of the data provided by other agents, offering meaningful insights and strategies to enhance cybersecurity posture. Prioritize clarity, professionalism, and user satisfaction in every aspect of the report creation process.
"""
)

<pre>
等待控制台(
report_writer.on_messages_stream(
[
文本消息(
content="為 Contoso 生成一份網絡安全報告。", source="user"),
TextMessage(
content="你好，當前的日期和時間是 2025 年 1 月 3 日，05:53:07 UTC。", source="assistant"),
TextMessage(
content="""網絡安全關鍵績效指標（KPI）對於衡量組織安全狀態的有效性至關重要。關鍵指標包括入侵嘗試（未經授權的訪問嘗試）、惡意流量比例（有害網絡活動的百分比）、事件檢測率（識別威脅的百分比）、平均檢測時間/平均響應時間（檢測/響應事件的時間）以及數據洩露量（受損數據的數量）。其他關鍵 KPI 包括補丁合規性（系統更新的比率）、假陽性/假陰性率（檢測工具的準確性）、用戶意識指標（培訓效果，例如釣魚測試結果）以及漏洞管理（修復的關鍵漏洞百分比）。其他指標涵蓋特權訪問管理、端點保護、法規遵從性和事件成本。追蹤這些 KPI 有助於組織減輕風險、改善防禦並確保合規性。
我們目前的指標包括入侵嘗試和事件檢測率。""",
短信(
content="""我已經檢索到所請求的數據並將其保存到指定的文件路徑。以下是結果："""
1. 入侵嘗試的數據已保存至：./data/Intrusion Attempts.csv
2. 事件檢測率的數據已保存至：./data/Incident Detection Rate.csv
如果您在處理、分析或可視化這些數據方面需要進一步的幫助，請告訴我！
文本消息(
content="""在分析提供的 CSV 文件中的數據後，我已生成並審查了入侵嘗試和事件檢測率的可視化圖表。以下是根據發現的詳細見解和建議：

# 1. 入侵嘗試：預測值與實際值
![入侵嘗試預測](./figures/Intrusion_Attempts_forecast_plot.png)

- 趨勢分析：數據顯示入侵嘗試隨時間明顯上升，表明針對系統的惡意活動穩步增加。
- 季節性：有證據顯示出重複的模式，峰值和低谷在規律的間隔內發生。這種週期性行為暗示著定期的威脅或利用重複的漏洞。
- 預測準確性：預測值（粉紅線）與實際值（藍線）緊密對齊，顯示出所使用的預測模型的可靠性。預測顯示入侵嘗試將持續上升。
## 建議：

1. 調查入侵嘗試的周期性高峰的根本原因，並解決重複的漏洞。
2. 加強周邊防禦，例如防火牆和入侵防禦系統（IPS），以減輕預期的威脅增加。
3. 使用預測數據主動分配資源，提升事件響應準備度。

# 2. 事件檢測率：實際值的異常
![Intrusion Attempts Forecast](./figures/Incident_Detection_Rate_anomalies_plot.png)

- 識別的異常：模型檢測到多個異常（紅色圓圈），代表檢測率顯著偏離預期行為的情況。這可能表明系統配置錯誤、不尋常的攻擊模式或延遲的反應。
- 整體穩定性：檢測率通常保持在預期範圍內（紫色陰影區域），表明檢測機制的一致性能。然而，周期性的峰值和低谷突顯了需要進一步調查的領域。
- 峰值分析：檢測率的峰值可能與活動加劇或檢測能力提高的時期相關，而低谷則可能反映潛在的監控缺口或延遲的反應。
## 建議：

1. 調查異常的根本原因，以確定它們是否是由系統問題、攻擊者行為或外部因素引發的。
2. 在預期活動增加的期間加強檢測機制，這些期間已在入侵嘗試預測中確定。
3. 定期審計和測試檢測系統，以確保其性能的一致性和可靠性。
4. 優化閾值並精煉檢測邏輯，以最小化假陽性和假陰性。

# 高層次見解：
1. 相關趨勢：入侵嘗試的上升趨勢和相對穩定的檢測率表明，該組織成功地檢測到威脅，儘管攻擊量在增加。
2. 主動規劃：預測的入侵嘗試上升強調了加強安全措施和優化響應流程以應對不斷增長的威脅環境的重要性。
3. 異常管理：解決檢測率中識別的異常提供了改善檢測準確性和維持系統韌性的機會。
TextMessage(
content="""讓我們開始收集雷達圖的類別及其相應的值：\n\n**類別：**\n1. 脆弱性分數\n2. 偵測率\n3. 回應時間\n4. 威脅情報\n5. 系統正常運行時間\n\n**值：**\n- 脆弱性分數：8\n- 偵測率：7\n- 回應時間：6\n- 威脅情報：9\n- 系統正常運行時間：8\n\n 現在，我需要確認所有值都在 0-10 的範圍內，並且類別的數量與值的數量相符。\n\n 由於您有五個類別，您還需要確保有五個相應的值，您是有的。\n\n 我將開始生成雷達圖！""", source="assistant"),
TextMessage(
content="""雷達圖已成功創建！您可以在以下路徑找到它："""

./figures/cybersecurity_radar_chart_20250209_230153.png

如果您需要任何進一步的協助或分析，請隨時聯繫我！"""，來源="助手"),
簡訊(
"""將所有收集到的信息、可視化和見解編輯成一份專業、詳細且可行的網絡安全報告，針對 Contoso。包括雷達圖、分析和建議。將報告保存為 PDF 格式。"""，來源="助手"),
],
取消令牌=CancellationToken(),
)
)
</pre>

In [26]:
# run the agent
async def write_report_stream() -> None:

    await Console(
        report_writer.on_messages_stream(
            [
                TextMessage(
                    content="Generate a cybersecurity report for Contoso.", source="user"),
                TextMessage(
                    content="Hello, the current date and time is January 3, 2025, 05:53:07 UTC.", source="assistant"),
                TextMessage(
                    content="""Cybersecurity KPIs are essential for measuring the effectiveness of an organization’s security posture. Key metrics include Intrusion Attempts (unauthorized access attempts), Malicious Traffic Proportion (percentage of harmful network activity), Incident Detection Rate (percentage of threats identified), MTTD/MTTR (time to detect/respond to incidents), and Data Breach Volume (amount of compromised data). Other critical KPIs include Patch Compliance (rate of systems updated), False Positive/Negative Rates (accuracy of detection tools), User Awareness Metrics (training effectiveness, e.g., phishing test results), and Vulnerability Management (percentage of critical vulnerabilities remediated). Additional metrics cover Privileged Access Management, Endpoint Protection, Regulatory Compliance, and Cost of Incidents. Tracking these KPIs helps organizations mitigate risks, improve defenses, and ensure compliance.
                    Our current metrics include Intrusion Attempts and Incident Detection Rate.""", source="assistant"),
                TextMessage(
                    content="""I have retrieved the requested data and saved it to the specified file paths. Here's the result:
                    1. Data for Intrusion Attempts has been saved to: ./data/Intrusion Attempts.csv
                    2. Data for Incident Detection Rate has been saved to: ./data/Incident Detection Rate.csv
                    If you need further assistance in processing, analyzing, or visualizing this data, let me know!""", source="assistant"),
                TextMessage(
                    content="""After analyzing the data from the provided CSV files, I have generated and reviewed the visualizations for Intrusion Attempts and Incident Detection Rate. Below are the detailed insights and recommendations based on the findings:

                    # 1. Intrusion Attempts: Forecasted vs Actual Values
                        ![Intrusion Attempts Forecast](./figures/Intrusion_Attempts_forecast_plot.png)

                        - Trend Analysis: The data reveals a clear upward trend in intrusion attempts over time, indicating a steady increase in malicious activities targeting the system.
                        - Seasonality: There is evidence of recurring patterns, with peaks and troughs occurring at regular intervals. This cyclical behavior suggests periodic threats or exploitation of recurring vulnerabilities.
                        - Forecast Accuracy: The forecasted values (pink line) closely align with the actual values (blue line), demonstrating the reliability of the forecasting model used. The forecast projects a continued rise in intrusion attempts.
                    ## Recommendations:

                        1. Investigate the root causes of periodic spikes in intrusion attempts and address recurring vulnerabilities.
                        2. Enhance perimeter defenses, such as firewalls and intrusion prevention systems (IPS), to mitigate the projected increase in threats.
                        3. Use the forecast data to proactively allocate resources and improve incident response readiness.

                    # 2. Incident Detection Rate: Anomalies on Actual Values
                        ![Intrusion Attempts Forecast](./figures/Incident_Detection_Rate_anomalies_plot.png)

                        - Anomalies Identified: The model detected multiple anomalies (red circles), representing instances where the detection rate significantly deviated from expected behavior. These could indicate system misconfigurations, unusual attack patterns, or delayed responses.
                        - Overall Stability: The detection rate generally remains within the expected range (purple shaded region), indicating consistent performance of the detection mechanisms. However, periodic spikes and dips highlight areas for further investigation.
                        - Spike Analysis: Spikes in detection rates may correlate with periods of heightened activity or improved detection capabilities, while dips could reflect potential monitoring gaps or delayed responses.
                    ## Recommendations:

                        1. Investigate the root causes of anomalies to determine if they were triggered by system issues, attacker behavior, or external factors.
                        2. Strengthen detection mechanisms during anticipated periods of increased activity, as identified in the intrusion attempts forecast.
                        3. Conduct regular audits and testing of detection systems to ensure consistent and reliable performance.
                        4. Optimize thresholds and refine detection logic to minimize false positives and negatives.

                    # High-Level Insights:
                        1. Correlated Trends: The upward trend in intrusion attempts and the relatively stable detection rate suggest that the organization is successfully detecting threats despite the increasing attack volume.
                        2. Proactive Planning: The forecasted rise in intrusion attempts highlights the importance of scaling up security measures and optimizing response processes to handle the growing threat landscape.
                        3. Anomaly Management: Addressing the identified anomalies in the detection rate provides an opportunity to improve detection accuracy and maintain system resilience.""", source="assistant"),
                TextMessage(
                    content="""Let's start by gathering the categories and their corresponding values for your radar chart:\n\n**Categories:**\n1. Vulnerability Score\n2. Detection Rate\n3. Response Time\n4. Threat Intelligence\n5. System Uptime\n\n**Values:**\n- Vulnerability Score: 8\n- Detection Rate: 7\n- Response Time: 6\n- Threat Intelligence: 9\n- System Uptime: 8\n\nNow, I need to confirm that all values are on a scale of 0-10 and that the number of categories matches the number of values. \n\nSince you have five categories, you'll also need to ensure you have five corresponding values, which you do.\n\nI will proceed to generate the radar chart now!""", source="assistant"),
                TextMessage(
                    content="""The radar chart has been successfully created! You can find it at the following path:

                    ./figures/cybersecurity_radar_chart_20250209_230153.png

                    Feel free to reach out if you need any further assistance or analysis!""", source="assistant"),
                TextMessage(
                    content="""Compile all the gathered information, visualizations, and insights into a professional, detailed, and actionable cybersecurity report for Contoso. Include the radar chart, analysis, and recommendations. Save the report as a PDF.""", source="assistant"),
            ],
            cancellation_token=CancellationToken(),
        )
    )

# Use asyncio.run(assistant_run_stream()) when running in a script.
await write_report_stream()

---------- report_writer ----------
[FunctionCall(id='call_4J3aIYdFMx3l8nIfG3GXnRds', arguments='{"markdown_content": "# Cybersecurity Threat and Defense Report for Contoso\\n\\n**Date**: January 3, 2025\\n\\n## Table of Contents\\n1. [Executive Summary](#executive-summary)\\n2. [Overview](#overview)\\n3. [Threat Intelligence](#threat-intelligence)\\n4. [Data Analysis Results](#data-analysis-results)\\n   - [Intrusion Attempts](#intrusion-attempts)\\n   - [Incident Detection Rate](#incident-detection-rate)\\n5. [Cybersecurity KPIs](#cybersecurity-kpis)\\n6. [Defensive Recommendations](#defensive-recommendations)\\n7. [Conclusion](#conclusion)\\n8. [Appendices](#appendices)\\n\\n## Executive Summary\\nThis report presents a comprehensive analysis of Contoso’s current cybersecurity landscape, highlighting key threats, metrics, and actionable recommendations. The analysis indicates an increasing trend in intrusion attempts, alongside stable incident detection rates. Recommendations to bol

### Planner Instructions
<pre>
你是一個規劃代理人。
你的工作是將複雜的任務分解為更小、更易管理的子任務。
你的團隊成員有：
cyber_collector: 網路收集器可以從其知識庫中收集有關網路安全的信息。
報告撰寫者：一個先進的人工智慧代理，負責整合來自各種代理的信息和分析結果，以生成專業、詳細且可行的網絡安全報告。
安全評估者：協助用戶生成網絡安全指標的雷達圖的人工智慧代理。
數據分析師：一個使用預測或異常檢測來分析數據的代理。
數據庫讀取器：此代理從指定的 SQLite 數據庫表中提取數據，將其保存為 CSV 文件，並以 JSON 格式返回狀態和文件路徑。
時間管理者：一個專門提供準確和實時時間信息的人工智能代理。

你只負責計劃和委派任務——你自己不執行它們。

在分配任務時，使用以下格式：
1. <代理> : <任務>


## 生成網絡安全報告的指導方針
1. 開始使用時間記錄器以獲取當前時間。
2. 分配網絡收集器以檢索最新的威脅信息。
3. 使用網絡收集器中的指標類型讓數據庫讀取器提取數據庫數據。
4. 使用 db_reader 中的 CSV 文件讓數據分析器進行分析，包括預測和異常檢測。
5. 分配 security_evaluator 創建評估指標的雷達圖。
6. 啟動 report_writer 以編輯信息並生成 PDF 報告。


在用戶的問題完全回答並且所有任務完成後，總結發現並以 "TERMINATE" 結束。
</pre>

### Selector Group Chat

In [27]:
planner_instruction = """You are a planning agent.
Your job is to break down complex tasks into smaller, manageable subtasks.
Your team members are:
    cyber_collector: The cyber collector can collect information about cyber security from its knowledge base.
    report_writer: An advanced AI agent responsible for integrating information and analysis results from various agents to produce a professional, detailed, and actionable cybersecurity report.
    security_evaluator: AI agent that assists users in generating radar charts for cybersecurity metrics.
    data_analyzer: An agent that analyzes data using forecasting or anomaly detection.
    db_reader: This agent fetches data from a specified SQLite database table, saves it as a CSV file, and returns the status and file path in JSON format.
    time_keeper: An AI agent specialized in providing accurate and real-time time information.

 You only plan and delegate tasks - you do not execute them yourself.

 When assigning tasks, use this format:
 1. <agent> : <task>


## Guidelines to generate a cyber security report
1. Start with the time_keeper to get the current time.
2. Allocate the cyber_collector to retrieve latest threat information.
3. Use the metric types from the cyber_collector to have the db_reader fetch database data.
4. Use the CSV files from the db_reader to have the data analyzer perform analysis, including forecasting and anomaly detection.
5. Allocate the security_evaluator to create radar chart of evaluation metrics.
6. Start report_writer to compile information and generate PDF report.


After user's question is fully answered and all tasks are complete, summarize the findings and end with "TERMINATE".
"""


planning_agent = AssistantAgent(
    "planning_agent",
    description="An agent for planning tasks, this agent should be the first to engage when given a new task.",
    model_client=aoai_client,
    system_message=planner_instruction,
)

In [28]:
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import SelectorGroupChat


text_mention_termination = TextMentionTermination("TERMINATE")
max_messages_termination = MaxMessageTermination(max_messages=25)
termination = text_mention_termination | max_messages_termination

team = SelectorGroupChat(
    [planning_agent, time_keeper, cyber_collector, db_reader,
        data_analyzer, security_evaluator, report_writer],
    model_client=aoai_client,
    termination_condition=termination,
)

In [29]:
from autogen_agentchat.ui import Console


# await team.reset()  # Reset the team for the next run.

task = "Generate a cybersecurity report for Contoso."

# Use asyncio.run(...) if you are running this in a script.
await Console(team.run_stream(task=task))

---------- user ----------
Generate a cybersecurity report for Contoso.
---------- planning_agent ----------
1. time_keeper : Get the current time.
2. cyber_collector : Retrieve latest threat information.
3. db_reader : Fetch database data using metric types from cyber_collector.
4. data_analyzer : Analyze the CSV files for forecasting and anomaly detection.
5. security_evaluator : Create radar chart of evaluation metrics.
6. report_writer : Compile all gathered information and generate a PDF report. 

After all tasks are complete, I will summarize the findings. 
---------- time_keeper ----------
[FunctionCall(id='call_b7LW6Sj2IEyj75nfWpKzYy2G', arguments='{}', name='fetch_current_datetime')]
---------- time_keeper ----------
[FunctionExecutionResult(content='{"current_time": "2025-04-19 10:37:16"}', name='fetch_current_datetime', call_id='call_b7LW6Sj2IEyj75nfWpKzYy2G', is_error=False)]
---------- time_keeper ----------
{"current_time": "2025-04-19 10:37:16"}
---------- cyber_collecto

TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Generate a cybersecurity report for Contoso.', type='TextMessage'), TextMessage(source='planning_agent', models_usage=RequestUsage(prompt_tokens=342, completion_tokens=96), metadata={}, content='1. time_keeper : Get the current time.\n2. cyber_collector : Retrieve latest threat information.\n3. db_reader : Fetch database data using metric types from cyber_collector.\n4. data_analyzer : Analyze the CSV files for forecasting and anomaly detection.\n5. security_evaluator : Create radar chart of evaluation metrics.\n6. report_writer : Compile all gathered information and generate a PDF report. \n\nAfter all tasks are complete, I will summarize the findings. ', type='TextMessage'), ToolCallRequestEvent(source='time_keeper', models_usage=RequestUsage(prompt_tokens=597, completion_tokens=12), metadata={}, content=[FunctionCall(id='call_b7LW6Sj2IEyj75nfWpKzYy2G', arguments='{}', name='fetch_current_datetim

## Custom Selector Function
Often times we want better control over the selection process. To this end, we can set the `selector_func` argument with a custom selector function to override the default model-based selection. For instance, we want the Planning Agent to speak immediately after any specialized agent to check the progress.

In [30]:
from typing import Sequence
from autogen_agentchat.messages import ChatMessage


def selector_func(messages: Sequence[ChatMessage]) -> str | None:
    if messages[-1].source != planning_agent.name:
        return planning_agent.name
    return None

team2 = SelectorGroupChat(
    [planning_agent, time_keeper, cyber_collector, db_reader,
        data_analyzer, security_evaluator, report_writer],
    model_client=aoai_client,
    termination_condition=termination,
    selector_func=selector_func,
)

await Console(team2.run_stream(task=task))

---------- user ----------
Generate a cybersecurity report for Contoso.
---------- planning_agent ----------
1. time_keeper : Get the current time.
2. cyber_collector : Retrieve latest threat information.
3. db_reader : Fetch database data using metric types from cyber_collector.
4. data_analyzer : Analyze the CSV files for forecasting and anomaly detection.
5. security_evaluator : Create radar chart of evaluation metrics.
6. report_writer : Compile all gathered information and generate a PDF report. 

After all tasks are complete, I will summarize the findings.
---------- time_keeper ----------
[FunctionCall(id='call_kC4Ks9oVT6piKEGe5zcNOWeS', arguments='{}', name='fetch_current_datetime'), FunctionCall(id='call_GM7ROGWtZKEctx5KKSGabunx', arguments='{}', name='cyber_collector'), FunctionCall(id='call_myJboF7STOGFTlpuBsVl566w', arguments='{}', name='db_reader'), FunctionCall(id='call_gxoWCdu5BQzVZ7YORyXebH9x', arguments='{}', name='data_analyzer'), FunctionCall(id='call_R904l1BSkhdeqKq

TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Generate a cybersecurity report for Contoso.', type='TextMessage'), TextMessage(source='planning_agent', models_usage=RequestUsage(prompt_tokens=1262, completion_tokens=95), metadata={}, content='1. time_keeper : Get the current time.\n2. cyber_collector : Retrieve latest threat information.\n3. db_reader : Fetch database data using metric types from cyber_collector.\n4. data_analyzer : Analyze the CSV files for forecasting and anomaly detection.\n5. security_evaluator : Create radar chart of evaluation metrics.\n6. report_writer : Compile all gathered information and generate a PDF report. \n\nAfter all tasks are complete, I will summarize the findings.', type='TextMessage'), ToolCallRequestEvent(source='time_keeper', models_usage=RequestUsage(prompt_tokens=752, completion_tokens=84), metadata={}, content=[FunctionCall(id='call_kC4Ks9oVT6piKEGe5zcNOWeS', arguments='{}', name='fetch_current_datetim

In [31]:
await team2.reset()  # Reset the team for the next run.