In [None]:
%%capture
!pip install numpy
!pip install langchain unstructured[pdf] chromadb sentence-transformers pypdf langchain_experimental rank_bm25
!pip install transformers
!pip install --upgrade numpy transformers
!pip install --upgrade numpy langchain
!pip install unsloth


from langchain_experimental.text_splitter import SemanticChunker
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.document_loaders import DirectoryLoader,PyPDFLoader,TextLoader
from sentence_transformers import CrossEncoder
from langchain.text_splitter import RecursiveCharacterTextSplitter
from unsloth import FastLanguageModel
import torch
import re

In [None]:
import os
from getpass import getpass

from huggingface_hub import login

# Never hard-code the access token in the notebook (it ends up in version
# control and shared copies). Read it from the HF_TOKEN environment variable
# and fall back to an interactive, non-echoing prompt when it is not set.
hf_token = os.environ.get("HF_TOKEN") or getpass("Hugging Face token: ")
login(hf_token)

In [None]:
# Requires the Hugging Face `datasets` package (pip install datasets).
from pathlib import Path

from datasets import load_dataset

dataset = load_dataset("rag-datasets/rag-mini-wikipedia", "text-corpus")
# Select the "passages" split of the loaded corpus.
articles = dataset['passages']

# Create the output folder first: opening a file in "w" mode does not create
# missing parent directories, so a fresh checkout would otherwise fail here.
corpus_path = Path("datasets") / "rag_mini_wikipedia.txt"
corpus_path.parent.mkdir(parents=True, exist_ok=True)

# Dump the corpus as plain text, one passage per line.
with corpus_path.open("w", encoding="utf-8") as file:
    for article in articles:
        file.write(article['passage'] + "\n")

数据已保存为 rag_mini_wikipedia.txt


In [None]:
# ----------------------
# 1. Smart chunking / preprocessing
# ----------------------
class SmartDocumentProcessor:
    """Load knowledge-base files and split them into retrieval-sized chunks.

    Documents are first split with a ``SemanticChunker`` driven by an
    embedding model, then each semantic chunk is split again with a
    ``RecursiveCharacterTextSplitter`` whose size depends on the detected
    content type.

    Args:
        data_dir: Folder scanned recursively for ``*.pdf`` and ``*.txt`` files.
        device: Device for the embedding model (e.g. ``"cuda"`` or ``"cpu"``).
            When ``None``, CUDA is used if available, otherwise the CPU.
    """

    # (chunk_size, chunk_overlap) used for the second-pass split, keyed by the
    # value returned from `_detect_content_type`.
    _SPLITTER_PARAMS = {
        "code": (256, 64),
        "table": (384, 96),
        "normal": (512, 128),
    }

    def __init__(self, data_dir="./datasets", device=None):
        self.data_dir = data_dir
        self.embed_model = HuggingFaceEmbeddings(
            model_name="BAAI/bge-small-en-v1.5",
            # Fall back to the CPU instead of crashing on hosts without a GPU.
            model_kwargs={"device": self._resolve_device(device)},
            encode_kwargs={"batch_size": 16}
        )

    @staticmethod
    def _resolve_device(device=None):
        """Return ``device`` if given, else ``"cuda"`` when available or ``"cpu"``."""
        if device is not None:
            return device
        return "cuda" if torch.cuda.is_available() else "cpu"

    def _detect_content_type(self, text):
        """Classify ``text`` for the second-pass splitter.

        Detection is currently disabled: every input is reported as
        ``"normal"``. The ``"code"`` and ``"table"`` entries of
        ``_SPLITTER_PARAMS`` only take effect once a real heuristic is
        implemented here.
        """
        return "normal"

    def process_documents(self):
        """Load every PDF/text file under ``data_dir`` and return the final chunks.

        Returns:
            list: The chunks produced by the two-stage split. Each chunk's
            metadata is extended with ``chunk_id`` (``"chunk_<index>"``) and
            ``content_type``.
        """
        # One loader per supported file type.
        loaders = [
            DirectoryLoader(self.data_dir, glob="**/*.pdf", loader_cls=PyPDFLoader),
            DirectoryLoader(self.data_dir, glob="**/*.txt", loader_cls=TextLoader)
        ]

        documents = []
        for loader in loaders:
            documents.extend(loader.load())

        # First pass: semantic split driven by the embedding model.
        # NOTE(review): 82 is interpreted according to the chunker's threshold
        # type, which is left at the library default — confirm the intended unit.
        chunker = SemanticChunker(
            embeddings=self.embed_model,
            breakpoint_threshold_amount=82,
            add_start_index=True
        )
        base_chunks = chunker.split_documents(documents)

        # Second pass: build each splitter once rather than once per chunk.
        splitters = {
            kind: RecursiveCharacterTextSplitter(chunk_size=size, chunk_overlap=overlap)
            for kind, (size, overlap) in self._SPLITTER_PARAMS.items()
        }

        final_chunks = []
        for chunk in base_chunks:
            content_type = self._detect_content_type(chunk.page_content)
            # Unknown types use the "normal" splitter, like the original else-branch.
            splitter = splitters.get(content_type, splitters["normal"])
            final_chunks.extend(splitter.split_documents([chunk]))

        # Tag every final chunk with a sequential id and its content type.
        for i, chunk in enumerate(final_chunks):
            chunk.metadata.update({
                "chunk_id": f"chunk_{i}",
                "content_type": self._detect_content_type(chunk.page_content)
            })

        return final_chunks

