# Week 7: Agentic RAG with LangGraph

**What We're Testing This Week:**

Week 7 extends our RAG system with **intelligent, adaptive retrieval** using LangGraph's agentic architecture with guardrail validation and iterative query refinement.

## Agentic RAG Features

### Traditional RAG vs. Agentic RAG

**Traditional RAG (Week 5-6)**:
```
Query → Always Retrieve → Generate Answer
```

**Agentic RAG (Week 7)**:
```
Query → Guardrail Validation (Score 0-100)
  ├─ Score < 60 → Out of Scope (reject with helpful message)
  └─ Score >= 60 → Retrieve Documents
       ↓
     Grade Documents
       ├─ Relevant → Generate Answer
       └─ Not Relevant → Rewrite Query → Retry (max 2 attempts)
```

### Key Capabilities

1. **Guardrail Validation** - LLM validates query scope (0-100 score) before retrieval
   - Score < 60: Query is out-of-scope (e.g., "What is a dog?")
   - Score >= 60: Query is relevant to ML/NLP research papers
2. **Out-of-Scope Handling** - Automatically rejects queries outside ML/NLP domain
3. **Document Grading** - Validates that retrieved papers are relevant
4. **Query Refinement** - Rewrites vague queries for better results
5. **Reasoning Transparency** - Shows the agent's decision-making steps
6. **Iterative Improvement** - Can retry with better queries if needed (max 2 attempts)
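
The capabilities above can be read as one control loop. The following is a minimal sketch of that loop in plain Python, not the project's LangGraph implementation: `run_agentic_flow`, `AgentResult`, and the injected callables are hypothetical names, and generating an answer from the last retrieved documents once the attempts run out is an assumption the notebook does not confirm.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional

GUARDRAIL_THRESHOLD = 60    # value taken from the configuration in this notebook
MAX_RETRIEVAL_ATTEMPTS = 2  # value taken from the configuration in this notebook


@dataclass
class AgentResult:
    """Hypothetical container mirroring the response fields described here."""
    answer: str = ""
    retrieval_attempts: int = 0
    rewritten_query: Optional[str] = None
    reasoning_steps: List[str] = field(default_factory=list)


def run_agentic_flow(
    query: str,
    score_query: Callable[[str], int],
    retrieve: Callable[[str], List[str]],
    is_relevant: Callable[[str, List[str]], bool],
    rewrite: Callable[[str], str],
    generate: Callable[[str, List[str]], str],
) -> AgentResult:
    """Guardrail -> retrieve -> grade -> (rewrite and retry) -> generate."""
    result = AgentResult()
    score = score_query(query)
    result.reasoning_steps.append(f"Validated query scope (score: {score}/100)")
    if score < GUARDRAIL_THRESHOLD:
        # Out of scope: stop before any retrieval happens.
        result.answer = "This question is outside the ML/NLP research scope."
        return result

    current, docs = query, []
    while result.retrieval_attempts < MAX_RETRIEVAL_ATTEMPTS:
        docs = retrieve(current)
        result.retrieval_attempts += 1
        if is_relevant(current, docs):
            break
        if result.retrieval_attempts < MAX_RETRIEVAL_ATTEMPTS:
            # Documents were not relevant: refine the query and try again.
            current = rewrite(current)
            result.rewritten_query = current

    # Assumption: answer from whatever was retrieved last, even after the final retry.
    result.answer = generate(current, docs)
    return result


# Usage with stub callables standing in for the LLM and search calls.
rejected = run_agentic_flow(
    "What is a dog?",
    score_query=lambda q: 10,
    retrieve=lambda q: [],
    is_relevant=lambda q, d: True,
    rewrite=lambda q: q,
    generate=lambda q, d: "unused",
)
print(rejected.retrieval_attempts, rejected.reasoning_steps)
```

Passing the LLM and search steps in as callables keeps the loop testable without a running Ollama or OpenSearch instance.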

### Architecture: LangGraph Workflow

![LangGraph Agentic RAG Workflow](../../static/langgraph-mermaid.png)

**Workflow Nodes:**
- **start** → **guardrail** (LLM scoring 0-100)
- **retrieve** → **tool_retrieve** (executes search)
- **grade_documents** (LLM relevance check)
- **rewrite_query** (query refinement if documents not relevant)
- **end** (terminates with answer or rejection)
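
To make the node topology easier to inspect, the workflow can be written down as a small transition table and walked by hand. This is only an illustrative sketch in plain Python, not LangGraph code: the node names follow this notebook (including `out_of_scope` and `generate_answer` from the summary diagram), while the outcome labels and the `trace_path` helper are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

# Transitions keyed by (node, outcome). Non-branching nodes use the label "next".
EDGES: Dict[Tuple[str, str], str] = {
    ("start", "next"): "guardrail",
    ("guardrail", "out_of_scope"): "out_of_scope",
    ("guardrail", "in_scope"): "retrieve",
    ("retrieve", "next"): "tool_retrieve",
    ("tool_retrieve", "next"): "grade_documents",
    ("grade_documents", "relevant"): "generate_answer",
    ("grade_documents", "not_relevant"): "rewrite_query",
    ("rewrite_query", "next"): "retrieve",
    ("out_of_scope", "next"): "end",
    ("generate_answer", "next"): "end",
}

BRANCHING_NODES = {"guardrail", "grade_documents"}


def trace_path(outcomes: Iterable[str]) -> List[str]:
    """Walk from start to end, consuming one outcome per branching node."""
    pending = iter(outcomes)
    node, path = "start", ["start"]
    while node != "end":
        label = next(pending) if node in BRANCHING_NODES else "next"
        node = EDGES[(node, label)]
        path.append(node)
    return path


# A query that passes the guardrail, fails grading once, then succeeds.
print(" -> ".join(trace_path(["in_scope", "not_relevant", "relevant"])))
```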

### New Response Fields

- `reasoning_steps`: Detailed decision-making trace
- `retrieval_attempts`: Number of search attempts (0-2)
- `rewritten_query`: Query after refinement (if rewritten)
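
When working with these fields in client code, it can help to normalize them once rather than calling `.get(...)` everywhere. The sketch below assumes the response is a JSON object carrying the three fields listed above; the `AgenticMetadata` class and its `was_rejected` property are hypothetical helpers, not part of the API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class AgenticMetadata:
    """Hypothetical client-side view of the new response fields."""
    reasoning_steps: List[str] = field(default_factory=list)
    retrieval_attempts: int = 0
    rewritten_query: Optional[str] = None

    @classmethod
    def from_response(cls, data: Dict[str, Any]) -> "AgenticMetadata":
        # Missing or null fields fall back to empty defaults.
        return cls(
            reasoning_steps=list(data.get("reasoning_steps") or []),
            retrieval_attempts=int(data.get("retrieval_attempts") or 0),
            rewritten_query=data.get("rewritten_query"),
        )

    @property
    def was_rejected(self) -> bool:
        # This notebook treats zero retrieval attempts as a guardrail rejection.
        return self.retrieval_attempts == 0


meta = AgenticMetadata.from_response(
    {"answer": "...", "retrieval_attempts": 2, "rewritten_query": "refined query"}
)
print(meta.was_rejected, meta.rewritten_query)
```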

### Configuration (GraphConfig)

- `max_retrieval_attempts`: 2
- `guardrail_threshold`: 60/100
- `model`: "llama3.2:1b"
- `temperature`: 0.0
- `top_k`: 3
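
As a rough illustration of how these settings fit together, here is a stand-in dataclass with the same field names and defaults. It is a sketch only: the project's actual `GraphConfig` may be defined differently (for example as a Pydantic model), and both the `GraphConfigSketch` name and the validation rules are assumptions.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class GraphConfigSketch:
    """Hypothetical stand-in for GraphConfig using the defaults listed above."""
    max_retrieval_attempts: int = 2
    guardrail_threshold: int = 60
    model: str = "llama3.2:1b"
    temperature: float = 0.0
    top_k: int = 3

    def __post_init__(self) -> None:
        # Assumed sanity checks; the real config may validate differently.
        if not 0 <= self.guardrail_threshold <= 100:
            raise ValueError("guardrail_threshold must be within 0-100")
        if self.max_retrieval_attempts < 1:
            raise ValueError("max_retrieval_attempts must be at least 1")
        if self.top_k < 1:
            raise ValueError("top_k must be at least 1")


default_config = GraphConfigSketch()
# A stricter guardrail for a narrower domain, leaving other settings unchanged.
strict_config = replace(default_config, guardrail_threshold=75)
print(default_config.guardrail_threshold, strict_config.guardrail_threshold)
```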

---

## 1. Prerequisites

### 1. Environment Variables Setup

**Copy the example file and add your API keys:**

```bash
cp .env.example .env
```

Then edit `.env` and add your:
- `JINA_API_KEY` - Get from [Jina AI](https://jina.ai/) for hybrid search
- `LANGFUSE_PUBLIC_KEY` - Get from Langfuse UI after setup (see step 2 below)
- `LANGFUSE_SECRET_KEY` - Get from Langfuse UI after setup (see step 2 below)

The other values in `.env.example` can be kept as-is for now.

### 2. Langfuse v3 Self-Hosted Setup

This project uses **Langfuse v3** (self-hosted) which includes:
- **langfuse-web**: Web UI at http://localhost:3001
- **langfuse-worker**: Background job processor
- **langfuse-postgres**: Database for traces
- **langfuse-redis**: Cache and queue management
- **langfuse-minio**: S3-compatible object storage
- **clickhouse**: Analytics database

**First-time setup:**
1. Make sure `.env` has all the auto-generated secrets from `.env.example`
2. Start services: `docker compose up langfuse-web langfuse-worker langfuse-postgres langfuse-redis langfuse-minio clickhouse -d`
3. Visit http://localhost:3001 and create your first user
4. Go to Settings → API Keys to get your `LANGFUSE_PUBLIC_KEY` and `LANGFUSE_SECRET_KEY`
5. Copy these keys to your `.env` file

**Note:** If Langfuse keys are missing, tracing will be disabled but the API will still work.
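
A quick way to see from the notebook side whether tracing is likely to be active is to check that both keys are set. This is a minimal sketch that only inspects environment variables; the `tracing_enabled` helper is hypothetical and does not reflect how the API itself decides.

```python
import os
from typing import Mapping, Optional

REQUIRED_LANGFUSE_KEYS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY")


def tracing_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True only when both Langfuse keys are present and non-blank."""
    env = os.environ if env is None else env
    return all(env.get(key, "").strip() for key in REQUIRED_LANGFUSE_KEYS)


if tracing_enabled():
    print("Langfuse keys found: tracing should be active")
else:
    print("Langfuse keys missing: expect tracing to be disabled")
```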

### 3. Ollama Model Setup

**The `llama3.2:1b` model is automatically pulled when you start the Docker services.**

If you need to manually pull it:
```bash
# Pull model in the Ollama container
docker exec rag-ollama ollama pull llama3.2:1b

# Or if running Ollama locally
ollama pull llama3.2:1b
```

**Verify model is available:**
```bash
docker exec rag-ollama ollama list
```

### 4. Start All Services

**Ensure all services are running:**
```bash
docker compose up --build -d
```

**Service Access Points:**
- **FastAPI**: http://localhost:8000/docs
- **OpenSearch**: http://localhost:9200
- **Ollama**: http://localhost:11434
- **Langfuse UI**: http://localhost:3001

---

## 2. Service Health Check

In [None]:
import sys
import os
from pathlib import Path
import requests
import time

print(f"Python Version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")

# Find project root
current_dir = Path.cwd()
if current_dir.name == "week7" and current_dir.parent.name == "notebooks":
    project_root = current_dir.parent.parent
elif (current_dir / "compose.yml").exists():
    project_root = current_dir
else:
    project_root = current_dir.parent.parent

if project_root.exists():
    print(f"Project root: {project_root}")
    sys.path.insert(0, str(project_root))
else:
    print("⚠ Project root not found - check directory structure")

# Load .env file if it exists
env_file = project_root / ".env"
if env_file.exists():
    print(f"\n✓ Loading environment from: {env_file}")
    with open(env_file) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#') and '=' in line:
                key, value = line.split('=', 1)
                key = key.strip()
                # Drop surrounding quotes so KEY="value" and KEY=value load the same
                value = value.strip().strip('"').strip("'")
                if key not in os.environ:
                    os.environ[key] = value
    print("✓ Environment variables loaded")
else:
    print(f"\n⚠ No .env file found at: {env_file}")
    print("  Run: cp .env.example .env")
    print("  Then add your JINA_API_KEY, LANGFUSE_PUBLIC_KEY, and LANGFUSE_SECRET_KEY")

# Configuration for notebook tests
REQUEST_TIMEOUT = 300
TRUNCATE_ANSWERS = True
TRUNCATE_LENGTH = 200

print("\n✓ Setup complete")

In [None]:
print("WEEK 7 SERVICE HEALTH CHECK")
print("=" * 40)

services = {
    "FastAPI": "http://localhost:8000/api/v1/health",
    "Ollama": "http://localhost:11434/api/version"
}

all_healthy = True
for service_name, url in services.items():
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            print(f"✓ {service_name}: Healthy")
        else:
            print(f"✗ {service_name}: HTTP {response.status_code}")
            all_healthy = False
    except requests.RequestException:
        print(f"✗ {service_name}: Not accessible")
        all_healthy = False

# Check if Ollama model is available
print("\nChecking Ollama model availability...")
try:
    response = requests.get("http://localhost:11434/api/tags", timeout=5)
    if response.status_code == 200:
        models = [m['name'] for m in response.json().get('models', [])]
        if 'llama3.2:1b' in models:
            print("✓ llama3.2:1b model is available")
        else:
            print("⚠ llama3.2:1b not found. Run: docker exec rag-ollama ollama pull llama3.2:1b")
            all_healthy = False
    else:
        print(f"⚠ Could not list Ollama models: HTTP {response.status_code}")
        all_healthy = False
except (requests.RequestException, ValueError, KeyError):
    # ValueError covers a non-JSON body; KeyError covers an entry without 'name'
    print("⚠ Could not check Ollama models")
    all_healthy = False

if all_healthy:
    print("\n✓ All services ready for Week 7!")
else:
    print("\n⚠ Some services need attention. Run: docker compose up --build -d")

## 3. Test Traditional RAG (Baseline)

First, let's test the traditional RAG endpoint to establish a baseline.

In [None]:
print("TRADITIONAL RAG TEST (Baseline)")
print("=" * 40)

question = "What are attention mechanisms?"
print(f"Question: {question}\n")

start_time = time.time()

try:
    response = requests.post(
        "http://localhost:8000/api/v1/ask",
        json={
            "query": question,
            "top_k": 3,
            "use_hybrid": True,
            "model": "llama3.2:1b"  # the model pulled and verified in the steps above
        },
        timeout=REQUEST_TIMEOUT
    )
    
    elapsed = time.time() - start_time
    
    if response.status_code == 200:
        data = response.json()
        print(f"✓ Traditional RAG ({elapsed:.1f}s)")
        
        # Display answer with configurable truncation
        answer = data['answer']
        if TRUNCATE_ANSWERS and len(answer) > TRUNCATE_LENGTH:
            print(f"\nAnswer: {answer[:TRUNCATE_LENGTH]}...")
            print(f"(truncated, full length: {len(answer)} chars)")
        else:
            print(f"\nAnswer: {answer}")
        
        # Display sources with validation
        sources = data.get('sources', [])
        print(f"\nSources: {len(sources)} papers")
        if sources:
            for i, source in enumerate(sources[:3], 1):  # Show first 3
                if isinstance(source, dict):
                    print(f"  {i}. {source.get('title', 'Unknown')}")
                else:
                    print(f"  {i}. {source}")
        
        print(f"Search mode: {data.get('search_mode', 'unknown')}")
    else:
        print(f"✗ Request failed: {response.status_code}")
        
except Exception as e:
    print(f"✗ Error: {e}")

## 4. Test Agentic RAG - Scenario 1: Out-of-Scope Rejection

Test if the guardrail correctly rejects queries outside the ML/NLP domain.

In [None]:
print("AGENTIC RAG - SCENARIO 1: Out-of-Scope Rejection")
print("=" * 50)

question = "What is a dog?"
print(f"Question: {question}")
print("Expected: Guardrail should reject (score < 60) and explain scope\n")

start_time = time.time()

try:
    response = requests.post(
        "http://localhost:8000/api/v1/ask-agentic",
        json={
            "query": question,
            "top_k": 3,
            "use_hybrid": True,
        },
        timeout=REQUEST_TIMEOUT
    )
    
    elapsed = time.time() - start_time
    
    if response.status_code == 200:
        data = response.json()
        print(f"✓ Agentic RAG ({elapsed:.1f}s)")
        print(f"\nAnswer: {data['answer']}")
        print(f"\nRetrieval attempts: {data.get('retrieval_attempts', 0)}")
        print(f"\nReasoning steps:")
        for i, step in enumerate(data.get('reasoning_steps', []), 1):
            print(f"  {i}. {step}")
        
        # Check if guardrail score is in reasoning steps
        guardrail_step = next(
            (s for s in data.get('reasoning_steps', []) if 'validated' in s.lower() and 'score' in s.lower()),
            None
        )
        if guardrail_step:
            print(f"\nGuardrail validation: {guardrail_step}")
        
        if data.get('retrieval_attempts', 0) == 0:
            print("\n✓ SUCCESS: Query correctly rejected by guardrail (no retrieval)!")
        else:
            print("\n⚠ UNEXPECTED: Query should have been rejected without retrieval")
    else:
        print(f"✗ Request failed: {response.status_code}")
        print(f"Response: {response.text}")
        
except Exception as e:
    print(f"✗ Error: {e}")

## 5. Test Agentic RAG - Scenario 2: Successful Retrieval

Test if the agent correctly retrieves and grades documents for research questions.

In [None]:
print("AGENTIC RAG - SCENARIO 2: Successful Retrieval")
print("=" * 50)

question = "What are transformers in machine learning?"
print(f"Question: {question}")
print("Expected: Agent should pass guardrail, retrieve documents and generate answer\n")

start_time = time.time()

try:
    response = requests.post(
        "http://localhost:8000/api/v1/ask-agentic",
        json={
            "query": question,
            "top_k": 3,
            "use_hybrid": True,
            "model": "llama3.2:1b"  # the model pulled and verified in the steps above
        },
        timeout=REQUEST_TIMEOUT
    )
    
    elapsed = time.time() - start_time
    
    if response.status_code == 200:
        data = response.json()
        print(f"✓ Agentic RAG ({elapsed:.1f}s)")
        
        # Display answer with better formatting
        answer = data.get('answer', '')
        print(f"\nAnswer:\n{'-'*50}")
        if TRUNCATE_ANSWERS and len(answer) > 500:  # Use longer limit for detailed answers
            print(answer[:500] + "...")
            print(f"(truncated, full length: {len(answer)} chars)")
        else:
            print(answer)
        print('-'*50)
        
        # Display sources with validation
        sources = data.get('sources', [])
        print(f"\nSources: {len(sources)} papers")
        if sources:
            for i, source in enumerate(sources, 1):
                if isinstance(source, dict):
                    print(f"  {i}. {source.get('title', source.get('id', 'Unknown'))}")
                elif isinstance(source, str):
                    print(f"  {i}. {source}")
                else:
                    print(f"  {i}. {str(source)}")
        
        print(f"\nRetrieval attempts: {data.get('retrieval_attempts', 0)}")
        print(f"\nReasoning steps:")
        for i, step in enumerate(data.get('reasoning_steps', []), 1):
            print(f"  {i}. {step}")
        

        # Check rewritten_query field
        if data.get('rewritten_query') is None:
            print("\n✓ Query was not rewritten (worked on first attempt)")
        else:
            print(f"\n→ Query was rewritten to: {data['rewritten_query']}")
        
        if data.get('retrieval_attempts', 0) >= 1:
            print("\n✓ SUCCESS: Agent retrieved and used documents!")
        else:
            print("\n⚠ UNEXPECTED: Agent didn't retrieve for research question")
    else:
        print(f"✗ Request failed: {response.status_code}")
        print(f"Response: {response.text}")
        
except Exception as e:
    print(f"✗ Error: {e}")

## 6. Test Agentic RAG - Scenario 3: Query Rewriting

Test if the agent rewrites vague queries for better results.

In [None]:
print("AGENTIC RAG - SCENARIO 3: Query Rewriting")
print("=" * 50)

question = "Tell me about ML stuff"
print(f"Question: {question}")
print("Expected: Agent may rewrite query if documents aren't relevant\n")

start_time = time.time()

try:
    response = requests.post(
        "http://localhost:8000/api/v1/ask-agentic",
        json={
            "query": question,
            "top_k": 3,
            "use_hybrid": True,
            "model": "llama3.2:1b"  # the model pulled and verified in the steps above
        },
        timeout=REQUEST_TIMEOUT
    )
    
    elapsed = time.time() - start_time
    
    if response.status_code == 200:
        data = response.json()
        print(f"✓ Agentic RAG ({elapsed:.1f}s)")
        
        # Display answer with better formatting
        answer = data.get('answer', '')
        print(f"\nAnswer:\n{'-'*50}")
        if TRUNCATE_ANSWERS and len(answer) > 500:
            print(answer[:500] + "...")
            print(f"(truncated, full length: {len(answer)} chars)")
        else:
            print(answer)
        print('-'*50)
        
        print(f"\nRetrieval attempts: {data.get('retrieval_attempts', 0)}")
        print(f"\nReasoning steps:")
        for i, step in enumerate(data.get('reasoning_steps', []), 1):
            print(f"  {i}. {step}")
        
        # Check for guardrail validation step
        print("\nValidating guardrail and rewrite steps:")
        reasoning_steps = data.get('reasoning_steps', [])
        guardrail_step = next((s for s in reasoning_steps if "validated" in s.lower()), None)
        if guardrail_step:
            print(f"  ✓ Guardrail validation: {guardrail_step}")
        else:
            print("  ⚠ Guardrail validation step missing")
        
        # Check for query rewriting
        if data.get('rewritten_query'):
            print("\n✓ Query was rewritten!")
            print(f"  Original: {question}")
            print(f"  Rewritten: {data['rewritten_query']}")
        elif data.get('retrieval_attempts', 0) > 1:
            print("\n→ Multiple retrieval attempts detected")
            if any("rewritten" in step.lower() for step in reasoning_steps):
                print("  ✓ Rewrite step found in reasoning")
            else:
                print("  ⚠ Multiple attempts but no rewrite info")
        else:
            print("\n→ Query worked on first attempt (no rewrite needed)")
        
        if data.get('retrieval_attempts', 0) > 1:
            print(f"\n✓ Agent performed {data['retrieval_attempts']} retrieval attempts")
    else:
        print(f"✗ Request failed: {response.status_code}")
        print(f"Response: {response.text}")
        
except Exception as e:
    print(f"✗ Error: {e}")

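## 7. Test Agentic RAG - Scenario 4: Multiple Out-of-Scope Queries

Test if the guardrail consistently rejects several kinds of non-ML/NLP queries.

In [None]: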
print("AGENTIC RAG - SCENARIO 4: Multiple Out-of-Scope Queries")
print("=" * 50)

test_queries = [
    ("What is a dog?", "Biology question"),
    ("What's the weather today?", "Weather question"),
    ("Hello, how are you?", "Greeting"),
]

print("Testing guardrail rejection with various non-ML/NLP queries:\n")

for query, description in test_queries:
    print(f"Query: {query}")
    print(f"Type: {description}")
    
    try:
        response = requests.post(
            "http://localhost:8000/api/v1/ask-agentic",
            json={"query": query, "top_k": 3, "use_hybrid": True},
            timeout=30
        )
        
        if response.status_code == 200:
            data = response.json()
            
            # Check if rejected (no retrieval)
            is_rejected = data['retrieval_attempts'] == 0
            
            # Get guardrail score from reasoning if available
            guardrail_step = next(
                (s for s in data['reasoning_steps'] if 'validated' in s.lower() and 'score' in s.lower()),
                None
            )
            
            print(f"Result: {'✓ REJECTED' if is_rejected else '✗ ACCEPTED'} (attempts: {data['retrieval_attempts']})")
            if guardrail_step:
                print(f"Guardrail: {guardrail_step}")
        else:
            print(f"✗ Request failed: {response.status_code}")
    except Exception as e:
        print(f"✗ Error: {e}")
    
    print("-" * 50)

## 8. Interactive Testing

Try your own questions!

In [None]:
def ask_agentic(question: str, show_full_answer: bool = False):
    """Helper function to test agentic RAG.
    
    Args:
        question: The question to ask
        show_full_answer: If True, show full answer regardless of TRUNCATE_ANSWERS setting
    """
    print(f"Question: {question}\n")
    
    start = time.time()
    
    try:
        response = requests.post(
            "http://localhost:8000/api/v1/ask-agentic",
            json={"query": question, "top_k": 3, "use_hybrid": True},
            timeout=REQUEST_TIMEOUT
        )
        
        elapsed = time.time() - start
        
        if response.status_code == 200:
            data = response.json()
            print(f"✓ Response in {elapsed:.1f}s\n")
            
            # Display answer
            answer = data.get('answer', '')
            print(f"Answer:\n{'-'*50}")
            if not show_full_answer and TRUNCATE_ANSWERS and len(answer) > 500:
                print(answer[:500] + "...")
                print(f"(truncated, full length: {len(answer)} chars)")
            else:
                print(answer)
            print('-'*50)
            
            # Display metadata
            print(f"\nRetrieval attempts: {data.get('retrieval_attempts', 0)}")
            
            # Display sources with validation
            sources = data.get('sources', [])
            print(f"Sources: {len(sources)}")
            if sources:
                for i, source in enumerate(sources[:3], 1):  # Show first 3
                    if isinstance(source, dict):
                        print(f"  {i}. {source.get('title', source.get('id', 'Unknown'))}")
                    elif isinstance(source, str):
                        print(f"  {i}. {source}")
            
            # Display reasoning
            print("\nReasoning:")
            for step in data.get('reasoning_steps', []):
                print(f"  • {step}")
        else:
            print(f"✗ Error: {response.status_code}")
            print(response.text)
    except Exception as e:
        print(f"✗ Exception: {e}")

# Try it!
ask_agentic("How does BERT differ from GPT?")

In [None]:
# Try more questions
ask_agentic("What is the capital of France?")  # Should reject as out-of-scope

In [None]:
ask_agentic("Explain self-attention mechanisms")  # Should retrieve papers

## Summary

### What We Tested in Week 7:

**Agentic RAG Capabilities**:
1. ✅ **Guardrail Validation** - LLM validates query scope (0-100 score) before retrieval
2. ✅ **Out-of-Scope Handling** - Automatically rejects queries outside ML/NLP domain
3. ✅ **Document Grading** - Validates retrieved papers for relevance
4. ✅ **Query Rewriting** - Improves queries if needed
5. ✅ **Reasoning Transparency** - Shows decision-making steps
6. ✅ **Iterative Improvement** - Can retry with better queries (max 2 attempts)

### Key Improvements Over Traditional RAG:

| Feature | Traditional RAG | Agentic RAG |
|---------|----------------|-------------|
| **Query Validation** | None | Guardrail scoring (0-100) |
| **Out-of-Scope Handling** | None | Automatic rejection with helpful message |
| **Retrieval Decision** | Always retrieves | Only if guardrail passes (score >= 60) |
| **Relevance Check** | None | LLM-based document grading |
| **Query Refinement** | None | LLM-based rewriting |
| **Iterations** | Single pass | Up to 2 retrieval attempts |
| **Transparency** | Black box | Detailed reasoning steps |
| **Configuration** | Hardcoded | GraphConfig with thresholds |

### Architecture: 7-Node LangGraph Workflow

```
LangGraph Workflow:
  START
    ↓
  guardrail (LLM scoring 0-100)
    ├─ score < 60 → out_of_scope → END (rejection message)
    └─ score >= 60 → retrieve
         ↓
       tool_retrieve (ToolNode - executes search)
         ↓
       grade_documents (LLM relevance check)
         ├─ Relevant → generate_answer → END
         └─ Not relevant → rewrite_query → retrieve (retry, max 2 attempts)

### Reasoning Step Format:

The new agentic RAG returns structured reasoning steps:

1. **"Validated query scope (score: X/100)"** - Guardrail validation result
2. **"Retrieved documents (N attempt(s))"** - Number of retrieval attempts
3. **"Graded documents (N relevant)"** - Document relevance check
4. **"Rewritten query for better results"** - Query refinement (if needed)
5. **"Generated answer from context"** - Final answer generation
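
Because the guardrail step follows a fixed textual pattern, the score can be pulled out of `reasoning_steps` for logging or threshold tuning. The snippet below is a small sketch assuming the step text matches the "Validated query scope (score: X/100)" format listed above; `extract_guardrail_score` is a hypothetical helper.

```python
import re
from typing import Iterable, Optional

# Matches "score: 73/100" with flexible spacing around the colon and slash.
SCORE_PATTERN = re.compile(r"score:\s*(\d{1,3})\s*/\s*100", re.IGNORECASE)


def extract_guardrail_score(reasoning_steps: Iterable[str]) -> Optional[int]:
    """Return the guardrail score from the first validation step, if present."""
    for step in reasoning_steps:
        if "validated" not in step.lower():
            continue
        match = SCORE_PATTERN.search(step)
        if match:
            return int(match.group(1))
    return None


steps = [
    "Validated query scope (score: 15/100)",
    "Retrieved documents (1 attempt(s))",
]
print(extract_guardrail_score(steps))
```

Returning `None` instead of raising keeps the helper safe to use when the guardrail step is absent or its wording changes.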

### Configuration Parameters (GraphConfig):

- `max_retrieval_attempts`: 2 - Maximum retry attempts
- `guardrail_threshold`: 60/100 - Minimum score to proceed
- `model`: "llama3.2:1b" - Default LLM model
- `temperature`: 0.0 - Deterministic generation
- `top_k`: 3 - Documents to retrieve

### Next Steps:

- **Experiment** with different question types and query complexity
- **Monitor** reasoning steps to understand agent decision-making
- **Compare** performance and accuracy with traditional RAG
- **Adjust** guardrail threshold based on your domain requirements
- **Extend** with additional tools (web search, calculations, code execution)

**Week 7 Complete! You now have an intelligent, adaptive RAG system with guardrail validation! 🎉**