In [None]:
"""
Langchain以其模块化和集成性见长，是用于开发LLM应用的通用框架。相比之下，LlamaIndex侧重于让LLM利用私有知识，擅长搜索和检索，是专门用于构建RAG系统的框架。
https://docs.llamaindex.ai/en/stable/index.html
"""
import os
from wayne_utils import load_data, save_data
import jieba

# 基础包
from llama_index.core import Settings
from llama_index.core.schema import TextNode, Document

# Placeholders for the embedding model and the LLM; both are reassigned once
# the models are loaded in section 2.
embed_model = None
llm = None

# Corpus locations. The defaults are the author's machine paths; set the
# CPL_DOC_DIR / CPL_DATA_PATH environment variables to use other locations
# without editing the notebook.
doc_dir = os.environ.get(
    "CPL_DOC_DIR",
    "/home/jiangpeiwen2/jiangpeiwen2/projects/text2table_preprocess/CPL/raw/FirstCollection/doc",
)
data_path = os.environ.get(
    "CPL_DATA_PATH",
    "/home/jiangpeiwen2/jiangpeiwen2/projects/text2table_preprocess/CPL/pairs/texts.text",
)

# 1 文本预处理

## 1.1 数据加载为Document

In [None]:
"""批量数据加载
1. SimpleDirectoryReader: 批量加载数据为Document列表，可指定目录和编码格式，可递归遍历子目录，可指定只加载某些后缀的文件或用glob模式排除文件，
    - 支持文件格式：无后缀（默认按txt处理）、txt、csv、docx、epub、ipynb、jpg/jpeg、md、mp3、pdf、ppt等
    - JSON：没有专门的解析器（会按纯文本读取），建议使用专门的json加载器
2. wayne_utils.load_data: 从文件加载数据，支持json, txt, csv, excel等格式。需要后续手动处理为Document列表
"""
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter, SimpleNodeParser
reader = SimpleDirectoryReader(
    input_dir = doc_dir,
    # input_files = [],                 # explicit list of files to load
    # required_exts=[".txt"],           # only load files with these extensions
    # exclude = ["*.txt"],              # glob patterns of files to skip
    # recursive = True,                 # descend into sub-directories
    # filename_as_id = True,            # use the file name as the Document id
    # encoding = "utf-8"                # or e.g. "utf-16"
)
documents = reader.load_data()

# Alternative: build the Documents by hand from a text file whose records look
# like "<title>###<content>".
# NOTE(review): record format inferred from the split below — confirm against the data.
texts = load_data(data_path, "text")
documents = []
for text in texts[:20]:                 # only the first 20 records, for the demo
    parts = text.split("###")
    if len(parts) < 2:
        # No "###" separator (e.g. a blank line): there is no content part,
        # so skip the record instead of raising IndexError.
        continue
    title, content = parts[0], parts[1]
    documents.append( Document(text=content, metadata={"title": title}) )

## 1.2 Document切分为Node

In [None]:
"""chunk切分
1. 手动构建chunk然后实例化为Node
2. SentenceSplitter/SimpleNodeParser：分句器/节点解析器，直接从Document列表进行批量切分为Nodes
"""

# Option 1: chunk the text yourself, then wrap each chunk in a TextNode whose id
# is the chunk's position in `chunks`.
chunks = []
node_list = [TextNode(text=chunk, id_=str(idx), metadata={}) for idx, chunk in enumerate(chunks)]

# 或者使用SentenceSplitter进行切分
def chinese_tokenizer(text):
    """Segment Chinese *text* into a list of words using jieba.

    Serves as the token counter for SentenceSplitter and as the tokenizer of
    the BM25 retrievers in this notebook.
    """
    return jieba.lcut(text)

