In [1]:
import random  
import json  
from typing import Dict, List, Any, Annotated
from autogen import ConversableAgent, GroupChat, GroupChatManager, LLMConfig  
  
# LLM配置
# llm_config = LLMConfig(api_type="ollama", model="llama3.1")
llm_config = LLMConfig(
    api_type="deepseek",
    model="deepseek-chat",
    api_key="sk-511f3246640241bd850af659c73283bd",
    base_url="https://api.deepseek.com/v1"
)

In [2]:
# 模拟的故障数据，故障根因是数据库连接池耗尽导致的
SIMULATED_FAULT_DATA = {  
    "logs": """  
2024-06-03 10:15:23 ERROR [UserService] Database connection timeout after 30s  
2024-06-03 10:15:24 WARN [OrderService] Retry attempt 3/3 failed for order #12345  
2024-06-03 10:15:25 ERROR [PaymentService] Payment gateway returned 500 Internal Server Error  
2024-06-03 10:15:26 ERROR [UserService] Connection pool exhausted, max connections: 100  
2024-06-03 10:15:27 FATAL [OrderService] Unable to process order queue, backing off for 60s  
2024-06-03 10:15:28 ERROR [NotificationService] Failed to send email notification: SMTP timeout  
2024-06-03 10:15:30 ERROR [UserService] Database connection timeout after 30s  
2024-06-03 10:15:32 ERROR [PaymentService] Payment processing failed for transaction #67890  
""",  
      
    "traces": {  
        "trace_id": "abc123-def456-ghi789",  
        "spans": [  
            {  
                "service": "api-gateway",  
                "operation": "POST /api/orders",  
                "duration_ms": 5000,  
                "status": "ERROR",  
                "error": "Downstream service timeout"  
            },  
            {  
                "service": "order-service",   
                "operation": "create_order",  
                "duration_ms": 4800,  
                "status": "ERROR",  
                "error": "Database connection failed"  
            },  
            {  
                "service": "user-service",  
                "operation": "validate_user",   
                "duration_ms": 30000,  
                "status": "TIMEOUT",  
                "error": "Connection timeout"  
            },  
            {  
                "service": "payment-service",  
                "operation": "process_payment",  
                "duration_ms": 2000,  
                "status": "ERROR",   
                "error": "Gateway unavailable"  
            }  
        ]  
    },  
      
    "metrics": {  
        "cpu_usage": [85, 92, 88, 95, 89, 91, 87],  # 过去7分钟的CPU使用率  
        "memory_usage": [78, 82, 85, 88, 90, 92, 89],  # 内存使用率  
        "response_time_ms": [1200, 1800, 2500, 3200, 4100, 5000, 4800],  # 响应时间  
        "error_rate": [0.02, 0.05, 0.08, 0.12, 0.15, 0.18, 0.16],  # 错误率  
        "database_connections": [95, 98, 100, 100, 100, 100, 98],  # 数据库连接数  
        "queue_depth": [50, 120, 200, 350, 500, 480, 420]  # 消息队列深度  
    }  
}  

# 全局上下文用于维护分析过程  
analysis_context = {  
    "log_analysis": None,  
    "trace_analysis": None,   
    "metrics_analysis": None,  
    "review_results": {},  
    "final_report": None  
}  
  
  
# 工具函数实现  

# 日志数据处理函数
def process_log_data(
    log_data: Annotated[str, "系统日志数据，包含时间戳、服务名称和错误信息"]
) -> Dict[str, Any]:
    """
    提取日志数据中的统计信息。

    功能：
    - 统计不同类型的日志数量（ERROR、FATAL、WARN）。
    - 提取所有包含关键字的日志条目。

    参数：
    - log_data: 包含系统日志的字符串。

    返回：
    - 一个字典，包含日志统计信息和关键日志条目。
    """
    error_count = log_data.count("ERROR")
    fatal_count = log_data.count("FATAL")
    warn_count = log_data.count("WARN")
    timeout_logs = [line for line in log_data.splitlines() if "timeout" in line.lower()]

    return {
        "error_count": error_count,
        "fatal_count": fatal_count,
        "warn_count": warn_count,
        "timeout_logs": timeout_logs
    }

