本文件用于从 Markdown（.md）文件中抽取医学知识，并存入 MySQL 与 Neo4j。

In [17]:
import ollama
import pymysql
from py2neo import Graph, Node, Relationship
import json
import re
from typing import Dict, List, Any
import logging
import time

class NSTIKnowledgeExtractor:
    """Extract structured medical knowledge from Markdown text using an Ollama LLM.

    The extractor opens the MySQL and Neo4j connections in ``__init__`` and
    exposes them as ``mysql_conn`` / ``neo4j_graph`` so downstream managers can
    reuse them; it does not write to either database itself.
    """

    # Entity buckets requested in the prompt and accumulated across chunks.
    _ENTITY_TYPES = (
        "diseases",
        "symptoms",
        "diagnostic_methods",
        "treatments",
        "pathogens",
        "drugs",
        "recommendations",
    )

    # Reasoning models such as deepseek-r1 prefix their answer with a
    # <think>...</think> section that may itself contain braces; it is removed
    # before searching for the JSON payload.
    _THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)

    def __init__(self, mysql_config: Dict, neo4j_config: Dict, ollama_host: str = "http://localhost:11434", ollama_model: str = "deepseek-r1:1.5b"):
        """Store configuration, set up logging and connect to both databases.

        Args:
            mysql_config: keyword arguments forwarded to ``pymysql.connect``.
            neo4j_config: keyword arguments forwarded to ``py2neo.Graph``.
            ollama_host: base URL of the Ollama server (used for both the
                health check and generation).
            ollama_model: name of the Ollama model used for extraction.
        """
        self.mysql_config = mysql_config
        self.neo4j_config = neo4j_config
        self.ollama_host = ollama_host
        self.ollama_model = ollama_model
        self.setup_logging()

        # Connections stay None if the corresponding connect attempt fails.
        self.mysql_conn = None
        self.neo4j_graph = None
        self.connect_databases()

    def setup_logging(self):
        """Configure root logging (INFO) and bind a module logger to ``self.logger``."""
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

    def connect_databases(self):
        """Connect to MySQL and Neo4j, best effort and independently.

        A failure is logged and leaves the corresponding attribute as ``None``;
        a MySQL failure no longer prevents the Neo4j connection attempt.
        """
        try:
            self.mysql_conn = pymysql.connect(**self.mysql_config)
            self.logger.info("MySQL连接成功")
        except Exception as e:
            self.logger.error(f"数据库连接失败 (MySQL): {e}")

        try:
            self.neo4j_graph = Graph(**self.neo4j_config)
            self.logger.info("Neo4j连接成功")
        except Exception as e:
            self.logger.error(f"数据库连接失败 (Neo4j): {e}")

    def check_ollama_health(self) -> bool:
        """Return True if the Ollama server at ``self.ollama_host`` answers ``/api/tags`` with 200.

        Returns False when ``requests`` is not installed or the request fails.
        """
        try:
            import requests
        except ImportError:
            self.logger.error("请安装requests库: pip install requests")
            return False

        try:
            response = requests.get(f"{self.ollama_host}/api/tags", timeout=2)
        except requests.RequestException:
            return False
        return response.status_code == 200

    def chunk_text(self, text: str, chunk_size: int = 3000) -> List[str]:
        """Split text into chunks of roughly ``chunk_size`` characters.

        Paragraphs (separated by blank lines) are packed greedily into chunks.
        Blank paragraphs are skipped, so empty input yields ``[]``. A single
        paragraph longer than ``chunk_size`` is hard-split into
        ``chunk_size``-character pieces instead of producing one oversized chunk.

        Raises:
            ValueError: if ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")

        paragraphs = re.split(r'\n\s*\n', text)
        chunks = []
        current_chunk = ""

        for paragraph in paragraphs:
            if not paragraph.strip():
                continue
            if len(current_chunk) + len(paragraph) < chunk_size:
                current_chunk += paragraph + "\n\n"
                continue

            if current_chunk:
                chunks.append(current_chunk.strip())

            if len(paragraph) > chunk_size:
                pieces = [paragraph[i:i + chunk_size] for i in range(0, len(paragraph), chunk_size)]
                chunks.extend(piece.strip() for piece in pieces[:-1] if piece.strip())
                # The tail piece may still share a chunk with following paragraphs.
                current_chunk = pieces[-1] + "\n\n"
            else:
                current_chunk = paragraph + "\n\n"

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks

    def call_ollama_with_retry(self, prompt: str, max_retries: int = 3) -> str:
        """Send ``prompt`` to the configured Ollama model, retrying on failure.

        Returns the generated text, or ``""`` if the server is unhealthy or all
        ``max_retries`` attempts fail.
        """
        if not self.check_ollama_health():
            self.logger.error("Ollama服务不可用，请检查是否启动")
            return ""

        # Bind a client to the configured host: the module-level
        # ollama.generate() uses the library's default client (OLLAMA_HOST env
        # var / localhost), so a custom ollama_host was health-checked but not used.
        client = ollama.Client(host=self.ollama_host)

        for attempt in range(max_retries):
            try:
                response = client.generate(model=self.ollama_model, prompt=prompt)
                return response['response']
            except Exception as e:
                self.logger.warning(f"Ollama调用失败 (尝试 {attempt + 1}/{max_retries}): {e}")
                # No point waiting after the final attempt.
                if attempt < max_retries - 1:
                    time.sleep(2)

        self.logger.error("Ollama调用完全失败")
        return ""

    def _parse_llm_json(self, response: str, chunk_id: int) -> Dict:
        """Extract the JSON object from a raw LLM response.

        Removes ``<think>`` reasoning sections, then parses the outermost
        ``{...}`` span. Returns an empty result skeleton when no JSON is found
        or parsing fails.
        """
        empty = {"entities": {}, "relationships": []}
        cleaned = self._THINK_RE.sub("", response or "")

        json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
        if not json_match:
            self.logger.warning(f"块 {chunk_id} 未找到有效JSON")
            return empty

        try:
            return json.loads(json_match.group())
        except json.JSONDecodeError as e:
            self.logger.error(f"块 {chunk_id} JSON解析失败: {e}")
            return empty

    def extract_knowledge_from_chunk(self, chunk: str, chunk_id: int) -> Dict:
        """Ask the LLM to extract entities/relationships from one chunk.

        Returns the parsed JSON dict (expected keys ``entities`` and
        ``relationships``), or an empty skeleton on failure.
        """

        prompt = f"""
        请从以下医学文献片段中提取结构化知识，严格按照JSON格式返回：

        需要提取的实体和关系：

        实体类型：
        1. Disease(疾病): name(名称), type(类型), description(描述), prevalence(患病率), mortality(病死率)
        2. Symptom(症状): name(名称), type(类型), description(描述), severity(严重程度)
        3. DiagnosticMethod(诊断方法): name(名称), type(类型), description(描述), sensitivity(敏感度), specificity(特异度)
        4. Treatment(治疗方法): name(名称), type(类型), description(描述), efficacy(效果)
        5. Pathogen(病原体): name(名称), type(类型), description(描述), related_diseases(相关疾病)
        6. Drug(药物): name(名称), type(类型), description(描述), indication(适应症)
        7. Recommendation(推荐意见): content(内容), evidence_level(证据等级), recommendation_level(推荐等级)

        关系类型：
        1. HAS_SYMPTOM(有症状)
        2. DIAGNOSED_BY(通过诊断)
        3. TREATED_WITH(通过治疗)
        4. CAUSED_BY(由病原体引起)
        5. PRESCRIBED_DRUG(使用药物)
        6. FOLLOWS_RECOMMENDATION(遵循推荐)

        请返回如下JSON格式：
        {{
            "entities": {{
                "diseases": [],
                "symptoms": [],
                "diagnostic_methods": [],
                "treatments": [],
                "pathogens": [],
                "drugs": [],
                "recommendations": []
            }},
            "relationships": []
        }}

        文本内容：
        {chunk}

        注意：只返回JSON格式，不要其他文字。
        """

        response = self.call_ollama_with_retry(prompt)
        return self._parse_llm_json(response, chunk_id)

    @staticmethod
    def _merge_chunk_result(result: Dict, all_entities: Dict, all_relationships: List) -> None:
        """Merge one chunk's parsed result into the accumulators in place.

        LLM output is untrusted: buckets that are not lists, and items that
        are not dicts, are ignored so downstream storage code always receives
        lists of dicts.
        """
        entities = result.get("entities")
        if isinstance(entities, dict):
            for entity_type, bucket in all_entities.items():
                items = entities.get(entity_type)
                if isinstance(items, list):
                    bucket.extend(item for item in items if isinstance(item, dict))

        relationships = result.get("relationships")
        if isinstance(relationships, list):
            all_relationships.extend(rel for rel in relationships if isinstance(rel, dict))

    def extract_knowledge_from_markdown(self, md_file_path: str) -> Dict:
        """Extract knowledge from a UTF-8 Markdown file.

        Returns ``{"entities": {<type>: [dict, ...]}, "relationships": [dict, ...]}``
        with one list per entry in ``_ENTITY_TYPES``.
        """
        with open(md_file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        chunks = self.chunk_text(content)
        self.logger.info(f"将文本分割为 {len(chunks)} 个块")

        all_entities = {entity_type: [] for entity_type in self._ENTITY_TYPES}
        all_relationships = []

        for i, chunk in enumerate(chunks, start=1):
            self.logger.info(f"处理块 {i}/{len(chunks)}")
            result = self.extract_knowledge_from_chunk(chunk, i)
            self._merge_chunk_result(result, all_entities, all_relationships)

            # Throttle between calls; no need to wait after the last chunk.
            if i < len(chunks):
                time.sleep(1)

        return {
            "entities": all_entities,
            "relationships": all_relationships
        }

In [18]:
class MySQLManager:
    """Create the knowledge tables in MySQL and insert extracted entities."""

    # Columns that extracted entities may populate, per table. This mirrors the
    # CREATE TABLE statements in create_tables() (id/created_at are DB-managed).
    # Entity dicts come from LLM output, so only these names are ever
    # interpolated into SQL as identifiers; unknown keys are dropped.
    _WRITABLE_COLUMNS = {
        'diseases': ('name', 'type', 'description', 'prevalence', 'mortality'),
        'symptoms': ('name', 'type', 'description', 'severity'),
        'diagnostic_methods': ('name', 'type', 'description', 'sensitivity', 'specificity'),
        'treatments': ('name', 'type', 'description', 'efficacy'),
        'pathogens': ('name', 'type', 'description', 'related_diseases'),
        'drugs': ('name', 'type', 'description', 'indication'),
        'recommendations': ('content', 'evidence_level', 'recommendation_level', 'consistency_rate'),
    }

    def __init__(self, mysql_conn):
        """Keep the given pymysql connection, open a cursor and ensure tables exist."""
        self.conn = mysql_conn
        self.cursor = self.conn.cursor()
        self.create_tables()

    def create_tables(self):
        """Create the MySQL tables if they do not already exist (errors are printed)."""

        tables = {
            'diseases': """
                CREATE TABLE IF NOT EXISTS diseases (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    type VARCHAR(100),
                    description TEXT,
                    prevalence VARCHAR(100),
                    mortality VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """,
            'symptoms': """
                CREATE TABLE IF NOT EXISTS symptoms (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    type VARCHAR(100),
                    description TEXT,
                    severity VARCHAR(50),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """,
            'diagnostic_methods': """
                CREATE TABLE IF NOT EXISTS diagnostic_methods (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    type VARCHAR(100),
                    description TEXT,
                    sensitivity VARCHAR(100),
                    specificity VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """,
            'treatments': """
                CREATE TABLE IF NOT EXISTS treatments (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    type VARCHAR(100),
                    description TEXT,
                    efficacy TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """,
            'pathogens': """
                CREATE TABLE IF NOT EXISTS pathogens (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    type VARCHAR(100),
                    description TEXT,
                    related_diseases TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """,
            'drugs': """
                CREATE TABLE IF NOT EXISTS drugs (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    type VARCHAR(100),
                    description TEXT,
                    indication TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """,
            'recommendations': """
                CREATE TABLE IF NOT EXISTS recommendations (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    content TEXT NOT NULL,
                    evidence_level VARCHAR(50),
                    recommendation_level VARCHAR(50),
                    consistency_rate VARCHAR(50),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """,
            'relationships': """
                CREATE TABLE IF NOT EXISTS relationships (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    source_type VARCHAR(50) NOT NULL,
                    source_id INT NOT NULL,
                    target_type VARCHAR(50) NOT NULL,
                    target_id INT NOT NULL,
                    relationship_type VARCHAR(100) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """
        }

        for table_name, create_sql in tables.items():
            try:
                self.cursor.execute(create_sql)
                self.conn.commit()
            except Exception as e:
                print(f"创建表 {table_name} 失败: {e}")

    @staticmethod
    def _prepare_row(entity: Dict, allowed_columns) -> Dict:
        """Return the subset of ``entity`` whose keys are in ``allowed_columns``.

        List/dict values (e.g. ``related_diseases`` returned as a JSON array)
        are serialized to JSON text so they can be bound as a single parameter.
        """
        row = {}
        for column in allowed_columns:
            if column not in entity:
                continue
            value = entity[column]
            if isinstance(value, (list, dict)):
                value = json.dumps(value, ensure_ascii=False)
            row[column] = value
        return row

    def save_entities_to_mysql(self, entities: Dict):
        """Insert each entity dict into the table named by its entity type.

        Args:
            entities: mapping of table name (e.g. ``"diseases"``) to a list of
                entity dicts. Unknown tables, non-dict items and unknown keys
                are skipped; each row is committed individually and rolled
                back on failure.
        """
        for entity_type, entity_list in entities.items():
            if not entity_list:
                continue

            allowed_columns = self._WRITABLE_COLUMNS.get(entity_type)
            if allowed_columns is None:
                print(f"跳过未知实体类型: {entity_type}")
                continue

            for entity in entity_list:
                if not isinstance(entity, dict):
                    continue
                row = self._prepare_row(entity, allowed_columns)
                if not row:
                    continue

                # Identifiers come from the whitelist above; values are bound as parameters.
                columns = ', '.join(f"`{column}`" for column in row)
                placeholders = ', '.join(['%s'] * len(row))
                sql = f"INSERT INTO `{entity_type}` ({columns}) VALUES ({placeholders})"

                try:
                    self.cursor.execute(sql, list(row.values()))
                    self.conn.commit()
                except Exception as e:
                    print(f"插入{entity_type}失败: {e}")
                    self.conn.rollback()

In [19]:
class Neo4jManager:
    """Write extracted entities and relationships into a Neo4j graph via py2neo."""

    def __init__(self, neo4j_graph):
        """Keep the py2neo ``Graph`` used for all writes."""
        self.graph = neo4j_graph

    def clear_database(self):
        """Delete every node and relationship in the graph (irreversible)."""
        self.graph.run("MATCH (n) DETACH DELETE n")

    @staticmethod
    def _normalize_type(type_name) -> str:
        """Normalize an entity type name for lookup.

        Makes ``"Disease"``, ``"diseases"`` and ``"DISEASES"`` (and
        ``"DiagnosticMethod"`` / ``"diagnostic_methods"``) compare equal, since
        the prompt names types in singular CamelCase while the mapping keys are
        plural snake_case.
        """
        key = str(type_name).replace('_', '').replace(' ', '').lower()
        return key[:-1] if key.endswith('s') else key

    @staticmethod
    def _entity_key(entity: Dict):
        """Return the lookup key for an entity, or None if it has none.

        Recommendations carry ``content`` rather than ``name``.
        """
        key = entity.get('name') or entity.get('content')
        return None if key is None else str(key)

    @staticmethod
    def _to_property(value):
        """Coerce a value into something Neo4j accepts as a node property.

        Neo4j properties must be primitives or homogeneous lists of
        primitives, so maps and mixed/nested lists are stored as JSON text.
        """
        if isinstance(value, dict):
            return json.dumps(value, ensure_ascii=False)
        if isinstance(value, list):
            kinds = {type(item) for item in value}
            if len(kinds) > 1 or not kinds <= {str, int, float, bool}:
                return json.dumps(value, ensure_ascii=False)
        return value

    def create_entities_in_neo4j(self, entities: Dict):
        """Create one node per entity, labelled with the upper-cased entity type.

        Returns:
            ``{entity_type: {entity_key: Node}}`` for entities that have a
            ``name`` (or ``content``), used to resolve relationship endpoints.
        """
        node_mapping = {}

        for entity_type, entity_list in entities.items():
            for entity in entity_list:
                if not isinstance(entity, dict):
                    continue
                node_properties = {k: self._to_property(v) for k, v in entity.items() if v is not None}
                node = Node(entity_type.upper(), **node_properties)
                self.graph.create(node)

                key = self._entity_key(entity)
                if key is not None:
                    node_mapping.setdefault(entity_type, {})[key] = node

        return node_mapping

    def create_relationships_in_neo4j(self, relationships: List, node_mapping: Dict):
        """Create relationships whose endpoints can be resolved in ``node_mapping``.

        Each relationship dict is expected to carry ``source_type``,
        ``source_name``, ``target_type``, ``target_name`` and ``type``.
        Unresolvable or malformed entries are reported and skipped.
        """
        type_lookup = {self._normalize_type(t): t for t in node_mapping}

        for rel in relationships:
            try:
                source_key = type_lookup[self._normalize_type(rel['source_type'])]
                target_key = type_lookup[self._normalize_type(rel['target_type'])]
                source_node = node_mapping[source_key][str(rel['source_name'])]
                target_node = node_mapping[target_key][str(rel['target_name'])]

                relationship = Relationship(source_node, rel['type'], target_node)
                self.graph.create(relationship)
            except KeyError as e:
                print(f"创建关系失败，找不到节点: {e}")
            except TypeError as e:
                print(f"创建关系失败，关系格式无效: {e}")

In [None]:
def main():
    """Run the pipeline: extract knowledge from a Markdown file and store it.

    Connection settings and the input path may be overridden via environment
    variables; the defaults are the previously hard-coded values. The MySQL
    connection is always closed, even if extraction or storage raises.
    """
    import os

    # Database configuration (environment overrides avoid editing credentials into source)
    mysql_config = {
        'host': os.environ.get('MYSQL_HOST', 'localhost'),
        'user': os.environ.get('MYSQL_USER', 'root'),
        'password': os.environ.get('MYSQL_PASSWORD', '123456'),
        'database': os.environ.get('MYSQL_DATABASE', 'doctor'),
        'charset': 'utf8mb4'
    }

    neo4j_config = {
        'uri': os.environ.get('NEO4J_URI', "bolt://localhost:7687"),
        'auth': (os.environ.get('NEO4J_USER', "neo4j"), os.environ.get('NEO4J_PASSWORD', "test1234"))
    }

    extractor = NSTIKnowledgeExtractor(mysql_config, neo4j_config)

    # OCR-converted Markdown input
    md_file_path = os.environ.get('NSTI_MD_PATH', r"O:\MyProject\RAG\DB\uploads\output.md")

    try:
        knowledge_data = extractor.extract_knowledge_from_markdown(md_file_path)
        entities = knowledge_data["entities"]

        # Save to MySQL (connection may be None if connecting failed)
        if extractor.mysql_conn is None:
            print("MySQL未连接，跳过MySQL存储")
        else:
            mysql_manager = MySQLManager(extractor.mysql_conn)
            mysql_manager.save_entities_to_mysql(entities)

        # Save to Neo4j
        if extractor.neo4j_graph is None:
            print("Neo4j未连接，跳过Neo4j存储")
        elif not any(entities.values()):
            # clear_database() wipes the whole graph; don't replace existing
            # data with nothing when extraction failed (e.g. Ollama was down).
            print("未抽取到任何实体，保留现有Neo4j数据")
        else:
            neo4j_manager = Neo4jManager(extractor.neo4j_graph)
            neo4j_manager.clear_database()
            node_mapping = neo4j_manager.create_entities_in_neo4j(entities)
            neo4j_manager.create_relationships_in_neo4j(knowledge_data["relationships"], node_mapping)

        print("知识抽取和存储完成！")
    finally:
        if extractor.mysql_conn:
            extractor.mysql_conn.close()


if __name__ == "__main__":
    main()

2025-10-06 23:37:40,348 - INFO - MySQL连接成功
2025-10-06 23:37:40,361 - INFO - Neo4j连接成功
2025-10-06 23:37:40,365 - INFO - 将文本分割为 7 个块
2025-10-06 23:37:40,366 - INFO - 处理块 1/7
2025-10-06 23:37:46,406 - ERROR - Ollama调用完全失败
2025-10-06 23:37:47,413 - INFO - 处理块 2/7
2025-10-06 23:37:53,446 - ERROR - Ollama调用完全失败
2025-10-06 23:37:54,450 - INFO - 处理块 3/7
2025-10-06 23:38:00,485 - ERROR - Ollama调用完全失败
2025-10-06 23:38:01,492 - INFO - 处理块 4/7
