# Database Processing with LangChain

## Overview
This notebook demonstrates comprehensive approaches to extracting and processing data from SQL databases for RAG applications. Databases are one of the most common sources of structured business data, and understanding how to effectively convert database content into documents is crucial for building enterprise-ready AI systems.

## What You'll Learn
1. **Database Setup** - Creating sample SQLite databases with realistic business data
2. **SQLDatabaseLoader** - Using LangChain's built-in database utilities
3. **Custom SQL Processing** - Building intelligent database-to-document converters
4. **Relationship Extraction** - Preserving database relationships in document format
5. **Query-Based Processing** - Converting SQL query results into contextual documents

## Prerequisites
```bash
uv add langchain-community sqlite3
```

## Common Database Use Cases in RAG
- Customer relationship management (CRM) data
- Product catalogs and inventory systems
- Financial records and transaction histories
- Employee and organizational data
- Project management and task tracking
- Order processing and e-commerce data
- Configuration and settings management

### SQL Databases

In [1]:
"""
Database Processing Setup for RAG Applications

This module sets up the environment for processing SQL database data.
We'll create a realistic SQLite database that represents common business scenarios
with related tables and relationships.

SQLite is perfect for demonstrations because:
- No server setup required
- File-based storage
- Supports full SQL functionality
- Easy to distribute and reproduce

Author: Data Science Team
Date: 2024
"""

# Import necessary libraries for database operations and file handling
import sqlite3  # SQLite database interface for Python
import os       # Operating system interface for directory operations

# Create directory structure for storing our database files
# exist_ok=True prevents error if directory already exists
os.makedirs("data/databases", exist_ok=True)
print("✅ Directory structure created successfully")
print("🗄️ Ready to create and process SQL databases")

✅ Directory structure created successfully
🗄️ Ready to create and process SQL databases


In [2]:
# Creating Sample SQLite Database
# ===============================
"""
This section creates a comprehensive sample database that represents
a typical business scenario with multiple related tables.

The database structure includes:
- Employees table: Staff information with roles and departments
- Projects table: Project data with relationships to employees
- Realistic data relationships that demonstrate foreign key concepts
"""

# Establish connection to SQLite database
# If the file doesn't exist, SQLite will create it automatically
database_path = 'data/databases/company.db'
conn = sqlite3.connect(database_path)
cursor = conn.cursor()

print(f"✅ Database connection established: {database_path}")
print("🔧 Ready to create tables and populate with sample data")

✅ Database connection established: data/databases/company.db
🔧 Ready to create tables and populate with sample data


In [3]:
# Create Employees Table
# ======================
"""
The employees table stores staff information and represents a common
business entity found in most organizational databases.

Table Structure:
- id: Primary key (INTEGER) - Unique identifier for each employee
- name: Employee full name (TEXT)
- role: Job title/position (TEXT)
- department: Organizational department (TEXT)
- salary: Annual salary (REAL) - Numeric data for calculations

Using IF NOT EXISTS prevents errors if the table already exists.
"""

cursor.execute('''CREATE TABLE IF NOT EXISTS employees
                 (id INTEGER PRIMARY KEY, 
                  name TEXT NOT NULL,
                  role TEXT NOT NULL,
                  department TEXT NOT NULL,
                  salary REAL)''')

print("✅ Employees table created successfully")
print("📊 Table structure: id, name, role, department, salary")

✅ Employees table created successfully
📊 Table structure: id, name, role, department, salary


In [4]:
# Create Projects Table
# =====================
"""
The projects table stores project information and demonstrates foreign key relationships.

Table Structure:
- id: Primary key (INTEGER) - Unique identifier for each project
- name: Project name (TEXT)
- status: Current project status (TEXT) - e.g., 'Active', 'Completed', 'Planning'
- budget: Project budget allocation (REAL) - Numeric data for financial calculations
- lead_id: Foreign key (INTEGER) - References employees.id for project leader

This table demonstrates a one-to-many relationship: one employee can lead multiple projects.
"""

cursor.execute('''CREATE TABLE IF NOT EXISTS projects
                 (id INTEGER PRIMARY KEY, 
                  name TEXT NOT NULL,
                  status TEXT NOT NULL,
                  budget REAL,
                  lead_id INTEGER,
                  FOREIGN KEY (lead_id) REFERENCES employees (id))''')

print("✅ Projects table created successfully")
print("📊 Table structure: id, name, status, budget, lead_id")
print("🔗 Foreign key relationship: projects.lead_id → employees.id")

✅ Projects table created successfully
📊 Table structure: id, name, status, budget, lead_id
🔗 Foreign key relationship: projects.lead_id → employees.id


In [5]:
# Define Realistic Sample Data
# ============================
"""
This section defines sample data that represents a realistic business scenario.
The data demonstrates various aspects of organizational structure and project management.

Employee Data Features:
- Diverse roles across different departments
- Realistic salary ranges for different positions
- Mix of technical and non-technical roles

Project Data Features:
- Various project statuses (Active, Completed, Planning)
- Different budget allocations
- Clear leadership relationships through foreign keys
"""

# Sample employee data with realistic business information
# Format: (id, name, role, department, salary)
employees = [
    (1, 'John Doe', 'Senior Developer', 'Engineering', 95000),      # Senior technical role
    (2, 'Jane Smith', 'Data Scientist', 'Analytics', 105000),       # Specialized analytics role
    (3, 'Mike Johnson', 'Product Manager', 'Product', 110000),      # Management role
    (4, 'Sarah Williams', 'DevOps Engineer', 'Engineering', 98000)  # Infrastructure role
]

# Sample project data with relationships to employees
# Format: (id, name, status, budget, lead_id)
projects = [
    (1, 'RAG Implementation', 'Active', 150000, 1),      # John leads RAG project
    (2, 'Data Pipeline', 'Completed', 80000, 2),         # Jane completed data pipeline
    (3, 'Customer Portal', 'Planning', 200000, 3),       # Mike planning customer portal
    (4, 'ML Platform', 'Active', 250000, 2)              # Jane leads ML platform (shows one-to-many)
]

