###### https://medium.com/the-ai-forum/implementing-contextual-retrieval-in-rag-pipeline-8f1bc7cbd5e0

In [118]:
import hashlib
import os
import getpass
from typing import List, Tuple
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain.vectorstores import FAISS
from langchain_openai import ChatOpenAI
from langchain_groq import ChatGroq
from langchain.prompts import ChatPromptTemplate
from rank_bm25 import BM25Okapi
from langchain.schema import BaseRetriever, Document
from langchain.retrievers import ContextualCompressionRetriever,BM25Retriever,EnsembleRetriever
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain_community.document_transformers.embeddings_redundant_filter import EmbeddingsRedundantFilter
from langchain.retrievers.document_compressors import FlashrankRerank
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import random
from transformers import AutoTokenizer

from whoosh.index import create_in, open_dir, exists_in
from whoosh.fields import Schema, TEXT, ID
from whoosh.qparser import QueryParser
from pydantic import Field
from typing import List

import numpy as np

In [3]:
import os
from dotenv import load_dotenv
load_dotenv()

True

### BM25 Retriever

In [54]:
class WhooshIndexManager:
    def __init__(self, index_dir: str = "./storage/bm25_index"):
        """
        Initialize the Whoosh index manager.

        Args:
            index_dir (str): Directory to store the Whoosh index.
        """
        self.index_dir = index_dir
        self.schema = Schema(
            doc_id=ID(stored=True, unique=True),
            content=TEXT(stored=True)
        )
        self.index = self._initialize_index()

    def _initialize_index(self):
        """Initialize or load the index."""
        if not os.path.exists(self.index_dir):
            # Create the directory if it doesn't exist
            os.makedirs(self.index_dir, exist_ok=True)
            print(f"Created directory: {self.index_dir}")
            # Create a new index
            return create_in(self.index_dir, self.schema)
        elif exists_in(self.index_dir):
            # Open the existing index
            print(f"Loading existing index from: {self.index_dir}")
            return open_dir(self.index_dir)
        else:
            # Directory exists, but it's not a valid Whoosh index
            raise ValueError(
                f"The directory '{self.index_dir}' exists but does not contain a valid Whoosh index. "
                "Delete the directory or ensure it contains a valid index."
            )

    def add_documents(self, documents: List[Document]):
        """Add documents to the index."""
        writer = self.index.writer()
        for i, doc in enumerate(documents):
            writer.add_document(doc_id=str(i), content=doc.page_content)
        writer.commit()
        print(f"Added {len(documents)} documents to the index.")

    def search(self, query: str, top_n: int = 10) -> List[dict]:
        """Search the index."""
        with self.index.searcher() as searcher:
            parser = QueryParser("content", schema=self.schema)
            parsed_query = parser.parse(query)
            results = searcher.search(parsed_query, limit=top_n)
            return [
                {"content": result["content"], "doc_id": result["doc_id"]}
                for result in results
            ]

In [55]:
class WhooshRetriever(BaseRetriever):
    index_manager: object = Field(...)
    k: int = Field(default=10)

    def __init__(self, index_manager, k: int = 10):
        """
        Initialize the Whoosh retriever.

        Args:
            index_manager (WhooshIndexManager): The Whoosh index manager.
            k (int): Number of documents to retrieve.
        """
        super().__init__(index_manager=index_manager,
                         k=k)  # Initialize the BaseRetriever with Pydantic fields

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """Retrieve documents in LangChain-compatible format."""
        results = self.index_manager.search(query, top_n=self.k)
        return [
            Document(page_content=result["content"], metadata={
                     "doc_id": result["doc_id"]})
            for result in results
        ]

In [None]:
# # Example documents
# documents = [
#     {"content": "This is the first document."},
#     {"content": "This is the second document."},
#     {"content": "This is the third document."},
# ]

# # Initialize the Whoosh index manager
# index_manager = WhooshIndexManager(index_dir="./storage/bm25_index")

# # Add documents to the index
# index_manager.add_documents(documents)

In [None]:
# # Initialize the Whoosh retriever
# whoosh_retriever = WhooshRetriever(index_manager=index_manager, k=10)

