# Advanced RAG Retrieval Strategies: Flow and Modular

This notebook demonstrates the implementation of modular RAG and RAG flow for advanced retrieval strategies.

In [1]:
# Install required packages
!pip install llama-index openai python-dotenv cohere langchain pyvis ragas datasets

Collecting cohere
  Downloading cohere-5.5.8-py3-none-any.whl.metadata (3.3 kB)
Collecting boto3<2.0.0,>=1.34.0 (from cohere)
  Downloading boto3-1.34.144-py3-none-any.whl.metadata (6.6 kB)
Collecting fastavro<2.0.0,>=1.9.4 (from cohere)
  Downloading fastavro-1.9.5-cp312-cp312-macosx_10_9_universal2.whl.metadata (5.5 kB)
Collecting httpx-sse<0.5.0,>=0.4.0 (from cohere)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting parameterized<0.10.0,>=0.9.0 (from cohere)
  Downloading parameterized-0.9.0-py2.py3-none-any.whl.metadata (18 kB)
Collecting tokenizers<1,>=0.15 (from cohere)
  Using cached tokenizers-0.19.1-cp312-cp312-macosx_11_0_arm64.whl.metadata (6.7 kB)
Collecting types-requests<3.0.0,>=2.0.0 (from cohere)
  Downloading types_requests-2.32.0.20240712-py3-none-any.whl.metadata (1.9 kB)
Collecting botocore<1.35.0,>=1.34.144 (from boto3<2.0.0,>=1.34.0->cohere)
  Downloading botocore-1.34.144-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7

In [3]:
import os
from dotenv import load_dotenv
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.node_parser import SentenceWindowNodeParser
from llama_index.core.indices.postprocessor import MetadataReplacementPostProcessor
from llama_index.core.query_pipeline import QueryPipeline, InputComponent, CustomQueryComponent
from llama_index.core.response_synthesizers.tree_summarize import TreeSummarize
# from llama_index.postprocessor.cohere_rerank import CohereRerank
from typing import Dict, Any
from llama_index.core.indices.query.query_transform import HyDEQueryTransform
from pyvis.network import Network
from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall
from ragas import evaluate
from datasets import Dataset

# Load environment variables
load_dotenv()

# Set up API keys
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY')
os.environ['COHERE_API_KEY'] = os.getenv('COHERE_API_KEY')

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Index documents
documents = SimpleDirectoryReader("./data").load_data()
node_parser = SentenceWindowNodeParser.from_defaults(
    window_size=3,
    window_metadata_key="window",
    original_text_metadata_key="original_text",
)
llm = OpenAI(model="gpt-3.5-turbo")
embed_model = OpenAIEmbedding(model="text-embedding-3-small")
Settings.llm = llm
Settings.embed_model = embed_model
Settings.node_parser = node_parser

if not os.path.exists("storage"):
    index = VectorStoreIndex.from_documents(documents)
    index.set_index_id("avengers")
    index.storage_context.persist("./storage")
else:
    store_context = StorageContext.from_defaults(persist_dir="./storage")
    index = load_index_from_storage(
        storage_context=store_context, index_id="avengers"
    )

FileNotFoundError: [Errno 2] No such file or directory: '/Users/satwikpandey/Dev/ModularRag/storage/docstore.json'

In [None]:
# Define custom components
class HydeComponent(CustomQueryComponent):
    """HyDE query rewrite component."""
    def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
        assert "input" in input, "input is required"
        return input

    @property
    def _input_keys(self) -> set:
        return {"input"}

    @property
    def _output_keys(self) -> set:
        return {"output"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        hyde = HyDEQueryTransform(include_original=True)
        query_bundle = hyde(kwargs["input"])
        return {"output": query_bundle.embedding_strs[0]}

class RagasComponent(CustomQueryComponent):
    """Ragas evaluation component."""
    def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
        return input
    
    @property
    def _input_keys(self) -> set:
        return {"question", "nodes", "answer", "ground_truth"}

    @property
    def _output_keys(self) -> set:
        return {"answer", "source_nodes", "evaluation"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        question, ground_truth, nodes, answer = kwargs.values()
        data = {
            "question": [question],
            "contexts": [[n.get_content() for n in nodes]],
            "answer": [str(answer)],
            "ground_truth": [ground_truth],
        }
        dataset = Dataset.from_dict(data)
        metrics = [faithfulness, answer_relevancy, context_precision, context_recall]
        evaluation = evaluate(dataset, metrics)
        return {"answer": str(answer), "source_nodes": nodes, "evaluation": evaluation}

In [None]:
# Set up query pipeline
retriever = index.as_retriever()
query_rewriter = HydeComponent()
reranker = CohereRerank()
meta_replacer = MetadataReplacementPostProcessor(target_metadata_key="window")
evaluator = RagasComponent()

p = QueryPipeline(verbose=True)
p.add_modules(
    {
        "input": InputComponent(),
        "query_rewriter": query_rewriter,
        "retriever": retriever,
        "meta_replacer": meta_replacer,
        "reranker": reranker,
        "output": TreeSummarize(),
        "evaluator": evaluator,
    }
)

p.add_link("input", "query_rewriter", src_key="input")
p.add_link("query_rewriter", "retriever")
p.add_link("retriever", "meta_replacer")
p.add_link("input", "reranker", src_key="input", dest_key="query_str")
p.add_link("meta_replacer", "reranker", dest_key="nodes")
p.add_link("input", "output", src_key="input", dest_key="query_str")
p.add_link("reranker", "output", dest_key="nodes")
p.add_link("input", "evaluator", src_key="input", dest_key="question")
p.add_link("input", "evaluator", src_key="ground_truth", dest_key="ground_truth")
p.add_link("reranker", "evaluator", dest_key="nodes")
p.add_link("output", "evaluator", dest_key="answer")

In [None]:
# Visualize query pipeline
net = Network(notebook=True, cdn_resources="in_line", directed=True)
net.from_nx(p.clean_dag)
net.write_html("output/pipeline_dag.html")

In [None]:
# Run query pipeline
question = "Which two members of the Avengers created Ultron?"
ground_truth = "Tony Stark (Iron Man) and Bruce Banner (The Hulk)."
output = p.run(input=question, ground_truth=ground_truth)
print(f"Answer: {output['answer']}")
print(f"Evaluation: {output['evaluation']}")

In [None]:
# Print intermediate results (optional)
output, intermediates = p.run_with_intermediates(input=question, ground_truth=ground_truth)

print("Retriever output:")
for node in intermediates["retriever"].outputs["output"]:
    print(f"Node: {node.text}\n")

print("Meta replacer output:")
for node in intermediates["meta_replacer"].outputs["nodes"]:
    print(f"Node: {node.text}\n")