# Week 01 · Advanced RAG with Services

**Objective**: Build RAG, CAG, and CRAG pipelines using the service layer.

**Architecture**: Uses `RAGService`, `CAGService`, and `CRAGService` from `context_engineering.application.chat_service`.

**Provider Support**: Uses the OpenRouter unified API for multi-provider LLM access (GPT-4o, Claude, Gemini, etc.), or OpenAI directly.
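
The provider choice can be pictured with a small sketch. It mirrors the key check in the setup cell below (OpenRouter preferred, OpenAI as fallback); `resolve_provider` is a hypothetical helper for illustration and is not the logic inside `context_engineering.infrastructure.llm_providers`, which this notebook does not show.

```python
# Hypothetical helper for illustration only; the real factories in
# context_engineering.infrastructure.llm_providers may work differently.
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"  # OpenAI-compatible endpoint
OPENAI_BASE_URL = "https://api.openai.com/v1"


def resolve_provider(env):
    """Pick provider settings from a mapping of environment variables."""
    if env.get("OPENROUTER_API_KEY"):
        return {
            "provider": "OpenRouter",
            "base_url": OPENROUTER_BASE_URL,
            "api_key": env["OPENROUTER_API_KEY"],
        }
    if env.get("OPENAI_API_KEY"):
        return {
            "provider": "OpenAI",
            "base_url": OPENAI_BASE_URL,
            "api_key": env["OPENAI_API_KEY"],
        }
    raise EnvironmentError("Add OPENROUTER_API_KEY or OPENAI_API_KEY to .env")


# In the notebook you would pass os.environ; a plain dict keeps the demo self-contained.
settings = resolve_provider({"OPENROUTER_API_KEY": "demo-key"})
print(settings["provider"], settings["base_url"])
```

Because OpenRouter exposes an OpenAI-compatible endpoint, switching providers in a design like this comes down to changing the base URL, key, and model name rather than the calling code.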

In [None]:
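# Setup & Installations
# The original condition (`... or True`) always ran the install, so it is unconditional here.
# Version specifiers are quoted so the shell does not treat ">" as an output redirect.
print("Installing required packages...")
%pip install -q "langchain-core>=0.1.0" "langchain-openai>=0.0.5" "langchain-community>=0.0.20" "chromadb>=0.4.0" "python-dotenv>=1.0.0"

print("Packages ready")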

In [None]:
#  Imports & Environment Setup
import os
import sys
import time
from pathlib import Path
from dotenv import load_dotenv

# Add project root to path
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root / "src"))

# Load environment
load_dotenv(project_root / ".env")

# Check for API key (OpenRouter preferred, OpenAI as fallback)
openrouter_key = os.getenv("OPENROUTER_API_KEY")
openai_key = os.getenv("OPENAI_API_KEY")

if not openrouter_key and not openai_key:
    raise EnvironmentError(
        "   No API key found!\n"
        "   Add OPENROUTER_API_KEY (recommended) or OPENAI_API_KEY to .env"
    )

# Load configuration
from context_engineering.config import (
    VECTOR_DIR, CACHE_DIR, TOP_K_RESULTS,
    CHAT_MODEL, EMBEDDING_MODEL, PROVIDER
)

provider = "OpenRouter" if openrouter_key else "OpenAI"
print(" Environment loaded")
print(f" Provider: {provider}")
print(f" Project root: {project_root}")

## Import Chat Services

The RAG, CAG, and CRAG services are imported from the application layer; they are not defined in this notebook.

In [None]:
#  Import Chat Services
from context_engineering.application.chat_service import (
    RAGService,
    CAGService,
    CRAGService,
    CAGCache
)

print(" Chat services loaded from service layer")
print(" Location: context_engineering.application.chat_service")
print("\n Available services:")
print("   1. RAGService   - Standard RAG with modern LCEL")
print("   2. CAGService   - Cache-Augmented Generation (semantic)")
print("   3. CRAGService  - Corrective RAG with confidence scoring")
print("   4. CAGCache     - Semantic cache (FAQs + History)")

In [None]:
#  Connect to Vector Store & Initialize LLM
from langchain_community.vectorstores import Chroma
from context_engineering.infrastructure.llm_providers import (
    get_default_embeddings,
    get_chat_llm
)

# Initialize using service factories (supports OpenRouter + multi-provider)
embeddings = get_default_embeddings()
llm = get_chat_llm(temperature=0)

print(f" LLM initialized: {CHAT_MODEL}")
print(f" Embeddings initialized: {EMBEDDING_MODEL}")
print(f" Provider: {PROVIDER}")

# Connect to vector store
if not VECTOR_DIR.exists():
    raise FileNotFoundError(
        f"Vector store not found at {VECTOR_DIR}. Run 02_chunk_and_embed.ipynb first."
    )

vectorstore = Chroma(
    persist_directory=str(VECTOR_DIR),
    embedding_function=embeddings,
    collection_name="nawaloka"
)

# Create retriever
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": TOP_K_RESULTS}
)

print("Connected to vector store")
# _collection is a private Chroma attribute and may change between library versions
print(f"Collection size: {vectorstore._collection.count()} vectors")

---
# 1. Standard RAG with Service

In [None]:
#  Initialize RAG Service
rag_service = RAGService(
    retriever=retriever,
    llm=llm,
    k=TOP_K_RESULTS
)

print(" RAGService initialized")
print(f" Retrieval: top-{TOP_K_RESULTS} documents")

In [None]:
#  Generate Answer with RAG Service
USER_QUERY = "Tell me about Nawaloka's cardiology services and health check packages."

print(f" Query: {USER_QUERY}\n")
print("=" * 80)
print("GENERATING ANSWER WITH RAG SERVICE...")
print("=" * 80)

result = rag_service.generate(USER_QUERY)

print(f"\n  Generation time: {result['generation_time']:.2f}s")
print(f" Documents used: {result['num_docs']}")
print("\n" + "=" * 80)
print("ANSWER")
print("=" * 80)
print(result['answer'])
print("\n" + "=" * 80)
print("EVIDENCE URLS")
print("=" * 80)
for url in result['evidence_urls']:
    print(f"  - {url}")

---
# 2. Cache-Augmented Generation (CAG) with Semantic Matching

CAG uses lightweight semantic similarity to cache responses:
- **FAQs**: Static questions, never expire
- **History**: User queries, 24-hour TTL
- **Matching**: Cosine similarity (catches paraphrases!)
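
The two tiers and the similarity match can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the real `CAGCache`: `TinySemanticCache`, `toy_embed`, and the `now` parameter are hypothetical names, and the keyword-count "embedding" stands in for a real embedding model.

```python
import math
import time


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def toy_embed(text):
    """Toy stand-in for an embedding model: counts of a few keywords."""
    words = text.lower().replace("?", "").split()
    vocab = ["visiting", "hours", "contact", "cardiology"]
    return [float(words.count(w)) for w in vocab]


class TinySemanticCache:
    """Illustrative two-tier cache (hypothetical; not the real CAGCache)."""

    def __init__(self, embed, similarity_threshold=0.90, history_ttl_hours=24):
        self.embed = embed
        self.threshold = similarity_threshold
        self.ttl_seconds = history_ttl_hours * 3600
        self.entries = []  # each entry: vector, answer, tier, created_at

    def put(self, question, answer, tier="history", now=None):
        self.entries.append({
            "vector": self.embed(question),
            "answer": answer,
            "tier": tier,  # "faq" entries never expire; "history" entries do
            "created_at": time.time() if now is None else now,
        })

    def get(self, question, now=None):
        now = time.time() if now is None else now
        query_vec = self.embed(question)
        best_answer, best_score = None, self.threshold
        for entry in self.entries:
            age = now - entry["created_at"]
            if entry["tier"] == "history" and age > self.ttl_seconds:
                continue  # expired history entry
            score = cosine(query_vec, entry["vector"])
            if score >= best_score:
                best_answer, best_score = entry["answer"], score
        return best_answer  # None means a cache miss


cache_demo = TinySemanticCache(toy_embed)
cache_demo.put("What are visiting hours?", "Hours answer", tier="faq", now=0)
cache_demo.put("How do I contact the hospital?", "Contact answer", now=0)
print(cache_demo.get("Tell me the visiting hours", now=10))
print(cache_demo.get("How can I contact you?", now=25 * 3600))
```

The first lookup is a paraphrase of the FAQ and hits; the second arrives after the 24-hour window, so the history entry is skipped and the lookup misses.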

In [None]:
#  Initialize CAG Service with Semantic Cache
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# Create semantic cache (embedder required for similarity matching)
cache = CAGCache(
    cache_dir=CACHE_DIR,
    embedder=embeddings,  # Uses same embeddings as vector store
    similarity_threshold=0.90,  # Catches paraphrased questions
    history_ttl_hours=24  # History expires after 24 hours
)

# Create CAG service
cag_service = CAGService(
    rag_service=rag_service,
    cache=cache
)

print(" CAGService initialized (semantic matching)")
print(f" Cache directory: {CACHE_DIR}")
stats = cache.stats()
print(f" Cached responses: {stats['total_cached']}")
print(f" Similarity threshold: {stats['similarity_threshold']}")
print(f" History TTL: {stats['history_ttl_hours']}h")

### Load Known FAQs

FAQs are defined in `config/faqs.yaml` and can be:
1. **Loaded** into cache (registers questions)
2. **Warmed** (generates responses via RAG)

Once warmed, FAQs provide **instant responses** for common questions!
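
The difference between loading and warming can be sketched as follows. `FaqStore` and `fake_rag` are hypothetical names used only for illustration; how `CAGService` stores FAQs internally is not shown in this notebook.

```python
class FaqStore:
    """Illustrative load-then-warm store (hypothetical; not CAGService itself)."""

    def __init__(self, generate):
        self.generate = generate  # callable: question -> answer (e.g. a RAG call)
        self.answers = {}         # question -> answer, or None if not yet warmed

    def load(self, questions):
        """Register questions without generating anything; return how many were new."""
        new = [q for q in dict.fromkeys(questions) if q not in self.answers]
        for q in new:
            self.answers[q] = None
        return len(new)

    def warm(self):
        """Generate answers for every registered question that has none yet."""
        pending = [q for q, a in self.answers.items() if a is None]
        for q in pending:
            self.answers[q] = self.generate(q)
        return len(pending)


calls = []


def fake_rag(question):
    """Stand-in for an expensive RAG call; records each invocation."""
    calls.append(question)
    return f"answer to: {question}"


store = FaqStore(fake_rag)
print(store.load(["Visiting hours?", "Contact number?"]))  # registers, no generation
print(store.warm())                                        # generates the answers once
print(store.warm())                                        # nothing left to warm
```

Loading is cheap because it makes no LLM calls; warming pays the generation cost once so that later lookups can be served from the cache.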

In [None]:
#  Load Known FAQs from Config (Optional)
# FAQs are defined in config/faqs.yaml

from context_engineering.config import KNOWN_FAQS

print(f" Found {len(KNOWN_FAQS)} FAQs in config/faqs.yaml")
print("\n Sample FAQs:")
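for faq in KNOWN_FAQS[:5]:
    print(f"   - {faq}")
# Only report a remainder when more than five FAQs exist (avoids a negative count)
remaining = len(KNOWN_FAQS) - 5
if remaining > 0:
    print(f"   ... and {remaining} more")
print()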

# Load FAQs into cache (this just registers them, doesn't generate responses yet)
loaded = cag_service.load_faqs(KNOWN_FAQS)
print(f" Loaded {loaded} new FAQs into cache")

# To warm FAQs (generate responses), uncomment:
# print("\n Warming FAQs (this may take a few minutes)...")
# cag_service.warm_faqs()

print("\n Tip: Run cag_service.warm_faqs() to pre-generate FAQ responses")

In [None]:
#  Initialize CRAG Service
from context_engineering.config import CRAG_EXPANDED_K

crag_service = CRAGService(
    retriever=retriever,
    llm=llm,
    initial_k=TOP_K_RESULTS,
    expanded_k=CRAG_EXPANDED_K
)

print(" CRAGService initialized")
print(f" Initial retrieval: top-{TOP_K_RESULTS}")
print(f" Corrective retrieval: top-{CRAG_EXPANDED_K}")


---
# Interactive Inference: Ask Your Own Question

**IMPORTANT**: Run all cells above first to initialize all services!

Run the cell below to ask your own question and get answers from **all 3 RAG systems** side-by-side!


In [None]:
#  Interactive Inference - Ask Your Own Question

# Initialize missing services (fallback if the service cells were skipped).
# This still requires retriever, llm, and embeddings from the earlier cells.
if 'rag_service' not in globals():
    rag_service = RAGService(retriever=retriever, llm=llm, k=TOP_K_RESULTS)
if 'cag_service' not in globals():
    cache = CAGCache(cache_dir=CACHE_DIR, embedder=embeddings, similarity_threshold=0.90, history_ttl_hours=24)
    cag_service = CAGService(rag_service=rag_service, cache=cache)
if 'crag_service' not in globals():
    from context_engineering.config import CRAG_EXPANDED_K
    crag_service = CRAGService(retriever=retriever, llm=llm, initial_k=TOP_K_RESULTS, expanded_k=CRAG_EXPANDED_K)

print("=" * 80)
print(" INTERACTIVE RAG INFERENCE")
print("=" * 80)
print("\n Ask your question about Nawaloka Hospital...\n")

# Input your question here
YOUR_QUESTION = input(" Your question: ")

print(f"\n Processing query: '{YOUR_QUESTION}'\n")
print("=" * 80)

# Run through all 3 RAG systems
results = {}

# 1. Standard RAG
print("\n1  Standard RAG")
print("-" * 80)
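start = time.perf_counter()
rag_result = rag_service.generate(YOUR_QUESTION)
elapsed = time.perf_counter() - start  # wall-clock fallback if the service reports no timing
results['RAG'] = {
    'answer': rag_result.get('answer', 'N/A'),
    'time': rag_result.get('generation_time', rag_result.get('time', elapsed)),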
    'docs': rag_result.get('num_docs', len(rag_result.get('evidence_urls', []))),
    'urls': rag_result.get('evidence_urls', [])
}
print(f" Completed in {results['RAG']['time']:.2f}s")
print(f" Documents retrieved: {results['RAG']['docs']}")
print(f"\n Answer:")
print(results['RAG']['answer'])
print(f"\n Evidence URLs:")
for url in results['RAG']['urls'][:3]:
    print(f"   • {url}")

# 2. Cache-Augmented Generation
print("\n" + "=" * 80)
print("\n2  Cache-Augmented Generation (CAG)")
print("-" * 80)
cag_result = cag_service.generate(YOUR_QUESTION, use_cache=True, verbose=False)
results['CAG'] = {
    'answer': cag_result.get('answer', 'N/A'),
    'time': cag_result.get('generation_time', cag_result.get('time', 0)),
    'docs': cag_result.get('num_docs', cag_result.get('docs_used', len(cag_result.get('evidence_urls', [])))),
    'cache_hit': cag_result.get('cache_hit', False),
    'urls': cag_result.get('evidence_urls', [])
}
print(f" Completed in {results['CAG']['time']:.2f}s")
print(f" Cache hit: {results['CAG']['cache_hit']}")
print(f" Documents retrieved: {results['CAG']['docs']}")
print(f"\n Answer:")
print(results['CAG']['answer'])
print(f"\n Evidence URLs:")
for url in results['CAG']['urls'][:3]:
    print(f"   • {url}")

# 3. Corrective RAG
print("\n" + "=" * 80)
print("\n3  Corrective RAG (CRAG)")
print("-" * 80)
crag_result = crag_service.generate(YOUR_QUESTION, confidence_threshold=0.6, verbose=False)
results['CRAG'] = {
    'answer': crag_result.get('answer', 'N/A'),
    'time': crag_result.get('generation_time', crag_result.get('time', 0)),
    'docs': crag_result.get('docs_used', crag_result.get('num_docs', len(crag_result.get('evidence_urls', [])))),
    'confidence': crag_result.get('confidence_final', crag_result.get('confidence', 0.0)),
    'corrected': crag_result.get('correction_applied', False),
    'urls': crag_result.get('evidence_urls', [])
}
print(f" Completed in {results['CRAG']['time']:.2f}s")
print(f" Confidence: {results['CRAG']['confidence']:.2f}")
print(f" Correction applied: {results['CRAG']['corrected']}")
print(f" Documents used: {results['CRAG']['docs']}")
print(f"\n Answer:")
print(results['CRAG']['answer'])
print(f"\n Evidence URLs:")
for url in results['CRAG']['urls'][:3]:
    print(f"   • {url}")

print("\n" + "=" * 80)
print(" PERFORMANCE COMPARISON")
print("=" * 80)
print(f"\n{'System':<15} {'Time (s)':<12} {'Docs':<8} {'Special Feature':<30}")
print("-" * 80)
print(f"{'Standard RAG':<15} {results['RAG']['time']:<12.2f} {results['RAG']['docs']:<8} {'Baseline':<30}")

# CAG feature
cag_feature = 'Cache: ' + ('HIT ⚡' if results['CAG']['cache_hit'] else 'MISS')
print(f"{'CAG':<15} {results['CAG']['time']:<12.2f} {results['CAG']['docs']:<8} {cag_feature:<30}")

# CRAG feature
crag_conf = results['CRAG']['confidence']
crag_emoji = ' ✅' if crag_conf > 0.7 else ' ⚠️'
crag_feature = f"Confidence: {crag_conf:.2f}{crag_emoji}"
print(f"{'CRAG':<15} {results['CRAG']['time']:<12.2f} {results['CRAG']['docs']:<8} {crag_feature:<30}")

# Summary recommendation
print("=" * 80)
print(" RECOMMENDATION")
print("=" * 80)

fastest = min(results.items(), key=lambda x: x[1]['time'])
print(f" Fastest: {fastest[0]} ({fastest[1]['time']:.2f}s)")

if results['CAG']['cache_hit']:
    print(" Best Choice: CAG (cache hit = instant response)")
elif results['CRAG']['confidence'] > 0.7:
    print(" Best Choice: CRAG (high confidence + corrective capability)")
else:
    print(" Best Choice: Standard RAG (reliable baseline)")

print("\n Inference complete!")
print("=" * 80)


In [None]:
#  Test CAG Performance
print("=" * 80)
print("CAG PERFORMANCE TEST")
print("=" * 80)

test_queries = [
    "What are the visiting hours at Nawaloka?",
    "How do I contact Nawaloka Hospital?",
    "What services does Nawaloka provide?"
]

# First run: populate cache
print("\n1  FIRST RUN (Populating cache)...\n")
for query in test_queries:
    result = cag_service.generate(query, use_cache=True, verbose=False)
    print(f"   Query: {query[:50]}...")
    print(f"   Time: {result['generation_time']:.2f}s | Cache: {result['cache_hit']}")
    print()

# Second run: cache hits
print("\n2  SECOND RUN (Using cache)...\n")
for query in test_queries:
    result = cag_service.generate(query, use_cache=True, verbose=False)
    print(f"   Query: {query[:50]}...")
    print(f"   Time: {result['generation_time']:.2f}s | Cache: {result['cache_hit']}")
    if result['cache_hit']:
        print(f"    INSTANT response from cache!")
    print()

print("\n Cache Statistics:")
stats = cache.stats()
print(f"   Total cached: {stats['total_cached']}")
print(f"   Cache size: {stats['cache_size_kb']:.2f} KB")

---
# Test CRAG with Different Query Types

In [None]:
#  Test CRAG with Different Query Types
print("=" * 80)
print("CORRECTIVE RAG (CRAG) TEST")
print("=" * 80)

test_cases = [
    {
        'query': "cardiology",
        'label': "Vague query (should trigger correction)"
    },
    {
        'query': "What are Nawaloka Hospital's cardiology services and facilities?",
        'label': "Specific query (should be confident)"
    }
]

for i, test in enumerate(test_cases, 1):
    print(f"\nTest {i}/{len(test_cases)}: {test['label']}")
    print("-" * 80)
    
    result = crag_service.generate(test['query'], confidence_threshold=0.6)
    
    print(f"\n Result:")
    print(f"   Initial confidence: {result['confidence_initial']:.2f}")
    print(f"   Final confidence: {result['confidence_final']:.2f}")
    print(f"   Correction applied: {result['correction_applied']}")
    print(f"   Documents used: {result['docs_used']}")
    print(f"   Generation time: {result['generation_time']:.2f}s")
    print("\n" + "-" * 80)

In [None]:
#  Check All Services Ready
print(" Checking if all services are initialized...\n")

services_ready = True

# Check RAGService
try:
    rag_service
    print(" RAGService: Ready")
except NameError:
    print(" RAGService: NOT initialized")
    services_ready = False

# Check CAGService
try:
    cag_service
    print(" CAGService: Ready")
except NameError:
    print(" CAGService: NOT initialized")
    services_ready = False

# Check CRAGService
try:
    crag_service
    print(" CRAGService: Ready")
except NameError:
    print(" CRAGService: NOT initialized")
    services_ready = False

# Check Vector Store
try:
    vectorstore
    print(" Vector Store: Connected")
except NameError:
    print(" Vector Store: NOT connected")
    services_ready = False

print("\n" + "=" * 80)
if services_ready:
    print(" All services ready! You can run the remaining cells.")
else:
    print("  Some services are missing. Please run all cells above first.")
print("=" * 80)

---
# Comprehensive Comparison: RAG vs CAG vs CRAG

In [None]:
#  Comprehensive Comparison
import pandas as pd

print("=" * 80)
print(" RAG vs CAG vs CRAG COMPARISON")
print("=" * 80)

comparison_query = "What cardiology services are available at Nawaloka?"

# Test 1: Standard RAG
print(f"\n1  Standard RAG...")
rag_result = rag_service.generate(comparison_query)
print(f"   ⏱  Time: {rag_result['generation_time']:.2f}s")

# Test 2: CAG (should use cache on second run)
print(f"\n2  Cache-Augmented Generation (CAG)...")
cag_result = cag_service.generate(comparison_query, use_cache=True, verbose=False)
print(f"     Time: {cag_result['generation_time']:.2f}s")
print(f"    Cache hit: {cag_result['cache_hit']}")

# Test 3: CRAG
print(f"\n3  Corrective RAG (CRAG)...")
crag_result = crag_service.generate(comparison_query, confidence_threshold=0.6, verbose=False)
print(f"     Time: {crag_result['generation_time']:.2f}s")
print(f"    Correction: {crag_result['correction_applied']}")

# Create comparison table
comparison_data = [
    {
        'Technique': 'Standard RAG',
        'Latency (s)': f"{rag_result['generation_time']:.2f}",
        'Docs Retrieved': rag_result['num_docs'],
        'Cache Used': 'No',
        'Self-Correcting': 'No',
        'Best For': 'General queries'
    },
    {
        'Technique': 'CAG',
        'Latency (s)': f"{cag_result['generation_time']:.2f}",
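        # Prefer CAG's own document count; fall back to the RAG count if the key is absent
        'Docs Retrieved': 'Cached' if cag_result['cache_hit'] else cag_result.get('num_docs', rag_result['num_docs']),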
        'Cache Used': 'Yes' if cag_result['cache_hit'] else 'No',
        'Self-Correcting': 'No',
        'Best For': 'Frequent queries'
    },
    {
        'Technique': 'CRAG',
        'Latency (s)': f"{crag_result['generation_time']:.2f}",
        'Docs Retrieved': crag_result['docs_used'],
        'Cache Used': 'No',
        'Self-Correcting': 'Yes',
        'Best For': 'Complex/uncertain queries'
    }
]

df = pd.DataFrame(comparison_data)

print("\n" + "=" * 80)
print("COMPARISON TABLE")
print("=" * 80)
print(df.to_string(index=False))

print("\n" + "=" * 80)
print("KEY INSIGHTS")
print("=" * 80)
print(" RAG: Baseline - reliable for general queries")
print(" CAG: Fastest when cache hits - ideal for FAQs")
print(" CRAG: Re-retrieves when confidence is low - suited to vague or uncertain queries")
print(" HYBRID: Combine all three for production!")
print("\n" + "=" * 80)

---
# Summary

All three RAG techniques are implemented using the service layer:
- **RAGService**: Standard RAG with modern LCEL
- **CAGService**: Cache-augmented generation with semantic matching
- **CRAGService**: Corrective RAG with confidence scoring
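
As a rough picture of the corrective step, the sketch below retrieves `initial_k` documents, scores them, and retrieves again with `expanded_k` only if the score falls under the threshold. It reuses the result keys printed in the CRAG test cell, but `corrective_retrieve`, `toy_retrieve`, `toy_score`, and the default `k` values are hypothetical; the notebook does not show how `CRAGService` actually computes confidence.

```python
def corrective_retrieve(query, retrieve, score,
                        initial_k=4, expanded_k=8, confidence_threshold=0.6):
    """Retrieve, score, and widen the search once if confidence is low.

    `retrieve(query, k)` and `score(query, docs)` are caller-supplied callables;
    the default k values are placeholders, not the project's configuration.
    """
    docs = retrieve(query, initial_k)
    confidence_initial = score(query, docs)
    correction_applied = confidence_initial < confidence_threshold
    confidence_final = confidence_initial
    if correction_applied:
        docs = retrieve(query, expanded_k)  # corrective pass with more documents
        confidence_final = score(query, docs)
    return {
        "docs": docs,
        "confidence_initial": confidence_initial,
        "confidence_final": confidence_final,
        "correction_applied": correction_applied,
        "docs_used": len(docs),
    }


CORPUS = [
    "visiting hours are 10am to 8pm",
    "cardiology unit offers echo tests",
    "health check packages include ecg",
]


def toy_retrieve(query, k):
    """Toy retriever: returns the first k documents regardless of the query."""
    return CORPUS[:k]


def toy_score(query, docs):
    """Toy confidence: fraction of query words that appear in any document."""
    words = set(query.lower().split())
    found = {w for w in words if any(w in d for d in docs)}
    return len(found) / len(words) if words else 0.0


result = corrective_retrieve("cardiology packages", toy_retrieve, toy_score,
                             initial_k=1, expanded_k=3)
print(result["correction_applied"], result["docs_used"])
```

With `initial_k=1` the toy retriever returns only the visiting-hours document, so confidence starts at zero and the expanded pass is triggered.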

**CAG Semantic Caching**:
- **Catches paraphrases**: "What are visiting hours?" matches "Tell me the visiting hours"
- **Two-tier**: Static FAQs (never expire) + dynamic history (24h TTL)
- **Lightweight**: Only new queries need embedding; cached entries reuse their stored embeddings
- **Fast lookup**: For unit-normalized embeddings, cosine similarity reduces to a dot product (roughly 1 ms for 1,000 entries)
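
The dot-product shortcut depends on the stored vectors being scaled to unit length. A small check with plain Python lists, where the helper names are illustrative only:

```python
import math


def dot(a, b):
    """Plain dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def normalize(vec):
    """Scale a vector to unit length (returned unchanged if it is all zeros)."""
    norm = math.sqrt(dot(vec, vec))
    return [x / norm for x in vec] if norm else list(vec)


def cosine(a, b):
    """Full cosine similarity, including both norms."""
    denom = math.sqrt(dot(a, a)) * math.sqrt(dot(b, b))
    return dot(a, b) / denom if denom else 0.0


a, b = [3.0, 4.0], [4.0, 3.0]
full = cosine(a, b)                          # computes both norms on every lookup
shortcut = dot(normalize(a), normalize(b))   # norms paid once, at storage time
print(math.isclose(full, shortcut))
```

Normalizing each embedding once when it is cached means every later lookup only needs the dot product.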

**Benefits of service-based architecture**:
- Reusable across notebooks and production code
- Easily testable
- Well-documented
- Maintainable (logic in one place)

**OpenRouter Multi-Provider Support**:
- One API key → access to OpenAI, Anthropic, Google, Meta, DeepSeek
- Configure models in `config/models.yaml`
- Switch providers without code changes