## 1. 选择模型

In [12]:
import getpass
import os

# LangSmith tracing configuration (non-secret settings).
# NOTE(review): LangSmith's documented project variable is LANGSMITH_PROJECT;
# confirm that LANGSMITH_PROJECT_ID / LANGSMITH_PROJECT_NAME are actually read.
os.environ["LANGSMITH_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGSMITH_PROJECT_ID"] = "project-rag-1.0.0"
os.environ["LANGSMITH_PROJECT_NAME"] = "rag-sample"
os.environ["LANGSMITH_TRACING"] = "true"

# Credentials must never be committed with the notebook. Use the key already
# exported in the environment, otherwise prompt for it without echoing.
# The key that was previously hard-coded here should be revoked.
if not os.environ.get("LANGSMITH_API_KEY"):
    os.environ["LANGSMITH_API_KEY"] = getpass.getpass("LangSmith API key: ")

from langchain_deepseek.chat_models import ChatDeepSeek  # 导入 DeepSeek 的聊天模型
from langchain_huggingface import HuggingFaceEmbeddings # 导入 HuggingFace 的嵌入模型
from langchain.chains import RetrievalQA # 导入检索问答链
from langchain.prompts import PromptTemplate # 导入提示模板
from langchain.chains import LLMChain # 导入 LLM 链
from langchain.memory import ConversationBufferMemory # 导入对话缓冲区内存
from langchain_chroma import Chroma # 导入 Chroma 向量存储

### 1. 聊天模型

In [13]:
# Create the DeepSeek chat model instance.
# The API key is taken from the DEEPSEEK_API_KEY environment variable, or
# prompted for without echoing, so it is never stored in the notebook.
# The key that was previously hard-coded here should be revoked.
deepseek_api_key = os.environ.get("DEEPSEEK_API_KEY") or getpass.getpass("DeepSeek API key: ")
# DeepSeek-V3 (chat) model
llm = ChatDeepSeek(model="deepseek-chat", api_key=deepseek_api_key, temperature=0, base_url='https://api.deepseek.com')
# DeepSeek-R1 (reasoning) model; its API model id is "deepseek-reasoner"
# llm = ChatDeepSeek(model="deepseek-reasoner", api_key=deepseek_api_key, temperature=0, base_url='https://api.deepseek.com')

### 2. 嵌入模型

In [None]:
# Import from langchain_huggingface, as the first cell does. Importing the
# class again from langchain_community would rebind the name to the
# deprecated implementation.
from langchain_huggingface import HuggingFaceEmbeddings

# Shared settings for the embedding models created in this notebook.
# NOTE(review): 'cuda' requires a GPU; switch to 'cpu' on machines without one.
model_kwargs = {'device': 'cuda'}  # run inference on the GPU
encode_kwargs = {'normalize_embeddings': True}  # normalise the embeddings

# Chinese BGE embedding model from BAAI; swap in another model if needed.
bge_zh_embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-zh-v1.5",
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs,
)

In [None]:
# BGE-M3 embedding model, built with the shared model/encode settings.
bge_m3_embeddings = HuggingFaceEmbeddings(
    encode_kwargs=encode_kwargs, model_kwargs=model_kwargs, model_name="BAAI/bge-m3"
)

In [None]:
# Embedding model handed to the redundancy filter in the retrieval section.
filter_embeddings = HuggingFaceEmbeddings(
    encode_kwargs=encode_kwargs, model_kwargs=model_kwargs, model_name="aspire/acge_text_embedding"
)

config.json:   0%|          | 0.00/204 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/3.67M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.67M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

## 2. 构建索引

### 加载数据

#### JSONL文件

In [None]:
from langchain_community.document_loaders import JSONLoader


file_path = "database/json/sample_data.jsonl" # 数据文件路径

# 定义元数据函数
def metadata_func(record: dict, metadata: dict) -> dict:
    """Copy selected record fields into *metadata* and return it.

    ``Title``, ``Date`` and ``ViewCount`` are stored under lower-case keys
    (``None`` when the record lacks the field), and ``source`` is set to the
    module-level ``file_path``.
    """
    field_map = (("title", "Title"), ("date", "Date"), ("viewcount", "ViewCount"))
    for meta_key, record_key in field_map:
        metadata[meta_key] = record.get(record_key)
    metadata["source"] = file_path
    return metadata


