In [1]:
from pathlib import Path

def load_markdown_simple(file_path: Path) -> str:
    """
    以 UTF-8 编码读取 Markdown 文件的全部内容。
    (这是一个简化版本，不包含显式的异常处理)

    Args:
        file_path (Path): .md 文件的路径对象。

    Returns:
        str: 文件的文本内容。
    """
    # .read_text() 方法是 pathlib.Path 提供的便捷函数，
    # 默认已使用 'utf-8' 编码，非常适合读取 .md 文件。
    return file_path.read_text(encoding='utf-8')

In [2]:
md_content = load_markdown_simple(Path(r"E:\02open-project\deepresearch-agent\test\parser_test\test.md"))

In [3]:
md_content



In [4]:
from pathlib import Path
import sys

# 1. 在 Jupyter 中，使用 Path.cwd() 获取当前 notebook 所在的目录
# Path.cwd() 会返回 '.../my_project/notebooks'
notebook_dir = Path.cwd()

# 2. 获取项目根目录 (即 'notebooks/' 的上一级)
# 这才是你想要的 BASE_DIR ('.../my_project')
BASE_DIR = notebook_dir.parent.parent

# 3. 将路径添加到 sys.path
# 注意：sys.path 需要的是字符串(str)格式的路径
if str(BASE_DIR) not in sys.path:
    sys.path.append(str(BASE_DIR))

In [5]:
from src.backend.domain.models import DocumentSource
from src.backend.infrastructure.parse.splitter import MarkdownSplitter

  from .autonotebook import tqdm as notebook_tqdm


2025-11-14 15:43:19,760 - src.backend.core.logging - INFO - 日志系统已初始化，级别: INFO


In [6]:
source = DocumentSource(
    file_path=Path(r"E:\02open-project\deepresearch-agent\test\parser_test\test.md"),
    document_name="test"
)

In [7]:
splitter = MarkdownSplitter(
    headers_to_split_on=[
       ("#", "Header 1"),
       ("##", "Header 2"),
       ("###", "Header 3"),
       ("####", "Header 4"),
       ("#####", "Header 5"),
    ]
)

2025-11-14 15:43:19,886 - src.backend.infrastructure.parse.splitter - INFO - MarkdownSplitter 初始化完毕。最大Tokens: 1024，重叠Tokens: 100，编码器: cl100k_base


In [8]:
chunks = splitter.split(markdown_content=md_content, source=source)

2025-11-14 15:43:19,891 - src.backend.infrastructure.parse.splitter - INFO - 开始分割文档: test (ID: 3512b1f1-7938-4bef-b006-42d02f4fb8b5)
2025-11-14 15:43:19,907 - src.backend.infrastructure.parse.splitter - INFO - 文档 test 分割完毕，共生成 26 个 DocumentChunk。


In [9]:
chunks[10]

DocumentChunk(chunk_id='0df26bb0-3312-4771-b462-6cfbe532f087', document_id='3512b1f1-7938-4bef-b006-42d02f4fb8b5', document_name='test', content='# 2 星间互联网格化计算方法  \n在 §1 中的空天网格体系和空天全域网格索 引大表基础上， 本文提出了网格化星间互联计算 方法， 以用户获取特定区域的遥感卫星数据和未 来覆盖资源为例， 主要步骤如下： （ 1 ）通过预先计 算网格间的通视情况， 查找网格索引大表中的通 视情况和 GeoSOT -3D 二级索引来代替传统复杂 的通视计算方法。 （ 2 ）在无法直接通视的情况下 进行星间数据传输。建立网格化虚拟节点， 并使 用虚拟拓扑， 通过查询网格通视大表， 迭代出满 足覆盖要求的数据星间传输路径。', parent_headings=['2 星间互联网格化计算方法'], summary=None, hypothetical_questions=[], metadata={'Header 1': '2 星间互联网格化计算方法'})

In [10]:
for c in chunks:
    print(c.parent_headings)

