In [None]:
# 伪代码和概念结构

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.prompts import FewShotPromptTemplate, PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from langchain_neo4j import Neo4jGraph

# 1. 初始化模型和图数据库连接
llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0.2,  # 设置温度以控制输出的随机性
    base_url="",
    api_key="",
)

In [None]:
# Neo4jGraph用于后续直接写入图数据库
graph = Neo4jGraph(
    url="", 
    username="neo4j", 
    password=""
)

In [4]:
# 2. 定义输出结构 (Pydantic Models) 和解析器
from typing import List


class GazetteerInfo(BaseModel):
    lake_name: str = Field(description="湖泊的名称")
    location: str = Field(description="湖泊所在的古代地名")
    gazetteer_source: str = Field(description="记载该湖泊的方志名称")
    content: str = Field(description="方志中关于该湖泊的原始记载内容")

class PoemInfo(BaseModel):
    lake_name: str = Field(description="诗词中提到的湖泊名称")
    poem_name: str = Field(description="提到该湖泊的诗词名称")
    poem_full_text: str = Field(description="提到该湖泊的完整诗词内容")

# 支持多湖泊的输出结构
class MultipleGazetteerInfo(BaseModel):
    extractions: List[GazetteerInfo] = Field(description="从文本中提取的所有湖泊信息列表")

class MultiplePoemInfo(BaseModel):
    extractions: List[PoemInfo] = Field(description="从文本中提取的所有诗词信息列表")

# 创建输出解析器 - 使用多湖泊解析器
gazetteer_parser = PydanticOutputParser(pydantic_object=GazetteerInfo)
poem_parser = PydanticOutputParser(pydantic_object=PoemInfo)

# 多湖泊解析器
multi_gazetteer_parser = PydanticOutputParser(pydantic_object=MultipleGazetteerInfo)
multi_poem_parser = PydanticOutputParser(pydantic_object=MultiplePoemInfo)

In [67]:
import json
from pathlib import Path
from typing import List
from langchain_core.documents import Document
# 1. 定义要读取的文件路径
input_path = Path("split_outputs_jsonl/processed_chunks2.jsonl")

# 2. 准备一个空列表来存放加载后的 Document 对象
loaded_documents: List[Document] = []

print(f"\n--- 开始从文件加载 Document 块: {input_path} ---")

# 3. 检查文件是否存在
if not input_path.exists():
    print(f"--- 错误：文件不存在 -> {input_path} ---")
else:
    try:
        # 以读取模式打开文件
        with open(input_path, "r", encoding="utf-8") as f:
            # 逐行读取文件
            for line in f:
                # a. 使用 json.loads 将每一行的 JSON 字符串解析为字典
                data_record = json.loads(line)
                
                # b. 使用字典中的数据重新创建 Document 对象
                doc = Document(
                    page_content=data_record["page_content"],
                    metadata=data_record["metadata"]
                )
                
                # c. 将创建好的 Document 对象添加到列表中
                loaded_documents.append(doc)

        print(f"--- 加载完成！成功从文件恢复了 {len(loaded_documents)} 个 Document 对象。 ---")

        # 验证一下加载回来的第一个 Document
        if loaded_documents:
            print("\n验证加载的第一个文档块:")
            first_doc = loaded_documents[0]
            print(f"内容:\n---\n{first_doc.page_content[:100]}...\n---")
            print(f"元数据: {first_doc.metadata}")
            
    except (IOError, json.JSONDecodeError) as e:
        print(f"--- 读取或解析文件时发生错误: {input_path}. 错误详情: {e} ---")

docs = loaded_documents


--- 开始从文件加载 Document 块: split_outputs_jsonl/processed_chunks2.jsonl ---
--- 加载完成！成功从文件恢复了 432 个 Document 对象。 ---

验证加载的第一个文档块:
内容:
---
永乐大典卷之二千二百六十八【六模】
湖
巢湖
《合肥志》
在合肥县东南六十里。亦名焦湖。汉明帝十一年。漅湖出黄金。庐江太守以献。漅。音勦或曰勦湖。俗讹为焦湖。以其在巢县。亦曰巢湖。周围四百里。港（汊）...
---
元数据: {'source': './data_simplified.txt'}


In [68]:
# 4. 创建 Few-shot 提示
# 方志的 Few-shot 示例
gazetteer_examples = [
    {
        "input": "《大清一统志》：西湖在杭州府城西，周三十里。其水甘澄，能疗疾。苏轼尝官此，有诗纪其事。",
        "output": MultipleGazetteerInfo(
            extractions=[GazetteerInfo(
                lake_name="西湖",
                location="杭州府",
                gazetteer_source="大清一统志",
                content="西湖在杭州府城西，周三十里。其水甘澄，能疗疾。苏轼尝官此，有诗纪其事。"
            )]
        )
    },
    {
        "input": "《太平寰宇记》：洞庭湖在岳州之南，方圆八百里，其气势浩瀚，为天下之冠。鄱阳湖在饶州，纵广三百三十里。",
        "output": MultipleGazetteerInfo(
            extractions=[
                GazetteerInfo(
                    lake_name="洞庭湖",
                    location="岳州",
                    gazetteer_source="太平寰宇记",
                    content="洞庭湖在岳州之南，方圆八百里，其气势浩瀚，为天下之冠。"
                ),
                GazetteerInfo(
                    lake_name="鄱阳湖",
                    location="饶州",
                    gazetteer_source="太平寰宇记",
                    content="鄱阳湖在饶州，纵广三百三十里。"
                )
            ]
        )
    }
]