# Load the JSONL data: page content comes from each record's "Content" field
# and metadata_func supplies the metadata.
# NOTE(review): with json_lines=True the jq schema is presumably applied to
# each line separately; confirm that '.[]' yields one dict per record for
# this data file.
json_loader = JSONLoader(
    file_path=file_path,
    json_lines=True,
    jq_schema=".[]",
    content_key="Content",
    text_content=True,
    metadata_func=metadata_func,
)

json_data = json_loader.load()

In [None]:
# print(json_data)

#### PDF文件

In [None]:
from langchain_community.document_loaders import PyPDFDirectoryLoader

# Load the PDF files found under database/pdf.
pdf_loader = PyPDFDirectoryLoader(path="database/pdf")
pdf_data = pdf_loader.load()

In [None]:
# print(pdf_data)

In [None]:
# Single corpus: JSONL documents first, then the PDF documents.
all_data = [*json_data, *pdf_data]

### 切分数据

In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Text splitter configuration. The separator list adds full-width and
# ideographic punctuation so that Chinese text can be split at sentence and
# clause boundaries as well.
text_splitter = RecursiveCharacterTextSplitter(
    separators=[
        "\n\n",
        "\n",
        " ",
        ".",
        ",",
        "\u200b",  # zero-width space
        "\uff0c",  # full-width comma
        "\u3001",  # ideographic comma
        "\uff0e",  # full-width full stop
        "\u3002",  # ideographic full stop
        "",
    ],
    chunk_size=500,
    chunk_overlap=50,
    add_start_index=True,
)
# Split every loaded document into chunks.
data_splits = text_splitter.split_documents(all_data)

### 存储数据

In [None]:
# Build one Chroma vector store per embedding model.
# `from_documents` is a classmethod, so the collection name and persist
# directory must be passed to it directly. Calling it on an already-built
# `Chroma(...)` instance discards that instance's settings and creates a new
# store with the default collection name and no persistence.
# NOTE(review): re-running this cell presumably adds the documents to the
# persisted collection again; confirm, and clear the directory if needed.
vectordb_m3 = Chroma.from_documents(
    documents=data_splits,
    embedding=bge_m3_embeddings,
    collection_name="sample_collection_1",
    persist_directory="./sample_chroma_1",  # Where to save data locally, remove if not necessary
)

vectordb_zh = Chroma.from_documents(
    documents=data_splits,
    embedding=bge_zh_embeddings,
    collection_name="sample_collection_2",
    persist_directory="./sample_chroma_2",  # Where to save data locally, remove if not necessary
)

NameError: name 'bge_zh_embeddings' is not defined

## 3. 检索增强

### MergerRetriever（合并多个检索器）

In [None]:
from langchain_community.document_transformers import EmbeddingsClusteringFilter
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain.retrievers import MergerRetriever

In [None]:
# One retriever per vector store: plain similarity search on the Chinese BGE
# store and MMR search on the BGE-M3 store, five documents each.
# `include_metadata` is not a Chroma search parameter, and unknown search
# kwargs may be forwarded to the underlying collection query, so only `k` is
# passed.
retriever_zh = vectordb_zh.as_retriever(
    search_type="similarity", search_kwargs={"k": 5}
)
retriever_m3 = vectordb_m3.as_retriever(
    search_type="mmr", search_kwargs={"k": 5}
)

# Merge the results of both retrievers.
lotr = MergerRetriever(retrievers=[retriever_zh, retriever_m3])

NameError: name 'vectordb_zh' is not defined

### 上下文压缩 + 重排序

In [None]:
from langchain_community.document_transformers import LongContextReorder

# Reorder the documents:
# Less relevant document will be at the middle of the list and more
# relevant elements at beginning / end.
reordering = LongContextReorder()