[]
['空天网格化星间通视及路由路径规划算法']
['空天网格化星间通视及路由路径规划算法']
['空天网格化星间通视及路由路径规划算法']
['1 卫星网格化建模']
['1 卫星网格化建模', '1.1 空天网格体系']
['1 卫星网格化建模', '1.2 空天全域网格索引大表']
['1 卫星网格化建模', '1.3 卫星轨迹与资源网格化']
['1 卫星网格化建模', '1.3.1 卫星轨迹网格索引大表']
['1 卫星网格化建模', '1.3.2 卫星资源能力索引大表']
['2 星间互联网格化计算方法']
['2 星间互联网格化计算方法', '2.1 星间通视分析']
['2 星间互联网格化计算方法', '2.1.1 通视网格查找表']
['2 星间互联网格化计算方法', '2.1.2 网格通视计算']
['2 星间互联网格化计算方法', '2.2 星间路由空间链路动态规划']
['2 星间互联网格化计算方法', '2.2.1 网格化网络拓扑模型']
['2 星间互联网格化计算方法', '2.2.1 网格化网络拓扑模型']
['3 仿真实验', '3.1 实验设计']
['3 仿真实验', '3.2 空天网格索引大表']
['3 仿真实验', '3.3 星间通视分析']
['3 仿真实验', '3.3 星间通视分析']
['3 仿真实验', '3.4 星间路由空间链路动态规划']
['4 结 语']
['4 结 语', '参 考 文 献']
['4 结 语', '参 考 文 献']
['4 结 语', 'Aerospace Grid⁃Based Algorithm of Inter⁃satellite Visibility and Route Path Planning for Satellite Constellation']


In [11]:
from src.backend.infrastructure.llm.clients import preprocessing_chunk_client
from src.backend.infrastructure.parse.preprocessor import LLMPreprocessor

2025-11-14 15:43:24,679 - src.backend.core.logging - INFO - 日志系统已初始化，级别: INFO


In [12]:
preprocessor = LLMPreprocessor(preprocessing_chunk_client)

2025-11-14 15:43:24,684 - src.backend.infrastructure.parse.preprocessor - INFO - LLM预处理器 (LLMPreprocessor) 初始化完成。


In [13]:
processed_chunks = [await preprocessor.preprocess(chunk) for chunk in chunks]

2025-11-14 15:43:52,570 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:44:02,582 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:44:09,873 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:44:35,959 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:44:40,975 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:44:46,607 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:44:54,390 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:44:56,591 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 15:45:08,794 - httpx - INFO - HTTP Request: POST http

In [14]:
processed_chunks

