## 准备工作

智谱API申请，大家可以请前往 https://open.bigmodel.cn/ 申请智谱API

安装 Milvus 向量库

### 基础环境

* python 3.9.6
* nodejs v18.17.1

## Milvus 向量数据库

Milvus 是一个高度灵活、可靠且速度极快的云原生开源向量数据库。它为 embedding 相似性搜索和 AI 应用程序提供支持，并努力使每个组织都可以访问向量数据库。 Milvus 可以存储、索引和管理由深度神经网络和其他机器学习（ML）模型生成的十亿级别以上的 embedding 向量。
它具备高可用、高性能、易拓展的特点，用于海量向量数据的实时召回。

### 为什么选择使用 Milvus
* 高性能：性能高超，可对海量数据集进行向量相似度检索。
* 高可用、高可靠：Milvus 支持在云上扩展，其容灾能力能够保证服务高可用。
* 混合查询：Milvus 支持在向量相似度检索过程中进行标量字段过滤，实现混合查询。
* 开发者友好：支持多语言、多工具的 Milvus 生态系统。

安装 Milvus 请参考 https://milvus.io/docs/install_standalone-docker.md 官方文档

当前使用版本 <span style="color: red">v2.2.11</span>

## LangChain

LangChain 是一个开源框架，用于构建基于大型语言模型（LLM）的应用程序。LLM 是基于大量数据预先训练的大型深度学习模型，可以生成对用户查询的响应，例如回答问题或根据基于文本的提示创建图像。LangChain 提供各种工具和抽象，以提高模型生成的信息的定制性、准确性和相关性。例如，开发人员可以使用 LangChain 组件来构建新的提示链或自定义现有模板。LangChain 还包括一些组件，可让 LLM 无需重新训练即可访问新的数据集。

当前项目使用了 LangChain 中的文档加载器、ChatZhipuAI 组件。LangChain 已经对接了智谱的大语言模型，可以直接使用。

当前使用版本 <span style="color: red">0.1.17</span>

## 创建 Milvus 向量库数据集和写入向量数据

首先需要在向量数据库中创建集合并添加索引

In [3]:
from pymilvus import Collection, FieldSchema, DataType, CollectionSchema, connections

connections.connect(alias='main', host="127.0.0.1", port=19530)

def create_index():
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="document_ebm", dtype=DataType.FLOAT_VECTOR, dim=1024),
        FieldSchema(name="document_text", dtype=DataType.VARCHAR, max_length=65535),
    ]
    schema = CollectionSchema(fields=fields)
    # 创建集合
    collection = Collection(name='documents', schema=schema, using='main')
    # 添加索引
    index_params = {
        "index_type": "HNSW",
        "metric_type": "IP",
        "params": {
            "M": 32,
            "efConstruction": 512
        }
    }
    collection.create_index(
        field_name="document_ebm",
        index_params=index_params
    )

    collection.create_index(
        field_name="document_text",
        index_params={
            "index_type": "marisa-trie"
        }
    )
    # 将数据加载到内存中
    collection.load()

### HNSW 索引类型
HNSW 是一种基于图的索引算法。它按照一定的规则为图像构建多层导航结构。在这种结构中，上层更加稀疏，节点之间的距离更远；较低的层更密集，节点之间的距离更近。搜索从最上层开始，找到本层中距离目标最近的节点，然后进入下一层开始另一次搜索。经过多次迭代，可以快速逼近目标位置。

为了提高性能，HNSW 将图每层节点的最大度数限制为 M 。此外，还可以使用 efConstruction （构建索引时）或 ef （搜索目标时）指定搜索范围。

使用场景
* 非常高速的查询
* 要求召回率尽可能高
* 大内存资源

HNSW 索引params参数
* M 节点的最大度数 (2, 2048)
* efConstruction 索引时间内最近邻居的动态列表的大小。较高的 efConstruction 可能会提高索引质量，但代价是增加索引时间。(1, int_max)

### IP （内积）
两个嵌入之间的IP距离

## 将文档转换成向量

这里利用 langchain 框架的文档加载器来加载需要的文档资源

In [None]:
from langchain_community.document_loaders import DirectoryLoader

def loadDocumentation(path: str, glob: str = "*.text"):
    """读取目录下指定文件"""
    loader = DirectoryLoader(path=path, glob=glob)
    return loader.load()

调用智谱AI向量模型将文本转换成Embedding，并将转换后的Embedding写入到Milvus 向量数据库中。

In [2]:
from zhipuai import ZhipuAI
from pymilvus import Collection, connections
import os

