In [1]:
# Complete Retrieval Pipeline: From Basic to Advanced Techniques

## 1. Setup and Imports

import os
import time
from typing import Dict, List, Optional, Union

# LlamaIndex imports
from llama_index.core.schema import Document, NodeWithScore, QueryBundle, TextNode
from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import BaseRetriever
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# Hugging Face imports
from sentence_transformers import SentenceTransformer, CrossEncoder
import numpy as np

## 2. Create Sample Documents

def create_sample_documents():
    """Build the demo corpus as a list of ``Document`` objects.

    Each sentence below becomes one document. Its id is ``doc_<index>``,
    where the index is the sentence's zero-based position in the corpus.

    Returns:
        A list of ``Document`` instances, in corpus order.
    """
    corpus = (
        "Python is a high-level, interpreted programming language known for its readability and simplicity.",
        "Machine learning is a subset of artificial intelligence that enables systems to learn from data.",
        "Neural networks are computing systems inspired by biological neural networks in animal brains.",
        "Deep learning uses neural networks with many layers to extract high-level features from data.",
        "Natural language processing (NLP) helps computers understand, interpret, and manipulate human language.",
        "Transformers are deep learning models that use self-attention mechanisms to process sequential data.",
        "BERT is a transformer-based language model pre-trained on large text corpora.",
        "GPT models are autoregressive language models that use transformer architectures.",
        "Python libraries like PyTorch and TensorFlow are widely used for deep learning development.",
        "The pandas library provides data structures and tools for data manipulation and analysis in Python.",
        "LlamaIndex is a data framework for LLM applications to connect custom data sources to language models.",
        "Hybrid search combines multiple retrieval methods to improve search quality.",
        "BM25 is a bag-of-words retrieval function used in information retrieval.",
        "Vector search finds documents by measuring similarity in embedding space.",
        "Reranking is a two-stage process that refines initial search results with a more complex model.",
        "Retrieval pipelines often combine multiple techniques to achieve the best search results.",
    )

    return [
        Document(text=sentence, id_=f"doc_{position}")
        for position, sentence in enumerate(corpus)
    ]

# Create the documents
# The corpus is the fixed set of sentences built by create_sample_documents().
documents = create_sample_documents()
print(f"Created {len(documents)} sample documents")

# Convert documents to nodes
# Each node copies its document's text and id, so node ids match document ids.
nodes = [TextNode(text=doc.text, id_=doc.id_) for doc in documents]
print("Converted documents to nodes")

## 3. Basic BM25 Retriever

# Set up basic BM25 retriever
# similarity_top_k=5: the recorded run of this cell shows five hits per query.
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes, similarity_top_k=5)
print("Created BM25 retriever")

def test_retriever(retriever, query_text, name="Retriever"):
    """Run one query through a retriever, print every hit, and return the hits.

    Args:
        retriever: Object exposing ``retrieve(QueryBundle)``.
        query_text: Raw query string; it is wrapped in a ``QueryBundle``.
        name: Label printed in the section banner.

    Returns:
        Whatever ``retriever.retrieve`` returned (a list of scored nodes).
    """
    print(f"\n=== Testing {name} ===")
    # perf_counter() is monotonic and high resolution, so short durations are
    # measured reliably; time.time() can jump when the system clock changes.
    started = time.perf_counter()
    results = retriever.retrieve(QueryBundle(query_text))
    elapsed = time.perf_counter() - started

    print(f"Query: '{query_text}'")
    print(f"Retrieved {len(results)} documents in {elapsed:.4f} seconds")

    for rank, hit in enumerate(results, 1):
        # A hit may carry no score; formatting None with ".4f" would raise.
        score_text = "n/a" if hit.score is None else f"{hit.score:.4f}"
        print(f"{rank}. Score: {score_text} - {hit.node.get_content()}")

    return results

# Test BM25 retriever
query = "How is Python used in machine learning?"
bm25_results = test_retriever(bm25_retriever, query, "BM25 Retriever")

## 4. Vector Search Retriever

# Set up bi-encoder for vector search
print("\nSetting up vector search with bi-encoder...")
model_name = "sentence-transformers/all-MiniLM-L6-v2"
# The same checkpoint is loaded twice on purpose: `bi_encoder` embeds the
# nodes below, while `embed_model` is what the index uses to embed queries.
bi_encoder = SentenceTransformer(model_name, device="cpu")
embed_model = HuggingFaceEmbedding(model_name=model_name)

# Create embeddings for nodes
for node in nodes:
    # encode() returns a NumPy array by default. Store a plain list of floats
    # so the node's embedding stays a list (and stays serializable) rather
    # than an ndarray.
    node.embedding = bi_encoder.encode(node.get_content()).tolist()

# Create vector index
vector_index = VectorStoreIndex(nodes=nodes, embed_model=embed_model)
vector_retriever = vector_index.as_retriever(similarity_top_k=5)
print("Created vector retriever")

# Test vector retriever
vector_results = test_retriever(vector_retriever, query, "Vector Retriever")

## 5. Simple Fusion Retriever

class SimpleFusionRetriever(BaseRetriever):
    """Union the hits of several retrievers, keeping one entry per node id."""

    def __init__(self, retrievers: Dict[str, BaseRetriever]):
        """Store the name -> retriever mapping, then run the base initializer."""
        self.retrievers = retrievers
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Query every retriever and merge the hits by node id.

        Retrievers are queried in the mapping's iteration order. When the same
        node id is returned more than once, the most recent hit replaces the
        stored one, while the node keeps the position of its first appearance.
        Scores are passed through untouched and the result is not re-sorted.
        """
        merged = {
            hit.node.node_id: hit
            for retriever in self.retrievers.values()
            for hit in retriever.retrieve(query_bundle)
        }
        return list(merged.values())

# Create simple fusion retriever
# Dict order matters: the retrievers are queried in insertion order, and for a
# node returned by both, the entry from the later one ("bm25") is the one kept.
simple_fusion_retriever = SimpleFusionRetriever(
    retrievers={"vector": vector_retriever, "bm25": bm25_retriever}
)
print("\nCreated simple fusion retriever")

# Test simple fusion retriever
# Same query as the single-retriever runs, so the outputs can be compared.
simple_fusion_results = test_retriever(simple_fusion_retriever, query, "Simple Fusion Retriever")

## 6. Weighted Fusion Retriever

class WeightedFusionRetriever(BaseRetriever):
    """Weighted fusion retriever that combines and rescores results.

    Every retriever receives the same query. Each hit's score is multiplied by
    the weight registered under that retriever's name, and the weighted scores
    a node receives from the different retrievers are summed into its final
    score. Raw scores are used as returned; nothing is normalized here.
    """

    def __init__(self, retrievers: Dict[str, BaseRetriever], weights: Dict[str, float]):
        """Initialize with retrievers and weights.

        Args:
            retrievers: Mapping of retriever name to retriever.
            weights: Mapping of retriever name to score multiplier. A name
                missing from this mapping is weighted 1.0.
        """
        self.retrievers = retrievers
        self.weights = weights
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieve nodes with the weighted fusion approach.

        Returns:
            One ``NodeWithScore`` per distinct node id, sorted by combined
            score in descending order.
        """
        fused = {}

        for name, retriever in self.retrievers.items():
            weight = self.weights.get(name, 1.0)

            for hit in retriever.retrieve(query_bundle):
                # A hit's score is optional. Count a missing score as 0.0
                # instead of failing on ``None * weight``.
                raw_score = 0.0 if hit.score is None else hit.score
                entry = fused.setdefault(
                    hit.node.node_id, {"node": hit.node, "scores": {}}
                )
                # Keyed by retriever name: one contribution per retriever.
                entry["scores"][name] = raw_score * weight

        ranked = [
            NodeWithScore(node=entry["node"], score=sum(entry["scores"].values()))
            for entry in fused.values()
        ]
        ranked.sort(key=lambda item: item.score, reverse=True)
        return ranked

# Create weighted fusion retriever
# The weight keys must match the retriever keys: each hit's raw score is
# multiplied by the weight stored under its retriever's name, and the products
# are summed per node.
weighted_fusion_retriever = WeightedFusionRetriever(
    retrievers={"vector": vector_retriever, "bm25": bm25_retriever},
    weights={"vector": 0.7, "bm25": 0.3}
)
print("\nCreated weighted fusion retriever")

# Test weighted fusion retriever
# Same query as the earlier runs, so the rankings can be compared.
weighted_fusion_results = test_retriever(weighted_fusion_retriever, query, "Weighted Fusion Retriever")

## 7. Reranker with Cross-Encoder

class CrossEncoderReranker:
    """Rescore retrieved nodes with a sentence-transformers ``CrossEncoder``."""

    def __init__(
        self,
        model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
        top_n: Optional[int] = None,
    ):
        """Load the cross-encoder on CPU and remember the result cut-off.

        Args:
            model_name: Name of the cross-encoder checkpoint to load.
            top_n: Maximum number of nodes ``rerank`` returns; ``None`` keeps
                every candidate.
        """
        self.model_name = model_name
        self.top_n = top_n
        self.model = CrossEncoder(model_name, device="cpu")
        print(f"Loaded CrossEncoder model: {model_name}")

    def rerank(self, query: str, nodes: List[NodeWithScore]) -> List[NodeWithScore]:
        """Score each candidate against the query and return them best-first.

        The incoming scores are discarded; every returned ``NodeWithScore``
        carries the cross-encoder's score for its (query, text) pair.
        """
        if not nodes:
            return []

        # One (query, passage) pair per candidate, in candidate order.
        pairs = [(query, candidate.node.get_content()) for candidate in nodes]
        scores = self.model.predict(pairs)

        rescored = sorted(
            [
                NodeWithScore(node=candidate.node, score=float(scores[index]))
                for index, candidate in enumerate(nodes)
            ],
            key=lambda item: item.score,
            reverse=True,
        )

        # Nothing to trim when no cut-off is set or it already covers the list.
        if self.top_n is None or self.top_n >= len(rescored):
            return rescored
        return rescored[:self.top_n]

class RerankedRetriever(BaseRetriever):
    """Two-stage retriever: fetch candidates, then hand them to a reranker."""

    def __init__(
        self,
        base_retriever: BaseRetriever,
        reranker: CrossEncoderReranker,
        fetch_k: int = 10,
    ):
        """Keep the first-stage retriever, the reranker and the candidate cap.

        Args:
            base_retriever: Retriever that produces the initial candidates.
            reranker: Object whose ``rerank(query=..., nodes=...)`` rescores them.
            fetch_k: Maximum number of candidates passed to the reranker;
                ``None`` passes all of them.
        """
        self.base_retriever = base_retriever
        self.reranker = reranker
        self.fetch_k = fetch_k
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Fetch candidates, cap them at ``fetch_k``, and return the reranked list."""
        candidates = self.base_retriever.retrieve(query_bundle)

        cap = self.fetch_k
        if cap is not None and len(candidates) > cap:
            # Only the leading `cap` candidates reach the reranker.
            candidates = candidates[:cap]

        return self.reranker.rerank(
            query=query_bundle.query_str,
            nodes=candidates,
        )

# Create cross-encoder reranker
# top_n=5: the reranker returns at most the five best-scoring candidates.
reranker = CrossEncoderReranker(
    model_name="cross-encoder/ms-marco-MiniLM-L-6-v2",
    top_n=5
)

# Create reranked retriever with weighted fusion as base
# fetch_k=10: at most ten fused candidates are handed to the reranker.
reranked_retriever = RerankedRetriever(
    base_retriever=weighted_fusion_retriever,
    reranker=reranker,
    fetch_k=10
)
print("\nCreated reranked retriever")

# Test reranked retriever
# Printed scores are now cross-encoder scores, not the fused retrieval scores.
reranked_results = test_retriever(reranked_retriever, query, "Reranked Retriever")

## 8. Complete Configurable Pipeline

class RetrievalPipeline:
    """A configurable retrieval pipeline that combines multiple techniques.

    Depending on the flags given at construction time, ``build`` wires up a
    BM25 retriever, a vector retriever, a weighted fusion of the two, and an
    optional cross-encoder reranking stage. ``retrieve`` then runs a query
    through whatever was assembled.
    """

    def __init__(
        self,
        use_bm25: bool = True,
        use_vector: bool = True,
        use_hybrid: bool = True,
        use_reranking: bool = True,
        vector_weight: float = 0.7,
        bm25_weight: float = 0.3,
        top_k: int = 5,
        rerank_top_k: int = 10
    ):
        """Record the configuration; no model is loaded until ``build``.

        Args:
            use_bm25: Build a BM25 retriever.
            use_vector: Build a vector retriever.
            use_hybrid: Fuse the retrievers when more than one is available.
            use_reranking: Wrap the base retriever in a cross-encoder reranker.
            vector_weight: Fusion weight applied to vector scores.
            bm25_weight: Fusion weight applied to BM25 scores.
            top_k: ``similarity_top_k`` of each retriever and ``top_n`` of the
                reranker.
            rerank_top_k: Number of candidates handed to the reranker.
        """
        self.config = {
            "use_bm25": use_bm25,
            "use_vector": use_vector,
            "use_hybrid": use_hybrid,
            "use_reranking": use_reranking,
            "vector_weight": vector_weight,
            "bm25_weight": bm25_weight,
            "top_k": top_k,
            "rerank_top_k": rerank_top_k
        }

        self.retrievers = {}
        self.pipeline = None
        print("Created configurable retrieval pipeline")

    def build(self, nodes: List[TextNode]):
        """Build the pipeline based on the configuration and document nodes.

        Nodes that have no embedding yet get one assigned in place.

        Args:
            nodes: Nodes to index.

        Returns:
            ``self``, so construction and build can be chained.

        Raises:
            ValueError: If no retriever is available after the build step,
                i.e. both ``use_bm25`` and ``use_vector`` are disabled.
        """
        print("\nBuilding retrieval pipeline...")

        # Build BM25 retriever if enabled
        if self.config["use_bm25"]:
            self.retrievers["bm25"] = BM25Retriever.from_defaults(
                nodes=nodes,
                similarity_top_k=self.config["top_k"]
            )
            print("- Added BM25 retriever")

        # Build vector retriever if enabled
        if self.config["use_vector"]:
            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            embed_model = HuggingFaceEmbedding(model_name=model_name)

            # Only nodes without an embedding are encoded. The NumPy array
            # returned by encode() is stored as a plain list of floats.
            bi_encoder = SentenceTransformer(model_name, device="cpu")
            for node in nodes:
                if node.embedding is None:
                    node.embedding = bi_encoder.encode(node.get_content()).tolist()

            vector_index = VectorStoreIndex(nodes=nodes, embed_model=embed_model)
            self.retrievers["vector"] = vector_index.as_retriever(
                similarity_top_k=self.config["top_k"]
            )
            print("- Added vector retriever")

        # Fail with a clear message instead of leaking StopIteration from the
        # "first available retriever" lookup below.
        if not self.retrievers:
            raise ValueError(
                "No retriever available: enable use_bm25 and/or use_vector."
            )

        # Build hybrid retriever if enabled and we have multiple retrievers
        if self.config["use_hybrid"] and len(self.retrievers) > 1:
            weights = {}
            if "vector" in self.retrievers:
                weights["vector"] = self.config["vector_weight"]
            if "bm25" in self.retrievers:
                weights["bm25"] = self.config["bm25_weight"]

            base_retriever = WeightedFusionRetriever(
                retrievers=self.retrievers,
                weights=weights
            )
            print("- Added weighted fusion for hybrid retrieval")
        else:
            # Use the first available retriever as base
            retriever_name = next(iter(self.retrievers))
            base_retriever = self.retrievers[retriever_name]
            print(f"- Using {retriever_name} as base retriever")

        # Build reranker if enabled
        if self.config["use_reranking"]:
            reranker = CrossEncoderReranker(
                model_name="cross-encoder/ms-marco-MiniLM-L-6-v2",
                top_n=self.config["top_k"]
            )

            self.pipeline = RerankedRetriever(
                base_retriever=base_retriever,
                reranker=reranker,
                fetch_k=self.config["rerank_top_k"]
            )
            print("- Added cross-encoder reranker")
        else:
            self.pipeline = base_retriever

        print("Pipeline build complete!")
        return self

    def retrieve(self, query: Union[str, QueryBundle], verbose: bool = True):
        """Execute the retrieval pipeline on a query.

        Args:
            query: Query text or a ready-made ``QueryBundle``.
            verbose: Print the query, the timing and every hit.

        Returns:
            The list of scored nodes produced by the pipeline.

        Raises:
            ValueError: If ``build`` has not been called yet.
        """
        if self.pipeline is None:
            raise ValueError("Pipeline not built. Call build() first.")

        # Convert string query to QueryBundle if needed
        if isinstance(query, str):
            query = QueryBundle(query)

        # perf_counter() is monotonic, which suits short elapsed-time readings.
        started = time.perf_counter()
        results = self.pipeline.retrieve(query)
        elapsed = time.perf_counter() - started

        if verbose:
            print(f"\n=== Retrieval Results ===")
            print(f"Query: '{query.query_str}'")
            print(f"Retrieved {len(results)} documents in {elapsed:.4f} seconds")

            for rank, hit in enumerate(results, 1):
                # A hit may carry no score; formatting None would raise.
                score_text = "n/a" if hit.score is None else f"{hit.score:.4f}"
                print(f"{rank}. Score: {score_text} - {hit.node.get_content()}")

        return results

# Create and test a complete pipeline
# Every option is spelled out here even though the values equal the class
# defaults; build() returns the pipeline itself, so the calls can be chained.
complete_pipeline = RetrievalPipeline(
    use_bm25=True,
    use_vector=True,
    use_hybrid=True,
    use_reranking=True,
    vector_weight=0.7,
    bm25_weight=0.3,
    top_k=5,
    rerank_top_k=10
).build(nodes)

# Test the pipeline with a query
# A plain string is accepted; retrieve() wraps it in a QueryBundle.
pipeline_results = complete_pipeline.retrieve("How can Python be used for natural language processing?")

## 9. Compare Different Pipeline Configurations

def compare_pipeline_configurations(nodes, query, configs):
    """Compare different pipeline configurations on the same query.

    A fresh ``RetrievalPipeline`` is built for every configuration, queried
    once, and its timing and results are printed and collected.

    Args:
        nodes: Nodes passed to each pipeline's ``build``.
        query: Query passed to each pipeline's ``retrieve``.
        configs: Mapping of configuration name to the keyword arguments for
            ``RetrievalPipeline``.

    Returns:
        Mapping of configuration name to ``{"time": seconds, "results": hits}``.
    """
    print("\n=== Pipeline Configuration Comparison ===")
    print(f"Query: '{query}'")

    results = {}
    for name, config in configs.items():
        print(f"\nTesting configuration: {name}")

        # Build time is deliberately left out of the measurement.
        pipeline = RetrievalPipeline(**config).build(nodes)

        # perf_counter() is monotonic, which suits short elapsed-time readings.
        started = time.perf_counter()
        retrieval_results = pipeline.retrieve(query, verbose=False)
        elapsed = time.perf_counter() - started

        results[name] = {
            "time": elapsed,
            "results": retrieval_results
        }

        print(f"Retrieved {len(retrieval_results)} documents in {elapsed:.4f} seconds")
        # An empty result list has no first element to show.
        if retrieval_results:
            print(f"Top result: {retrieval_results[0].node.get_content()[:100]}...")
        else:
            print("Top result: (no documents retrieved)")

    # Print comparison
    print("\n=== Summary ===")
    for name, result in results.items():
        print(f"{name}: {result['time']:.4f}s, {len(result['results'])} results")

    return results

# Define different configurations to compare
# Only the four feature flags are set; weights, top_k and rerank_top_k fall
# back to the RetrievalPipeline defaults.
pipeline_configs = {
    # Single retriever, used directly as the pipeline.
    "BM25 Only": {
        "use_bm25": True,
        "use_vector": False,
        "use_hybrid": False,
        "use_reranking": False
    },
    # Single retriever, used directly as the pipeline.
    "Vector Only": {
        "use_bm25": False,
        "use_vector": True,
        "use_hybrid": False,
        "use_reranking": False
    },
    # Both retrievers combined by weighted fusion, no second stage.
    "Hybrid (No Reranking)": {
        "use_bm25": True,
        "use_vector": True,
        "use_hybrid": True,
        "use_reranking": False
    },
    # Weighted fusion followed by cross-encoder reranking.
    "Complete Pipeline": {
        "use_bm25": True,
        "use_vector": True,
        "use_hybrid": True,
        "use_reranking": True
    }
}

# Compare different configurations
# Every configuration is run against this one query.
comparison_query = "What are transformer models used for in NLP?"
comparison_results = compare_pipeline_configurations(nodes, comparison_query, pipeline_configs)

print("\nRetrieval pipeline notebook complete!")

Created 16 sample documents
Converted documents to nodes
Created BM25 retriever

=== Testing BM25 Retriever ===
Query: 'How is Python used in machine learning?'
Retrieved 5 documents in 0.0031 seconds
1. Score: 1.7306 - Machine learning is a subset of artificial intelligence that enables systems to learn from data.
2. Score: 1.6152 - Python libraries like PyTorch and TensorFlow are widely used for deep learning development.
3. Score: 0.9407 - Transformers are deep learning models that use self-attention mechanisms to process sequential data.
4. Score: 0.8661 - Deep learning uses neural networks with many layers to extract high-level features from data.
5. Score: 0.6322 - Python is a high-level, interpreted programming language known for its readability and simplicity.

Setting up vector search with bi-encoder...


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Created vector retriever

=== Testing Vector Retriever ===
Query: 'How is Python used in machine learning?'
Retrieved 5 documents in 0.0153 seconds
1. Score: 0.6398 - Python is a high-level, interpreted programming language known for its readability and simplicity.
2. Score: 0.5692 - Python libraries like PyTorch and TensorFlow are widely used for deep learning development.
3. Score: 0.5632 - Machine learning is a subset of artificial intelligence that enables systems to learn from data.
4. Score: 0.4787 - The pandas library provides data structures and tools for data manipulation and analysis in Python.
5. Score: 0.4156 - Natural language processing (NLP) helps computers understand, interpret, and manipulate human language.

Created simple fusion retriever

=== Testing Simple Fusion Retriever ===
Query: 'How is Python used in machine learning?'
Retrieved 7 documents in 0.0132 seconds
1. Score: 0.6322 - Python is a high-level, interpreted programming language known for its readability 

In [2]:
# Complete Retrieval Pipeline: From Basic to Advanced

# 1. Setup and Imports

import time
from typing import Dict, List, Optional, Union

# LlamaIndex imports
from llama_index.core.schema import Document, NodeWithScore, QueryBundle, TextNode
from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import BaseRetriever
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# Hugging Face imports
from sentence_transformers import SentenceTransformer, CrossEncoder
import numpy as np

# 2. Create Sample Documents

# Create a set of sample documents
# Demo corpus: one short sentence per document.
texts = [
    "Python is a high-level programming language known for its readability.",
    "Machine learning is a subset of AI that enables systems to learn from data.",
    "Neural networks are computing systems inspired by biological neural networks.",
    "Deep learning uses neural networks with many layers to extract features from data.",
    "Natural language processing helps computers understand human language.",
    "Python libraries like PyTorch and TensorFlow are used for deep learning.",
    "BM25 is a bag-of-words retrieval function used in information retrieval.",
    "Vector search finds documents by measuring similarity in embedding space.",
    "Reranking refines initial search results with a more complex model.",
    "Retrieval pipelines combine multiple techniques for better search results."
]

# Convert to documents and nodes
# Ids are "doc_<index>"; each node reuses its document's text and id.
documents = [Document(text=text, id_=f"doc_{i}")
             for i, text in enumerate(texts)]
nodes = [TextNode(text=doc.text, id_=doc.id_) for doc in documents]
print(f"Created {len(documents)} sample documents")

# 3. Test Function


def test_retriever(retriever, query_text, name="Retriever"):
    """Run one query through a retriever, print the top hits, and return all hits.

    Args:
        retriever: Object exposing ``retrieve(QueryBundle)``.
        query_text: Raw query string; it is wrapped in a ``QueryBundle``.
        name: Label printed in the section banner.

    Returns:
        The full list returned by ``retriever.retrieve`` (not just the three
        hits that are printed).
    """
    print(f"\n=== Testing {name} ===")
    # perf_counter() is monotonic and high resolution, so short durations are
    # measured reliably; time.time() can jump when the system clock changes.
    started = time.perf_counter()
    results = retriever.retrieve(QueryBundle(query_text))
    elapsed = time.perf_counter() - started

    print(f"Query: '{query_text}'")
    print(f"Retrieved {len(results)} documents in {elapsed:.4f} seconds")

    for rank, hit in enumerate(results[:3], 1):  # Show just top 3 for brevity
        # A hit may carry no score; formatting None with ".4f" would raise.
        score_text = "n/a" if hit.score is None else f"{hit.score:.4f}"
        print(f"{rank}. Score: {score_text} - {hit.node.get_content()}")

    return results

# 4. Basic Retrievers


# Set up BM25 retriever
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes, similarity_top_k=5)

# Set up vector retriever
model_name = "sentence-transformers/all-MiniLM-L6-v2"
bi_encoder = SentenceTransformer(model_name, device="cpu")

# Create embeddings for nodes
for node in nodes:
    # encode() returns a NumPy array by default. Store a plain list of floats
    # so the node's embedding stays a list (and stays serializable) rather
    # than an ndarray.
    node.embedding = bi_encoder.encode(node.get_content()).tolist()

# Create vector index and retriever
# The index gets its own embedding model (same checkpoint) for query embedding.
vector_index = VectorStoreIndex(
    nodes=nodes, embed_model=HuggingFaceEmbedding(model_name=model_name))
vector_retriever = vector_index.as_retriever(similarity_top_k=5)

print("Created basic retrievers")

# 5. Hybrid Retriever


class WeightedFusionRetriever(BaseRetriever):
    """Weighted fusion retriever that combines results from multiple retrievers.

    Every retriever receives the same query. Each hit's score is multiplied by
    the weight registered under that retriever's name, and the weighted scores
    a node receives from the different retrievers are summed into its final
    score. Raw scores are used as returned; nothing is normalized here.
    """

    def __init__(self, retrievers: Dict[str, BaseRetriever], weights: Dict[str, float]):
        """Initialize with retrievers and weights.

        Args:
            retrievers: Mapping of retriever name to retriever.
            weights: Mapping of retriever name to score multiplier. A name
                missing from this mapping is weighted 1.0.
        """
        self.retrievers = retrievers
        self.weights = weights
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Query all retrievers and return the fused hits, best score first."""
        fused = {}

        for name, retriever in self.retrievers.items():
            weight = self.weights.get(name, 1.0)

            for hit in retriever.retrieve(query_bundle):
                # A hit's score is optional. Count a missing score as 0.0
                # instead of failing on ``None * weight``.
                raw_score = 0.0 if hit.score is None else hit.score
                entry = fused.setdefault(
                    hit.node.node_id, {"node": hit.node, "scores": {}}
                )
                # Keyed by retriever name: one contribution per retriever.
                entry["scores"][name] = raw_score * weight

        final_results = [
            NodeWithScore(node=entry["node"], score=sum(entry["scores"].values()))
            for entry in fused.values()
        ]

        return sorted(final_results, key=lambda item: item.score, reverse=True)


# Create weighted fusion retriever
# The weight keys must match the retriever keys: each hit's raw score is
# multiplied by the weight stored under its retriever's name.
hybrid_retriever = WeightedFusionRetriever(
    retrievers={"vector": vector_retriever, "bm25": bm25_retriever},
    weights={"vector": 0.7, "bm25": 0.3}
)
print("Created hybrid retriever")

# 6. Reranker


class CrossEncoderReranker:
    """Rescore retrieved nodes with a sentence-transformers ``CrossEncoder``."""

    def __init__(self, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2", top_n=None):
        """Load the cross-encoder on CPU and remember the result cut-off.

        Args:
            model_name: Name of the cross-encoder checkpoint to load.
            top_n: Slice bound applied to the sorted results; ``None`` keeps
                every candidate.
        """
        self.model = CrossEncoder(model_name, device="cpu")
        self.top_n = top_n
        print(f"Loaded CrossEncoder model: {model_name}")

    def rerank(self, query: str, nodes: List[NodeWithScore]) -> List[NodeWithScore]:
        """Score each candidate against the query and return them best-first.

        The incoming scores are discarded; every returned ``NodeWithScore``
        carries the cross-encoder's score for its (query, text) pair.
        """
        if not nodes:
            return []

        # One (query, passage) pair per candidate, in candidate order.
        pairs = [(query, candidate.node.get_content()) for candidate in nodes]
        scores = self.model.predict(pairs)

        ranked = sorted(
            [
                NodeWithScore(node=candidate.node, score=float(value))
                for candidate, value in zip(nodes, scores)
            ],
            key=lambda item: item.score,
            reverse=True,
        )

        if self.top_n is None:
            return ranked
        return ranked[:self.top_n]


class RerankedRetriever(BaseRetriever):
    """Two-stage retriever: fetch candidates, then hand them to a reranker."""

    def __init__(self, base_retriever, reranker, fetch_k=10):
        """Keep the first-stage retriever, the reranker and the candidate cap.

        Args:
            base_retriever: Retriever that produces the initial candidates.
            reranker: Object whose ``rerank(query, nodes)`` rescores them.
            fetch_k: Slice bound applied to the candidates before reranking;
                ``None`` passes all of them.
        """
        self.base_retriever = base_retriever
        self.reranker = reranker
        self.fetch_k = fetch_k
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Fetch candidates, cap them at ``fetch_k``, and return the reranked list."""
        candidates = self.base_retriever.retrieve(query_bundle)
        # Only the leading `fetch_k` candidates reach the reranker.
        shortlist = candidates if self.fetch_k is None else candidates[:self.fetch_k]
        return self.reranker.rerank(query_bundle.query_str, shortlist)


# Create reranked retriever
# top_n=5 caps the reranker's output at five nodes; fetch_k is left at its
# default of 10, so at most ten fused candidates are rescored.
reranker = CrossEncoderReranker(top_n=5)
reranked_retriever = RerankedRetriever(
    base_retriever=hybrid_retriever, reranker=reranker)
print("Created reranked retriever")

# 7. Complete Pipeline


class RetrievalPipeline:
    """A configurable retrieval pipeline.

    Depending on the flags given at construction time, ``build`` wires up a
    BM25 retriever, a vector retriever, a weighted fusion of the two, and an
    optional cross-encoder reranking stage. ``retrieve`` then runs a query
    through whatever was assembled.
    """

    def __init__(
        self,
        use_bm25=True,
        use_vector=True,
        use_hybrid=True,
        use_reranking=True,
        vector_weight=0.7,
        bm25_weight=0.3,
        top_k=5,
        rerank_top_k=10
    ):
        """Record the configuration; no model is loaded until ``build``.

        Args:
            use_bm25: Build a BM25 retriever.
            use_vector: Build a vector retriever.
            use_hybrid: Fuse the retrievers when more than one is available.
            use_reranking: Wrap the base retriever in a cross-encoder reranker.
            vector_weight: Fusion weight applied to vector scores.
            bm25_weight: Fusion weight applied to BM25 scores.
            top_k: ``similarity_top_k`` of each retriever and ``top_n`` of the
                reranker.
            rerank_top_k: Number of candidates handed to the reranker.
        """
        self.config = {
            "use_bm25": use_bm25, "use_vector": use_vector,
            "use_hybrid": use_hybrid, "use_reranking": use_reranking,
            "vector_weight": vector_weight, "bm25_weight": bm25_weight,
            "top_k": top_k, "rerank_top_k": rerank_top_k
        }
        self.pipeline = None

    def build(self, nodes):
        """Build the pipeline based on configuration.

        Nodes that have no embedding yet get one assigned in place.

        Args:
            nodes: Nodes to index.

        Returns:
            ``self``, so construction and build can be chained.

        Raises:
            ValueError: If both ``use_bm25`` and ``use_vector`` are disabled,
                leaving no retriever to build on.
        """
        print(
            f"\nBuilding pipeline: {', '.join(k for k, v in self.config.items() if v and k.startswith('use_'))}")

        # Initialize retrievers
        retrievers = {}
        if self.config["use_bm25"]:
            retrievers["bm25"] = BM25Retriever.from_defaults(
                nodes=nodes, similarity_top_k=self.config["top_k"]
            )

        if self.config["use_vector"]:
            model_name = "sentence-transformers/all-MiniLM-L6-v2"
            embed_model = HuggingFaceEmbedding(model_name=model_name)
            bi_encoder = SentenceTransformer(model_name, device="cpu")

            # Only nodes without an embedding are encoded. The NumPy array
            # returned by encode() is stored as a plain list of floats.
            for node in nodes:
                if node.embedding is None:
                    node.embedding = bi_encoder.encode(node.get_content()).tolist()

            vector_index = VectorStoreIndex(
                nodes=nodes, embed_model=embed_model)
            retrievers["vector"] = vector_index.as_retriever(
                similarity_top_k=self.config["top_k"]
            )

        # Fail with a clear message instead of leaking StopIteration from the
        # "first available retriever" lookup below.
        if not retrievers:
            raise ValueError(
                "No retriever available: enable use_bm25 and/or use_vector."
            )

        # Build base retriever
        if self.config["use_hybrid"] and len(retrievers) > 1:
            weights = {}
            if "vector" in retrievers:
                weights["vector"] = self.config["vector_weight"]
            if "bm25" in retrievers:
                weights["bm25"] = self.config["bm25_weight"]

            base_retriever = WeightedFusionRetriever(
                retrievers=retrievers, weights=weights)
        else:
            # Use the first available retriever
            base_retriever = retrievers[next(iter(retrievers))]

        # Add reranker if enabled
        if self.config["use_reranking"]:
            reranker = CrossEncoderReranker(top_n=self.config["top_k"])
            self.pipeline = RerankedRetriever(
                base_retriever=base_retriever,
                reranker=reranker,
                fetch_k=self.config["rerank_top_k"]
            )
        else:
            self.pipeline = base_retriever

        return self

    def retrieve(self, query, verbose=True):
        """Execute the retrieval pipeline.

        Args:
            query: Query text or a ready-made ``QueryBundle``.
            verbose: Print the timing and the top three hits.

        Returns:
            The full list of scored nodes produced by the pipeline.

        Raises:
            ValueError: If ``build`` has not been called yet.
        """
        # Checked first so an unbuilt pipeline fails with a clear message
        # rather than an AttributeError on None.
        if self.pipeline is None:
            raise ValueError("Pipeline not built. Call build() first.")

        if isinstance(query, str):
            query = QueryBundle(query)

        # perf_counter() is monotonic, which suits short elapsed-time readings.
        start_time = time.perf_counter()
        results = self.pipeline.retrieve(query)
        elapsed = time.perf_counter() - start_time

        if verbose:
            print(f"\n=== Results ({elapsed:.4f}s) ===")
            for rank, hit in enumerate(results[:3], 1):
                # A hit may carry no score; formatting None would raise.
                score_text = "n/a" if hit.score is None else f"{hit.score:.4f}"
                print(f"{rank}. Score: {score_text} - {hit.node.get_content()}")

        return results

# 8. Compare Configurations


# Define different configurations
# Only the four feature flags are set; weights, top_k and rerank_top_k fall
# back to the RetrievalPipeline defaults.
configurations = {
    "BM25 Only": {"use_bm25": True, "use_vector": False, "use_hybrid": False, "use_reranking": False},
    "Vector Only": {"use_bm25": False, "use_vector": True, "use_hybrid": False, "use_reranking": False},
    "Hybrid": {"use_bm25": True, "use_vector": True, "use_hybrid": True, "use_reranking": False},
    "Full Pipeline": {"use_bm25": True, "use_vector": True, "use_hybrid": True, "use_reranking": True}
}

# Test query
query = "How is Python used in machine learning?"

# Test each pipeline configuration
print("\n=== Pipeline Configuration Comparison ===")
for name, config in configurations.items():
    # A fresh pipeline is built per configuration; build time is not timed.
    pipeline = RetrievalPipeline(**config).build(nodes)
    start = time.time()
    # verbose=False: the pipeline prints nothing, this loop prints the summary.
    results = pipeline.retrieve(query, verbose=False)
    elapsed = time.time() - start
    print(f"\n{name} ({elapsed:.4f}s):")
    # Show only the two best hits per configuration.
    for i, node in enumerate(results[:2], 1):
        print(f"{i}. {node.node.get_content()}")

print("\nRetrieval pipeline complete!")

Created 10 sample documents
Created basic retrievers
Created hybrid retriever
Loaded CrossEncoder model: cross-encoder/ms-marco-MiniLM-L-6-v2
Created reranked retriever

=== Pipeline Configuration Comparison ===

Building pipeline: use_bm25

BM25 Only (0.0013s):
1. Python libraries like PyTorch and TensorFlow are used for deep learning.
2. Machine learning is a subset of AI that enables systems to learn from data.

Building pipeline: use_vector

Vector Only (0.0135s):
1. Python is a high-level programming language known for its readability.
2. Python libraries like PyTorch and TensorFlow are used for deep learning.

Building pipeline: use_bm25, use_vector, use_hybrid

Hybrid (0.0143s):
1. Python libraries like PyTorch and TensorFlow are used for deep learning.
2. Machine learning is a subset of AI that enables systems to learn from data.

Building pipeline: use_bm25, use_vector, use_hybrid, use_reranking
Loaded CrossEncoder model: cross-encoder/ms-marco-MiniLM-L-6-v2

Full Pipeline (0.

In [1]:
import time
from typing import Dict, List

# LlamaIndex imports
from llama_index.core.schema import Document, NodeWithScore, QueryBundle, TextNode
from llama_index.core import VectorStoreIndex
from llama_index.core.retrievers import BaseRetriever
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from sentence_transformers import CrossEncoder

In [2]:
# Toy corpus: ten one-sentence passages about Python, ML and retrieval.
texts = [
    "Python is a high-level programming language known for its readability.",
    "Machine learning is a subset of AI that enables systems to learn from data.",
    "Neural networks are computing systems inspired by biological neural networks.",
    "Deep learning uses neural networks with many layers to extract features from data.",
    "Natural language processing helps computers understand human language.",
    "Python libraries like PyTorch and TensorFlow are used for deep learning.",
    "BM25 is a bag-of-words retrieval function used in information retrieval.",
    "Vector search finds documents by measuring similarity in embedding space.",
    "Reranking refines initial search results with a more complex model.",
    "Retrieval pipelines combine multiple techniques for better search results."
]

# Wrap every passage in a Document with a positional id ("doc_0", "doc_1", ...)
documents = [
    Document(text=passage, id_=f"doc_{position}")
    for position, passage in enumerate(texts)
]

# Mirror each Document as a TextNode carrying the same text and id, so the
# retrievers built later all index identical node ids.
nodes = [
    TextNode(text=document.text, id_=document.id_)
    for document in documents
]

print(f"Created {len(documents)} sample documents")

Created 10 sample documents


In [3]:
# Set Up Basic Retrievers

# BM25 retriever for lexical search: built directly over the sample nodes and
# configured (similarity_top_k=5) to ask for the five best matches per query.
bm25_retriever = BM25Retriever.from_defaults(nodes=nodes, similarity_top_k=5)

# Vector retriever for semantic search: the same nodes are embedded with the
# all-MiniLM-L6-v2 sentence-transformer and stored in an in-notebook
# VectorStoreIndex; the retriever is likewise configured for five matches.
# The embedding model files are downloaded when the model is first loaded,
# which is what the progress bars in this cell's output show.
embed_model = HuggingFaceEmbedding(
    model_name="sentence-transformers/all-MiniLM-L6-v2")
vector_index = VectorStoreIndex(nodes=nodes, embed_model=embed_model)
vector_retriever = vector_index.as_retriever(similarity_top_k=5)

print("Created basic retrievers")

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.5k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Created basic retrievers


In [4]:
#  Create Hybrid Retriever
class WeightedFusionRetriever(BaseRetriever):
    """Fuse the hits of several retrievers into one weighted ranking.

    Every registered retriever is run on the same query. Each hit's score is
    multiplied by the weight registered under that retriever's name, and a
    node returned by more than one retriever receives the sum of its weighted
    scores. The fused list is returned best-first and is not truncated, so it
    can be longer than any single retriever's result list.

    NOTE(review): raw scores are combined as-is. When the retrievers score on
    different scales, the larger-scaled retriever dominates the ranking
    regardless of the weights; normalise the scores first if that matters.

    Args:
        retrievers: Mapping of retriever name to retriever instance.
        weights: Mapping of retriever name to score multiplier. A retriever
            whose name has no entry is weighted 1.0.
    """

    def __init__(self, retrievers: Dict[str, BaseRetriever], weights: Dict[str, float]):
        self.retrievers = retrievers
        self.weights = weights
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Query every retriever and return the fused ranking, best first."""
        # node_id -> {"node": <node>, "scores": {retriever name: weighted score}}
        merged = {}

        for name, retriever in self.retrievers.items():
            weight = self.weights.get(name, 1.0)

            for hit in retriever.retrieve(query_bundle):
                # A NodeWithScore may carry no score at all; count such a hit
                # as 0.0 instead of failing on `None * weight`.
                raw_score = 0.0 if hit.score is None else hit.score

                entry = merged.setdefault(
                    hit.node.node_id, {"node": hit.node, "scores": {}})
                # Keyed by retriever name: a node seen twice by the same
                # retriever keeps only that retriever's latest score.
                entry["scores"][name] = raw_score * weight

        fused = [
            NodeWithScore(node=entry["node"], score=sum(entry["scores"].values()))
            for entry in merged.values()
        ]

        return sorted(fused, key=lambda item: item.score, reverse=True)


# Create hybrid retriever: fuse the vector and BM25 retrievers defined above.
# Vector scores are multiplied by 0.7 and BM25 scores by 0.3 before the
# per-node scores are summed. The keys of `weights` must match the keys of
# `retrievers`; an unmatched retriever name would fall back to a weight of 1.0.
hybrid_retriever = WeightedFusionRetriever(
    retrievers={"vector": vector_retriever, "bm25": bm25_retriever},
    weights={"vector": 0.7, "bm25": 0.3}
)
print("Created hybrid retriever")

Created hybrid retriever


In [5]:
# Add Reranking

class RerankedRetriever(BaseRetriever):
    """Two-stage retriever: initial retrieval + cross-encoder reranking.

    Stage 1 asks ``base_retriever`` for candidates and keeps the first
    ``fetch_k`` of them. Stage 2 scores every (query, candidate text) pair
    with a cross-encoder, replaces each candidate's score with that value,
    and returns the highest-scoring ``top_k`` nodes.

    Args:
        base_retriever: Retriever that supplies the candidates.
        model_name: Cross-encoder checkpoint name passed to ``CrossEncoder``.
        fetch_k: How many leading candidates from the base retriever are
            reranked.
        top_k: How many reranked nodes are returned. A falsy value (``0`` or
            ``None``) returns every reranked candidate.
        device: Device string passed to ``CrossEncoder``. Defaults to
            ``"cpu"``, matching the previous hard-coded behaviour.
    """

    def __init__(self, base_retriever, model_name="cross-encoder/ms-marco-MiniLM-L-6-v2",
                 fetch_k=10, top_k=5, device="cpu"):
        self.base_retriever = base_retriever
        self.reranker = CrossEncoder(model_name, device=device)
        self.fetch_k = fetch_k
        self.top_k = top_k
        super().__init__()
        print(f"Loaded CrossEncoder model: {model_name}")

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Return the base retriever's candidates reordered by the cross-encoder."""
        # Stage 1: get initial candidates from the base retriever
        base_nodes = self.base_retriever.retrieve(query_bundle)[:self.fetch_k]

        # Nothing to rerank: skip the model call entirely
        if not base_nodes:
            return []

        # Stage 2: score each (query, candidate text) pair with the cross-encoder
        query = query_bundle.query_str
        pairs = [(query, candidate.node.get_content()) for candidate in base_nodes]
        rerank_scores = self.reranker.predict(pairs)

        # The cross-encoder score replaces the first-stage score; float()
        # turns the model's numeric output into a plain Python float.
        reranked_nodes = [
            NodeWithScore(node=candidate.node, score=float(score))
            for candidate, score in zip(base_nodes, rerank_scores)
        ]

        # Best first, then trim to top_k unless top_k is falsy
        reranked_nodes.sort(key=lambda item: item.score, reverse=True)
        return reranked_nodes[:self.top_k] if self.top_k else reranked_nodes


# Create reranked retriever: the hybrid retriever supplies the candidates,
# the first 10 of them (fetch_k) are rescored by the default cross-encoder,
# and the 5 best (top_k) are returned. Constructing it loads the
# cross-encoder model and prints a confirmation line.
reranked_retriever = RerankedRetriever(
    base_retriever=hybrid_retriever,
    fetch_k=10,
    top_k=5
)

Loaded CrossEncoder model: cross-encoder/ms-marco-MiniLM-L-6-v2


In [6]:
# 5. Complete Pipeline

class RetrievalPipeline:
    """Configurable retrieval pipeline combining multiple techniques.

    Depending on the flags, the built pipeline is a single BM25 or vector
    retriever, a weighted fusion of both, and optionally a cross-encoder
    reranking stage on top of whichever base retriever was selected.

    Args:
        use_bm25: Include a BM25 (lexical) retriever.
        use_vector: Include an embedding-based vector retriever.
        use_hybrid: Fuse the enabled retrievers with WeightedFusionRetriever.
            Only takes effect when both retrievers are enabled; otherwise the
            first enabled retriever (BM25 before vector) is used on its own.
        use_reranking: Wrap the base retriever in a RerankedRetriever.
        vector_weight: Fusion weight applied to vector scores.
        bm25_weight: Fusion weight applied to BM25 scores.
        top_k: Number of results the pipeline returns.
        rerank_top_k: Number of candidates handed to the reranker.
    """

    def __init__(self, use_bm25=True, use_vector=True, use_hybrid=True, use_reranking=True,
                 vector_weight=0.7, bm25_weight=0.3, top_k=5, rerank_top_k=10):
        self.config = {
            "use_bm25": use_bm25, "use_vector": use_vector,
            "use_hybrid": use_hybrid, "use_reranking": use_reranking,
            "vector_weight": vector_weight, "bm25_weight": bm25_weight,
            "top_k": top_k, "rerank_top_k": rerank_top_k
        }
        # Set by build(); retrieve() refuses to run while this is None.
        self.pipeline = None

    def build(self, nodes):
        """Build the pipeline based on configuration.

        Args:
            nodes: The nodes to index; handed to every enabled retriever.

        Returns:
            This pipeline, so that ``RetrievalPipeline(...).build(nodes)``
            can be chained.

        Raises:
            ValueError: If neither ``use_bm25`` nor ``use_vector`` is enabled,
                because there would be nothing to retrieve with.
        """
        # Fail with a clear message instead of leaking a StopIteration from
        # the retriever selection further down.
        if not (self.config["use_bm25"] or self.config["use_vector"]):
            raise ValueError(
                "No retriever enabled: set use_bm25 and/or use_vector to True.")

        enabled = [k for k, v in self.config.items()
                   if v and k.startswith('use_')]
        print(f"\nBuilding pipeline: {', '.join(enabled)}")

        top_k = self.config["top_k"]

        # When a reranker follows, the first stage has to supply at least
        # rerank_top_k candidates; otherwise the reranker can only reshuffle
        # the same top_k hits and rerank_top_k has no effect.
        candidate_k = top_k
        if self.config["use_reranking"]:
            candidate_k = max(top_k, self.config["rerank_top_k"] or top_k)

        # Set up retrievers (insertion order matters: BM25 first, then vector)
        retrievers = {}
        if self.config["use_bm25"]:
            # Some BM25 backends reject a k larger than the corpus, so never
            # ask for more hits than there are nodes. This is a no-op when
            # the corpus is large enough. TODO confirm against the installed
            # BM25 backend version.
            bm25_k = min(candidate_k, len(nodes)) if nodes else candidate_k
            retrievers["bm25"] = BM25Retriever.from_defaults(
                nodes=nodes, similarity_top_k=bm25_k
            )

        if self.config["use_vector"]:
            embed_model = HuggingFaceEmbedding(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
            vector_index = VectorStoreIndex(
                nodes=nodes, embed_model=embed_model)
            retrievers["vector"] = vector_index.as_retriever(
                similarity_top_k=candidate_k
            )

        # Select base retriever
        if self.config["use_hybrid"] and len(retrievers) > 1:
            weights = {
                "vector": self.config["vector_weight"],
                "bm25": self.config["bm25_weight"]
            }
            base_retriever = WeightedFusionRetriever(
                retrievers=retrievers, weights=weights)
        else:
            # Single-retriever mode: take the first one that was enabled.
            base_retriever = next(iter(retrievers.values()))

        # Add reranking if enabled
        if self.config["use_reranking"]:
            self.pipeline = RerankedRetriever(
                base_retriever=base_retriever,
                fetch_k=self.config["rerank_top_k"],
                top_k=top_k
            )
        else:
            self.pipeline = base_retriever

        return self

    def retrieve(self, query, verbose=True):
        """Execute the retrieval pipeline on a query.

        Args:
            query: The query, either as a plain string (wrapped in a
                QueryBundle) or as an object the pipeline accepts directly.
            verbose: When true, print the elapsed time and the three best
                results with their scores.

        Returns:
            The retrieved results, best first, limited to ``top_k`` entries.

        Raises:
            ValueError: If build() has not been called yet.
        """
        if self.pipeline is None:
            raise ValueError("Pipeline not built. Call build() first.")

        if isinstance(query, str):
            query = QueryBundle(query)

        # perf_counter is monotonic and high-resolution, which suits the
        # millisecond-scale durations measured here.
        start_time = time.perf_counter()
        results = self.pipeline.retrieve(query)
        elapsed = time.perf_counter() - start_time

        # The fusion stage returns the union of its retrievers' hits, which
        # can exceed top_k; enforce the configured result count here so every
        # configuration honours it.
        limit = self.config["top_k"]
        if limit:
            results = results[:limit]

        if verbose:
            print(f"\n=== Results ({elapsed:.4f}s) ===")
            for i, node in enumerate(results[:3], 1):
                print(f"{i}. Score: {node.score:.4f} - {node.node.get_content()}")

        return results

In [7]:
def test_configurations(nodes, query):
    """Run one query through four preset pipelines and print a comparison.

    The presets are BM25 only, vector only, hybrid fusion, and hybrid fusion
    followed by reranking. For each preset a fresh RetrievalPipeline is built
    over ``nodes``, a single retrieval of ``query`` is timed (the build is not
    part of the timing), and the text of the two best results is printed.

    Args:
        nodes: Nodes every pipeline is built over.
        query: Query string sent to every pipeline.
    """
    flag_names = ("use_bm25", "use_vector", "use_hybrid", "use_reranking")
    # (label, flag values in flag_names order); listed in reporting order
    presets = (
        ("BM25 Only", (True, False, False, False)),
        ("Vector Only", (False, True, False, False)),
        ("Hybrid", (True, True, True, False)),
        ("Full Pipeline", (True, True, True, True)),
    )

    print("\n=== Pipeline Configuration Comparison ===")
    print(f"Query: '{query}'")

    for label, flags in presets:
        settings = dict(zip(flag_names, flags))
        retrieval = RetrievalPipeline(**settings).build(nodes)

        began = time.time()
        hits = retrieval.retrieve(query, verbose=False)
        took = time.time() - began

        print(f"\n{label} ({took:.4f}s):")
        for rank, hit in enumerate(hits[:2], start=1):
            print(f"{rank}. {hit.node.get_content()}")


# Run the comparison: build all four preset pipelines over the sample nodes
# and print each one's timing and two best hits for the same question.
test_configurations(nodes, "How is Python used in machine learning?")
print("\nRetrieval pipeline complete!")


=== Pipeline Configuration Comparison ===
Query: 'How is Python used in machine learning?'

Building pipeline: use_bm25

BM25 Only (0.0024s):
1. Python libraries like PyTorch and TensorFlow are used for deep learning.
2. Machine learning is a subset of AI that enables systems to learn from data.

Building pipeline: use_vector

Vector Only (0.0154s):
1. Python is a high-level programming language known for its readability.
2. Python libraries like PyTorch and TensorFlow are used for deep learning.

Building pipeline: use_bm25, use_vector, use_hybrid

Hybrid (0.0145s):
1. Python libraries like PyTorch and TensorFlow are used for deep learning.
2. Machine learning is a subset of AI that enables systems to learn from data.

Building pipeline: use_bm25, use_vector, use_hybrid, use_reranking
Loaded CrossEncoder model: cross-encoder/ms-marco-MiniLM-L-6-v2

Full Pipeline (0.1995s):
1. Python libraries like PyTorch and TensorFlow are used for deep learning.
2. Python is a high-level programmin

# Production Considerations 

1. Performance Optimization:
    - Pre-compute and cache embeddings
    - Consider using approximate nearest neighbor (ANN) indexes for vector search at scale
    - Adjust fetch_k and top_k based on your application's latency requirements

2. Model Selection:
    - Choose embedding models based on your domain
    - Test different cross-encoder models for quality
    - Consider distilled models for better speed

3. Resource Allocation:
    - BM25 is CPU-bound and memory-efficient
    - Vector search benefits from GPU acceleration
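    - Cross-encoders are the most resource-intensive stage: they run a model forward pass for every query–candidate pair, so cost grows with fetch_k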

4. Observability:
    - Log timing information for each stage
    - Track retrieval quality metrics
    - Monitor model performance over time

5. Adaptive Configuration:
    - Consider adjusting weights based on query type
    - Use different pipelines for different use cases
    - A/B test configuration changes