# LangChain RAG Verification with i2i

This notebook demonstrates how to use **i2i** (eye-to-eye) multi-model consensus to verify RAG pipeline outputs and detect hallucinations.

## The Problem

RAG (Retrieval-Augmented Generation) pipelines can still hallucinate:
- Model may ignore retrieved context and confabulate
- Retrieval may miss relevant documents
- Model may misinterpret or misquote sources

## The Solution

Use **i2i multi-model consensus** as a verification layer:
- HIGH consensus on factual claims → 97-100% accuracy
- LOW/NONE consensus → likely hallucination, needs review
- Task-aware routing → skip consensus for math (where it hurts!)
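
As a rough illustration of what "consensus" measures, the sketch below computes the share of models that gave the most common verdict. The helper name `agreement_fraction` and the TRUE/FALSE verdict strings are assumptions made for this example; this is not how i2i computes consensus internally.

```python
from collections import Counter


def agreement_fraction(verdicts: list[str]) -> tuple[str, float]:
    """Return the most common verdict and the share of models that gave it.

    Hypothetical helper for illustration only; not part of the i2i API.
    """
    if not verdicts:
        raise ValueError("need at least one verdict")
    # Normalize so "true", "TRUE " and "True" count as the same verdict.
    counts = Counter(v.strip().upper() for v in verdicts)
    top_verdict, top_count = counts.most_common(1)[0]
    return top_verdict, top_count / len(verdicts)


majority, share = agreement_fraction(["TRUE", "true", "TRUE", "FALSE"])
print(majority, share)  # TRUE 0.75
```

Three of four models agreeing gives 75% agreement, which is why at least two providers are needed before the number means anything.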

## What You'll Learn

1. Build a basic RAG pipeline with LangChain + ChromaDB
2. See hallucinations in action (confident but wrong answers)
3. Add i2i verification to flag unreliable answers
4. Use task-aware consensus (factual vs math vs creative)
5. Production patterns for threshold configuration

## 1. Setup

Install required packages and configure API keys.

In [None]:
# Install dependencies (run once)
# !pip install i2i-mcip langchain langchain-openai langchain-community chromadb wikipedia

In [None]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Verify API keys are configured
# i2i needs at least 2 providers for consensus
providers_configured = []
if os.getenv("OPENAI_API_KEY"):
    providers_configured.append("OpenAI")
if os.getenv("ANTHROPIC_API_KEY"):
    providers_configured.append("Anthropic")
if os.getenv("GROQ_API_KEY"):
    providers_configured.append("Groq")
if os.getenv("GOOGLE_API_KEY"):
    providers_configured.append("Google")

print(f"Configured providers: {providers_configured}")
if len(providers_configured) < 2:
    print("\n⚠️  Warning: i2i consensus requires at least 2 providers.")
    print("Set API keys for at least 2 of: OPENAI_API_KEY, ANTHROPIC_API_KEY, GROQ_API_KEY, GOOGLE_API_KEY")

## 2. Build Basic RAG Pipeline

We'll create a simple RAG pipeline with:
- **ChromaDB**: Vector store for document embeddings
- **LangChain**: Orchestration framework
- **Sample documents**: Wikipedia articles about historical events
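
To make the retrieval step concrete, here is a minimal sketch of what a vector store does conceptually: rank stored vectors by similarity to the query vector and return the top `k`. The three-dimensional vectors and the names `cosine_similarity`, `top_k`, and `TOY_VECTORS` are invented for illustration. Real embeddings have hundreds or thousands of dimensions, and the distance metric ChromaDB actually uses depends on how the collection is configured.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec: list[float], doc_vecs: dict[str, list[float]], k: int = 2) -> list[str]:
    """Return the names of the k documents most similar to the query."""
    ranked = sorted(
        doc_vecs,
        key=lambda name: cosine_similarity(query_vec, doc_vecs[name]),
        reverse=True,
    )
    return ranked[:k]


# Toy "embeddings" (assumed values, not produced by any real model).
TOY_VECTORS = {
    "french_revolution": [0.9, 0.1, 0.0],
    "einstein": [0.1, 0.9, 0.1],
    "python": [0.0, 0.2, 0.9],
}

print(top_k([0.8, 0.3, 0.0], TOY_VECTORS, k=2))  # ['french_revolution', 'einstein']
```

Note that the retriever always returns `k` results, even when nothing in the store is relevant. That is one way off-topic context reaches the model.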

In [None]:
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Sample documents (simulating retrieved Wikipedia content)
# These contain accurate historical facts
SAMPLE_DOCUMENTS = [
    """The French Revolution began in 1789 and ended in 1799. It was a period of radical political 
    and societal change in France. The revolution began with the convocation of the Estates General 
    in May 1789. The Bastille was stormed on July 14, 1789, which became a symbol of the revolution. 
    King Louis XVI was executed by guillotine on January 21, 1793.""",
    
    """Albert Einstein developed the theory of special relativity in 1905 and general relativity in 1915. 
    He was awarded the 1921 Nobel Prize in Physics for his discovery of the law of the photoelectric effect, 
    not for relativity. Einstein was born in Ulm, Germany on March 14, 1879, and died in Princeton, 
    New Jersey on April 18, 1955.""",
    
    """The Great Wall of China is approximately 21,196 kilometers (13,171 miles) long, including 
    all of its branches. Construction began in the 7th century BC and continued for over two millennia. 
    The wall is NOT visible from space with the naked eye - this is a common myth. The most famous 
    sections were built during the Ming Dynasty (1368-1644).""",
    
    """Python was created by Guido van Rossum and first released in 1991. The language was named 
    after Monty Python's Flying Circus, not the snake. Python 2.0 was released in 2000 and 
    Python 3.0 in 2008. Python 2 reached end-of-life on January 1, 2020.""",
    
    """The moon's average distance from Earth is about 384,400 kilometers (238,855 miles). 
    A light beam takes approximately 1.28 seconds to travel from Earth to the Moon. The moon 
    is slowly moving away from Earth at a rate of about 3.8 centimeters per year."""
]

print(f"Loaded {len(SAMPLE_DOCUMENTS)} sample documents")

In [None]:
# Create vector store
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
splits = text_splitter.create_documents(SAMPLE_DOCUMENTS)

# Initialize embeddings and vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    collection_name="rag_demo"
)

# Create retriever
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

print(f"Vector store created with {len(splits)} chunks")

In [None]:
# Build RAG chain
template = """Answer the question based only on the following context:

{context}

Question: {question}

Provide a direct, confident answer. If you're not sure, still give your best answer."""

prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

print("RAG chain ready")

In [None]:
# Test the RAG chain with a simple question
response = rag_chain.invoke("When was the Bastille stormed?")
print("Question: When was the Bastille stormed?")
print(f"Answer: {response}")

## 3. The Hallucination Problem

RAG pipelines can still produce incorrect answers:
- Questions outside the knowledge base
- Misinterpretation of context
- Confident confabulation

Let's see some examples where the model gives confident but WRONG answers.

In [None]:
# Questions designed to trigger hallucinations
HALLUCINATION_QUESTIONS = [
    # Outside knowledge base - model may confabulate
    "What year did Einstein fail his math exam in school?",
    
    # Contradicts facts in context (Great Wall myth)
    "Can you see the Great Wall of China from space?",
    
    # Not in context - might confabulate
    "Who was the architect of the Eiffel Tower?",
    
    # Partially related but wrong inference likely
    "What animal is Python named after?",
    
    # Math question embedded in factual context
    "If light takes 1.28 seconds to reach the moon, how long for a round trip?"
]

print("Testing questions that may cause hallucinations...\n")
print("="*60)

for q in HALLUCINATION_QUESTIONS:
    response = rag_chain.invoke(q)
    print(f"\nQ: {q}")
    print(f"A: {response}")
    print("-"*60)

### Analysis

Notice how the model:
1. **Einstein's math exam**: May confidently claim he failed (common myth, actually FALSE)
2. **Great Wall from space**: Our context explicitly says this is a myth, but model might still say yes
3. **Eiffel Tower architect**: Not in our docs - model may confabulate an answer
4. **Python naming**: Context says Monty Python, but model might say snake
5. **Math question**: Simple calculation, but consensus would hurt here!

**The problem**: The model gives confident answers with no indication of reliability.
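
For the math question, the correct answer can be computed directly, which is why a deterministic calculation is a better check than a vote. The sketch below reproduces the 1.28-second figure from the sample document and doubles it for the round trip. The constant names are chosen for this example, and the speed of light value is the standard defined constant rather than something taken from the notebook's documents.

```python
SPEED_OF_LIGHT_KM_S = 299_792.458  # exact, by definition of the metre
MOON_DISTANCE_KM = 384_400         # average distance from the sample document

one_way_s = MOON_DISTANCE_KM / SPEED_OF_LIGHT_KM_S
round_trip_s = 2 * one_way_s

print(f"one way: {one_way_s:.2f} s, round trip: {round_trip_s:.2f} s")
# one way: 1.28 s, round trip: 2.56 s
```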

## 4. Add i2i Verification

Now let's add i2i multi-model consensus to verify RAG outputs.

Key insight from evaluation (400 questions, 4 models):
- **HIGH consensus (≥85% agreement)**: 97-100% accuracy
- **LOW/NONE consensus**: Likely hallucination
- **Consensus on math/reasoning**: -35% accuracy (DON'T use it!)
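
The agreement bands can be written down as a small lookup. The sketch below uses the 85% cutoff quoted above together with the MEDIUM, LOW, and NONE ranges this notebook lists in its calibration table. The function name `consensus_level_from_agreement` is hypothetical, and i2i may assign levels differently (for example, CONTRADICTORY cannot be derived from an agreement percentage alone).

```python
def consensus_level_from_agreement(agreement: float) -> str:
    """Map an agreement fraction in [0, 1] to a consensus band.

    Illustrative only: band edges follow the ranges quoted in this notebook.
    """
    if not 0.0 <= agreement <= 1.0:
        raise ValueError("agreement must be between 0 and 1")
    if agreement >= 0.85:
        return "high"
    if agreement >= 0.60:
        return "medium"
    if agreement >= 0.30:
        return "low"
    return "none"


for fraction in (1.0, 0.75, 0.5, 0.25):
    print(f"{fraction:.0%} agreement -> {consensus_level_from_agreement(fraction)}")
```

With four models, the only reachable fractions for a majority verdict are 100%, 75%, and 50%, so a single dissenting model already drops the result from HIGH to MEDIUM.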

In [None]:
import asyncio
from i2i import (
    AICP,
    ConsensusLevel,
    recommend_consensus,
    is_consensus_appropriate,
    get_confidence_calibration,
)

# Initialize i2i protocol
protocol = AICP()

print("Available providers:", protocol.list_configured_providers())
print("Available models:", protocol.list_available_models())

In [None]:
class I2IVerifiedRAG:
    """
    RAG chain wrapper with i2i verification.
    
    Adds multi-model consensus verification to RAG outputs,
    with task-aware routing to avoid hurting math/reasoning.
    """
    
    def __init__(self, rag_chain, protocol: AICP, confidence_threshold: float = 0.7):
        self.rag_chain = rag_chain
        self.protocol = protocol
        self.confidence_threshold = confidence_threshold
    
    async def query(self, question: str) -> dict:
        """
        Query the RAG chain with i2i verification.
        
        Returns:
            dict with:
                - answer: The RAG response
                - verified: Whether answer passed consensus check
                - confidence: Calibrated confidence score
                - consensus_level: HIGH/MEDIUM/LOW/NONE
                - task_appropriate: Whether consensus was appropriate for this task
                - warning: Any warnings about the answer
        """
        # Step 1: Get RAG response
        rag_answer = self.rag_chain.invoke(question)
        
        # Step 2: Check if consensus is appropriate for this task
        recommendation = recommend_consensus(question)
        
        if not recommendation.should_use_consensus:
            # For math/reasoning/creative, skip consensus
            return {
                "answer": rag_answer,
                "verified": None,  # Not applicable
                "confidence": None,
                "consensus_level": None,
                "task_appropriate": False,
                "task_category": recommendation.task_category.value,
                "warning": f"Consensus skipped: {recommendation.reason}",
                "suggested_approach": recommendation.suggested_approach
            }
        
        # Step 3: Verify with multi-model consensus
        verification_query = f"""Verify this answer is factually correct:

Question: {question}
Answer: {rag_answer}

Is this answer accurate? Respond with TRUE or FALSE and brief reasoning."""
        
        result = await self.protocol.consensus_query(
            verification_query,
            task_category="verification"
        )
        
        # Step 4: Calculate calibrated confidence
        confidence = get_confidence_calibration(result.consensus_level.value)
        verified = confidence >= self.confidence_threshold
        
        warning = None
        if result.consensus_level in [ConsensusLevel.LOW, ConsensusLevel.NONE]:
            warning = "⚠️ LOW CONFIDENCE: Models disagree. Answer may be unreliable."
        elif result.consensus_level == ConsensusLevel.CONTRADICTORY:
            warning = "⚠️ CONTRADICTION: Models actively disagree. Do not trust this answer."
        
        return {
            "answer": rag_answer,
            "verified": verified,
            "confidence": confidence,
            "consensus_level": result.consensus_level.value,
            "task_appropriate": True,
            "task_category": result.task_category,
            "warning": warning,
            "models_consulted": result.models_queried
        }

# Create verified RAG instance
verified_rag = I2IVerifiedRAG(rag_chain, protocol)
print("I2IVerifiedRAG ready")

In [None]:
# Helper to run async in notebook
async def test_verified_rag(questions: list[str]):
    """Test verified RAG on a list of questions."""
    results = []
    for q in questions:
        print(f"\n{'='*60}")
        print(f"Q: {q}")
        
        result = await verified_rag.query(q)
        results.append(result)
        
        print(f"\nA: {result['answer']}")
        print("\n📊 Verification Results:")
        print(f"   Task Category: {result['task_category']}")
        print(f"   Consensus Appropriate: {result['task_appropriate']}")
        
        if result['task_appropriate']:
            print(f"   Consensus Level: {result['consensus_level']}")
            print(f"   Confidence: {result['confidence']:.0%}")
            print(f"   Verified: {'✅' if result['verified'] else '❌'} {result['verified']}")
            if result.get('models_consulted'):
                print(f"   Models: {', '.join(result['models_consulted'])}")
        
        if result['warning']:
            print(f"\n   {result['warning']}")
    
    return results

In [None]:
# Test on our hallucination-prone questions
results = await test_verified_rag(HALLUCINATION_QUESTIONS)

### Key Observations

1. **Factual questions** → Consensus used, confidence score provided
2. **Math question** (round trip light) → Consensus SKIPPED (would hurt accuracy)
3. **Low confidence** → Warning displayed, answer flagged as unreliable

This is the power of **task-aware consensus**: knowing WHEN to use it.
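
To show the shape of the routing decision, here is a deliberately crude keyword-based router. It is a toy built for this example, not the classifier behind `recommend_consensus`; the patterns and the name `toy_route` are assumptions, and a real classifier would need to handle far more phrasing than this.

```python
import re

# Hypothetical patterns for illustration; they cover only the example questions.
MATH_PATTERN = re.compile(
    r"\b(calculate|compute|how (long|far|old|many|much))\b|\d\s*[-+*/]\s*\d",
    re.IGNORECASE,
)
CREATIVE_PATTERN = re.compile(
    r"\b(write|compose)\b.*\b(haiku|poem|story|song)\b",
    re.IGNORECASE,
)


def toy_route(question: str) -> tuple[str, bool]:
    """Return (task_category, should_use_consensus) for a question."""
    if CREATIVE_PATTERN.search(question):
        return ("creative", False)
    if MATH_PATTERN.search(question):
        return ("reasoning", False)
    # Default: treat everything else as a factual claim worth verifying.
    return ("factual", True)


for q in (
    "When was Python first released?",
    "If light takes 1.28 seconds to reach the moon, how long for a round trip?",
    "Write a haiku about the French Revolution",
):
    print(toy_route(q), "-", q)
```

The design point is the ordering: decide whether consensus applies before spending any verification calls, and fall back to consensus only for the categories where it helps.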

## 5. Task-Aware Behavior Demo

Let's demonstrate how i2i handles different task types differently.

In [None]:
# Demonstrate task classification
DEMO_QUESTIONS = [
    # Factual - USE consensus (HIGH = 97% accuracy)
    ("What year was Python first released?", "factual"),
    ("Who was executed during the French Revolution?", "factual"),
    
    # Verification - USE consensus (+6% hallucination detection)
    ("Is it true that the Great Wall is visible from space?", "verification"),
    ("True or false: Einstein won the Nobel Prize for relativity", "verification"),
    
    # Math/Reasoning - DON'T use consensus (-35% degradation!)
    ("Calculate: if the moon moves 3.8cm/year, how far in 100 years?", "reasoning"),
    ("If Python 3.0 came out in 2008, how old is it now?", "reasoning"),
    
    # Creative - DON'T use consensus (flattens diversity)
    ("Write a haiku about the French Revolution", "creative"),
]

print("Task Classification Demo")
print("="*60)

for question, expected in DEMO_QUESTIONS:
    rec = recommend_consensus(question)
    status = "✅" if rec.should_use_consensus else "❌"
    
    print(f"\n{status} {question[:55]}...")
    print(f"   Detected: {rec.task_category.value} (expected: {expected})")
    print(f"   Use consensus: {rec.should_use_consensus}")
    if not rec.should_use_consensus:
        print(f"   Instead: {rec.suggested_approach[:60]}")

In [None]:
# Show confidence calibration
print("\nConfidence Calibration (from evaluation data)")
print("="*60)
print("\nBased on 400 questions across 5 benchmarks:\n")

calibration_data = [
    ("HIGH (≥85% agreement)", "high", "Trust the answer"),
    ("MEDIUM (60-84%)", "medium", "Probably correct"),
    ("LOW (30-59%)", "low", "Use with caution"),
    ("NONE (<30%)", "none", "Likely hallucination"),
    ("CONTRADICTORY", "contradictory", "Models disagree - investigate"),
]

for name, level, meaning in calibration_data:
    conf = get_confidence_calibration(level)
    print(f"  {name:25} → {conf:.0%} confidence ({meaning})")

## 6. Production Patterns

Best practices for deploying i2i verification in production.
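
The three thresholds used in this section split the confidence range into four bands. The standalone sketch below makes the boundaries explicit using the default values from this section (0.90, 0.60, 0.50). The function name `triage` is hypothetical, and the status strings match the ones used by the production class later in this section.

```python
def triage(
    confidence: float,
    high: float = 0.90,
    low: float = 0.60,
    reject: float = 0.50,
) -> str:
    """Map a calibrated confidence score to a handling status.

    Each threshold is a lower bound, checked from highest to lowest.
    """
    if confidence >= high:
        return "accepted"
    if confidence >= low:
        return "review"
    if confidence >= reject:
        return "low_confidence"
    return "rejected"


for score in (0.95, 0.90, 0.75, 0.55, 0.20):
    print(f"{score:.2f} -> {triage(score)}")
```

Keeping the thresholds as parameters makes it easy to tighten them for high-stakes domains without touching the routing logic.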

In [None]:
# Production configuration example
PRODUCTION_CONFIG = {
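    # Confidence thresholds (each is a lower bound, checked from highest to lowest)
    "high_confidence_threshold": 0.90,  # >= 0.90: accept without review
    "low_confidence_threshold": 0.60,   # 0.60 up to 0.90: flag for human review
    "reject_threshold": 0.50,           # 0.50 up to 0.60: low confidence; below 0.50: reject outright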
    
    # Model selection for consensus
    "consensus_models": [
        "gpt-4o",
        "claude-3-5-sonnet-latest",
        "gemini-1.5-pro",
    ],
    
    # Cost management
    "max_models_per_query": 3,
    "use_cheaper_models_for_obvious": True,
    
    # Logging
    "log_all_queries": True,
    "log_low_confidence": True,
}

print("Production Configuration:")
for key, value in PRODUCTION_CONFIG.items():
    print(f"  {key}: {value}")

In [None]:
class ProductionVerifiedRAG:
    """
    Production-ready RAG with i2i verification.
    
    Features:
    - Configurable confidence thresholds
    - Structured logging
    - Error handling
    - Cost tracking
    """
    
    def __init__(self, rag_chain, config: dict = None):
        self.rag_chain = rag_chain
        self.config = config or PRODUCTION_CONFIG
        self.protocol = AICP()
        self.query_log = []
    
    async def query(self, question: str) -> dict:
        """Query with production-grade verification."""
        import time
        start_time = time.time()
        
        try:
            # Get RAG response
            rag_answer = self.rag_chain.invoke(question)
            
            # Check task type
            rec = recommend_consensus(question)
            
            if not rec.should_use_consensus:
                result = self._build_result(
                    question=question,
                    answer=rag_answer,
                    verified=None,
                    confidence=None,
                    consensus_level=None,
                    task_category=rec.task_category.value,
                    status="skipped",
                    reason=rec.reason,
                    latency_ms=(time.time() - start_time) * 1000
                )
            else:
                # Run consensus verification
                consensus_result = await self.protocol.consensus_query(
                    f"Verify: {question} -> {rag_answer}",
                    task_category="verification"
                )
                
                confidence = get_confidence_calibration(consensus_result.consensus_level.value)
                status = self._determine_status(confidence)
                
                result = self._build_result(
                    question=question,
                    answer=rag_answer,
                    verified=confidence >= self.config["low_confidence_threshold"],
                    confidence=confidence,
                    consensus_level=consensus_result.consensus_level.value,
                    task_category=consensus_result.task_category,
                    status=status,
                    models_used=consensus_result.models_queried,
                    latency_ms=(time.time() - start_time) * 1000
                )
            
            # Log query
            self._log_query(result)
            return result
            
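        except Exception as e:
            # Build an error result and pass it through the same logging path,
            # so get_stats() can count failures when query logging is enabled.
            result = self._build_result(
                question=question,
                answer=None,
                verified=False,
                confidence=0.0,
                status="error",
                error=str(e),
                latency_ms=(time.time() - start_time) * 1000
            )
            self._log_query(result)
            return result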
    
    def _determine_status(self, confidence: float) -> str:
        if confidence >= self.config["high_confidence_threshold"]:
            return "accepted"
        elif confidence >= self.config["low_confidence_threshold"]:
            return "review"
        elif confidence >= self.config["reject_threshold"]:
            return "low_confidence"
        else:
            return "rejected"
    
    def _build_result(self, **kwargs) -> dict:
        return {
            "question": kwargs.get("question"),
            "answer": kwargs.get("answer"),
            "verified": kwargs.get("verified"),
            "confidence": kwargs.get("confidence"),
            "consensus_level": kwargs.get("consensus_level"),
            "task_category": kwargs.get("task_category"),
            "status": kwargs.get("status"),
            "reason": kwargs.get("reason"),
            "models_used": kwargs.get("models_used", []),
            "latency_ms": kwargs.get("latency_ms"),
            "error": kwargs.get("error"),
        }
    
    def _log_query(self, result: dict):
        if self.config.get("log_all_queries"):
            self.query_log.append(result)
        elif self.config.get("log_low_confidence") and result["status"] in ["low_confidence", "rejected"]:
            self.query_log.append(result)
    
    def get_stats(self) -> dict:
        """Get statistics from query log."""
        if not self.query_log:
            return {"total_queries": 0}
        
        statuses = [q["status"] for q in self.query_log]
        latencies = [q["latency_ms"] for q in self.query_log if q.get("latency_ms") is not None]
        
        return {
            "total_queries": len(self.query_log),
            "accepted": statuses.count("accepted"),
            "review": statuses.count("review"),
            "low_confidence": statuses.count("low_confidence"),
            "rejected": statuses.count("rejected"),
            "skipped": statuses.count("skipped"),
            "errors": statuses.count("error"),
            "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0,
        }

# Create production instance
prod_rag = ProductionVerifiedRAG(rag_chain)
print("ProductionVerifiedRAG ready")

In [None]:
# Run production demo
async def production_demo():
    questions = [
        "When was Python first released?",
        "Is the Great Wall visible from space?",
        "Calculate 384400 / 1.28",
        "Who invented Python?",
    ]
    
    print("Production Demo")
    print("="*60)
    
    for q in questions:
        result = await prod_rag.query(q)
        
        status_emoji = {
            "accepted": "✅",
            "review": "⚠️",
            "low_confidence": "❌",
            "rejected": "🚫",
            "skipped": "⏭️",
            "error": "💥",
        }
        
        emoji = status_emoji.get(result["status"], "❓")
        
        print(f"\n{emoji} {result['status'].upper()}")
        print(f"   Q: {q}")
        print(f"   A: {result['answer'][:100]}..." if result['answer'] else "   A: [no answer]")
        if result['confidence'] is not None:
            print(f"   Confidence: {result['confidence']:.0%}")
        print(f"   Latency: {result['latency_ms']:.0f}ms")
    
    print("\n" + "="*60)
    print("\nQuery Statistics:")
    stats = prod_rag.get_stats()
    for key, value in stats.items():
        print(f"  {key}: {value}")

await production_demo()

## Summary

### Key Takeaways

1. **RAG still hallucinates** - Retrieval alone doesn't guarantee accuracy

2. **Multi-model consensus provides calibrated confidence**:
   - HIGH consensus → 97-100% accuracy
   - LOW/NONE consensus → Flag for review

3. **Task-awareness is critical**:
   - ✅ Use consensus for: factual, verification, commonsense
   - ❌ Skip consensus for: math, reasoning, creative

4. **Production deployment**:
   - Configure confidence thresholds
   - Log low-confidence queries
   - Route to human review when uncertain

### Next Steps

- Check out `examples/task_aware_consensus.py` for more task classification examples
- See `demo.py verify` for standalone claim verification
- Review the [i2i documentation](https://github.com/unit221b/i2i) for full API reference

In [None]:
# Cleanup
vectorstore.delete_collection()
print("Cleaned up vector store")