# 诗词的 Few-shot 示例
poem_examples = [
    {
        "input": "望洞庭湖赠张丞相 - 孟浩然\n八月湖水平，涵虚混太清。气蒸云梦泽，波撼岳阳城。",
        "output": MultiplePoemInfo(
            extractions=[PoemInfo(
                lake_name="洞庭湖",
                poem_name="望洞庭湖赠张丞相",
                poem_full_text="八月湖水平，涵虚混太清。气蒸云梦泽，波撼岳阳城。"
            )]
        )
    },
    {
        "input": "饮湖上初晴后雨 - 苏轼\n水光潋滟晴方好，山色空蒙雨亦奇。欲把西湖比西子，淡妆浓抹总相宜。\n又题\n朝曦迎客艳重冈，晚雨留人入醉乡。此意自佳君不会，一杯当属水仙王。-- 此诗亦咏西湖",
        "output": MultiplePoemInfo(
            extractions=[
                PoemInfo(
                    lake_name="西湖",
                    poem_name="饮湖上初晴后雨",
                    poem_full_text="水光潋滟晴方好，山色空蒙雨亦奇。欲把西湖比西子，淡妆浓抹总相宜。"
                ),
                PoemInfo(
                    lake_name="西湖",
                    poem_name="又题",
                    poem_full_text="朝曦迎客艳重冈，晚雨留人入醉乡。此意自佳君不会，一杯当属水仙王。"
                )
            ]
        )
    }
]

# 创建 Few-shot 提示模板
example_prompt = PromptTemplate(
    input_variables=["input", "output"],
    template="输入:\n{input}\n输出:\n{output}"
)

# 方志的 Few-shot 提示
gazetteer_prompt = FewShotPromptTemplate(
    examples=gazetteer_examples,
    example_prompt=example_prompt,
    suffix="输入:\n{input}\n输出:",
    input_variables=["input"],
    example_separator="\n\n",
    prefix="""
    你是一个专门从古代文献中提取一个或多个湖泊信息的专家。
    注意区分湖的别名和本名,区分方志和诗词，只保留本名。
    如果不是文献方志，如诗词等，请不要提取，返回空列表。
    如果文中提到多个湖泊，且这些湖泊不是同一湖泊的别名，请提取所有湖泊信息。
    如果是文献方志，请根据以下格式提取信息：\n{format_instructions}
    """,
    partial_variables={"format_instructions": multi_gazetteer_parser.get_format_instructions()}
)

# 诗词的 Few-shot 提示
poem_prompt = FewShotPromptTemplate(
    examples=poem_examples,
    example_prompt=example_prompt,
    suffix="输入:\n{input}\n输出:",
    input_variables=["input"],
    example_separator="\n\n",
    prefix="""你是一个专门从中国古典诗词中提取一个或多个湖泊信息的专家。
    注意区分湖的别名和本名，区分方志和诗词，只保留本名。
    如果不是诗词，如方志等，请不要提取，返回空列表。
    如果文中提到多个湖泊，且这些湖泊不是同一湖泊的别名，请提取所有湖泊信息。
    如果是诗词，请根据以下格式，请根据以下格式提取信息：\n{format_instructions}
    """,
    partial_variables={"format_instructions": multi_poem_parser.get_format_instructions()}
)

In [69]:
# 5. 构建处理链
gazetteer_chain = gazetteer_prompt | llm | multi_gazetteer_parser
poem_chain = poem_prompt | llm | multi_poem_parser

In [70]:
poem_res=poem_chain.invoke({"input":docs[0].page_content})
gazetteer_res=gazetteer_chain.invoke({"input": docs[0].page_content})

print("诗词提取结果:", poem_res)
print("方志提取结果:", gazetteer_res)

诗词提取结果: extractions=[]
方志提取结果: extractions=[GazetteerInfo(lake_name='巢湖', location='合肥县', gazetteer_source='合肥志', content='在合肥县东南六十里。亦名焦湖。汉明帝十一年。漅湖出黄金。庐江太守以献。漅。音勦或曰勦湖。俗讹为焦湖。以其在巢县。亦曰巢湖。周围四百里。港（汊）大小三百六十。占合肥。舒城。巢县。庐江。四邑之境。')]


In [71]:
import time
from typing import List
from langchain_core.documents import Document