sent_spliter = SentenceSplitter(
    chunk_size = 128,                   # chunk length, counted with `tokenizer` (jieba words)
    chunk_overlap = 16,                 # tokens shared by consecutive chunks
    tokenizer = chinese_tokenizer,
    # A real newline character: the two-character sequence "\\n" (backslash + n)
    # does not occur in ordinary text, so it would never split paragraphs.
    paragraph_separator = "\n",
    secondary_chunking_regex = None     # regex for splitting over-long sentences; NOTE(review): None handling depends on the llama_index version
)
# Split a single Document's text -> list of chunk strings (not Node objects).
node = sent_spliter.split_text( documents[0].text )
# Split all Documents in one batch -> list of TextNodes.
node_list = sent_spliter.get_nodes_from_documents(documents, show_progress=True)

"""
此外，还可设置node之间的关系，形成图结构（新版llama_index使用NodeRelationship和RelatedNodeInfo）：
node1.relationships[NodeRelationship.NEXT] = RelatedNodeInfo(node_id=node2.node_id)
node2.relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo(node_id=node1.node_id)
"""

# 2 Embed和LLMs加载

In [None]:
# 语言模型
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.huggingface import HuggingFaceLLM

In [None]:
# Local model locations and the GPU to run on.
gpu_id = 1
embedding_model_path = "/home/jiangpeiwen2/jiangpeiwen2/projects/TKGT/Hybrid_RAG/retriever/embed_model/sentence-transformer"
llm_path = "/home/jiangpeiwen2/jiangpeiwen2/workspace/LLMs/glm-4-9b-chat"
# Restrict the visible GPUs. NOTE: CUDA reads this variable when it initialises,
# so it presumably has to be set before any GPU work in this process — keep it
# above the model construction below.
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

# Local sentence-transformer embedding model, registered as the global default.
embed_model = HuggingFaceEmbedding( model_name = embedding_model_path )
Settings.embed_model = embed_model
# Local chat LLM (the path suggests GLM-4-9B-chat), registered as the global
# default; trust_remote_code=True is passed to both model and tokenizer loading.
llm = HuggingFaceLLM(
    model_name = llm_path,
    tokenizer_name  = llm_path,
    model_kwargs={"trust_remote_code":True},
    tokenizer_kwargs={"trust_remote_code":True}
)
Settings.llm = llm

# 3 Node list向量索引与持久化

## 3.1 Faiss向量库：稠密向量

In [None]:
"""
Embeddings存储在index内部，不论是Faiss还是VectorStoreIndex。具体而言，如果传入的是Node list，则要求所有node的embeddings都是已存在。
使用Faiss数据库保存向量，在查询期间，索引使用Faiss查询前k个嵌入，并返回相应的索引"""
# ! pip install llama-index-vector-stores-faiss faiss-gpu faiss-cpu
from llama_index.vector_stores.faiss import FaissVectorStore
import faiss
from llama_index.core.vector_stores import VectorStoreQuery
from llama_index.core import StorageContext, VectorStoreIndex

# Embed every node in place: calling the embedding model on the node list fills
# in each node's `.embedding`, which the next line reads back.
embed_model( node_list )
# Vector dimensionality, taken from the first node (assumes node_list is non-empty).
d = len(node_list[0].embedding)
# Flat (exhaustive) L2-distance Faiss index.
faiss_index = faiss.IndexFlatL2(d)
faiss_vector_store = FaissVectorStore(faiss_index=faiss_index)       # or load an existing one with FaissVectorStore.from_persist_path
# Faiss stores only the vectors, not the text. NOTE(review): the returned ids
# appear to be 0-based insertion positions, so they map back to node_list by index.
new_ids = faiss_vector_store.add(node_list)

# Persist the raw Faiss index to disk, then load it back.
faiss_vector_store.persist( "persist/faiss_store" )
faiss_vector_store = FaissVectorStore.from_persist_path( "persist/faiss_store" )

## 3.2 VectorStoreIndex：稠密向量

