# Decoupling RAGFlow: A Simplified Notebook Walkthrough of the Core RAG Pipeline

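RAGFlow is a powerful engineering project. To stay robust, its code is tightly coupled and carries complex error-handling logic, which raises the difficulty and time cost for learners who only want to understand the core RAG algorithm flow.

To help readers interested in RAGFlow's underlying RAG algorithms grasp the core logic more clearly and efficiently, this article extracts and simplifies the key code in the RAG pipeline. The code is collected in a single Jupyter Notebook, so you can run it cell by cell and see each stage in action, from document processing to final answer generation.

**Core goal:** By working through this simplified code, you can run the key RAG components one by one in a Jupyter Notebook, understand the core steps in depth, and build a solid foundation for integrating the relevant engineering code into your own projects later.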



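To understand and simplify RAGFlow's core RAG pipeline, we first divide it at a high level into two stages, **offline processing** (building the knowledge base) and **online query processing** (answering user questions), and then break these down into six key technical steps:

1.  **Document Parsing:** Extract structured information from raw files (such as PDFs).
2.  **Text Chunking:** Split long text into appropriately sized chunks.
3.  **Embedding Generation:** Convert text chunks into vectors.
4.  **Indexing:** Store the chunks and their vectors in a search engine.
5.  **Retrieval:** Find the chunks relevant to the user's question.
6.  **Generation:** Combine the question with the retrieved context to generate an answer.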

![RAG component diagram](./images/rag-flowchart.png)

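With this standard pipeline laid out, the next step is to dig into the RAGFlow codebase and locate where each step is implemented.

You can clone the RAGFlow repository and open it in VS Code, using `git clone https://github.com/infiniflow/ragflow.git`. Alternatively, visit its GitHub repository page ([https://github.com/infiniflow/ragflow](https://github.com/infiniflow/ragflow)) and press the `.` key to open GitHub's web-based editor.

Browsing the overall project structure (shown below), you will find that the modules most relevant to RAG are `deepdoc` (document parsing) and `rag` (the core RAG implementation, including document parsing, chunking, embedding, retrieval, and generation logic).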

![RAGFlow project structure](./images/ragflow-proj-structure.png)

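The code breakdown and wrapping that follows therefore focuses on these two modules.

`rag/app/naive.py` mainly wraps the per-format parsers in `deepdoc` through inheritance, for example `class Pdf(PdfParser)`, which subclasses the PDF parser base class. It also defines the `chunk` function, which selects a parser by file type and splits the parsed content into chunks. Its dependencies are the `api`, `rag.nlp`, and `rag.utils` modules, which we can copy directly into our own project (to avoid reinventing the wheel and to make sure the code runs).

We can follow the same approach: inherit from the existing classes and add our own customizations on top. The document parsing and text chunking below are built this way.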

--- 
### Stage 1: Document Parsing (PDF Focus)



In [2]:
from deepdoc.parser import PdfParser
from timeit import default_timer as timer
import logging 
from rag.nlp import rag_tokenizer, naive_merge, tokenize_table, tokenize_chunks, find_codec, concat_img, \
    naive_merge_docx, tokenize_chunks_docx

import re

class Pdf(PdfParser):
    def __call__(self, filename, binary=None, from_page=0,
                 to_page=100000, zoomin=3, callback=None):
        start = timer()
        first_start = start
        callback(msg="OCR started")
        self.__images__(
            filename if not binary else binary,
            zoomin,
            from_page,
            to_page,
            callback
        )
        callback(msg="OCR finished ({:.2f}s)".format(timer() - start))
        logging.info("OCR({}~{}): {:.2f}s".format(from_page, to_page, timer() - start))

        start = timer()
        self._layouts_rec(zoomin)
        callback(0.63, "Layout analysis ({:.2f}s)".format(timer() - start))

        start = timer()
        self._table_transformer_job(zoomin)
        callback(0.65, "Table analysis ({:.2f}s)".format(timer() - start))

        start = timer()
        self._text_merge()
        callback(0.67, "Text merged ({:.2f}s)".format(timer() - start))
        tbls = self._extract_table_figure(True, zoomin, True, True)
        # self._naive_vertical_merge()
        self._concat_downward()
        # self._filter_forpages()

        logging.info("layouts cost: {}s".format(timer() - first_start))
        return [(b["text"], self._line_tag(b, zoomin))
                for b in self.boxes], tbls




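We define a custom `Pdf` class that inherits from `PdfParser` in the deepdoc library and therefore has all of its core parsing capabilities.

By implementing the `__call__` method, instances of `Pdf` can be called directly like functions. Calling a `Pdf` instance runs the following core PDF parsing steps in order:

1.  **OCR:** Runs optical character recognition on the PDF pages to extract text from the page images.
2.  **Layout analysis:** Identifies layout structures such as text blocks, titles, and paragraphs.
3.  **Table recognition:** Detects and extracts table data from the PDF.
4.  **Text merging:** Merges the recognized text blocks in logical order.
5.  **Table/figure extraction:** Extracts tables and figures separately.

This design encapsulates the complex PDF parsing steps behind a single call on a `Pdf` instance, which simplifies usage. It is also one way to integrate an open-source project into your own.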
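The pattern itself is independent of deepdoc. The following is a minimal sketch of it, assuming a hypothetical `BaseParser` with two steps (`load` and `merge`) in place of `PdfParser`; the names and the toy parsing logic are illustrative only.

```python
from typing import Callable, Optional


class BaseParser:
    """Hypothetical stand-in for a library parser that exposes its steps as methods."""

    def load(self, source: str) -> list[str]:
        # Pretend each non-empty line of the input is one recognized text box.
        return [line.strip() for line in source.splitlines() if line.strip()]

    def merge(self, boxes: list[str]) -> str:
        return " ".join(boxes)


class MyParser(BaseParser):
    """Subclass that chains the inherited steps inside __call__."""

    def __call__(self, source: str,
                 callback: Optional[Callable[..., None]] = None) -> str:
        # Fall back to a no-op so callers may omit the callback.
        report = callback or (lambda prog=None, msg="": None)
        report(msg="load started")
        boxes = self.load(source)
        report(0.5, "load finished")
        text = self.merge(boxes)
        report(1.0, "merge finished")
        return text


events = []
parser = MyParser()
# The instance is called like a function; progress goes to the callback.
result = parser("first line\n\nsecond line",
                callback=lambda prog=None, msg="": events.append((prog, msg)))
print(result)
```

Defaulting the callback to a no-op is a design choice of this sketch: it lets the parser be called without a progress reporter.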

Let's test the PDF parsing:

In [3]:
from IPython.display import display, Markdown

binary=None
from_page=0
to_page=100000
# lang="Chinese"
lang="English"
is_english = lang.lower() == "english" 

pdf_parser = Pdf()

filename = "example-pdf/report.pdf"
# filename="example-pdf/首都在线个股研报.pdf"

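# Progress callback: prints the progress ("处理进度") and status message ("处理信息") reported by the parser
def dummy(prog=None, msg=""):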
    if prog:
        print(f"处理进度: {prog*100:.1f}%")
    if msg:
        print(f"处理信息: {msg}")

sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page, callback=dummy)
Markdown(tables[0][0][1])

# tables



处理信息: OCR started
处理进度: 60.0%
处理信息: OCR finished (6.57s)
处理进度: 63.0%
处理信息: Layout analysis (3.66s)
处理进度: 65.0%
处理信息: Table analysis (0.16s)
处理进度: 67.0%
处理信息: Text merged (0.00s)


<table><caption>PERFORMANCEREQUIREMENTS</caption>
<tr><td  >SECTION</td><td  >COMPLIANT (yes/no)</td></tr>
<tr><td  >6.1 -Each helmet shall be accompanied by manufacturers'instructions explaining the proper method of size adjustment,use,care and useful service life guidelines.</td><td  >Yes</td></tr>
<tr><td  >6.2-Each helmet shall bear permanent markings in at least1.5mm(0.06in.)high letters stating thefollowing information:</td><td></td></tr>
<tr><td  >6.2-name or identification mark of themanufacturer;</td><td  >Yes</td></tr>
<tr><td  >6.2-the date ofmanufacture;</td><td  >Yes</td></tr>
<tr><td  >6.2- the American National Standard Designation;</td><td  >Yes</td></tr>
<tr><td  >6.2- the applicable Type and Class Designations;</td><td  >Yes</td></tr>
<tr><td  >6.2-the approximate headsize range (see table 1).</td><td  >Yes</td></tr>
</table>

---
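### Stage 2: Text Chunking

Split the long text (`sections`) and tables (`tables`) extracted from the PDF into semantically related, appropriately sized chunks. This helps the subsequent embedding and retrieval steps.

**Core logic:**

1.  **Table handling:** Use `tokenize_table` to convert the extracted table data into a format suitable for RAG.
2.  **Text merging and splitting:** Use `naive_merge` to merge and re-split the consecutive text blocks (`sections`) according to `chunk_token_num` (the maximum number of tokens per chunk) and `delimiter` (the separator characters).
3.  **Chunk tokenization:** Use `tokenize_chunks` to tokenize the resulting chunks and attach document metadata.
4.  **Result sorting (optional but recommended):** Sort the chunks by their position in the original document (page number, Y coordinate) to keep the context coherent.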
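To make the merge-and-split idea in step 2 concrete, here is a simplified sketch. It is not RAGFlow's `naive_merge`: `simple_merge` and `count_tokens` are hypothetical helpers, and whitespace-separated words stand in for real tokenizer output.

```python
import re


def count_tokens(text: str) -> int:
    # Assumption: whitespace-separated words approximate tokens.
    return len(text.split())


def simple_merge(sections: list[str], chunk_token_num: int = 128,
                 delimiter: str = "\n!?。；！？") -> list[str]:
    """Split sections on delimiter characters, then greedily pack the pieces
    into chunks, closing a chunk when the next piece would exceed the budget."""
    escaped = re.escape(delimiter)
    # A piece is a run of non-delimiter characters plus its closing delimiter.
    pattern = "[^" + escaped + "]+[" + escaped + "]?"
    chunks: list[str] = []
    current: list[str] = []
    current_tokens = 0
    for section in sections:
        for piece in re.findall(pattern, section):
            piece = piece.strip()
            if not piece:
                continue
            n = count_tokens(piece)
            if current and current_tokens + n > chunk_token_num:
                chunks.append(" ".join(current))
                current, current_tokens = [], 0
            current.append(piece)
            current_tokens += n
    if current:
        chunks.append(" ".join(current))
    return chunks


chunks = simple_merge(
    ["Alpha beta gamma! Delta epsilon?", "Zeta eta theta iota\nKappa"],
    chunk_token_num=5,
)
print(chunks)
```

A piece that is longer than the budget on its own still becomes a single chunk here; a production implementation would need a policy for that case.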

In [4]:
def chunk(filename, binary=None, from_page=0, to_page=100000,
          lang="Chinese", callback=None, **kwargs):
    """
        This simplified version handles PDF files only.
        It applies the naive chunking method:
        successive text is sliced into pieces using 'delimiter', and these pieces
        are then merged into chunks of no more than 'chunk_token_num' tokens.
    """

    is_english = lang.lower() == "english"  # is_english(cks)
    parser_config = kwargs.get(
        "parser_config", {
            "chunk_token_num": 128, "delimiter": "\n!?。；！？", "layout_recognize": "DeepDOC"})
    doc = {
        "docnm_kwd": filename,
        "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))
    }
    doc["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(doc["title_tks"])
    results = []
    pdf_parser = None

    pdf_parser = Pdf()
    
    sections, tables = pdf_parser(filename if not binary else binary, from_page=from_page, to_page=to_page,
                                    callback=callback)

    results = tokenize_table(tables, doc, is_english)
    st = timer()
    chunks = naive_merge(
        sections, int(parser_config.get(
            "chunk_token_num", 128)), parser_config.get(
            "delimiter", "\n!?。；！？"))
    if kwargs.get("section_only", False):
        return chunks

    results.extend(tokenize_chunks(chunks, doc, is_english, pdf_parser))
    print("naive_merge({}): {}".format(filename, timer() - st))
    
    # Sort the results by their position in the source document.
    # Each position_int entry appears to be (page, left, right, top, bottom):
    # index 3 matches the values reported in 'top_int'.
    def sort_key(item):
        # Items without position info (e.g. some tables) sort first
        positions = item.get('position_int') or [(0, 0, 0, 0, 0)]
        first = positions[0]
        return (first[0], first[3])  # (page number, top Y coordinate)

    # sorted compares the page number first, then the top coordinate
    results = sorted(results, key=sort_key)

    return results

In [5]:
chunked_documents = chunk(filename, binary=None, from_page=0, to_page=100000, callback=dummy, parser_config={"chunk_token_num": 128, "delimiter": "\n!?。；！？", "layout_recognize": "DeepDOC"})
chunked_documents



处理信息: OCR started
处理进度: 60.0%
处理信息: OCR finished (6.64s)
处理进度: 63.0%
处理信息: Layout analysis (3.61s)
处理进度: 65.0%
处理信息: Table analysis (0.17s)
处理进度: 67.0%
处理信息: Text merged (0.00s)
naive_merge(example-pdf/report.pdf): 0.05168387503363192


[{'docnm_kwd': 'example-pdf/report.pdf',
  'title_tks': 'exampl pdf report',
  'title_sm_tks': 'exampl pdf report',
  'image': <PIL.Image.Image image mode=RGB size=1464x1195>,
  'page_num_int': [1, 1, 1, 1, 2],
  'position_int': [(1, 294, 67, 55, 782),
   (1, 216, 704, 139, 151),
   (1, 174, 662, 167, 236),
   (1, 182, 670, 322, 393),
   (2, 65, 553, 86, 143)],
  'top_int': [55, 139, 167, 322, 86],
  'content_with_weight': 'INTERTEKTESTREPORTCORTLAND,NEWYORK130453933USROUTE11TESTREPORTNO.3096874-001ANSIZ89.1-2003 INITIALTESTINGOF SEONGANSAVECO.,LTD-SEONGANSAVE MODELNUMBER:FASHIONI TYPEIRENDERED TO:SEONGANSAVECO.LTD 318-2YANGJEONG-2-DONG,BUSANJIN-GU BUSAN,KOREAREP.OF(SOUTH)AbstractTheprotective capidentified as aSeongAnSave Co.,Ltd.,model Fashion I,submitted by the manufacturer,wasreceived inpristine conditionon08/11/06,08/30/06,and09/06/06,andwas evaluated in accordance with the requirements of ANSI Z89.1-2003 entitled,“American National Standard forIndustrial HeadProtection,”between 0

--- 
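### Stage 3: Embedding Generation

Convert each text chunk into a numeric vector (an embedding) that represents its semantic content. Chunks with similar meaning have similar vector representations.

**Core logic:**

1.  **Choose an embedding model:** Decide which embedding model to use.
2.  **Configure the API client:** Set the API key and base URL for the model service.
3.  **Call the embedding endpoint:** For each chunk, send its content (`content_with_weight`) to the embedding model's API and get back its vector.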
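"Similar vectors" is usually measured with cosine similarity. The sketch below shows the computation on hypothetical 3-dimensional vectors invented for illustration; real embedding models return far longer vectors (1024 values in this notebook).

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Hypothetical toy "embeddings", not produced by any model.
helmet = [0.9, 0.1, 0.0]
hard_hat = [0.8, 0.2, 0.1]
revenue = [0.0, 0.1, 0.9]

print(cosine_similarity(helmet, hard_hat))  # close to 1: similar direction
print(cosine_similarity(helmet, revenue))   # close to 0: unrelated direction
```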


In [6]:
import os
import logging
from openai import OpenAI
from dotenv import load_dotenv 

# === Load the .env file ===
load_dotenv()

# === Embedding generation function ===
def generate_embedding(
    text: str,
    api_key: str = os.getenv("EMBEDDING_API_KEY", "your_api_key_here"),
    base_url: str = os.getenv("EMBEDDING_BASE_URL", "your_embedding_api_url_here"),
    model_name: str = os.getenv("EMBEDDING_MODEL_NAME", "your_embedding_model_name"),
    dimensions: int = 1024, 
    encoding_format: str = "float"
) -> list[float] | None:
    """
    Generate an embedding for the given text using an OpenAI-compatible API.

    Args:
        text: The input text to embed.
        api_key: API key for the embedding service.
        base_url: Base URL of the embedding service.
        model_name: Name of the embedding model to use.
        dimensions: Requested length of the output vector.
        encoding_format: Format of the returned embedding ("float" or "base64").

    Returns:
        A list of floats representing the embedding vector, or None on error.
    """
    # Validate that the input is a non-empty string
    if not text or not isinstance(text, str):
        logging.warning("Invalid text input; cannot generate an embedding. Returning None.")
        return None
        
    # Initialize the OpenAI client
    client = OpenAI(
        api_key=api_key,
        base_url=base_url
    )
    # Call the embeddings endpoint
    try:
        completion = client.embeddings.create(
            model=model_name,
            input=text,
            dimensions=dimensions,
            encoding_format=encoding_format
        )

        embedding = completion.data[0].embedding
        return embedding
    except Exception as e:
        print(f"OpenAI API request failed: {e}")
        return None


In [7]:
from openai import OpenAI

# Local alternative: this cell redefines generate_embedding so that it calls an
# OpenAI-compatible embedding server run by LM Studio on this machine.
def generate_embedding(text: str, api_key: str = "lm-studio", base_url: str = "http://127.0.0.1:1234/v1", model_name: str = "text-embedding-bge-m3", dimensions: int = 1024, encoding_format: str = "float"):
    # Initialize the OpenAI-compatible client
    client = OpenAI(
        api_key=api_key,
        base_url=base_url
    )

    # Call the embeddings endpoint
    try:
        completion = client.embeddings.create(
            model=model_name,
            input=text,
            dimensions=dimensions,
            encoding_format=encoding_format
        )

        embedding = completion.data[0].embedding
        return embedding
    except Exception as e:
        print(f"OpenAI API request failed: {e}")
        return None


--- 
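### Stage 4: Indexing with Elasticsearch

RAGFlow uses Elasticsearch as its vector database, mainly because it is one of the few open-source databases that support hybrid recall combining full-text search and vector search.

**Related resources**:

* Elasticsearch installation (macOS reference):

    - https://mp.weixin.qq.com/s/XN-XrRt5S_JD5KNLRQrXzg

    - https://mp.weixin.qq.com/s/1hBUOvmW7sbPqmlTAD_R9w

* Elasticsearch search method reference:

    - https://github.com/elastic/elasticsearch-labs/tree/main/notebooks

This stage requires basic ES operations: index creation, bulk insertion, and keyword and vector retrieval.

**Goal:** Store the processed chunks (with metadata and vectors) in Elasticsearch for fast retrieval.

The `execute_insert_process` function orchestrates the whole flow: document parsing -> text chunking -> embedding generation -> bulk insertion into ES.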
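The orchestration can be sketched without a parser, a model, or a cluster by passing each step in as a function. Everything below is illustrative: `ingest` and `make_chunk_id` are hypothetical names, and `hashlib.blake2b` is swapped in for `xxhash` so the sketch runs with the standard library alone.

```python
import hashlib
from typing import Callable, Optional


def make_chunk_id(content: str, file_name: str) -> str:
    # Assumption: blake2b replaces xxhash here. The same content and file
    # name always give the same ID, so re-indexing overwrites, not duplicates.
    data = (content + file_name).encode("utf-8")
    return hashlib.blake2b(data, digest_size=8).hexdigest()


def ingest(file_name: str,
           parse: Callable[[str], list[dict]],
           embed: Callable[[str], Optional[list[float]]],
           store: Callable[[list[dict]], None]) -> int:
    """Parse -> embed -> store; returns the number of documents stored."""
    docs = []
    for chunk in parse(file_name):
        content = chunk.get("content_with_weight", "")
        if not content:
            continue  # nothing to index for an empty chunk
        doc = {"id": make_chunk_id(content, file_name),
               "content_with_weight": content}
        vector = embed(content)
        if vector:
            # The field name encodes the vector length, e.g. q_1024_vec
            doc[f"q_{len(vector)}_vec"] = vector
        docs.append(doc)
    if docs:
        store(docs)
    return len(docs)


# Toy stand-ins for the parser, the embedding model, and the bulk insert.
stored: list[dict] = []
count = ingest(
    "report.pdf",
    parse=lambda name: [{"content_with_weight": "helmet test"},
                        {"content_with_weight": ""}],
    embed=lambda text: [0.1, 0.2, 0.3],
    store=stored.extend,
)
print(count, stored[0]["id"])
```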

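**Core logic:**

1.  **Data preparation (`process_item`):**
    * Generate a unique `chunk_id` for each chunk (for example with `xxhash`).
    * Gather the chunk's metadata (content tokens, document name, document ID, and so on).
    * Call `generate_embedding` to get the vector.
    * Build a dictionary that matches the Elasticsearch index structure.
2.  **Elasticsearch connection and setup:**
    * Connect to the Elasticsearch instance.
    * (Optional but recommended) Define the index `mapping` to make field types explicit, especially for the vector field.
    * Create the Elasticsearch index.
3.  **Data insertion (`insert`):**
    * Use Elasticsearch's `bulk` API to insert the prepared data in batches for efficiency.
    * Handle possible insertion errors and timeouts.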
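The notebook code below does not define a mapping explicitly. As a hedged sketch of what the optional mapping step could look like, the hypothetical helper `build_index_mapping` returns a mapping body with a `dense_vector` field named after the `q_{dims}_vec` convention used in `process_item`; the choice of `cosine` similarity and the selection of fields are assumptions of this sketch.

```python
def build_index_mapping(dims: int = 1024) -> dict:
    """Return an index mapping body with an indexed dense_vector field."""
    vector_field = f"q_{dims}_vec"
    return {
        "properties": {
            # Exact-match fields used for filtering
            "doc_id": {"type": "keyword"},
            "docnm_kwd": {"type": "keyword"},
            # Full-text field used for keyword search
            "content_with_weight": {"type": "text"},
            # Vector field used for kNN search
            vector_field: {
                "type": "dense_vector",
                "dims": dims,
                "index": True,
                "similarity": "cosine",
            },
        }
    }


mapping = build_index_mapping(1024)
print(sorted(mapping["properties"]))
```

With the 8.x Python client, a body like this could be passed when creating the index, for example `es_client.indices.create(index=indexName, mappings=mapping)`.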

In [8]:
import copy
import datetime
import logging
import os
import re
import time

import xxhash

ATTEMPT_TIME = 2

logger = logging.getLogger('ragflow.es_conn')

def process_item(item, file_name):
    """
    Convert one chunk into a document ready for Elasticsearch.
    Returns None if the chunk is empty or processing fails.
    """
    try:
        content = item.get("content_with_weight", "")
        if not content:
            print("Warning: Skipping item with empty 'content_with_weight'.")
            return None

        # Generate chunk_id from the content and file_name
        chunk_id = xxhash.xxh64((content + file_name).encode("utf-8")).hexdigest()

        # Generate doc_id from file_name
        doc_id = xxhash.xxh64(file_name.encode("utf-8")).hexdigest()
        # Build the document dictionary
        d = {
            "id": chunk_id,
            "doc_id": doc_id,
            "docnm": file_name,
            "title_tks": item.get("title_tks", []),
            "docnm_kwd": item.get("docnm_kwd", os.path.splitext(file_name)[0]),
            "content_ltks": item["content_ltks"],
            "content_with_weight": item["content_with_weight"],
            "content_sm_ltks": item["content_sm_ltks"],
            "important_kwd": [],
            "important_tks": [],
            "question_kwd": [],
            "question_tks": [],
            "create_time": str(datetime.datetime.now()).replace("T", " ")[:19],
            "create_timestamp_flt": datetime.datetime.now().timestamp()
        }
        # Generate and attach the embedding vector
        embedding_vector = generate_embedding(content)

        if embedding_vector:
            # The field name encodes the vector length, e.g. q_1024_vec
            vector_field_name = f"q_{len(embedding_vector)}_vec"
            d[vector_field_name] = embedding_vector
        else:
            print(f"Warning: Could not generate embedding for chunk {chunk_id}. Skipping vector field.")

        return d

    except Exception as e:
        print(f"process_item error: {e}")
        return None
        
def parse(file_path):
    # Parse and chunk the file with the custom PDF parser
    results = chunk(file_path, callback=dummy)
    return results

def insert(client, documents: list[dict], indexName: str) -> list[str]:
    """Bulk-index documents; return a list of error strings (empty on success)."""
    # Refers to https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
    operations = []
    for d in documents:
        assert "_id" not in d
        assert "id" in d
        d_copy = copy.deepcopy(d)
        meta_id = d_copy.pop("id", "")
        # Each document is preceded by an action line naming the index and _id
        operations.append(
            {"index": {"_index": indexName, "_id": meta_id}})
        operations.append(d_copy)

    res = []
    for _ in range(ATTEMPT_TIME):
        try:
            res = []
            r = client.bulk(index=indexName, operations=operations,
                            refresh=False, timeout="60s")
            if not r["errors"]:
                return res

            # Collect the per-item failures
            for item in r["items"]:
                for action in ["create", "delete", "index", "update"]:
                    if action in item and "error" in item[action]:
                        res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"]))
            return res
        except Exception as e:
            logger.warning("ESConnection.insert got exception: " + str(e))
            res = [str(e)]
            # Retry only on timeouts; give up on any other error
            if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
                time.sleep(3)
                continue
            break
    return res


def execute_insert_process(es_client, file_path, file_name, indexName):
    """
    Parse a document and insert its chunks into Elasticsearch.
    If the index already exists, it is deleted before insertion.

    :param es_client: Elasticsearch client
    :param file_path: path to the file
    :param file_name: file name (used by process_item)
    :param indexName: index name
    """

    try:
        # 1. Check if the index exists
        if es_client.indices.exists(index=indexName):
            logger.info(f"Index '{indexName}' exists. Deleting it...")
            # 2. If it exists, delete it. ignore_status=[400, 404] tolerates the index
            #    disappearing in the meantime (race condition) or other delete issues
            es_client.options(ignore_status=[400, 404]).indices.delete(index=indexName)
            logger.info(f"Index '{indexName}' deleted.")
        else:
            logger.info(f"Index '{indexName}' does not exist. Proceeding with insertion.")
    except Exception as e:
        # 3. Handle potential errors during check/delete
        logger.error(f"Error checking or deleting index '{indexName}': {e}")

    documents = parse(file_path)
    results = []
    for item in documents:
        processed_item = process_item(item, file_name)
        # process_item returns None for empty or failed chunks; skip those
        if processed_item is not None:
            results.append(processed_item)

    errors = insert(es_client, results, indexName)
    if errors:
        logger.warning(f"Bulk insert reported errors: {errors}")


In [15]:

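import os
import datetime
from elasticsearch import Elasticsearch
import copy

# Connect to Elasticsearch. The URL and credentials are read from environment
# variables (ES_URL, ES_USER, and ES_PASSWORD are names assumed here; set them
# in your .env file) so that secrets are not stored in the notebook.
es_client = Elasticsearch(
    os.getenv("ES_URL", "https://192.168.3.174:9200/"),
    verify_certs=False,  # acceptable only for a local, self-signed setup
    basic_auth=(os.getenv("ES_USER", "elastic"), os.getenv("ES_PASSWORD", ""))
    )

indexName = "demo-index"
file_name = "example-pdf/report.pdf"
file_path = file_name  # relative to the notebook's working directory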
    
execute_insert_process(es_client, file_path, file_name, indexName)



处理信息: OCR started
处理进度: 60.0%
处理信息: OCR finished (6.63s)
处理进度: 63.0%
处理信息: Layout analysis (3.51s)
处理进度: 65.0%
处理信息: Table analysis (0.16s)
处理进度: 67.0%
处理信息: Text merged (0.00s)
naive_merge(example-pdf/report.pdf): 0.04160279082134366





--- 
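### Stage 5: Retrieval

When the user asks a question, the goal of this stage is to find the most relevant chunks in the Elasticsearch index.

**Core logic:**

1.  **Query embedding:** Use the same `generate_embedding` function as at indexing time to convert the user's question into a query vector.
2.  **Build the ES query:**
    * **Vector similarity query (KNN):** Use a `knn` query to find the `k` chunk vectors most similar to the query vector. A larger `num_candidates` can improve accuracy at the cost of more computation.
    * **Keyword query (optional):** Combine with a traditional keyword query (such as `match` or `bool`) to filter or boost the results.
    * **Filter (optional):** Add a `filter` to restrict the search scope, for example to a specific `doc_id`.
3.  **Run the query:** Send the search request to Elasticsearch.
4.  **Extract the results:** Parse the relevant chunk contents and metadata from the Elasticsearch response.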
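The optional filter is not used in the notebook's search function. As a sketch of how it could be added, the hypothetical helper `build_knn_clause` builds the `knn` part of a search request and attaches a `term` filter when a `doc_id` is given. It assumes `doc_id` is indexed so that an exact `term` match works (for example as a `keyword` field).

```python
from typing import Optional


def build_knn_clause(vector_field: str, query_vector: list[float],
                     k: int = 5, num_candidates: int = 50,
                     doc_id: Optional[str] = None) -> dict:
    """Build the knn section of an Elasticsearch search request."""
    # Elasticsearch rejects requests where num_candidates is smaller than k.
    if num_candidates < k:
        raise ValueError("num_candidates must be >= k")
    clause = {
        "field": vector_field,
        "query_vector": query_vector,
        "k": k,
        "num_candidates": num_candidates,
    }
    if doc_id is not None:
        # Only vectors from this document are considered as neighbours.
        clause["filter"] = {"term": {"doc_id": doc_id}}
    return clause


clause = build_knn_clause("q_1024_vec", [0.1, 0.2, 0.3, 0.4],
                          k=3, num_candidates=20, doc_id="abc123")
print(clause["filter"])
```

The returned dictionary has the same shape as `knn_query` in the function below, so it could be passed as the `knn` argument of `es_client.search`.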

In [10]:
def hybrid_search(
    es_client,
    index_name: str,
    query_text: str,
    keyword_field: str,
    keyword_text: str,
    vector_field: str,
    k: int = 5,
    num_candidates: int = 10,
    size: int = 10,

    ):
    """
    Performs a hybrid search (keyword + KNN vector search) on Elasticsearch.

    Args:
        es_client: An initialized Elasticsearch client instance.
        index_name (str): The name of the Elasticsearch index to search.
        query_text (str): The text query to generate the embedding from for KNN search.
        keyword_field (str): The field to perform the keyword ('match') search on.
        keyword_text (str): The text to search for in the keyword_field.
        vector_field (str): The name of the dense vector field in the index.
        k (int, optional): The number of nearest neighbors to return for KNN search. Defaults to 5.
        num_candidates (int, optional): The number of candidates to consider for KNN search. Defaults to 10.
        size (int, optional): The total number of hits to return. Defaults to 10.


    Returns:
        list | None: The search hits (each a dict with '_source', '_score', ...),
        or None if embedding generation or the search request fails.
        Errors are printed rather than raised.
    """

    try:
        # 1. Generate the query vector with the same model used at indexing time
        query_vector = generate_embedding(query_text)
        if query_vector is None:
            raise ValueError("generate_embedding returned None")

        # 2. Define the KNN search part
        knn_query = {
            "field": vector_field,
            "query_vector": query_vector,
            "k": k,
            "num_candidates": num_candidates,
        }

        # 3. Define the keyword search part
        keyword_query = {
            "match": {
                keyword_field: keyword_text
            }
        }

        # 4. Perform the search request
        print(f"Performing hybrid search on index '{index_name}'...")
        response = es_client.search(
            index=index_name,
            query=keyword_query, # Keyword query
            knn=knn_query,       # KNN vector query
            size=size,           # Total number of results to return
            # request_timeout=30 # Optional: Set a timeout
        )

        # 5. Extract and return the results
        results = response.get("hits", {}).get("hits", [])
        print(f"Found {len(results)} results.")
        return results

    except ValueError as ve:
         print(f"Error during embedding generation: {ve}")
         return None
    except Exception as e:
        print(f"An error occurred during Elasticsearch search: {e}")
        return None


In [20]:
# query = "营业收入"
# keyword_text="2016年营业收入是多少？"

query = "Compliant results"
keyword_text="What are the results of electrical insulation tests?"

results= hybrid_search(es_client=es_client,
              index_name=indexName,
              query_text=query,
              keyword_field="content_with_weight",
              keyword_text=keyword_text,
              vector_field="q_1024_vec",
              k=3,
              num_candidates=5,
              size=3)

Performing hybrid search on index 'demo-index'...
Found 3 results.




In [21]:
for result_ in results: 
    display(Markdown(result_["_source"]['content_with_weight']))

Details of the instrument calibration are maintained in laboratory records. Themeasurement of uncertaintyis availableuponrequest.IntroductionThis report describes the results of the test program conducted in accordance with ANSI Z89.1-2003 entitled,“American National Standard for Industrial Head Protection," performed on specimens submitted by the manufacturer.Testing of the abovementioned protective caps began only upon Interteks’receipt of the signed quote. Intertek Testing Services,located in Cortland NY,conducted the test evaluations.ProductDescriptionIntertek received 30 production protective caps with date code(s): 6/06 and 7/06.The test sampleswereidentified asspecimens1-30.

Requirements:Theprotective cap shall withstand2,200volts（rootmean square),AC,60Hertz,for1minutes.Leakage shall not exceed 3milliamperes.Results:ELECTRICALINSULATION (ClaSSE)Method:Theprotective capwas tested in accordance with Section9.7.Requirements:Theprotectivecap shall withstand 20,000volts（rootmean square),AC,60 Hertz,for3minutes.Leakage shall not exceed9milliamperes.At 30,000volts,the test sample shallnotburn through.Results:ConclusionThe protective cap identified as a Seong An Save,model Fashion I,met the minimum performancerequirements definedin ANSI Z89.1-2003 entitled,“American National Standard for Industrial HeadProtection”.The helmet type and class defined for this modelas a result of the evaluationsperformed in this report are determined tobeTypeI-Class G,E,& C.

<table><caption>PERFORMANCEREQUIREMENTS</caption>
<tr><td  >SECTION</td><td  >COMPLIANT (yes/no)</td></tr>
<tr><td  >6.1 -Each helmet shall be accompanied by manufacturers'instructions explaining the proper method of size adjustment,use,care and useful service life guidelines.</td><td  >Yes</td></tr>
<tr><td  >6.2-Each helmet shall bear permanent markings in at least1.5mm(0.06in.)high letters stating thefollowing information:</td><td></td></tr>
<tr><td  >6.2-name or identification mark of themanufacturer;</td><td  >Yes</td></tr>
<tr><td  >6.2-the date ofmanufacture;</td><td  >Yes</td></tr>
<tr><td  >6.2- the American National Standard Designation;</td><td  >Yes</td></tr>
<tr><td  >6.2- the applicable Type and Class Designations;</td><td  >Yes</td></tr>
<tr><td  >6.2-the approximate headsize range (see table 1).</td><td  >Yes</td></tr>
</table>


--- 
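### Stage 6: Generation

In the final step, the user's original question and the retrieved chunks (as context) are passed together to a large language model (LLM), which generates a coherent answer grounded in the provided information.

**Core logic:**

1.  **Build the prompt:** Write a clear instruction telling the LLM to answer the given question based on the provided context.
2.  **Configure the LLM client:** Set the API key and base URL for the LLM service.
3.  **Call the LLM API:** Send the constructed prompt to the LLM.
4.  **Get and display the answer:** Extract the generated text answer from the LLM's response.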
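One way to build the prompt in step 1 is to pass only the chunk text to the model, leaving out the embedding vectors and other metadata contained in each search hit. The sketch below assumes hits shaped like Elasticsearch results (a dict with a `_source` holding `content_with_weight`); `build_prompt` and its wording are hypothetical, not the notebook's own prompt.

```python
def build_prompt(question: str, hits: list[dict], max_chunks: int = 3) -> str:
    """Number the retrieved chunk texts and place them before the question."""
    contexts = []
    for i, hit in enumerate(hits[:max_chunks], start=1):
        text = hit.get("_source", {}).get("content_with_weight", "").strip()
        if text:
            contexts.append(f"[{i}] {text}")
    context_block = "\n\n".join(contexts) if contexts else "(no context retrieved)"
    return (
        "Answer the question using only the context below. "
        "If the context is insufficient, say so.\n\n"
        f"Context:\n{context_block}\n\n"
        f"Question: {question}"
    )


# Hypothetical hits; the vector field is deliberately not copied into the prompt.
sample_hits = [
    {"_source": {"content_with_weight": "Leakage shall not exceed 3 milliamperes.",
                 "q_1024_vec": [0.12, 0.34]}},
    {"_source": {"content_with_weight": ""}},
]
prompt = build_prompt("What is the leakage limit?", sample_hits)
print(prompt)
```

Keeping the vectors out of the prompt reduces the number of tokens sent to the model compared with formatting the raw hit dictionaries.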

In [13]:
from openai import OpenAI

def get_llm_answer(question, results_, api_key, base_url="https://api.deepseek.com", model_name="deepseek-chat"):

    prompt = f""" Answer the question based on the information provided below.\n{results_}\nQuestion: {question}"""

    llm_client = OpenAI(api_key=api_key, base_url=base_url)
    response = llm_client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": "You are a helpful assistant"},
            {"role": "user", "content": prompt}
        ]
    )

    return response.choices[0].message.content


In [22]:
from dotenv import load_dotenv
load_dotenv()
api_key = os.getenv("DEEPSEEK_API_KEY", "your_api_key_here")


question ="what are the results of electrical insulation tests?"

response = get_llm_answer(question, results, api_key)
print(response)

The results of the electrical insulation tests for the protective cap are as follows:

1. The protective cap withstood 2,200 volts (root mean square), AC, 60 Hertz for 1 minute, with leakage not exceeding 3 milliamperes. This meets the requirements for Class E electrical insulation.

2. The protective cap also withstood 20,000 volts (root mean square), AC, 60 Hertz for 3 minutes, with leakage not exceeding 9 milliamperes. At 30,000 volts, the test sample did not burn through.

These results confirm that the protective cap meets the minimum performance requirements defined in ANSI Z89.1-2003, entitled "American National Standard for Industrial Head Protection," and is determined to be Type I-Class G, E, & C.
