# 【ERNIE】【MCP】【A2A】基于MCP、A2A协议与多Agent协同的合同风险分析助手
**（多格式文档解析、多Agent智能审查、风险分析协同）**

## 项目背景
在合同审查过程中，传统人工审查面临效率低、专业性要求高、风险点容易遗漏等挑战。本项目基于MCP（Message Control Protocol）和A2A（Agent-to-Agent）协议，构建了一个支持多格式文档解析、多Agent协同审查的智能合同风险分析系统，通过法律、商业、格式等多维度分析，为用户提供全面的合同风险评估服务。

## 项目立意
**多格式文档处理**：整合PDF、Word、TXT等多种格式的文档解析能力，实现合同文本的智能提取  
**多Agent智能协同**：构建法律、商业、格式等专业Agent的协同机制，实现多维度风险分析  
**智能风险评估**：通过大模型驱动的风险评分系统，提供量化的风险评估结果  

## 系统架构
```
+----------------------------------------------------------------------------------------+
|                                     用户层                                               |
|  +--------------------------------------------------------------------------------+   |
|  |                              合同审查请求                                        |   |
+--+--------------------------------------------------------------------------------+---+
                                       ↓
+----------------------------------------------------------------------------------------+
|                                   协调器层                                              |
|  +--------------------------------------------------------------------------------+   |
|  |                         ContractCoordinator (端口7000)                          |   |
|  |                                                                                |   |
|  |    - 接收用户请求                                                              |   |
|  |    - 任务分发调度                                                              |   |
|  |    - 结果整合输出                                                              |   |
+--+--------------------------------------------------------------------------------+---+
                    ↓                          ↓                         ↓
+----------------------------------------------------------------------------------------+
|                                    MCP服务层                                            |
|  +--------------------------------------------------------------------------------+   |
|  |                           MCPService (端口7001)                                 |   |
|  |                                                                                |   |
|  |    工具集：                                                                    |   |
|  |    ┌─────────────────┐              ┌────────────────┐                        |   |
|  |    │  parse_contract │              │highlight_contract│                        |   |
|  |    └─────────────────┘              └────────────────┘                        |   |
|  |                                                                                |   |
|  |    文档支持：                       高亮支持：                                  |   |
|  |    - PDF文件                        - PDF高亮                                  |   |
|  |    - Word文档                       - Word高亮                                 |   |
|  |    - TXT文本                        - 风险等级颜色                             |   |
+--+--------------------------------------------------------------------------------+---+
                    ↓                          ↓                         ↓
+----------------------------------------------------------------------------------------+
|                                  A2A协议层                                              |
|  +--------------------------------------------------------------------------------+   |
|  |                              BaseAgent                                          |   |
|  |                                                                                |   |
|  |    - A2A通信协议实现                                                           |   |
|  |    - 统一任务处理接口                                                          |   |
|  |    - 大模型调用封装                                                            |   |
|  |    - 错误处理机制                                                              |   |
+--+--------------------------------------------------------------------------------+---+
                    ↓                          ↓                         ↓
+----------------------------------------------------------------------------------------+
|                                  专业Agent层                                            |
|  +----------------+        +----------------+        +----------------+                  |
|  | 文档处理Agent  |        |   法律Agent    |        |  商业Agent     |                  |
|  |   (端口7005)   |        |   (端口7002)   |        |   (端口7003)   |                  |
|  +----------------+        +----------------+        +----------------+                  |
|                                                                                        |
|  +----------------+        +----------------+        +----------------+                  |
|  |  格式Agent     |        |   高亮Agent    |        |  整合Agent     |                  |
|  |   (端口7004)   |        |   (端口7006)   |        |   (端口7007)   |                  |
|  +----------------+        +----------------+        +----------------+                  |
|                                                                                        |
|    功能特点：                                                                          |
|    - 专业分工                                                                          |
|    - 并行处理                                                                          |
|    - 结果互补                                                                          |
+----------------------------------------------------------------------------------------+
                    ↓                          ↓                         ↓
+----------------------------------------------------------------------------------------+
|                                  输出层                                                 |
|  +--------------------------------------------------------------------------------+   |
|  |     风险评分     |      分析报告      |     高亮文档     |      修改建议         |   |
+--+--------------------------------------------------------------------------------+---+
```
### 数据流动：

1. **文档处理流**：
   用户请求 → 协调器 → MCP文档解析 → 文档处理Agent → 合同文本

2. **分析处理流**：
   合同文本 → 专业Agent并行分析 → 整合Agent → 风险评分和报告

3. **高亮处理流**：
   分析结果 → MCP高亮工具 → 高亮Agent → 高亮文档

4. **结果整合流**：
   所有分析 → 整合Agent → 最终报告 → 用户展示


## 核心功能模块

### 1. MCP服务层（MCPService）
```python
class FastMCP:
    """MCP服务实现，提供合同处理工具集"""
    def __init__(self, name, description, version):
        self.tools = {}  # 工具注册表
        
    @tool(name="parse_contract")
    def parse_contract(self, file_path: str):
        """解析合同文件获取文本内容"""
        file_type = self._detect_file_type(file_path)
        processors = {
            'pdf': self._parse_pdf,
            'docx': self._parse_docx,
            'txt': self._parse_txt
        }
        return processors[file_type]()
```

**文档处理能力**：
- PDF解析：使用`pdfminer`和`fitz`实现文本提取
- Word处理：支持新旧版本Word文档（docx/doc）
- 文本预处理：实现多级文本清洗和格式化
```python
def _preprocess_text(text: str) -> str:
    # 文本预处理流程
    text = re.sub(r'[\x00-\x09\x0b-\x1f\x7f-\x9f]', '', text)  # 清理特殊字符
    text = re.sub(r'[ \t]+', ' ', text)  # 规范化空白字符
    text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)  # 处理多余空行
```

### 2. A2A协议实现（BaseAgent）
```python
class BaseAgent(A2AServer):
    """基础Agent类，实现A2A协议"""
    def __init__(self, agent_card: AgentCard, mcp_url: str):
        super().__init__(agent_card=agent_card)
        self.mcp_url = mcp_url
        self.llm_client = OpenAI(...)  # 初始化大模型客户端
        
    def _call_llm(self, system_prompt: str, user_content: str) -> Dict:
        """调用大模型进行分析"""
        chat_completion = self.llm_client.chat.completions.create(
            messages=[
                {'role': 'system', 'content': system_prompt},
                {'role': 'user', 'content': user_content}
            ],
            model="ernie-4.0-8k",
            response_format={"type": "json_object"}
        )
```

**核心功能**：
- A2A通信：实现Agent间的标准化通信协议
- 任务处理：统一的任务处理接口和错误处理
- 大模型调用：封装大模型API调用和结果处理

### 3. 专业Agent实现

#### 3.1 协调器Agent（ContractCoordinator）
```python
class ContractCoordinator(BaseAgent):
    """合同审查协调器"""
    def _process_content(self, content):
        # 1. 调用文档处理Agent
        processor_response = self._call_agent("processor", content)
        
        # 2. 并行调用专业Agent
        results = await self._distribute_tasks(
            contract_text=processor_response["content"],
            party_type=content.get("party_type", "甲方")
        )
        
        # 3. 生成分析报告
        final_report = self._generate_report(results)
```

#### 3.2 法律审查Agent（LegalAgent）
```python
class LegalAgent(BaseAgent):
    """法律风险审查专家"""
    def _get_system_prompt(self, party_type):
        return f"""
        你是一位专业的法律顾问，从{party_type}的视角审查合同中的法律风险：
        1. 识别潜在的法律风险点
        2. 评估风险等级（高/中/低）
        3. 提供法律依据
        4. 给出修改建议
        """
```

#### 3.3 商业审查Agent（BusinessAgent）
```python
class BusinessAgent(BaseAgent):
    """商业风险审查专家"""
    def _process_content(self, content):
        # 分析商业条款
        result = self._call_llm(
            self._get_system_prompt(content["party_type"]),
            content["text"]
        )
```

#### 3.4 格式审查Agent（FormatAgent）
```python
class FormatAgent(BaseAgent):
    """格式规范审查专家"""
    system_prompt = """
    检查合同格式规范性：
    1. 检查结构完整性
    2. 检查标题规范性
    3. 检查编号连续性
    4. 检查格式一致性
    """
```

## 技术创新

### 1. 智能JSON解析
- 多层解析策略：实现多级容错的JSON解析机制
```python
def extract_json(text):
    # 清理和预处理
    text = re.sub(r'//.*?\n|/\*.*?\*/', '', text, flags=re.DOTALL)
    text = re.sub(r'(\w+)(?=:)', r'"\1"', text)
    
    # 多模式匹配
    patterns = [
        r'\{[\s\S]*?"[\w\u4e00-\u9fa5]+"\s*:\s*\[([\s\S]*?)\]\s*\}',
        r'\[\s*\{[\s\S]*?\}\s*\]',
        r'\{(?:[^{}]|(?R))*\}'
    ]
```

### 2. 风险评分系统
- 多维度评分：考虑风险等级、类型权重和对己方影响
```python
def _calculate_risk_score(self, issues: list, party_type: str) -> float:
    weights = {
        "高": 1.0,
        "中": 0.6,
        "低": 0.3
    }
    type_weights = {
        "法律风险": 1.0,
        "商业风险": 0.8,
        "格式问题": 0.5
    }
```

## **部署架构**
- 协调器：端口7000
- MCP服务：端口7001
- 专业Agent：端口7002-7007
- 基于FastAPI的HTTP服务

## **应用场景**
1. 合同起草阶段的风险预警
2. 合同谈判阶段的条款分析
3. 合同签订前的最终审查
4. 合同库的批量风险评估

## **项目特色**
1. 基于MCP的统一工具调用机制
2. 基于A2A的标准化Agent通信
3. 多Agent协同的专业分工机制
4. 智能的风险评分和建议系统
5. 支持甲乙双方视角的分析

## **项目运行**
### **安装依赖**

In [1]:
!pip install "python-a2a[all]" python-docx pdfminer.six paddleocr python-dotenv

