# M1.4 — Query Pipeline & Response Generation

**Complete 7-Stage RAG Pipeline:**  
Query Understanding → Retrieval → Reranking → Context Building → Prompt Engineering → Generation → Metadata Collection

---

## Purpose
Build a production-ready RAG query pipeline that transforms user queries into grounded, cited answers through seven sequential stages. This notebook demonstrates hybrid search, cross-encoder reranking, query-type specific optimization, and comprehensive metrics tracking.

## Concepts Covered
- **Query Classification**: 6 types (factual, how-to, comparison, definition, troubleshooting, opinion)
- **Query Expansion**: LLM-based alternative phrasings for 15-25% recall improvement
- **Hybrid Retrieval**: Dense (semantic) + sparse (BM25) with auto-tuned alpha (0.3-0.8)
- **Cross-Encoder Reranking**: ms-marco-MiniLM-L-6-v2 for 10-20% relevance gain
- **Context Preparation**: Deduplication, source attribution, length guards
- **Prompt Engineering**: Query-type specific templates
- **Metrics & Fallbacks**: Timings, scores, graceful error handling

## After Completing This Module
You will be able to:
- ✅ Build end-to-end RAG pipelines with 7 stages
- ✅ Implement hybrid search (dense + sparse)
- ✅ Apply query-type specific optimizations
- ✅ Use cross-encoder reranking for quality
- ✅ Handle missing API keys gracefully
- ✅ Track comprehensive metrics
- ✅ Make informed accuracy vs latency trade-offs

## Context in RAG21D Track
**Prerequisites**: M1.3 (Indexing & Retrieval Strategies)  
**Builds on**: Vector DB setup, hybrid search foundations  
**Adds**: Query understanding, reranking, production metrics  
**Enables**: Production RAG with 60-80% hallucination reduction

## 1. Reality Check: What Query Pipelines Do / Don't Do

**What RAG Query Pipelines DO:**
- ✅ Reduce hallucination by 60-80% through grounded responses
- ✅ Handle 1,000+ documents efficiently with proper indexing
- ✅ Provide source attribution for trust and verification
- ✅ Adapt to diverse query types (factual, how-to, troubleshooting, etc.)

**What They DON'T Do:**
- ❌ Eliminate all hallucinations (LLMs can still extrapolate)
- ❌ Work without quality source data (garbage in, garbage out)
- ❌ Handle multi-turn context automatically (requires session management)
- ❌ Operate instantly (retrieval and reranking add roughly 200-400ms before LLM generation starts)

**Key Trade-offs:**
1. **Accuracy vs Latency:** Reranking improves quality but adds 50-100ms
2. **Coverage vs Precision:** More chunks = better coverage but noisier context

```python
# Reality Check Demonstration
print("✅ RAG Query Pipelines CAN:")
print("  • Reduce hallucination 60-80%")
print("  • Handle 1,000+ documents")
print("  • Provide source attribution")
print("  • Adapt to query types\n")

print("❌ RAG Query Pipelines CANNOT:")
print("  • Eliminate all hallucinations")
print("  • Work with poor source data")

print("\n⚖️ Key Trade-offs:")
print("  1. Accuracy vs Latency (reranking adds 50-100ms)")
print("  2. Coverage vs Precision (more chunks = more noise)")

# Expected: 4-line capabilities list + 2-line limitations + 2 trade-offs
```

## 2. Query Understanding (Type, Expansion, Keywords)

**Stage 1 of 7:** Understanding user intent before retrieval.

**Three Key Steps:**
1. **Classification:** Identify query type (factual, how-to, comparison, definition, troubleshooting, opinion)
2. **Expansion:** Generate alternative phrasings to capture more relevant documents (optional, LLM-based)
3. **Keyword Extraction:** Extract key terms for filtering and metadata matching

**Why It Matters:**
- Different query types need different retrieval strategies (alpha tuning)
- Query expansion can improve recall by 15-25%
- Keywords enable metadata filtering for faster retrieval

```python
# Query Understanding Demo
from src.m1_4_query_pipeline.module import QueryProcessor, QueryType
from src.m1_4_query_pipeline.config import get_clients

# Initialize processor
openai_client, _ = get_clients()
processor = QueryProcessor(openai_client)

# Sample query
sample_query = "How do I improve RAG accuracy?"

# 1. Classify query type
query_type = processor.classify(sample_query)
print(f"📋 Query Type: {query_type.value}")

# 2. Generate expansions (if keys exist)
if openai_client:
    expansions = processor.expand(sample_query, num_expansions=2)
    print(f"🔄 Expansions ({len(expansions)}):")
    for i, exp in enumerate(expansions[:3], 1):
        print(f"  {i}. {exp[:70]}...")
else:
    print("⚠️ Skipping expansions (no API keys)")

# 3. Extract keywords
keywords = processor.extract_keywords(sample_query)
print(f"🔑 Keywords: {keywords[:6]}")

# Expected: Query type, 2-3 expansion examples, ≤6 keywords
```
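The demo above relies on `QueryProcessor`, and this notebook does not show its internals. The sketch below gives a rough, LLM-free picture of the classification and keyword steps using ordered regex rules and a stopword filter. Everything in it is a hypothetical assumption for illustration, not the module's implementation. That includes `SketchQueryType`, its enum values, the rule patterns, and the stopword list. The names carry a `sketch` prefix so they do not shadow the notebook's `QueryType`.

```python
import re
from enum import Enum


class SketchQueryType(Enum):
    """Hypothetical stand-in for the module's QueryType (values are assumptions)."""
    FACTUAL = "factual"
    HOW_TO = "how_to"
    COMPARISON = "comparison"
    DEFINITION = "definition"
    TROUBLESHOOTING = "troubleshooting"
    OPINION = "opinion"


# Ordered rules: the first matching pattern wins, so specific intents
# (errors, comparisons) are checked before generic ones (how-to).
_RULES = [
    (SketchQueryType.TROUBLESHOOTING,
     r"\b(error|fail(s|ed|ing)?|exception|crash(es|ed)?|bug|not working|broken|timeout)\b|\b[45]\d\d\b"),
    (SketchQueryType.COMPARISON,
     r"\b(vs\.?|versus|compare[ds]?|comparison|difference between|better than)\b"),
    (SketchQueryType.OPINION, r"\b(should i|best|recommend(ed)?|worth|opinion)\b"),
    (SketchQueryType.DEFINITION, r"^\s*(what (is|are)|define|meaning of)\b"),
    (SketchQueryType.HOW_TO, r"^\s*how (do|can|to|should|would)\b|\bsteps to\b"),
]

_STOPWORDS = {
    "a", "an", "the", "how", "do", "does", "i", "to", "what", "is", "are",
    "of", "for", "in", "on", "and", "or", "can", "my", "me", "with", "why",
    "am", "should", "vs", "versus", "between", "which",
}


def classify_query_sketch(query: str) -> SketchQueryType:
    """Return the first matching query type; fall back to FACTUAL."""
    for query_kind, pattern in _RULES:
        if re.search(pattern, query, flags=re.IGNORECASE):
            return query_kind
    return SketchQueryType.FACTUAL


def extract_keywords_sketch(query: str) -> list[str]:
    """Lowercase word tokens minus stopwords, de-duplicated in order of appearance."""
    tokens = re.findall(r"[a-z0-9][a-z0-9_\-]*", query.lower())
    return list(dict.fromkeys(t for t in tokens if t not in _STOPWORDS))


print(classify_query_sketch("How do I improve RAG accuracy?").value)  # how_to
print(extract_keywords_sketch("How do I improve RAG accuracy?"))      # ['improve', 'rag', 'accuracy']
```

Rule order matters here. Putting troubleshooting first means a query like "How do I fix error 404?" is routed to the low-alpha, keyword-heavy retrieval path rather than treated as a generic how-to.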

## 3. Retrieval Strategies (Hybrid Dense+Sparse, Alpha)

**Stage 2 of 7:** Fetching relevant documents using hybrid search.

**Hybrid Search Components:**
1. **Dense Embeddings:** Semantic similarity via OpenAI text-embedding-3-small
2. **Sparse Embeddings:** Keyword matching via BM25 encoding
3. **Alpha Tuning:** Query-type specific weighting (0.3-0.8), where alpha weights the dense score and 1 - alpha weights the sparse score

**Alpha Values by Query Type:**
- Factual/Definition: 0.7 (favor semantic understanding)
- How-to: 0.5 (balanced)
- Troubleshooting: 0.3 (favor exact terms/error codes)
- Comparison/Opinion: 0.6 (moderate semantic bias)

**Why Hybrid Works:**
- Dense captures meaning: "car" matches "vehicle", "automobile"
- Sparse captures exactness: "error 404" requires literal match
- Alpha balancing optimizes for query intent
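To make the sparse side concrete, here is a minimal Okapi BM25 scorer in pure Python. It is a sketch, not the encoder behind `SmartRetriever`, because the demo does not show which BM25 implementation the module uses. `k1=1.2` and `b=0.75` are common defaults, and the IDF uses the non-negative `log(1 + ...)` variant. The example shows both sides of the behavior described above. A literal query like "error 404" ranks the matching document first. The query "car" scores zero against a document that only says "cars", because plain BM25 does no stemming or synonym matching.

```python
import math
import re
from collections import Counter


def bm25_tokenize(text: str) -> list[str]:
    """Lowercase word tokens; no stemming, so 'car' and 'cars' are different terms."""
    return re.findall(r"\w+", text.lower())


def bm25_scores(query: str, docs: list[str], k1: float = 1.2, b: float = 0.75) -> list[float]:
    """Okapi BM25 score of `query` against each document in `docs`."""
    if not docs:
        return []
    tokenized = [bm25_tokenize(d) for d in docs]
    n_docs = len(tokenized)
    avgdl = sum(len(toks) for toks in tokenized) / n_docs or 1.0
    # Document frequency: how many documents contain each term
    df = Counter(term for toks in tokenized for term in set(toks))
    query_terms = set(bm25_tokenize(query))

    scores = []
    for toks in tokenized:
        tf = Counter(toks)
        # Length normalization: longer-than-average documents are penalized (strength set by b)
        length_norm = 1 - b + b * len(toks) / avgdl
        score = 0.0
        for term in query_terms:
            if tf[term] == 0:
                continue
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            score += idf * tf[term] * (k1 + 1) / (tf[term] + k1 * length_norm)
        scores.append(score)
    return scores


bm25_corpus = [
    "HTTP error 404 means the page was not found",
    "Vehicles and automobiles are kinds of cars",
    "Error handling in Python uses exceptions",
]
print(bm25_scores("error 404", bm25_corpus))  # first doc scores highest; second doc scores 0.0
print(bm25_scores("car", bm25_corpus))        # [0.0, 0.0, 0.0]: no literal "car" token anywhere
```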

```python
# Retrieval Strategy Demo
from src.m1_4_query_pipeline.module import SmartRetriever, QueryType
from src.m1_4_query_pipeline.config import get_clients

# Initialize retriever
openai_client, pinecone_client = get_clients()
retriever = SmartRetriever(openai_client, pinecone_client)

# Sample query
sample_query = "How do I improve RAG accuracy?"
query_type = QueryType.HOW_TO

# Get alpha for this query type (private helper, called here only for inspection)
alpha = retriever._get_alpha_for_query_type(query_type)
print(f"⚙️ Alpha for {query_type.value}: {alpha}")

# Retrieve results
results = retriever.retrieve(
    query=sample_query,
    query_type=query_type,
    top_k=3,
    namespace="demo"
)

print(f"\n📄 Retrieved {len(results)} results:")
for i, result in enumerate(results[:3], 1):
    preview = result.text[:80].replace('\n', ' ')  # Flatten newlines for a one-line preview
    print(f"  {i}. [score={result.score:.3f}] {preview}...")

# Expected: Alpha value, top 3 results with score + 80-char preview
```
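The alpha values listed above can be applied in two equivalent ways. Pinecone's hybrid-search docs use a common pattern: scale the dense query vector by alpha and the sparse query values by (1 - alpha), then run a single query. A dot-product hybrid index scores a match as the dense dot product plus the sparse dot product, so this scaling yields exactly `alpha * dense + (1 - alpha) * sparse`. The sketch below computes the score both ways and checks that they agree. The `ALPHA_BY_QUERY_TYPE` keys are hypothetical strings; the values come from the table above. Raw BM25 scores and cosine scores sit on different scales, so the blend only behaves as intended when the sparse values are normalized sensibly.

```python
from typing import Dict, List, Tuple

# Alpha per query type (values from the table above; string keys are assumptions).
# alpha weights the dense (semantic) score, 1 - alpha the sparse (keyword) score.
ALPHA_BY_QUERY_TYPE = {
    "factual": 0.7,
    "definition": 0.7,
    "how_to": 0.5,
    "comparison": 0.6,
    "opinion": 0.6,
    "troubleshooting": 0.3,
}

SparseVector = Dict[str, list]  # {"indices": [...], "values": [...]}


def scale_hybrid_query(dense: List[float], sparse: SparseVector,
                       alpha: float) -> Tuple[List[float], SparseVector]:
    """Scale dense by alpha and sparse values by (1 - alpha) before querying a dot-product index."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0 and 1")
    scaled_dense = [v * alpha for v in dense]
    scaled_sparse = {"indices": list(sparse["indices"]),
                     "values": [v * (1 - alpha) for v in sparse["values"]]}
    return scaled_dense, scaled_sparse


def dense_dot(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def sparse_dot(a: SparseVector, b: SparseVector) -> float:
    lookup = dict(zip(b["indices"], b["values"]))
    return sum(v * lookup.get(i, 0.0) for i, v in zip(a["indices"], a["values"]))


def fused_score(dense_score: float, sparse_score: float, alpha: float) -> float:
    """Convex combination of separately computed dense and sparse scores."""
    return alpha * dense_score + (1 - alpha) * sparse_score


q_dense, q_sparse = [1.0, 0.0], {"indices": [3, 7], "values": [1.0, 2.0]}
d_dense, d_sparse = [0.8, 0.6], {"indices": [3, 9], "values": [0.5, 1.0]}
hybrid_alpha = ALPHA_BY_QUERY_TYPE["troubleshooting"]

scaled_q_dense, scaled_q_sparse = scale_hybrid_query(q_dense, q_sparse, hybrid_alpha)
one_query = dense_dot(scaled_q_dense, d_dense) + sparse_dot(scaled_q_sparse, d_sparse)
two_scores = fused_score(dense_dot(q_dense, d_dense), sparse_dot(q_sparse, d_sparse), hybrid_alpha)
print(round(one_query, 4), round(two_scores, 4))  # 0.59 0.59
```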

## 4. Reranking with Cross-Encoder

**Stage 3 of 7:** Refining initial results with deeper semantic scoring.

**Why Rerank?**
- Initial retrieval uses **bi-encoders** (fast, independent embeddings)
- Reranking uses **cross-encoders** (slower, joint query-document encoding)
- Cross-encoders achieve 10-20% better relevance at the cost of 50-100ms of added latency

**Model:** `cross-encoder/ms-marco-MiniLM-L-6-v2`
- Trained on MS MARCO passage ranking
- Optimized for query-document relevance scoring
- Compact (6 layers) for production speed

**Process:**
1. Take top-K initial results (e.g., K=5)
2. Score each with cross-encoder
3. Re-sort by rerank_score (preserve original_score)
4. Return top-N (e.g., N=3)

```python
# Reranking Demo
from src.m1_4_query_pipeline.module import Reranker

# Initialize reranker
reranker = Reranker()

# Use results from previous retrieval
print(f"🔄 Reranking {len(results)} initial results...")

# Rerank
reranked = reranker.rerank(
    query=sample_query,
    results=results,
    top_k=3
)

print(f"\n🏆 Top 3 after reranking:")
for i, result in enumerate(reranked[:3], 1):
    orig_score = result.original_score or 0
    rerank_score = result.rerank_score or 0
    diff = rerank_score - orig_score
    preview = result.text[:60].replace('\n', ' ')  # Flatten newlines for a one-line preview
    print(f"  {i}. [rerank={rerank_score:.3f}, orig={orig_score:.3f}, Δ={diff:+.3f}]")
    print(f"     {preview}...")

# Expected: Top 3 with rerank_score, original_score, and difference
```
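The reranking process described above can be sketched independently of any model: score the top-K, re-sort by `rerank_score`, keep `original_score`, and return the top-N. Here `score_pairs` is any callable that maps `(query, passage)` pairs to relevance scores. With the `sentence-transformers` library, for example, `CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2").predict` accepts such a list of pairs. The `Candidate` dataclass and `rerank_sketch` are hypothetical stand-ins, not the module's `Reranker`. The usage below swaps in a toy word-overlap scorer so it runs without downloading a model.

```python
from dataclasses import dataclass, replace
from typing import Callable, List, Optional, Sequence, Tuple


@dataclass
class Candidate:
    """Hypothetical stand-in for a retrieval result."""
    text: str
    score: float                           # first-stage (bi-encoder / hybrid) score
    original_score: Optional[float] = None
    rerank_score: Optional[float] = None


PairScorer = Callable[[List[Tuple[str, str]]], Sequence[float]]


def rerank_sketch(query: str, candidates: Sequence[Candidate],
                  score_pairs: PairScorer, top_k: int = 3) -> List[Candidate]:
    """Score (query, passage) pairs jointly, re-sort by that score, keep the first-stage score."""
    if not candidates:
        return []
    scores = score_pairs([(query, cand.text) for cand in candidates])
    # replace() returns copies, so the caller's candidate objects are not mutated
    rescored = [replace(cand, original_score=cand.score, rerank_score=float(s))
                for cand, s in zip(candidates, scores)]
    rescored.sort(key=lambda cand: cand.rerank_score, reverse=True)
    return rescored[:top_k]


def overlap_scorer(pairs: List[Tuple[str, str]]) -> List[int]:
    """Toy scorer: count shared lowercase words (stands in for a cross-encoder)."""
    return [len(set(q.lower().split()) & set(p.lower().split())) for q, p in pairs]


toy_candidates = [
    Candidate("chunking strategies for documents", score=0.9),
    Candidate("reduce rag latency with caching", score=0.5),
    Candidate("rag latency budget", score=0.7),
]
for cand in rerank_sketch("reduce rag latency", toy_candidates, overlap_scorer, top_k=2):
    print(f"rerank={cand.rerank_score:.1f} orig={cand.original_score:.1f} {cand.text}")
```

Keeping `original_score` next to `rerank_score` is what makes the Δ column in the demo above possible. It also lets you log how often reranking actually changes the order.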

## 5. Context Preparation (Dedup, Sources, Limits)

**Stage 4 of 7:** Building clean, attributed context for the LLM.

**Three Critical Steps:**
1. **Deduplication:** Remove redundant chunks (same content from multiple sources)
2. **Source Attribution:** Tag each chunk with [Source N: source_name] for transparency
3. **Length Guard:** Enforce max context length (e.g., 4000 chars) to fit token budget

**Why It Matters:**
- **Dedup:** Prevents wasting tokens on repeated information
- **Sources:** Builds user trust; enables citation verification
- **Length limits:** Ensures prompts fit within model context window

**Token Budget Calculation:**
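- Working budget: 8K tokens (a conservative cap; GPT-4o-mini's full context window is 128K tokens)
- Reserve for response: 1K tokens
- System prompt: ~200 tokens
- Available for context: ~6.8K tokens
- `max_length=4000` characters is roughly 1K tokens (at ~4 characters per English token), well inside that budget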
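A few lines of arithmetic check the budget above against the common rule of thumb of roughly 4 characters per English token. That ratio is an approximation; a tokenizer library such as `tiktoken` gives exact counts. The function names below are illustrative, not part of the module.

```python
import math

CHARS_PER_TOKEN = 4  # rough average for English text; an approximation, not exact


def context_budget(window_tokens: int, reserved_output: int, system_tokens: int,
                   chars_per_token: int = CHARS_PER_TOKEN) -> tuple[int, int]:
    """Return (tokens, approximate characters) left for retrieved context."""
    available = window_tokens - reserved_output - system_tokens
    if available <= 0:
        raise ValueError("no room left for context")
    return available, available * chars_per_token


def estimate_tokens(text: str, chars_per_token: int = CHARS_PER_TOKEN) -> int:
    """Approximate token count from character length (rounded up)."""
    return math.ceil(len(text) / chars_per_token)


tokens_left, chars_left = context_budget(8_000, 1_000, 200)
print(tokens_left, chars_left)       # 6800 27200
print(estimate_tokens("x" * 4000))   # 1000, so a 4,000-char context is about 1K tokens
```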

```python
# Context Preparation Demo
from src.m1_4_query_pipeline.module import ContextBuilder

# Initialize builder
builder = ContextBuilder(max_length=4000)

# Build context with scores
context_data = builder.context_with_scores(reranked)

print(f"📦 Context Statistics:")
print(f"  • Chunks: {context_data['num_chunks']}")
print(f"  • Avg Score: {context_data['avg_score']:.3f}")
print(f"  • Unique Sources: {context_data['unique_sources']}")
print(f"  • Context Length: {len(context_data['context'])} chars")

print(f"\n📚 Sources:")
for i, source in enumerate(context_data['sources'][:3], 1):
    print(f"  {i}. {source}")

# Expected: num_chunks, avg_score, unique_sources count, sources list
```
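A minimal version of the three context-preparation steps might look like the sketch below: dedup, `[Source N: name]` tagging, and a length guard. It rests on several assumptions and is not the module's `ContextBuilder`. Duplicates are detected by hashing text after normalizing whitespace and case. Chunks are added in descending score order. A chunk that would push the context past `max_length` characters is skipped, so that smaller, lower-scoring chunks can still fit.

```python
import hashlib
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SourcedChunk:
    """Hypothetical chunk record: text, source name, and relevance score."""
    text: str
    source: str
    score: float


def build_context_sketch(chunks: List[SourcedChunk], max_length: int = 4000) -> Dict[str, object]:
    seen = set()
    blocks: List[str] = []
    sources: List[str] = []
    used = 0
    separator = "\n\n"
    for chunk in sorted(chunks, key=lambda c: c.score, reverse=True):
        # 1. Dedup: identical text after collapsing whitespace and case counts as a duplicate
        normalized = " ".join(chunk.text.lower().split())
        digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
        if digest in seen:
            continue
        seen.add(digest)
        # 2. Source attribution
        block = f"[Source {len(blocks) + 1}: {chunk.source}]\n{chunk.text}"
        # 3. Length guard (the separator is only counted between blocks)
        extra = len(block) + (len(separator) if blocks else 0)
        if used + extra > max_length:
            continue  # skip oversized chunks; a later, shorter chunk may still fit
        blocks.append(block)
        sources.append(chunk.source)
        used += extra
    return {
        "context": separator.join(blocks),
        "num_chunks": len(blocks),
        "sources": sources,
        "unique_sources": len(set(sources)),
    }


demo_chunks = [
    SourcedChunk("Rerank with a cross-encoder.", "guide.md", 0.6),
    SourcedChunk("Use hybrid search.", "faq.md", 0.9),
    SourcedChunk("use  HYBRID search.", "blog.md", 0.8),  # duplicate after normalization
    SourcedChunk("x" * 5000, "big.txt", 0.7),              # too long for the budget
]
demo_context = build_context_sketch(demo_chunks, max_length=4000)
print(demo_context["num_chunks"], demo_context["sources"])  # 2 ['faq.md', 'guide.md']
```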

## 6. Prompt Engineering per Query Type

**Stage 5 of 7:** Crafting query-type specific prompts for optimal responses.

**Six Template Types:**
1. **Factual:** "Provide factual answers based strictly on context"
2. **How-to:** "Provide clear, step-by-step instructions"
3. **Comparison:** "Compare and contrast options"
4. **Definition:** "Provide clear definitions"
5. **Troubleshooting:** "Help diagnose and solve problems"
6. **Opinion:** "Provide balanced perspectives"

**Template Structure:**
- **System prompt:** Sets behavior and constraints
- **User prompt:** Injects context + query with task-specific framing

**Example (How-to):**
- System: "You are a helpful AI assistant. Provide clear, step-by-step instructions based on the given context."
- User: "Context: [retrieved chunks]\n\nQuestion: [query]\n\nProvide step-by-step instructions based on the context above."

```python
# Prompt Engineering Demo
from src.m1_4_query_pipeline.module import PromptBuilder

# Initialize builder
prompt_builder = PromptBuilder()

# Build prompt for HOW_TO query type
prompt = prompt_builder.build_prompt(
    query=sample_query,
    context=context_data['context'],
    query_type=QueryType.HOW_TO
)

print(f"🎯 Query Type: {QueryType.HOW_TO.value}")
print(f"\n📝 System Prompt (first 100 chars):")
print(f"  {prompt['system'][:100]}...")

print(f"\n📝 User Prompt (first 200 chars):")
print(f"  {prompt['user'][:200]}...")

# Expected: System prompt preview (first 100 chars) + user prompt preview (first 200 chars)
```
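The template structure above fits naturally into a lookup table: a per-type system prompt, plus a user prompt that injects the context and the query. The wording below paraphrases the six template descriptions listed earlier. That wording is an assumption, as are the string keys and the extra grounding sentence. `build_prompt_sketch` is hypothetical and is not the module's `PromptBuilder`. The last helper converts the result into the `messages` list format used by OpenAI's Chat Completions API.

```python
from typing import Dict, List

_BASE = "You are a helpful AI assistant. "
_GROUNDING = " If the context does not contain the answer, say so."  # assumed extra constraint

SYSTEM_TEMPLATES: Dict[str, str] = {
    "factual": _BASE + "Provide factual answers based strictly on the given context." + _GROUNDING,
    "how_to": _BASE + "Provide clear, step-by-step instructions based on the given context." + _GROUNDING,
    "comparison": _BASE + "Compare and contrast the options using the given context." + _GROUNDING,
    "definition": _BASE + "Provide clear definitions based on the given context." + _GROUNDING,
    "troubleshooting": _BASE + "Help diagnose and solve problems based on the given context." + _GROUNDING,
    "opinion": _BASE + "Provide balanced perspectives based on the given context." + _GROUNDING,
}

TASK_INSTRUCTIONS: Dict[str, str] = {
    "factual": "Answer the question based on the context above.",
    "how_to": "Provide step-by-step instructions based on the context above.",
    "comparison": "Compare the options based on the context above.",
    "definition": "Define the term based on the context above.",
    "troubleshooting": "Diagnose the problem and suggest fixes based on the context above.",
    "opinion": "Summarize the perspectives in the context above.",
}


def build_prompt_sketch(query: str, context: str, query_type: str) -> Dict[str, str]:
    """Pick templates by query type; unknown types fall back to 'factual'."""
    key = query_type if query_type in SYSTEM_TEMPLATES else "factual"
    user = f"Context: {context}\n\nQuestion: {query}\n\n{TASK_INSTRUCTIONS[key]}"
    return {"system": SYSTEM_TEMPLATES[key], "user": user}


def to_chat_messages(prompt_parts: Dict[str, str]) -> List[Dict[str, str]]:
    """Convert {'system', 'user'} strings into a chat-style messages list."""
    return [
        {"role": "system", "content": prompt_parts["system"]},
        {"role": "user", "content": prompt_parts["user"]},
    ]


demo_prompt = build_prompt_sketch(
    "How do I improve RAG accuracy?", "[Source 1: guide.md]\nUse reranking.", "how_to"
)
print(to_chat_messages(demo_prompt)[0]["content"])
```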

## 7. Response Generation (Non-Streaming & Streaming)

**Stage 6 of 7:** Generating the final answer using the LLM.

**Two Generation Modes:**

**1. Non-Streaming (Batch):**
- Wait for complete response before returning
- Simpler error handling
- Good for analytics, logging, caching
- User waits 1-3 seconds for full answer

**2. Streaming:**
- Token-by-token delivery as generated
- Better perceived latency (first token in ~300ms)
- Enables real-time UI updates
- Requires stream-aware error handling

**Configuration:**
- Model: `gpt-4o-mini` (fast, cost-effective)
- Temperature: 0.1 (low for factual consistency)
- Max tokens: 500 (constrains response length)

```python
# Response Generation Demo
from src.m1_4_query_pipeline.module import ResponseGenerator

# Initialize generator
generator = ResponseGenerator(openai_client)

# 1. Non-streaming generation
print("🤖 Non-Streaming Generation:")
if openai_client:
    answer = generator.generate(prompt, temperature=0.1, max_tokens=200)
    print(f"  {answer[:150]}..." if len(answer) > 150 else f"  {answer}")
else:
    print("  ⚠️ Skipping API calls (no keys found)")

# 2. Streaming generation (first few chunks)
print("\n🌊 Streaming Generation (sample):")
if openai_client:
    print("  ", end="", flush=True)
    token_count = 0
    for chunk in generator.stream(prompt, temperature=0.1, max_tokens=200):
        print(chunk, end="", flush=True)
        token_count += 1
        if token_count >= 20:  # Stop after ~20 streamed chunks (roughly 20 tokens) for the demo
            print("...", end="")
            break
    print()  # End the streamed line
else:
    print("  ⚠️ Skipping streaming (no keys found)")

# Expected: Non-stream answer (~150 chars) + streaming first ~20 tokens
```
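Streaming mainly improves *perceived* latency, so it helps to measure time to first chunk separately from total time. The sketch below consumes any iterator of text chunks and records both. `consume_stream` is a hypothetical helper, not part of `ResponseGenerator`. The optional `openai_text_chunks` adapter assumes the OpenAI Python SDK v1 streaming format. In that format, each event's `choices[0].delta.content` holds the new text and may be `None`. The usage runs on a fake stream, so no API key is needed.

```python
import time
from typing import Callable, Dict, Iterable, Iterator, Optional


def consume_stream(chunks: Iterable[str],
                   on_chunk: Optional[Callable[[str], None]] = None,
                   clock: Callable[[], float] = time.perf_counter) -> Dict[str, object]:
    """Drain a text stream, recording time to first chunk and total time (seconds)."""
    start = clock()
    first_chunk_time: Optional[float] = None
    parts = []
    for chunk in chunks:  # generation happens lazily as we iterate
        if first_chunk_time is None:
            first_chunk_time = clock() - start
        parts.append(chunk)
        if on_chunk is not None:
            on_chunk(chunk)  # e.g. push the text to a UI
    return {
        "text": "".join(parts),
        "num_chunks": len(parts),
        "time_to_first_chunk": first_chunk_time,  # None if the stream was empty
        "total_time": clock() - start,
    }


def openai_text_chunks(stream) -> Iterator[str]:
    """Adapter for an OpenAI SDK v1 stream (client.chat.completions.create(..., stream=True))."""
    for event in stream:
        if event.choices and event.choices[0].delta.content:
            yield event.choices[0].delta.content


def fake_stream() -> Iterator[str]:
    for piece in ["Use ", "hybrid ", "search."]:
        time.sleep(0.01)  # simulate per-token generation delay
        yield piece


stream_stats = consume_stream(fake_stream(), on_chunk=lambda c: print(c, end="", flush=True))
print()
print(stream_stats["num_chunks"], round(stream_stats["time_to_first_chunk"], 3))
```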

## 8. Complete Pipeline Run + Timings & Metadata

**Stage 7 of 7:** End-to-end integration with metrics tracking.

**Full Pipeline Flow:**
1. Query Understanding → Classification, expansion, keywords
2. Retrieval → Hybrid search with auto-tuned alpha
3. Reranking → Cross-encoder refinement
4. Context Building → Dedup, sources, length limits
5. Prompt Engineering → Query-type specific templates
6. Generation → LLM response
7. Metadata Collection → Timings, sources, scores

**Key Metrics:**
- **retrieval_time:** Embedding + Pinecone query (50-150ms)
- **rerank_time:** Cross-encoder scoring (50-100ms)
- **generation_time:** LLM response (500-2000ms)
- **total_time:** End-to-end latency (600-2300ms)

**Metadata:**
- chunks_retrieved, avg_score, sources list
- Query type and keywords for analytics

```python
# Complete Pipeline Demo
from src.m1_4_query_pipeline.module import ProductionRAG

# Initialize production RAG
rag = ProductionRAG(
    openai_client=openai_client,
    pinecone_client=pinecone_client,
    use_expansion=False,
    use_reranking=True
)

# Run complete pipeline
test_query = "How can I reduce RAG latency?"
print(f"🔍 Running complete pipeline for: '{test_query}'\n")

result = rag.query(
    query=test_query,
    top_k=5,
    rerank_top_k=3,
    namespace="demo",
    temperature=0.1
)

# Display results
print(f"📊 Pipeline Results:")
print(f"  • Query Type: {result['query_type']}")
print(f"  • Chunks Retrieved: {result['chunks_retrieved']}")
print(f"  • Avg Score: {result['avg_score']:.3f}")
print(f"\n⏱️ Timings:")
print(f"  • Retrieval: {result['retrieval_time']}s")
print(f"  • Generation: {result['generation_time']}s")
print(f"  • Total: {result['total_time']}s")
print(f"\n📚 Sources: {', '.join(result['sources'][:3])}")

# Expected: chunks_retrieved, timings, sources comma-separated
```
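A small context manager like the one below could collect the `retrieval_time`, `rerank_time`, `generation_time`, and `total_time` fields shown above. It is a hypothetical sketch of a common way to gather per-stage timings, not how `ProductionRAG` records them. The `time.sleep` calls stand in for the real stages.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


class StageTimer:
    """Collects wall-clock durations (seconds) under '<stage>_time' keys."""

    def __init__(self) -> None:
        self.timings: Dict[str, float] = {}

    @contextmanager
    def stage(self, name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:  # record the duration even if the stage raises
            self.timings[f"{name}_time"] = round(time.perf_counter() - start, 3)


pipeline_timer = StageTimer()
with pipeline_timer.stage("total"):
    with pipeline_timer.stage("retrieval"):
        time.sleep(0.02)   # stand-in for embedding + vector DB query
    with pipeline_timer.stage("rerank"):
        time.sleep(0.01)   # stand-in for cross-encoder scoring
    with pipeline_timer.stage("generation"):
        time.sleep(0.03)   # stand-in for the LLM call
print(pipeline_timer.timings)
```

Recording in `finally` means a stage that times out still leaves a measurement behind. Those failed-stage timings are useful for the failure analysis in the next section.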

## 9. Common Failures, Fallbacks, Decision Card

### Five Common Production Failures

**1. Empty Retrieval Results**
- **Cause:** Query too specific, no indexed content matches
- **Fallback:** Return "No relevant information found" + suggest query refinement
- **Prevention:** Monitor retrieval hit rates; add default content

**2. API Timeout (Pinecone/OpenAI)**
- **Cause:** Network issues, service degradation, rate limits
- **Fallback:** Retry with exponential backoff (3 attempts); return cached response if available
- **Prevention:** Set reasonable timeouts (2s retrieval, 10s generation); implement circuit breakers

**3. Context Overflow**
- **Cause:** Retrieved chunks exceed token budget
- **Fallback:** Truncate context to max_length; prioritize highest-scoring chunks
- **Prevention:** Smart chunking (512 tokens/chunk); enforce strict length limits

**4. LLM Hallucination Despite Context**
- **Cause:** Model extrapolates beyond provided information
- **Fallback:** Add "based on context only" constraint in system prompt; log for review
- **Prevention:** Lower temperature (0.1); use stricter prompts; post-filter responses

**5. Reranker Model Load Failure**
- **Cause:** Model file missing, memory limits, corrupted weights
- **Fallback:** Skip reranking; return initial retrieval results
- **Prevention:** Pre-load models at startup; monitor memory usage; graceful degradation
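Two of the fallbacks above are small enough to sketch directly: retry with exponential backoff (failure 2) and skipping the reranker when it cannot load (failure 5). `retry_with_backoff` and `rerank_or_passthrough` are hypothetical helpers. Several details are assumptions to adapt to the errors your OpenAI and Pinecone clients actually raise: the retryable exception types, the 0.5 s base delay, and the "full jitter" strategy, which sleeps a random amount up to the current backoff cap.

```python
import random
import time
from typing import Callable, List, Sequence, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(fn: Callable[[], T],
                       attempts: int = 3,
                       base_delay: float = 0.5,
                       max_delay: float = 4.0,
                       retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(); on a retryable error, wait with exponential backoff + jitter, then retry."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            cap = min(max_delay, base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
            sleep(random.uniform(0, cap))  # full jitter avoids synchronized retries
    raise AssertionError("unreachable")


def rerank_or_passthrough(rerank_fn: Callable[[str, Sequence[T], int], List[T]],
                          query: str, results: Sequence[T], top_k: int = 3,
                          log: Callable[[str], None] = print) -> List[T]:
    """Graceful degradation: if reranking fails, keep the first-stage order."""
    try:
        return rerank_fn(query, results, top_k)
    except Exception as exc:  # e.g. missing weights, out-of-memory
        log(f"Reranker unavailable ({exc!r}); using initial retrieval order")
        return list(results)[:top_k]


attempt_log: List[int] = []


def flaky_call() -> str:
    attempt_log.append(len(attempt_log) + 1)
    if len(attempt_log) < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


def broken_rerank(query, results, top_k):
    raise OSError("cross-encoder weights not found")


print(retry_with_backoff(flaky_call, sleep=lambda s: None))           # ok (after 2 retries)
print(rerank_or_passthrough(broken_rerank, "q", ["a", "b", "c", "d"]))  # logs, then ['a', 'b', 'c']
```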

---

### Decision Card: When to Use RAG Pipelines

**✅ Use RAG When:**
- Content changes frequently (docs, FAQs, knowledge bases)
- Need source attribution for compliance/trust
- Query diversity is high (can't pre-generate all answers)
- Domain knowledge exceeds LLM training cutoff
- Handling 100+ documents with evolving content

**❌ Don't Use RAG When:**
- Answers are static and finite (use pre-generated cache)
- Real-time latency is critical (<100ms requirement)
- Content fits in single prompt (<4K tokens)
- Queries are highly repetitive (use lookup table)
- No infrastructure for vector DB + embeddings

**💰 Cost Considerations:**
- Embedding costs: ~$0.00002 per 1K tokens (text-embedding-3-small, $0.02 per 1M tokens)
- Pinecone: ~$70/month (starter tier, 100K vectors)
- LLM generation: ~$0.0005 per query (GPT-4o-mini)
- Reranker: Free (local cross-encoder)
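Per-query cost comes down to arithmetic over token counts. The sketch below assumes list prices of $0.02 per 1M tokens for `text-embedding-3-small`, and $0.15 / $0.60 per 1M input / output tokens for `gpt-4o-mini`. These were OpenAI's published rates when these notes were written and should be checked against current pricing. The example token counts are illustrative assumptions: a ~1,000-token context plus system prompt and query, and a 300-token answer.

```python
# Assumed USD prices per 1M tokens (verify against current provider pricing).
EMBED_PRICE_PER_M = 0.02        # text-embedding-3-small, input tokens
LLM_INPUT_PRICE_PER_M = 0.15    # gpt-4o-mini, input (prompt) tokens
LLM_OUTPUT_PRICE_PER_M = 0.60   # gpt-4o-mini, output (completion) tokens


def per_query_cost(query_tokens: int, prompt_tokens: int, output_tokens: int) -> dict:
    """Embed the query + one LLM call; a local cross-encoder adds no API cost."""
    embedding = query_tokens * EMBED_PRICE_PER_M / 1_000_000
    generation = (prompt_tokens * LLM_INPUT_PRICE_PER_M
                  + output_tokens * LLM_OUTPUT_PRICE_PER_M) / 1_000_000
    return {"embedding": embedding, "generation": generation, "total": embedding + generation}


demo_cost = per_query_cost(query_tokens=20, prompt_tokens=1_300, output_tokens=300)
print(f"${demo_cost['total']:.6f} per query")  # $0.000375 per query
```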

**⚖️ Latency Budget:**
- No reranking: ~200-300ms (before LLM generation)
- With reranking: ~300-400ms (before LLM generation)
- If this overhead, plus 0.5-2s of generation, fits your use case, RAG is viable

---

**Link to Previous Module:**  
See **M1.3 — Indexing & Retrieval Strategies** for vector DB setup and hybrid search foundations.