# RAG vectorstore

When splitting documents for retrieval, there are often conflicting desires:

1. You may want to keep documents small, ensuring that their embeddings accurately represent their meaning. If they become too long, the embeddings can lose their meaning.
2. You also want documents long enough that each chunk retains its context.

The `RAGVectorStore` strikes a balance by splitting and storing small chunks and different variations of data. During retrieval, it initially retrieves the small chunks but then looks up the parent IDs for those chunks and returns the larger documents.

The challenge lies in correctly managing the lifecycle of the three levels of documents:
- Original documents
- Chunks extracted from the original documents
- Transformations of chunks to generate more vectors for improved retrieval

The `RAGVectorStore`, in combination with other components, is designed to address this challenge.
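As a rough illustration of the idea (not the `RAGVectorStore` implementation), the sketch below indexes small chunks, ranks them against a query, and returns the larger parent text each hit points to. The bag-of-words `embed` function, the `parents` dictionary and the `retrieve_parents` helper are hypothetical stand-ins chosen so the example runs without any model or external library.

```python
from collections import Counter
from math import sqrt
from typing import List


def embed(text: str) -> Counter:
    # Toy "embedding": a bag of lowercase words (stand-in for a real model).
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[w] * b[w] for w in a)
    norm = sqrt(sum(v * v for v in a.values())) * sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


# Larger parent texts, keyed by a hypothetical parent ID.
parents = {
    "p1": "pure mathematics studies abstract structures for their own sake "
          "while applied mathematics solves practical problems",
    "p2": "the history of mathematics covers discoveries from antiquity to the present",
}

# Only the small chunks are embedded; each one remembers its parent ID.
small_chunks = [
    ("pure mathematics studies abstract structures", "p1"),
    ("applied mathematics solves practical problems", "p1"),
    ("history of mathematics discoveries", "p2"),
]
index = [(embed(text), parent_id) for text, parent_id in small_chunks]


def retrieve_parents(query: str, k: int = 2) -> List[str]:
    q = embed(query)
    ranked = sorted(index, key=lambda item: cosine(q, item[0]), reverse=True)[:k]
    seen, result = set(), []
    for _, parent_id in ranked:
        if parent_id not in seen:  # several small hits may share one parent
            seen.add(parent_id)
            result.append(parents[parent_id])
    return result


results = retrieve_parents("difference between pure and applied mathematics")
```

Both top-ranked small chunks belong to the same parent, so a single larger text is returned.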

In [31]:
#!pip install 'langchain-parent' openai tiktoken
!poetry install -q  # FIXME

For the sample, we are using the set of documents from Wikipedia.
We would like to answer questions related to mathematics.

In [32]:
query = "What is the difference between pure and applied mathematics?"

In [33]:
from typing import List
from typing import Union

from chromadb import Documents
%load_ext autoreload
%autoreload 2

import tempfile
import logging

nb_documents_to_import = 3  # How many documents should be imported from Wikipedia?
top_k = 3  # How many documents should be selected to answer the question?

ROOT_PATH = tempfile.gettempdir() + "/rag"



In [34]:
# Activate logging and prints
logging.getLogger().setLevel(logging.INFO)


def pretty_print_docs(docs: Union[str, List[Documents]], metadatas=(), kind: str = "Variations"):
    def print_metadata(d):
        s = ",\n".join([f"{metadata}={repr(d.metadata.get(metadata))}" for metadata in metadatas])
        if s:
            return f'\n\033[92m{s}\033[0m'
        return ""

    def print_doc(d, i):
        r = f"\033[94m{kind} {i + 1}:\n{d.page_content[:80]}"
        if len(d.page_content) > 80:
            r += f"...[:{max(0, len(d.page_content) - 80)}]"
        r += f'\033[0m{print_metadata(d)}'
        return r

    if isinstance(docs, list):
        print(
            f"\n{'-' * 40}\n".join(
                [print_doc(d, i)
                 for i, d in enumerate(docs)]
            )
        )
    else:
        print(f'\033[92m{docs}\033[0m')

In [35]:
# %% Set debug and trace
from langchain.callbacks import StdOutCallbackHandler
from typing import *

from langchain.globals import set_debug, set_verbose

set_debug(False)
set_verbose(False)
if False:
    VERBOSE_INPUT = True
    VERBOSE_OUTPUT = True


    class ExStdOutCallbackHandler(StdOutCallbackHandler):
        def on_text(
                self,
                text: str,
                color: Optional[str] = None,
                end: str = "",
                **kwargs: Any,
        ) -> None:
            if VERBOSE_INPUT:
                print("====")
                super().on_text(text=text, color=color, end=end)

        def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
            """Add a trace of the LLM outputs."""
            if VERBOSE_OUTPUT:
                print("\n\033[1m> Finished chain with\033[0m")
                knows_keys = {
                    "answer",
                    "output_text",
                    "text",
                    "result",
                    "outputs",
                    "output",
                }
                if "outputs" in outputs:
                    print("\n\033[33m")
                    print(
                        "\n---\n".join(
                            [text["text"].strip() for text in outputs["outputs"]]
                        )
                    )
                    print("\n\033[0m")
                elif knows_keys.intersection(outputs):
                    # Take the first key found in the intersection
                    print(
                        f"\n\033[33m{outputs[next(iter(knows_keys.intersection(outputs)))]}\n\033[0m"
                    )
                else:
                    pass


    CALLBACKS = [ExStdOutCallbackHandler()]
else:
    CALLBACKS = []

In [36]:
!pip install --quiet --upgrade pip langchain wikipedia
from langchain.retrievers import WikipediaRetriever

documents = WikipediaRetriever(top_k_results=nb_documents_to_import).get_relevant_documents("mathematic")
pretty_print_docs(documents, kind="Documents")

Documents 1:
Mathematics is an area of knowledge that includes the topics of numbers, formula...[:3920]
----------------------------------------
Documents 2:
The history of mathematics deals with the origin of discoveries in mathematics a...[:3920]
----------------------------------------
Documents 3:
Mathematical Reviews is a journal published by the American Mathematical Society...[:3920]


# Select provider
## Select the LLM
Before starting, we need to:
- Set the environment variables
- Choose a language model (LLM), determine the context size, and set the maximum number of tokens for generation
- Enable all caches

In [37]:
import os
from dotenv import load_dotenv

load_dotenv(override=True)
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = "XXXXX"
if "COHERE_API_KEY" not in os.environ:
    os.environ["COHERE_API_KEY"] = "XXXX"

In [38]:
!pip install --quiet openai tiktoken
from langchain.llms import OpenAI

context_size = 512  # For the demonstration, use a small context_size.
max_tokens = int(context_size * (10 / 100))  # 10% for the response
max_input_tokens = context_size - max_tokens
llm = OpenAI(
    temperature=0.5,
    max_tokens=max_tokens,
)
context_size, max_tokens, max_input_tokens

(512, 51, 461)

In [39]:
# Add a cache
from langchain.cache import SQLiteCache
import langchain

LANGCHAIN_CACHE_PATH = ROOT_PATH + "/cache_llm"
langchain.llm_cache = SQLiteCache(database_path=LANGCHAIN_CACHE_PATH)


## Select the embedding implementation

In [40]:
from langchain.embeddings import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()

In [41]:
# Add a cache
CACHE_EMBEDDING_PATH = ROOT_PATH + "/cache_embedding"
from langchain.storage import LocalFileStore

fs = LocalFileStore(CACHE_EMBEDDING_PATH)

from langchain.embeddings import CacheBackedEmbeddings

embeddings = CacheBackedEmbeddings.from_bytes_store(
    embeddings, fs, namespace=embeddings.model if hasattr(embeddings, "model") else "unknown"
)
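To picture what an embedding cache buys you, here is a minimal sketch of the general pattern: hash each text, prefix the key with a namespace, and only call the underlying model on a cache miss. `CachedEmbedder` and `fake_embed` are hypothetical names for this illustration; they do not reproduce the internals of `CacheBackedEmbeddings`.

```python
import hashlib
from typing import Callable, Dict, List


class CachedEmbedder:
    """Toy cache in front of an embedding function (illustration only)."""

    def __init__(self, embed_fn: Callable[[str], List[float]], namespace: str):
        self._embed_fn = embed_fn
        self._namespace = namespace
        self._store: Dict[str, List[float]] = {}
        self.misses = 0  # number of real calls to the embedding function

    def _key(self, text: str) -> str:
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        # The namespace keeps vectors from different models apart.
        return f"{self._namespace}:{digest}"

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        vectors = []
        for text in texts:
            key = self._key(text)
            if key not in self._store:
                self.misses += 1
                self._store[key] = self._embed_fn(text)
            vectors.append(self._store[key])
        return vectors


def fake_embed(text: str) -> List[float]:
    # Stand-in for a paid embedding call.
    return [float(len(text)), float(text.count(" "))]


embedder = CachedEmbedder(fake_embed, namespace="toy-model")
first = embedder.embed_documents(["pure mathematics", "applied mathematics"])
second = embedder.embed_documents(["pure mathematics", "number theory"])
```

The second call only computes one new vector, because "pure mathematics" is already cached.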

# Transform documents
The idea is to transform a document into multiple versions and calculate a vector for each one.

In [42]:
from langchain.text_splitter import *
from langchain_parent.document_transformers import *

The first step is to split the document to ensure compatibility with the `max_input_tokens`.

In [43]:
parent_transformer = TokenTextSplitter(
    chunk_size=max_input_tokens,
    chunk_overlap=0
)

Let's test the transformation.

In [44]:
chunk_documents = parent_transformer.transform_documents(documents)
f"before:{len(documents)} documents, after:{len(chunk_documents)} chunks"

'before:3 documents, after:6 chunks'
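The splitting step can be sketched as a sliding window over tokens. The version below is only an approximation: it treats whitespace-separated words as tokens, whereas `TokenTextSplitter` relies on a real tokenizer, and `split_by_tokens` is a hypothetical helper name.

```python
from typing import List


def split_by_tokens(text: str, chunk_size: int, chunk_overlap: int = 0) -> List[str]:
    """Cut text into windows of at most chunk_size tokens (words, in this toy)."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    tokens = text.split()
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break  # the last window already reached the end of the text
    return chunks


sample = " ".join(f"w{i}" for i in range(10))
chunks = split_by_tokens(sample, chunk_size=4)
overlapping = split_by_tokens(sample, chunk_size=4, chunk_overlap=1)
```

With `chunk_overlap=0`, as in the cell above, consecutive chunks share no tokens; a positive overlap repeats the tail of one chunk at the head of the next.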

We need multiple variations for each chunk.

In [45]:
chunk_transformer = DocumentTransformers(
    transformers=[
        GenerateQuestionsTransformer.from_llm(llm),
        SummarizeTransformer.from_llm(llm),
        CopyDocumentTransformer(),
    ]
)

Note that we require all transformations for each chunk, including the original chunk. This is why we include the `CopyDocumentTransformer()`.

Now, let's test the transformation.

In [46]:
variations_of_chunks = chunk_transformer.transform_documents(chunk_documents[:1])
# Select the variations for the first chunk
pretty_print_docs(variations_of_chunks)



Variations 1:
What are the major subdisciplines of mathematics?
----------------------------------------
Variations 2:
What is the foundational crisis of mathematics?
----------------------------------------
Variations 3:
How has the interaction between mathematical innovations and scientific discover...[:55]
----------------------------------------
Variations 4:
SUMMARY:
Mathematics is an area of knowledge that covers topics such as numbers,...[:185]
----------------------------------------
Variations 5:
Mathematics is an area of knowledge that includes the topics of numbers, formula...[:2472]
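The output above mixes generated questions, a summary and a copy of the chunk. A minimal sketch of how such a composite transformer could work is shown below, assuming each transformer maps one chunk to a list of new documents. The `Doc` class, the fake transformers and the `"transformer"` metadata values are hypothetical; the real transformers call an LLM.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Doc:
    page_content: str
    metadata: Dict[str, str] = field(default_factory=dict)


Transformer = Callable[[Doc], List[Doc]]


def fake_questions(doc: Doc) -> List[Doc]:
    # Stand-in for an LLM that writes questions answered by the chunk.
    words = doc.page_content.split()[:2]
    return [Doc(f"What is said about {w}?", {**doc.metadata, "transformer": "questions"})
            for w in words]


def fake_summary(doc: Doc) -> List[Doc]:
    # Stand-in for an LLM summary: simply truncate the text.
    return [Doc("SUMMARY: " + doc.page_content[:20], {**doc.metadata, "transformer": "summary"})]


def copy_doc(doc: Doc) -> List[Doc]:
    # Keep the original chunk among the variations.
    return [Doc(doc.page_content, {**doc.metadata, "transformer": "copy"})]


def apply_all(chunks: List[Doc], transformers: List[Transformer]) -> List[Doc]:
    variations: List[Doc] = []
    for chunk in chunks:
        for transform in transformers:
            variations.extend(transform(chunk))
    return variations


chunk = Doc("Mathematics is an area of knowledge", {"source": "wiki/Mathematics"})
variations = apply_all([chunk], [fake_questions, fake_summary, copy_doc])
```

Every variation inherits the metadata of its chunk, which is what later allows a hit on a question or summary to be traced back to its origin.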


# Saving all Variations in a Vector Store
Now, our goal is to store the chunks and their respective variations in a vector store. During retrieval, the small variations are fetched first; their parent IDs are then looked up, and the original chunks are returned.

A specialized vector store is designed for this purpose: the `RAGVectorStore`.
It's not a standalone vector store but rather a wrapper for another vector store. When you add a document, the document undergoes transformation with the `parent_transformer`, and each chunk is enriched with various versions through the `chunk_transformer`.

## Build step by step
First, we need to create some persistent components:
- A standard vector store
- A document store to store each original chunk returned by the retriever and the relationship between the document and chunks.

In [47]:
from langchain.vectorstores import Chroma

VS_PATH = ROOT_PATH + "/vs"
chroma_vectorstore = Chroma(
    collection_name="all_variations_of_chunks",
    embedding_function=embeddings,
    persist_directory=VS_PATH,

)

In [48]:
DOCSTORE_PATH = ROOT_PATH + "/chunks"
from langchain.storage import EncoderBackedStore
from langchain.storage import LocalFileStore
import pickle

docstore = EncoderBackedStore[str, Document](
    store=LocalFileStore(root_path=DOCSTORE_PATH),
    key_encoder=lambda x: x,
    value_serializer=pickle.dumps,
    value_deserializer=pickle.loads
)

All documents must have a unique ID in their metadata. 
Then, it's possible to use the advanced `RAGVectorStore`. 
It's a wrapper around a standard vector store, specialized for managing different transformations and the lifecycle of documents.

In [49]:
from langchain_parent.vectorstores import RAGVectorStore

vectorstore = RAGVectorStore(
    vectorstore=chroma_vectorstore,
    docstore=docstore,
    source_id_key="source",  # Unique ID of each document
    parent_transformer=parent_transformer,
    chunk_transformer=chunk_transformer,
)

Now, it's time to add documents to this vector store.
- If the `parent_transformer` is set, the document is transformed into a new list of chunk documents (generally, this is a split phase).
- Then, if the `chunk_transformer` is set, each chunk document is transformed to generate some variations.
- Every variation of every chunk is added to the underlying vector store (here, the Chroma store).
- All chunks are saved in the DocStore with the list of all associated variations.
- All IDs of chunks generated for each document are saved in the doc store. This makes it possible to remove the document and all associated chunks when needed.
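The bookkeeping described in this list can be sketched with three plain dictionaries. This is only a toy model of the lifecycle, assuming a word-based splitter and two variations per chunk; the names `doc_index`, `chunk_store`, `add_document` and `delete_document` are hypothetical and not part of the `RAGVectorStore` API.

```python
import uuid
from typing import Dict, List

vector_store: Dict[str, dict] = {}       # variation_id -> text + parent chunk ID
chunk_store: Dict[str, dict] = {}        # chunk_id -> text + its variation IDs
doc_index: Dict[str, List[str]] = {}     # document ID -> chunk IDs


def split(text: str, size: int = 5) -> List[str]:
    words = text.split()
    return [" ".join(words[i:i + size]) for i in range(0, len(words), size)]


def variations_of(chunk: str) -> List[str]:
    # Toy variations: the chunk itself plus a fake "summary".
    return [chunk, "SUMMARY: " + chunk.split()[0]]


def add_document(doc_id: str, text: str) -> List[str]:
    chunk_ids = []
    for chunk in split(text):
        chunk_id = str(uuid.uuid4())
        variation_ids = []
        for variation in variations_of(chunk):
            variation_id = str(uuid.uuid4())
            vector_store[variation_id] = {"text": variation, "chunk_id": chunk_id}
            variation_ids.append(variation_id)
        chunk_store[chunk_id] = {"text": chunk, "variation_ids": variation_ids}
        chunk_ids.append(chunk_id)
    doc_index[doc_id] = chunk_ids
    return chunk_ids


def delete_document(doc_id: str) -> None:
    # Walk the recorded relationships to remove everything derived from the document.
    for chunk_id in doc_index.pop(doc_id):
        for variation_id in chunk_store.pop(chunk_id)["variation_ids"]:
            del vector_store[variation_id]


add_document("doc-1", "one two three four five six seven eight nine ten eleven twelve")
counts_after_add = (len(doc_index), len(chunk_store), len(vector_store))
delete_document("doc-1")
counts_after_delete = (len(doc_index), len(chunk_store), len(vector_store))
```

Because the document-to-chunk and chunk-to-variation links are recorded at insertion time, deleting a document needs no search: it just follows the stored IDs.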

In [50]:
ids = vectorstore.add_documents(documents)
ids



OperationalError: attempt to write a readonly database

While conducting the search, an embedding is computed for the query and subsequently compared to the embeddings of all the transformed chunks. The metadata for each transformed chunk contains a reference to the ID of the original chunk, allowing for the retrieval of the respective chunk.
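One consequence of this design is that several variations of the same chunk may match a query. A plausible way to handle this, sketched below under the assumption that hits arrive sorted best-first, is to map each hit to its `_chunk_id` and keep each chunk once. The `hits` list and `chunks_for_hits` helper are hypothetical illustration data, not output from the notebook.

```python
from typing import Dict, List, Tuple

chunk_store: Dict[str, str] = {
    "c1": "full text of chunk one",
    "c2": "full text of chunk two",
}

# (similarity score, metadata of the matched variation), best first.
hits: List[Tuple[float, Dict[str, str]]] = [
    (0.91, {"_chunk_id": "c1", "transformer": "questions"}),
    (0.88, {"_chunk_id": "c1", "transformer": "summary"}),
    (0.80, {"_chunk_id": "c2", "transformer": "copy"}),
]


def chunks_for_hits(hits: List[Tuple[float, Dict[str, str]]], k: int) -> List[str]:
    # dict.fromkeys removes duplicates while preserving the ranking order.
    ordered_ids = list(dict.fromkeys(meta["_chunk_id"] for _, meta in hits))
    return [chunk_store[chunk_id] for chunk_id in ordered_ids[:k]]


selected = chunks_for_hits(hits, k=2)
```

Three matching variations collapse into two distinct chunks, so the caller never receives the same chunk twice.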

In [21]:
pretty_print_docs(vectorstore.search(query=query, search_type="similarity"), ["_chunk_id"])

Variations 1:
Mathematics is an area of knowledge that includes the topics of numbers, formula...[:2472]
_chunk_id='c2070637-451a-414c-a6df-b591c03fbc59'


`add_documents()` returns a list of document IDs. You can use these IDs to remove all related chunks and variations.

When you examine the langchain API, you may wonder where to store the document IDs from the vector store.

# Index Vector Store
To manage the lifecycle of the documents in the vector store, you can use `index()`.
A `RecordManager` keeps track of the evolution of each document. Use `index()` to import the documents.

In [22]:
from langchain.indexes import index, SQLRecordManager

record_manager = SQLRecordManager(
    namespace="record_manager_cache",
    db_url=f"sqlite:///{ROOT_PATH}/record_manager.db"
)
record_manager.create_schema()

In [23]:
# Save all the information in:
# - record manager
# - docstore
# - vectorstore
index(
    docs_source=documents,
    record_manager=record_manager,
    vector_store=vectorstore,
    cleanup="incremental",
    source_id_key="source",
)



{'num_added': 2, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}

## Alternative factory
To simplify the creation of the persistence ecosystem, you can use the `from_vs_in_memory` method for in-memory usage only, and `from_vs_in_sql` for usage with SQL.

In [25]:
vectorstore, index_kwargs = RAGVectorStore.from_vs_in_memory(
    vectorstore=chroma_vectorstore,
    parent_transformer=parent_transformer,
    chunk_transformer=chunk_transformer,
    source_id_key="source",
)
index(
    docs_source=documents,
    cleanup="incremental",
    **index_kwargs  
)



{'num_added': 3, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

In [26]:
vectorstore, index_kwargs = RAGVectorStore.from_vs_in_sql(
    vectorstore=chroma_vectorstore,
    parent_transformer=parent_transformer,
    chunk_transformer=chunk_transformer,
    source_id_key="source",
    db_url=f"sqlite:///{ROOT_PATH}/record_manager.db",
)
index(
    docs_source=documents,
    cleanup="incremental",
    **index_kwargs  
)

{'num_added': 0, 'num_updated': 0, 'num_skipped': 3, 'num_deleted': 0}

If you import the same documents again, you will notice that all of them are skipped. Without `index()`, each document would be stored twice, and the duplicates would occupy result slots: the effect is the same as dividing `top_k` by two during the search.

In [27]:
index(
    docs_source=documents,
    cleanup="incremental",
    **index_kwargs  
)

{'num_added': 0, 'num_updated': 0, 'num_skipped': 3, 'num_deleted': 0}

If a document changes, its previous version is deleted.

In [28]:
documents[0].page_content += " Is changed."
index(
    docs_source=documents,  # PPR: a loader can be passed here
    cleanup="incremental",
    **index_kwargs
)



{'num_added': 1, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 1}

To also delete the records of documents that are no longer present in the source, use the `full` cleanup strategy.

In [29]:
del documents[0]
index(
    docs_source=documents,  # PPR: a loader can be passed here
    cleanup="full",
    **index_kwargs
)

{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 1}
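The counters printed in the last cells can be reproduced with a small hash-based model of a record manager. This is a simplified sketch, not LangChain's implementation: `toy_index` and `records` are hypothetical names, and the real `index()` tracks more state than a content hash per source.

```python
import hashlib
from typing import Dict, List, Tuple

records: Dict[str, str] = {}  # content hash -> source ID


def toy_index(docs: List[Tuple[str, str]], cleanup: str) -> Dict[str, int]:
    """docs is a list of (source_id, content) pairs."""
    stats = {"num_added": 0, "num_updated": 0, "num_skipped": 0, "num_deleted": 0}
    seen_hashes, seen_sources = set(), set()
    for source_id, content in docs:
        digest = hashlib.sha256(f"{source_id}\n{content}".encode("utf-8")).hexdigest()
        seen_hashes.add(digest)
        seen_sources.add(source_id)
        if digest in records:
            stats["num_skipped"] += 1  # unchanged since the last run
        else:
            records[digest] = source_id
            stats["num_added"] += 1
    for digest, source_id in list(records.items()):
        stale = digest not in seen_hashes
        if cleanup == "incremental":
            # Only clean old versions of sources seen in this run.
            stale = stale and source_id in seen_sources
        elif cleanup != "full":
            stale = False
        if stale:
            del records[digest]
            stats["num_deleted"] += 1
    return stats


docs = [("a", "A"), ("b", "B"), ("c", "C")]
first = toy_index(docs, cleanup="incremental")
second = toy_index(docs, cleanup="incremental")
docs[0] = ("a", "A changed")
third = toy_index(docs, cleanup="incremental")
del docs[0]
fourth = toy_index(docs, cleanup="full")
```

In this model, `incremental` removes outdated versions of the sources it sees, while `full` also removes records whose source no longer appears at all.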

Note that parts of the data are saved in three different places:

- In the *vector store*: this includes the bucket, metadata, and the associated embedding vectors.
- In the *doc store*: this covers the original bucket and the relationship between parent and chunks before the *chunk transformations*.
- In the *SQLRecordManager*: this involves the references of the parent document or chunks.

None of these stores manages transactions. If a problem occurs while adding a document, the three stores are very likely to end up inconsistent.

# Use advanced retrievers
Just like with the standard vector store, you can convert the `RAGVectorStore` into a `Retriever`.

In [None]:
retriever = vectorstore.as_retriever()
selected_chunks = retriever.get_relevant_documents(query)
len(selected_chunks)

## Specialized Retrievers
It's possible to combine multiple retrievers or use specialized retrievers for advanced applications.

The `SelfQueryRetriever` can generate a metadata filter. We use it to provide the option to filter the chunks by the title of the original document.

In [None]:
from langchain.chains.query_constructor.schema import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever

metadata_field_info = [
    AttributeInfo(
        name="title",
        description="The title of the document.",
        type="string",
    ),
]
document_content_description = "Documents on mathematics"
self_retriever = SelfQueryRetriever.from_llm(
    llm,
    vectorstore,
    document_content_description,
    metadata_field_info,
    verbose=True)

pretty_print_docs(self_retriever.get_relevant_documents("In the document 'History of mathematics', " + query),
                  ["title"])

It's possible to use it with the variations, but you must directly use the `chroma_vectorstore`.

In [None]:
from langchain.chains.query_constructor.schema import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever

metadata_field_info = [
    AttributeInfo(
        name="title",
        description="The title of the document.",
        type="string",
    ),
    AttributeInfo(
        name="transformer",
        description="The transformations of the documents. "
                    "Must be GenerateQuestionsTransformer or SummarizeTransformer.",
        type="string",
    ),
]
document_content_description = "documents on mathematics"
chroma_retriever = SelfQueryRetriever.from_llm(
    llm,
    chroma_vectorstore,  # In this case, use the chroma vectorstore, to retrieve the variations
    document_content_description,
    metadata_field_info,
    verbose=True)

pretty_print_docs(chroma_retriever.get_relevant_documents("Summary of 'History of mathematics'"),
                  ["transformer", "title"])

With a filter, we can obtain a retriever specialized in summaries.

In [None]:
summary_retriever = chroma_vectorstore.as_retriever(
    search_kwargs={"filter": {"transformer": {"$eq": "SummarizeTransformer"}}})
pretty_print_docs(summary_retriever.get_relevant_documents(query), ["transformer"])

And combine it with the chunk retriever.

In [None]:
from langchain.retrievers.merger_retriever import MergerRetriever

merge_retriever = MergerRetriever(retrievers=[self_retriever, summary_retriever])
pretty_print_docs(merge_retriever.get_relevant_documents(query), ["transformer"])

Retrieval results may vary with minor changes in query phrasing or if the embeddings do not accurately capture the data's semantics. The `MultiQueryRetriever` streamlines the prompt-tuning process by employing an LLM to generate multiple queries from diverse perspectives based on a user input query. For each query, it retrieves a collection of pertinent documents and combines the unique results from all queries to obtain a larger set of potentially relevant documents.

In [None]:
from langchain.retrievers.multi_query import MultiQueryRetriever

# Generate 3 variants of the user question and use them to find better candidates in the vector store
multi_query_retriever = MultiQueryRetriever.from_llm(
    llm=llm,
    retriever=merge_retriever,
)

pretty_print_docs(multi_query_retriever.get_relevant_documents(query), ["transformer"])
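The merging behaviour can be illustrated without an LLM. In the sketch below, the query variants are hard-coded in `fake_query_variants` (a hypothetical stand-in for the LLM call) and `keyword_retriever` is a toy retriever based on shared words; only the "retrieve per variant, keep unique results" logic is the point.

```python
from typing import Dict, List

corpus: Dict[str, str] = {
    "d1": "pure mathematics studies abstract concepts",
    "d2": "applied mathematics models real problems",
    "d3": "the history of algebra",
}


def keyword_retriever(query: str) -> List[str]:
    # Toy retriever: return the IDs of documents sharing at least one word.
    words = set(query.lower().split())
    return [doc_id for doc_id, text in corpus.items() if words & set(text.split())]


def fake_query_variants(query: str) -> List[str]:
    # Stand-in for the LLM that rephrases the question from other angles.
    return [query, "abstract concepts", "real problems"]


def multi_query_retrieve(query: str) -> List[str]:
    merged: List[str] = []
    for variant in fake_query_variants(query):
        for doc_id in keyword_retriever(variant):
            if doc_id not in merged:  # keep each document once
                merged.append(doc_id)
    return merged


single = keyword_retriever("pure theory")
found = multi_query_retrieve("pure theory")
```

The original wording alone reaches one document; the rephrased variants surface a second one that the first query missed.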

At this stage, when we employ the retriever:

- Multiple queries are generated to locate the relevant documents (via `multi_query_retriever`).
- For each query:
    - Both the original chunk and the chunk summary are retrieved.
    - If feasible, a metadata filter is applied (via `self_retriever`).
    - The query vector is compared to all variations of each chunk.
    - The best variations are retrieved, but only a subset of the original chunks is utilized (via `RAGVectorStore`).
- Only these selected candidates can be used to answer the question.

# Use a compressor
A *compressor* can be used to filter the selection.

Several filters can be combined in a pipeline:
- The [CohereRerank](https://python.langchain.com/docs/integrations/retrievers/cohere-reranker) can rank the chunks.
- The [EmbeddingsFilter](https://python.langchain.com/docs/modules/data_connection/retrievers/contextual_compression#embeddingsfilter) can add a similarity threshold.
- The [LLMChainFilter](https://python.langchain.com/docs/modules/data_connection/retrievers/contextual_compression#llmchainfilter) decides which of the initially retrieved documents to filter out and which ones to return, without manipulating the document contents.
- ...
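Conceptually, such a pipeline applies each step to the output of the previous one. The sketch below assumes pre-computed two-dimensional vectors and uses cosine similarity for both steps; `embeddings_filter`, `rerank` and `pipeline` are hypothetical helpers, and a real reranker such as Cohere's uses its own hosted model rather than the same similarity score.

```python
from math import sqrt
from typing import Callable, List, Tuple

Candidate = Tuple[str, List[float]]  # (text, embedding)

query_vec = [1.0, 0.0]
docs: List[Candidate] = [
    ("doc about applied maths", [0.8, 0.6]),
    ("doc about pure maths", [1.0, 0.0]),
    ("doc about cooking", [0.0, 1.0]),
]


def cosine(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def embeddings_filter(candidates: List[Candidate], threshold: float) -> List[Candidate]:
    # Drop candidates that are not similar enough to the query.
    return [c for c in candidates if cosine(query_vec, c[1]) >= threshold]


def rerank(candidates: List[Candidate], top_n: int) -> List[Candidate]:
    # Placeholder reranker: sort by similarity and keep the best top_n.
    return sorted(candidates, key=lambda c: cosine(query_vec, c[1]), reverse=True)[:top_n]


def pipeline(candidates: List[Candidate],
             steps: List[Callable[[List[Candidate]], List[Candidate]]]) -> List[Candidate]:
    for step in steps:
        candidates = step(candidates)
    return candidates


kept = pipeline(docs, [lambda c: embeddings_filter(c, 0.7), lambda c: rerank(c, 1)])
```

Ordering the steps from cheapest to most expensive keeps the costly reranker working on as few candidates as possible.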

In [None]:
!pip install -q cohere
from langchain.retrievers.document_compressors import CohereRerank

cohere_rerank = CohereRerank(top_n=top_k)

In [None]:
! pip install -q simsimd
from langchain.retrievers.document_compressors import *
from langchain.retrievers import ContextualCompressionRetriever

embeddings_filter = EmbeddingsFilter(
    embeddings=embeddings,
    similarity_threshold=0.7  # Minimum similarity to the query for a document to be kept.
)

In [None]:
# Combine compressors
compressor = DocumentCompressorPipeline(
    transformers=[
        # embeddings_filter,
        cohere_rerank,
    ]
)

Now we can filter with our pipeline.

In [None]:
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=multi_query_retriever
)

pretty_print_docs(compression_retriever.get_relevant_documents(query))

In [None]:
final_retriever = compression_retriever

# Asking a Question

This architecture can now be used to answer a question.

In [None]:
from importlib import reload

# FIXME
reload(logging)
logging.getLogger("langchain.retrievers.self_query.base").setLevel(logging.WARN)
logging.getLogger().setLevel(logging.WARN)  # Deactivate the logs

A problem can arise if the number of documents to be analysed is too large for the size of the prompt.
Several strategies are available to manage this, identified by the 
[`chain_type`](https://python.langchain.com/docs/use_cases/question_answering/vector_db_qa#chain-type) parameter.
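To make the difference between two of these strategies concrete, the sketch below contrasts "stuff" (all documents in a single prompt) with "map_reduce" (one call per document, then one call to combine). `fake_llm` is a hypothetical stand-in that records the prompts it receives; the prompt wording is invented for the illustration.

```python
from typing import List

calls: List[str] = []


def fake_llm(prompt: str) -> str:
    # Stand-in for a real model: remember the prompt and return a dummy answer.
    calls.append(prompt)
    return f"answer#{len(calls)}"


def stuff(docs: List[str], question: str) -> str:
    # One call: every document is placed in the same prompt.
    context = "\n\n".join(docs)
    return fake_llm(f"{context}\n\nQuestion: {question}")


def map_reduce(docs: List[str], question: str) -> str:
    # Map: one call per document. Reduce: one call over the partial answers.
    partials = [fake_llm(f"{doc}\n\nQuestion: {question}") for doc in docs]
    return fake_llm("\n".join(partials) + f"\n\nQuestion: {question}")


docs = ["chunk A", "chunk B", "chunk C"]

stuff(docs, "What is pure mathematics?")
stuff_calls = len(calls)
longest_stuff_prompt = max(len(p) for p in calls)

calls.clear()
map_reduce(docs, "What is pure mathematics?")
map_reduce_calls = len(calls)
longest_map_prompt = max(len(p) for p in calls[:len(docs)])
```

"stuff" is the cheapest in calls but its single prompt grows with the number of documents; "map_reduce" keeps each per-document prompt small at the price of more calls.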

In [None]:
from langchain.chains.question_answering import load_qa_chain

chain = load_qa_chain(
    llm,
    chain_type="stuff",  # or "map_reduce", "refine", "map_rerank"
)
result = chain(
    {
        "input_documents": final_retriever.get_relevant_documents(query),
        "question": query,
    }
)
print(result["output_text"])

If the documents have `sources` and the URLs are not too large, you can use `RetrievalQAWithSourcesChain`.

In [None]:
from langchain.chains import RetrievalQAWithSourcesChain

chain = RetrievalQAWithSourcesChain.from_chain_type(
    llm=llm,
    chain_type="map_reduce",
    retriever=final_retriever,
)
result = chain(query)
print(result["answer"])
pretty_print_docs(result["sources"])

For more precise control over the document references used, opt for `RetrievalQAWithReferencesChain`.

In [None]:
# FIXME
#!pip install -q langchain_qa_with_references
#!cd ../langchain-qa_with_references/ && poetry install

In [None]:
from langchain_qa_with_references.chains import RetrievalQAWithReferencesChain

chain = RetrievalQAWithReferencesChain.from_chain_type(
    llm=llm,
    chain_type="map_reduce",
    retriever=final_retriever,
)
result = chain(query)
print(result["answer"])
pretty_print_docs(result["source_documents"], ['source'])


Lastly, if you wish to identify the specific text fragments utilized by the LLM to formulate its response, select the `RetrievalQAWithReferencesAndVerbatimsChain` option.

In [None]:
from langchain_qa_with_references.chains import RetrievalQAWithReferencesAndVerbatimsChain

chain = RetrievalQAWithReferencesAndVerbatimsChain.from_chain_type(
    llm=llm,
    chain_type="map_reduce",
    retriever=final_retriever,
    callbacks=CALLBACKS,
)
result = chain(query)
print(result["answer"])
pretty_print_docs(result["source_documents"], ["source", "verbatims"])

In [None]:
# Clean up
import shutil

shutil.rmtree(ROOT_PATH)

In [30]:
#TODO: https://www.google.com/search?q=site%3Adrive.google.com+inurl%3Afolders+datasience
# To search for GDrive folders to use as a document source
# https://drive.google.com/drive/folders/1i3jXi0o-COk7L9Mfrg55Ysae8cdEIAHU
# TODO: LongContextReorder, OpenAIMetadataTagger