print("✅ Sample data defined successfully")
print("📊 Employee records: 4 employees across 3 departments")
print("📊 Project records: 4 projects with various statuses and budgets")
print("🔗 Relationships: Each project has an assigned lead from the employees table")

✅ Sample data defined successfully
📊 Employee records: 4 employees across 3 departments
📊 Project records: 4 projects with various statuses and budgets
🔗 Relationships: Each project has an assigned lead from the employees table


In [6]:
# Insert Sample Data into Tables
# ===============================
"""
This section populates the tables with our sample data.

Using executemany() for efficient batch insertion.
INSERT OR REPLACE ensures idempotent operations - safe to run multiple times.
"""

# Insert employee data - batch operation for efficiency
cursor.executemany('INSERT OR REPLACE INTO employees VALUES (?,?,?,?,?)', employees)
print(f"✅ Inserted {len(employees)} employee records")

# Insert project data - demonstrates foreign key relationships
cursor.executemany('INSERT OR REPLACE INTO projects VALUES (?,?,?,?,?)', projects)
print(f"✅ Inserted {len(projects)} project records")
print("🔗 Foreign key relationships established between employees and projects")

✅ Inserted 4 employee records
✅ Inserted 4 project records
🔗 Foreign key relationships established between employees and projects


In [7]:
# Verify database content by querying employees table
# This allows us to see the data that was inserted
cursor.execute("SELECT * FROM employees")
print("📊 Employee table contents:")
for row in cursor.fetchall():
    print(f"  {row}")

📊 Employee table contents:
  (1, 'John Doe', 'Senior Developer', 'Engineering', 95000.0)
  (2, 'Jane Smith', 'Data Scientist', 'Analytics', 105000.0)
  (3, 'Mike Johnson', 'Product Manager', 'Product', 110000.0)
  (4, 'Sarah Williams', 'DevOps Engineer', 'Engineering', 98000.0)


In [8]:
# Commit changes and close connection
# ====================================
# commit() saves all changes made during this session
conn.commit()

# close() properly closes the database connection to free up resources
conn.close()

print("✅ Database setup completed successfully")
print("💾 All changes committed and connection closed")
print("🗄️ Database ready for processing with LangChain loaders")

✅ Database setup completed successfully
💾 All changes committed and connection closed
🗄️ Database ready for processing with LangChain loaders


## Database Content Extraction

In [9]:
# Import LangChain Database Utilities
# ====================================
"""
This section imports the necessary LangChain utilities for database processing.

SQLDatabase: Provides a high-level interface to SQL databases
- Connection management and query execution
- Schema inspection and table information
- Safety features for production use

SQLDatabaseLoader: Converts database content to Document objects
- Table-based document creation
- Query-based document extraction
- Metadata preservation for filtering and search
"""

from langchain_community.utilities import SQLDatabase  # Database utility for connection and queries
from langchain_community.document_loaders import SQLDatabaseLoader  # Document loader for SQL databases

In [10]:
# Method 1: Using SQLDatabase Utility for Database Inspection
# ===========================================================
"""
SQLDatabase provides a high-level interface for database operations.
It's useful for:
- Schema inspection and understanding database structure
- Safe query execution with built-in protections
- Integration with LangChain's SQL tools and agents

Pros: Built-in safety features, schema inspection, LangChain integration
Cons: Less control over document creation, standardized output format
"""

print("1️⃣ SQLDatabase Utility - Database Schema Analysis")
print("-" * 50)

try:
    # Connect to the SQLite database using LangChain's SQLDatabase wrapper
    # The from_uri method creates a connection from a database URI
    db = SQLDatabase.from_uri("sqlite:///data/databases/company.db")
    
    # Get list of available tables in the database
    # This is useful for understanding the database structure
    tables = db.get_usable_table_names()
    print(f"📊 Available tables: {tables}")
    
    # Get detailed table information including schema and relationships
    # This provides DDL (Data Definition Language) statements for tables
    print(f"\n🔍 Database Schema Information:")
    print(f"{db.get_table_info()}")
    
    # Test a sample query to verify database connectivity and data
    print(f"\n📋 Sample Query Results:")
    sample_query = "SELECT name, role, department FROM employees LIMIT 3"
    result = db.run(sample_query)
    print(f"Query: {sample_query}")
    print(f"Results: {result}")
    
    print(f"\n✅ Database connection and inspection successful")
    print(f"💡 Ready for document creation and processing")

except Exception as e:
    print(f"❌ Error connecting to database: {e}")
    print("💡 Ensure the database file exists and was created successfully")

1️⃣ SQLDatabase Utility - Database Schema Analysis
--------------------------------------------------
📊 Available tables: ['employees', 'projects']

🔍 Database Schema Information:

CREATE TABLE employees (
	id INTEGER, 
	name TEXT, 
	role TEXT, 
	department TEXT, 
	salary REAL, 
	PRIMARY KEY (id)
)

/*
3 rows from employees table:
id	name	role	department	salary
1	John Doe	Senior Developer	Engineering	95000.0
2	Jane Smith	Data Scientist	Analytics	105000.0
3	Mike Johnson	Product Manager	Product	110000.0
*/


CREATE TABLE projects (
	id INTEGER, 
	name TEXT, 
	status TEXT, 
	budget REAL, 
	lead_id INTEGER, 
	PRIMARY KEY (id)
)

/*
3 rows from projects table:
id	name	status	budget	lead_id
1	RAG Implementation	Active	150000.0	1
2	Data Pipeline	Completed	80000.0	2
3	Customer Portal	Planning	200000.0	3
*/