# --- 阶段一：并发提取 ---
print("--- 阶段一：开始并发提取方志和诗词信息... ---")

# 准备所有链的输入
# content_chunk 来源于我们之前分割好的 Document 对象的 page_content
inputs = [{"input": doc.page_content} for doc in docs]
concurrency_config = {"max_concurrency": 10} # 设置合理的并发上限

# 并发调用方志链
print("  -> 正在并发处理方志链...")
start_time = time.time()
gazetteer_results = gazetteer_chain.batch(
    inputs, 
    config=concurrency_config, 
    return_exceptions=True
)
print(f"  -> 方志链处理完成，耗时: {time.time() - start_time:.2f} 秒")

# 并发调用诗词链
print("  -> 正在并发处理诗词链...")
start_time = time.time()
poem_results = poem_chain.batch(
    inputs, 
    config=concurrency_config, 
    return_exceptions=True
)
print(f"  -> 诗词链处理完成，耗时: {time.time() - start_time:.2f} 秒")


# --- 阶段二：数据聚合 ---
print("\n--- 阶段二：开始聚合处理结果，准备写入数据库... ---")

gazetteer_data_for_db = []
for i, result in enumerate(gazetteer_results):
    if isinstance(result, Exception) or not result.extractions:
        # 如果调用出错或模型未能提取任何信息，则跳过
        if isinstance(result, Exception):
            print(f"  [警告] 方志提取失败 (块 {i+1}): {result}")
        continue
    # 将 Pydantic 对象转换为字典，方便后续作为参数传递
    for extraction in result.extractions:
        gazetteer_data_for_db.append(extraction.model_dump())

poem_data_for_db = []
for i, result in enumerate(poem_results):
    if isinstance(result, Exception) or not result.extractions:
        if isinstance(result, Exception):
            print(f"  [警告] 诗词提取失败 (块 {i+1}): {result}")
        continue
    for extraction in result.extractions:
        poem_data_for_db.append(extraction.model_dump())

print(f"  -> 聚合完成：准备写入 {len(gazetteer_data_for_db)} 条方志信息和 {len(poem_data_for_db)} 条诗词信息。")


# --- 阶段三：批量写入图数据库 ---
print("\n--- 阶段三：开始批量写入数据到图数据库... ---")

# 1. 批量写入方志数据
if gazetteer_data_for_db:
    # 使用 UNWIND 将列表展开，然后在数据库端进行循环处理
    gazetteer_query = """
    UNWIND $data as row
    MERGE (l:Lake {name: row.lake_name})
    ON CREATE SET l.location = row.location
    MERGE (g:Gazetteer {source: row.gazetteer_source})
    ON CREATE SET g.content = row.content
    MERGE (l)-[:MENTIONED_IN_GAZETTEER]->(g)
    """
    try:
        graph.query(gazetteer_query, {"data": gazetteer_data_for_db})
        print(f"  -> 成功批量写入 {len(gazetteer_data_for_db)} 条方志信息。")
    except Exception as e:
        print(f"  [错误] 批量写入方志信息失败: {e}")

# 2. 批量写入诗词数据
if poem_data_for_db:
    poem_query = """
    UNWIND $data as row
    MERGE (l:Lake {name: row.lake_name})
    MERGE (p:Poem {name: row.poem_name})
    ON CREATE SET p.full_text = row.poem_full_text
    MERGE (l)-[:MENTIONED_IN_POEM]->(p)
    """
    try:
        graph.query(poem_query, {"data": poem_data_for_db})
        print(f"  -> 成功批量写入 {len(poem_data_for_db)} 条诗词信息。")
    except Exception as e:
        print(f"  [错误] 批量写入诗词信息失败: {e}")


print("\n所有文档处理和入库操作已全部完成。")

--- 阶段一：开始并发提取方志和诗词信息... ---
  -> 正在并发处理方志链...
  -> 方志链处理完成，耗时: 88.73 秒
  -> 正在并发处理诗词链...
  -> 诗词链处理完成，耗时: 30.02 秒

--- 阶段二：开始聚合处理结果，准备写入数据库... ---
  [警告] 诗词提取失败 (块 130): Invalid json output: ```
{"extractions":[PoemInfo(lake_name='丰湖', poem_name='丰湖诗', poem_full_text='小米侍郎生较晚，龙眠居士远离呼。\n不知若个丹青手，能冩微澜玉塔图？\n岷峨一老古来少，杭颕二湖天下无。\n帝恐先生晚牢落，南迁犹得管丰湖。\n作桥聊结众生縁，不计全家落瘴烟。\n内翰翻身脱犀带，黄金劝妇助金钱。')]}
```
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/OUTPUT_PARSING_FAILURE 
  -> 聚合完成：准备写入 540 条方志信息和 64 条诗词信息。

--- 阶段三：开始批量写入数据到图数据库... ---
  -> 成功批量写入 540 条方志信息。
  -> 成功批量写入 64 条诗词信息。

所有文档处理和入库操作已全部完成。
