基于 RAG(Retrieval-Augmented Generation)的商品知识问答系统,支持文档导入、多路检索、知识图谱构建与大模型生成回答。
文档导入流程
┌─────────┐ ┌──────────┐ ┌────────────┐ ┌───────────┐
│ PDF/MD │──▶│ MinerU │──▶│ 文本分块 │──▶│ BGE-M3 │
│ 文件 │ │ PDF解析 │ │ Chunking │ │ 向量化 │
└─────────┘ └──────────┘ └────────────┘ └───────────┘
│
┌──────────────┐ ┌─────────────┐ │
│ 知识图谱构建 │◀──│ LLM 实体抽取 │◀──┘
│ (Neo4j) │ └─────────────┘
└──────────────┘
│
┌──────▼──────────┐
│ Milvus │
│ (混合向量索引) │
└─────────────────┘
查询回答流程
┌──────────┐ ┌──────────────┐ ┌──────────────┐
│ 用户查询 │──▶│ 商品名称确认 │──▶│ Query 改写 │
└──────────┘ └──────────────┘ └──────────────┘
│
┌──────────────┐ ┌──────▼───────┐
│ 知识图谱 │◀──│ 混合向量检索 │
│ (Neo4j) │ │ (HyDE + 常规) │
└──────────────┘ └──────────────┘
│ │
┌──────▼────────────────────▼───────┐
│ RRF 多路融合排序 │
│ (Reciprocal Rank Fusion) │
└──────────────────┬─────────────────┘
│
┌──────────────────▼─────────────────┐
│ BGE Reranker 重排序 │
└──────────────────┬─────────────────┘
│
┌──────────────────▼─────────────────┐
│ LLM 生成最终回答 │
│ (通义千问 / OpenAI 接口) │
└────────────────────────────────────┘
| 层级 | 技术 |
|---|---|
| LLM | 通义千问(DashScope)、OpenAI 兼容接口 |
| 向量数据库 | Milvus(混合检索:dense + sparse 向量) |
| 图数据库 | Neo4j(知识图谱存储与查询) |
| 文档数据库 | MongoDB(对话历史) |
| 对象存储 | MinIO(文件存储) |
| ** Embedding 模型** | BGE-M3(本地部署,dense + sparse 混合向量) |
| Reranker 模型 | BGE Reranker Large(本地部署) |
| PDF 解析 | MinerU |
| 流程编排 | LangGraph |
| 框架 | FastAPI + Uvicorn |
| 测试 | pytest |
my_rag_202605/
├── shopkeeper_brain/
│ └── knowledge/ # 核心知识处理模块
│ ├── processor/ # 处理流水线
│ │ ├── import_process/ # 文档导入流水线(8 个节点)
│ │ │ ├── node1_entry.py # 入口:识别文件类型
│ │ │ ├── node2_pdf_to_md.py # PDF → Markdown 转换
│ │ │ ├── node3_md_img.py # Markdown 图片处理
│ │ │ ├── node4_document_split.py # 文档分块
│ │ │ ├── node5_item_name_recognition.py # 商品名称识别
│ │ │ ├── node6_bge_embedding.py # BGE-M3 向量化
│ │ │ ├── node7_import_milvus.py # Milvus 写入
│ │ │ └── node8_knowledge_graph.py # 知识图谱构建
│ │ └── query_process/ # 查询回答流水线(8 个节点)
│ │ ├── node1_item_name_confirm.py # 商品名称确认 / Query 改写
│ │ ├── node2_search_embedding.py # 常规向量检索
│ │ ├── node3_search_embedding_hyde.py # HyDE 检索
│ │ ├── node4_query_kg.py # 知识图谱检索
│ │ ├── node5_web_search_mcp.py # MCP 网络搜索
│ │ ├── node6_rrf_node.py # RRF 多路融合
│ │ ├── node7_rerank.py # BGE 重排序
│ │ └── node8_answer_output.py # LLM 答案生成
│ ├── utils/ # 工具模块
│ │ ├── llm_util.py # LLM 客户端封装
│ │ ├── milvus_util.py # Milvus 操作(混合检索 CRUD)
│ │ ├── neo4j_utils.py # Neo4j 客户端
│ │ ├── mongo_util.py # MongoDB 客户端
│ │ ├── mongo_history_uitl.py # 对话历史 CRUD
│ │ ├── embedding_util.py # BGE-M3 向量化
│ │ ├── reranker_uitl.py # BGE Reranker
│ │ ├── minio_utils.py # MinIO 对象存储
│ │ ├── task_util.py # 任务管理
│ │ └── sse_util.py # SSE 流式输出
│ ├── prompts/
│ │ └── prompt.py # 所有 LLM Prompt 模板
│ ├── database_test/ # 数据库连接测试
│ │ ├── mongo_test.py
│ │ ├── milvus_test.py
│ │ └── neo4j_test.py
│ ├── .env # 环境变量配置
│ ├── requirements.txt
│ └── knowledge_graph.html # 知识图谱可视化
├── models/ # 本地模型文件
│ ├── BAAI/bge-m3/ # BGE-M3 embedding 模型
│ └── bge-reranker-large/ # BGE Reranker 模型
└── tests/ # 单元测试
├── conftest.py
├── test_bge_embedding_node.py
├── test_document_split_node.py
├── test_entry_node.py
├── test_md_img_node.py
├── test_pdf_to_md_node.py
├── test_rerank_node.py
├── test_reranker.py
└── debug_bge.py
PDF/Markdown → [node1] 文件类型识别
├── PDF → [node2] MinerU 解析为 Markdown
└── MD → 直接进入分块
[node3] Markdown 图片处理(提取 / 上传)
[node4] 文档分块(按段落 / 语义切分)
[node5] LLM 识别商品名称(归属哪个商品文档)
[node6] BGE-M3 生成 dense + sparse 混合向量
├──→ [node7] 写入 Milvus chunks 集合
└──→ [node8] LLM 抽取实体/关系 → 写入 Milvus 图谱索引 + Neo4j
关键设计:
- PDF 解析使用 MinerU,支持公式、表格、多栏布局
- 向量化使用 BGE-M3 的混合向量(dense 捕捉语义,sparse 捕捉关键词),比单一向量检索更全面
- 知识图谱同步构建:每个 chunk 的实体关系同时写入 Neo4j,支持后续知识图谱检索
- 商品名称识别确保每个 chunk 都关联到具体商品
用户 Query → [node1] 商品名称确认 + Query 改写(提取商品、澄清模糊)
│
┌───────────┼───────────┐
▼ ▼ ▼
[node2] [node3] [node4]
常规向量 HyDE 向量 知识图谱
检索 检索 检索
(Milvus) (Milvus) (Neo4j)
│ │ │
└───────────┴───────────┘
│
[node5] RRF 融合排序
(将三路检索结果按倒数排名融合)
│
[node6] BGE Reranker 重排序
(用更强的交叉编码器模型精排 top-N)
│
[node7] LLM 生成最终回答
(将排序后的上下文 + 原问题喂给 LLM)
关键设计:
- HyDE(Hypothetical Document Embeddings):先用 LLM 生成一个"假设答案",再将该答案向量化去检索,比直接用 Query 检索效果更好,尤其适合复杂问句
- RRF(Reciprocal Rank Fusion):无参数的排名融合,避免单一检索路径的偏差
- BGE Reranker:交叉编码器重排序,比向量检索的双编码器精度更高,用于精筛 top 结果
确保已安装以下服务:
| 服务 | 版本 | 用途 | 默认端口 |
|---|---|---|---|
| Milvus | 2.4+ | 向量数据库 | 19530 |
| Neo4j | 5.x | 图数据库 | 7687 |
| MongoDB | 6.0+ | 对话历史存储 | 27017 |
| MinIO | - | 对象存储 | 9000 |
可使用 Docker 快速启动:
# Milvus
docker run -d --name milvus -p 19530:19530 -p 9091:9091 milvusdb/milvus:v2.4.0
# Neo4j
docker run -d --name neo4j -p 7474:7474 -p 7687:7687 -e NEO4J_AUTH=neo4j/12345678 neo4j:5
# MongoDB
docker run -d --name mongo -p 27017:27017 -e MONGO_INITDB_ROOT_USERNAME=admin -e MONGO_INITDB_ROOT_PASSWORD=123456 mongo:6
# MinIO
docker run -d --name minio -p 9000:9000 -p 9001:9001 minio/minio server /data --console-address ":9001"Python 依赖:
cd shopkeeper_brain
pip install -r requirements.txt在 shopkeeper_brain/knowledge/ 目录下创建 .env 文件:
# ===== LLM 配置 =====
OPENAI_API_KEY=your_dashscope_api_key
OPENAI_API_BASE=https://dashscope.aliyuncs.com/compatible-mode/v1
LLM_DEFAULT_MODEL=qwen-flash
VL_MODEL=qwen-vl-flash
ITEM_MODEL=qwen-flash
KG_MODEL=qwen-flash
# ===== Embedding 模型路径 =====
BGE_M3_PATH=/path/to/models/BAAI/bge-m3
BGE_RERANKER_LARGE=/path/to/models/bge-reranker-large
EMBEDDING_DIM=1536
# ===== Milvus =====
MILVUS_URL=http://localhost:19530
# ===== Neo4j =====
NEO4J_URI=bolt://localhost:7687
NEO4J_USER=neo4j
NEO4J_PASSWORD=your_neo4j_password
# ===== MongoDB =====
MONGO_URL=mongodb://admin:123456@localhost:27017
# ===== MinIO =====
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_BUCKET=knowledge
# ===== Milvus 集合名 =====
CHUNKS_COLLECTION=chunks
ENTITY_NAME_COLLECTION=kb_graph_entity_names_v2
ITEM_NAME_COLLECTION=kb_item_names_v2下载 BGE 模型到 models/ 目录:
# BGE-M3(混合向量模型)
git lfs install
git clone https://huggingface.co/BAAI/bge-m3 ./models/BAAI/bge-m3
# BGE Reranker Large
git clone https://huggingface.co/BAAI/bge-reranker-large ./models/bge-reranker-large| 集合名 | 用途 | 主要字段 |
|---|---|---|
chunks |
文档分块向量索引 | dense_vector, sparse_vector, item_name, file_title, text |
kb_graph_entity_names_v2 |
知识图谱实体向量 | dense_vector, sparse_vector, entity_name, source_chunk_id, context |
kb_item_names_v2 |
商品名称索引 | 用于 Query 中商品名的匹配识别 |
节点类型:
- Entity:实体节点(名称、描述、类型、来源 chunk_id)
- Chunk:文档块节点(ID、商品名)
关系类型:
HAS_OPERATION(操作步骤)HAS_PART(组成部分)HAS_STEP(操作步骤)USES_TOOL(使用工具)HAS_WARNING(注意事项)NEXT_STEP(下一步)AFFECTS(影响关系)REQUIRES(前置需求)MENTIONED_IN(被提及)
对话历史存储在 chat_message 集合:
{
"session_id": "会话 ID",
"role": "user | assistant",
"text": "消息内容",
"rewritten_query": "LLM 改写后的查询",
"item_names": ["本轮识别到的商品名列表"],
"ts": "时间戳"
}API 层位于 knowledge/api/(开发中),计划提供以下接口:
| 接口 | 方法 | 说明 |
|---|---|---|
/import |
POST | 导入文档(PDF/MD) |
/query |
POST | 查询知识库 |
/chat |
POST | 对话(带历史上下文) |
/history/{session_id} |
GET | 获取对话历史 |
/graph |
GET | 知识图谱可视化 |
cd tests
pytest -v覆盖范围:
test_entry_node.py— 文件类型识别test_pdf_to_md_node.py— PDF 解析test_md_img_node.py— Markdown 图片处理test_document_split_node.py— 文档分块test_bge_embedding_node.py— 向量化test_rerank_node.py/test_reranker.py— 重排序
数据库连接测试:
cd shopkeeper_brain/knowledge
python -m database_test.milvus_test
python -m database_test.neo4j_test
python -m database_test.mongo_test- 在
import_process/或query_process/下创建nodeX_name.py - 定义
node(state: GraphState) -> GraphState函数 - 在流水线定义文件中注册节点和边
# nodeX_example.py
def node(state: GraphState) -> GraphState:
state["result"] = process(state["input"])
return state在 knowledge/prompts/prompt.py 中添加模板变量,并在 LLM 调用处引用。
在查询流水线中添加新的 node_n_search_*.py,在 node6_rrf_node.py 的 RRF 融合中注册即可自动参与多路融合排序。