In [None]:
from llama_index.core import VectorStoreIndex, StorageContext, load_index_from_storage
"""创建对node list中每个node/chunk的向量和索引结构，通常需要先使用StorageContext创建索引结构，然后使用Indexer创建向量
StorageContext：存储上下文，里面保存docstore、index_store、vector_store、graph_store和property_graph_store
"""
storage_context = StorageContext.from_defaults(
    # docstore: Optional[BaseDocumentStore] = None,
    # index_store: Optional[BaseIndexStore] = None,
    # vector_store: Optional[BasePydanticVectorStore] = None,
    # image_store: Optional[BasePydanticVectorStore] = None,
    # vector_stores: Optional[Dict[str, BasePydanticVectorStore]] = None,
    # graph_store: Optional[GraphStore] = None,
    # property_graph_store: Optional[PropertyGraphStore] = None,
    # persist_dir: Optional[str] = None,                                        # persist directory: if given, all stores above are loaded from it; otherwise the passed-in stores are used or empty defaults are created.
)

# Add the nodes to the document store.
storage_context.docstore.add_documents( node_list )

# Building a VectorStoreIndex embeds node_list and indexes the vectors in
# storage_context's vector store, which storage_context.persist() below also
# writes to disk.
# Alternatively, VectorStoreIndex.from_vector_store(...) wraps an existing
# vector store (e.g. Faiss) and returns an index directly.
vector_index = VectorStoreIndex(
    nodes = node_list,
    # use_async: bool = False,
    # store_nodes_override: bool = False,
    # embed_model: Optional[EmbedType] = None,                  # pass one explicitly, otherwise Settings.embed_model is used; produces dense vectors only
    # insert_batch_size: int = 2048,
    # # parent class params
    # objects: Optional[Sequence[IndexNode]] = None,
    # index_struct: Optional[IndexDict] = None,
    storage_context = storage_context,
    # callback_manager: Optional[CallbackManager] = None,
    # transformations: Optional[List[TransformComponent]] = None,
    show_progress = True,
)
# NOTE(review): this flag is set only after the index was built, so it cannot
# influence what was stored above — confirm it is actually needed.
storage_context.vector_store.stores_text = True
# The storage context now holds the docstore, vector store and index store;
# write all of them to disk.
storage_context.persist( "persist/normal_storage_context" )
# Reload the storage context from disk; load_index_from_storage() turns it back
# into an index for retrieval.
storage_context = StorageContext.from_defaults( persist_dir = "persist/normal_storage_context")

## 3.3 Qdrant向量库：稀疏向量+稠密向量

In [None]:
"""稀疏向量主要是TF-IDF/BM25等，在Llama_index中只有特定向量库插件原生支持稀疏向量
Qdrant的 BM42需要先用docker容器启动Qdrant服务端（server），再通过客户端client连接：
```bash
docker pull qdrant/qdrant
docker run -p 6333:6333 -p 6334:6334 \
    -v $(pwd)/qdrant_storage:/qdrant/storage:z \
    qdrant/qdrant
```
此外，该向量库不支持仅稀疏查询
"""
# ! pip install llama-index llama-index-vector-stores-qdrant fastembed
from fastembed import SparseTextEmbedding
from llama_index.core import VectorStoreIndex, StorageContext

# (1) Load the BM42 sparse model from fastembed.
# NOTE(review): `sparse_model` is only used by the commented-out lines below;
# QdrantVectorStore is given the model name via `fastembed_sparse_model`, so
# this load looks unnecessary for the index built in this cell.
sparse_model = SparseTextEmbedding(
    model_name="Qdrant/bm42-all-minilm-l6-v2-attentions",
)
# To embed texts with the sparse model directly:
# node_text_list = [ node_list[i].text for i in range(len(node_list)) ]
# sparse_embeddings = sparse_model.embed( node_text_list )


# (2) Build the Qdrant client and vector store. Persistence works either through
# a client connected to a server URL or through a local `path`; data is looked
# up again by collection_name.
import qdrant_client
from llama_index.vector_stores.qdrant import QdrantVectorStore

client = qdrant_client.QdrantClient( "http://localhost:6333" )      # alternative: QdrantClient(path="persist/hybrid2_storage_context")
# aclient = qdrant_client.AsyncQdrantClient(path="persist/hybrid2_storage_context")