ZHIPUAI_API_KEY = os.environ.get('ZHIPUAI_API_KEY') # 替换成您自己的 key
client = ZhipuAI(api_key=ZHIPUAI_API_KEY)

connections.connect(alias='main', host="127.0.0.1", port=19530)
collection = Collection(name='documents', using='main')


def documentEmbedding(docs):
    document_ebm = []
    document_text = []
    for doc in docs:
        embedding = getEmbedding(doc.page_content)
        document_ebm.append(embedding)
        document_text.append(doc.page_content)

    documents = [document_ebm, document_text]
    # 批量写入数据
    res = collection.insert(data=documents)
    collection.flush()
    # 打印结果
    print(f"插入行数 -> {res.insert_count}")


def getEmbedding(text):
    """ 调用智谱的 embedding-2 模型，将文本转换成 embedding 结果 """
    response = client.embeddings.create(
        model="embedding-2",
        input=text,
    )
    return response.data[0].embedding


if __name__ == '__main__':
    # 读取文档
    docs = loadDocumentation("./text/", "*.text")
    documentEmbedding(docs)

## 根据提出的问题在向量库中查找最相似的结果
Milvus 中的向量相似度搜索会计算查询向量与具有指定相似度度量的集合中的向量之间的距离，并返回最相似的结果。通过指定过滤标量字段或主键字段的布尔表达式，您可以执行混合搜索。

首先将问题调用智谱的 embedding-2 模型，将问题转成 embedding，再利用问题的 embedding 去向量库中查询最相似的结果，返回 topk 1 个结果

In [None]:
def getDocuments(question: str):
    query_vector = getEmbedding(question)
    search_params = {"metric_type": "IP", "params": {"nprobe": 16}, "offset": 0}
    results = collection.search(data=[query_vector], anns_field='document_ebm', param=search_params, limit=1, output_fields=['document_text'])
    documents = []
    for result in results:
        for r in result:
            document_text = str(r.entity.get('document_text'))
            documents.append(document_text)

    return documents

langchain 已经集成了智谱的API接口，只需要导入 ChatZhipuAI 

定义AI的 prompt

* System prompt """请根据以下内容回答提出的问题，不要使用自身知识回答问题，回答的越详细越好""" AI 会根据自身的知识库来回答提出的问题，在 System prompt 中拒绝AI自身的回答
* User prompt """请基于以下内容:
\"\"\"
{context}
\"\"\"
来回答用户提出的问题，请用中文回答。

问题: {question}"""

使用 HumanMessagePromptTemplate 模版对象，传入 question 和 context，生成 user 的 prompt

调用智谱的AI模型，得到流式输出结果

In [None]:
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain_community.chat_models import ChatZhipuAI
from langchain_core.callbacks.manager import CallbackManager
from langchain_core.messages import SystemMessage
from langchain.prompts import HumanMessagePromptTemplate
import asyncio
import json

async def sse_chat_bot(query: str):
    documents = getDocuments(query)
    print("查询到的文档")
    print(f"\033[92m{documents}\033[0m")
    callback = AsyncIteratorCallbackHandler()
    streaming_chat = ChatZhipuAI(
        model="glm-3-turbo",
        api_key=ZHIPUAI_API_KEY,
        temperature=0.01,
        streaming=True,
        callback_manager=CallbackManager([callback]),
    )
    # 定义问题模版
    general_user_template = general_user_template = """
请基于以下内容:
\"\"\"
{context}
\"\"\"
来回答用户提出的问题，请用中文回答。

问题: {question}"""

    human_message = HumanMessagePromptTemplate.from_template(general_user_template, input_variables=["question", "context"])
    messages = [
        SystemMessage(content="请根据以下内容回答提出的问题，不要使用自身知识回答问题，回答的越详细越好"),
        human_message.format(question=query, context='/n'.join(documents)),
    ]

    agenerate = streaming_chat.agenerate([messages])
    finalAnswer = asyncio.create_task(agenerate)
    async for token in callback.aiter():
        yield json.dumps({
            'output': token,
            'end': False
        }, ensure_ascii=False)

    result = await finalAnswer

    yield json.dumps({
        'output': result.generations[0][0].text,
        'end': True
    }, ensure_ascii=False)

## Demo 演示

可以前往 https://github.com/zxing258974/RagDemo 下载当前演示项目的源码

本项目使用了前后端分离，前端使用了 VUE3 ，位于 RagPage/ 目录下。后端使用了 FastApi 框架。更多的项目信息可以参考 README.md 文件