In [None]:
import os
from langchain.document_loaders import PyPDFLoader
from typing import List, Tuple, Dict, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.tools.retriever import create_retriever_tool
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
from langchain.embeddings import HuggingFaceEmbeddings
from datasets import load_dataset
from langchain.schema import Document
import numpy as np
import json
import faiss
from langchain.chat_models import ChatOllama
from datasets import load_dataset
from evaluate import load
import time
import pandas as pd

# DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")

In [None]:
from llama_cpp import Llama
from langchain.embeddings.base import Embeddings

# Custom wrapper class implementing LangChain's Embeddings interface
class LlamaCppEmbeddings(Embeddings):
    """LangChain ``Embeddings`` adapter backed by a local llama.cpp model.

    The model file at ``model_path`` is loaded once, with ``embedding=True``,
    and the same ``Llama`` instance serves every document and query.
    """

    def __init__(self, model_path: str):
        self.llm = Llama(model_path=model_path, embedding=True)

    def _embed_one(self, text: str):
        """Embed a single string and return one vector.

        Two result shapes from ``Llama.embed`` are accepted: a flat vector,
        which is returned as is, or a list of lists, of which the first inner
        list is returned.

        Raises:
            ValueError: if ``Llama.embed`` returns an empty list (previously
                this surfaced as a bare ``IndexError`` on ``result[0]``).
        """
        result = self.llm.embed(text)
        if isinstance(result, list):
            if not result:
                raise ValueError("Llama.embed returned an empty embedding")
            # NOTE(review): if the model runs without pooling, a nested result
            # may be per-token vectors and result[0] only the first token's
            # vector — confirm against the llama-cpp-python version in use.
            if isinstance(result[0], list):
                return result[0]
        return result

    def embed_documents(self, texts: list[str]):
        """Embed each text in ``texts`` and return the vectors in the same order."""
        return [self._embed_one(text) for text in texts]

    def embed_query(self, text):
        """Embed one query string; same unwrapping rules as for documents."""
        return self._embed_one(text)

In [None]:
class Proof():
    """One retrieval hit: the matched document, its stored vector and its score.

    ``Client.retrieve`` builds these from the reconstructed index row
    converted with ``.tolist()``, so ``vector`` is a flat list of floats
    rather than a list of arrays.
    """

    def __init__(self, document: "Document", vector: List[float], score: float):
        # The retrieved Document.
        self.document = document
        # Embedding of the hit as a plain list of floats.
        self.vector = vector
        # Raw value the index search reported for this hit.
        self.score = score