# Start from an empty collection: any existing "llama2_bm42" data is deleted.
if client.collection_exists("llama2_bm42"):
    client.delete_collection("llama2_bm42")

# Hybrid store: dense vectors from the embed model, sparse vectors presumably
# computed by the store from the fastembed BM42 model named here.
sparse_vector_store = QdrantVectorStore(
    collection_name="llama2_bm42",
    client=client,
    enable_hybrid=True,
    fastembed_sparse_model="Qdrant/bm42-all-minilm-l6-v2-attentions",
)


# (3) Hybrid index: the storage context points at the hybrid Qdrant store; a
# dense embedding model is still required when building the index.
storage_context = StorageContext.from_defaults(vector_store=sparse_vector_store)

hybrid_index = VectorStoreIndex(
    nodes = node_list,
    storage_context = storage_context,
    # dense embedding model
    embed_model=embed_model,
    show_progress = True,
)

In [None]:
# (4) RAG: chat over the hybrid (dense + BM42) index using the loaded LLM.
chat_engine = hybrid_index.as_chat_engine(
    llm=llm,
    chat_mode="condense_plus_context",
)
response = chat_engine.chat(
    "对借款期限没有约定或者约定不明确应该怎么处理？"
)
print(response)

# 4 检索

## 4.1 Faiss向量数据库检索

In [None]:
from llama_index.core.vector_stores import VectorStoreQuery

# Reload the persisted Faiss index and query it directly.
faiss_vector_store = FaissVectorStore.from_persist_path( "persist/faiss_store" )
query_string = "双方口头约定月利率1.5%。从2013年5月份起胡军、李薇及其亲属每月通过转账向王红支付利息直至2014年3月27日。2015年胡军、李薇偿还了2万元本金"
query = VectorStoreQuery(
    # get_query_embedding is the embedding API intended for queries.
    query_embedding = embed_model.get_query_embedding( query_string ),
    similarity_top_k = 3,
    query_str = query_string,
)
query_results = faiss_vector_store.query( query )
# Faiss stores vectors only, so map the returned ids back to text ourselves.
# FaissVectorStore ids are the 0-based insertion positions of the vectors added
# from node_list, so index node_list directly (no "- 1" shift).
for _id in query_results.ids:
    print(node_list[int(_id)].text)

## 4.2 基于VectorStoreIndex的检索

In [None]:
from llama_index.core import StorageContext, load_index_from_storage
# Needed here: otherwise VectorIndexRetriever is only imported in section 5,
# which runs after this cell.
from llama_index.core.retrievers import VectorIndexRetriever

# Rebuild the persisted VectorStoreIndex and retrieve from it.
storage_context = StorageContext.from_defaults( persist_dir = "persist/normal_storage_context")
vector_index = load_index_from_storage(storage_context)
# NOTE(review): VectorIndexRetriever has no `score` parameter; it appears to be
# absorbed by **kwargs and ignored, so no similarity cutoff is applied here.
vector_retriever = VectorIndexRetriever(index=vector_index, similarity_top_k=3, score = 0.1)
# NOTE(review): "\\n" is a literal backslash + "n", not a newline — confirm intended.
query = "\\n本院认为：本案争议焦点在于原告与被告李影之间的借贷关系是否成立并生效以及在此前提之下被告梁进是否负有还款义务。"
vector_retriever.retrieve( query )

## 4.3 基于Qdrant向量库的混合检索

In [None]:
"""
客户端自动管理持久化
"""
import qdrant_client
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore

# Reconnect to the running Qdrant server; the existing collection is found by
# its name, so nothing is re-embedded here.
client = qdrant_client.QdrantClient("http://localhost:6333")

vector_store = QdrantVectorStore(
    client=client,
    collection_name="llama2_bm42",
    fastembed_sparse_model="Qdrant/bm42-all-minilm-l6-v2-attentions",
    enable_hybrid=True,
)
# Wrap the existing collection in an index; dense queries use embed_model.
loaded_index = VectorStoreIndex.from_vector_store(vector_store, embed_model=embed_model)

