In [14]:
import warnings
warnings.filterwarnings("ignore")
from model import QwenLLM, RagEmbedding, RagLLM
from langchain_chroma import Chroma
import chromadb
import numpy as np

In [15]:
embedding_cls = RagEmbedding(model_name="BAAI/bge-m3", device="cpu")

模型加载成功，使用设备: cpu


In [3]:
PERSIST_DIRECTORY = "./chroma_db/zhidu_db"
COLLECTION_NAME = "zhidu_db"

In [16]:
zhidu_db = Chroma(
    COLLECTION_NAME,
    embedding_cls,
    persist_directory=PERSIST_DIRECTORY
)

In [17]:
prompt_template = """
    你是企业员工助手，熟悉公司考勤和报销标准等规章制度，需要根据提供的上下文信息context来回答员工的提问。\
    请直接回答问题，如果上下文信息context没有和问题相关的信息，请直接先回答不知道 \
    问题：{question} 
    "{context}"
    回答：
"""

In [18]:
llm = RagLLM()

In [25]:
from langchain.prompts import PromptTemplate
import time

In [28]:
def run_rag_pipline(query, context_query, k=3, context_query_type="query", 
                          stream=False, prompt_template=prompt_template,
                          temperature=0.1, llm=None, vector_db=None):
    """
    修复版RAG管道函数
    
    Args:
        query (str): 用户查询
        context_query (str or list): 用于检索的查询或文档
        k (int): 返回的相关文档数量
        context_query_type (str): 查询类型，可选值为"query"、"vector"或"doc"
        stream (bool): 是否使用流式输出
        prompt_template (str): 提示模板
        temperature (float): 模型温度参数
        llm (LangChain LLM): 语言模型实例，如果为None则创建新实例
        vector_db: 向量数据库实例，如果为None则使用传入的文档
        
    Returns:
        str: 模型回答
    """
    # 创建LLM实例（如果未提供）
    if llm is None:
        print("创建新的QwenLLM实例...")
        llm = QwenLLM(timeout=60, max_retries=3)
    
    # 处理上下文检索
    if context_query_type == "doc":
        # 直接使用提供的文档
        related_docs = context_query
        context = "\n".join([f"上下文{i+1}: {doc} \n" for i, doc in enumerate(related_docs)])
    else:
        # 没有向量数据库时直接使用context_query作为上下文
        if vector_db is None:
            print("警告: 没有提供向量数据库，直接使用context_query作为上下文")
            if isinstance(context_query, list):
                context = "\n".join([f"上下文{i+1}: {doc} \n" for i, doc in enumerate(context_query)])
            else:
                context = f"上下文: {context_query}"
        else:
            # 使用向量数据库检索
            if context_query_type == "vector":
                related_docs = vector_db.similarity_search_by_vector(context_query, k=k)
            else:  # "query"
                related_docs = vector_db.similarity_search(context_query, k=k)
            
            context = "\n".join([f"上下文{i+1}: {doc.page_content} \n" 
                              for i, doc in enumerate(related_docs)])
    
    # 打印调试信息
    print()
    print("#"*100)
    print(f"query: {query}")
    print(f"context: {context}")
    
    # 构建提示
    prompt = PromptTemplate(
        input_variables=["question", "context"],
        template=prompt_template,
    )
    llm_prompt = prompt.format(question=query, context=context)
    
    # 使用语言模型生成回答
    try:
        start_time = time.time()
        
        if stream:
            print(f"response: ")
            response = llm(llm_prompt, stream=True, temperature=temperature)
            full_response = ""
            
            try:
                for chunk in response:
                    if isinstance(chunk, dict) and 'choices' in chunk:
                        text = chunk['choices'][0].get('text', '')
                    elif hasattr(chunk, 'choices') and len(chunk.choices) > 0:
                        text = chunk.choices[0].text
                    else:
                        text = str(chunk)
                    
                    print(text, end='', flush=True)
                    full_response += text
                
                print()  # 添加换行
                elapsed = time.time() - start_time
                print(f"完成，耗时: {elapsed:.2f}秒")
                return full_response
            except Exception as e:
                print(f"\n流式输出处理错误: {str(e)}")
                # 失败时回退到非流式模式
                return llm(llm_prompt, stream=False, temperature=temperature)
        else:
            # 非流式模式
            response = llm(llm_prompt, stream=False, temperature=temperature)
            elapsed = time.time() - start_time
            print(f"response: {response}")
            print(f"完成，耗时: {elapsed:.2f}秒")
            return response
            
    except Exception as e:
        print(f"错误: {str(e)}")
        # 在发生错误时提供错误说明
        return f"抱歉，处理您的请求时发生错误: {str(e)}"

query2doc
- 利用大模型生成伪文档，来提升检索性能

In [22]:
def query2doc(query):
    prompt = f"你是一名公司员工制度的问答助手, 熟悉公司规章制度，请简短回答以下问题: {query}"
    doc_info = llm(prompt, stream=False)
    context_query = f"{query}, {doc_info}"
    print("#"*20, 'query2doc')
    print(context_query)
    print("#"*20, 'query2doc')
    return context_query

In [29]:
query = "那个，我们公司有什么规定来着？"

In [30]:
run_rag_pipline(query, query, k=3)

创建新的QwenLLM实例...
警告: 没有提供向量数据库，直接使用context_query作为上下文

####################################################################################################
query: 那个，我们公司有什么规定来着？
context: 上下文: 那个，我们公司有什么规定来着？
response: 公司的具体规定可能会包括考勤制度、休假政策、工作时间、绩效评价、报销标准以及相关的行为准则等。你需要查阅相关的员工手册或者询问人力资源部门以获取最准确的信息。
完成，耗时: 1.97秒


'公司的具体规定可能会包括考勤制度、休假政策、工作时间、绩效评价、报销标准以及相关的行为准则等。你需要查阅相关的员工手册或者询问人力资源部门以获取最准确的信息。'

In [31]:
run_rag_pipline(query, query2doc(query), k=3)

#################### query2doc
那个，我们公司有什么规定来着？, 抱歉，由于我是人工智能，并不具备实时查询具体公司内部规定的能力。我建议你可以查阅公司的员工手册或者联系人力资源部门获取准确的信息。
#################### query2doc
创建新的QwenLLM实例...
警告: 没有提供向量数据库，直接使用context_query作为上下文

####################################################################################################
query: 那个，我们公司有什么规定来着？
context: 上下文: 那个，我们公司有什么规定来着？, 抱歉，由于我是人工智能，并不具备实时查询具体公司内部规定的能力。我建议你可以查阅公司的员工手册或者联系人力资源部门获取准确的信息。
response: 公司规定通常包含在员工手册中，内容可能包括考勤制度、休假政策、绩效评价、薪酬福利、保密协议和行为准则等。如果你需要具体某一方面的规定，建议直接咨询人力资源部门或查阅相关文件。
完成，耗时: 2.31秒


'公司规定通常包含在员工手册中，内容可能包括考勤制度、休假政策、绩效评价、薪酬福利、保密协议和行为准则等。如果你需要具体某一方面的规定，建议直接咨询人力资源部门或查阅相关文件。'