In [None]:
class Client:
    """
    Lightweight RAG client: loads source documents, builds a FAISS vector
    store from their semantic chunks, and retrieves the closest chunks for a
    query.
    """
    def __init__(self, model_path: str = "./models/Qwen3-Embedding/Qwen3-Embedding-0.6B-Q8_0.gguf",
                vectorstore_path: str = "faiss_db"):
        """
        Args:
            model_path: GGUF embedding model passed to ``LlamaCppEmbeddings``.
            vectorstore_path: directory used when saving and loading the store.
        """
        # A `dashscope_api_key: str` parameter would be added to the signature
        # if the embedding model were called through the API instead.
        # Only takes effect when the variable is not already set.
        # NOTE(review): presumably works around an abort caused by duplicate
        # OpenMP runtimes being loaded in one process — confirm.
        os.environ.setdefault("KMP_DUPLICATE_LIB_OK", "TRUE")
        self.vectorstore_path = vectorstore_path

        # Alternative when using Qwen/Qwen3-Embedding-4B:
        # self.embeddings = HuggingFaceEmbeddings(
        #     model_name=model_name,
        #     model_kwargs={"device": "cuda"},  # switch to "cpu" when running on CPU
        #     encode_kwargs={"normalize_embeddings": True}
        # )
        self.embeddings = LlamaCppEmbeddings(model_path=model_path)
        # Filled in by build_vectorstore() or load_vectorstore().
        self.db: FAISS = None

    def _chunk_text(self, text: str, chunk_size=800, overlap= 200) -> list[str]:
        """
        Split ``text`` with a recursive character splitter.

        Chunks are at most ``chunk_size`` characters (measured with ``len``)
        and overlap by ``overlap`` characters; the separator list covers
        paragraph, line and both Chinese and Latin sentence punctuation.
        """
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap,
            separators=["\n\n", "\n", "。", ".", "！", "？", "!", "?", " ", ""],
            length_function=len
        )
        return splitter.split_text(text)

    def _semantic_chunk_docs(self, docs: list[Document],
                             buffer_size=3,
                             breakpoint_threshold_type="percentile",
                             sentence_split_regex=r"(?<=[.?!])\s+") -> list[Document]:
        """
        Split ``docs`` with ``SemanticChunker`` using this client's embeddings.

        The three keyword parameters are forwarded unchanged to the chunker.
        """
        splitter = SemanticChunker(
            embeddings=self.embeddings,
            buffer_size=buffer_size,
            breakpoint_threshold_type=breakpoint_threshold_type,
            sentence_split_regex=sentence_split_regex
        )
        return splitter.split_documents(docs)

    def _read_pdfs(self, pdf_paths: List[str]) -> List[Document]:
        """
        Load every PDF in ``pdf_paths`` and return its pieces as Documents.

        Each piece returned by ``PyPDFLoader.load_and_split`` becomes a
        Document whose metadata holds the PDF path (``source``) and the
        position of that path in ``pdf_paths`` (``doc_id``).
        """
        docs = []
        # Iterate the input paths. The previous code enumerated `pages`, a
        # local that is only assigned inside the loop, so the first call
        # raised UnboundLocalError.
        for i, path in enumerate(pdf_paths):
            loader = PyPDFLoader(path)
            pages = loader.load_and_split()
            for page in pages:
                docs.append(Document(page_content=page.page_content, metadata={'source': path, 'doc_id': i}))
        return docs

    def _load_json_folder(self, folder_path: str) -> List[Document]:
        """
        Read every ``*.json`` file in ``folder_path`` into a Document.

        The page content is the file's ``title`` and ``content`` values joined
        by a newline and stripped; files yielding empty content are skipped.
        ``doc_id`` counts only the files that were kept.
        """
        docs = []
        i = 0
        # os.listdir() order is arbitrary; sorting keeps doc_id assignment
        # reproducible between runs and machines.
        for filename in sorted(os.listdir(folder_path)):
            if not filename.endswith('.json'):
                continue
            filepath = os.path.join(folder_path, filename)
            with open(filepath, encoding='utf-8') as f:
                data = json.load(f)
            content = f"{data.get('title', '')}\n{data.get('content', '')}".strip()
            if content:
                docs.append(Document(page_content=content, metadata={'source': filepath, 'doc_id': i}))
                i+=1
        return docs

    def _streaming_load_dataset(self, sample_size=100, language='en', date_version='20231101') -> List[Document]:
        """
        Stream Wikipedia articles from the Hugging Face hub.

        Reads the ``train`` split of ``wikimedia/wikipedia`` for the
        ``{date_version}.{language}`` configuration in streaming mode and stops
        once ``sample_size`` items have been seen. Items without text are
        skipped, so fewer than ``sample_size`` Documents may be returned and
        ``doc_id`` (the enumeration index) may have gaps.
        """
        dataset = load_dataset("wikimedia/wikipedia", f'{date_version}.{language}', streaming=True)
        docs = []
        for i, item in enumerate(dataset['train']):
            if i >= sample_size:
                break
            text = item.get('text', '')
            title = item.get('title', '')
            if not text:
                continue
            meta = {'source': f'wikipedia://{language}/{item.get("id")}', 'doc_id': i}
            docs.append(Document(page_content=f"{title}\n{text}", metadata=meta))
        print(f"Streamed {len(docs)} Wikipedia docs.")
        return docs

    def build_vectorstore(self, sample_size=100, batch_size=10, 
                          streaming=False, folder_path=None, pdf_paths:List[str]=None,
                          buffer_size=3, threshold_type="percentile", sentence_split_regex=r"(?<=[.?!])\s+"):
        """
        Load documents, chunk them semantically, index them and save the store.

        Source selection, in priority order: the streamed Wikipedia dataset
        when ``streaming`` is true; otherwise the JSON folder when
        ``folder_path`` is given and ``pdf_paths`` is not; otherwise the PDFs
        in ``pdf_paths``. Chunks are inserted ``batch_size`` at a time (a value
        below 1 is treated as 1) and the store is written to
        ``self.vectorstore_path``.
        """
        docs = []
        if streaming:
            docs.extend(self._streaming_load_dataset(sample_size))
        elif folder_path is not None and pdf_paths is None:
            docs.extend(self._load_json_folder(folder_path))
        elif pdf_paths is not None:
            docs.extend(self._read_pdfs(pdf_paths))

        chunks = self._semantic_chunk_docs(docs, buffer_size, threshold_type, sentence_split_regex)
        print(f"Total chunks after semantic split: {len(chunks)}")

        # When a store already exists, new rows are appended after the rows it
        # holds; offset faiss_id so it keeps matching the index row number.
        id_offset = self.db.index.ntotal if self.db is not None else 0
        step = max(1, batch_size)
        for start in range(0, len(chunks), step):
            batch = chunks[start:start + step]
            texts = [doc.page_content for doc in batch]
            metadatas = [
                {
                    "source": doc.metadata.get("source", ""),
                    "doc_id": doc.metadata.get("doc_id", ""),
                    "faiss_id": id_offset + start + pos
                }
                for pos, doc in enumerate(batch)
            ]
            if self.db is None:
                self.db = FAISS.from_texts(texts, embedding=self.embeddings, metadatas=metadatas, normalize_L2=True)
            else:
                self.db.add_texts(texts, metadatas=metadatas)
            print(f"Inserted batch up to chunk {start + len(batch)}/{len(chunks)}")

        if self.db is not None:
            self.db.save_local(self.vectorstore_path)
            print(f"Vectorstore saved to {self.vectorstore_path}")
        else:
            print("No data processed.")

    def load_vectorstore(self) -> None:
        """
        Load the store previously saved at ``self.vectorstore_path``.

        Raises:
            FileNotFoundError: if that directory does not exist.
        """
        if not os.path.exists(self.vectorstore_path):
            raise FileNotFoundError(f"Vectorstore directory '{self.vectorstore_path}' not found.")
        # SECURITY: allow_dangerous_deserialization=True lets load_local
        # unpickle the stored docstore; only load directories you created.
        # normalize_L2=True mirrors the setting used in build_vectorstore so
        # texts added after loading are normalized like the existing rows.
        self.db = FAISS.load_local(
            self.vectorstore_path,
            embeddings=self.embeddings,
            allow_dangerous_deserialization=True,
            normalize_L2=True
        )
        print(f"Vectorstore {self.vectorstore_path} loaded.")

    def retrieve(self, query:str, top_k=4):
        """
        Search the FAISS index for the ``top_k`` nearest chunks to ``query``.

        Returns:
            A pair ``(results, query_vector)``: ``results`` is a list of
            ``Proof`` objects (document, stored vector as a list, raw score)
            and ``query_vector`` is the L2-normalized query embedding as a
            list.

        Raises:
            ValueError: if no store is loaded, or if the query embedding has a
                zero or non-finite norm and therefore cannot be normalized.
        """
        if self.db is None:
            raise ValueError("Vectorstore尚未加载，请先调用load_vectorstore或build_vectorstore")

        query_vec = np.array(self.embeddings.embed_query(query), dtype=np.float32)
        norm = np.linalg.norm(query_vec)
        # Dividing by a zero norm would hand FAISS a NaN query.
        if not np.isfinite(norm) or norm == 0:
            raise ValueError("Query embedding has zero or non-finite norm; cannot normalize it.")
        query_norm = query_vec / norm

        # NOTE(review): build_vectorstore calls FAISS.from_texts without a
        # distance_strategy. If that produces an L2 index (believed to be the
        # LangChain default — confirm), the values below are squared L2
        # distances (smaller = closer), not cosine similarities; for unit
        # vectors cosine = 1 - distance / 2.
        scores, indices = self.db.index.search(query_norm.reshape(1, -1), top_k)

        results = []
        for score, idx in zip(scores[0], indices[0]):
            # FAISS pads with -1 when fewer than top_k rows are available.
            if idx == -1:
                continue
            faiss_index = int(idx)
            doc_id = self.db.index_to_docstore_id[faiss_index]
            doc = self.db.docstore.search(doc_id)
            vec = self.db.index.reconstruct(faiss_index).tolist()
            results.append(Proof(doc, vec, float(score)))

        return results, query_norm.tolist()