chat_engine = loaded_index.as_chat_engine(
    llm=llm,
    chat_mode="condense_plus_context",
)


In [None]:
# Ask the question against the reloaded hybrid index and show the answer.
response = chat_engine.chat(
    "对借款期限没有约定或者约定不明确应该怎么处理？"
)
print(response)

## 4.4 稀疏检索：BM25

In [None]:
"""
由于LLama_index原生不支持仅稀疏查询，可以使用rank_bm25等包实现
!pip install rank_bm25
!pip install llama-index-retrievers-bm25
BM25Retriever是检索器，不存储数据，只是在数据上进行检索
"""

from rank_bm25 import BM25Okapi
from llama_index.retrievers.bm25 import BM25Retriever

# Shared settings for the two BM25 retrievers below.
query = "对借款期限没有约定或者约定不明确应该怎么处理？"
topk = 3

In [None]:
# BM25 over the nodes held in vector_index's docstore, tokenised with jieba.
# NOTE(review): in recent llama-index-retrievers-bm25 releases the `tokenizer`
# argument is deprecated (a stemmer is used instead) — confirm that the
# installed version actually applies chinese_tokenizer and language="zh".
llama_bm25_retriever = BM25Retriever.from_defaults(
    docstore=vector_index.docstore,
    similarity_top_k=topk,
    tokenizer=chinese_tokenizer,
    language="zh",
    verbose=True,
)
llama_bm25_retriever.retrieve( query )

In [None]:
import heapq
import numpy as np

class Rank_BM25_Retriever:
    """Sparse BM25 retriever over a llama_index node list, built on rank_bm25.

    Raw BM25 scores of the top-k hits are divided by the largest absolute value
    among them, and only hits whose normalised score reaches ``score`` are kept.

    param node_list: Node list produced by the llama_index pipeline (items need ``.text``)
    param similarity_top_k: number of candidates taken before thresholding
    param score: minimum normalised score for a hit to be returned
    """

    def __init__(self, node_list, similarity_top_k = 2, score = 0.7):
        self.topk = similarity_top_k
        self.score = score
        self.text_list = [node.text for node in node_list]
        corpus = [chinese_tokenizer(sentence) for sentence in self.text_list]
        # An empty corpus leaves no index; retrieve() then returns [].
        self.retriever = None
        if corpus:
            try:
                self.retriever = BM25Okapi(corpus)
            except Exception as err:
                # Keep the corpus in the message for debugging, but chain the
                # real cause instead of discarding it.
                raise ValueError(f"Failed to build BM25 index from corpus: {corpus!r}") from err

    def from_nodes_to_list( self, nodes_list ):
        """Return the ``.text`` of every node in *nodes_list*, in order."""
        return [node.text for node in nodes_list]

    def retrieve( self, query):
        """Return hits for *query* as ``{"text": ..., "score": ...}`` dicts, best first.

        At most ``self.topk`` hits are returned, and only those whose normalised
        score is >= ``self.score``.
        """
        tokenized_query = chinese_tokenizer( query )
        if self.retriever is None:
            return []
        doc_scores = self.retriever.get_scores( tokenized_query )
        top_k_values, top_k_indices = self.find_top_k_elements(doc_scores, self.topk)
        if not top_k_values:
            # topk <= 0: nothing to normalise (np.max of an empty list raises).
            return []
        max_abs_val = np.max(np.abs(top_k_values))
        if max_abs_val == 0:
            # All candidates scored 0: normalising would divide by zero, and
            # nothing is relevant anyway.
            return []
        normalized_top_k_values = np.asarray(top_k_values) / max_abs_val
        return [
            {"text": self.text_list[idx], "score": norm_score}
            for idx, norm_score in zip(top_k_indices, normalized_top_k_values)
            if norm_score >= self.score
        ]

    def find_top_k_elements( self, array, topk):
        """Return ``(values, indices)`` of the *topk* largest items of *array*, largest first.

        Ties keep their original order.
        """
        top_k_elements = heapq.nlargest(topk, enumerate(array), key=lambda pair: pair[1])
        top_k_values = [value for _, value in top_k_elements]
        top_k_indices = [index for index, _ in top_k_elements]
        return top_k_values, top_k_indices