# 调用链数据处理函数
def process_trace_data(
    trace_data: Annotated[Dict[str, Any], "分布式追踪数据，包含服务调用链和性能信息"]
) -> Dict[str, Any]:
    """
    提取调用链数据中的统计信息。

    功能：
    - 计算调用链的总耗时。
    - 提取所有失败的服务及其错误信息。

    参数：
    - trace_data: 包含调用链信息的字典。

    返回：
    - 一个字典，包含调用链统计信息和失败服务详情。
    """
    total_duration = sum(span["duration_ms"] for span in trace_data["spans"])
    failed_services = [
        {"service": span["service"], "error": span["error"]}
        for span in trace_data["spans"] if span["status"] == "ERROR"
    ]

    return {
        "total_duration_ms": total_duration,
        "failed_services": failed_services
    }

# 指标数据处理函数
def process_metrics_data(
    metrics_data: Annotated[Dict[str, Any], "系统监控指标数据，包括CPU使用率、响应时间等"]
) -> Dict[str, Any]:
    """
    提取系统监控指标中的统计信息。

    功能：
    - 计算平均CPU使用率和内存使用率。
    - 提取响应时间的峰值和当前错误率。

    参数：
    - metrics_data: 包含系统监控指标的字典。

    返回：
    - 一个字典，包含指标统计信息。
    """
    avg_cpu = sum(metrics_data["cpu_usage"]) / len(metrics_data["cpu_usage"])
    avg_memory = sum(metrics_data["memory_usage"]) / len(metrics_data["memory_usage"])
    max_response_time = max(metrics_data["response_time_ms"])
    current_error_rate = metrics_data["error_rate"][-1]

    return {
        "avg_cpu_usage": avg_cpu,
        "avg_memory_usage": avg_memory,
        "peak_response_time_ms": max_response_time,
        "current_error_rate": current_error_rate
    }