📋 Sample Query Results:
Query: SELECT name, role, department FROM employees LIMIT 3
Results: [('John Doe', 'Senior Developer', 'Engineering'), ('Jane Smith', 'Data Scientist', 'Analytics'),

In [11]:
# Method 2: Custom SQL Processing for Enhanced Document Creation
# =============================================================
"""
This section demonstrates custom SQL processing that provides more control over
how database content is converted into documents for RAG applications.

Benefits of custom processing:
- Table-aware document creation with schema information
- Relationship extraction and preservation
- Rich metadata creation for filtering and search
- Flexible document structures based on data types
"""

from typing import List
from langchain_core.documents import Document

print("\n2️⃣ Custom SQL Processing - Intelligent Database-to-Document Conversion")
print("-" * 70)

def sql_to_documents(db_path: str) -> List[Document]:
    """
    Convert SQL Database to documents with preserved context and relationships.
    
    This function creates structured documents from database content with
    enhanced formatting and metadata for better RAG performance.
    
    Args:
        db_path (str): Path to the SQLite database file
        
    Returns:
        List[Document]: List of Document objects with structured content
        
    Features:
        - Table-aware document creation
        - Schema information preservation
        - Relationship extraction between tables
        - Rich metadata for filtering and search
        - Sample data inclusion for context
    
    Example:
        docs = sql_to_documents("company.db")
        print(f"Created {len(docs)} documents from database")
    """
    # Establish connection to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    documents = []
    
    # Strategy 1: Create comprehensive table overview documents
    # Get all table names from the database
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    
    for table in tables:
        table_name = table[0]
        
        # Get detailed table schema information
        cursor.execute(f"PRAGMA table_info({table_name});")
        columns = cursor.fetchall()
        column_info = []
        for col in columns:
            col_name, col_type, not_null, default_val, primary_key = col[1], col[2], col[3], col[4], col[5]
            pk_indicator = " (PRIMARY KEY)" if primary_key else ""
            nn_indicator = " NOT NULL" if not_null else ""
            column_info.append(f"{col_name} {col_type}{pk_indicator}{nn_indicator}")
        
        # Get all table data for content and analysis
        cursor.execute(f"SELECT * FROM {table_name}")
        rows = cursor.fetchall()
        column_names = [col[1] for col in columns]
        
        # Create comprehensive table documentation
        table_content = f"""Database Table: {table_name}
        
Schema Information:
{chr(10).join([f"• {info}" for info in column_info])}

Total Records: {len(rows)}
Columns: {len(column_names)}

Sample Data (first 5 records):"""
        
        # Add formatted sample records for better readability
        for i, row in enumerate(rows[:5], 1):
            table_content += f"\n\nRecord {i}:"
            record = dict(zip(column_names, row))
            for key, value in record.items():
                table_content += f"\n  • {key}: {value}"
        
        # Create document with rich metadata
        doc = Document(
            page_content=table_content,
            metadata={
                'source': db_path,
                'table_name': table_name,
                'num_records': len(rows),
                'num_columns': len(column_names),
                'column_names': column_names,
                'data_type': 'sql_table_overview',
                'content_type': 'structured_database'
            }
        )
        documents.append(doc)
    
    # Strategy 2: Create relationship documents using SQL joins
    # This demonstrates how to extract and preserve database relationships
    print(f"🔗 Extracting database relationships...")
    
    try:
        # Join employees and projects to show relationships
        cursor.execute("""
            SELECT 
                e.name as employee_name, 
                e.role as employee_role, 
                e.department as employee_dept,
                e.salary as employee_salary,
                p.name as project_name, 
                p.status as project_status,
                p.budget as project_budget
            FROM employees e
            JOIN projects p ON e.id = p.lead_id
        """)
        
        relationships = cursor.fetchall()
        
        if relationships:
            # Create a comprehensive relationship document
            rel_content = """Employee-Project Relationships

This document contains information about employees and the projects they lead,
demonstrating organizational structure and project assignments.

Project Leadership Details:
"""
            
            for rel in relationships:
                employee_name, employee_role, employee_dept, employee_salary, project_name, project_status, project_budget = rel
                rel_content += f"""
• {employee_name} ({employee_role})
  Department: {employee_dept}
  Salary: ${employee_salary:,}
  Leads Project: {project_name}
  Project Status: {project_status}
  Project Budget: ${project_budget:,}
"""
            
            # Add summary statistics
            total_budget = sum([rel[6] for rel in relationships])  # project_budget is index 6
            active_projects = len([rel for rel in relationships if rel[5] == 'Active'])  # project_status is index 5
            
            rel_content += f"""

Summary Statistics:
• Total projects with assigned leads: {len(relationships)}
• Active projects: {active_projects}
• Total combined project budget: ${total_budget:,}
• Departments involved: {len(set([rel[2] for rel in relationships]))}
"""
            
            rel_doc = Document(
                page_content=rel_content,
                metadata={
                    'source': db_path,
                    'data_type': 'sql_relationships',
                    'query_type': 'employee_project_join',
                    'relationship_count': len(relationships),
                    'total_budget': total_budget,
                    'active_projects': active_projects,
                    'content_type': 'organizational_relationships'
                }
            )
            documents.append(rel_doc)
    
    except Exception as e:
        print(f"⚠️  Warning: Could not extract relationships - {e}")
    
    # Clean up database connection
    conn.close()
    
    print(f"✅ Custom SQL processing completed")
    print(f"📊 Created {len(documents)} documents from database")
    
    return documents

# Test the custom SQL processing function
try:
    custom_sql_docs = sql_to_documents("data/databases/company.db")
    
    # Analyze the created documents
    print(f"\n📋 Document Analysis:")
    doc_types = {}
    for doc in custom_sql_docs:
        doc_type = doc.metadata.get('data_type', 'unknown')
        if doc_type not in doc_types:
            doc_types[doc_type] = 0
        doc_types[doc_type] += 1
    
    for doc_type, count in doc_types.items():
        print(f"  • {doc_type}: {count} documents")
    
    # Show example document
    if custom_sql_docs:
        print(f"\n📄 Example document preview:")
        print(f"Content (first 300 chars):\n{custom_sql_docs[0].page_content[:300]}...")
        print(f"Metadata keys: {list(custom_sql_docs[0].metadata.keys())}")

except Exception as e:
    print(f"❌ Error in custom SQL processing: {e}")
    print("💡 Ensure the database was created successfully")


2️⃣ Custom SQL Processing - Intelligent Database-to-Document Conversion
----------------------------------------------------------------------
🔗 Extracting database relationships...
✅ Custom SQL processing completed
📊 Created 3 documents from database

📋 Document Analysis:
  • sql_table_overview: 2 documents
  • sql_relationships: 1 documents

📄 Example document preview:
Content (first 300 chars):
Database Table: employees

Schema Information:
• id INTEGER (PRIMARY KEY)
• name TEXT
• role TEXT
• department TEXT
• salary REAL

Total Records: 4
Columns: 5

Sample Data (first 5 records):

Record 1:
  • id: 1
  • name: John Doe
  • role: Senior Developer
  • department: Engineering
  • salary: 95...
Metadata keys: ['source', 'table_name', 'num_records', 'num_columns', 'column_names', 'data_type', 'content_type']


In [12]:
# Test the custom SQL processing function directly
# This will return the list of documents for inspection
sql_to_documents("data/databases/company.db")

🔗 Extracting database relationships...
✅ Custom SQL processing completed
📊 Created 3 documents from database


[Document(metadata={'source': 'data/databases/company.db', 'table_name': 'employees', 'num_records': 4, 'num_columns': 5, 'column_names': ['id', 'name', 'role', 'department', 'salary'], 'data_type': 'sql_table_overview', 'content_type': 'structured_database'}, page_content='Database Table: employees\n\nSchema Information:\n• id INTEGER (PRIMARY KEY)\n• name TEXT\n• role TEXT\n• department TEXT\n• salary REAL\n\nTotal Records: 4\nColumns: 5\n\nSample Data (first 5 records):\n\nRecord 1:\n  • id: 1\n  • name: John Doe\n  • role: Senior Developer\n  • department: Engineering\n  • salary: 95000.0\n\nRecord 2:\n  • id: 2\n  • name: Jane Smith\n  • role: Data Scientist\n  • department: Analytics\n  • salary: 105000.0\n\nRecord 3:\n  • id: 3\n  • name: Mike Johnson\n  • role: Product Manager\n  • department: Product\n  • salary: 110000.0\n\nRecord 4:\n  • id: 4\n  • name: Sarah Williams\n  • role: DevOps Engineer\n  • department: Engineering\n  • salary: 98000.0'),
 Document(metadata={'source

In [13]:
# Method 3: Using SQLDatabaseLoader for Direct Document Creation
# =============================================================
"""
SQLDatabaseLoader provides a direct way to convert database tables into documents.
It's designed to work seamlessly with LangChain's document processing pipeline.

Benefits:
- Automatic table-to-document conversion
- Built-in metadata handling
- Integration with LangChain document chains
- Simple API for quick implementation
"""

print("3️⃣ SQLDatabaseLoader - Direct Database-to-Document Conversion")
print("-" * 60)

try:
    # Method 3a: Create documents from specific SQL query
    loader = SQLDatabaseLoader(
        query="SELECT name, role, department, salary FROM employees",
        db=db  # Use the SQLDatabase instance created earlier
    )
    
    query_docs = loader.load()
    
    print(f"✅ SQLDatabaseLoader query processing completed")
    print(f"📊 Created {len(query_docs)} documents from query")
    
    if query_docs:
        print(f"\n📄 Query-based document example:")
        print(f"Content (first 200 chars):\n{query_docs[0].page_content[:200]}...")
        print(f"Metadata keys: {list(query_docs[0].metadata.keys())}")
    
    # Method 3b: Process join query for relationship extraction
    relationship_loader = SQLDatabaseLoader(
        query="""
        SELECT 
            e.name || ' (' || e.role || ')' as employee_info,
            p.name as project_name,
            p.status as project_status,
            p.budget as project_budget
        FROM employees e
        JOIN projects p ON e.id = p.lead_id
        ORDER BY e.name
        """,
        db=db
    )
    
    relationship_docs = relationship_loader.load()
    
    print(f"\n🔗 Relationship query processing completed")
    print(f"📊 Created {len(relationship_docs)} relationship documents")
    
    if relationship_docs:
        print(f"\n📄 Relationship document example:")
        print(f"Content:\n{relationship_docs[0].page_content}")

except Exception as e:
    print(f"❌ Error with SQLDatabaseLoader: {e}")
    print("💡 Ensure the database connection is established successfully")

3️⃣ SQLDatabaseLoader - Direct Database-to-Document Conversion
------------------------------------------------------------
✅ SQLDatabaseLoader query processing completed
📊 Created 4 documents from query

📄 Query-based document example:
Content (first 200 chars):
name: John Doe
role: Senior Developer
department: Engineering
salary: 95000.0...
Metadata keys: []

🔗 Relationship query processing completed
📊 Created 4 relationship documents

📄 Relationship document example:
Content:
employee_info: Jane Smith (Data Scientist)
project_name: Data Pipeline
project_status: Completed
project_budget: 80000.0


In [14]:
# Database Processing Methods Comparison and Analysis
# ====================================================
"""
This section compares different database processing approaches to help you choose
the best strategy for your specific database structure and use case.
"""

print("📊 Database Processing Methods Comparison")
print("=" * 45)

# Compare results if all methods have been executed
try:
    if 'custom_sql_docs' in locals() and 'query_docs' in locals() and 'relationship_docs' in locals():
        print("\n1️⃣ Custom SQL Processing:")
        print(f"  • Documents created: {len(custom_sql_docs)}")
        print(f"  • Content approach: Table-aware with schema information")
        print(f"  • Best for: Complex databases with multiple relationships")
        print(f"  • Metadata richness: {len(custom_sql_docs[0].metadata) if custom_sql_docs else 0} fields")
        
        print("\n2️⃣ SQLDatabaseLoader Query-based:")
        print(f"  • Documents created: {len(query_docs)}")
        print(f"  • Content approach: Direct query result conversion")
        print(f"  • Best for: Specific data extraction with known requirements")
        print(f"  • Metadata richness: {len(query_docs[0].metadata) if query_docs else 0} fields")
        
        print("\n3️⃣ SQLDatabaseLoader Relationship Extraction:")
        print(f"  • Documents created: {len(relationship_docs)}")
        print(f"  • Content approach: JOIN-based relationship preservation")
        print(f"  • Best for: Maintaining data relationships and context")
        print(f"  • Metadata richness: {len(relationship_docs[0].metadata) if relationship_docs else 0} fields")
        
        # Performance analysis
        print(f"\n📈 Performance Analysis:")
        custom_memory = sum(len(doc.page_content) for doc in custom_sql_docs) if custom_sql_docs else 0
        query_memory = sum(len(doc.page_content) for doc in query_docs) if query_docs else 0
        rel_memory = sum(len(doc.page_content) for doc in relationship_docs) if relationship_docs else 0
        
        print(f"  • Custom processing content size: {custom_memory} characters")
        print(f"  • Query-based content size: {query_memory} characters")
        print(f"  • Relationship extraction size: {rel_memory} characters")
        
    else:
        print("⚠️  Run all processing methods first to see comparison")
        
except Exception as e:
    print(f"❌ Error in comparison analysis: {e}")

print(f"\n💡 Method Selection Guidelines:")
print(f"  🗄️  Custom SQL Processing:")
print(f"     - Complex databases with multiple tables and relationships")
print(f"     - Need for comprehensive schema information")
print(f"     - Production RAG systems requiring rich context")

print(f"  🔍 SQLDatabaseLoader Query-based:")
print(f"     - Specific data extraction requirements")
print(f"     - Known database schema and query patterns")
print(f"     - Simple integration with LangChain pipelines")

print(f"  🔗 Relationship-aware Processing:")
print(f"     - Preserving foreign key relationships")
print(f"     - Cross-table context requirements")
print(f"     - Business intelligence and reporting scenarios")

📊 Database Processing Methods Comparison

1️⃣ Custom SQL Processing:
  • Documents created: 3
  • Content approach: Table-aware with schema information
  • Best for: Complex databases with multiple relationships
  • Metadata richness: 7 fields

2️⃣ SQLDatabaseLoader Query-based:
  • Documents created: 4
  • Content approach: Direct query result conversion
  • Best for: Specific data extraction with known requirements
  • Metadata richness: 0 fields

3️⃣ SQLDatabaseLoader Relationship Extraction:
  • Documents created: 4
  • Content approach: JOIN-based relationship preservation
  • Best for: Maintaining data relationships and context
  • Metadata richness: 0 fields

📈 Performance Analysis:
  • Custom processing content size: 2266 characters
  • Query-based content size: 312 characters
  • Relationship extraction size: 484 characters

💡 Method Selection Guidelines:
  🗄️  Custom SQL Processing:
     - Complex databases with multiple tables and relationships
     - Need for comprehensive 

In [15]:
# Advanced Database Processing Techniques
# ========================================
"""
This section demonstrates advanced techniques for handling complex database scenarios
commonly encountered in production RAG systems.
"""

import sys
import time
from datetime import datetime

def create_contextual_sql_processor(db_path: str, table_context_rules: dict = None):
    """
    Factory function to create production-ready SQL processors with configurable rules.
    
    Args:
        db_path (str): Path to database file
        table_context_rules (dict): Rules for table-specific processing
        
    Returns:
        function: Configured SQL processor function
    """
    def process_database_with_context(strategy: str = 'comprehensive') -> tuple[List[Document], dict]:
        """
        Process database with advanced context preservation strategies.
        
        Args:
            strategy (str): Processing strategy - 'comprehensive', 'focused', 'relationships'
            
        Returns:
            tuple: (documents, processing_metrics)
        """
        start_time = time.time()
        processing_metrics = {
            'start_time': datetime.now().isoformat(),
            'database_path': db_path,
            'strategy': strategy,
            'success': False,
            'error_message': None,
            'documents_created': 0,
            'tables_processed': 0,
            'processing_time_seconds': 0
        }
        
        try:
            conn = sqlite3.connect(db_path)
            cursor = conn.cursor()
            documents = []
            
            # Get table information for processing
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = [row[0] for row in cursor.fetchall()]
            processing_metrics['tables_processed'] = len(tables)
            
            if strategy == 'comprehensive':
                # Create comprehensive documents with full context
                for table_name in tables:
                    # Get column information
                    cursor.execute(f"PRAGMA table_info({table_name});")
                    columns = cursor.fetchall()
                    column_names = [col[1] for col in columns]
                    
                    # Get all data
                    cursor.execute(f"SELECT * FROM {table_name}")
                    rows = cursor.fetchall()
                    
                    # Create rich content with business context
                    content = f"""Database Table Analysis: {table_name.upper()}

Table Overview:
• Table Name: {table_name}
• Total Records: {len(rows)}
• Columns: {', '.join(column_names)}

Business Context:
This table contains {len(rows)} records representing """
                    
                    # Add table-specific business context
                    if table_name == 'employees':
                        content += "employee information including personal details, roles, and compensation data."
                        content += "\n\nKey Insights:"
                        if rows:
                            departments = set([row[3] for row in rows])  # Assuming department is 4th column
                            avg_salary = sum([row[4] for row in rows]) / len(rows)  # Assuming salary is 5th column
                            content += f"\n• Departments: {', '.join(departments)}"
                            content += f"\n• Average Salary: ${avg_salary:,.2f}"
                    
                    elif table_name == 'projects':
                        content += "project information including status, budgets, and leadership assignments."
                        content += "\n\nKey Insights:"
                        if rows:
                            statuses = set([row[2] for row in rows])  # Assuming status is 3rd column
                            total_budget = sum([row[3] for row in rows])  # Assuming budget is 4th column
                            content += f"\n• Project Statuses: {', '.join(statuses)}"
                            content += f"\n• Total Budget: ${total_budget:,}"
                    
                    # Add sample records for context
                    content += f"\n\nSample Records:"
                    for i, row in enumerate(rows[:3], 1):
                        record_dict = dict(zip(column_names, row))
                        content += f"\n\nRecord {i}:"
                        for key, value in record_dict.items():
                            content += f"\n  {key}: {value}"
                    
                    doc = Document(
                        page_content=content,
                        metadata={
                            'source': db_path,
                            'table_name': table_name,
                            'strategy': 'comprehensive',
                            'record_count': len(rows),
                            'column_count': len(column_names),
                            'data_type': 'sql_comprehensive'
                        }
                    )
                    documents.append(doc)
            
            elif strategy == 'relationships':
                # Focus on cross-table relationships and business logic
                try:
                    # Create relationship-focused documents
                    cursor.execute("""
                        SELECT 
                            e.name as employee_name,
                            e.role as employee_role,
                            e.department,
                            e.salary,
                            COUNT(p.id) as project_count,
                            GROUP_CONCAT(p.name || ' (' || p.status || ')') as projects,
                            SUM(p.budget) as total_budget_managed
                        FROM employees e
                        LEFT JOIN projects p ON e.id = p.lead_id
                        GROUP BY e.id, e.name, e.role, e.department, e.salary
                    """)
                    
                    employee_relationships = cursor.fetchall()
                    
                    for emp_rel in employee_relationships:
                        name, role, dept, salary, proj_count, projects, total_budget = emp_rel
                        projects_managed = projects if projects else "No projects assigned"
                        
                        content = f"""Employee Leadership Profile: {name}

Professional Details:
• Name: {name}
• Role: {role}
• Department: {dept}
• Annual Salary: ${salary:,}

Project Management:
• Projects Led: {proj_count}
• Total Budget Managed: ${total_budget or 0:,}
• Current Projects: {projects_managed}

Leadership Assessment:
This employee {"demonstrates significant project leadership" if proj_count > 0 else "is not currently leading any projects"}.
{"Budget management responsibility indicates senior role." if total_budget and total_budget > 100000 else ""}
"""
                        
                        doc = Document(
                            page_content=content,
                            metadata={
                                'source': db_path,
                                'employee_name': name,
                                'employee_role': role,
                                'project_count': proj_count,
                                'total_budget': total_budget or 0,
                                'strategy': 'relationships',
                                'data_type': 'sql_employee_profile'
                            }
                        )
                        documents.append(doc)
                        
                except Exception as e:
                    print(f"⚠️  Warning: Relationship processing failed - {e}")
            
            # Update metrics
            processing_metrics['success'] = True
            processing_metrics['documents_created'] = len(documents)
            processing_metrics['processing_time_seconds'] = time.time() - start_time
            
            conn.close()
            return documents, processing_metrics
            
        except Exception as e:
            processing_metrics['error_message'] = str(e)
            processing_metrics['processing_time_seconds'] = time.time() - start_time
            return [], processing_metrics
    
    return process_database_with_context

print("🔬 Advanced Database Processing Techniques")
print("=" * 42)

# Test advanced processing strategies
strategies = ['comprehensive', 'relationships']
advanced_results = {}

try:
    # Create advanced processor
    advanced_processor = create_contextual_sql_processor("data/databases/company.db")
    
    for strategy in strategies:
        docs, metrics = advanced_processor(strategy)
        advanced_results[strategy] = {'docs': docs, 'metrics': metrics}
        
        print(f"\n📊 Strategy: {strategy.upper()}")
        print(f"  • Success: {metrics['success']}")
        print(f"  • Documents: {metrics['documents_created']}")
        print(f"  • Processing time: {metrics['processing_time_seconds']:.3f}s")
        print(f"  • Tables processed: {metrics['tables_processed']}")
        
        if docs:
            avg_length = sum(len(doc.page_content) for doc in docs) / len(docs)
            print(f"  • Avg document length: {avg_length:.0f} chars")
    
    print(f"\n💡 Advanced Strategy Benefits:")
    print(f"  🔍 Comprehensive: Rich context with business insights")
    print(f"  🔗 Relationships: Cross-table connections and leadership analysis")
    
except Exception as e:
    print(f"❌ Error in advanced processing: {e}")

# Database validation utilities
def validate_database_structure(db_path: str) -> dict:
    """
    Validate database structure and provide recommendations.
    
    Args:
        db_path (str): Path to database file
        
    Returns:
        dict: Validation results and recommendations
    """
    validation_results = {
        'valid': False,
        'tables': [],
        'relationships': [],
        'recommendations': [],
        'warnings': []
    }
    
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # Check table existence
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = [row[0] for row in cursor.fetchall()]
        validation_results['tables'] = tables
        
        if len(tables) == 0:
            validation_results['warnings'].append("No tables found in database")
            return validation_results
        
        # Check for relationships
        for table in tables:
            cursor.execute(f"PRAGMA foreign_key_list({table});")
            fkeys = cursor.fetchall()
            if fkeys:
                for fkey in fkeys:
                    validation_results['relationships'].append({
                        'from_table': table,
                        'from_column': fkey[3],
                        'to_table': fkey[2],
                        'to_column': fkey[4]
                    })
        
        # Generate recommendations
        if len(validation_results['relationships']) > 0:
            validation_results['recommendations'].append("Use relationship-aware processing for better context")
        else:
            validation_results['recommendations'].append("Consider adding foreign keys for better data relationships")
        
        if len(tables) > 5:
            validation_results['recommendations'].append("Use chunking strategies for large databases")
        
        validation_results['valid'] = True
        conn.close()
        
    except Exception as e:
        validation_results['warnings'].append(f"Database validation error: {e}")
    
    return validation_results

print(f"\n🔍 Database Structure Validation:")
try:
    validation = validate_database_structure("data/databases/company.db")
    
    print(f"  ✅ Valid database: {validation['valid']}")
    print(f"  📊 Tables found: {len(validation['tables'])}")
    print(f"  🔗 Relationships found: {len(validation['relationships'])}")
    
    if validation['recommendations']:
        print(f"  💡 Recommendations:")
        for rec in validation['recommendations']:
            print(f"    • {rec}")
    
    if validation['warnings']:
        print(f"  ⚠️  Warnings:")
        for warn in validation['warnings']:
            print(f"    • {warn}")

except Exception as e:
    print(f"❌ Error in database validation: {e}")

🔬 Advanced Database Processing Techniques

📊 Strategy: COMPREHENSIVE
  • Success: True
  • Documents: 2
  • Processing time: 0.001s
  • Tables processed: 2
  • Avg document length: 683 chars

📊 Strategy: RELATIONSHIPS
  • Success: True
  • Documents: 4
  • Processing time: 0.000s
  • Tables processed: 2
  • Avg document length: 406 chars

💡 Advanced Strategy Benefits:
  🔍 Comprehensive: Rich context with business insights
  🔗 Relationships: Cross-table connections and leadership analysis

🔍 Database Structure Validation:
  ✅ Valid database: True
  📊 Tables found: 2
  🔗 Relationships found: 0
  💡 Recommendations:
    • Consider adding foreign keys for better data relationships


## Best Practices for Database Processing

### When to Use Each Approach

1. **SQLDatabase Utility**
   - ✅ Quick database inspection and schema analysis
   - ✅ Integration with LangChain SQL agents and tools
   - ✅ Built-in safety features for query execution
   - ❌ Limited control over document structure

2. **Custom SQL Processing**
   - ✅ Production RAG systems requiring rich context
   - ✅ Complex databases with multiple relationships
   - ✅ Need for business-specific document formatting
   - ✅ Advanced metadata requirements for filtering

3. **SQLDatabaseLoader**
   - ✅ Simple query-to-document conversion
   - ✅ Integration with LangChain document pipelines
   - ✅ Specific data extraction requirements
   - ❌ Less flexibility in document structure

### Common Challenges and Solutions

1. **Large Database Performance**
   - **Problem**: Memory issues with large tables and result sets
   - **Solution**: Implement pagination and batch processing
   - **Code Pattern**: Use LIMIT and OFFSET in queries for chunking

2. **Complex Relationships**
   - **Problem**: Maintaining foreign key relationships in documents
   - **Solution**: Use JOIN queries and relationship-aware processing
   - **Code Pattern**: Create documents that preserve business context

3. **Schema Changes**
   - **Problem**: Database schema evolution breaks processing
   - **Solution**: Implement dynamic schema inspection and validation
   - **Code Pattern**: Use PRAGMA queries to inspect table structure

4. **SQL Injection Security**
   - **Problem**: Dynamic SQL construction poses security risks
   - **Solution**: Use parameterized queries and input validation
   - **Code Pattern**: Always use cursor.execute() with parameters

In [16]:
# Production Database Processing Utilities
# ========================================
"""
This section provides production-ready utilities and performance analysis
for database processing in RAG systems.
"""

def analyze_database_processing_performance() -> dict:
    """
    Analyze memory usage and processing characteristics of different database approaches.
    
    Returns:
        dict: Performance metrics for different processing methods
    """
    performance_metrics = {}
    
    # Analyze custom processing documents if loaded
    if 'custom_sql_docs' in locals() and custom_sql_docs:
        performance_metrics['custom_sql'] = {
            'method': 'Custom SQL Processing',
            'document_count': len(custom_sql_docs),
            'avg_doc_length': sum(len(doc.page_content) for doc in custom_sql_docs) / len(custom_sql_docs),
            'total_memory': sum(sys.getsizeof(doc.page_content) for doc in custom_sql_docs),
            'metadata_richness': len(custom_sql_docs[0].metadata)
        }
    
    # Analyze query-based documents if loaded
    if 'query_docs' in locals() and query_docs:
        performance_metrics['query_based'] = {
            'method': 'SQLDatabaseLoader Query',
            'document_count': len(query_docs),
            'avg_doc_length': sum(len(doc.page_content) for doc in query_docs) / len(query_docs),
            'total_memory': sum(sys.getsizeof(doc.page_content) for doc in query_docs),
            'metadata_richness': len(query_docs[0].metadata)
        }
    
    # Analyze relationship documents if loaded
    if 'relationship_docs' in locals() and relationship_docs:
        performance_metrics['relationship'] = {
            'method': 'Relationship Extraction',
            'document_count': len(relationship_docs),
            'avg_doc_length': sum(len(doc.page_content) for doc in relationship_docs) / len(relationship_docs),
            'total_memory': sum(sys.getsizeof(doc.page_content) for doc in relationship_docs),
            'metadata_richness': len(relationship_docs[0].metadata)
        }
    
    return performance_metrics

def create_batch_sql_processor(batch_size: int = 1000):
    """
    Create a batch processor for large databases to avoid memory issues.
    
    Args:
        batch_size (int): Number of records to process at once
        
    Returns:
        function: Batch processing function
    """
    def process_table_in_batches(db_path: str, table_name: str) -> List[Document]:
        """
        Process a database table in batches for memory efficiency.
        
        Args:
            db_path (str): Path to database file
            table_name (str): Name of table to process
            
        Returns:
            List[Document]: Documents created from batched processing
        """
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        documents = []
        
        # Get total record count
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        total_records = cursor.fetchone()[0]
        
        # Get column information
        cursor.execute(f"PRAGMA table_info({table_name});")
        columns = cursor.fetchall()
        column_names = [col[1] for col in columns]
        
        # Process in batches
        for offset in range(0, total_records, batch_size):
            cursor.execute(f"SELECT * FROM {table_name} LIMIT {batch_size} OFFSET {offset}")
            batch_rows = cursor.fetchall()
            
            # Create batch document
            batch_content = f"""Database Table Batch: {table_name} (Records {offset+1}-{min(offset+batch_size, total_records)})

Batch Information:
• Table: {table_name}
• Records in batch: {len(batch_rows)}
• Batch offset: {offset}
• Total records: {total_records}

Records:"""
            
            for i, row in enumerate(batch_rows):
                record_dict = dict(zip(column_names, row))
                batch_content += f"\n\nRecord {offset + i + 1}:"
                for key, value in record_dict.items():
                    batch_content += f"\n  {key}: {value}"
            
            doc = Document(
                page_content=batch_content,
                metadata={
                    'source': db_path,
                    'table_name': table_name,
                    'batch_number': offset // batch_size + 1,
                    'batch_size': len(batch_rows),
                    'batch_offset': offset,
                    'total_records': total_records,
                    'data_type': 'sql_batch'
                }
            )
            documents.append(doc)
        
        conn.close()
        return documents
    
    return process_table_in_batches

def create_secure_sql_processor():
    """
    Create a secure SQL processor with parameterized queries and validation.
    
    Returns:
        function: Secure processing function
    """
    def process_with_security(db_path: str, query_template: str, parameters: tuple = ()) -> List[Document]:
        """
        Execute parameterized queries safely with validation.
        
        Args:
            db_path (str): Path to database file
            query_template (str): SQL query with placeholders (?)
            parameters (tuple): Parameters for query placeholders
            
        Returns:
            List[Document]: Documents from secure query execution
        """
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        documents = []
        
        try:
            # Execute parameterized query to prevent SQL injection
            cursor.execute(query_template, parameters)
            results = cursor.fetchall()
            
            # Get column names
            column_names = [description[0] for description in cursor.description]
            
            # Create documents from results
            for i, row in enumerate(results):
                record_dict = dict(zip(column_names, row))
                
                content = f"Secure Query Result {i+1}:\n"
                for key, value in record_dict.items():
                    content += f"{key}: {value}\n"
                
                doc = Document(
                    page_content=content,
                    metadata={
                        'source': db_path,
                        'query_hash': hash(query_template),
                        'result_index': i,
                        'data_type': 'secure_query_result',
                        'column_names': column_names
                    }
                )
                documents.append(doc)
        
        except Exception as e:
            print(f"❌ Secure query execution failed: {e}")
        
        finally:
            conn.close()
        
        return documents
    
    return process_with_security

print("🏭 Production Database Processing Utilities")
print("=" * 43)

# Run performance analysis
try:
    metrics = analyze_database_processing_performance()
    
    if metrics:
        print("\n📊 Database Processing Performance Analysis:")
        for method_key, data in metrics.items():
            print(f"\n{data['method']}:")
            print(f"  • Documents: {data['document_count']}")
            print(f"  • Avg length: {data['avg_doc_length']:.1f} chars")
            print(f"  • Memory: {data['total_memory']} bytes")
            print(f"  • Metadata fields: {data['metadata_richness']}")
        
        print(f"\n💡 Performance Insights:")
        print(f"  • Custom processing creates richer, more contextual documents")
        print(f"  • Relationship extraction preserves business logic")
        print(f"  • Query-based approach offers precise data extraction")
        
    else:
        print("⚠️  Run database processing methods first to see performance metrics")
        
except Exception as e:
    print(f"❌ Error in performance analysis: {e}")

# Test batch processing
print(f"\n📦 Batch Processing Example:")
try:
    batch_processor = create_batch_sql_processor(batch_size=2)  # Small batch for demo
    batch_docs = batch_processor("data/databases/company.db", "employees")
    
    print(f"✅ Batch processing completed")
    print(f"  • Batches created: {len(batch_docs)}")
    print(f"  • Records per batch: 2 (demo size)")
    
    if batch_docs:
        print(f"  • Example batch document length: {len(batch_docs[0].page_content)} chars")

except Exception as e:
    print(f"❌ Error in batch processing: {e}")

# Test secure processing
print(f"\n🔒 Secure Query Processing Example:")
try:
    secure_processor = create_secure_sql_processor()
    secure_docs = secure_processor(
        "data/databases/company.db",
        "SELECT name, role FROM employees WHERE department = ?",
        ("Engineering",)
    )
    
    print(f"✅ Secure query processing completed")
    print(f"  • Documents created: {len(secure_docs)}")
    print(f"  • Query used parameterized placeholders")
    print(f"  • SQL injection protection enabled")

except Exception as e:
    print(f"❌ Error in secure processing: {e}")

print(f"\n🎯 Production Best Practices:")
print(f"  • Use batch processing for large tables (>10,000 records)")
print(f"  • Always use parameterized queries for security")
print(f"  • Implement connection pooling for high-volume applications")
print(f"  • Monitor memory usage and processing time")
print(f"  • Cache frequently accessed query results")
print(f"  • Validate database schema before processing")

🏭 Production Database Processing Utilities
⚠️  Run database processing methods first to see performance metrics

📦 Batch Processing Example:
✅ Batch processing completed
  • Batches created: 2
  • Records per batch: 2 (demo size)
  • Example batch document length: 362 chars

🔒 Secure Query Processing Example:
✅ Secure query processing completed
  • Documents created: 2
  • Query used parameterized placeholders
  • SQL injection protection enabled

🎯 Production Best Practices:
  • Use batch processing for large tables (>10,000 records)
  • Always use parameterized queries for security
  • Implement connection pooling for high-volume applications
  • Monitor memory usage and processing time
  • Cache frequently accessed query results
  • Validate database schema before processing


## Summary and Next Steps

### What We Learned

In this notebook, we explored comprehensive strategies for processing database data:

1. **Database Setup**: Created realistic SQLite databases with business relationships
2. **SQLDatabase Utility**: Schema inspection and safe query execution
3. **Custom SQL Processing**: Intelligent database-to-document conversion with rich context
4. **SQLDatabaseLoader**: Direct query-to-document conversion with LangChain integration
5. **Advanced Techniques**: Relationship extraction, batch processing, and security measures

### Key Takeaways

- **Choose the right approach**: Simple loaders for basic extraction, custom processing for production
- **Relationships matter**: Preserve foreign key relationships and business context
- **Security is crucial**: Always use parameterized queries to prevent SQL injection
- **Performance considerations**: Implement batch processing for large databases
- **Metadata is essential**: Rich metadata enables advanced filtering and retrieval

### Production Checklist

- [ ] Implement database schema validation and inspection
- [ ] Use parameterized queries for all dynamic SQL
- [ ] Create relationship-aware documents that preserve business context
- [ ] Include comprehensive metadata for filtering and search
- [ ] Implement batch processing for large tables (>10,000 records)
- [ ] Add connection pooling for high-volume applications
- [ ] Monitor memory usage and query performance
- [ ] Cache frequently accessed query results

### Next Steps

1. **Try with your own databases**: Apply these techniques to your real database schemas
2. **Build a database processing pipeline**: Create automated workflows for different table types
3. **Optimize for your queries**: Test different document structures with your specific questions
4. **Scale for production**: Implement connection pooling, monitoring, and error recovery
5. **Add incremental updates**: Process only changed data for efficiency

### Resources for Further Learning

- [SQLAlchemy Documentation](https://docs.sqlalchemy.org/) - Advanced Python SQL toolkit
- [LangChain SQL Utilities](https://python.langchain.com/docs/modules/chains/popular/sqlite) - Complete SQL integration guide
- [Database Design Best Practices](https://www.sqlstyle.guide/) - SQL style and design guidelines
- [SQL Injection Prevention](https://owasp.org/www-community/attacks/SQL_Injection) - Security best practices