[0mLooking in indexes: http://mirrors.baidubce.com/pypi/simple/, https://pypi.tuna.tsinghua.edu.cn/simple/, https://pypi.mirrors.ustc.edu.cn/simple/, https://mirrors.aliyun.com/pypi/simple/, https://pypi.org/simple/
[0m

### **获取key**
![](https://ai-studio-static-online.cdn.bcebos.com/86114670b12b4c2b9d687e31ee45efbaf025e8252af342b4bc3204628be0b909)
![](https://ai-studio-static-online.cdn.bcebos.com/6b166f6425b14513b9b31dca4c8cc89c039de89dcc9a45be8ff9be7c353c35b9)

### **配置key**
![](https://ai-studio-static-online.cdn.bcebos.com/0950562fcaaf4df8a485cacfad3ceb76f2789aeaa47c4d7b89fe05db10a54bec)

### **启动MCP服务器**

In [2]:
# mcp_service.py
from python_a2a.mcp import FastMCP, text_response, create_fastapi_app
import uvicorn
from docx import Document
from pdfminer.high_level import extract_text
import fitz
import os
import re
from tempfile import NamedTemporaryFile
from paddleocr import PaddleOCR
import logging
from typing import List, Dict
import platform
import base64

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("DocProcessor")

# 初始化MCP服务
contract_mcp = FastMCP(
    name="Contract Processing Tools",
    description="合同审查专用工具集",
    version="1.0.0"
)

# 初始化OCR
try:
    ocr = PaddleOCR(
        use_angle_cls=True,  # 使用方向分类器
        lang="ch",           # 中文模型
        enable_mkldnn=True   # 启用MKL-DNN加速
    )
    logger.info("PaddleOCR初始化成功")
except Exception as e:
    logger.error(f"PaddleOCR初始化失败: {e}")
    ocr = None

def _detect_file_type(file_path: str) -> str:
    """检测文件类型"""
    return os.path.splitext(file_path)[1].lower()[1:]

def _preprocess_text(text: str) -> str:
    """文本预处理"""
    # 1. 基础清洗：移除各种特殊字符和控制字符，但保留换行符
    text = re.sub(r'[\x00-\x09\x0b-\x1f\x7f-\x9f]', '', text)
    
    # 2. 空白字符处理：将连续的空格替换为单个空格，但保留换行符
    text = re.sub(r'[ \t]+', ' ', text)
    text = text.replace('　', ' ')  # 全角空格替换
    
    # 3. 处理多余空行，但保留段落结构
    text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)  # 将多个空行减少为两个空行
    
    return text.strip()

@contract_mcp.tool(
    name="parse_contract",
    description="解析合同文件获取文本内容"
)
def parse_contract(file_path: str):
    """解析合同文件（支持PDF/DOCX/TXT/DOC）"""
    file_type = _detect_file_type(file_path)
    logger.info(f"开始处理文件: {file_path}，类型: {file_type}")
    
    try:
        if file_type == "pdf":
            text = _parse_pdf(file_path)
        elif file_type == "docx":
            text = _parse_docx(file_path)
        elif file_type == "doc":
            text = _parse_doc(file_path)
        elif file_type == "txt":
            text = _parse_txt(file_path)
        else:
            return {"error": f"不支持的文件格式: {file_type}"}
            
        if isinstance(text, dict) and "error" in text:
            return text
            
        return {"content": text}
    except Exception as e:
        logger.error(f"解析失败：{str(e)}")
        return {"error": f"解析失败：{str(e)}"}

def _parse_pdf(file_path: str):
    """处理PDF文件，支持OCR"""
    try:
        # 首先尝试直接提取文本
        logger.info("尝试直接提取PDF文本...")
        text = extract_text(file_path)
        
        # 如果文本为空，使用OCR处理
        if not text or text.isspace():
            logger.info("直接提取文本失败，尝试使用OCR处理...")
            
            # 检查OCR是否可用
            if ocr is None:
                logger.error("OCR服务不可用")
                return text_response("OCR服务不可用，无法处理扫描版PDF文件")
            
            # 使用PaddleOCR处理每一页
            doc = fitz.open(file_path)
            text_parts = []
            
            for page_num in range(doc.page_count):
                page = doc[page_num]
                # 将PDF页面转换为图片
                pix = page.get_pixmap()
                img_path = f"temp_page_{page_num}.png"
                pix.save(img_path)
                
                try:
                    # 使用OCR识别文本
                    result = ocr.ocr(img_path, cls=True)
                    if result and result[0]:
                        page_text = '\n'.join([line[1][0] for line in result[0]])
                        text_parts.append(page_text)
                except Exception as e:
                    logger.error(f"OCR处理第{page_num+1}页时失败: {e}")
                finally:
                    # 确保临时图片被删除
                    if os.path.exists(img_path):
                        os.remove(img_path)
            
            doc.close()
            text = '\n\n'.join(text_parts)
            
            if not text or text.isspace():
                logger.warning(f"OCR处理后文本仍为空: {file_path}")
                return text_response("OCR处理后文本仍为空，请检查PDF文件是否包含可识别的文本")
                
            logger.info(f"OCR处理成功，提取文本长度: {len(text)} 字符")
        else:
            logger.info(f"成功直接提取PDF文本，长度: {len(text)} 字符")
        
        # 文本预处理
        processed_text = _preprocess_text(text)
        logger.info(f"文本预处理后长度: {len(processed_text)} 字符")
        
        return text_response(processed_text)
        
    except Exception as e:
        logger.error(f"处理PDF文件时发生错误: {file_path}")
        logger.error(f"错误信息: {str(e)}")
        return text_response(f"处理PDF失败：{str(e)}")

def _parse_docx(file_path: str):
    """处理Word文档"""
    try:
        logger.info(f"处理Word文档: {file_path}")
        doc = Document(file_path)
        text = '\n'.join([para.text for para in doc.paragraphs])
        
        # 文本预处理
        processed_text = _preprocess_text(text)
        logger.info(f"Word文档处理完成，文本长度: {len(processed_text)} 字符")
        
        return text_response(processed_text)
    except Exception as e:
        logger.error(f"处理Word文档失败: {file_path}")
        logger.error(f"错误信息: {str(e)}")
        return text_response(f"处理Word文档失败：{str(e)}")

def _parse_doc(file_path: str):
    """处理旧版Word文档（.doc）"""
    try:
        logger.info(f"处理旧版Word文档: {file_path}")
        
        # 检查操作系统
        if platform.system() != "Windows":
            return text_response("处理.doc文件需要Windows系统环境")
            
        # 使用pywin32调用Word应用程序
        import win32com.client
        word = win32com.client.Dispatch('Word.Application')
        word.Visible = False  # 不显示Word界面
        doc = word.Documents.Open(file_path)
        text = doc.Content.Text
        doc.Close()
        word.Quit()
        
        # 文本预处理
        processed_text = _preprocess_text(text)
        logger.info(f"旧版Word文档处理完成，文本长度: {len(processed_text)} 字符")
        
        return text_response(processed_text)
    except Exception as e:
        logger.error(f"处理旧版Word文档失败: {file_path}")
        logger.error(f"错误信息: {str(e)}")
        return text_response(f"处理旧版Word文档失败：{str(e)}")

def _parse_txt(file_path: str):
    """处理文本文件"""
    try:
        logger.info(f"处理文本文件: {file_path}")
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
        except UnicodeDecodeError:
            # 尝试其他编码
            encodings = ['gbk', 'gb2312', 'latin-1']
            for encoding in encodings:
                try:
                    with open(file_path, 'r', encoding=encoding) as f:
                        text = f.read()
                    logger.info(f"使用 {encoding} 编码成功处理文件")
                    break
                except UnicodeDecodeError:
                    continue
            else:
                logger.error(f"无法解码文件 {file_path}")
                return text_response(f"无法解码文件: {file_path}")
        
        # 文本预处理
        processed_text = _preprocess_text(text)
        logger.info(f"文本文件处理完成，文本长度: {len(processed_text)} 字符")
        
        return text_response(processed_text)
    except Exception as e:
        logger.error(f"处理文本文件失败: {file_path}")
        logger.error(f"错误信息: {str(e)}")
        return text_response(f"处理文本文件失败：{str(e)}")

@contract_mcp.tool(
    name="highlight_contract",
    description="生成带高亮的合同文件"
)
def highlight_contract(original_path: str, issues: List[Dict]):
    """生成高亮PDF/Word文件"""
    file_type = _detect_file_type(original_path)
    logger.info(f"开始生成高亮文件: {original_path}，类型: {file_type}")
    
    try:
        if file_type == "pdf":
            result = _highlight_pdf(original_path, issues)
        elif file_type == "docx":
            result = _highlight_docx(original_path, issues)
        else:
            return {"error": f"不支持的文件格式: {file_type}，仅支持PDF和DOCX"}
            
        if isinstance(result, dict) and "error" in result:
            return result
            
        return result
    except Exception as e:
        logger.error(f"高亮生成失败：{str(e)}")
        return {"error": f"高亮生成失败：{str(e)}"}

def _highlight_pdf(original_path: str, issues: List[Dict]):
    """生成高亮PDF文件"""
    try:
        doc = fitz.open(original_path)
        output_path = NamedTemporaryFile(delete=False, suffix=".pdf").name
        
        # 遍历每个问题，在PDF中高亮显示
        for issue in issues:
            if not issue or not isinstance(issue, dict):
                continue
                
            text_to_highlight = issue.get("条款", "")
            if not text_to_highlight:
                continue
                
            for page_num in range(len(doc)):
                page = doc[page_num]
                text_instances = page.search_for(text_to_highlight)
                
                # 为每个找到的文本实例添加高亮
                for inst in text_instances:
                    highlight = page.add_highlight_annot(inst)
                    # 根据风险等级设置不同颜色
                    risk_level = issue.get("风险等级", "低")
                    if risk_level == "高":
                        highlight.set_colors(stroke=[1, 0, 0])  # 红色
                    elif risk_level == "中":
                        highlight.set_colors(stroke=[1, 0.5, 0])  # 橙色
                    else:
                        highlight.set_colors(stroke=[1, 1, 0])  # 黄色
                    highlight.update()
        
        # 保存修改后的PDF
        doc.save(output_path)
        doc.close()
        
        logger.info(f"PDF高亮完成，生成文件: {output_path}")
        
        # 读取文件并转换为base64
        with open(output_path, 'rb') as f:
            file_content = f.read()
        file_base64 = base64.b64encode(file_content).decode('utf-8')
        
        return {
            "status": "success",
            "file_type": "pdf",
            "file_name": "highlighted_contract.pdf",
            "content": file_base64
        }
    except Exception as e:
        logger.error(f"PDF高亮失败：{str(e)}")
        return text_response(f"PDF高亮失败：{str(e)}")

def _highlight_docx(original_path: str, issues: List[Dict]):
    """生成高亮Word文件"""
    try:
        doc = Document(original_path)
        output_path = NamedTemporaryFile(delete=False, suffix=".docx").name
        
        # 遍历每个问题，在Word中高亮显示
        for issue in issues:
            if not issue or not isinstance(issue, dict):
                continue
                
            text_to_highlight = issue.get("条款", "")
            if not text_to_highlight:
                continue
                
            # 遍历所有段落
            for para in doc.paragraphs:
                # 如果段落包含问题文本
                if text_to_highlight in para.text:
                    # 处理每个运行（run）
                    for run in para.runs:
                        if text_to_highlight in run.text:
                            # 根据风险等级设置不同颜色
                            risk_level = issue.get("风险等级", "低")
                            if risk_level == "高":
                                run.font.highlight_color = 6  # 红色
                            elif risk_level == "中":
                                run.font.highlight_color = 4  # 橙色
                            else:
                                run.font.highlight_color = 7  # 黄色
        
        # 保存修改后的Word文档
        doc.save(output_path)
        
        logger.info(f"Word高亮完成，生成文件: {output_path}")
        
        # 读取文件并转换为base64
        with open(output_path, 'rb') as f:
            file_content = f.read()
        file_base64 = base64.b64encode(file_content).decode('utf-8')
        
        return {
            "status": "success",
            "file_type": "docx",
            "file_name": "highlighted_contract.docx",
            "content": file_base64
        }
    except Exception as e:
        logger.error(f"Word高亮失败：{str(e)}")
        return text_response(f"Word高亮失败：{str(e)}")

if __name__ == "__main__":
    # 创建FastAPI应用
    app = create_fastapi_app(contract_mcp)
    
    # 运行服务
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7001,
        log_level="info",
    )

  from .autonotebook import tqdm as notebook_tqdm
INFO:matplotlib.font_manager:Failed to extract font properties from /usr/share/fonts/truetype/unifont/unifont_sample.ttf: In FT2Font: Could not set the fontsize (error code 0x17)
INFO:matplotlib.font_manager:Failed to extract font properties from /usr/share/fonts/truetype/noto/NotoColorEmoji.ttf: In FT2Font: Can not load face.  Unknown file format.
INFO:matplotlib.font_manager:generated new fontManager
INFO:faiss.loader:Loading faiss with AVX512 support.
INFO:faiss.loader:Successfully loaded faiss with AVX512 support.
INFO:python_a2a.mcp.fastmcp:Initialized FastMCP server: Contract Processing Tools v1.0.0
  ocr = PaddleOCR(
[32mCreating model: ('PP-LCNet_x1_0_doc_ori', None)[0m
[32mUsing official model (PP-LCNet_x1_0_doc_ori), the model files will be automatically downloaded and saved in `/home/aistudio/.paddlex/official_models/PP-LCNet_x1_0_doc_ori`.[0m
Processing 5 items:   0%|          | 0.00/5.00 [00:00<?, ?it/s]
Down

RuntimeError: asyncio.run() cannot be called from a running event loop

# **`mcp_service.py`**

### MCP服务初始化

- **服务实例**：使用`FastMCP`创建合同处理工具服务，运行在`端口7001`
- **服务元数据**：
  - `name`：`"Contract Processing Tools"`
  - `description`：`"合同审查专用工具集"`
  - `version`：`"1.0.0"`

### 核心工具定义

#### 1. 合同解析工具（`parse_contract`）
- **工具名称**：`"parse_contract"`
- **功能描述**：解析多种格式的合同文档
- **支持格式**：
  - `PDF`：使用`pdfminer`和`fitz`库
  - `DOCX`：使用`python-docx`库
  - `DOC`：使用`pywin32`库
  - `TXT`：支持多编码（`utf-8`、`gbk`、`gb2312`）

#### 2. 合同高亮工具（`highlight_contract`）
- **工具名称**：`"highlight_contract"`
- **功能描述**：生成带有风险标注的合同文档
- **高亮特性**：
  - `风险等级`：高风险（红色）、中风险（橙色）、低风险（黄色）
  - `文件格式`：支持`PDF`和`DOCX`格式
  - `返回格式`：`base64`编码的文件内容

### 辅助功能实现

#### 文件类型检测（`_detect_file_type`）
- **输入**：`file_path`（文件路径）
- **输出**：文件扩展名（小写）
- **用途**：自动识别文件类型并选择对应处理器

#### 文本预处理（`_preprocess_text`）
- **处理步骤**：
  - 清理特殊字符：使用`re.sub(r'[\x00-\x09\x0b-\x1f\x7f-\x9f]', '')`
  - 规范化空白：使用`re.sub(r'[ \t]+', ' ')`
  - 处理空行：使用`re.sub(r'\n\s*\n\s*\n+', '\n\n')`

### 文档处理实现

#### PDF处理（`_parse_pdf`）
- **主要功能**：
  - 文本提取：使用`extract_text`
  - OCR支持：使用`PaddleOCR`
  - 页面处理：使用`fitz.open`

#### Word处理（`_parse_docx`和`_parse_doc`）
- **DOCX处理**：
  - 使用`Document(file_path)`
  - 提取段落：`[para.text for para in doc.paragraphs]`
- **DOC处理**：
  - Windows环境：使用`win32com.client`
  - 跨平台支持：返回适当的错误信息

### 系统配置

#### 日志配置
- **级别**：`logging.INFO`
- **格式**：`%(asctime)s - %(name)s - %(levelname)s - %(message)s`
- **记录器**：`logger = logging.getLogger("DocProcessor")`

#### OCR配置
- **引擎**：`PaddleOCR`
- **参数**：
  - `use_angle_cls=True`：启用方向分类
  - `lang="ch"`：中文支持
  - `enable_mkldnn=True`：性能优化

### 服务运行

- **FastAPI应用**：使用`create_fastapi_app(contract_mcp)`创建
- **服务配置**：
  - `host`：`"0.0.0.0"`
  - `port`：`7001`
  - `log_level`：`"info"`

### 错误处理
- **异常捕获**：使用`try-except`包装所有关键操作
- **错误返回**：统一使用`{"error": error_message}`格式
- **日志记录**：使用`logger.error`记录详细错误信息


In [3]:
!python mcp_service.py

INFO:faiss.loader:Loading faiss with AVX512 support.
INFO:faiss.loader:Successfully loaded faiss with AVX512 support.
INFO:python_a2a.mcp.fastmcp:Initialized FastMCP server: Contract Processing Tools v1.0.0
  ocr = PaddleOCR(
[32mCreating model: ('PP-LCNet_x1_0_doc_ori', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/aistudio/.paddlex/official_models/PP-LCNet_x1_0_doc_ori`.[0m
[32mCreating model: ('UVDoc', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/aistudio/.paddlex/official_models/UVDoc`.[0m
[32mCreating model: ('PP-LCNet_x1_0_textline_ori', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/aistudio/.paddlex/official_models/PP-LCNet_x1_0_textline_ori`.[0m
[32mCreating model: ('PP-OCRv5_server_det', None)[0m
[32mModel files already exist. Using

### **启动A2A（多Agent）**

In [None]:
from python_a2a import A2AServer, run_server, TaskStatus, TaskState, AgentCard, AgentSkill
import requests
import json
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from typing import List, Dict, Any
import os
from openai import OpenAI
import aiohttp
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 服务配置
MCP_SERVER_URL = "http://localhost:7001"
COORDINATOR_PORT = 7000
LEGAL_PORT = 7002
BUSINESS_PORT = 7003
FORMAT_PORT = 7004
PROCESSOR_PORT = 7005
HIGHLIGHTER_PORT = 7006
INTEGRATION_PORT = 7007

# 初始化AI Studio客户端
llm_client = OpenAI(
    api_key=os.environ.get("AI_STUDIO_API_KEY"),
    base_url="https://aistudio.baidu.com/llm/lmapi/v3",
)

class BaseAgent(A2AServer):
    """基础Agent类，提供通用功能"""
    def __init__(self, agent_card: AgentCard, mcp_url: str = MCP_SERVER_URL):
        super().__init__(agent_card=agent_card)
        self.mcp_url = mcp_url
        
        # 初始化AI Studio客户端
        self.llm_client = llm_client
        
        self.app = FastAPI()
        self._setup_routes()
        logger.info(f"{self.__class__.__name__} 初始化完成")

    def _setup_routes(self):
        """设置路由"""
        @self.app.post("/task")
        async def handle_task(request: Request):
            try:
                data = await request.json()
                task_result = self.handle_task(data)
                return JSONResponse(content=task_result)
            except Exception as e:
                logger.error(f"处理请求失败: {str(e)}")
                return JSONResponse(
                    content={"error": str(e)},
                    status_code=500
                )

        @self.app.get("/health")
        async def health_check():
            return {"status": "healthy"}

    def _call_mcp_tool(self, tool_name: str, params: dict) -> dict:
        """调用MCP工具"""
        if not self.mcp_url:
            return {"error": "MCP服务地址未配置"}
        
        tool_endpoint = f"{self.mcp_url}/tools/{tool_name}"
        try:
            logger.info(f"调用MCP工具: {tool_endpoint}, 参数: {params}")
            response = requests.post(tool_endpoint, json=params, timeout=30)
            response.raise_for_status()
            
            return response.json()
        except Exception as e:
            error_msg = f"调用MCP工具失败: {str(e)}"
            logger.error(error_msg)
            return {"error": error_msg}

    def handle_task(self, data: dict) -> dict:
        """处理任务请求"""
        try:
            if "message" not in data:
                return {
                    "error": "无效的请求格式",
                    "artifacts": [{"parts": [{"type": "text", "text": "请求中缺少message字段"}]}],
                    "status": {"state": TaskState.FAILED}
                }

            content = data["message"].get("content", {})
            if not content:
                return {
                    "error": "请求中缺少content字段",
                    "artifacts": [{"parts": [{"type": "text", "text": "请求中缺少content字段"}]}],
                    "status": {"state": TaskState.FAILED}
                }

            result = self._process_content(content)
            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}]}],
                "status": {"state": TaskState.COMPLETED}
            }

        except Exception as e:
            logger.error(f"处理任务失败: {str(e)}")
            return {
                "error": str(e),
                "artifacts": [{"parts": [{"type": "text", "text": f"处理失败: {str(e)}"}]}],
                "status": {"state": TaskState.FAILED}
            }

    def _process_content(self, content: dict) -> Any:
        """子类需要实现此方法来处理具体内容"""
        raise NotImplementedError

    def _call_llm(self, system_prompt: str, user_content: str) -> Dict:
        """调用大模型"""
        try:
            # 使用ernie-4.0-8k模型
            chat_completion = self.llm_client.chat.completions.create(
                messages=[
                    {'role': 'system', 'content': system_prompt},
                    {'role': 'user', 'content': user_content}
                ],
                model="ernie-4.0-8k",
                temperature=0.7,
                response_format={"type": "json_object"}
            )
            
            content = chat_completion.choices[0].message.content.strip()
            
            def extract_json(text):
                """智能JSON提取与解析"""
                import re
                
                # 清理和预处理文本
                text = re.sub(r'//.*?\n|/\*.*?\*/', '', text, flags=re.DOTALL)  # 移除注释
                text = re.sub(r'(\w+)(?=:)', r'"\1"', text)  # 修复键的引号
                
                # 尝试直接解析
                try:
                    result = json.loads(text)
                    # 如果是字典且包含数组值，返回第一个数组
                    if isinstance(result, dict):
                        for v in result.values():
                            if isinstance(v, list):
                                return v
                        return [result]
                    return result if isinstance(result, list) else [result]
                except json.JSONDecodeError:
                    pass
                
                # 使用正则提取JSON结构
                patterns = [
                    r'\{[\s\S]*?"[\w\u4e00-\u9fa5]+"\s*:\s*\[([\s\S]*?)\]\s*\}',  # 带键的数组
                    r'\[\s*\{[\s\S]*?\}\s*\]',  # 标准数组
                    r'\{(?:[^{}]|(?R))*\}'  # 递归对象
                ]
                
                for pattern in patterns:
                    matches = re.finditer(pattern, text)
                    for match in matches:
                        try:
                            result = json.loads(match.group())
                            if isinstance(result, dict):
                                # 提取字典中的数组值
                                for v in result.values():
                                    if isinstance(v, list):
                                        return v
                                return [result]
                            return result if isinstance(result, list) else [result]
                        except json.JSONDecodeError:
                            continue
                
                # 最后的回退方案
                logger.warning(f"JSON解析失败，使用标准格式封装: {text[:100]}...")
                return [{
                    "类型": "解析结果",
                    "内容": text.replace('"', "'")
                }]
            
            # 解析并返回结果
            result = extract_json(content)
            return {'content': json.dumps(result, ensure_ascii=False)}
            
        except Exception as e:
            logger.error(f"调用大模型失败: {str(e)}")
            return {'content': '[]'}

class ContractCoordinator(BaseAgent):
    """合同审查协调器Agent"""
    def __init__(self):
        card = AgentCard(
            name="Contract Coordinator",
            description="合同审查多智能体协调器",
            url=f"http://localhost:{COORDINATOR_PORT}",
            version="1.0.0",
            skills=[
                AgentSkill(
                    name="Task Distribution",
                    description="分发任务给专业Agent"
                ),
                AgentSkill(
                    name="Result Integration",
                    description="整合各Agent的审查结果"
                ),
                AgentSkill(
                    name="Health Check",
                    description="检查各Agent的健康状态"
                )
            ]
        )
        super().__init__(card)
        self.agent_endpoints = {
            "legal": f"http://localhost:{LEGAL_PORT}",
            "business": f"http://localhost:{BUSINESS_PORT}",
            "format": f"http://localhost:{FORMAT_PORT}",
            "processor": f"http://localhost:{PROCESSOR_PORT}",
            "highlighter": f"http://localhost:{HIGHLIGHTER_PORT}"
        }

    async def _call_agent(self, agent_type: str, content: dict) -> dict:
        """异步调用其他Agent"""
        try:
            endpoint = self.agent_endpoints[agent_type]
            async with aiohttp.ClientSession() as session:
                async with session.post(f"{endpoint}/task", json={"message": {"content": content}}) as response:
                    return await response.json()
        except Exception as e:
            logger.error(f"调用{agent_type} Agent失败: {str(e)}")
            return {"error": str(e)}

    async def _distribute_tasks(self, contract_text: str, file_path: str) -> List[Dict]:
        """并行分发任务给各个专业Agent"""
        tasks = []
        content = {"text": contract_text, "type": "text"}
        
        # 调用专业审查Agent
        for agent_type in ["legal", "business", "format"]:
            tasks.append(self._call_agent(agent_type, content))
        
        results = await asyncio.gather(*tasks)
        return results

    def _process_content(self, content):
        file_path = content.get("file_path", "")
        
        if not file_path:
            return {
                "error": "请提供文件路径",
                "artifacts": [{"parts": [{"type": "text", "text": "请提供文件路径"}]}],
                "status": {"state": TaskState.FAILED}
            }

        try:
            # 1. 调用文档处理Agent解析文档
            processor_response = requests.post(
                f"{self.agent_endpoints['processor']}/task",
                json={"message": {"content": {"file_path": file_path}}},
                timeout=30
            ).json()
            
            # 检查处理器响应
            if "artifacts" not in processor_response:
                logger.error(f"文档处理器响应格式错误: {processor_response}")
                return {
                    "error": "文档处理器响应格式错误",
                    "artifacts": [{"parts": [{"type": "text", "text": "文档处理器响应格式错误"}]}],
                    "status": {"state": TaskState.FAILED}
                }
            
            try:
                # 从处理器响应中提取文本
                processor_text = processor_response["artifacts"][0]["parts"][0]["text"]
                processor_result = json.loads(processor_text)
                
                # 检查是否有错误信息
                if "error" in processor_result:
                    logger.error(f"文档处理失败: {processor_result['error']}")
                    return {
                        "error": f"文档处理失败: {processor_result['error']}",
                        "artifacts": [{"parts": [{"type": "text", "text": f"文档处理失败: {processor_result['error']}"}]}],
                        "status": {"state": TaskState.FAILED}
                    }
                
                contract_text = processor_result.get("content", "")
                if not contract_text or contract_text.isspace():
                    logger.error("文档处理器返回的文本为空")
                    return {
                        "error": "文档处理器返回的文本为空",
                        "artifacts": [{"parts": [{"type": "text", "text": "文档处理器返回的文本为空"}]}],
                        "status": {"state": TaskState.FAILED}
                    }
                
                logger.info(f"成功提取文本，长度: {len(contract_text)} 字符")
                
            except json.JSONDecodeError as e:
                logger.error(f"解析处理器响应失败: {e}")
                return {
                    "error": f"解析处理器响应失败: {str(e)}",
                    "artifacts": [{"parts": [{"type": "text", "text": f"解析处理器响应失败: {str(e)}"}]}],
                    "status": {"state": TaskState.FAILED}
                }

            # 2. 分发任务给各个专业Agent
            combined_issues = []
            party_type = content.get("party_type", "甲方")  # 获取审查视角
            
            # 串行调用专业审查Agent
            for agent_type in ["legal", "business", "format"]:
                try:
                    response = requests.post(
                        f"{self.agent_endpoints[agent_type]}/task",
                        json={"message": {"content": {
                            "text": contract_text,
                            "party_type": party_type  # 传递审查视角
                        }}},
                        timeout=30
                    ).json()
                    
                    if "artifacts" in response:
                        for artifact in response["artifacts"]:
                            for part in artifact.get("parts", []):
                                if part.get("type") == "text":
                                    try:
                                        issues = json.loads(part.get("text", "[]"))
                                        if isinstance(issues, list):
                                            combined_issues.extend(issues)
                                        else:
                                            logger.warning(f"{agent_type} Agent返回的不是列表格式")
                                    except json.JSONDecodeError:
                                        logger.error(f"解析{agent_type} Agent响应失败")
                except Exception as e:
                    logger.error(f"调用{agent_type} Agent失败: {str(e)}")

            # 3. 调用整合Agent生成分析报告
            try:
                integrator_response = requests.post(
                    f"{self.agent_endpoints['integrator']}/task",
                    json={
                        "message": {
                            "content": {
                                "text": contract_text,
                                "party_type": party_type,
                                "issues": combined_issues
                            }
                        }
                    },
                    timeout=30
                ).json()
                
                if "error" in integrator_response:
                    logger.error(f"整合分析失败: {integrator_response['error']}")
                    return {
                        "error": f"整合分析失败: {integrator_response['error']}",
                        "artifacts": [{"parts": [{"type": "text", "text": "整合分析失败"}]}],
                        "status": {"state": TaskState.FAILED}
                    }
                
                try:
                    analysis_report = json.loads(integrator_response["artifacts"][0]["parts"][0]["text"])
                except json.JSONDecodeError:
                    logger.error("解析整合分析结果失败")
                    analysis_report = None
                    
            except Exception as e:
                logger.error(f"调用整合Agent失败: {str(e)}")
                analysis_report = None

            # 4. 调用文档高亮Agent生成高亮版本
            try:
                highlighter_response = requests.post(
                    f"{self.agent_endpoints['highlighter']}/task",
                    json={
                        "message": {
                            "content": {
                                "file_path": file_path,
                                "issues": combined_issues
                            }
                        }
                    },
                    timeout=30
                ).json()
            except Exception as e:
                logger.error(f"调用高亮Agent失败: {str(e)}")
                highlighter_response = None

            # 5. 生成最终报告
            final_report = {
                "original_file": file_path,
                "highlighted_file": None,
                "analysis": analysis_report,
                "issues": combined_issues  # 添加原始问题列表
            }

            # 如果高亮成功，添加高亮文件信息
            if highlighter_response and "artifacts" in highlighter_response:
                try:
                    highlighter_result = json.loads(highlighter_response["artifacts"][0]["parts"][0]["text"])
                    if isinstance(highlighter_result, dict) and "content" in highlighter_result:
                        final_report["highlighted_file"] = highlighter_result.get("file_name")
                except:
                    logger.error("解析高亮结果失败")

            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps(final_report, ensure_ascii=False, indent=2)}]}],
                "status": {"state": TaskState.COMPLETED}
            }

        except Exception as e:
            logger.error(f"协调器处理任务失败: {str(e)}")
            return {
                "error": str(e),
                "artifacts": [{"parts": [{"type": "text", "text": f"处理失败: {str(e)}"}]}],
                "status": {"state": TaskState.FAILED}
            }

class LegalAgent(BaseAgent):
    """法律审查Agent"""
    def __init__(self):
        card = AgentCard(
            name="Legal Review Agent",
            description="法律合规审查专家",
            url=f"http://localhost:{LEGAL_PORT}",
            version="1.0.0",
            skills=[
                AgentSkill(
                    name="Legal Risk Analysis",
                    description="识别合同中的法律风险"
                )
            ]
        )
        super().__init__(card)

    def _get_system_prompt(self, party_type):
        """根据甲乙方视角生成系统提示"""
        return f"""
        你是一位专业的法律顾问，专门负责从{party_type}的视角审查合同中的法律风险。你需要：
        1. 仔细阅读合同文本，始终站在{party_type}的立场
        2. 识别潜在的法律风险点，特别关注对{party_type}不利的条款
        3. 评估每个风险的等级（高/中/低）
        4. 提供相关的法律依据
        5. 给出具体的修改建议，确保修改后对{party_type}最有利
        
        输出格式必须是JSON数组，每个风险点包含以下字段：
        {{
            "类型": "法律风险",
            "条款": "具体条款原文",
            "问题描述": "风险描述",
            "风险等级": "高/中/低",
            "法律依据": "相关法律条文",
            "修改建议": "具体修改建议",
            "对己方影响": "该条款对{party_type}的具体影响"
        }}
        
        特别注意：
        1. 重点关注{party_type}的义务、责任和权益
        2. 标识对{party_type}明显不利的条款
        3. 识别任何违法或违规条款
        4. 评估条款对{party_type}的影响程度
        5. 提供对{party_type}最有利的修改方案
        6. 关注对方义务履行的保障措施
        7. 确保{party_type}的权益得到充分保护
        """

    def _process_content(self, content):
        contract_text = content.get("text", "")
        party_type = content.get("party_type", "甲方")  # 默认甲方视角
        
        if not contract_text:
            return {
                "artifacts": [{"parts": [{"type": "text", "text": "[]"}]}],
                "status": {"state": TaskState.COMPLETED}
            }

        try:
            # 调用大模型进行分析
            result = self._call_llm(self._get_system_prompt(party_type), contract_text)
            
            # 解析大模型输出
            try:
                issues = json.loads(result['content'])
            except:
                logger.error("解析大模型输出失败，使用空列表")
                issues = []

            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps(issues, ensure_ascii=False)}]}],
                "status": {"state": TaskState.COMPLETED}
            }
        except Exception as e:
            logger.error(f"法律审查失败: {str(e)}")
            return {
                "error": str(e),
                "artifacts": [{"parts": [{"type": "text", "text": "[]"}]}],
                "status": {"state": TaskState.FAILED}
            }

class BusinessAgent(BaseAgent):
    """商业审查Agent"""
    def __init__(self):
        card = AgentCard(
            name="Business Review Agent",
            description="商业条款审查专家",
            url=f"http://localhost:{BUSINESS_PORT}",
            version="1.0.0",
            skills=[
                AgentSkill(
                    name="Business Risk Analysis",
                    description="识别合同中的商业风险"
                )
            ]
        )
        super().__init__(card)

    def _get_system_prompt(self, party_type):
        """根据甲乙方视角生成系统提示"""
        return f"""
        你是一位资深的商业顾问，专门负责从{party_type}的视角审查合同中的商业风险。你需要：
        1. 仔细阅读合同文本，始终站在{party_type}的立场思考
        2. 识别潜在的商业风险点，特别关注对{party_type}不利的条款
        3. 评估每个风险的等级（高/中/低）
        4. 分析可能的商业影响，重点关注对{party_type}的利益影响
        5. 给出具体的修改建议，确保修改后对{party_type}最有利
        
        输出格式必须是JSON数组，每个风险点包含以下字段：
        {{
            "类型": "商业风险",
            "条款": "具体条款原文",
            "问题描述": "风险描述",
            "风险等级": "高/中/低",
            "影响分析": "对{party_type}的具体商业影响",
            "修改建议": "具体修改建议",
            "商业优化": "如何调整以最大化{party_type}利益"
        }}
        
        特别注意：
        1. 重点关注{party_type}的商业利益和风险
        2. 评估条款对{party_type}的商业影响
        3. 关注付款条件、违约责任等关键商业条款
        4. 评估商业条款的公平性和合理性
        5. 识别潜在的商业机会和优势
        6. 关注成本收益分配的合理性
        7. 确保{party_type}的商业利益最大化
        8. 评估长期商业影响和潜在风险
        """

    def _process_content(self, content):
        contract_text = content.get("text", "")
        party_type = content.get("party_type", "甲方")  # 默认甲方视角
        
        if not contract_text:
            return {
                "artifacts": [{"parts": [{"type": "text", "text": "[]"}]}],
                "status": {"state": TaskState.COMPLETED}
            }

        try:
            # 调用大模型进行分析
            result = self._call_llm(self._get_system_prompt(party_type), contract_text)
            
            # 解析大模型输出
            try:
                issues = json.loads(result['content'])
            except:
                logger.error("解析大模型输出失败，使用空列表")
                issues = []

            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps(issues, ensure_ascii=False)}]}],
                "status": {"state": TaskState.COMPLETED}
            }
        except Exception as e:
            logger.error(f"商业审查失败: {str(e)}")
            return {
                "error": str(e),
                "artifacts": [{"parts": [{"type": "text", "text": "[]"}]}],
                "status": {"state": TaskState.FAILED}
            }

class FormatAgent(BaseAgent):
    """格式审查Agent"""
    def __init__(self):
        card = AgentCard(
            name="Format Review Agent",
            description="格式与条款完整性审查专家",
            url=f"http://localhost:{FORMAT_PORT}",
            version="1.0.0",
            skills=[
                AgentSkill(
                    name="Format Analysis",
                    description="检查合同格式规范性"
                )
            ]
        )
        super().__init__(card)
        self.system_prompt = """
        你是一位合同格式规范专家，负责审查合同的格式和完整性。你需要：
        1. 检查合同结构的完整性
        2. 检查章节标题的规范性
        3. 检查条款编号的连续性
        4. 检查格式的一致性
        5. 给出具体的修改建议
        
        输出格式必须是JSON数组，每个问题包含以下字段：
        {
            "类型": "格式问题",
            "条款": "具体位置描述",
            "问题描述": "格式问题描述",
            "风险等级": "高/中/低",
            "修改建议": "具体修改建议"
        }
        """

    def _process_content(self, content):
        contract_text = content.get("text", "")
        
        if not contract_text:
            return {
                "artifacts": [{"parts": [{"type": "text", "text": "[]"}]}],
                "status": {"state": TaskState.COMPLETED}
            }

        try:
            # 调用大模型进行分析
            result = self._call_llm(self.system_prompt, contract_text)
            
            # 解析大模型输出
            try:
                issues = json.loads(result['content'])
            except:
                logger.error("解析大模型输出失败，使用空列表")
                issues = []

            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps(issues, ensure_ascii=False)}]}],
                "status": {"state": TaskState.COMPLETED}
            }
        except Exception as e:
            logger.error(f"格式审查失败: {str(e)}")
            return {
                "error": str(e),
                "artifacts": [{"parts": [{"type": "text", "text": "[]"}]}],
                "status": {"state": TaskState.FAILED}
            }

class DocumentProcessorAgent(BaseAgent):
    """文档处理Agent"""
    def __init__(self):
        card = AgentCard(
            name="Document Processor Agent",
            description="文档处理专家",
            url=f"http://localhost:{PROCESSOR_PORT}",
            version="1.0.0",
            skills=[
                AgentSkill(
                    name="Document Processing",
                    description="处理各种格式的文档并提取文本"
                )
            ]
        )
        super().__init__(card)

    def _process_content(self, content):
        file_path = content.get("file_path", "")
        
        if not file_path:
            return {
                "error": "请提供文件路径",
                "artifacts": [{"parts": [{"type": "text", "text": "请提供文件路径"}]}],
                "status": {"state": TaskState.FAILED}
            }

        # 调用MCP的parse_contract工具
        result = self._call_mcp_tool("parse_contract", {"file_path": file_path})
        
        if "error" in result:
            return {
                "error": result["error"],
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps({"error": result["error"]}, ensure_ascii=False)}]}],
                "status": {"state": TaskState.FAILED}
            }
            
        return {
            "artifacts": [{"parts": [{"type": "text", "text": json.dumps(result, ensure_ascii=False)}]}],
            "status": {"state": TaskState.COMPLETED}
        }

class DocumentHighlighterAgent(BaseAgent):
    """文档高亮Agent"""
    def __init__(self):
        card = AgentCard(
            name="Document Highlighter Agent",
            description="文档高亮处理专家",
            url=f"http://localhost:{HIGHLIGHTER_PORT}",
            version="1.0.0",
            skills=[
                AgentSkill(
                    name="Document Highlighting",
                    description="根据审查结果生成高亮版本的文档"
                )
            ]
        )
        super().__init__(card)

    def _process_content(self, content):
        file_path = content.get("file_path", "")
        issues = content.get("issues", [])
        
        if not file_path or not issues:
            return {
                "error": "请提供文件路径和问题列表",
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps({
                    "error": "请提供文件路径和问题列表",
                    "content": None
                }, ensure_ascii=False)}]}],
                "status": {"state": TaskState.FAILED}
            }

        try:
            # 获取文件扩展名
            file_ext = os.path.splitext(file_path)[1].lower()
            
            # 如果是TXT文件，创建一个新的DOCX文件
            if file_ext == '.txt':
                try:
                    # 读取TXT文件内容
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                    
                    # 创建临时DOCX文件
                    from docx import Document
                    doc = Document()
                    doc.add_paragraph(content)
                    temp_docx = os.path.splitext(file_path)[0] + '_temp.docx'
                    doc.save(temp_docx)
                    
                    # 更新文件路径
                    file_path = temp_docx
                    logger.info(f"已将TXT文件转换为DOCX: {temp_docx}")
                except Exception as e:
                    logger.error(f"转换TXT文件失败: {str(e)}")
                    return {
                        "artifacts": [{"parts": [{"type": "text", "text": json.dumps({
                            "error": f"转换TXT文件失败: {str(e)}",
                            "content": None
                        }, ensure_ascii=False)}]}],
                        "status": {"state": TaskState.FAILED}
                    }

            # 调用MCP的highlight_contract工具
            result = self._call_mcp_tool("highlight_contract", {
                "original_path": file_path,
                "issues": issues
            })
            
            # 如果是临时文件，清理它
            if file_ext == '.txt' and os.path.exists(file_path):
                try:
                    os.remove(file_path)
                    logger.info("已清理临时DOCX文件")
                except:
                    logger.warning("清理临时DOCX文件失败")

            if "error" in result:
                return {
                    "artifacts": [{"parts": [{"type": "text", "text": json.dumps({
                        "error": result["error"],
                        "content": None
                    }, ensure_ascii=False)}]}],
                    "status": {"state": TaskState.FAILED}
                }

            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps({
                    "error": None,
                    "content": result
                }, ensure_ascii=False)}]}],
                "status": {"state": TaskState.COMPLETED}
            }
            
        except Exception as e:
            logger.error(f"高亮处理失败: {str(e)}")
            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps({
                    "error": str(e),
                    "content": None
                }, ensure_ascii=False)}]}],
                "status": {"state": TaskState.FAILED}
            }

class IntegrationAgent(BaseAgent):
    """整合分析Agent"""
    def __init__(self):
        card = AgentCard(
            name="Integration Analysis Agent",
            description="整合分析专家",
            url=f"http://localhost:{INTEGRATION_PORT}",
            version="1.0.0",
            skills=[
                AgentSkill(
                    name="Risk Integration",
                    description="整合各专业Agent的分析结果，生成综合报告"
                )
            ]
        )
        super().__init__(card)

    def _calculate_risk_score(self, issues: list, party_type: str) -> float:
        """计算风险分数（0-100，分数越高风险越大）"""
        if not issues:
            return 0.0
            
        # 风险权重
        weights = {
            "高": 1.0,
            "中": 0.6,
            "低": 0.3
        }
        
        # 类型权重
        type_weights = {
            "法律风险": 1.0,
            "商业风险": 0.8,
            "格式问题": 0.5
        }
        
        # 对己方不利条款的额外权重
        unfavorable_multiplier = 1.5
        
        total_score = 0
        max_possible_score = 0
        
        for issue in issues:
            risk_weight = weights.get(issue.get("风险等级", "低"), 0.3)
            type_weight = type_weights.get(issue.get("类型", "").split()[0], 0.5)
            
            # 检查是否是对己方不利的条款
            is_unfavorable = False
            if party_type == "甲方" and ("甲方义务" in issue.get("条款", "") or "甲方责任" in issue.get("条款", "")):
                is_unfavorable = True
            elif party_type == "乙方" and ("乙方义务" in issue.get("条款", "") or "乙方责任" in issue.get("条款", "")):
                is_unfavorable = True
            
            # 计算该问题的风险分数
            issue_score = risk_weight * type_weight * 100
            
            # 如果是对己方不利的条款，增加权重
            if is_unfavorable:
                issue_score *= unfavorable_multiplier
            
            total_score += issue_score
            max_possible_score += 100  # 最高可能分数
        
        # 归一化到0-100范围
        if max_possible_score > 0:
            final_score = (total_score / max_possible_score) * 100
            return round(final_score, 2)
        return 0.0

    def _generate_risk_analysis(self, issues: list, party_type: str) -> dict:
        """生成风险分析报告"""
        # 按风险等级和类型分类
        categorized_issues = {
            "法律风险": {"高": [], "中": [], "低": []},
            "商业风险": {"高": [], "中": [], "低": []},
            "格式问题": {"高": [], "中": [], "低": []}
        }
        
        # 统计信息
        stats = {
            "total": len(issues),
            "by_level": {"高": 0, "中": 0, "低": 0},
            "by_type": {"法律风险": 0, "商业风险": 0, "格式问题": 0},
            "unfavorable": 0,  # 对己方不利条款数
            "illegal": 0,      # 违法条款数
            "favorable": 0     # 对己方有利条款数
        }
        
        # 分类统计
        for issue in issues:
            risk_type = issue.get("类型", "").split()[0]
            risk_level = issue.get("风险等级", "低")
            
            if risk_type in categorized_issues and risk_level in categorized_issues[risk_type]:
                categorized_issues[risk_type][risk_level].append(issue)
                
            # 更新统计
            stats["by_level"][risk_level] = stats["by_level"].get(risk_level, 0) + 1
            stats["by_type"][risk_type] = stats["by_type"].get(risk_type, 0) + 1
            
            # 检查是否对己方不利
            if (party_type == "甲方" and ("甲方义务" in issue.get("条款", "") or "甲方责任" in issue.get("条款", ""))) or \
               (party_type == "乙方" and ("乙方义务" in issue.get("条款", "") or "乙方责任" in issue.get("条款", ""))):
                stats["unfavorable"] += 1
            
            # 检查是否对己方有利
            elif (party_type == "甲方" and ("乙方义务" in issue.get("条款", "") or "乙方责任" in issue.get("条款", ""))) or \
                 (party_type == "乙方" and ("甲方义务" in issue.get("条款", "") or "甲方责任" in issue.get("条款", ""))):
                stats["favorable"] += 1
                
            # 检查是否违法
            if "违法" in issue.get("类型", "") or "违规" in issue.get("类型", ""):
                stats["illegal"] += 1
        
        # 计算总体风险分数
        risk_score = self._calculate_risk_score(issues, party_type)
        
        # 生成风险等级
        risk_level = "高" if risk_score >= 70 else "中" if risk_score >= 40 else "低"
        
        return {
            "risk_score": risk_score,
            "risk_level": risk_level,
            "statistics": stats,
            "categorized_issues": categorized_issues,
            "key_concerns": self._identify_key_concerns(issues, party_type),
            "favorable_aspects": self._identify_favorable_aspects(issues, party_type)
        }

    def _identify_key_concerns(self, issues: list, party_type: str) -> list:
        """识别关键问题"""
        key_concerns = []
        
        # 识别所有高风险问题
        high_risks = [issue for issue in issues if issue.get("风险等级") == "高"]
        
        # 识别违法/违规条款
        illegal_issues = [issue for issue in issues 
                        if "违法" in issue.get("类型", "") or "违规" in issue.get("类型", "")]
        
        # 识别对己方特别不利的条款
        unfavorable_issues = [
            issue for issue in issues
            if (party_type == "甲方" and ("甲方义务" in issue.get("条款", "") or "甲方责任" in issue.get("条款", ""))) or
               (party_type == "乙方" and ("乙方义务" in issue.get("条款", "") or "乙方责任" in issue.get("条款", "")))
        ]
        
        # 添加关键问题
        if illegal_issues:
            key_concerns.append({
                "type": "违法/违规问题",
                "issues": illegal_issues
            })
            
        if high_risks:
            key_concerns.append({
                "type": "高风险问题",
                "issues": high_risks
            })
            
        if unfavorable_issues:
            key_concerns.append({
                "type": "对己方不利条款",
                "issues": unfavorable_issues
            })
            
        return key_concerns

    def _identify_favorable_aspects(self, issues: list, party_type: str) -> list:
        """识别对己方有利的方面"""
        favorable_aspects = []
        
        # 识别对己方有利的条款
        favorable_clauses = [
            issue for issue in issues
            if (party_type == "甲方" and ("乙方义务" in issue.get("条款", "") or "乙方责任" in issue.get("条款", ""))) or
               (party_type == "乙方" and ("甲方义务" in issue.get("条款", "") or "甲方责任" in issue.get("条款", "")))
        ]
        
        if favorable_clauses:
            favorable_aspects.append({
                "type": "有利条款",
                "description": f"发现{len(favorable_clauses)}个对{party_type}有利的条款",
                "clauses": favorable_clauses
            })
        
        return favorable_aspects

    def _process_content(self, content):
        """处理输入内容"""
        try:
            issues = content.get("issues", [])
            party_type = content.get("party_type", "甲方")
            original_text = content.get("text", "")
            
            if not issues:
                return {
                    "artifacts": [{"parts": [{"type": "text", "text": json.dumps({
                        "error": "未找到需要分析的问题",
                        "risk_score": 0,
                        "risk_level": "低",
                        "analysis": None
                    }, ensure_ascii=False)}]}],
                    "status": {"state": TaskState.COMPLETED}
                }

            # 生成整合分析的系统提示
            system_prompt = f"""
            你是一位专业的合同分析专家，现在需要你从{party_type}的角度对合同进行整体分析。
            
            你需要：
            1. 分析合同中的各类问题和风险
            2. 评估问题对{party_type}的影响程度
            3. 识别对{party_type}有利和不利的条款
            4. 给出具体的优化建议
            5. 提供清晰的签约建议
            
            请特别注意：
            1. 始终站在{party_type}的立场思考
            2. 重点关注对{party_type}的影响
            3. 寻找对{party_type}有利的条款和机会
            4. 评估风险的整体影响
            5. 提供具体可行的优化方案
            
            输出格式必须是JSON对象，包含以下字段：
            {{
                "summary": {{
                    "risk_score": "风险评分(0-100)",
                    "risk_level": "风险等级(高/中/低)",
                    "total_issues": "总问题数",
                    "unfavorable_high": "高风险不利条款数",
                    "unfavorable_medium": "中风险不利条款数",
                    "unfavorable_low": "低风险不利条款数",
                    "favorable_clauses": "有利条款数",
                    "illegal_clauses": "违法条款数"
                }},
                "analysis": {{
                    "key_risks": ["主要风险点列表"],
                    "favorable_points": ["有利条款分析"],
                    "impact_analysis": "对{party_type}的整体影响分析",
                    "optimization_suggestions": ["优化建议列表"]
                }},
                "recommendation": {{
                    "signing_advice": "签约建议",
                    "negotiation_points": ["谈判要点列表"],
                    "risk_mitigation": ["风险缓解措施"]
                }}
            }}
            """
            
            # 准备用户输入内容
            user_content = {
                "合同文本": original_text,
                "问题列表": issues,
                "分析视角": party_type
            }
            
            # 调用大模型进行分析
            result = self._call_llm(system_prompt, json.dumps(user_content, ensure_ascii=False))
            
            try:
                analysis_report = json.loads(result['content'])
                if isinstance(analysis_report, list):
                    analysis_report = analysis_report[0]
            except:
                logger.error("解析整合分析结果失败")
                analysis_report = None

            return {
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps(analysis_report, ensure_ascii=False)}]}],
                "status": {"state": TaskState.COMPLETED}
            }
            
        except Exception as e:
            logger.error(f"整合分析失败: {str(e)}")
            return {
                "error": str(e),
                "artifacts": [{"parts": [{"type": "text", "text": json.dumps({
                    "error": f"分析失败: {str(e)}",
                    "risk_score": 0,
                    "risk_level": "未知",
                    "analysis": None
                }, ensure_ascii=False)}]}],
                "status": {"state": TaskState.FAILED}
            }

    def _generate_recommendation(self, analysis: dict, party_type: str) -> dict:
        """生成建议"""
        risk_score = analysis["risk_score"]
        stats = analysis["statistics"]
        
        recommendation = {
            "signing_advice": "",
            "risk_level": analysis["risk_level"],
            "key_points": [],
            "negotiation_focus": []
        }
        
        # 签约建议
        if stats["illegal"] > 0:
            recommendation["signing_advice"] = "❌ 不建议签约 - 存在违法/违规条款"
        elif risk_score >= 70:
            recommendation["signing_advice"] = "❌ 不建议签约 - 整体风险过高"
        elif risk_score >= 40:
            recommendation["signing_advice"] = "⚠️ 谨慎签约 - 建议协商修改关键条款"
        else:
            recommendation["signing_advice"] = "✅ 可以签约 - 风险在可接受范围内"
        
        # 关键点提示
        if stats["illegal"] > 0:
            recommendation["key_points"].append("存在违法/违规条款，必须修改")
        if stats["unfavorable"] > 0:
            recommendation["key_points"].append(f"存在{stats['unfavorable']}个对{party_type}不利条款")
        if stats["by_level"]["高"] > 0:
            recommendation["key_points"].append(f"存在{stats['by_level']['高']}个高风险问题")
        
        # 谈判重点
        for concern in analysis["key_concerns"]:
            for issue in concern["issues"]:
                recommendation["negotiation_focus"].append({
                    "条款": issue.get("条款", ""),
                    "建议": issue.get("修改建议", "")
                })
        
        return recommendation

def run_all_agents():
    """启动所有Agent"""
    import multiprocessing
    import sys
    import time
    
    # 定义所有Agent及其端口
    agents = {
        "coordinator": (ContractCoordinator, COORDINATOR_PORT),
        "legal": (LegalAgent, LEGAL_PORT),
        "business": (BusinessAgent, BUSINESS_PORT),
        "format": (FormatAgent, FORMAT_PORT),
        "processor": (DocumentProcessorAgent, PROCESSOR_PORT),
        "highlighter": (DocumentHighlighterAgent, HIGHLIGHTER_PORT),
        "integrator": (IntegrationAgent, INTEGRATION_PORT)
    }
    
    processes = []
    try:
        # 启动每个Agent
        for agent_name, (agent_class, port) in agents.items():
            process = multiprocessing.Process(
                target=run_agent,
                args=(agent_class, port),
                name=agent_name
            )
            process.start()
            processes.append(process)
            logger.info(f"启动 {agent_name} Agent，端口: {port}")
            time.sleep(2)  # 给每个Agent一些启动时间
        
        # 等待所有进程
        while True:
            for process in processes:
                if not process.is_alive():
                    logger.error(f"{process.name} Agent已停止运行")
                    # 重启进程
                    agent_name = process.name
                    agent_class, port = agents[agent_name]
                    new_process = multiprocessing.Process(
                        target=run_agent,
                        args=(agent_class, port),
                        name=agent_name
                    )
                    new_process.start()
                    processes.remove(process)
                    processes.append(new_process)
                    logger.info(f"重启 {agent_name} Agent")
            time.sleep(5)
            
    except KeyboardInterrupt:
        logger.info("\n正在关闭所有Agent...")
        for process in processes:
            process.terminate()
            process.join(timeout=5)
            if process.is_alive():
                process.kill()
        logger.info("所有Agent已关闭")

def run_agent(agent_class, port):
    """运行单个Agent"""
    import uvicorn
    try:
        agent = agent_class()
        uvicorn.run(
            agent.app,
            host="0.0.0.0",
            port=port,
            log_level="info"
        )
    except Exception as e:
        logger.error(f"Agent启动失败: {str(e)}")
        raise

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="启动合同审查Agent")
    parser.add_argument("agent_type", 
                       choices=["coordinator", "legal", "business", "format", 
                               "processor", "highlighter", "integrator", "all"],
                       help="要启动的Agent类型")
    args = parser.parse_args()

    if args.agent_type == "all":
        run_all_agents()
    else:
        agent_map = {
            "coordinator": (ContractCoordinator, COORDINATOR_PORT),
            "legal": (LegalAgent, LEGAL_PORT),
            "business": (BusinessAgent, BUSINESS_PORT),
            "format": (FormatAgent, FORMAT_PORT),
            "processor": (DocumentProcessorAgent, PROCESSOR_PORT),
            "highlighter": (DocumentHighlighterAgent, HIGHLIGHTER_PORT),
            "integrator": (IntegrationAgent, INTEGRATION_PORT)
        }

        agent_class, port = agent_map[args.agent_type]
        run_agent(agent_class, port) 

# **`agents.py`**

### 基础配置

#### 服务端口配置
- **协调器**：`COORDINATOR_PORT = 7000`
- **MCP服务**：`MCP_SERVER_URL = "http://localhost:7001"`
- **专业Agent**：
  - 法律：`LEGAL_PORT = 7002`
  - 商业：`BUSINESS_PORT = 7003`
  - 格式：`FORMAT_PORT = 7004`
  - 处理：`PROCESSOR_PORT = 7005`
  - 高亮：`HIGHLIGHTER_PORT = 7006`
  - 整合：`INTEGRATION_PORT = 7007`

#### AI模型配置
- **模型类型**：`ernie-4.0-8k`
- **API配置**：
  - `base_url`：`"https://aistudio.baidu.com/llm/lmapi/v3"`
  - `response_format`：`{"type": "json_object"}`

### 基础Agent实现（BaseAgent）

#### 核心功能
- **初始化**：
  - `agent_card`：Agent身份信息
  - `mcp_url`：MCP服务地址
  - `llm_client`：大模型客户端

#### 关键方法
- **路由设置**（`_setup_routes`）：
  - `/task`：处理任务请求
  - `/health`：健康检查接口

- **MCP工具调用**（`_call_mcp_tool`）：
  - 错误处理
  - 超时设置：`timeout=30`
  - 日志记录

- **大模型调用**（`_call_llm`）：
  - JSON解析策略
  - 错误处理机制
  - 结果格式化

### 专业Agent实现

#### 1. 协调器（ContractCoordinator）
- **功能**：
  - 任务分发与调度
  - 结果整合
  - 错误处理
- **处理流程**：
  ```
  文档处理 → 专业分析 → 结果整合 → 高亮处理
  ```

#### 2. 法律Agent（LegalAgent）
- **分析维度**：
  - 法律风险识别
  - 风险等级评估
  - 法律依据提供
- **输出格式**：
  ```
  {
    "类型": "法律风险",
    "条款": "具体条款",
    "风险等级": "高/中/低",
    "法律依据": "相关法规",
    "修改建议": "建议内容"
  }
  ```

#### 3. 商业Agent（BusinessAgent）
- **分析维度**：
  - 商业条款评估
  - 成本收益分析
  - 商业影响评估
- **输出格式**：
  ```
  {
    "类型": "商业风险",
    "条款": "具体条款",
    "风险等级": "高/中/低",
    "影响分析": "商业影响",
    "修改建议": "优化建议"
  }
  ```

#### 4. 格式Agent（FormatAgent）
- **检查项目**：
  - 结构完整性
  - 标题规范性
  - 编号连续性
  - 格式一致性

#### 5. 文档处理Agent（DocumentProcessorAgent）
- **处理能力**：
  - 多格式支持
  - 文本提取
  - 预处理优化

#### 6. 高亮Agent（DocumentHighlighterAgent）
- **高亮功能**：
  - 风险等级颜色
  - 多格式支持
  - 文件转换

#### 7. 整合Agent（IntegrationAgent）
- **分析功能**：
  - 风险评分计算
  - 关键问题识别
  - 建议生成

### 风险评分系统

#### 权重配置
- **风险等级权重**：
  - 高：`1.0`
  - 中：`0.6`
  - 低：`0.3`

- **问题类型权重**：
  - 法律风险：`1.0`
  - 商业风险：`0.8`
  - 格式问题：`0.5`

#### 评分计算
- **基础分数**：`0-100`分
- **加权因素**：
  - 风险等级
  - 问题类型
  - 对己方影响（`1.5`倍权重）

### 系统运行

#### 启动方式
- **单Agent启动**：
  ```
  python agents.py [agent_type]
  ```
- **全部启动**：
  ```
  python agents.py all
  ```

#### 错误处理
- **重试机制**：最大重试3次
- **日志记录**：详细的错误信息
- **优雅退出**：进程清理和资源释放


In [5]:
!python agents.py all

INFO:__main__:AI Studio客户端初始化成功
ERROR:__main__:无法连接到MCP服务
ERROR:__main__:环境检查失败，程序退出


### **合同审查**

In [None]:
import requests
import json
import logging
import os
from typing import Dict, List, Optional
from datetime import datetime
import docx
import time
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# 配置AI Studio API Key
os.environ["AI_STUDIO_API_KEY"] = "XXX"

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("ContractProcessor")

class ContractProcessor:
    """合同处理器类"""
    def __init__(self):
        # 检查API Key
        self.api_key = os.environ.get("AI_STUDIO_API_KEY")
        if not self.api_key:
            raise ValueError("未设置AI Studio API Key")
        
        # Agent端口配置
        self.processor_port = 7005
        self.legal_port = 7002
        self.business_port = 7003
        self.format_port = 7004
        self.integration_port = 7007
        
        # 创建输出目录
        self.output_dir = "contract_analysis_results"
        os.makedirs(self.output_dir, exist_ok=True)
        
        # 配置请求会话
        self.session = requests.Session()
        retries = Retry(
            total=3,  # 最大重试次数
            backoff_factor=1,  # 重试间隔
            status_forcelist=[500, 502, 503, 504]  # 需要重试的HTTP状态码
        )
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

    def _read_docx(self, file_path: str) -> str:
        """读取docx文件内容"""
        try:
            doc = docx.Document(file_path)
            return "\n".join([paragraph.text for paragraph in doc.paragraphs])
        except Exception as e:
            logger.error(f"读取docx文件失败: {str(e)}")
            return ""

    def _read_file_with_multiple_encodings(self, file_path: str) -> str:
        """使用多种编码尝试读取文件"""
        encodings = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'latin1']
        
        # 如果是docx文件，使用专门的处理方法
        if file_path.lower().endswith('.docx'):
            return self._read_docx(file_path)
            
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                logger.info(f"成功使用 {encoding} 编码读取文件")
                return content
            except UnicodeDecodeError:
                continue
            except Exception as e:
                logger.error(f"使用 {encoding} 编码读取文件失败: {str(e)}")
                continue
        
        raise Exception(f"无法使用任何已知编码格式读取文件: {file_path}")

    def _extract_text_from_mcp_response(self, response: dict) -> str:
        """从MCP响应中提取文本内容"""
        try:
            logger.info(f"开始解析MCP响应...")
            
            if not response:
                logger.error("MCP响应为空")
                return ""

            def extract_nested_content(obj):
                """递归提取嵌套的内容"""
                if isinstance(obj, str):
                    try:
                        # 尝试解析JSON字符串
                        parsed = json.loads(obj)
                        return extract_nested_content(parsed)
                    except json.JSONDecodeError:
                        return obj
                
                if isinstance(obj, dict):
                    # 检查常见的内容字段
                    if "text" in obj:
                        return obj["text"]
                    if "content" in obj:
                        return extract_nested_content(obj["content"])
                    # 遍历所有值
                    for value in obj.values():
                        result = extract_nested_content(value)
                        if result:
                            return result
                
                if isinstance(obj, list):
                    # 合并所有文本内容
                    texts = []
                    for item in obj:
                        result = extract_nested_content(item)
                        if result:
                            texts.append(result)
                    return "\n".join(texts) if texts else ""
                
                return ""

            # 从响应中提取文本
            text_content = extract_nested_content(response)
            
            if text_content:
                logger.info(f"成功提取文本，长度: {len(text_content)}")
                return text_content
            
            logger.error("无法从响应中提取文本内容")
            return ""
            
        except Exception as e:
            logger.error(f"解析MCP响应失败: {str(e)}")
            return ""

    def _process_agent_response(self, response: dict) -> Optional[dict]:
        """处理Agent响应"""
        try:
            if "error" in response:
                logger.error(f"Agent响应包含错误: {response['error']}")
                return None
            
            if "artifacts" in response and response["artifacts"]:
                text_content = response["artifacts"][0]["parts"][0]["text"]
                try:
                    parsed_content = json.loads(text_content)
                    # 如果解析后的内容还包含artifacts，继续解析
                    if isinstance(parsed_content, dict) and "artifacts" in parsed_content:
                        inner_text = parsed_content["artifacts"][0]["parts"][0]["text"]
                        try:
                            return json.loads(inner_text)
                        except json.JSONDecodeError:
                            return {"content": inner_text}
                    return parsed_content
                except json.JSONDecodeError as e:
                    logger.error(f"解析Agent响应JSON失败: {str(e)}")
                    return {"content": text_content}
            
            logger.error("Agent响应格式无效")
            return None
        except Exception as e:
            logger.error(f"处理Agent响应失败: {str(e)}")
            return None

    def _call_agent_with_retry(self, port: int, data: dict, max_retries: int = 3, timeout: int = 60) -> dict:
        """带重试机制的Agent调用"""
        for attempt in range(max_retries):
            try:
                if "api_key" not in data:
                    data["api_key"] = self.api_key
                
                response = self.session.post(
                    f"http://localhost:{port}/task",
                    json={"message": {"content": data}},
                    timeout=timeout
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.Timeout:
                logger.warning(f"调用Agent超时 (端口 {port})，第{attempt + 1}次重试")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # 指数退避
            except Exception as e:
                logger.error(f"调用Agent失败 (端口 {port}): {str(e)}")
                if attempt == max_retries - 1:
                    return {"error": str(e)}
                time.sleep(2 ** attempt)

    def process_document(self, file_path: str, party_type: str = "甲方") -> dict:
        """处理文档的主流程"""
        try:
            # 1. 调用文档处理Agent
            logger.info(f"开始处理文档: {file_path}")
            doc_response = self._call_agent_with_retry(self.processor_port, {
                "file_path": file_path
            })
            
            logger.info("文档处理Agent已返回响应")
            
            doc_result = self._process_agent_response(doc_response)
            if not doc_result:
                raise Exception("文档处理失败")
            
            # 提取文本内容
            contract_text = self._extract_text_from_mcp_response(doc_result)
            if not contract_text:
                # 尝试直接读取文件
                logger.warning("MCP提取失败，尝试直接读取文件")
                contract_text = self._read_file_with_multiple_encodings(file_path)
                if not contract_text:
                    raise Exception("无法提取文档内容")
            
            logger.info(f"文档处理完成，文本长度: {len(contract_text)}")
            logger.debug(f"提取的文本内容前200字符: {contract_text[:200]}...")

            # 2. 并行调用三个专家Agent
            logger.info("开始专家评审...")
            expert_responses = {}
            all_issues = []
            
            # 法律专家
            logger.info("调用法律专家...")
            legal_response = self._call_agent_with_retry(self.legal_port, {
                "text": contract_text,
                "party_type": party_type
            })
            legal_issues = self._process_agent_response(legal_response)
            expert_responses['legal'] = legal_issues
            if legal_issues:
                all_issues.extend(legal_issues)
            
            # 商业专家
            logger.info("调用商业专家...")
            business_response = self._call_agent_with_retry(self.business_port, {
                "text": contract_text,
                "party_type": party_type
            })
            business_issues = self._process_agent_response(business_response)
            expert_responses['business'] = business_issues
            if business_issues:
                all_issues.extend(business_issues)
            
            # 格式专家
            logger.info("调用格式专家...")
            format_response = self._call_agent_with_retry(self.format_port, {
                "text": contract_text
            })
            format_issues = self._process_agent_response(format_response)
            expert_responses['format'] = format_issues
            if format_issues:
                all_issues.extend(format_issues)
            
            logger.info(f"专家评审完成，发现 {len(all_issues)} 个问题")

            # 3. 调用整合Agent
            logger.info("开始整合分析...")
            integration_response = self._call_agent_with_retry(self.integration_port, {
                "text": contract_text,
                "party_type": party_type,
                "issues": all_issues
            }, timeout=90)  # 给整合Agent更多时间
            
            final_report = self._process_agent_response(integration_response)
            if not final_report:
                raise Exception("生成最终报告失败")
            
            # 处理最终报告格式
            if isinstance(final_report, dict) and "artifacts" in final_report:
                try:
                    final_report = json.loads(final_report["artifacts"][0]["parts"][0]["text"])
                except (json.JSONDecodeError, KeyError, IndexError):
                    logger.warning("无法解析最终报告的嵌套结构")
            
            # 4. 保存结果
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = os.path.join(
                self.output_dir,
                f"contract_analysis_{timestamp}.json"
            )
            
            result = {
                "file_path": file_path,
                "party_type": party_type,
                "expert_responses": expert_responses,
                "issues": all_issues,
                "analysis": final_report
            }
            
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            
            logger.info(f"分析完成，结果已保存至: {output_file}")
            return result

        except Exception as e:
            logger.error(f"处理失败: {str(e)}")
            return {
                "error": str(e),
                "file_path": file_path,
                "party_type": party_type
            }

def print_issues(issues: List[dict]):
    """打印问题详情"""
    if not issues:
        print("\n未发现问题")
        return
        
    # 确保issues是列表且每个元素都是字典
    if isinstance(issues, str):
        try:
            issues = json.loads(issues)
        except json.JSONDecodeError:
            logger.error("无法解析问题列表")
            print("\n无法解析问题列表")
            return
            
    if not isinstance(issues, list):
        logger.error(f"问题列表格式错误: {type(issues)}")
        print("\n问题列表格式错误")
        return
        
    # 过滤掉非字典元素
    issues = [issue for issue in issues if isinstance(issue, dict)]
    if not issues:
        print("\n没有有效的问题记录")
        return
    
    # 按风险等级分类
    risk_levels = {
        "高": [],
        "中": [],
        "低": []
    }
    
    # 按类型分类
    issue_types = {
        "法律风险": [],
        "商业风险": [],
        "格式问题": []
    }
    
    # 分类统计
    for issue in issues:
        risk_level = issue.get("风险等级", "低")
        issue_type = issue.get("类型", "其他")
        
        if risk_level in risk_levels:
            risk_levels[risk_level].append(issue)
            
        for type_key in issue_types.keys():
            if type_key in issue_type:
                issue_types[type_key].append(issue)
                break
    
    # 打印风险等级统计
    print("\n=== 风险等级分布 ===")
    for level, level_issues in risk_levels.items():
        if level_issues:
            print(f"{level}风险: {len(level_issues)}个")
    
    # 打印问题类型统计
    print("\n=== 问题类型分布 ===")
    for type_name, type_issues in issue_types.items():
        if type_issues:
            print(f"{type_name}: {len(type_issues)}个")
    
    # 打印详细问题列表
    print("\n=== 问题详情 ===")
    for level in ["高", "中", "低"]:
        if risk_levels[level]:
            print(f"\n{level}风险问题:")
            for i, issue in enumerate(risk_levels[level], 1):
                print(f"\n{i}. {issue.get('类型', '未知类型')}")
                print(f"   条款: {issue.get('条款', 'N/A')}")
                print(f"   描述: {issue.get('问题描述', 'N/A')}")
                if "法律依据" in issue:
                    print(f"   法律依据: {issue['法律依据']}")
                if "影响分析" in issue:
                    print(f"   影响分析: {issue['影响分析']}")
                print(f"   修改建议: {issue.get('修改建议', 'N/A')}")

def print_expert_responses(expert_responses: Dict):
    """打印专家回复"""
    if not expert_responses:
        print("\n未获取到专家评审结果")
        return

    expert_names = {
        'legal': '法律专家',
        'business': '商业专家',
        'format': '格式专家'
    }

    print("\n=== 专家评审详情 ===")
    for expert_type, response in expert_responses.items():
        print(f"\n{expert_names.get(expert_type, '未知专家')}评审结果:")
        if not response:
            print("  未返回评审结果")
            continue

        # 处理嵌套的响应格式
        if isinstance(response, dict) and "artifacts" in response:
            try:
                response = json.loads(response["artifacts"][0]["parts"][0]["text"])
            except (json.JSONDecodeError, KeyError, IndexError):
                print("  评审结果格式错误")
                continue

        if isinstance(response, str):
            try:
                response = json.loads(response)
            except json.JSONDecodeError:
                print("  评审结果格式错误")
                continue

        if not isinstance(response, list):
            print("  评审结果格式错误")
            continue

        for i, issue in enumerate(response, 1):
            if not isinstance(issue, dict):
                continue
            print(f"\n  {i}. {issue.get('类型', '未知类型')}")
            print(f"     风险等级: {issue.get('风险等级', 'N/A')}")
            print(f"     条款: {issue.get('条款', 'N/A')}")
            print(f"     问题描述: {issue.get('问题描述', 'N/A')}")
            if "法律依据" in issue:
                print(f"     法律依据: {issue['法律依据']}")
            if "影响分析" in issue:
                print(f"     影响分析: {issue['影响分析']}")
            if "商业优化" in issue:
                print(f"     商业优化: {issue['商业优化']}")
            print(f"     修改建议: {issue.get('修改建议', 'N/A')}")

def main():
    """主函数"""
    import argparse
    
    parser = argparse.ArgumentParser(description="合同文档处理工具")
    parser.add_argument("file_path", help="合同文件路径")
    parser.add_argument("--party-type", default="甲方", choices=["甲方", "乙方"],
                      help="分析视角 (默认: 甲方)")
    parser.add_argument("--output", "-o", help="输出文件路径")
    
    args = parser.parse_args()
    
    # 检查文件是否存在
    if not os.path.exists(args.file_path):
        print(f"错误: 文件不存在: {args.file_path}")
        return
    
    # 处理文档
    processor = ContractProcessor()
    result = processor.process_document(args.file_path, args.party_type)
    
    # 打印结果摘要
    if "error" in result:
        print(f"\n处理失败: {result['error']}")
        return
        
    print("\n=== 分析结果摘要 ===")
    print(f"文件: {os.path.basename(args.file_path)}")
    print(f"分析视角: {args.party_type}")
    
    issues = result.get("issues", [])
    if isinstance(issues, str):
        try:
            issues = json.loads(issues)
        except json.JSONDecodeError:
            issues = []
    
    print(f"发现问题数量: {len(issues) if isinstance(issues, list) else 0}")
    
    # 打印专家评审详情
    print_expert_responses(result.get("expert_responses", {}))
    
    if "analysis" in result:
        analysis = result["analysis"]
        if isinstance(analysis, str):
            try:
                analysis = json.loads(analysis)
            except json.JSONDecodeError:
                analysis = {}
                
        print(f"\n=== 风险评估 ===")
        if isinstance(analysis, dict):
            if "summary" in analysis:
                summary = analysis["summary"]
                print(f"风险评分: {summary.get('risk_score', 'N/A')}/100")
                print(f"风险等级: {summary.get('risk_level', 'N/A')}")
                print(f"总问题数: {summary.get('total_issues', 'N/A')}")
                print(f"高风险不利条款: {summary.get('unfavorable_high', 'N/A')}")
                print(f"中风险不利条款: {summary.get('unfavorable_medium', 'N/A')}")
                print(f"低风险不利条款: {summary.get('unfavorable_low', 'N/A')}")
                print(f"有利条款数: {summary.get('favorable_clauses', 'N/A')}")
                print(f"违法条款数: {summary.get('illegal_clauses', 'N/A')}")
            else:
                print(f"风险评分: {analysis.get('risk_score', 'N/A')}/100")
                print(f"风险等级: {analysis.get('risk_level', 'N/A')}")
            
            if "analysis" in analysis:
                analysis_detail = analysis["analysis"]
                print("\n=== 分析详情 ===")
                if "key_risks" in analysis_detail:
                    print("\n主要风险点:")
                    for risk in analysis_detail["key_risks"]:
                        print(f"- {risk}")
                if "favorable_points" in analysis_detail:
                    print("\n有利条款:")
                    for point in analysis_detail["favorable_points"]:
                        print(f"- {point}")
                if "impact_analysis" in analysis_detail:
                    print("\n影响分析:")
                    print(analysis_detail["impact_analysis"])
                if "optimization_suggestions" in analysis_detail:
                    print("\n优化建议:")
                    for suggestion in analysis_detail["optimization_suggestions"]:
                        print(f"- {suggestion}")
            
            if "recommendation" in analysis:
                recommendation = analysis["recommendation"]
                print(f"\n=== 签约建议 ===")
                if isinstance(recommendation, dict):
                    print(f"\n{recommendation.get('signing_advice', 'N/A')}")
                    
                    if "negotiation_points" in recommendation:
                        print("\n谈判要点:")
                        for point in recommendation["negotiation_points"]:
                            print(f"- {point}")
                            
                    if "risk_mitigation" in recommendation:
                        print("\n风险缓解措施:")
                        for measure in recommendation["risk_mitigation"]:
                            print(f"- {measure}")
    
    # 打印问题详情
    if issues:
        print_issues(issues)
    
    # 如果指定了输出文件，保存完整报告
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        print(f"\n完整报告已保存至: {args.output}")
    else:
        print(f"\n完整报告已保存至: {os.path.join(processor.output_dir, 'contract_analysis_' + datetime.now().strftime('%Y%m%d_%H%M%S') + '.json')}")

if __name__ == "__main__":
    main() 

# **`process_contract.py`**

## 基础配置

### 环境配置
- **AI Studio API Key**：用于访问AI模型服务
- **日志配置**：详细的处理过程记录
- **输出目录**：`contract_analysis_results`

### 端口配置
- **处理Agent**：`7005`
- **法律Agent**：`7002`
- **商业Agent**：`7003`
- **格式Agent**：`7004`
- **整合Agent**：`7007`

## ContractProcessor类

### 核心功能
- **文档处理**：支持多种格式（docx、txt等）
- **多Agent协调**：调用各专业Agent进行分析
- **结果整合**：汇总分析结果并生成报告

### 关键方法

#### 文件处理
- `_read_docx`：
  - docx文件读取
  - 段落提取和合并

- `_read_file_with_multiple_encodings`：
  - 多编码支持（UTF-8、GBK、GB2312等）
  - 文件格式自动识别

#### Agent通信
- `_call_agent_with_retry`：
  - 重试机制（最大3次）
  - 指数退避策略
  - 超时控制（默认60秒）

- `_process_agent_response`：
  - JSON解析
  - 嵌套响应处理
  - 错误处理

#### 结果处理
- `_extract_text_from_mcp_response`：
  - 递归解析
  - 多层JSON提取
  - 文本合并

## 处理流程

![](https://ai-studio-static-online.cdn.bcebos.com/c002980caa034721b39fc4f487c622781582f2daae8c48098df7e77e41986c41)

### 1. 文档处理
- 读取文件内容
- 格式转换
- 文本提取

### 2. 专家评审
- **法律分析**：法律风险识别
- **商业分析**：商业条款评估
- **格式检查**：规范性审查

### 3. 结果整合
- 问题汇总
- 风险评估
- 建议生成

## 输出格式

### 分析结果摘要
- 文件信息
- 分析视角
- 问题数量

### 专家评审详情
- 每个专家的具体发现
- 风险等级分布
- 问题类型统计

### 风险评估
- 风险评分
- 风险等级
- 条款统计
  - 高风险不利条款
  - 中风险不利条款
  - 低风险不利条款
  - 有利条款
  - 违法条款

### 分析详情
- 主要风险点
- 有利条款
- 影响分析
- 优化建议

### 签约建议
- 总体建议
- 谈判要点
- 风险缓解措施

## 错误处理

### 重试机制
- **HTTP重试**：
  - 最大重试次数：3次
  - 退避因子：1
  - 重试状态码：500、502、503、504

### 异常处理
- 文件读取错误
- JSON解析错误
- Agent响应超时
- 格式转换错误

## 使用方式

### 命令行参数
- **file_path**：合同文件路径
- **--party-type**：分析视角（甲方/乙方）
- **--output**：输出文件路径

### 输出方式
- 控制台实时输出
- JSON文件保存
- 详细日志记录

In [None]:
!python process_contract.py "文件路径"  --party-type 乙方 #（或甲方）

## **项目运行结果**
### **mcp_service.py**
![](https://ai-studio-static-online.cdn.bcebos.com/dc3ee99453914b44a81d5d87b11c27d123340826906d4e848b0e879f0aa0a621)

### **agents.py**
![](https://ai-studio-static-online.cdn.bcebos.com/84e19f21e1e7407ea3d926ea1a36d60b0bc5cddf63bd4a6fbf6022cc8f956214)

### **process_contract.py**
```bash
aistudio@jupyter-15069160-9188151:~$ python process_contract.py sample_contracts/软件开发服务合同_示例.docx  --party-type 乙方
2025-05-23 01:28:04,313 - ContractProcessor - INFO - 开始处理文档: sample_contracts/软件开发服务合同_示例.docx
2025-05-23 01:28:04,333 - ContractProcessor - INFO - 文档处理Agent已返回响应
2025-05-23 01:28:04,333 - ContractProcessor - INFO - 开始解析MCP响应...
2025-05-23 01:28:04,333 - ContractProcessor - INFO - 成功提取文本，长度: 1013
2025-05-23 01:28:04,333 - ContractProcessor - INFO - 文档处理完成，文本长度: 1013
2025-05-23 01:28:04,333 - ContractProcessor - INFO - 开始专家评审...
2025-05-23 01:28:04,333 - ContractProcessor - INFO - 调用法律专家...
2025-05-23 01:28:35,904 - ContractProcessor - INFO - 调用商业专家...
2025-05-23 01:29:16,005 - ContractProcessor - INFO - 调用格式专家...
2025-05-23 01:29:35,312 - ContractProcessor - INFO - 专家评审完成，发现 8 个问题
2025-05-23 01:29:35,312 - ContractProcessor - INFO - 开始整合分析...
2025-05-23 01:30:20,639 - ContractProcessor - INFO - 分析完成，结果已保存至: contract_analysis_results/contract_analysis_20250523_013020.json

=== 分析结果摘要 ===
文件: 软件开发服务合同_示例.docx
分析视角: 乙方
发现问题数量: 8

=== 专家评审详情 ===

法律专家评审结果:

  1. 法律风险
     风险等级: 中
     条款: 第十一条 违约处理
     问题描述: 违约金比例未明确上限，可能导致违约成本过高。
     法律依据: 《中华人民共和国合同法》第一百一十四条：当事人可以约定一方违约时应当根据违约情况向对方支付一定数额的违约金，也可以约定因违约产生的损失赔偿额的计算方法。约定的违约金低于造成的损失的，当事人可以请求人民法院或者仲裁机构予以增加；约定的违约金过分高于造成的损失的，当事人可以请求人民法院或者仲裁机构予以适当减少。
     修改建议: 在违约处理条款中明确违约金的最高限额，例如：违约金累计不超过合同总金额的10%。

  2. 法律风险
     风险等级: 低
     条款: 第十二条 争议解决方式
     问题描述: 争议解决方式仅规定了诉讼，未考虑其他更灵活的解决方式。
     法律依据: 《中华人民共和国合同法》第一百二十八条：当事人可以通过和解或者调解解决合同争议。当事人不愿和解、调解或者和解、调解不成的，可以根据仲裁协议向仲裁机构申请仲裁。涉外合同的当事人可以根据仲裁协议向中国仲裁机构或者其他仲裁机构申请仲裁。当事人没有订立仲裁协议或者仲裁协议无效的，可以向人民法院起诉。
     修改建议: 在争议解决条款中加入仲裁作为可选方式，例如：协商不成的，任何一方均可向甲方所在地仲裁机构申请仲裁，或者直接向甲方所在地人民法院提起诉讼。

  3. 法律风险
     风险等级: 中
     条款: 第九条 保密义务
     问题描述: 保密义务虽然规定，但缺乏具体保密措施和保密信息范围的定义。
     法律依据: 《中华人民共和国合同法》第四十三条：当事人在订立合同过程中知悉的商业秘密，无论合同是否成立，不得泄露或者不正当地使用。泄露或者不正当地使用该商业秘密给对方造成损失的，应当承担损害赔偿责任。
     修改建议: 在保密条款中明确保密信息的具体范围，例如：包括但不限于技术资料、系统设计、源代码等，并增加具体的保密措施，如加密存储、限制访问等。

商业专家评审结果:

  1. 商业风险
     风险等级: 中
     条款: 第六条 支付方式：1. 合同签订后3日内，甲方支付合同总额的50%作为预付款；
     问题描述: 预付款比例较高，对乙方资金流有一定压力，同时也增加了甲方的违约风险。
     影响分析: 若甲方在支付预付款后出现资金问题或违约行为，乙方可能面临项目中断或资金损失的风险。
     商业优化: 通过调整预付款比例，平衡甲乙双方的资金风险，确保项目的顺利进行。
     修改建议: 将预付款比例调整为30%，以降低乙方的资金压力和甲方的违约风险。

  2. 商业风险
     风险等级: 高
     条款: 第十一条 违约处理：1. 甲方逾期付款，每逾期一日按应付金额的0.05%支付违约金；2. 乙方逾期交付，每逾期一日按合同总额的0.05%支付违约金；
     问题描述: 违约金计算方式可能对乙方不利，逾期交付的违约金基于合同总额计算，而甲方逾期付款的违约金仅基于应付金额计算。
     影响分析: 在乙方逾期交付的情况下，违约金可能迅速累积，导致乙方承担较大的经济压力。
     商业优化: 通过调整违约金计算方式，确保甲乙双方在违约情况下的经济责任更加合理和公平。
     修改建议: 将乙方逾期交付的违约金计算方式改为基于未交付部分的价值计算，以更公平地分配风险。

  3. 商业风险
     风险等级: 中
     条款: 第十一条 违约处理：3. 任何一方违反保密义务，应向对方支付50万元违约金。
     问题描述: 违反保密义务的违约金金额固定，可能不足以弥补因泄密造成的实际损失。
     影响分析: 若因乙方泄密导致甲方重大损失，固定的违约金可能无法覆盖甲方的实际损失。
     商业优化: 通过根据实际损失调整违约金，确保违约方承担的经济责任与其造成的损害相符。
     修改建议: 将违反保密义务的违约金改为根据实际损失进行赔偿，并设定最低和最高赔偿限额。

  4. 商业机会
     风险等级: 低
     条款: 第三条 服务内容：乙方同意为甲方开发智能合同审查系统，具体包括：系统维护和升级。
     问题描述: 系统维护和升级可能带来额外的商业机会和收入。
     影响分析: 通过提供优质的维护和升级服务，乙方可以增强与甲方的长期合作关系，并可能获得额外的项目或合同。
     商业优化: 通过明确维护和升级服务的细节，乙方可以最大化其在这一领域的商业利益。
     修改建议: 在合同中明确维护和升级服务的具体范围、周期和费用，以确保乙方能够充分利用这一商业机会。

格式专家评审结果:

  1. 格式问题
     风险等级: 中
     条款: 整体合同
     问题描述: 合同整体结构完整，但缺少对附件的说明，如果合同有相关的附件或者补充协议，应当进行明确说明。
     修改建议: 在第八章或其他适当位置增加一条关于合同附件或补充协议的说明，例如：'第十五条 合同附件：本合同如有附件或补充协议，经双方签字盖章后，与本合同具有同等法律效力。'

=== 风险评估 ===
风险评分: 60/100
风险等级: 中
总问题数: 8
高风险不利条款: 1
中风险不利条款: 4
低风险不利条款: 0
有利条款数: 1
违法条款数: 0

=== 分析详情 ===

主要风险点:
- 违约金计算方式对乙方可能不利，特别是逾期交付时基于合同总额计算。
- 预付款比例较高，增加了乙方的资金流压力和甲方的违约风险。
- 保密义务条款缺乏具体保密措施和保密信息范围的定义。
- 争议解决方式仅规定了诉讼，未考虑其他更灵活的解决方式。

有利条款:
- 系统维护和升级条款为乙方提供了额外的商业机会和收入可能性。

影响分析:
本合同对乙方而言，存在一定的商业和法律风险。主要风险点集中在违约金计算方式、预付款比例、保密义务和争议解决方式上。这些条款可能对乙方的经济利益和长期合作关系产生不利影响。然而，系统维护和升级条款为乙方提供了潜在的商业机会。乙方应仔细评估这些风险，并在谈判中寻求相应的优化和调整。

优化建议:
- 调整违约金计算方式，确保更公平地分配甲乙双方在违约情况下的经济责任。
- 降低预付款比例，以减轻乙方的资金流压力和降低甲方的违约风险。
- 在保密条款中明确保密信息的具体范围和保密措施，以更全面地保护乙方的技术成果和商业信息。
- 在争议解决条款中加入仲裁作为可选方式，以提供更多争议解决途径。
- 明确维护和升级服务的具体范围、周期和费用，以最大化乙方在这一领域的商业利益。

=== 签约建议 ===

乙方在签约前应认真评估合同中的各项条款，特别是与违约金计算、预付款比例、保密义务和争议解决相关的条款。建议乙方与甲方进行进一步谈判，以争取更有利的条款和条件。同时，乙方应充分利用系统维护和升级条款带来的商业机会。

谈判要点:
- 调整违约金计算方式，确保公平性。
- 降低预付款比例，平衡资金风险。
- 明确保密信息的范围和保密措施。
- 增加仲裁作为争议解决的可选方式。
- 细化维护和升级服务的具体内容和费用。

风险缓解措施:
- 在合同中明确双方的权利和义务，以减少潜在的纠纷。
- 建立有效的沟通机制，确保项目进展顺利并及时解决可能出现的问题。
- 对乙方的技术人员进行保密培训，强化保密意识，防止信息泄露。
- 在项目执行过程中保持与甲方的良好合作关系，为可能的商业机会奠定基础。

=== 风险等级分布 ===
高风险: 1个
中风险: 5个
低风险: 2个

=== 问题类型分布 ===
法律风险: 3个
商业风险: 3个
格式问题: 1个

=== 问题详情 ===

高风险问题:

1. 商业风险
   条款: 第十一条 违约处理：1. 甲方逾期付款，每逾期一日按应付金额的0.05%支付违约金；2. 乙方逾期交付，每逾期一日按合同总额的0.05%支付违约金；
   描述: 违约金计算方式可能对乙方不利，逾期交付的违约金基于合同总额计算，而甲方逾期付款的违约金仅基于应付金额计算。
   影响分析: 在乙方逾期交付的情况下，违约金可能迅速累积，导致乙方承担较大的经济压力。
   修改建议: 将乙方逾期交付的违约金计算方式改为基于未交付部分的价值计算，以更公平地分配风险。

中风险问题:

1. 法律风险
   条款: 第十一条 违约处理
   描述: 违约金比例未明确上限，可能导致违约成本过高。
   法律依据: 《中华人民共和国合同法》第一百一十四条：当事人可以约定一方违约时应当根据违约情况向对方支付一定数额的违约金，也可以约定因违约产生的损失赔偿额的计算方法。约定的违约金低于造成的损失的，当事人可以请求人民法院或者仲裁机构予以增加；约定的违约金过分高于造成的损失的，当事人可以请求人民法院或者仲裁机构予以适当减少。
   修改建议: 在违约处理条款中明确违约金的最高限额，例如：违约金累计不超过合同总金额的10%。

2. 法律风险
   条款: 第九条 保密义务
   描述: 保密义务虽然规定，但缺乏具体保密措施和保密信息范围的定义。
   法律依据: 《中华人民共和国合同法》第四十三条：当事人在订立合同过程中知悉的商业秘密，无论合同是否成立，不得泄露或者不正当地使用。泄露或者不正当地使用该商业秘密给对方造成损失的，应当承担损害赔偿责任。
   修改建议: 在保密条款中明确保密信息的具体范围，例如：包括但不限于技术资料、系统设计、源代码等，并增加具体的保密措施，如加密存储、限制访问等。

3. 商业风险
   条款: 第六条 支付方式：1. 合同签订后3日内，甲方支付合同总额的50%作为预付款；
   描述: 预付款比例较高，对乙方资金流有一定压力，同时也增加了甲方的违约风险。
   影响分析: 若甲方在支付预付款后出现资金问题或违约行为，乙方可能面临项目中断或资金损失的风险。
   修改建议: 将预付款比例调整为30%，以降低乙方的资金压力和甲方的违约风险。

4. 商业风险
   条款: 第十一条 违约处理：3. 任何一方违反保密义务，应向对方支付50万元违约金。
   描述: 违反保密义务的违约金金额固定，可能不足以弥补因泄密造成的实际损失。
   影响分析: 若因乙方泄密导致甲方重大损失，固定的违约金可能无法覆盖甲方的实际损失。
   修改建议: 将违反保密义务的违约金改为根据实际损失进行赔偿，并设定最低和最高赔偿限额。

5. 格式问题
   条款: 整体合同
   描述: 合同整体结构完整，但缺少对附件的说明，如果合同有相关的附件或者补充协议，应当进行明确说明。
   修改建议: 在第八章或其他适当位置增加一条关于合同附件或补充协议的说明，例如：'第十五条 合同附件：本合同如有附件或补充协议，经双方签字盖章后，与本合同具有同等法律效力。'

低风险问题:

1. 法律风险
   条款: 第十二条 争议解决方式
   描述: 争议解决方式仅规定了诉讼，未考虑其他更灵活的解决方式。
   法律依据: 《中华人民共和国合同法》第一百二十八条：当事人可以通过和解或者调解解决合同争议。当事人不愿和解、调解或者和解、调解不成的，可以根据仲裁协议向仲裁机构申请仲裁。涉外合同的当事人可以根据仲裁协议向中国仲裁机构或者其他仲裁机构申请仲裁。当事人没有订立仲裁协议或者仲裁协议无效的，可以向人民法院起诉。
   修改建议: 在争议解决条款中加入仲裁作为可选方式，例如：协商不成的，任何一方均可向甲方所在地仲裁机构申请仲裁，或者直接向甲方所在地人民法院提起诉讼。

2. 商业机会
   条款: 第三条 服务内容：乙方同意为甲方开发智能合同审查系统，具体包括：系统维护和升级。
   描述: 系统维护和升级可能带来额外的商业机会和收入。
   影响分析: 通过提供优质的维护和升级服务，乙方可以增强与甲方的长期合作关系，并可能获得额外的项目或合同。
   修改建议: 在合同中明确维护和升级服务的具体范围、周期和费用，以确保乙方能够充分利用这一商业机会。

完整报告已保存至: contract_analysis_results/contract_analysis_20250523_013020.json
```
### 生成的json文件
![](https://ai-studio-static-online.cdn.bcebos.com/d5826e944ba741c29851f5a2017fc3d564f82c1816b648fc9876ad7160f40da1)
![](https://ai-studio-static-online.cdn.bcebos.com/7740800ae9924bdbbe6506f79b8bd12ec7be169f26d74cda8ee114189f64aa05)
![](https://ai-studio-static-online.cdn.bcebos.com/5dab9e54439541edb0a8876099c33a255b88467c782d473e894d5b29c6f865b1)

## **应用场景**

1. **合同起草阶段**
   - 自动识别常见法律风险
   - 提供标准条款建议
   - 确保格式规范性

2. **合同谈判阶段**
   - 快速分析对方条款
   - 评估条款影响程度
   - 生成谈判要点建议

3. **合同签订前审查**
   - 全面风险评估
   - 生成风险量化报告
   - 提供最终签约建议

4. **合同库管理**
   - 批量合同风险扫描
   - 建立风险预警机制
   - 合同条款标准化

## **未来拓展**

1. **模型升级**
   - 接入更多大语言模型
   - 支持私有化模型部署
   - 优化风险识别准确率

2. **功能增强**
   - 添加合同比对功能
   - 支持更多文档格式
   - 增加合同模板推荐

3. **Agent扩展**
   - 增加行业专家Agent
   - 添加合规审查Agent
   - 支持自定义Agent接入

4. **系统优化**
   - 引入知识库支持
   - 增加API调用接口
   - 提供可视化界面

## 联系方式
### 问题反馈/与我联系：Wechat：X_ruilian