[[DocumentChunk(chunk_id='a9e58dcf-3f92-4094-a492-066edd49bd4a', document_id='3512b1f1-7938-4bef-b006-42d02f4fb8b5', document_name='test', content='第 46 卷 第 1 期 2021 年 1 月  \nDOI ： 10.13203/j.whugis20200234  \nVol.46 No.1 Jan. 2021  \n文章编号 ： 1671⁃8860(2021)01⁃0050⁃08  \n这是一张包含一个二维码的图片。二维码是黑色模块排列在正方形网格上的图案，用于存储信息。扫描该二维码可以获取其中包含的信息，如网址、文本或联系方式。  \n<!-- image -->', parent_headings=[], summary='本文介绍二维码技术，其通过黑色模块排列在正方形网格上存储信息，扫描可获取网址、文本或联系方式。', hypothetical_questions=['二维码如何排列黑色模块以存储信息？', '扫描二维码可以获取哪些类型的信息？', '二维码技术在信息存储与获取中的应用场景是什么？', '正方形网格在二维码中的作用是什么？'], metadata={})],
 [DocumentChunk(chunk_id='1cdde383-38db-41de-9581-ca9aec8a3bee', document_id='3512b1f1-7938-4bef-b006-42d02f4fb8b5', document_name='test', content='# 空天网格化星间通视及路由路径规划算法  \n李 爽 1 李德仁 2 程承旗 3 陈 波 3 沈 欣 2 童晓冲 4  \n1 复旦大学历史地理研究中心， 上海， 200433  \n2 武汉大学测绘遥感信息工程国家重点实验室， 湖北 武汉， 430079  \n3 北京大学工学院， 北京， 100871  \n4 信息工程大学地理空间信息学院， 河南 郑州， 450001  \n摘 要 ： 通信、 导航、 遥感一体的天基信息服务系统的建设将对跨境实时通信、 动目标全球跟踪、 灾害快速响应 等提供有力保障， 同时也对高效的网络通信 , 特别是卫星路

In [17]:
import asyncio
import logging
from typing import List
from src.backend.domain.models import DocumentChunk

log = logging.getLogger(__name__)

async def process_chunk_with_semaphore(
    chunk: DocumentChunk, 
    preprocessor: LLMPreprocessor, 
    semaphore: asyncio.Semaphore
) -> List[DocumentChunk]:
    """
    一个带有限流器（Semaphore）的异步工作单元。
    
    Args:
        chunk: 待处理的块。
        preprocessor: 处理器实例。
        semaphore: 必须传入 asyncio.Semaphore 实例。
        
    Returns:
        成功时返回 [chunk]，失败时返回 []。
    """
    # async with semaphore: 会在执行前“请求”一个令牌。
    # 如果并发数已满，它会在这里异步等待，直到有令牌可用。
    async with semaphore:
        try:
            log.debug(f"开始处理块: {chunk.chunk_id}")
            
            # 关键：调用你真正的异步处理方法
            # (假设 preprocessor.process 返回 List[DocumentChunk])
            return await preprocessor.preprocess(chunk)
        
        except asyncio.TimeoutError:
            log.error(f"处理块 {chunk.chunk_id} 超时。")
            return []
        except Exception as e:
            log.error(f"处理块 {chunk.chunk_id} 失败: {e}", exc_info=True)
            return []

async def run_concurrent_preprocessing(
    chunks: List[DocumentChunk], 
    preprocessor: LLMPreprocessor,
    max_concurrency: int = 30
) -> List[DocumentChunk]:
    """
    在生产环境中并发处理所有文本块的主函数。

    Args:
        chunks: 所有待处理的块列表。
        preprocessor: 异步处理器。
        max_concurrency: 最大并发 API 请求数 (防止 429 错误)。

    Returns:
        成功处理和扩充的块的列表。
    """
    
    # 1. 创建信号量，限制并发数为 50
    semaphore = asyncio.Semaphore(max_concurrency)
    
    log.info(f"开始并发处理 {len(chunks)} 个块 (最大并发数: {max_concurrency})...")

    # 2. 创建所有任务的列表
    tasks = []
    for chunk in chunks:
        tasks.append(
            process_chunk_with_semaphore(chunk, preprocessor, semaphore)
        )

    # 3. 并发执行所有任务
    # results_list 是一个列表的列表, e.g., [[chunk1], [chunk2], [], [chunk4], ...]
    results_list = await asyncio.gather(*tasks)

    # 4. 展平 (Flatten) 结果，过滤掉失败的空列表
    enriched_chunks = [
        item 
        for sublist in results_list 
        if sublist  # 确保 sublist 不是空列表 []
        for item in sublist
    ]

    log.info(f"处理完成。成功扩充 {len(enriched_chunks)} / {len(chunks)} 个块。")
    return enriched_chunks

In [18]:
processed_chunks = await run_concurrent_preprocessing(
    chunks=chunks,
    preprocessor=preprocessor,
    max_concurrency=30  # 根据 API 限制调整
)

2025-11-14 16:01:54,350 - __main__ - INFO - 开始并发处理 26 个块 (最大并发数: 30)...
2025-11-14 16:02:10,835 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 16:02:11,260 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 16:02:11,436 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 16:02:11,840 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 16:02:12,341 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 16:02:13,295 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 16:02:13,669 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 200 OK"
2025-11-14 16:02:13,812 - httpx - INFO - HTTP Request: POST http://127.0.0.1:4000/chat/completions "HTTP/1.1 