In [3]:
# 测试日志数据处理函数
def test_process_log_data():
    log_data = SIMULATED_FAULT_DATA["logs"]
    result = process_log_data(log_data)
    print("Log Data Analysis Result:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

test_process_log_data()

Log Data Analysis Result:
{
  "error_count": 6,
  "fatal_count": 1,
  "warn_count": 1,
  "timeout_logs": [
    "2024-06-03 10:15:23 ERROR [UserService] Database connection timeout after 30s  ",
    "2024-06-03 10:15:28 ERROR [NotificationService] Failed to send email notification: SMTP timeout  ",
    "2024-06-03 10:15:30 ERROR [UserService] Database connection timeout after 30s  "
  ]
}


In [4]:
# 测试调用链数据处理函数
def test_process_trace_data():
    trace_data = SIMULATED_FAULT_DATA["traces"]
    result = process_trace_data(trace_data)
    print("Trace Data Analysis Result:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

test_process_trace_data()

Trace Data Analysis Result:
{
  "total_duration_ms": 41800,
  "failed_services": [
    {
      "service": "api-gateway",
      "error": "Downstream service timeout"
    },
    {
      "service": "order-service",
      "error": "Database connection failed"
    },
    {
      "service": "payment-service",
      "error": "Gateway unavailable"
    }
  ]
}


In [5]:
# 测试指标数据处理函数
def test_process_metrics_data():
    metrics_data = SIMULATED_FAULT_DATA["metrics"]
    result = process_metrics_data(metrics_data)
    print("Metrics Data Analysis Result:")
    print(json.dumps(result, indent=2, ensure_ascii=False))

test_process_metrics_data()

Metrics Data Analysis Result:
{
  "avg_cpu_usage": 89.57142857142857,
  "avg_memory_usage": 86.28571428571429,
  "peak_response_time_ms": 5000,
  "current_error_rate": 0.16
}


In [6]:
# 创建智能体，将前面的工具函数集成到智能体中 
def create_agents():  
    """创建所有智能体"""  
    with llm_config:  
        # 日志分析智能体  
        log_agent = ConversableAgent(  
            name="log_analyzer",  
            system_message="""你是日志分析专家，负责分析系统日志数据，识别异常模式和错误信息。  
            请分析提供的日志数据，识别关键错误模式，评估严重程度，并提供分析结论。  
            完成分析后输出'LOG_ANALYSIS_COMPLETE'。""",  
            description="专门分析系统日志数据的专家",
            functions=[process_log_data] 
        )  
          
        # 调用链分析智能体  
        trace_agent = ConversableAgent(  
            name="trace_analyzer",   
            system_message="""你是调用链分析专家，负责分析调用链数据，识别性能瓶颈和异常调用。  
            请分析提供的调用链数据，识别性能瓶颈，分析服务间依赖关系，并提供优化建议。  
            完成分析后输出'TRACE_ANALYSIS_COMPLETE'。""",  
            description="专门分析调用链数据的专家",
            functions=[process_trace_data]
        )  
          
        # 指标分析智能体  
        metrics_agent = ConversableAgent(  
            name="metrics_analyzer",  
            system_message="""你是系统指标专家，负责分析监控指标，识别异常趋势和阈值违规。  
            请分析提供的系统指标数据，识别异常趋势，评估系统健康状况，并提供告警建议。  
            完成分析后输出'METRICS_ANALYSIS_COMPLETE'。""",  
            description="专门分析系统指标的专家",
            functions=[process_metrics_data]
        )  
          
        # 进度管理智能体  
        progress_agent = ConversableAgent(  
            name="progress_manager",  
            system_message="""你负责协调整个故障检测分析流程，确保各个阶段按序进行，并组织评审投票。  
            你需要：1) 启动各个分析智能体的工作 2) 协调评审流程 3) 确保流程顺利推进""",  
            description="协调整个故障检测流程的管理者"  
        )  
          
        # 报告生成智能体
        report_agent = ConversableAgent(  
            name="report_generator",  
            system_message="""你是报告生成专家，负责根据各个分析智能体的结果生成最终分析报告。
            请整合以下内容：1) 用户问题摘要 2) 分析步骤总览（日志、调用链、指标） 3) 各智能体的分析过程摘要 4) 最终共识根因与解释。""",  
            description="生成最终分析报告的专家"  
        )  
      
    return log_agent, trace_agent, metrics_agent, progress_agent, report_agent  
  
def create_review_groupchat(analyzer_agent: ConversableAgent, all_agents: List[ConversableAgent]):  
    """从所有智能体中随机选择3个组成评审团"""  
      
    # 排除当前分析的智能体，从剩余智能体中随机选择3个  
    available_agents = [agent for agent in all_agents if agent != analyzer_agent]  
    selected_reviewers = random.sample(available_agents, min(3, len(available_agents)))  
      
    # 为评审创建临时智能体副本  
    reviewers = []  
    for agent in selected_reviewers:  
        reviewer = ConversableAgent(  
            name=f"{agent.name}_reviewer",  
            system_message=f"""你现在作为评审专家，需要审查分析结果。
            请仔细评估分析过程和结论的正确性、完整性和合理性。  
              
            评审标准：  
            1) 分析逻辑是否清晰合理  
            2) 结论是否有充分依据  
            3) 是否遗漏重要信息  
            4) 建议是否可行  
              
            如果认为分析正确且完整，回复'APPROVE'；  
            如果认为需要重新分析或补充，回复'REJECT'并详细说明原因。  
            最后必须明确表态：APPROVE 或 REJECT""",  
            llm_config=llm_config  
        )  
        reviewers.append(reviewer)  
      
    return reviewers  
  
def conduct_review_voting(analyzer_agent: ConversableAgent, all_agents: List[ConversableAgent], analysis_result: Dict[str, Any]) -> bool:  
    """执行评审投票流程"""  
    print(f"\n=== 开始评审 {analyzer_agent.name} 的分析结果 ===")  
      
    # 创建评审团  
    reviewers = create_review_groupchat(analyzer_agent, all_agents, analysis_result)  
      
    # 创建评审群聊  
    review_groupchat = GroupChat(  
        agents=reviewers,  
        messages=[],  
        max_round=len(reviewers) + 2,  
        speaker_selection_method="round_robin"  
    )  
      
    review_manager = GroupChatManager(  
        name="review_manager",  
        groupchat=review_groupchat,   
        llm_config=llm_config  
    )  
      
    # 启动评审投票  
    review_prompt = f"""  
    请评审{analyzer_agent.name}的分析结果：  
      
    分析结果：  
    {json.dumps(analysis_result, indent=2, ensure_ascii=False)}  
      
    每位评审者请基于评审标准进行评估，并明确投票：APPROVE 或 REJECT  
    如果REJECT，请详细说明需要改进的地方。  
    """  
      
    # 开始评审群聊  
    if reviewers:  
        reviewers[0].initiate_chat(  
            recipient=review_manager,  
            message=review_prompt  
        )  
          
        # 统计投票结果  
        approve_count = 0  
        reject_count = 0  
          
        for message in review_groupchat.messages:  
            content = message.get("content", "").upper()  
            if "APPROVE" in content:  
                approve_count += 1  
            elif "REJECT" in content:  
                reject_count += 1  
          
        print(f"投票结果: APPROVE={approve_count}, REJECT={reject_count}")  
        return approve_count >= 2  # 2票及以上通过  
      
    return False

In [None]:
def run_fault_detection_system():  
    """运行完整的故障检测系统"""  
    print("=== 启动软件故障检测系统 ===")  
      
    # 创建所有智能体  
    log_agent, trace_agent, metrics_agent, progress_agent, report_agent = create_agents()  
    analysis_agents = [log_agent, trace_agent, metrics_agent]  
    
    print("\n=== 第一阶段：统一分析群聊 ===")  
    # 创建统一分析群聊  
    analysis_chat = GroupChat(  
        agents=analysis_agents,  
        messages=[],  
        max_round=10,  
        speaker_selection_method="round_robin"  
    )  
    analysis_manager = GroupChatManager(  
        groupchat=analysis_chat,  
        llm_config=llm_config,  
    )  
    
    # 启动分析群聊  
    analysis_result = log_agent.initiate_chat(  
        recipient=analysis_manager,  
        message="请分析以下数据：",  
        max_turns=10  
    )  
    analysis_context["analysis_result"] = analysis_result.summary  
    
    print("\n=== 第二阶段：嵌套评审群聊 ===")  
    nested_chats = [  
        {  
            "recipient": progress_agent,  
            "message": lambda recipient, messages, sender, config: f"请评审以下分析结果：{messages[-1]['content']}",  
            "max_turns": 2,  
            "summary_method": "last_msg",  
        },  
        {  
            "recipient": report_agent,  
            "message": "请根据评审通过的分析结果生成最终故障报告。",  
            "max_turns": 1,  
            "summary_method": "last_msg",  
        }  
    ]  
    progress_agent.register_nested_chats(  
        chat_queue=nested_chats,  
        trigger=lambda sender: sender in [log_agent, trace_agent, metrics_agent],  
    )  
    
    # 启动嵌套评审群聊  
    final_result = progress_agent.initiate_chat(  
        recipient=report_agent,  
        message="请生成最终故障报告。",  
        max_turns=5  
    )  
    
    print("最终故障报告：", final_result.summary)

In [8]:
# 主程序入口  
if __name__ == "__main__":  
    try:  
        run_fault_detection_system()  
    except Exception as e:  
        print(f"系统运行出错: {e}")  
        import traceback  
        traceback.print_exc()

=== 启动软件故障检测系统 ===

=== 第一阶段：统一分析群聊 ===

=== 第一阶段：统一分析群聊 ===
[33mlog_analyzer[0m (to chat_manager):

请分析以下数据：

--------------------------------------------------------------------------------
[33mlog_analyzer[0m (to chat_manager):

请分析以下数据：

--------------------------------------------------------------------------------


KeyboardInterrupt: 