In [None]:
# ----------------------
# 2. Hybrid retrieval system
# ----------------------
class HybridRetriever:
    """Two-stage retriever: ensemble recall followed by cross-encoder reranking.

    Stage one combines a Chroma vector store and a BM25 retriever through an
    ``EnsembleRetriever``. Stage two scores each (query, document) pair with a
    ``CrossEncoder`` and keeps the best-scoring documents.

    Args:
        chunks: Documents to index in both the vector store and BM25.
    """

    def __init__(self, chunks):
        # NOTE(review): the store is persisted to ./vector_db; re-running this
        # constructor against the same directory presumably adds the chunks
        # again — confirm, and clear or reuse the directory if so.
        self.vector_db = Chroma.from_documents(
            chunks,
            embedding=HuggingFaceEmbeddings(model_name="BAAI/bge-large-en-v1.5"),
            persist_directory="./vector_db"
        )

        # BM25 retriever over the same chunks, returning 5 candidates.
        self.bm25_retriever = BM25Retriever.from_documents(
            chunks,
            k=5
        )

        # Combine both retrievers with weights 0.6 (vector) and 0.4 (BM25).
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[
                self.vector_db.as_retriever(search_kwargs={"k": 5}),
                self.bm25_retriever
            ],
            weights=[0.6, 0.4]
        )

        # Reranking model. torch.cuda.is_available() checks for a usable GPU at
        # runtime; the deprecated torch.has_cuda only reflects how torch was built.
        self.reranker = CrossEncoder(
            "BAAI/bge-reranker-large",
            device="cuda" if torch.cuda.is_available() else "cpu"
        )

    def retrieve(self, query, top_k=3):
        """Return up to ``top_k`` documents for ``query``, best score first.

        Args:
            query: The search string.
            top_k: Maximum number of documents to return.

        Returns:
            list: Reranked documents; empty when stage one finds nothing.
        """
        # Stage one: candidate recall through the ensemble retriever.
        docs = self.ensemble_retriever.get_relevant_documents(query)
        if not docs:
            # Nothing to rerank; avoid calling the reranker with an empty batch.
            return []

        # Stage two: score every (query, document) pair and sort by score.
        pairs = [[query, doc.page_content] for doc in docs]
        scores = self.reranker.predict(pairs)
        ranked_docs = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)
        return [doc for doc, _ in ranked_docs[:top_k]]

In [None]:
# ----------------------
# 3. RAG system integration
# ----------------------
class WikiRAG:
    """End-to-end RAG pipeline that ties retrieval and generation together.

    Construction processes the documents, builds the hybrid retriever and
    loads the language model; ``ask`` then answers a question from the
    retrieved contexts.
    """

    def __init__(self):
        # Document processing.
        processor = SmartDocumentProcessor()
        chunks = processor.process_documents()

        # Hybrid retriever built from the processed chunks.
        self.retriever = HybridRetriever(chunks)

        self.model, self.tokenizer = FastLanguageModel.from_pretrained(
            "unsloth/DeepSeek-R1-Distill-Qwen-14B",
            max_seq_length=4096
        )
        # Fixed seed for the sampling done in `ask`.
        torch.manual_seed(3407)

        # Switch the model to inference mode.
        FastLanguageModel.for_inference(self.model)

    def generate_prompt(self, question, contexts):
        """Build the prompt from the question and the retrieved documents.

        Args:
            question: The user question.
            contexts: Documents exposing ``metadata`` and ``page_content``.

        Returns:
            str: The prompt, with one tagged section per context document.
        """
        # Use .get so a document without 'source'/'content_type' metadata
        # degrades to "unknown" instead of raising KeyError.
        context_str = "\n\n".join([
            f"[source:{doc.metadata.get('source', 'unknown')}，type：{doc.metadata.get('content_type', 'unknown')}]\n{doc.page_content}"
            for doc in contexts
        ])
        # Prompt template asking for an answer grounded in the contexts.
        return f"""You are a professional assistant, please strictly follow the context of the source:
        {context_str}

        Think through the steps and answer:{question}
        If context information is insufficient, specify the missing information. Finally, give a structured answer."""

    def ask(self, question):
        """Answer ``question`` using retrieved contexts.

        Args:
            question: The user question.

        Returns:
            str: Only the newly generated text; the prompt is not echoed back.
        """
        # Retrieve the contexts relevant to the question.
        contexts = self.retriever.retrieve(question)

        # Build the prompt from the question and contexts.
        prompt = self.generate_prompt(question, contexts)

        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        prompt_len = inputs["input_ids"].shape[1]

        # Pass the full encoding so the attention mask reaches generate().
        generated_ids = self.model.generate(
            **inputs,
            max_new_tokens=2048,
            temperature=0.3,
            top_p=0.9,
            do_sample=True
        )
        # For decoder-only models generate() returns the prompt tokens followed
        # by the new ones; drop the prompt so only the answer is decoded.
        answer_ids = generated_ids[0][prompt_len:]
        return self.tokenizer.decode(answer_ids, skip_special_tokens=True)

In [None]:
# End-to-end demo: build the pipeline, ask one sample question, show the result.
complex_question = "When did Lincoln begin his political career?"
rag = WikiRAG()
answer = rag.ask(complex_question)
print(f"question:{complex_question}", "answer:", answer, sep="\n")