# Named `redundant_filter` rather than `filter`: rebinding the builtin
# `filter` at notebook scope breaks any later code that calls the builtin.
redundant_filter = EmbeddingsRedundantFilter(embeddings=filter_embeddings)
#filter_ordered_by_retriever = EmbeddingsClusteringFilter(
#    embeddings=filter_embeddings,
#    num_clusters=10,
#    num_closest=1,
#    sorted=True,
#)

# Filter the merged results first, then reorder them.
pipeline = DocumentCompressorPipeline(transformers=[redundant_filter, reordering])
compression_retriever = ContextualCompressionRetriever(
    base_compressor=pipeline, base_retriever=lotr
)

NameError: name 'lotr' is not defined

### MultiQueryRetriever（多角度问题召回器）

In [None]:
import logging
from typing import List
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_core.documents import Document
from langchain_core.runnables import chain

logging.basicConfig()
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)

# Output parser will split the LLM result into a list of queries
class LineListOutputParser(BaseOutputParser[List[str]]):
    """Output parser that turns newline-separated LLM output into a list of lines."""

    def parse(self, text: str) -> List[str]:
        """Return the non-blank lines of *text*, each stripped of whitespace.

        Args:
            text: Raw model output, one item per line.

        Returns:
            The stripped lines in their original order. Empty and
            whitespace-only lines are dropped, so empty input gives ``[]``.
        """
        # A comprehension is used instead of the builtin `filter`, so the
        # parser does not depend on a name that is easy to rebind at notebook
        # scope.
        stripped = (line.strip() for line in text.strip().split("\n"))
        return [line for line in stripped if line]
    

def my_retriever(question: str) -> List[Document]:
    """Retrieve documents for *question* using several LLM-generated rephrasings.

    A DeepSeek chat model rewrites the question into alternative queries, a
    ``MultiQueryRetriever`` runs them through ``compression_retriever``, and
    the retrieved documents are re-ordered with ``LongContextReorder``.

    Args:
        question: The user's original question.

    Returns:
        The retrieved documents after re-ordering.
    """
    # Prompt (in Chinese) asking for five rephrasings, one per line.
    query_prompt = PromptTemplate(
        input_variables=["question"],
        template="""你的任务是生成五个不同的问题版本，以便从向量数据库中检索相关文档。通过从多个角度重新表述用户的问题，你的目标是帮助用户克服基于距离的相似性搜索的一些局限性。请将这些替代问题用换行分隔。原始问题：{question}""",
    )
    # Model used only for query rewriting. It has its own local name so it
    # does not shadow the module-level `llm`.
    query_llm = ChatDeepSeek(
        model="deepseek-chat",
        api_key=deepseek_api_key,
        temperature=0,
        base_url='https://api.deepseek.com',
    )
    llm_chain = query_prompt | query_llm | LineListOutputParser()
    # `parser_key` is deprecated in MultiQueryRetriever; the chain's own
    # output parser already returns the list of queries, so it is omitted.
    retriever = MultiQueryRetriever(
        retriever=compression_retriever, llm_chain=llm_chain
    )
    docs = retriever.invoke(question)
    # Return a concrete list so the result matches the annotated type.
    return list(LongContextReorder().transform_documents(docs))


INFO:langchain.retrievers.multi_query:Generated queries: ['1. 京剧中的旦角有哪些著名的表演流派？  ', '2. 京剧旦角的表演风格主要分为哪几类？  ', '3. 京剧旦角的艺术流派有哪些代表性人物？  ', '4. 京剧旦角的流派划分及其特点是什么？  ', '5. 京剧旦角在表演艺术上有哪些不同的派别？']


In [None]:
# Print the unique documents
#print(f"Number of unique documents: {len(unique_docs)}")
#print(unique_docs[0].page_content)
#print("=" * 50)
#print(unique_docs[-1].page_content)

