# 🚨 Traceback: End-to-End Agentic RAG System

**Instant triage across docs, code & lineage.**

This notebook implements the complete Traceback system phase by phase:

## 📋 Implementation Phases

- **Phase 1: Data Foundation** - Set up data processing pipeline
- **Phase 2: Core RAG** - Set up Qdrant and basic retrieval  
- **Phase 3: Agent System** - Build LangGraph supervisor and agents
- **Phase 4: API & Interface** - FastAPI endpoints and CLI

## 🎯 System Overview

Traceback unifies three fragmented sources for fast incident response:
- 📄 **Requirements docs** (PDF/MD) 
- 🧾 **Pipeline code snippets** (SQL/Py)
- 🧬 **Column-level lineage graph** (JSON)

**Target Questions:**
> "Job `curated.sales_orders` failed — who's impacted?"
> "What dashboards went stale?"
> "Do we roll back or hotfix?"


# Phase 1: Data Foundation 🏗️

## Objectives:
1. Set up project structure and dependencies
2. Create sample data (docs, code, lineage)
3. Implement data processing pipeline
4. Test chunking strategies


In [2]:
# Setup: Import libraries and configure environment
import os
import sys
import json
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Verify API keys
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set. Create a .env file or export it in your shell.")

print("✅ Environment setup complete")
print(f"✅ OpenAI API key loaded")

# Optional API keys
if os.getenv("TAVILY_API_KEY"):
    print("✅ Tavily API key loaded (optional)")
if os.getenv("COHERE_API_KEY"):
    print("✅ Cohere API key loaded (optional)")


✅ Environment setup complete
✅ OpenAI API key loaded
✅ Tavily API key loaded (optional)
✅ Cohere API key loaded (optional)


In [3]:
# Setup: Project paths and structure
def find_project_root():
    """Find the project root directory."""
    current = Path.cwd()
    if current.name == "notebooks":
        return current.parent
    for parent in [current] + list(current.parents):
        if (parent / "pyproject.toml").exists():
            return parent
    return current

# Set up paths
BASE = find_project_root()
SRC = BASE / "src"
DATA = BASE / "data"
DOCS = DATA / "docs"
REPO = DATA / "repo"

# Add src to Python path
sys.path.insert(0, str(SRC))

# Create directories
DATA.mkdir(exist_ok=True)
DOCS.mkdir(exist_ok=True)
REPO.mkdir(exist_ok=True)

print(f"📁 Project root: {BASE}")
print(f"📁 Data directory: {DATA}")
print(f"📁 Docs directory: {DOCS}")
print(f"📁 Repo directory: {REPO}")
print(f"✅ Added {SRC} to Python path")


📁 Project root: /Users/sandeepgogineni/ai-engineering/bootcamp/Traceback
📁 Data directory: /Users/sandeepgogineni/ai-engineering/bootcamp/Traceback/data
📁 Docs directory: /Users/sandeepgogineni/ai-engineering/bootcamp/Traceback/data/docs
📁 Repo directory: /Users/sandeepgogineni/ai-engineering/bootcamp/Traceback/data/repo
✅ Added /Users/sandeepgogineni/ai-engineering/bootcamp/Traceback/src to Python path


In [None]:
# Phase 1.1: Create Sample Data

# Create sample requirement documents
sample_docs = {
    "incident_playbook.md": """# Data Pipeline Incident Response Playbook

## Overview
This playbook provides standardized procedures for responding to data pipeline incidents.

## Severity Levels
- **P0**: Critical business impact, revenue loss
- **P1**: High impact, SLA breach risk  
- **P2**: Medium impact, degraded service
- **P3**: Low impact, minor issues

## Response Procedures

### Initial Assessment (0-15 minutes)
1. **Acknowledge** the incident
2. **Assess** business impact
3. **Determine** blast radius
4. **Notify** stakeholders

### Impact Assessment Questions
- Which dashboards are affected?
- What downstream systems depend on this data?
- Are there any SLA commitments at risk?
- What is the estimated recovery time?

### Common Actions
- **Rollback**: Revert to last known good state
- **Hotfix**: Apply targeted fix
- **Backfill**: Reprocess affected data
- **Skip**: Bypass failed step if non-critical

## Escalation Matrix
- **Data Engineering Lead**: P0/P1 incidents
- **Platform Team**: Infrastructure issues
- **Product Manager**: Business impact assessment
""",
    
    "sales_orders_spec.md": """# Sales Orders Domain Specification

## Purpose
The sales orders pipeline processes raw order data into curated, business-ready datasets for analytics and reporting.

## Data Sources
- **raw.sales_orders**: Raw order data from e-commerce platform
- **raw.customers**: Customer master data
- **raw.products**: Product catalog

## Business Rules

### Data Quality Requirements
- Order amounts must be positive
- Customer IDs must exist in customer master
- Product IDs must exist in product catalog
- Timestamps must be valid and recent

### Transformation Logic
1. **Clean**: Remove invalid records
2. **Enrich**: Add customer and product details
3. **Calculate**: Compute derived fields
4. **Validate**: Apply business rules

## SLA Commitments
- **Availability**: 99.9% uptime
- **Freshness**: Data available within 2 hours of source update
- **Accuracy**: <0.1% error rate

## Downstream Dependencies
- **curated.revenue_summary**: Daily revenue reporting
- **bi.daily_sales**: Executive dashboard
- **analytics.customer_behavior**: Customer analytics

## Ownership
- **Primary**: data-sales team
- **Secondary**: data-platform team
- **Stakeholders**: Finance, Marketing, Product
""",
    
    "data_quality_standards.md": """# Data Quality Standards and Monitoring

## Quality Dimensions

### Completeness
- No missing values in critical fields
- All expected records present
- Referential integrity maintained

### Accuracy  
- Data matches source systems
- Business rules validated
- Calculated fields verified

### Consistency
- Format standards applied
- Naming conventions followed
- Data types consistent

### Timeliness
- Data available within SLA windows
- Processing delays monitored
- Stale data alerts configured

## Monitoring Framework

### Automated Checks
- Schema validation
- Data freshness monitoring
- Anomaly detection
- Statistical quality metrics

### Alerting Thresholds
- **Critical**: >1% data quality issues
- **Warning**: >0.1% data quality issues
- **Info**: Quality metrics trending

## Remediation Procedures
1. **Identify** root cause
2. **Assess** impact scope
3. **Apply** fix or workaround
4. **Validate** resolution
5. **Document** lessons learned
"""
}

# Write sample docs
for filename, content in sample_docs.items():
    doc_path = DOCS / filename
    doc_path.write_text(content)
    print(f"📄 Created: {filename}")

print(f"\n✅ Created {len(sample_docs)} sample documents")


📄 Created: incident_playbook.md
📄 Created: sales_orders_spec.md
📄 Created: data_quality_standards.md

✅ Created 3 sample documents


In [5]:
# Phase 1.2: Create Sample Pipeline Code

# Create sample SQL pipeline files
sample_sql_pipelines = {
    "sales_orders_pipeline.sql": """-- Sales Orders Pipeline
-- Purpose: Transform raw order data into curated sales orders
-- Owner: data-sales team
-- SLA: 2 hours freshness

WITH cleaned_orders AS (
    SELECT 
        order_id,
        customer_id,
        product_id,
        order_date,
        quantity,
        unit_price,
        -- Data quality checks
        CASE 
            WHEN quantity > 0 AND unit_price > 0 THEN quantity * unit_price
            ELSE NULL 
        END AS gross_amount
    FROM raw.sales_orders
    WHERE 
        order_date >= CURRENT_DATE - INTERVAL '30 days'
        AND customer_id IS NOT NULL
        AND product_id IS NOT NULL
),

enriched_orders AS (
    SELECT 
        co.*,
        c.customer_name,
        c.customer_segment,
        p.product_name,
        p.category,
        -- Calculate net amount after refunds
        co.gross_amount - COALESCE(r.refund_amount, 0) AS net_amount
    FROM cleaned_orders co
    LEFT JOIN raw.customers c ON co.customer_id = c.customer_id
    LEFT JOIN raw.products p ON co.product_id = p.product_id
    LEFT JOIN raw.refunds r ON co.order_id = r.order_id
)

INSERT INTO curated.sales_orders
SELECT 
    order_id,
    customer_id,
    customer_name,
    customer_segment,
    product_id,
    product_name,
    category,
    order_date,
    quantity,
    unit_price,
    gross_amount,
    net_amount,
    CURRENT_TIMESTAMP AS processed_at
FROM enriched_orders
WHERE net_amount > 0;  -- Only include valid orders
""",
    
    "revenue_summary_pipeline.sql": """-- Revenue Summary Pipeline  
-- Purpose: Create daily revenue summaries for reporting
-- Owner: data-sales team
-- Dependencies: curated.sales_orders

WITH daily_revenue AS (
    SELECT 
        DATE(order_date) AS revenue_date,
        customer_segment,
        category,
        COUNT(*) AS order_count,
        SUM(net_amount) AS total_revenue,
        AVG(net_amount) AS avg_order_value,
        SUM(quantity) AS total_quantity
    FROM curated.sales_orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '7 days'
    GROUP BY DATE(order_date), customer_segment, category
),

segment_totals AS (
    SELECT 
        revenue_date,
        customer_segment,
        SUM(total_revenue) AS segment_revenue,
        SUM(order_count) AS segment_orders
    FROM daily_revenue
    GROUP BY revenue_date, customer_segment
)

INSERT INTO curated.revenue_summary
SELECT 
    dr.revenue_date,
    dr.customer_segment,
    dr.category,
    dr.order_count,
    dr.total_revenue,
    dr.avg_order_value,
    dr.total_quantity,
    st.segment_revenue,
    st.segment_orders,
    CURRENT_TIMESTAMP AS processed_at
FROM daily_revenue dr
LEFT JOIN segment_totals st 
    ON dr.revenue_date = st.revenue_date 
    AND dr.customer_segment = st.customer_segment;
""",
    
    "customer_analytics_pipeline.sql": """-- Customer Analytics Pipeline
-- Purpose: Generate customer behavior analytics
-- Owner: data-analytics team
-- Dependencies: curated.sales_orders, curated.customers

WITH customer_metrics AS (
    SELECT 
        customer_id,
        COUNT(DISTINCT order_date) AS active_days,
        COUNT(*) AS total_orders,
        SUM(net_amount) AS lifetime_value,
        AVG(net_amount) AS avg_order_value,
        MAX(order_date) AS last_order_date,
        MIN(order_date) AS first_order_date
    FROM curated.sales_orders
    WHERE order_date >= CURRENT_DATE - INTERVAL '90 days'
    GROUP BY customer_id
),

customer_segments AS (
    SELECT 
        customer_id,
        CASE 
            WHEN lifetime_value > 1000 THEN 'High Value'
            WHEN lifetime_value > 500 THEN 'Medium Value'
            ELSE 'Low Value'
        END AS value_segment,
        CASE 
            WHEN active_days >= 10 THEN 'Frequent'
            WHEN active_days >= 5 THEN 'Regular'
            ELSE 'Occasional'
        END AS frequency_segment
    FROM customer_metrics
)

INSERT INTO analytics.customer_behavior
SELECT 
    cm.customer_id,
    cm.active_days,
    cm.total_orders,
    cm.lifetime_value,
    cm.avg_order_value,
    cm.last_order_date,
    cm.first_order_date,
    cs.value_segment,
    cs.frequency_segment,
    CURRENT_TIMESTAMP AS processed_at
FROM customer_metrics cm
JOIN customer_segments cs ON cm.customer_id = cs.customer_id;
"""
}

# Write sample SQL files
for filename, content in sample_sql_pipelines.items():
    sql_path = REPO / filename
    sql_path.write_text(content)
    print(f"🔧 Created: {filename}")

print(f"\n✅ Created {len(sample_sql_pipelines)} sample SQL pipelines")


🔧 Created: sales_orders_pipeline.sql
🔧 Created: revenue_summary_pipeline.sql
🔧 Created: customer_analytics_pipeline.sql

✅ Created 3 sample SQL pipelines
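
The table names inside each pipeline can be cross-checked against the lineage metadata. The sketch below pulls schema-qualified names out of a SQL string with a regular expression. `extract_table_refs` and `TABLE_REF` are hypothetical names that do not appear elsewhere in this notebook, and the regex is a rough heuristic (lowercase `schema.table` names only, no quoting), not a SQL parser.

```python
import re

# Heuristic pattern (assumption): a FROM / JOIN / INSERT INTO keyword followed by schema.table.
# CTE names such as cleaned_orders carry no schema prefix, so they are skipped.
TABLE_REF = re.compile(r"\b(FROM|JOIN|INSERT\s+INTO)\s+([a-z_]+\.[a-z_]+)", re.IGNORECASE)

def extract_table_refs(sql: str) -> dict:
    """Return the schema-qualified tables a statement reads from and writes to."""
    reads, writes = set(), set()
    for keyword, table in TABLE_REF.findall(sql):
        if keyword.upper().startswith("INSERT"):
            writes.add(table)
        else:
            reads.add(table)
    return {"reads": reads, "writes": writes}

sample_sql = """
INSERT INTO curated.sales_orders
SELECT co.order_id
FROM raw.sales_orders co
LEFT JOIN raw.customers c ON co.customer_id = c.customer_id
LEFT JOIN raw.refunds r ON co.order_id = r.order_id;
"""

refs = extract_table_refs(sample_sql)
print(sorted(refs["reads"]))
print(sorted(refs["writes"]))
```

Comparing `refs["reads"]` with a pipeline's declared dependencies is one way to spot a source table that the SQL uses but the metadata omits.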


In [6]:
# Phase 1.3: Create Sample Lineage Data

# Create comprehensive lineage graph
lineage_data = {
    "nodes": [
        # Raw data sources
        {"id": "raw.sales_orders", "type": "table", "schema": "raw", "owners": ["data-platform"], "description": "Raw order data from e-commerce platform"},
        {"id": "raw.customers", "type": "table", "schema": "raw", "owners": ["data-platform"], "description": "Customer master data"},
        {"id": "raw.products", "type": "table", "schema": "raw", "owners": ["data-platform"], "description": "Product catalog"},
        {"id": "raw.refunds", "type": "table", "schema": "raw", "owners": ["data-platform"], "description": "Refund transaction data"},
        
        # Curated tables
        {"id": "curated.sales_orders", "type": "table", "schema": "curated", "owners": ["data-sales"], "description": "Cleaned and enriched sales orders"},
        {"id": "curated.revenue_summary", "type": "table", "schema": "curated", "owners": ["data-sales"], "description": "Daily revenue summaries"},
        {"id": "curated.customers", "type": "table", "schema": "curated", "owners": ["data-sales"], "description": "Enriched customer data"},
        
        # Analytics tables
        {"id": "analytics.customer_behavior", "type": "table", "schema": "analytics", "owners": ["data-analytics"], "description": "Customer behavior analytics"},
        
        # Key columns
        {"id": "curated.sales_orders.order_id", "type": "column", "table": "curated.sales_orders", "data_type": "varchar", "description": "Unique order identifier"},
        {"id": "curated.sales_orders.customer_id", "type": "column", "table": "curated.sales_orders", "data_type": "varchar", "description": "Customer identifier"},
        {"id": "curated.sales_orders.net_amount", "type": "column", "table": "curated.sales_orders", "data_type": "decimal", "description": "Net order amount after refunds"},
        {"id": "curated.revenue_summary.total_revenue", "type": "column", "table": "curated.revenue_summary", "data_type": "decimal", "description": "Total daily revenue"},
        {"id": "analytics.customer_behavior.lifetime_value", "type": "column", "table": "analytics.customer_behavior", "data_type": "decimal", "description": "Customer lifetime value"}
    ],
    
    "edges": [
        # Raw to Curated transformations
        {"from": "raw.sales_orders", "to": "curated.sales_orders", "operation": "clean+enrich", "pipeline": "sales_orders_pipeline.sql"},
        {"from": "raw.customers", "to": "curated.sales_orders", "operation": "join", "pipeline": "sales_orders_pipeline.sql"},
        {"from": "raw.products", "to": "curated.sales_orders", "operation": "join", "pipeline": "sales_orders_pipeline.sql"},
        {"from": "raw.refunds", "to": "curated.sales_orders", "operation": "subtract", "pipeline": "sales_orders_pipeline.sql"},
        
        # Curated to Analytics transformations
        {"from": "curated.sales_orders", "to": "curated.revenue_summary", "operation": "aggregate", "pipeline": "revenue_summary_pipeline.sql"},
        {"from": "curated.sales_orders", "to": "analytics.customer_behavior", "operation": "aggregate", "pipeline": "customer_analytics_pipeline.sql"},
        {"from": "curated.customers", "to": "analytics.customer_behavior", "operation": "join", "pipeline": "customer_analytics_pipeline.sql"},
        
        # Column-level dependencies
        {"from": "raw.sales_orders.order_id", "to": "curated.sales_orders.order_id", "operation": "copy"},
        {"from": "raw.sales_orders.customer_id", "to": "curated.sales_orders.customer_id", "operation": "copy"},
        {"from": "raw.sales_orders.quantity", "to": "curated.sales_orders.net_amount", "operation": "calculate"},
        {"from": "raw.sales_orders.unit_price", "to": "curated.sales_orders.net_amount", "operation": "calculate"},
        {"from": "curated.sales_orders.net_amount", "to": "curated.revenue_summary.total_revenue", "operation": "sum"},
        {"from": "curated.sales_orders.net_amount", "to": "analytics.customer_behavior.lifetime_value", "operation": "sum"}
    ],
    
    "dashboards": [
        {
            "id": "bi.daily_sales",
            "name": "Daily Sales Dashboard",
            "tables": ["curated.sales_orders", "curated.revenue_summary"],
            "teams": ["Finance", "Sales", "Executive"],
            "description": "Executive dashboard showing daily sales performance",
            "refresh_frequency": "hourly"
        },
        {
            "id": "bi.customer_analytics", 
            "name": "Customer Analytics Dashboard",
            "tables": ["analytics.customer_behavior", "curated.customers"],
            "teams": ["Marketing", "Product"],
            "description": "Customer behavior and segmentation analytics",
            "refresh_frequency": "daily"
        },
        {
            "id": "ops.data_quality",
            "name": "Data Quality Monitoring",
            "tables": ["curated.sales_orders", "curated.revenue_summary"],
            "teams": ["Data Engineering", "Platform"],
            "description": "Data quality metrics and monitoring",
            "refresh_frequency": "real-time"
        }
    ],
    
    "pipelines": [
        {
            "id": "sales_orders_pipeline",
            "name": "Sales Orders Pipeline",
            "file": "sales_orders_pipeline.sql",
            "schedule": "hourly",
            "owner": "data-sales",
            "dependencies": ["raw.sales_orders", "raw.customers", "raw.products", "raw.refunds"],
            "outputs": ["curated.sales_orders"]
        },
        {
            "id": "revenue_summary_pipeline",
            "name": "Revenue Summary Pipeline", 
            "file": "revenue_summary_pipeline.sql",
            "schedule": "daily",
            "owner": "data-sales",
            "dependencies": ["curated.sales_orders"],
            "outputs": ["curated.revenue_summary"]
        },
        {
            "id": "customer_analytics_pipeline",
            "name": "Customer Analytics Pipeline",
            "file": "customer_analytics_pipeline.sql", 
            "schedule": "daily",
            "owner": "data-analytics",
            "dependencies": ["curated.sales_orders", "curated.customers"],
            "outputs": ["analytics.customer_behavior"]
        }
    ]
}

# Save lineage data
lineage_path = DATA / "lineage.json"
with open(lineage_path, 'w') as f:
    json.dump(lineage_data, f, indent=2)

print(f"🧬 Created lineage graph:")
print(f"  📊 {len(lineage_data['nodes'])} nodes (tables/columns)")
print(f"  🔗 {len(lineage_data['edges'])} edges (relationships)")
print(f"  📈 {len(lineage_data['dashboards'])} dashboards")
print(f"  ⚙️ {len(lineage_data['pipelines'])} pipelines")
print(f"✅ Saved to: {lineage_path}")


🧬 Created lineage graph:
  📊 13 nodes (tables/columns)
  🔗 13 edges (relationships)
  📈 3 dashboards
  ⚙️ 3 pipelines
✅ Saved to: /Users/sandeepgogineni/ai-engineering/bootcamp/Traceback/data/lineage.json
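
A quick consistency check on the graph is to confirm that every edge endpoint is also declared as a node. In the sample data above, the column-level edges start from `raw.sales_orders.*` columns that have no node entry, so a check like this would flag them. The sketch below is illustrative only. `find_undeclared_endpoints` is a hypothetical helper and `tiny_lineage` is a cut-down stand-in for `lineage_data`.

```python
def find_undeclared_endpoints(lineage: dict) -> set:
    """Return edge endpoints that are missing from the node list."""
    declared = {node["id"] for node in lineage.get("nodes", [])}
    missing = set()
    for edge in lineage.get("edges", []):
        for endpoint in (edge["from"], edge["to"]):
            if endpoint not in declared:
                missing.add(endpoint)
    return missing

# Cut-down stand-in for the lineage graph (same keys, fewer entries)
tiny_lineage = {
    "nodes": [
        {"id": "raw.sales_orders", "type": "table"},
        {"id": "curated.sales_orders", "type": "table"},
        {"id": "curated.sales_orders.order_id", "type": "column"},
    ],
    "edges": [
        {"from": "raw.sales_orders", "to": "curated.sales_orders"},
        {"from": "raw.sales_orders.order_id", "to": "curated.sales_orders.order_id"},
    ],
}

undeclared = find_undeclared_endpoints(tiny_lineage)
print(undeclared)
```

Whether undeclared endpoints count as an error is a design choice. Traversal still works without them, but node metadata such as owners or descriptions cannot be looked up for those IDs.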


In [7]:
# Phase 1.4: Implement Data Processing Pipeline (with Semantic Chunking)

# Only RecursiveCharacterTextSplitter is used in this cell
from langchain_text_splitters import RecursiveCharacterTextSplitter

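class DocumentProcessor:
    """Processes documents and code files for RAG ingestion.

    Chunking is separator-based: RecursiveCharacterTextSplitter tries each
    separator in order until the pieces fit within chunk_size characters.
    """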
    
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # Initialize semantic chunkers
        self.markdown_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        
        self.code_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        
        # Specialized SQL splitter
        self.sql_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=[";\n\n", ";\n", ";", "\n\n", "\n", " ", ""]
        )
    
    def process_markdown(self, file_path: Path) -> List[Dict[str, Any]]:
        """Process markdown files with semantic chunking."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Use semantic chunking
            chunks = self.markdown_splitter.split_text(content)
            
            # Convert to our format
            result_chunks = []
            for i, chunk_text in enumerate(chunks):
                metadata = {
                    "source": str(file_path),
                    "type": "markdown",
                    "file_name": file_path.name,
                    "chunk_index": i,
                    # Word count of this chunk (the splitter's chunk_size limit is in characters)
                    "chunk_size": len(chunk_text.split())
                }
                
                result_chunks.append({
                    "content": chunk_text,
                    "metadata": metadata
                })
            
            return result_chunks
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return []
    
    def process_sql(self, file_path: Path) -> List[Dict[str, Any]]:
        """Process SQL files with semantic chunking."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Use SQL-specific semantic chunking
            chunks = self.sql_splitter.split_text(content)
            
            # Convert to our format
            result_chunks = []
            for i, chunk_text in enumerate(chunks):
                if len(chunk_text.strip()) > 50:  # Only include substantial chunks
                    metadata = {
                        "source": str(file_path),
                        "type": "sql",
                        "file_name": file_path.name,
                        "chunk_index": i,
                        # Word count of this chunk (the splitter's chunk_size limit is in characters)
                        "chunk_size": len(chunk_text.split())
                    }
                    
                    result_chunks.append({
                        "content": chunk_text,
                        "metadata": metadata
                    })
            
            return result_chunks
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return []

class LineageProcessor:
    """Processes lineage data for graph queries."""
    
    def load_lineage(self, file_path: Path) -> Dict[str, Any]:
        """Load lineage data from JSON file."""
        try:
            with open(file_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading lineage from {file_path}: {e}")
            return {"nodes": [], "edges": [], "dashboards": [], "pipelines": []}
    
    def find_downstream_impact(self, node_id: str, lineage: Dict[str, Any]) -> List[str]:
        """Find all downstream dependencies of a node, each listed once."""
        downstream = []
        visited = set()
        
        def dfs(current_node):
            if current_node in visited:
                return
            visited.add(current_node)
            
            for edge in lineage.get("edges", []):
                if edge["from"] == current_node:
                    target = edge["to"]
                    # Record each node once, even when several paths reach it
                    if target != node_id and target not in downstream:
                        downstream.append(target)
                    dfs(target)
        
        dfs(node_id)
        return downstream
    
    def find_upstream_dependencies(self, node_id: str, lineage: Dict[str, Any]) -> List[str]:
        """Find all upstream dependencies of a node, each listed once."""
        upstream = []
        visited = set()
        
        def dfs(current_node):
            if current_node in visited:
                return
            visited.add(current_node)
            
            for edge in lineage.get("edges", []):
                if edge["to"] == current_node:
                    source = edge["from"]
                    # Record each node once, even when several paths reach it
                    if source != node_id and source not in upstream:
                        upstream.append(source)
                    dfs(source)
        
        dfs(node_id)
        return upstream

# Initialize processors with semantic chunking
doc_processor = DocumentProcessor(chunk_size=500, chunk_overlap=50)
lineage_processor = LineageProcessor()

print("✅ Data processing pipeline initialized with semantic chunking")
print(f"  📄 Markdown splitter: RecursiveCharacterTextSplitter")
print(f"  🔧 SQL splitter: SQL-optimized RecursiveCharacterTextSplitter")
print(f"  🧬 Lineage processor: Ready for graph queries")


✅ Data processing pipeline initialized with semantic chunking
  📄 Markdown splitter: RecursiveCharacterTextSplitter
  🔧 SQL splitter: SQL-optimized RecursiveCharacterTextSplitter
  🧬 Lineage processor: Ready for graph queries
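
`RecursiveCharacterTextSplitter` cuts on separators and character counts, so a chunk boundary does not necessarily line up with a Markdown heading. For comparison, the sketch below groups lines under their nearest heading and keeps the heading path as metadata. It uses only the standard library and is not LangChain's implementation. `split_by_headings` and `HEADING` are hypothetical names, and the parser handles ATX-style `#` headings only.

```python
import re
from typing import Dict, List

# ATX headings only: one to six '#' characters followed by whitespace and a title
HEADING = re.compile(r"^(#{1,6})\s+(.*)$")

def split_by_headings(markdown: str) -> List[Dict[str, object]]:
    """Group body lines under their nearest heading and record the heading path."""
    sections: List[Dict[str, object]] = []
    path: Dict[int, str] = {}   # heading level -> title at the current position
    body: List[str] = []

    def flush() -> None:
        # Emit the accumulated body (if any) together with its heading path
        text = "\n".join(body).strip()
        if text:
            sections.append({"headings": [path[k] for k in sorted(path)], "content": text})
        body.clear()

    for line in markdown.splitlines():
        match = HEADING.match(line)
        if match:
            flush()
            level = len(match.group(1))
            # A new heading replaces any heading at the same or a deeper level
            for stale in [k for k in path if k >= level]:
                del path[stale]
            path[level] = match.group(2).strip()
        else:
            body.append(line)
    flush()
    return sections

sample_md = """# Playbook

## Severity Levels
- P0: Critical
- P1: High

## Common Actions
- Rollback
"""

sections = split_by_headings(sample_md)
for section in sections:
    print(section["headings"], "->", section["content"])
```

Keeping the heading path in metadata lets a retrieved chunk be traced back to the section it came from, at the cost of uneven chunk sizes.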


In [8]:
# Phase 1.5: Process All Data and Test Semantic Chunking Performance

print("🔄 Processing all data sources with semantic chunking...")
start_time = time.time()

# Process documents
all_documents = []
md_files = list(DOCS.rglob('*.md'))
print(f"📄 Processing {len(md_files)} markdown files...")

for md_file in md_files:
    chunks = doc_processor.process_markdown(md_file)
    all_documents.extend(chunks)
    print(f"  ✅ {md_file.name}: {len(chunks)} chunks")

# Process SQL files
sql_files = list(REPO.rglob('*.sql'))
print(f"🔧 Processing {len(sql_files)} SQL files...")

for sql_file in sql_files:
    chunks = doc_processor.process_sql(sql_file)
    all_documents.extend(chunks)
    print(f"  ✅ {sql_file.name}: {len(chunks)} chunks")

# Load lineage data
lineage_data = lineage_processor.load_lineage(DATA / "lineage.json")

processing_time = time.time() - start_time
print(f"\n✅ Data processing complete!")
print(f"  📊 Total documents: {len(all_documents)}")
print(f"  🧬 Lineage nodes: {len(lineage_data['nodes'])}")
print(f"  ⏱️ Processing time: {processing_time:.2f}s")
print(f"  🚀 Semantic chunking: Much faster than custom logic!")

# Test lineage queries
print(f"\n🧪 Testing lineage queries...")
test_node = "curated.sales_orders"
downstream = lineage_processor.find_downstream_impact(test_node, lineage_data)
upstream = lineage_processor.find_upstream_dependencies(test_node, lineage_data)

print(f"  📈 Downstream impact of '{test_node}': {len(downstream)} nodes")
print(f"    {downstream[:3]}{'...' if len(downstream) > 3 else ''}")
print(f"  📉 Upstream dependencies of '{test_node}': {len(upstream)} nodes") 
print(f"    {upstream[:3]}{'...' if len(upstream) > 3 else ''}")


🔄 Processing all data sources with semantic chunking...
📄 Processing 3 markdown files...
  ✅ data_quality_standards.md: 2 chunks
  ✅ sales_orders_spec.md: 3 chunks
  ✅ incident_playbook.md: 3 chunks
🔧 Processing 3 SQL files...
  ✅ sales_orders_pipeline.sql: 4 chunks
  ✅ customer_analytics_pipeline.sql: 4 chunks
  ✅ revenue_summary_pipeline.sql: 4 chunks

✅ Data processing complete!
  📊 Total documents: 20
  🧬 Lineage nodes: 13
  ⏱️ Processing time: 0.00s
  🚀 Semantic chunking: Much faster than custom logic!

🧪 Testing lineage queries...
  📈 Downstream impact of 'curated.sales_orders': 2 nodes
    ['curated.revenue_summary', 'analytics.customer_behavior']
  📉 Upstream dependencies of 'curated.sales_orders': 4 nodes
    ['raw.sales_orders', 'raw.customers', 'raw.products']...
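
The traversal above returns tables, while the target question "What dashboards went stale?" needs one more hop through the `dashboards` list. The sketch below shows one way to make that hop, assuming the same `edges` and `dashboards` keys as `lineage.json`. `downstream_tables`, `impacted_dashboards`, and `mini_lineage` are hypothetical names used for illustration.

```python
from collections import deque

def downstream_tables(start: str, edges: list) -> list:
    """Breadth-first walk over 'from' -> 'to' edges; each node is returned once."""
    children = {}
    for edge in edges:
        children.setdefault(edge["from"], []).append(edge["to"])
    seen, order, queue = {start}, [], deque([start])
    while queue:
        node = queue.popleft()
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order

def impacted_dashboards(failed_table: str, lineage: dict) -> list:
    """Dashboards that read the failed table or anything downstream of it."""
    affected = {failed_table, *downstream_tables(failed_table, lineage["edges"])}
    return [d["id"] for d in lineage.get("dashboards", []) if affected & set(d["tables"])]

# Cut-down stand-in for the lineage graph (same keys, fewer entries)
mini_lineage = {
    "edges": [
        {"from": "curated.sales_orders", "to": "curated.revenue_summary"},
        {"from": "curated.sales_orders", "to": "analytics.customer_behavior"},
        {"from": "curated.customers", "to": "analytics.customer_behavior"},
    ],
    "dashboards": [
        {"id": "bi.daily_sales", "tables": ["curated.sales_orders", "curated.revenue_summary"]},
        {"id": "bi.customer_analytics", "tables": ["analytics.customer_behavior", "curated.customers"]},
    ],
}

stale = impacted_dashboards("curated.sales_orders", mini_lineage)
print(stale)
```

Building the `children` index once avoids rescanning the full edge list at every step, which matters more as the graph grows.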


In [9]:
# Phase 1.6: Preview Processed Data

print("📋 Sample processed documents:")
print("=" * 60)

# Show sample chunks by type
markdown_chunks = [doc for doc in all_documents if doc['metadata']['type'] == 'markdown']
sql_chunks = [doc for doc in all_documents if doc['metadata']['type'] == 'sql']

print(f"\n📄 Markdown chunks ({len(markdown_chunks)} total):")
for i, chunk in enumerate(markdown_chunks[:2]):
    print(f"\n--- Chunk {i+1} ---")
    print(f"Source: {chunk['metadata']['file_name']}")
    print(f"Content: {chunk['content'][:200]}...")

print(f"\n🔧 SQL chunks ({len(sql_chunks)} total):")
for i, chunk in enumerate(sql_chunks[:2]):
    print(f"\n--- Chunk {i+1} ---")
    print(f"Source: {chunk['metadata']['file_name']}")
    print(f"Statement: {chunk['metadata'].get('statement_index', 'N/A')}")
    print(f"Content: {chunk['content'][:200]}...")

print(f"\n🧬 Lineage graph preview:")
print(f"  Tables: {len([n for n in lineage_data['nodes'] if n['type'] == 'table'])}")
print(f"  Columns: {len([n for n in lineage_data['nodes'] if n['type'] == 'column'])}")
print(f"  Pipelines: {len(lineage_data['pipelines'])}")
print(f"  Dashboards: {len(lineage_data['dashboards'])}")

# Save processed data for next phases
processed_data = {
    "documents": all_documents,
    "lineage": lineage_data,
    "stats": {
        "total_chunks": len(all_documents),
        "markdown_chunks": len(markdown_chunks),
        "sql_chunks": len(sql_chunks),
        "lineage_nodes": len(lineage_data['nodes']),
        "processing_time": processing_time
    }
}

processed_path = DATA / "processed_data.json"
with open(processed_path, 'w') as f:
    json.dump(processed_data, f, indent=2)

print(f"\n💾 Saved processed data to: {processed_path}")
print(f"\n🎉 Phase 1 Complete: Data Foundation Ready!")
print(f"   Ready for Phase 2: Core RAG System")


📋 Sample processed documents:

📄 Markdown chunks (8 total):

--- Chunk 1 ---
Source: data_quality_standards.md
Content: # Data Quality Standards and Monitoring

## Quality Dimensions

### Completeness
- No missing values in critical fields
- All expected records present
- Referential integrity maintained

### Accuracy ...

--- Chunk 2 ---
Source: data_quality_standards.md
Content: ## Monitoring Framework

### Automated Checks
- Schema validation
- Data freshness monitoring
- Anomaly detection
- Statistical quality metrics

### Alerting Thresholds
- **Critical**: >1% data qualit...

🔧 SQL chunks (12 total):

--- Chunk 1 ---
Source: sales_orders_pipeline.sql
Statement: N/A
Content: -- Sales Orders Pipeline
-- Purpose: Transform raw order data into curated sales orders
-- Owner: data-sales team
-- SLA: 2 hours freshness...

--- Chunk 2 ---
Source: sales_orders_pipeline.sql
Statement: N/A
Content: WITH cleaned_orders AS (
    SELECT 
        order_id,
        customer_id,
        product_i

# Phase 2: Core RAG System 🧠

## Objectives:
1. Set up Qdrant vector store
2. Generate embeddings for documents
3. Implement basic retrieval system
4. Test RAG queries
5. Add lineage-aware search

## Stack:
- **Vector Store**: Qdrant (local)
- **Embeddings**: OpenAI text-embedding-3-small
- **LLM**: OpenAI GPT-4o-mini
- **Retrieval**: Vector similarity + BM25 hybrid
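
The stack lists a vector + BM25 hybrid, but this part of the notebook does not show how the two result lists are merged. One common option is reciprocal rank fusion (RRF), sketched below as an assumption rather than as the method Traceback uses. `reciprocal_rank_fusion` is a hypothetical helper, the document IDs are made up, and `k=60` is a conventional default for the smoothing constant.

```python
def reciprocal_rank_fusion(rankings: list, k: int = 60) -> list:
    """Merge ranked ID lists: each list contributes 1 / (k + rank) per document."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties are broken by ID for a stable order
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))

# Made-up result lists standing in for vector search and BM25 output
vector_hits = ["a", "b", "c"]
bm25_hits = ["b", "d", "a"]

fused = reciprocal_rank_fusion([vector_hits, bm25_hits])
print(fused)
```

RRF only needs ranks, so it sidesteps the problem that cosine similarities and BM25 scores live on different scales.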


In [11]:
# Phase 2.1: Initialize Qdrant Vector Store

# Import RAG libraries
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_qdrant import Qdrant
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# Initialize Qdrant client (in-memory for demo)
qdrant_client = QdrantClient(":memory:")

# Initialize embeddings
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    openai_api_key=os.getenv("OPENAI_API_KEY")
)

# Initialize LLM
llm = ChatOpenAI(
    model="gpt-4o-mini",
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.1
)

# Create collection
collection_name = "traceback_documents"
qdrant_client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(
        size=1536,  # text-embedding-3-small dimension
        distance=Distance.COSINE
    )
)

print(f"✅ Qdrant vector store initialized")
print(f"  📊 Collection: {collection_name}")
print(f"  🧠 Embeddings: text-embedding-3-small (1536 dim)")
print(f"  🤖 LLM: gpt-4o-mini")
print(f"  💾 Storage: In-memory (for demo)")


✅ Qdrant vector store initialized
  📊 Collection: traceback_documents
  🧠 Embeddings: text-embedding-3-small (1536 dim)
  🤖 LLM: gpt-4o-mini
  💾 Storage: In-memory (for demo)
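
The collection is configured with `Distance.COSINE`, so results are ranked by the cosine of the angle between the query vector and each stored vector. The pure-Python sketch below shows that measure on toy 2-dimensional vectors. `cosine_similarity` is a hypothetical helper written for illustration and is unrelated to how Qdrant computes the score internally.

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Dot product divided by the product of the two vector lengths."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)

same_direction = cosine_similarity([1.0, 0.0], [2.0, 0.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
print(same_direction, orthogonal)
```

Because the measure ignores vector length, two chunks pointing in the same direction score as identical regardless of magnitude.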


In [12]:
# Phase 2.2: Generate Embeddings and Store in Qdrant

print("🔄 Generating embeddings and storing in Qdrant...")
start_time = time.time()

# Convert processed documents to LangChain Documents
langchain_docs = []
for i, doc in enumerate(all_documents):
    langchain_doc = Document(
        page_content=doc["content"],
        metadata={
            **doc["metadata"],
            "doc_id": i,
            "source_type": doc["metadata"]["type"]
        }
    )
    langchain_docs.append(langchain_doc)

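# Initialize Qdrant vector store
# QdrantVectorStore replaces the deprecated Qdrant wrapper in langchain_qdrant
from langchain_qdrant import QdrantVectorStore

vectorstore = QdrantVectorStore(
    client=qdrant_client,
    collection_name=collection_name,
    embedding=embeddings
)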

# Add documents to vector store
vectorstore.add_documents(langchain_docs)

embedding_time = time.time() - start_time
print(f"✅ Embeddings generated and stored!")
print(f"  📊 Documents indexed: {len(langchain_docs)}")
print(f"  ⏱️ Embedding time: {embedding_time:.2f}s")
print(f"  💾 Vector store: Qdrant in-memory")

# Test basic retrieval
print(f"\n🧪 Testing basic retrieval...")
test_query = "sales orders pipeline"
results = vectorstore.similarity_search(test_query, k=3)

print(f"Query: '{test_query}'")
for i, result in enumerate(results):
    print(f"  {i+1}. {result.metadata['file_name']} ({result.metadata['type']})")
    print(f"     {result.page_content[:100]}...")
    print()


🔄 Generating embeddings and storing in Qdrant...
✅ Embeddings generated and stored!
  📊 Documents indexed: 20
  ⏱️ Embedding time: 2.32s
  💾 Vector store: Qdrant in-memory

🧪 Testing basic retrieval...
Query: 'sales orders pipeline'
  1. sales_orders_pipeline.sql (sql)
     -- Sales Orders Pipeline
-- Purpose: Transform raw order data into curated sales orders
-- Owner: da...

  2. sales_orders_spec.md (markdown)
     # Sales Orders Domain Specification

## Purpose
The sales orders pipeline processes raw order data i...

  3. revenue_summary_pipeline.sql (sql)
     -- Revenue Summary Pipeline  
-- Purpose: Create daily revenue summaries for reporting
-- Owner: dat...



In [13]:
# Phase 2.3: Implement Basic RAG System

# Create RAG prompt template
rag_prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="""You are Traceback, an AI assistant for data pipeline incident triage.

Context: {context}

Question: {question}

Instructions:
- Provide clear, actionable answers for data pipeline incidents
- Focus on business impact, blast radius, and recommended actions
- Use the context to support your recommendations
- If you don't know something, say so clearly

Answer:"""
)

# Create RAG chain
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    chain_type_kwargs={"prompt": rag_prompt},
    return_source_documents=True
)

print("✅ Basic RAG system initialized")
print(f"  🔗 Chain type: RetrievalQA")
print(f"  📊 Retrieval: Top 5 similar documents")
print(f"  🎯 Purpose: Data pipeline incident triage")


✅ Basic RAG system initialized
  🔗 Chain type: RetrievalQA
  📊 Retrieval: Top 5 similar documents
  🎯 Purpose: Data pipeline incident triage
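
The `stuff` chain type places every retrieved document into a single prompt. The sketch below shows that idea without any library; `build_stuffed_prompt`, the shortened template, and the toy documents are hypothetical, and this is not LangChain's actual implementation.

```python
from typing import List

# Shortened stand-in for the RAG prompt template
TEMPLATE = "Context: {context}\n\nQuestion: {question}\n\nAnswer:"


def build_stuffed_prompt(docs: List[str], question: str, separator: str = "\n\n") -> str:
    """Join all retrieved documents and place them in one prompt."""
    context = separator.join(docs)
    return TEMPLATE.format(context=context, question=question)


docs = ["-- Sales Orders Pipeline", "# Sales Orders Domain Specification"]
prompt = build_stuffed_prompt(docs, "What should I do if the sales orders pipeline fails?")
print(prompt)
```

The trade-off is that the prompt grows with every retrieved document, so `k` has to stay small enough for the combined text to fit in the model's context window.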


In [14]:
# Phase 2.4: Add Lineage-Aware Search

class LineageAwareRetriever:
    """Enhanced retriever that combines vector search with lineage queries."""
    
    def __init__(self, vectorstore, lineage_data):
        self.vectorstore = vectorstore
        self.lineage_data = lineage_data
    
    def find_downstream_impact(self, node_id: str) -> List[str]:
        """Find all downstream dependencies of a node."""
        downstream = []
        visited = set()
        
        def dfs(current_node):
            if current_node in visited:
                return
            visited.add(current_node)
            
            for edge in self.lineage_data.get("edges", []):
                # Skip nodes already reached so shared descendants are listed once
                if edge["from"] == current_node and edge["to"] not in visited:
                    downstream.append(edge["to"])
                    dfs(edge["to"])
        
        dfs(node_id)
        return downstream
    
    def find_upstream_dependencies(self, node_id: str) -> List[str]:
        """Find all upstream dependencies of a node."""
        upstream = []
        visited = set()
        
        def dfs(current_node):
            if current_node in visited:
                return
            visited.add(current_node)
            
            for edge in self.lineage_data.get("edges", []):
                # Skip nodes already reached so shared ancestors are listed once
                if edge["to"] == current_node and edge["from"] not in visited:
                    upstream.append(edge["from"])
                    dfs(edge["from"])
        
        dfs(node_id)
        return upstream
    
    def search_with_lineage(self, query: str, k: int = 5) -> List[Document]:
        """Search with both vector similarity and lineage context."""
        # Regular vector search
        vector_results = self.vectorstore.similarity_search(query, k=k)
        
        # Extract table names from query, stripping punctuation such as "?" or backticks
        table_names = []
        for word in query.split():
            token = word.strip("`'\"?!,;:.()")
            if any(token.startswith(schema) for schema in ['raw.', 'curated.', 'analytics.']):
                table_names.append(token)
        
        # Add lineage context if tables found
        lineage_context = []
        for table_name in table_names:
            downstream = self.find_downstream_impact(table_name)
            upstream = self.find_upstream_dependencies(table_name)
            
            if downstream or upstream:
                context_text = f"Table {table_name}: "
                if upstream:
                    context_text += f"Depends on {', '.join(upstream[:3])}. "
                if downstream:
                    context_text += f"Impacts {', '.join(downstream[:3])}."
                
                lineage_context.append(Document(
                    page_content=context_text,
                    metadata={"type": "lineage", "table": table_name}
                ))
        
        # Combine results: lineage context goes first so the k-limit cannot truncate it
        all_results = lineage_context + vector_results
        return all_results[:k]

# Initialize lineage-aware retriever
lineage_retriever = LineageAwareRetriever(vectorstore, lineage_data)

print("✅ Lineage-aware retriever initialized")
print(f"  🔗 Combines vector search + lineage queries")
print(f"  📊 Lineage nodes: {len(lineage_data.get('nodes', []))}")
print(f"  🔗 Lineage edges: {len(lineage_data.get('edges', []))}")


✅ Lineage-aware retriever initialized
  🔗 Combines vector search + lineage queries
  📊 Lineage nodes: 13
  🔗 Lineage edges: 13
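
The traversal above rescans the full edge list at every node. For larger graphs, one option is to index the edges once and walk the graph breadth-first. This is a sketch under assumptions: the toy graph, the node `analytics.revenue_dashboard`, and the helpers `build_adjacency` and `downstream_of` are hypothetical and are not part of the notebook's lineage data.

```python
from collections import defaultdict, deque
from typing import Dict, List

# Toy lineage graph; "analytics.revenue_dashboard" is a made-up node for illustration
toy_lineage = {
    "edges": [
        {"from": "raw.sales_orders", "to": "curated.sales_orders"},
        {"from": "curated.sales_orders", "to": "curated.revenue_summary"},
        {"from": "curated.sales_orders", "to": "analytics.revenue_dashboard"},
        {"from": "curated.revenue_summary", "to": "analytics.revenue_dashboard"},
    ]
}


def build_adjacency(lineage: Dict[str, list]) -> Dict[str, List[str]]:
    """Index edges by source node so each lookup avoids a full edge scan."""
    adjacency: Dict[str, List[str]] = defaultdict(list)
    for edge in lineage.get("edges", []):
        adjacency[edge["from"]].append(edge["to"])
    return adjacency


def downstream_of(node_id: str, adjacency: Dict[str, List[str]]) -> List[str]:
    """Breadth-first walk returning each downstream node once, nearest first."""
    seen = {node_id}
    order: List[str] = []
    queue = deque([node_id])
    while queue:
        current = queue.popleft()
        for child in adjacency.get(current, []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


adjacency = build_adjacency(toy_lineage)
print(downstream_of("raw.sales_orders", adjacency))
```

Breadth-first order lists direct dependents before indirect ones, which is convenient when only the first few entries are shown in a summary.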


In [15]:
# Phase 2.5: Test Complete RAG System

# Test questions for data pipeline incidents
test_questions = [
    "What should I do if the sales orders pipeline fails?",
    "Job curated.sales_orders failed — who's impacted?",
    "What are the SLA commitments for the sales orders pipeline?",
    "Which dashboards depend on curated.sales_orders?"
]

print("🧪 Testing complete RAG system with incident questions...")
print("=" * 70)

for i, question in enumerate(test_questions, 1):
    print(f"\n📋 Question {i}: {question}")
    print("-" * 50)
    
    try:
        # Run the lineage-aware search to inspect what it retrieves
        lineage_results = lineage_retriever.search_with_lineage(question, k=5)
        
        print(f"🔍 Retrieved {len(lineage_results)} documents:")
        for j, doc in enumerate(lineage_results[:3]):  # Show top 3
            doc_type = doc.metadata.get('type', 'unknown')
            print(f"  {j+1}. [{doc_type}] {doc.page_content[:80]}...")
        
        # Note: rag_chain retrieves its own context through the plain vector
        # retriever, so the lineage-aware results above are shown for inspection
        # only and are not passed to the LLM here.
        result = rag_chain.invoke({"query": question})
        
        print(f"\n🤖 Answer:")
        print(f"{result['result']}")
        
        print(f"\n📚 Sources used ({len(result['source_documents'])}):")
        for j, doc in enumerate(result['source_documents'][:3]):  # Show top 3 sources
            print(f"  {j+1}. {doc.metadata['file_name']} ({doc.metadata['type']})")
        
        print("\n" + "="*70)
        
    except Exception as e:
        print(f"❌ Error: {e}")
        print("\n" + "="*70)

print(f"\n🎉 Phase 2 Complete: Core RAG System Ready!")
print(f"   ✅ Vector store operational")
print(f"   ✅ Embeddings generated")
print(f"   ✅ Basic RAG working")
print(f"   ✅ Lineage-aware search implemented")
print(f"   ✅ Ready for Phase 3: Agent System")


🧪 Testing complete RAG system with incident questions...

📋 Question 1: What should I do if the sales orders pipeline fails?
--------------------------------------------------
🔍 Retrieved 5 documents:
  1. [sql] -- Sales Orders Pipeline
-- Purpose: Transform raw order data into curated sales...
  2. [markdown] # Sales Orders Domain Specification

## Purpose
The sales orders pipeline proces...
  3. [markdown] ### Common Actions
- **Rollback**: Revert to last known good state
- **Hotfix**:...

🤖 Answer:
If the sales orders pipeline fails, follow these steps to triage the incident:

### 1. Assess the Severity
- **Determine the severity level** based on the impact:
  - **P0**: If the failure results in critical business impact or revenue loss (e.g., no sales orders processed), escalate immediately.
  - **P1**: If there is a high risk of SLA breach (e.g., delays in data availability), prioritize resolution.
  - **P2**: If the failure causes medium impact but does not affect critical operati

# Phase 3: Agent System 🤖

## Objectives:
1. Build LangGraph supervisor agent
2. Create specialized agents (Impact Assessor, Lineage Analyzer, Writer)
3. Implement multi-agent orchestration
4. Test complex incident triage workflows

## Agent Architecture:
- **Supervisor Agent**: Orchestrates the workflow
- **Impact Assessor Agent**: Analyzes business impact and blast radius
- **Writer Agent**: Generates structured incident briefs
- **Lineage Analyzer Agent**: Handles data lineage queries

## Tools Available:
- RAG retrieval (docs + code)
- Lineage graph queries
- Web search (Tavily, optional)


In [16]:
# Phase 3.1: Import LangGraph and Define Agent Tools

from langgraph.graph import StateGraph, END
from langchain.tools import Tool
from langchain_community.tools import TavilySearchResults
from typing import TypedDict, List, Dict, Any, Optional
import json

# Define the agent state
class AgentState(TypedDict):
    question: str
    context: List[Dict[str, Any]]
    impact_assessment: Optional[Dict[str, Any]]
    blast_radius: Optional[List[str]]
    recommended_actions: Optional[List[str]]
    incident_brief: Optional[str]
    current_step: str
    error: Optional[str]

# Define tools for agents
def rag_search_tool(query: str) -> str:
    """Search documents and code using RAG."""
    try:
        results = lineage_retriever.search_with_lineage(query, k=5)
        context = []
        for doc in results:
            context.append({
                "content": doc.page_content,
                "source": doc.metadata.get("file_name", "unknown"),
                "type": doc.metadata.get("type", "unknown")
            })
        return json.dumps(context, indent=2)
    except Exception as e:
        return f"Error in RAG search: {str(e)}"

def lineage_query_tool(table_name: str) -> str:
    """Query lineage graph for table dependencies."""
    try:
        downstream = lineage_retriever.find_downstream_impact(table_name)
        upstream = lineage_retriever.find_upstream_dependencies(table_name)
        
        result = {
            "table": table_name,
            "upstream_dependencies": upstream,
            "downstream_impact": downstream,
            "total_dependencies": len(upstream) + len(downstream)
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Error in lineage query: {str(e)}"

def web_search_tool(query: str) -> str:
    """Search the web for additional context (optional)."""
    if not os.getenv("TAVILY_API_KEY"):
        return "Web search not available (TAVILY_API_KEY not set)"
    
    try:
        search = TavilySearchResults(max_results=3)
        results = search.run(query)
        return str(results)
    except Exception as e:
        return f"Error in web search: {str(e)}"

# Create tool instances
tools = [
    Tool(
        name="rag_search",
        description="Search documents and code for incident response information",
        func=rag_search_tool
    ),
    Tool(
        name="lineage_query", 
        description="Query data lineage to find table dependencies and impact",
        func=lineage_query_tool
    ),
    Tool(
        name="web_search",
        description="Search the web for additional context about errors or issues",
        func=web_search_tool
    )
]

print("✅ Agent tools defined")
print(f"  🔍 RAG Search: Document and code retrieval")
print(f"  🧬 Lineage Query: Table dependency analysis")
print(f"  🌐 Web Search: External context (optional)")
print(f"  🤖 Ready for agent implementation")


✅ Agent tools defined
  🔍 RAG Search: Document and code retrieval
  🧬 Lineage Query: Table dependency analysis
  🌐 Web Search: External context (optional)
  🤖 Ready for agent implementation
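
Table names are picked out of the incident question by looking for the `raw.`, `curated.`, and `analytics.` schema prefixes. A regular expression is one way to do this that tolerates surrounding punctuation and backticks. The sketch below assumes table names consist of letters, digits, and underscores; `TABLE_PATTERN` and `extract_table_names` are hypothetical helpers, not part of the notebook.

```python
import re
from typing import List

# Schema prefix, a dot, then an identifier (assumed: letters, digits, underscores)
TABLE_PATTERN = re.compile(r"\b(?:raw|curated|analytics)\.[A-Za-z_][A-Za-z0-9_]*")


def extract_table_names(text: str) -> List[str]:
    """Return schema-qualified table names in order of first appearance."""
    names: List[str] = []
    for match in TABLE_PATTERN.findall(text):
        if match not in names:  # Keep each table once
            names.append(match)
    return names


print(extract_table_names("Which dashboards depend on curated.sales_orders?"))
print(extract_table_names("Job `curated.sales_orders` failed after raw.sales_orders was late."))
```

A single shared helper like this would also keep the retriever and the agents consistent about what counts as a table name.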


In [17]:
# Phase 3.2: Implement Specialized Agents

def supervisor_agent(state: AgentState) -> AgentState:
    """Supervisor agent that orchestrates the incident triage workflow."""
    question = state["question"]
    
    # Determine the type of incident and next steps
    supervisor_prompt = f"""
    You are the Supervisor Agent for Traceback incident triage system.
    
    Question: {question}
    
    Analyze this incident question and determine:
    1. What type of incident this is (pipeline failure, data quality, etc.)
    2. What information we need to gather
    3. What the next step should be
    
    Respond with a JSON object containing:
    - "incident_type": Type of incident
    - "next_step": Next agent to call ("impact_assessor", "lineage_analyzer", "writer")
    - "reasoning": Why this step is needed
    """
    
    try:
        response = llm.invoke([{"role": "user", "content": supervisor_prompt}])
        
        # Parse response (simplified - in production, use proper JSON parsing)
        if "impact_assessor" in response.content.lower():
            next_step = "impact_assessor"
        elif "lineage" in response.content.lower():
            next_step = "lineage_analyzer"
        else:
            next_step = "writer"
        
        state["current_step"] = next_step
        return state
        
    except Exception as e:
        state["error"] = f"Supervisor error: {str(e)}"
        state["current_step"] = "writer"  # Fallback
        return state

def impact_assessor_agent(state: AgentState) -> AgentState:
    """Impact Assessor agent that analyzes business impact and blast radius."""
    question = state["question"]
    
    # Use RAG search to gather context
    rag_results = rag_search_tool(question)
    
    # Extract table names for lineage analysis, stripping punctuation such as "?" or backticks
    table_names = []
    for word in question.split():
        token = word.strip("`'\"?!,;:.()")
        if any(token.startswith(schema) for schema in ['raw.', 'curated.', 'analytics.']):
            table_names.append(token)
    
    lineage_results = []
    for table_name in table_names:
        lineage_results.append(lineage_query_tool(table_name))
    
    # Generate impact assessment
    impact_prompt = f"""
    You are the Impact Assessor Agent for Traceback.
    
    Question: {question}
    
    Context from documents:
    {rag_results}
    
    Lineage analysis:
    {json.dumps(lineage_results, indent=2)}
    
    Provide a structured impact assessment:
    1. Business Impact Level (Critical/High/Medium/Low)
    2. Affected Systems/Tables
    3. Blast Radius (downstream impact)
    4. SLA Impact
    5. Estimated Recovery Time
    
    Format as JSON with these fields.
    """
    
    try:
        response = llm.invoke([{"role": "user", "content": impact_prompt}])
        
        # Parse and store impact assessment
        state["impact_assessment"] = {
            "assessment": response.content,
            "context_sources": json.loads(rag_results) if rag_results.startswith('[') else [],
            "lineage_data": lineage_results
        }
        
        # Extract blast radius
        blast_radius = []
        for result in lineage_results:
            if result.startswith('{'):
                data = json.loads(result)
                blast_radius.extend(data.get("downstream_impact", []))
        
        state["blast_radius"] = list(set(blast_radius))  # Remove duplicates
        state["current_step"] = "writer"
        
    except Exception as e:
        state["error"] = f"Impact assessor error: {str(e)}"
        state["current_step"] = "writer"
    
    return state

def lineage_analyzer_agent(state: AgentState) -> AgentState:
    """Lineage Analyzer agent that focuses on data dependencies."""
    question = state["question"]
    
    # Extract table names (stripping punctuation such as "?" or backticks) and analyze lineage
    table_names = []
    for word in question.split():
        token = word.strip("`'\"?!,;:.()")
        if any(token.startswith(schema) for schema in ['raw.', 'curated.', 'analytics.']):
            table_names.append(token)
    
    lineage_analysis = []
    for table_name in table_names:
        lineage_analysis.append(lineage_query_tool(table_name))
    
    # Generate lineage-focused analysis
    lineage_prompt = f"""
    You are the Lineage Analyzer Agent for Traceback.
    
    Question: {question}
    
    Lineage Analysis:
    {json.dumps(lineage_analysis, indent=2)}
    
    Provide detailed lineage analysis:
    1. Direct Dependencies
    2. Indirect Dependencies (2+ hops)
    3. Affected Dashboards/Reports
    4. Data Flow Impact
    5. Recovery Dependencies
    
    Format as structured analysis.
    """
    
    try:
        response = llm.invoke([{"role": "user", "content": lineage_prompt}])
        
        state["impact_assessment"] = {
            "lineage_analysis": response.content,
            "lineage_data": lineage_analysis
        }
        
        # Extract blast radius from lineage
        blast_radius = []
        for result in lineage_analysis:
            if result.startswith('{'):
                data = json.loads(result)
                blast_radius.extend(data.get("downstream_impact", []))
        
        state["blast_radius"] = list(set(blast_radius))
        state["current_step"] = "writer"
        
    except Exception as e:
        state["error"] = f"Lineage analyzer error: {str(e)}"
        state["current_step"] = "writer"
    
    return state

def writer_agent(state: AgentState) -> AgentState:
    """Writer agent that generates the final incident brief."""
    question = state["question"]
    # These keys exist with value None when no assessor ran, so fall back explicitly
    impact_assessment = state.get("impact_assessment") or {}
    blast_radius = state.get("blast_radius") or []
    
    # Gather additional context if needed
    rag_results = rag_search_tool(question)
    
    # Generate incident brief
    writer_prompt = f"""
    You are the Writer Agent for Traceback incident triage.
    
    Question: {question}
    
    Impact Assessment:
    {json.dumps(impact_assessment, indent=2)}
    
    Blast Radius:
    {blast_radius}
    
    Additional Context:
    {rag_results}
    
    Generate a comprehensive incident brief with:
    1. **Incident Summary**: Brief description
    2. **Business Impact**: Level and details
    3. **Blast Radius**: Affected systems/tables
    4. **Root Cause Analysis**: Likely causes
    5. **Recommended Actions**: Immediate steps
    6. **Recovery Plan**: Step-by-step recovery
    7. **Prevention**: Future mitigation
    
    Format as a professional incident brief.
    """
    
    try:
        response = llm.invoke([{"role": "user", "content": writer_prompt}])
        
        state["incident_brief"] = response.content
        state["current_step"] = "complete"
        
    except Exception as e:
        state["error"] = f"Writer error: {str(e)}"
        state["incident_brief"] = f"Error generating incident brief: {str(e)}"
        state["current_step"] = "complete"
    
    return state

print("✅ Specialized agents implemented")
print(f"  🎯 Supervisor Agent: Workflow orchestration")
print(f"  📊 Impact Assessor: Business impact analysis")
print(f"  🧬 Lineage Analyzer: Data dependency analysis")
print(f"  ✍️ Writer Agent: Incident brief generation")


✅ Specialized agents implemented
  🎯 Supervisor Agent: Workflow orchestration
  📊 Impact Assessor: Business impact analysis
  🧬 Lineage Analyzer: Data dependency analysis
  ✍️ Writer Agent: Incident brief generation
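
The supervisor prompt asks the model for a JSON object with a `next_step` field, and the parsing in the cell is marked as simplified. A more defensive parse could look like the sketch below. It assumes the reply may wrap the JSON in extra text or a code fence; `parse_next_step` and `VALID_STEPS` are hypothetical names, not part of the notebook.

```python
import json

VALID_STEPS = ("impact_assessor", "lineage_analyzer", "writer")


def parse_next_step(reply: str, default: str = "writer") -> str:
    """Extract "next_step" from an LLM reply, falling back to a default."""
    # Take the outermost {...} span so surrounding prose or fences are ignored
    start = reply.find("{")
    end = reply.rfind("}")
    if start == -1 or end <= start:
        return default
    try:
        payload = json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return default
    step = payload.get("next_step") if isinstance(payload, dict) else None
    # Only accept agents that actually exist in the workflow
    return step if step in VALID_STEPS else default


reply = 'Here is my decision:\n{"incident_type": "pipeline failure", "next_step": "lineage_analyzer", "reasoning": "Impact depends on downstream tables."}'
print(parse_next_step(reply))
print(parse_next_step("I could not decide."))
```

Reading the field directly avoids a false match when another agent's name only appears in the model's reasoning text.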


In [18]:
# Phase 3.3: Build LangGraph Workflow

def route_next_step(state: AgentState) -> str:
    """Return the agent the supervisor selected (stored in current_step)."""
    next_step = state.get("current_step", "writer")
    
    if next_step in ("impact_assessor", "lineage_analyzer", "writer"):
        return next_step
    return "writer"  # Fallback for unexpected values

# Create the LangGraph workflow
workflow = StateGraph(AgentState)

# Add nodes for each agent
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("impact_assessor", impact_assessor_agent)
workflow.add_node("lineage_analyzer", lineage_analyzer_agent)
workflow.add_node("writer", writer_agent)

# Define the workflow edges
# The supervisor's decision selects the next agent; both analysis agents hand off to the writer
workflow.add_conditional_edges(
    "supervisor",
    route_next_step,
    {
        "impact_assessor": "impact_assessor",
        "lineage_analyzer": "lineage_analyzer",
        "writer": "writer"
    }
)
workflow.add_edge("impact_assessor", "writer")
workflow.add_edge("lineage_analyzer", "writer")
workflow.add_edge("writer", END)

# Set entry point
workflow.set_entry_point("supervisor")

# Compile the graph
traceback_graph = workflow.compile()

print("✅ LangGraph workflow created")
print(f"  🔄 Workflow: supervisor → [impact_assessor | lineage_analyzer] → writer → END")
print(f"  🎯 Entry point: supervisor")
print(f"  🏁 Exit point: writer")
print(f"  🤖 Graph compiled and ready")


✅ LangGraph workflow created
  🔄 Workflow: supervisor → [impact_assessor | lineage_analyzer] → writer → END
  🎯 Entry point: supervisor
  🏁 Exit point: writer
  🤖 Graph compiled and ready


In [19]:
# Phase 3.4: Test Multi-Agent System

def run_traceback_incident_triage(question: str) -> Dict[str, Any]:
    """Run the complete Traceback incident triage workflow."""
    print(f"🚨 Starting Traceback incident triage...")
    print(f"📋 Question: {question}")
    print("=" * 60)
    
    # Initialize state
    initial_state = AgentState(
        question=question,
        context=[],
        impact_assessment=None,
        blast_radius=None,
        recommended_actions=None,
        incident_brief=None,
        current_step="supervisor",
        error=None
    )
    
    try:
        # Run the workflow
        result = traceback_graph.invoke(initial_state)
        
        print(f"✅ Incident triage completed!")
        print(f"📊 Final state: {result['current_step']}")
        
        if result.get("error"):
            print(f"⚠️ Error occurred: {result['error']}")
        
        return result
        
    except Exception as e:
        print(f"❌ Workflow error: {str(e)}")
        return {"error": str(e), "question": question}

# Test the multi-agent system
test_incidents = [
    "Job curated.sales_orders failed — who's impacted?",
    "What should I do if raw.sales_orders has quality issues?",
    "Which dashboards will be affected if curated.revenue_summary fails?"
]

print("🧪 Testing multi-agent incident triage system...")
print("=" * 70)

for i, incident in enumerate(test_incidents, 1):
    print(f"\n🔍 Test {i}: {incident}")
    print("-" * 50)
    
    result = run_traceback_incident_triage(incident)
    
    if result.get("incident_brief"):
        print(f"\n📋 Incident Brief:")
        print(f"{result['incident_brief']}")
        
        if result.get("blast_radius"):
            print(f"\n💥 Blast Radius:")
            for item in result["blast_radius"][:5]:  # Show top 5
                print(f"  • {item}")
    
    print("\n" + "="*70)


🧪 Testing multi-agent incident triage system...

🔍 Test 1: Job curated.sales_orders failed — who's impacted?
--------------------------------------------------
🚨 Starting Traceback incident triage...
📋 Question: Job curated.sales_orders failed — who's impacted?
✅ Incident triage completed!
📊 Final state: complete

📋 Incident Brief:
# Incident Brief: Curated Sales Orders Pipeline Failure

## 1. Incident Summary
On [insert date and time], the job responsible for curating sales orders (`curated.sales_orders`) failed, resulting in a disruption of the sales orders pipeline. This incident has impacted downstream analytics and reporting processes, leading to potential delays in business operations.

## 2. Business Impact
- **Impact Level**: High
- **Details**: The failure of the sales orders pipeline has resulted in unmet freshness SLAs of 2 hours. This impacts the availability of critical sales data for reporting and analytics, affecting decision-making processes across the organization.

##

In [20]:
# Phase 3.5: Bundle Agent System and Summary

# Collect agent system components in one dict (in memory only; nothing is written to disk)
agent_system = {
    "workflow": traceback_graph,
    "tools": tools,
    "agents": {
        "supervisor": supervisor_agent,
        "impact_assessor": impact_assessor_agent,
        "lineage_analyzer": lineage_analyzer_agent,
        "writer": writer_agent
    }
}

print("✅ Agent system components ready")
print(f"  🤖 Supervisor Agent: Workflow orchestration")
print(f"  📊 Impact Assessor: Business impact analysis")
print(f"  🧬 Lineage Analyzer: Data dependency analysis")
print(f"  ✍️ Writer Agent: Incident brief generation")
print(f"  🔄 LangGraph Workflow: Multi-agent orchestration")

print(f"\n🎉 Phase 3 Complete: Agent System Ready!")
print(f"   ✅ Multi-agent architecture implemented")
print(f"   ✅ LangGraph workflow operational")
print(f"   ✅ Specialized agents for incident triage")
print(f"   ✅ Tool integration (RAG + Lineage + Web)")
print(f"   ✅ End-to-end incident triage workflow")
print(f"   ✅ Ready for Phase 4: API & Interface")

print(f"\n🚀 Complete End-to-End Agentic RAG System:")
print(f"   📊 Phase 1: Data Foundation ✅")
print(f"   🧠 Phase 2: Core RAG System ✅")
print(f"   🤖 Phase 3: Agent System ✅")
print(f"   🌐 Phase 4: API & Interface (Next)")


✅ Agent system components ready
  🤖 Supervisor Agent: Workflow orchestration
  📊 Impact Assessor: Business impact analysis
  🧬 Lineage Analyzer: Data dependency analysis
  ✍️ Writer Agent: Incident brief generation
  🔄 LangGraph Workflow: Multi-agent orchestration

🎉 Phase 3 Complete: Agent System Ready!
   ✅ Multi-agent architecture implemented
   ✅ LangGraph workflow operational
   ✅ Specialized agents for incident triage
   ✅ Tool integration (RAG + Lineage + Web)
   ✅ End-to-end incident triage workflow
   ✅ Ready for Phase 4: API & Interface

🚀 Complete End-to-End Agentic RAG System:
   📊 Phase 1: Data Foundation ✅
   🧠 Phase 2: Core RAG System ✅
   🤖 Phase 3: Agent System ✅
   🌐 Phase 4: API & Interface (Next)
