# SQL Auditor Prototype

This notebook demonstrates the SQL auditing pipeline with example queries.


In [None]:
import json
import sys
from pathlib import Path

# Put the parent of the current working directory at the front of the module
# search path so the `backend` package imported below can be found.
parent_dir = Path('..').resolve()
sys.path.insert(0, str(parent_dir))

from backend.services.pipeline import audit_queries
from backend.core.models import AuditRequest


## Example Schema


In [None]:
# Example DDL that is later passed to `audit_queries` as `schema_ddl`.
# It declares three tables: `users`, `orders` (with a foreign key from
# `orders.user_id` to `users.id`) and `products`.
# NOTE(review): the `-- @rows=N` SQL comments look like per-table row-count
# hints consumed by the auditor -- confirm against the schema parser.
schema_ddl = """
-- @rows=50000
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    email TEXT NOT NULL UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    name TEXT,
    status TEXT DEFAULT 'active'
);

-- @rows=100000
CREATE TABLE orders (
    id INTEGER PRIMARY KEY,
    user_id INTEGER NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    total_cents INTEGER NOT NULL,
    status TEXT DEFAULT 'pending',
    FOREIGN KEY (user_id) REFERENCES users(id)
);

-- @rows=10000
CREATE TABLE products (
    id INTEGER PRIMARY KEY,
    sku TEXT NOT NULL UNIQUE,
    name TEXT NOT NULL,
    category TEXT,
    price_cents INTEGER NOT NULL
);
"""

# Echo the schema text so the reader sees exactly what will be audited.
print(schema_ddl)


## Example Queries to Audit


In [None]:
# Sample statements handed to the auditor. The comment above each entry names
# the pattern that query is meant to exhibit.
queries = [
    # 1 - SELECT * across a join, sorted by order creation time
    """SELECT * FROM orders o
    JOIN users u ON u.id = o.user_id
    ORDER BY o.created_at DESC;""",

    # 2 - function wrapped around the filtered column (non-SARGable predicate)
    """SELECT * FROM users WHERE LOWER(email) = 'test@example.com';""",

    # 3 - LIKE pattern that begins with a wildcard
    """SELECT * FROM products WHERE name LIKE '%widget%';""",

    # 4 - aggregate over the whole table, no WHERE clause
    """SELECT COUNT(*) FROM orders;""",
]

# Show every query under a 1-based heading, preceded by a blank line.
for position, sql in enumerate(queries, start=1):
    print(f"\nQuery {position}:", sql, sep="\n")


## Run Audit Pipeline


In [None]:
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def run_audit():
    """Run the audit pipeline over this notebook's example schema and queries.

    Passes the notebook-level ``schema_ddl`` and ``queries`` to
    ``audit_queries`` using the ``"sqlite"`` dialect with the LLM step
    switched off.

    Returns:
        The value ``audit_queries`` resolves to; the notebook stores it as
        ``report``.
    """
    return await audit_queries(
        schema_ddl=schema_ddl,
        queries=queries,
        dialect="sqlite",
        use_llm=False,  # Set to True if OPENAI_API_KEY is configured
    )


try:
    asyncio.get_running_loop()
except RuntimeError:
    # No event loop is running in this thread (e.g. the code was exported and
    # executed as a plain script), so asyncio.run() can be used directly.
    report = asyncio.run(run_audit())
else:
    # A notebook kernel already runs an event loop in this thread, and
    # asyncio.run() raises RuntimeError in that situation. Drive the coroutine
    # on a worker thread, which gets its own fresh event loop, and block
    # until the result is available.
    with ThreadPoolExecutor(max_workers=1) as pool:
        report = pool.submit(asyncio.run, run_audit()).result()


## Display Results


In [None]:
# Headline figures from the report's summary object, framed by a
# 60-character rule and followed by one blank line.
rule = "=" * 60
summary = report.summary

print(rule)
print("AUDIT SUMMARY")
print(rule)
print(f"Total Issues: {summary.total_issues}")
print(f"High Severity: {summary.high_severity}")
print(f"Estimated Improvement: {summary.est_improvement}")
print()


In [None]:
# Longest snippet excerpt (in characters) shown for a single issue.
SNIPPET_PREVIEW_CHARS = 100

print("=" * 60)
print("ISSUES BY SEVERITY")
print("=" * 60)

# Group the report's issues by severity and print the groups in a fixed
# order (error, warn, info). Severities with no issues are skipped.
for severity in ['error', 'warn', 'info']:
    severity_issues = [i for i in report.issues if i.severity == severity]
    if not severity_issues:
        continue

    print(f"\n{severity.upper()} ({len(severity_issues)}):")
    for issue in severity_issues:
        print(f"  [{issue.code}] {issue.message}")
        if issue.snippet:
            preview = issue.snippet[:SNIPPET_PREVIEW_CHARS]
            # Append the ellipsis only when text was actually cut off, so a
            # short snippet is not presented as if it had been truncated.
            ellipsis = "..." if len(issue.snippet) > SNIPPET_PREVIEW_CHARS else ""
            print(f"    Snippet: {preview}{ellipsis}")


In [None]:
# Numbered list of the index suggestions carried by the report.
rule = "=" * 60
print(rule)
print("INDEX SUGGESTIONS")
print(rule)

for position, suggestion in enumerate(report.indexes, start=1):
    print(f"\n{position}. Table: {suggestion.table}")
    column_list = ', '.join(suggestion.columns)
    print(f"   Columns: {column_list}")
    print(f"   Type: {suggestion.type}")
    print(f"   Rationale: {suggestion.rationale}")
    # The improvement line is optional: printed only for a truthy value.
    improvement = suggestion.expected_improvement
    if improvement:
        print(f"   Expected Improvement: {improvement}")


In [None]:
print("=" * 60)
print("QUERY REWRITES")
print("=" * 60)

# For every rewrite in the report, show the original statement, the optimized
# statement and the rationale, then close the entry with a dashed rule.
for idx, rewrite in enumerate(report.rewrites, 1):
    print(f"\nRewrite {idx}:")
    # Fixed headings are plain string literals; only the line above
    # interpolates a value and needs an f-string.
    print("\nOriginal:")
    print(rewrite.original)
    print("\nOptimized:")
    print(rewrite.optimized)
    print("\nRationale:")
    print(rewrite.rationale)
    print("-" * 60)


In [None]:
# The explanation section is optional: nothing is printed, not even the
# banner, when `report.llm_explain` is falsy.
llm_text = report.llm_explain
if llm_text:
    rule = "=" * 60
    print(rule, "LLM EXPLANATION", rule, llm_text, sep="\n")


## Export Results


In [None]:
# Write the report to a JSON file in the current working directory.
output_name = 'audit_report.json'
report_dict = report.model_dump()

with open(output_name, 'w') as out_file:
    # default=str makes json fall back to str() for any value it cannot
    # encode natively.
    json.dump(report_dict, out_file, indent=2, default=str)

print("Report exported to audit_report.json")