# # Retrieve documents based on a query
# query = "second document"
# results = whoosh_retriever._get_relevant_documents(query)
# for result in results:
#     print(result.page_content)

### Storage Class - FAISS

In [111]:
class MyFAISSStorage:
    def __init__(self, storage_path: str, embeddings):
        """
        Args:
            storage_path (str): Path to the directory where the FAISS index will be saved/loaded.
            embeddings: Embeddings model (e.g., OpenAIEmbeddings, SentenceTransformerEmbeddings).
        """
        self.storage_path = storage_path
        self.embeddings = embeddings
        self.vectorstore = None

    def _index_exists(self) -> bool:
        """
        Returns:
            bool: True if the index exists, False otherwise.
        """
        return os.path.exists(os.path.join(self.storage_path, "index.faiss"))

    def _load_or_create_index(self):
        if self._index_exists():
            print("Loading existing FAISS index...")
            self.vectorstore = FAISS.load_local(self.storage_path, self.embeddings, allow_dangerous_deserialization=True)
        else:
            print("Creating new FAISS index...")
            self.vectorstore = FAISS.from_texts([""], self.embeddings, metadatas=[{}])
            self.save_index()

    def load_index(self):
        """
        Raises:
            ValueError: If the index does not exist.
        """
        if not self._index_exists():
            raise ValueError("FAISS index does not exist at the specified storage path.")

        print("Loading existing FAISS index...")
        self.vectorstore = FAISS.load_local(self.storage_path, self.embeddings, allow_dangerous_deserialization=True)

    def get_index(self):
      self.load_index()
      return self.vectorstore

    def add_documents(self, documents: list[Document]):
        """
        Args:
            documents (list[Document]): List of Document objects to add to the index.
        """
        if self.vectorstore is None:
            self._load_or_create_index()

        print(f"Adding {len(documents)} documents to the FAISS index...")
        self.vectorstore.add_documents(documents)
        return self.vectorstore

    def save_index(self):
        if self.vectorstore is None:
            raise ValueError(
                "FAISS index not initialized. Add documents first.")

        print("Saving FAISS index...")
        self.vectorstore.save_local(self.storage_path)

    def query(self, query: str, k: int = 5) -> list[Document]:
        """
        Args:
            query (str): The query string.
            k (int): Number of documents to retrieve.

        Returns:
            list[Document]: List of Document objects most similar to the query.
        """
        if self.vectorstore is None:
            raise ValueError(
                "FAISS index not initialized. Add documents first.")

        print(f"Querying FAISS index for: '{query}'")
        return self.vectorstore.similarity_search(query, k=k)

##### Create a RAG pipeline

### RAG pipeline