In [None]:
# Same query through the hand-rolled rank_bm25 retriever: top 3 candidates,
# keeping those with a normalised score of at least 0.1.
kapi_bm25_retriever = Rank_BM25_Retriever(
    node_list,
    score=0.1,
    similarity_top_k=3,
)
kapi_bm25_retriever.retrieve( query )

# 5 进阶：自定义Hybrid Retriever

In [None]:
from typing import Optional
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

from llama_index.core import KeywordTableIndex, VectorStoreIndex, StorageContext, load_index_from_storage
from llama_index.core.retrievers import KeywordTableSimpleRetriever, VectorIndexRetriever 
from llama_index.retrievers.bm25 import BM25Retriever

class Hybrid_Retriever:
    "Multi-route recall (keyword table / dense vectors / BM25) followed by optional reranking."
    def __init__(
        self,
        node_list:list,
        hybrid_config:list[bool] = None,
        embed_model_path:str = None,
        llm_path:Optional[str] = None,
        topk=3,
        topk_total=5,
        threshold=0.7,
        gpu_id:Optional[int]=None,
        rerank_model_path:str='damo/nlp_rom_passage-ranking_chinese-base',
    ):
        """
        param node_list: list of chunk nodes (each needs a ``.text`` attribute)
        param hybrid_config: three flags selecting, in order, the keyword-table
            retriever, the dense vector retriever and the sparse BM25 retriever;
            at least one must be truthy
        param embed_model_path: embedding model path, loaded only when no global
            embedding model has been set
        param llm_path: LLM path, loaded only when given and no global LLM has been set
        param topk: number of results returned by the dense and BM25 retrievers
        param topk_total: maximum number of results kept after reranking
        param threshold: intended score cut-off for the dense retriever (see NOTE below)
        param gpu_id: GPU index exported as CUDA_VISIBLE_DEVICES
        param rerank_model_path: ModelScope text-ranking model; a falsy value disables reranking
        raises Exception: when no retriever is selected
        """
        # Validate before any expensive model loading. A missing config counts
        # as "no retriever selected" (sum(None) used to raise TypeError).
        if not hybrid_config or sum( hybrid_config ) == 0:
            raise Exception( f"请至少指定一种检索器" )
        self.hybrid_config = hybrid_config
        self.topk = topk
        self.topk_total = topk_total
        self.threshold = threshold
        self._model_load( embed_model_path, llm_path, gpu_id )
        self.nodes = self._get_nodes( node_list )
        self.rerank_model_path = rerank_model_path

        # Build the selected retrievers, each over this instance's own nodes.
        if hybrid_config[0]:
            # NOTE(review): KeywordTableIndex presumably calls Settings.llm to
            # extract keywords — confirm an LLM is configured when using it.
            keyword_storage_context = StorageContext.from_defaults()
            keyword_storage_context.docstore.add_documents(self.nodes)
            keyword_index = KeywordTableIndex(self.nodes, storage_context=keyword_storage_context)
            self.keyword_retriever = KeywordTableSimpleRetriever( keyword_index)
        if hybrid_config[1]:
            dense_storage_context = StorageContext.from_defaults()
            dense_storage_context.docstore.add_documents(self.nodes)
            dense_vector_index = VectorStoreIndex(self.nodes, storage_context=dense_storage_context)
            # NOTE(review): VectorIndexRetriever has no `score` parameter; it appears
            # to be absorbed by **kwargs, so `threshold` is not applied here.
            self.dense_retriever = VectorIndexRetriever(index=dense_vector_index, similarity_top_k=self.topk, score = self.threshold)
        if hybrid_config[2]:
            # Index this retriever's own nodes, not the docstore of some global
            # index (which is a different corpus, or undefined).
            self.sparse_retriever = BM25Retriever.from_defaults(
                nodes=self.nodes, similarity_top_k=self.topk, verbose=True,
                tokenizer=chinese_tokenizer,
                language="zh"
            )
        if rerank_model_path:
            self.reranker = pipeline(task=Tasks.text_ranking, model=rerank_model_path, model_revision='v1.1.0')

    def _model_load( self, embed_model_path, llm_path, gpu_id ):
        """Select the GPU and load the embedding model / LLM unless already configured globally."""
        if gpu_id is not None:
            os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
        # The public Settings.embed_model / Settings.llm properties never return
        # None: on first access they resolve a default (OpenAI) model. Check the
        # underlying fields instead. NOTE(review): relies on llama_index's private
        # Settings attributes `_embed_model` / `_llm`.
        if getattr(Settings, "_embed_model", None) is None:
            Settings.embed_model = HuggingFaceEmbedding(
                model_name = embed_model_path
            )
        if llm_path is not None and getattr(Settings, "_llm", None) is None:
            Settings.llm = HuggingFaceLLM(
                model_name = llm_path,
                tokenizer_name  = llm_path,
                model_kwargs={"trust_remote_code":True},
                tokenizer_kwargs={"trust_remote_code":True}
            )

    def _get_nodes( self, node_list ):
        """Copy *node_list* into fresh TextNodes with ids "0", "1", ... and empty metadata."""
        return [TextNode(text=node.text, id_=str(i), metadata={}) for i, node in enumerate(node_list)]

    def retrieve( self, query:str ):
        """Run every enabled retriever on *query* and merge the texts.

        Results are concatenated in route order (keyword, dense, BM25) and
        de-duplicated keeping the first occurrence, so the order is
        deterministic. They are then reranked and truncated to ``topk_total``
        when a reranker is configured.

        Returns a list of chunk texts.
        """
        all_results = []
        if self.hybrid_config[0]:
            all_results.extend(item.text for item in self.keyword_retriever.retrieve( query ))
        if self.hybrid_config[1]:
            all_results.extend(item.text for item in self.dense_retriever.retrieve( query ))
        if self.hybrid_config[2]:
            all_results.extend(item.text for item in self.sparse_retriever.retrieve( query ))
        # Order-preserving de-duplication; list(set(...)) gave a hash-dependent
        # order that could change between interpreter runs.
        all_results = list(dict.fromkeys(all_results))
        if self.rerank_model_path:
            all_results = self.re_rank( query, all_results )
        return all_results

    @classmethod
    def top_k_indices( cls, nums, k):
        """Return the indices of the *k* largest values in *nums*, largest first.

        raises ValueError: if *nums* is not a list of ints/floats or *k* is not a positive int
        """
        if not isinstance(nums, list) or not all(isinstance(x, (int, float)) for x in nums):
            raise ValueError("nums 必须是一个浮点数列表")
        if not isinstance(k, int) or k <= 0:
            raise ValueError("k 必须是一个正整数")
        sorted_indices = sorted(range(len(nums)), key=lambda i: nums[i], reverse=True)
        return sorted_indices[: min(k, len(nums))]

    def get_rerank_top( self, results_list, results_list_rerank, topk=2):
        """Return the *topk* entries of *results_list* with the highest rerank scores, best first."""
        return [results_list[i] for i in Hybrid_Retriever.top_k_indices( results_list_rerank, topk )]

    def re_rank( self, query, all_results ):
        """Score *all_results* against *query* with the reranker and keep the best ``topk_total``."""
        _inputs = {
            'source_sentence': [query],
            'sentences_to_compare': all_results
        }
        # Skip the model call when there is nothing to rank.
        results_list_rerank = self.reranker(input=_inputs)['scores'] if len(all_results)>0 else []
        results = self.get_rerank_top( all_results, results_list_rerank, self.topk_total)
        return results
        

In [None]:
# Dense + BM25 routes only (keyword table disabled), default reranker.
hybrid_retriever = Hybrid_Retriever(
    node_list,
    hybrid_config=[0, 1, 1],
)

In [None]:
# Try the hybrid retriever on a query about cases involving a named party.
results = hybrid_retriever.retrieve(query := "沈国松相关的案件")