In [None]:
# One vector store per dataset category; the store directory and the source
# folder are both derived from the category name.
dataset_categories = [
    "common_sense",
    "computer_science_coding_related",
    "law_related",
    "medicine_related",
]
clients = [Client(vectorstore_path=f"./{category}_db") for category in dataset_categories]
folder_paths = [f"./classified_dataset/{category}" for category in dataset_categories]
for c, f in zip(clients, folder_paths):
    print(f"Building vectorstore for {f}...")
    c.build_vectorstore(folder_path=f)

In [None]:
class Server:
    """
    Server side of the pipeline:
    1) receives the context passages selected by the client;
    2) is intended to verify their integrity through Proof data (not
       implemented yet, see ``verify_documents``);
    3) asks the Ollama-served chat model (``qwen3:4b`` by default) for an
       answer.
    """
    def __init__(self, model_name: str = "qwen3:4b"):
        self.llm = ChatOllama(model=model_name)

    def verify_documents(self):
        """Placeholder for Proof-based integrity checking; does nothing and returns None."""
        return

    def build_prompt(self, query: str, contexts: List[str]) -> str:
        """Build the prompt: a fixed instruction line, one numbered
        ``Context i: ...`` line per entry of ``contexts`` (numbering starts at
        1), then the question and an ``Answer:`` cue."""
        header = "You are an AI assistant. Use the following contexts to answer the question:\n"
        numbered_contexts = "".join(
            f"Context {i}: {c}\n" for i, c in enumerate(contexts, 1)
        )
        return f"{header}{numbered_contexts}Question: {query}\nAnswer:"

    def generate_answer(self, query: str, contexts: List[str]) -> str:
        """
        Build the prompt from ``query`` and ``contexts`` and return the
        model's reply text. No Proof verification is performed at present.
        """
        # TODO: enable once verify_documents is implemented.
        # if not self.verify_documents(contexts, proofs):
        #     raise ValueError("Proof verification failed! Data may be tampered.")
        prompt = self.build_prompt(query, contexts)
        # invoke() replaces the deprecated predict(); it returns a message
        # object whose .content carries the generated text.
        response = self.llm.invoke(prompt)
        return response.content