In [112]:
class MyRAGPipeline:
    def __init__(self, create_contextual_rag = False, vectorstore_path = "./storage/faiss_local"):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=100,
        )
        #self.embeddings = OpenAIEmbeddings()

        self.vectorstore_path = vectorstore_path
        self.create_contextual_rag = create_contextual_rag
        model_name = "BAAI/bge-large-en-v1.5"
        model_kwargs = {'device': 'cpu'}
        encode_kwargs = {'normalize_embeddings': False}
        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs
        )
        # self.llm = ChatOpenAI(
        #     model="gpt-4o",
        #     temperature=0,
        #     max_tokens=None,
        #     timeout=None,
        #     max_retries=2,
        # )

        # self.llm = ChatGroq(
        #     model="llama-3.2-3b-preview",
        #     temperature=0,
        #     max_tokens=None,
        #     timeout=None,
        #     max_retries=2,
        # )

        self.llm = ChatGroq(
            model="mixtral-8x7b-32768",
            temperature=0,
            max_tokens=None,
            timeout=None,
            max_retries=2,
        )
        self.storage_class: MyFAISSStorage = MyFAISSStorage(self.vectorstore_path, self.embeddings)
        self.tokenizer = AutoTokenizer.from_pretrained("unsloth/Llama-3.2-3B-Instruct")
        # self.tokenizer = AutoTokenizer.from_pretrained("mistralai/Mixtral-8x7B-v0.1")
        # self.tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1")

    
    def process_document(self, document: str) -> Tuple[List[Document], List[Document]]:
        chunks = self.text_splitter.create_documents([document])
        contextualized_chunks = self._generate_contextualized_chunks(document, chunks)
        return chunks, contextualized_chunks

    def split_document(self, document: str, max_retries: int = 1, delay: int = 60) -> Tuple[List[Document], List[Document]]:
        chunks = self.text_splitter.split_documents(document)
        print(f"Total number of chunks in document: {len(chunks)}")
        contextualized_chunks = []
        if self.create_contextual_rag:
          contextualized_chunks = self._generate_contextualized_chunks(document, chunks, max_retries, delay)
        return chunks, contextualized_chunks

    def _generate_contextualized_chunks(self, document: str, chunks: List[Document], max_retries: int = 1, delay: int = 60) -> List[Document]:
        contextualized_chunks = []
        for chunk in chunks:
            retries = 0
            while retries <= max_retries:
                try:
                    context = self._generate_context(document, chunk.page_content)
                    contextualized_content = f"{context}\n\n{chunk.page_content}"
                    contextualized_chunks.append(Document(page_content=contextualized_content, metadata=chunk.metadata))
                    break
                except Exception as e:
                    if "rate limit" in str(e).lower() or "exceeded" in str(e).lower() or "quota" in str(e).lower():
                        retries += 1
                        if retries > max_retries:
                            print(f"Max retries ({max_retries}) exceeded for chunk: {chunk.page_content[:50]}...")
                            raise e
                        delay_with_randomness = delay + random.random()
                        print(f"Rate limit error: {e}. Retrying chunk in {delay_with_randomness:.2f} seconds...")
                        time.sleep(delay_with_randomness)
                    else:
                        print(f"Error processing chunk: {chunk.page_content[:50]}... Error: {e}")
                        raise e
        return contextualized_chunks

    def _generate_context(self, document: str, chunk: str) -> str:
        relevant_document = self._extract_relevant_part(document, chunk)

        print(f"Length of the relevant document: {len(relevant_document)}")

        prompt = ChatPromptTemplate.from_template("""
        You are an AI assistant specializing in document analysis. Your task is to provide brief, relevant context for a chunk of text from the whitepaper report.
        Here is the whitepaper:
        <document>
        {document}
        </document>

        Here is the chunk we want to situate within the whole document::
        <chunk>
        {chunk}
        </chunk>

        Provide a concise context (2-3 sentences) for this chunk, considering the following guidelines:
        Please give a short succinct context to situate this chunk within the overall document for the purposes of improving search retrieval of the chunk. Answer only with the succinct context and nothing else.

        Context:
        """)
        messages = prompt.format_messages(document=relevant_document, chunk=chunk)
        response = self.llm.invoke(messages)
        return response.content

    def _extract_relevant_part(self, document: List[Document], chunk: str) -> str:
        """
        Extract a relevant part of the document for context generation.
        This reduces the number of tokens sent to the LLM.
        """
        # Find the position of the chunk in the document
        chunk_start = document[0].page_content.find(chunk)
        chunk_end = chunk_start + len(chunk)
        
        # Extract a window of text around the chunk (e.g., 2048 characters before and after)
        window_size = 2048
        start = max(0, chunk_start - window_size)
        end = min(len(document[0].page_content), chunk_end + window_size)
        
        return document[0].page_content[start:end]

    def _extract_relevant_part_with_maxtokens(self, document: List[Document], chunk: str, max_tokens: int = 2048) -> str:
        chunk_tokens = len(self.tokenizer.encode(chunk))
        
        # Calculate the maximum allowed tokens for context
        max_context_tokens = max_tokens - chunk_tokens
        
        # Estimate the number of characters per token (average is ~4 characters per token)
        chars_per_token = 4
        max_context_chars = max_context_tokens * chars_per_token
        
        # Find the position of the chunk in the document
        chunk_start = document[0].page_content.find(chunk)
        chunk_end = chunk_start + len(chunk)
        
        # Extract a window of text around the chunk
        window_size = min(max_context_chars, len(document[0].page_content))
        start = max(0, chunk_start - window_size // 2)
        end = min(len(document[0].page_content), chunk_end + window_size // 2)
        
        return document[0].page_content[start:end]

    def create_inmemory_vectorstores(self, chunks: List[Document]) -> FAISS:
        return FAISS.from_documents(chunks, self.embeddings)

    def create_vectorstores(self, chunks: List[Document]) -> FAISS:
        return self.storage_class.add_documents(chunks)

    def get_vectorstores(self):
        return self.storage_class.get_index()

    def save_vectorstores(self):
        self.storage_class.save_index()

    def create_bm25_index(self, chunks: List[Document]) -> BM25Okapi:
        tokenized_chunks = [chunk.page_content.split() for chunk in chunks]
        return BM25Okapi(tokenized_chunks)
    
    def create_flashrank_index(self,vectorstore):
        retriever = vectorstore.as_retriever(search_kwargs={"k":20})
        compression_retriever = ContextualCompressionRetriever(base_compressor=FlashrankRerank(), base_retriever=retriever)
        return compression_retriever

    def create_bm25_retriever(self, chunks: List[Document]) -> BM25Retriever:
        bm25_retriever = BM25Retriever.from_documents(chunks)
        return bm25_retriever
    
    def create_ensemble_retriever_reranker(self, vectorstore, bm25_retriever) -> EnsembleRetriever:
        retriever_vs = vectorstore.as_retriever(search_kwargs={"k":20})
        bm25_retriever.k =10
        ensemble_retriever = EnsembleRetriever(
            retrievers=[retriever_vs, bm25_retriever],
            weights=[0.5, 0.5]
        )
        redundant_filter = EmbeddingsRedundantFilter(embeddings=self.embeddings)
        reranker = FlashrankRerank()
        pipeline_compressor = DocumentCompressorPipeline(transformers=[redundant_filter, reranker])
        compression_pipeline = ContextualCompressionRetriever(base_compressor=pipeline_compressor,
                                                      base_retriever=ensemble_retriever)
        return compression_pipeline

    @staticmethod
    def generate_cache_key(document: str) -> str:
        """
        Generate a cache key for a document.
        """
        return hashlib.md5(document.encode()).hexdigest()

    def generate_answer(self, query: str, relevant_chunks: List[str]) -> str:
        prompt = ChatPromptTemplate.from_template("""
        Based on the following information, please provide a concise and accurate answer to the question.
        If the information is not sufficient to answer the question, say so.

        Question: {query}

        Relevant information:
        {chunks}

        Answer:
        """)
        messages = prompt.format_messages(query=query, chunks="\n\n".join(relevant_chunks))
        response = self.llm.invoke(messages)
        return response.content

In [128]:
class EnsembleRetrieverReranker:
    def __init__(self, embeddings):
        """
        Args:
            embeddings: Embeddings model (e.g., OpenAIEmbeddings, SentenceTransformerEmbeddings).
        """
        self.embeddings = embeddings

    def _convert_numpy_to_float(self, obj):
        """
        Recursively converts numpy.float32 values in a dictionary, list, or other object to native Python float.

        Args:
            obj: A dictionary, list, or other object.

        Returns:
            The same object with numpy.float32 values converted to float.
        """
        if isinstance(obj, dict):
            return {key: self._convert_numpy_to_float(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._convert_numpy_to_float(item) for item in obj]
        elif isinstance(obj, np.float32):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            # Convert numpy arrays to lists of floats
            return obj.astype(float).tolist()
        else:
            return obj

    def _process_document(self, doc):
        """
        Processes a Document object to ensure all numpy.float32 values are converted to float.

        Args:
            doc: A Document object.

        Returns:
            Document: A processed Document object.
        """
        # Convert metadata
        if hasattr(doc, "metadata") and doc.metadata:
            doc.metadata = self._convert_numpy_to_float(doc.metadata)

        # Convert embeddings (if present)
        if hasattr(doc, "embedding") and isinstance(doc.embedding, (np.ndarray, np.float32)):
            doc.embedding = self._convert_numpy_to_float(doc.embedding)

        return doc

    def create_ensemble_retriever_reranker(self, vectorstore, bm25_retriever):
        """
        Creates an ensemble retriever with a reranker.

        Args:
            vectorstore: Initialized FAISS vectorstore.
            bm25_retriever: BM25 retriever to combine with the FAISS retriever.

        Returns:
            ContextualCompressionRetriever: An ensemble retriever with a reranker.
        """
        if vectorstore is None:
            raise ValueError(
                "FAISS vectorstore is not initialized. Ensure the vectorstore is loaded or created.")

        # Ensure the vectorstore is initialized
        retriever_vs = vectorstore.as_retriever(search_kwargs={"k": 20})
        bm25_retriever.k = 10

        # Create ensemble retriever
        ensemble_retriever = EnsembleRetriever(
            retrievers=[retriever_vs, bm25_retriever],
            weights=[0.5, 0.5]
        )

        # Add reranker and compression pipeline
        redundant_filter = EmbeddingsRedundantFilter(
            embeddings=self.embeddings)
        reranker = FlashrankRerank()
        pipeline_compressor = DocumentCompressorPipeline(
            transformers=[redundant_filter, reranker])
        compression_pipeline = ContextualCompressionRetriever(
            base_compressor=pipeline_compressor,
            base_retriever=ensemble_retriever
        )

        return compression_pipeline

    def invoke(self, query, compression_pipeline):
        """
        Invokes the ensemble retriever with reranker and preprocesses the results.

        Args:
            query: The query string.
            compression_pipeline: The ensemble retriever with reranker.

        Returns:
            list[Document]: List of Document objects with all numpy.float32 values converted to float.
        """
        # Use the `invoke` method to retrieve documents
        docs = compression_pipeline.invoke(query)

        # Process each document to ensure all numpy.float32 values are converted to float
        processed_docs = [self._process_document(doc) for doc in docs]

        return processed_docs

In [8]:
from llama_parse import LlamaParse
from langchain_community.document_loaders import UnstructuredMarkdownLoader

instruction = """The provided document is a PDF file containing structured and unstructured content.
It may include financial information, tables, management discussions, and analyses.
Try to capture the essence of the document, including text, tables, and key highlights.
Be precise and ensure data integrity while processing."""


async def parse_pdf(file_path: str):
  parser = LlamaParse(
      result_type="markdown",
      parsing_instruction=instruction,
      max_timeout=5000,
  )
  return await parser.aload_data(file_path)


async def load_and_combine_documents(folder_path: str, output_folder: str):
  for filename in os.listdir(folder_path):
    combined_content = ""
    file_path = os.path.join(folder_path, filename)
    if filename.endswith('.pdf'):
        print(f"Parsing {filename}...")
        parsed_data = await parse_pdf(file_path)
        combined_content += f"# Document: {filename}\n\n{parsed_data}\n\n"
    else:
        print(f"Unsupported file type: {filename}")
    output_file = output_folder + "/" + os.path.splitext(filename)[0] + ".md"
    with open(output_file, "w", encoding="utf-8") as md_file:
        md_file.write(combined_content)
    print(f"All documents combined into {output_file}")


def read_markdown_with_loader(folder_path: str):
  documents = []
  for filename in os.listdir(folder_path):
    file_path = os.path.join(folder_path, filename)
    if filename.endswith('.md'):
      loader = UnstructuredMarkdownLoader(file_path)
      documents.append(loader.load())
  return documents

##### Insatiate RAG Pipeline

In [9]:
folder_path = "./dataset/source"
output_folder = "./dataset/converted_md"
await load_and_combine_documents(folder_path, output_folder)

Unsupported file type: .DS_Store
All documents combined into ./dataset/converted_md/.DS_Store.md
Parsing Diabetes-Whitepaper.pdf...
Started parsing the file under job_id ae7bb38a-5c41-4700-9d7a-9b20d278e080
All documents combined into ./dataset/converted_md/Diabetes-Whitepaper.md
Parsing Erratum_jand.pdf...
Started parsing the file under job_id 7fdf4a08-cf47-4e37-9512-cfaae0f03fea
All documents combined into ./dataset/converted_md/Erratum_jand.md


In [75]:
documents = read_markdown_with_loader(output_folder)

In [113]:
create_contextual_rag = False
my_rag = MyRAGPipeline(create_contextual_rag)

##### Sample Document

##### Process the document

In [12]:
tokenizer = AutoTokenizer.from_pretrained("unsloth/Llama-3.2-3B-Instruct")
tpm_limit = 7000

In [79]:
def get_document_chunks(document, max_retries=1, delay=60):
    details = {}
    source_document = document[0].metadata["source"]
    details["source"] = source_document
    details["original_chunks"], details["contextualized_chunks"] = my_rag.split_document(
        document, max_retries, delay)
    return details

In [84]:
index_manager = WhooshIndexManager(index_dir="./storage/bm25_index")

Created directory: ./storage/bm25_index


In [None]:
def is_within_token_limit(document, current_tokens, tpm_limit):
    tokens = len(tokenizer.encode(str(document)))
    current_tokens += tokens

    if current_tokens >= tpm_limit:
        return False

    return True

In [80]:
def process_document(document, retry_document_processing, current_tokens, delay=60):
  try:
    source_document = document[0].metadata["source"]
    print(f"Processing document: {source_document} with currrent tokens: {current_tokens}")
    details = get_document_chunks(document, 1, delay)

    if bool(details):
        my_rag.create_vectorstores(details["original_chunks"])
        index_manager.add_documents(details["original_chunks"])
        if create_contextual_rag:
            contextualized_vectorstore = my_rag.create_vectorstores(details["contextualized_chunks"])
            index_manager.add_documents(details["contextualized_chunks"])

        # bm25_retriever_original = my_rag.create_bm25_retriever(details["original_chunks"])
        # bm25_retriever_contextualized = my_rag.create_bm25_retriever(details["contextualized_chunks"])
        # original_ensemble_retriever_reranker = my_rag.create_ensemble_retriever_reranker(original_vectorstore, bm25_retriever_original)
        # contextualized_ensemble_retriever_reranker = my_rag.create_ensemble_retriever_reranker(contextualized_vectorstore, bm25_retriever_contextualized)

    if create_contextual_rag and not is_within_token_limit(details, current_tokens, tpm_limit):
        print(f"TPM limit reached, current tokens are: {current_tokens}. Waiting for {delay} seconds...")
        time.sleep(delay)

    return current_tokens

  except Exception as e:
      print(f"Failed to process document: {source_document}. Error: {e}")
      if delay > 60: # Hack: it's retrying, don't append again
        retry_document_processing.append(document)

      return current_tokens

In [85]:
retry_document_processing = []
current_tokens = 0

for document in documents:
  current_tokens = process_document(document, retry_document_processing, current_tokens, 22)

# Retry processing failed documents
if retry_document_processing:
    print("Retrying failed documents...")
    for document in retry_document_processing:
        current_tokens = process_document(document, retry_document_processing, current_tokens, 120)

Processing document: ./dataset/converted_md/Erratum_jand.md with currrent tokens: 0
Total number of chunks in document: 10
Creating new FAISS index...
Saving FAISS index...
Adding 10 documents to the FAISS index...
Added 10 documents to the index.
Processing document: ./dataset/converted_md/Table-of-Contents_jand.md with currrent tokens: 2168
Total number of chunks in document: 12
Adding 12 documents to the FAISS index...
Added 12 documents to the index.
Processing document: ./dataset/converted_md/Diabetes-Whitepaper.md with currrent tokens: 4970
Total number of chunks in document: 5
Adding 5 documents to the FAISS index...
Added 5 documents to the index.
Processing document: ./dataset/converted_md/July-2018-People-&amp;-Events_jand.md with currrent tokens: 5863
Total number of chunks in document: 5
Adding 5 documents to the FAISS index...
Added 5 documents to the index.
Processing document: ./dataset/converted_md/Inuit-Country-Food-Diet-Pattern-Is-Associated-with.md with currrent toke

In [86]:
my_rag.save_vectorstores()

Saving FAISS index...


In [None]:
# original_reranker = cr.create_flashrank_index(original_vectorstore)
# contextualized_reranker = cr.create_flashrank_index(contextualized_vectorstore)

##### Create retriver system with hybrid search with Reranker

In [116]:
index_manager = WhooshIndexManager(index_dir="./storage/bm25_index")

bm25_retriever = WhooshRetriever(index_manager=index_manager, k=10)
vectorstore = my_rag.get_vectorstores()
ensemble_retriever_reranker = my_rag.create_ensemble_retriever_reranker(vectorstore, bm25_retriever)

# # Retrieve documents based on a query
# query = "second document"
# results = bm25_retriever._get_relevant_documents(query)
# for result in results:
#     print(result.page_content)

Loading existing index from: ./storage/bm25_index
Loading existing FAISS index...


In [117]:
ensemble_retriever_reranker.invoke("What is ViDoRe?")

ERROR:langsmith._internal._serde:Failed to use model_dump to serialize <class 'langchain_core.documents.base.Document'> to JSON: PydanticSerializationError(Unable to serialize unknown type: <class 'numpy.float32'>)
ERROR:langsmith._internal._serde:Failed to use model_dump to serialize <class 'langchain_core.documents.base.Document'> to JSON: PydanticSerializationError(Unable to serialize unknown type: <class 'numpy.float32'>)
ERROR:langsmith._internal._serde:Failed to use model_dump to serialize <class 'langchain_core.documents.base.Document'> to JSON: PydanticSerializationError(Unable to serialize unknown type: <class 'numpy.float32'>)


[Document(metadata={'id': 5, 'relevance_score': 0.9874792, 'source': './dataset/converted_md/ColPali_2407.01449v3.md'}, page_content='and textual representations using contrastive losses, though OCR capabilities may be limited.\\n- Visually Rich Document Understanding: Models that encode text alongside visual features to enhance understanding.\\n- PaliGemma Model: A model that combines text and image embeddings for improved performance in tasks like Visual Question Answering and document understanding.\\n\\n5. ViDoRe Benchmark:\\n- A new benchmark designed to evaluate document retrieval systems at the page level, focusing on various modalities (text, figures, tables) and thematic domains (e.g., medical).\\n\\n### Structured Content:\\n\\n#### 1. Neural Retrievers\\n- Bi-encoder Models:\\n- Independent document mapping to dense vector space.\\n- Fast cosine distance computation for query matching.\\n\\n- Cross-encoder Systems:\\n-'),
 Document(metadata={'id': 0, 'relevance_score': 0.986

In [129]:
model_name = "BAAI/bge-large-en-v1.5"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
embeddings = HuggingFaceEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs
)
ensemble_reranker = EnsembleRetrieverReranker(embeddings=embeddings)

INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: BAAI/bge-large-en-v1.5


In [130]:
compression_pipeline = ensemble_reranker.create_ensemble_retriever_reranker(vectorstore, bm25_retriever)

In [131]:
query = "What is ViDoRe?"
results = ensemble_reranker.invoke(query, compression_pipeline)

for doc in results:
    print(doc.page_content)
    print(doc.metadata)

ERROR:langsmith._internal._serde:Failed to use model_dump to serialize <class 'langchain_core.documents.base.Document'> to JSON: PydanticSerializationError(Unable to serialize unknown type: <class 'numpy.float32'>)
ERROR:langsmith._internal._serde:Failed to use model_dump to serialize <class 'langchain_core.documents.base.Document'> to JSON: PydanticSerializationError(Unable to serialize unknown type: <class 'numpy.float32'>)
ERROR:langsmith._internal._serde:Failed to use model_dump to serialize <class 'langchain_core.documents.base.Document'> to JSON: PydanticSerializationError(Unable to serialize unknown type: <class 'numpy.float32'>)


and textual representations using contrastive losses, though OCR capabilities may be limited.\n- Visually Rich Document Understanding: Models that encode text alongside visual features to enhance understanding.\n- PaliGemma Model: A model that combines text and image embeddings for improved performance in tasks like Visual Question Answering and document understanding.\n\n5. ViDoRe Benchmark:\n- A new benchmark designed to evaluate document retrieval systems at the page level, focusing on various modalities (text, figures, tables) and thematic domains (e.g., medical).\n\n### Structured Content:\n\n#### 1. Neural Retrievers\n- Bi-encoder Models:\n- Independent document mapping to dense vector space.\n- Fast cosine distance computation for query matching.\n\n- Cross-encoder Systems:\n-
{'id': 5, 'relevance_score': 0.9874792098999023, 'source': './dataset/converted_md/ColPali_2407.01449v3.md'}
late interaction engines, many popular frameworks lack native multi-vector support, necessitatin