Number of unique documents: 15
尚派京剧
京剧，又称平剧、京戏等，中国国粹之一，是中国影响最大的戏曲剧种，分布地以北京为中心，遍及全国各地。京剧流派主要是指演员的表演艺术风格和艺术特点，并且这种风格特点得到师承和传播。
一个剧种中出现不同的流派是艺术发展的必然产物，多种流派的形成是艺术昌盛的反映。京剧旦角主要分为四大流派：梅派、程派、荀派、尚派。
尚派艺术的创始人是尚小云。尚派行腔吐字清楚，以嗓音清亮激越、旋律跌宕缭绕的传统，以板头的变化运用，打破唱腔的固定节奏，展示唱腔的丰富内涵；又以斩钉截铁的断和错综有力的顿挫，使唱腔错落有致，往往在平易简约、坚实整齐中呈现峭险之处，显得力透纸背。
本期线上U课由佳木斯市群众艺术馆邀请佳木斯市京剧团知名艺术家、国家一级演员吴玲玲，为大家讲解尚派京剧的基本知识、念白、唱段，并进行教学示范。
扫描二维码观看精彩课程
课程内容
第一节 走进京剧
京剧流播全国，影响甚广，有“国剧”之称。京剧的角色分为生、旦、净、丑、杂、武、流等行当，后三行已不再立专行。各行当都有一套表演程式，唱念做打的技艺各具特色。
第二节 青衣与花旦的区别
i
国家公共文化数字支撑平台
数字资源标准规范
第四部分
数字资源元数据标准规范、交换标准规范
及著录规则
委托方：文化部全国公共文化发展中心
研制方：北京大学
201
5
年
4
月


### 自查询-过滤元数据

In [None]:
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI

# Attribute descriptions for the self-query retriever. They describe the
# metadata keys that `metadata_func` writes for the JSONL documents (title,
# viewcount, date). The earlier text was copied from a movie example and
# described the wrong fields.
metadata_field_info = [
    AttributeInfo(
        name="title",
        description="The title of the document",
        type="string",
    ),
    AttributeInfo(
        name="viewcount",
        description="The number of times the document has been viewed",
        type="integer",
    ),
    AttributeInfo(
        name="date",
        description="The date associated with the document",
        type="string",
    ),
]

## 4. 生成回答

In [None]:
# Answer-generation prompt (in Chinese). It tells the model to answer from the
# supplied context only, to admit when it does not know, to use at most three
# sentences, and to end with a fixed thank-you phrase. The placeholders are
# {context} and {question}.
template = """使用以下的上下文片段回答最后的问题。
如果你不知道答案，只需说你不知道，不要编造答案。
答案最多使用三句话，并尽量保持简洁。在回答的最后总是说“谢谢提问！”

{context}

问题：{question}

回答：
"""

# Prompt template built from the text above.
rag_prompt = PromptTemplate.from_template(template)

In [None]:
from langchain_core.documents import Document
from typing_extensions import List, TypedDict


class State(TypedDict):
    """State shared by the `retrieve` and `generate` steps."""
    # The user's question; read by both steps.
    question: str
    # Documents returned by `retrieve` and read by `generate`.
    context: List[Document]
    # Answer text returned by `generate`.
    answer: str

def retrieve(state: State):
    """Look up documents for the question in *state*.

    Returns a partial state update whose ``context`` entry holds the
    documents returned by ``my_retriever``.
    """
    return {"context": my_retriever(state["question"])}

def generate(state: State):
    """Answer the question in *state* from its retrieved context.

    The page contents of the context documents are joined with blank lines,
    rendered into ``rag_prompt`` with the question, and sent to ``llm``.
    Returns a partial state update whose ``answer`` entry holds the response
    content.
    """
    context_text = "\n\n".join([doc.page_content for doc in state["context"]])
    prompt_value = rag_prompt.invoke(
        {"question": state["question"], "context": context_text}
    )
    reply = llm.invoke(prompt_value)
    return {"answer": reply.content}

In [None]:
from langgraph.graph import START, StateGraph

# Two-step graph: START -> retrieve -> generate.
workflow = StateGraph(State)
workflow.add_sequence([retrieve, generate])
workflow.add_edge(START, "retrieve")
app = workflow.compile()

In [None]:
# Example question (in Chinese): which schools is the dan role of Peking
# opera mainly divided into?
question = "京剧旦角主要分为哪几个流派？"

# Run the compiled graph with only the question supplied.
result = app.invoke({"question": question})

# Uncomment to inspect the retrieved context as well.
# print(f'Context: {result["context"]}\n\n')
print(f'Answer: \n{result["answer"]}')