# **AI TECH INSTITUTE** · *Intermediate AI & Data Science*
### Week 03 · Notebook 07 — Performance Optimization & Production Best Practices
**Instructor:** Amir Charkhi  |  **Goal:** Performance Optimization & Production Best Practices

> Format: theory → implementation → best practices → real-world application.
>
**Learning Objectives:**
- Understand and analyze query execution plans
- Master indexing strategies for optimal performance
- Implement query optimization techniques
- Apply production best practices for maintainable SQL
- Monitor and tune database performance


## 🎯 The Production Challenge

Your startup just hit 100M rows in the database. Queries that took milliseconds now take minutes.

**Real scenarios we'll solve:**
1. Dashboard queries timing out
2. ETL jobs running for hours
3. Database CPU at 100%
4. Storage costs exploding

Time to optimize for **production scale**!

In [None]:
# Setup and imports
import pandas as pd
import numpy as np
import sqlite3
import time
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Markdown, display, HTML
import warnings
warnings.filterwarnings('ignore')

# Configure displays
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.float_format', '{:.2f}'.format)
sns.set_style('whitegrid')

def show_sql(query, title="SQL Query:"):
    """Pretty print SQL queries"""
    print(f"\n📝 {title}")
    display(Markdown(f"```sql\n{query}\n```"))

def time_query(query, conn, title="Query", show_results=True):
    """Execute a query and return (result, elapsed seconds)"""
    start = time.perf_counter()  # monotonic, high-resolution timer
    result = pd.read_sql(query, conn)
    elapsed = time.perf_counter() - start
    
    print(f"⏱️ {title}: {elapsed:.4f} seconds")
    if show_results and len(result) > 0:
        print(f"📊 Returned {len(result):,} rows")
        if len(result) <= 5:
            display(result)
    return result, elapsed

def explain_query(query, conn):
    """Show query execution plan"""
    # Use a distinct local name so the function name is not shadowed
    plan_sql = f"EXPLAIN QUERY PLAN {query}"
    plan = pd.read_sql(plan_sql, conn)
    print("\n🔍 Query Execution Plan:")
    for _, row in plan.iterrows():
        print(f"  {row['detail']}")
    return plan

print("✅ Environment ready for production SQL optimization!")

## 📊 Setting Up Production-Scale Database

We'll create a database with close to a million rows across five tables to simulate production challenges at a size that still runs comfortably in a notebook.

In [None]:
# Create production database
conn = sqlite3.connect('production.db')
cursor = conn.cursor()

# Make sure the connection is writable and switch to WAL journaling
cursor.execute("PRAGMA query_only = OFF")
cursor.execute("PRAGMA journal_mode = WAL")  # Write-Ahead Logging for better concurrency

# Drop existing tables
tables = ['orders', 'order_items', 'customers', 'products', 'events']
for table in tables:
    cursor.execute(f"DROP TABLE IF EXISTS {table}")
    cursor.execute(f"DROP INDEX IF EXISTS idx_{table}_all")

# Create tables WITHOUT indexes first (we'll add them strategically)
cursor.execute("""
CREATE TABLE customers (
    customer_id INTEGER PRIMARY KEY,
    email TEXT,
    first_name TEXT,
    last_name TEXT,
    country TEXT,
    city TEXT,
    created_at TIMESTAMP,
    lifetime_value DECIMAL(10,2),
    segment TEXT
)
""")

cursor.execute("""
CREATE TABLE products (
    product_id INTEGER PRIMARY KEY,
    sku TEXT,
    name TEXT,
    category TEXT,
    subcategory TEXT,
    price DECIMAL(10,2),
    cost DECIMAL(10,2),
    created_at TIMESTAMP
)
""")

cursor.execute("""
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    order_date TIMESTAMP,
    status TEXT,
    total_amount DECIMAL(10,2),
    shipping_country TEXT,
    payment_method TEXT
)
""")

cursor.execute("""
CREATE TABLE order_items (
    order_item_id INTEGER PRIMARY KEY,
    order_id INTEGER,
    product_id INTEGER,
    quantity INTEGER,
    unit_price DECIMAL(10,2),
    discount DECIMAL(5,2)
)
""")

cursor.execute("""
CREATE TABLE events (
    event_id INTEGER PRIMARY KEY,
    customer_id INTEGER,
    event_type TEXT,
    event_timestamp TIMESTAMP,
    page_url TEXT,
    session_id TEXT,
    device_type TEXT
)
""")

conn.commit()
print("✅ Production schema created!")

In [None]:
# Generate large-scale data
print("🔄 Generating production-scale data...")
np.random.seed(42)

# Scale parameters
n_customers = 50000
n_products = 5000
n_orders = 200000
n_events = 500000

# Generate customers
print(f"  Creating {n_customers:,} customers...")
countries = ['USA', 'UK', 'Germany', 'France', 'Japan', 'Canada', 'Australia']
segments = ['Premium', 'Standard', 'Basic', 'Inactive']

customers = pd.DataFrame({
    'customer_id': range(1, n_customers + 1),
    'email': [f'user{i}@company.com' for i in range(1, n_customers + 1)],
    'first_name': [f'First{i}' for i in range(1, n_customers + 1)],
    'last_name': [f'Last{i}' for i in range(1, n_customers + 1)],
    'country': np.random.choice(countries, n_customers),
    'city': [f'City{i%100}' for i in range(n_customers)],
    'created_at': pd.date_range('2020-01-01', periods=n_customers, freq='30s'),
    'lifetime_value': np.random.exponential(500, n_customers),
    'segment': np.random.choice(segments, n_customers, p=[0.1, 0.4, 0.4, 0.1])
})

# Generate products
print(f"  Creating {n_products:,} products...")
categories = ['Electronics', 'Clothing', 'Home', 'Books', 'Sports', 'Toys']
subcategories = ['Sub1', 'Sub2', 'Sub3', 'Sub4', 'Sub5']

products = pd.DataFrame({
    'product_id': range(1, n_products + 1),
    'sku': [f'SKU{i:05d}' for i in range(1, n_products + 1)],
    'name': [f'Product {i}' for i in range(1, n_products + 1)],
    'category': np.random.choice(categories, n_products),
    'subcategory': np.random.choice(subcategories, n_products),
    'price': np.random.uniform(10, 1000, n_products),
    'cost': np.random.uniform(5, 500, n_products),
    'created_at': pd.date_range('2019-01-01', periods=n_products, freq='2h')
})

# Generate orders
print(f"  Creating {n_orders:,} orders...")
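# Spread orders evenly over 2022-2024; a 10-second step would cover only ~23 days,
# leaving no rows for the later order_date >= '2024-01-01' filters
order_dates = pd.date_range('2022-01-01', '2024-12-31', periods=n_orders).floor('s')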
statuses = ['completed', 'pending', 'cancelled', 'refunded']
payment_methods = ['credit_card', 'paypal', 'stripe', 'bank_transfer']

orders = pd.DataFrame({
    'order_id': range(1, n_orders + 1),
    'customer_id': np.random.randint(1, n_customers + 1, n_orders),
    'order_date': order_dates,
    'status': np.random.choice(statuses, n_orders, p=[0.7, 0.15, 0.1, 0.05]),
    'total_amount': np.random.uniform(50, 2000, n_orders),
    'shipping_country': np.random.choice(countries, n_orders),
    'payment_method': np.random.choice(payment_methods, n_orders)
})

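# Generate order items (1-5 items per order)
print(f"  Creating order items...")
order_items = []
order_item_id = 1
# Limit to a random sample of 50,000 orders for performance; sampling (instead of
# taking the first 50,000 ids) keeps order items spread across the full date range
orders_with_items = sorted(
    np.random.choice(orders['order_id'].to_numpy(), size=min(n_orders, 50000), replace=False).tolist()
)
for order_id in orders_with_items: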
    n_items = np.random.randint(1, 6)
    for _ in range(n_items):
        order_items.append({
            'order_item_id': order_item_id,
            'order_id': order_id,
            'product_id': np.random.randint(1, n_products + 1),
            'quantity': np.random.randint(1, 5),
            'unit_price': np.random.uniform(10, 500),
            'discount': np.random.choice([0, 0.05, 0.10, 0.15, 0.20], p=[0.5, 0.2, 0.15, 0.1, 0.05])
        })
        order_item_id += 1

order_items_df = pd.DataFrame(order_items)

# Generate events
print(f"  Creating {n_events:,} events...")
event_types = ['page_view', 'add_to_cart', 'checkout', 'purchase', 'search']
devices = ['mobile', 'desktop', 'tablet']

events = pd.DataFrame({
    'event_id': range(1, n_events + 1),
    'customer_id': np.random.randint(1, n_customers + 1, n_events),
    'event_type': np.random.choice(event_types, n_events, p=[0.5, 0.2, 0.1, 0.1, 0.1]),
    'event_timestamp': pd.date_range('2024-01-01', periods=n_events, freq='5s'),
    'page_url': [f'/page/{i%1000}' for i in range(n_events)],
    'session_id': [f'session_{i//10}' for i in range(n_events)],
    'device_type': np.random.choice(devices, n_events, p=[0.5, 0.4, 0.1])
})

# Load data into database
print("\n📥 Loading data into database...")
customers.to_sql('customers', conn, if_exists='append', index=False)
products.to_sql('products', conn, if_exists='append', index=False)
orders.to_sql('orders', conn, if_exists='append', index=False)
order_items_df.to_sql('order_items', conn, if_exists='append', index=False)
events.to_sql('events', conn, if_exists='append', index=False)

conn.commit()

print(f"\n✅ Production database populated:")
print(f"  - {n_customers:,} customers")
print(f"  - {n_products:,} products")
print(f"  - {n_orders:,} orders")
print(f"  - {len(order_items_df):,} order items")
print(f"  - {n_events:,} events")

---

## 🔍 Part 1: Understanding Query Execution Plans

The execution plan is your roadmap to understanding how the database processes queries.
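
To make the idea concrete before working with the full database, here is a minimal sketch of reading a plan programmatically. It assumes a throwaway in-memory database; the table `t`, the index `idx_t_grp`, and the helper `plan_details` are illustrative names, not part of this notebook's schema.

```python
import sqlite3

# Throwaway in-memory database; table and index names are illustrative only.
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, grp TEXT, val REAL)")
demo.executemany(
    "INSERT INTO t (grp, val) VALUES (?, ?)",
    [(f"g{i % 50}", float(i)) for i in range(5000)],
)

def plan_details(conn, sql):
    """Return the human-readable 'detail' column of EXPLAIN QUERY PLAN."""
    rows = conn.execute(f"EXPLAIN QUERY PLAN {sql}").fetchall()
    return [row[3] for row in rows]  # the fourth column holds the detail text

query = "SELECT val FROM t WHERE grp = 'g7'"

before = plan_details(demo, query)  # no index yet: the table is scanned
demo.execute("CREATE INDEX idx_t_grp ON t(grp)")
after = plan_details(demo, query)   # the planner can now search the index

print("before:", before)
print("after: ", after)
demo.close()
```

The exact wording of the detail text varies between SQLite versions, so it is safer to look for the keywords `SCAN` and `SEARCH` (and the index name) than to match whole strings.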

### 1.1 Analyzing a Slow Query

In [None]:
# A typical dashboard query - SLOW VERSION
slow_query = """
SELECT 
    c.country,
    COUNT(DISTINCT o.customer_id) as unique_customers,
    COUNT(o.order_id) as total_orders,
    SUM(o.total_amount) as revenue
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
WHERE o.order_date >= '2024-01-01'
  AND o.status = 'completed'
  AND c.segment = 'Premium'
GROUP BY c.country
ORDER BY revenue DESC
"""

print("🐌 SLOW QUERY - No Indexes")
show_sql(slow_query)

# Explain the query
explain_query(slow_query, conn)

# Time the query
result, slow_time = time_query(slow_query, conn, "Slow Query")

print("\n⚠️ Notice: SCAN (shown as SCAN TABLE in older SQLite versions) means a full table scan - slow on large tables!")

### 1.2 Creating Strategic Indexes

In [None]:
# Add strategic indexes based on query patterns
print("🔨 Creating strategic indexes...\n")

index_queries = [
    # Foreign key indexes
    ("CREATE INDEX idx_orders_customer_id ON orders(customer_id)",
     "Foreign key index for JOIN"),
    
    # Filter condition indexes
    ("CREATE INDEX idx_orders_date_status ON orders(order_date, status)",
     "Composite index for WHERE clause"),
    
    ("CREATE INDEX idx_customers_segment ON customers(segment)",
     "Index for segment filtering"),
    
    # Covering index (includes all needed columns)
    ("CREATE INDEX idx_orders_covering ON orders(customer_id, order_date, status, total_amount)",
     "Covering index - all data in index")
]

for query, description in index_queries:
    print(f"📌 {description}")
    cursor.execute(f"DROP INDEX IF EXISTS {query.split()[2]}")
    cursor.execute(query)
    
conn.commit()
print("\n✅ Indexes created!")

In [None]:
# Run the same query with indexes
print("🚀 FAST QUERY - With Indexes")

# Explain the optimized query
explain_query(slow_query, conn)

# Time the query
result, fast_time = time_query(slow_query, conn, "Fast Query")

# Performance improvement
if slow_time > 0 and fast_time > 0:  # guard both divisions below
    improvement = (slow_time - fast_time) / slow_time * 100
    speedup = slow_time / fast_time
    print(f"\n🎉 Performance Improvement:")
    print(f"  - {improvement:.1f}% faster")
    print(f"  - {speedup:.1f}x speedup")
    print(f"  - Saved {slow_time - fast_time:.4f} seconds")

---

## 📈 Part 2: Index Strategies

Indexes are like the index at the back of a book - they help find data quickly, but they take extra space and must be updated on every write.

### 2.1 Types of Indexes and When to Use Them

In [None]:
# Demonstrate different index types
print("📚 INDEX TYPES AND USE CASES\n")

index_examples = {
    "Single Column Index": {
        "sql": "CREATE INDEX idx_customer_email ON customers(email)",
        "use_case": "Lookups by email (WHERE email = 'user@example.com')",
        "pros": "Fast for single column filters",
        "cons": "Not helpful for multi-column filters"
    },
    "Composite Index": {
        "sql": "CREATE INDEX idx_orders_composite ON orders(customer_id, order_date, status)",
        "use_case": "Queries filtering on multiple columns",
        "pros": "Covers multiple filter conditions",
        "cons": "Column order matters! Only helps if leftmost columns are used"
    },
    "Covering Index": {
        "sql": "CREATE INDEX idx_covering ON orders(customer_id, order_date, total_amount)",
        "use_case": "Query can be satisfied entirely from index",
        "pros": "No table lookup needed - very fast",
        "cons": "Larger index size (SQLite has no INCLUDE clause, so extra columns go in the key)"
    },
    "Partial Index": {
        "sql": "CREATE INDEX idx_active_orders ON orders(order_date) WHERE status = 'completed'",
        "use_case": "Frequently filter on specific value",
        "pros": "Smaller index, faster for specific queries",
        "cons": "Only helps for the specific condition"
    }
}

for index_type, details in index_examples.items():
    print(f"📌 {index_type}")
    print(f"   SQL: {details['sql']}")
    print(f"   Use: {details['use_case']}")
    print(f"   ✅ {details['pros']}")
    print(f"   ⚠️ {details['cons']}")
    print()
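
The "column order matters" caveat for composite indexes can be checked directly. The sketch below assumes a small in-memory table; `orders_demo` and `idx_demo_cust_status` are hypothetical names chosen for the illustration. It compares the plan for a filter on the leftmost index column with the plan for a filter on the second column only.

```python
import sqlite3

# Throwaway in-memory table; all names here are illustrative.
demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE orders_demo ("
    "order_id INTEGER PRIMARY KEY, customer_id INTEGER, status TEXT, total REAL)"
)
demo.executemany(
    "INSERT INTO orders_demo (customer_id, status, total) VALUES (?, ?, ?)",
    [(i % 500, "completed" if i % 3 else "pending", float(i)) for i in range(5000)],
)
demo.execute("CREATE INDEX idx_demo_cust_status ON orders_demo(customer_id, status)")

def plan(sql):
    """Return the detail strings from EXPLAIN QUERY PLAN."""
    return [row[3] for row in demo.execute(f"EXPLAIN QUERY PLAN {sql}")]

# Filter on the leftmost column: the composite index can be used.
leading = plan("SELECT total FROM orders_demo WHERE customer_id = 42")
# Filter only on the second column: the index ordering does not help here.
trailing = plan("SELECT total FROM orders_demo WHERE status = 'completed'")

print("customer_id filter:", leading)
print("status-only filter:", trailing)
demo.close()
```

If status-only lookups are common, a separate index that leads with `status` (or a partial index) is usually the better fit.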

### 2.2 Index Selectivity Analysis

In [None]:
# Analyze index selectivity
selectivity_query = """
SELECT 
    'status' as column_name,
    COUNT(DISTINCT status) as distinct_values,
    COUNT(*) as total_rows,
    ROUND(COUNT(DISTINCT status) * 100.0 / COUNT(*), 4) as selectivity_pct
FROM orders
UNION ALL
SELECT 
    'customer_id',
    COUNT(DISTINCT customer_id),
    COUNT(*),
    ROUND(COUNT(DISTINCT customer_id) * 100.0 / COUNT(*), 4)
FROM orders
UNION ALL
SELECT 
    'payment_method',
    COUNT(DISTINCT payment_method),
    COUNT(*),
    ROUND(COUNT(DISTINCT payment_method) * 100.0 / COUNT(*), 4)
FROM orders
ORDER BY selectivity_pct DESC
"""

print("📊 INDEX SELECTIVITY ANALYSIS")
print("Higher selectivity = Better index candidate\n")

selectivity_result = pd.read_sql(selectivity_query, conn)
display(selectivity_result)

print("\n💡 Insights:")
print("- High selectivity (>10%): Excellent for indexing")
print("- Medium selectivity (1-10%): Good for composite indexes")
print("- Low selectivity (<1%): Poor for single column index")

### 2.3 Index Impact on Write Performance

In [None]:
# Measure write performance with and without indexes
print("⚖️ INDEX IMPACT ON WRITE OPERATIONS\n")

# Test data for inserts
test_orders = pd.DataFrame({
    'order_id': range(1000000, 1001000),
    'customer_id': np.random.randint(1, 50000, 1000),
    'order_date': pd.date_range('2024-01-01', periods=1000, freq='1h'),
    'status': np.random.choice(['completed', 'pending'], 1000),
    'total_amount': np.random.uniform(50, 500, 1000),
    'shipping_country': np.random.choice(['USA', 'UK'], 1000),
    'payment_method': np.random.choice(['credit_card', 'paypal'], 1000)
})

# Test with indexes
print("With indexes:")
start = time.time()
test_orders.head(100).to_sql('orders', conn, if_exists='append', index=False)
with_index_time = time.time() - start
print(f"  Insert time: {with_index_time:.4f} seconds")

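# Drop every secondary index on orders, including the covering index
# (otherwise the "without indexes" run still maintains one)
cursor.execute("DROP INDEX IF EXISTS idx_orders_customer_id")
cursor.execute("DROP INDEX IF EXISTS idx_orders_date_status")
cursor.execute("DROP INDEX IF EXISTS idx_orders_covering")

# Test without indexes
print("\nWithout indexes:")
start = time.time()
test_orders.iloc[100:200].to_sql('orders', conn, if_exists='append', index=False)
without_index_time = time.time() - start
print(f"  Insert time: {without_index_time:.4f} seconds")

# Restore indexes
cursor.execute("CREATE INDEX idx_orders_customer_id ON orders(customer_id)")
cursor.execute("CREATE INDEX idx_orders_date_status ON orders(order_date, status)")
cursor.execute("CREATE INDEX idx_orders_covering ON orders(customer_id, order_date, status, total_amount)")
conn.commit()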

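print(f"\n📊 Write Performance Impact:")
if without_index_time > 0:
    print(f"  Indexes add {((with_index_time - without_index_time) / without_index_time * 100):.1f}% overhead to inserts")
print("  (100-row batches are noisy; repeat with larger batches for a stable number)")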
print("\n💡 Trade-off: Slower writes for much faster reads")

---

## 🚀 Part 3: Query Optimization Techniques

Beyond indexes, there are many ways to optimize queries.

### 3.1 Optimizing JOIN Operations

In [None]:
# Compare different JOIN strategies
print("🔗 JOIN OPTIMIZATION STRATEGIES\n")

# Inefficient: JOIN then filter
inefficient_join = """
SELECT COUNT(*)
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
  AND p.category = 'Electronics'
"""

# Efficient: Filter before JOIN
efficient_join = """
SELECT COUNT(*)
FROM (
    SELECT order_id 
    FROM orders 
    WHERE order_date >= '2024-01-01'
) o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN (
    SELECT product_id 
    FROM products 
    WHERE category = 'Electronics'
) p ON oi.product_id = p.product_id
"""

print("❌ INEFFICIENT: Join all, then filter")
show_sql(inefficient_join)
_, inefficient_time = time_query(inefficient_join, conn, "Inefficient JOIN", show_results=False)

print("\n✅ EFFICIENT: Filter first, then join")
show_sql(efficient_join)
_, efficient_time = time_query(efficient_join, conn, "Efficient JOIN", show_results=False)

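if inefficient_time > 0 and efficient_time > 0:
    print(f"\n🎯 Improvement: {(inefficient_time / efficient_time):.1f}x faster")
print("\n💡 SQLite's planner often pushes filters down on its own, so confirm any gain with EXPLAIN QUERY PLAN")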
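
Whether the "filter first" rewrite actually changes anything depends on the query planner, which may already push the filters down or flatten the subqueries. A minimal way to check, sketched here on tiny in-memory tables (`o`, `oi`, `p` are stand-ins for orders, order items, and products), is to confirm both forms return the same count and then compare their plans side by side.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.executescript("""
CREATE TABLE o  (order_id INTEGER PRIMARY KEY, order_date TEXT);
CREATE TABLE oi (order_item_id INTEGER PRIMARY KEY, order_id INTEGER, product_id INTEGER);
CREATE TABLE p  (product_id INTEGER PRIMARY KEY, category TEXT);
""")
demo.executemany(
    "INSERT INTO o VALUES (?, ?)",
    [(i, "2024-06-15" if i % 2 else "2023-06-15") for i in range(1, 201)],
)
demo.executemany(
    "INSERT INTO p VALUES (?, ?)",
    [(i, "Electronics" if i % 4 == 0 else "Books") for i in range(1, 21)],
)
demo.executemany(
    "INSERT INTO oi (order_id, product_id) VALUES (?, ?)",
    [(i % 200 + 1, (i // 3) % 20 + 1) for i in range(600)],
)

join_then_filter = """
SELECT COUNT(*) FROM o
JOIN oi ON o.order_id = oi.order_id
JOIN p ON oi.product_id = p.product_id
WHERE o.order_date >= '2024-01-01' AND p.category = 'Electronics'
"""
filter_then_join = """
SELECT COUNT(*)
FROM (SELECT order_id FROM o WHERE order_date >= '2024-01-01') AS o2
JOIN oi ON o2.order_id = oi.order_id
JOIN (SELECT product_id FROM p WHERE category = 'Electronics') AS p2
  ON oi.product_id = p2.product_id
"""

counts = {}
for label, sql in [("join_then_filter", join_then_filter),
                   ("filter_then_join", filter_then_join)]:
    counts[label] = demo.execute(sql).fetchone()[0]
    details = [row[3] for row in demo.execute(f"EXPLAIN QUERY PLAN {sql}")]
    print(label, "->", counts[label], details)
demo.close()
```

If the two plans come out the same, the rewrite is only a readability choice, and indexes on the join columns are the more likely source of a real speedup.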

### 3.2 Avoiding N+1 Query Problems

In [None]:
# Demonstrate N+1 problem and solution
print("🔄 N+1 QUERY PROBLEM\n")

# N+1 Problem: Separate query for each customer
print("❌ N+1 PROBLEM: Multiple queries")
start = time.time()

# Get customers
customers_sample = pd.read_sql(
    "SELECT customer_id FROM customers WHERE segment = 'Premium' LIMIT 10", 
    conn
)

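# For each customer, get their orders (N+1 queries)
results = []
for customer_id in customers_sample['customer_id']:
    # Bind the id as a parameter; int() converts numpy.int64 for sqlite3.
    # A separate variable name avoids overwriting the `orders` DataFrame.
    order_count = pd.read_sql(
        "SELECT COUNT(*) as order_count FROM orders WHERE customer_id = ?",
        conn,
        params=(int(customer_id),)
    )
    results.append(order_count)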

n_plus_one_time = time.time() - start
print(f"  Time: {n_plus_one_time:.4f} seconds")
print(f"  Queries executed: {len(customers_sample) + 1}")

# Solution: Single query with JOIN
print("\n✅ SOLUTION: Single query with JOIN")
single_query = """
SELECT 
    c.customer_id,
    COUNT(o.order_id) as order_count
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.segment = 'Premium'
GROUP BY c.customer_id
LIMIT 10
"""

_, single_time = time_query(single_query, conn, "Single Query", show_results=False)
print(f"  Queries executed: 1")

if n_plus_one_time > 0 and single_time > 0:
    print(f"\n🎯 Improvement: {(n_plus_one_time / single_time):.1f}x faster")
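
When the list of ids comes from application code and a JOIN is not convenient, a common alternative is to batch the lookups into one `IN (...)` query with generated placeholders. The sketch below assumes a small in-memory table (`orders_demo` is an illustrative name) and checks that the batched result matches the per-customer loop.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE orders_demo (order_id INTEGER PRIMARY KEY, customer_id INTEGER)")
demo.executemany(
    "INSERT INTO orders_demo (customer_id) VALUES (?)",
    [(i % 5 + 1,) for i in range(23)],
)

customer_ids = [1, 2, 3, 99]  # customer 99 has no orders

# N+1 style: one round trip per customer
per_customer = {
    cid: demo.execute(
        "SELECT COUNT(*) FROM orders_demo WHERE customer_id = ?", (cid,)
    ).fetchone()[0]
    for cid in customer_ids
}

# Batched: one query, one placeholder per id (values are still bound, not interpolated)
placeholders = ", ".join("?" for _ in customer_ids)
rows = demo.execute(
    f"SELECT customer_id, COUNT(*) FROM orders_demo "
    f"WHERE customer_id IN ({placeholders}) GROUP BY customer_id",
    customer_ids,
).fetchall()

batched = {cid: 0 for cid in customer_ids}  # ids with no orders stay at 0
batched.update(dict(rows))

print(batched)
demo.close()
```

SQLite limits how many parameters a single statement can bind, so very long id lists would need to be split into chunks.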

### 3.3 Query Result Caching Strategies

In [None]:
# Implement materialized view pattern
print("💾 MATERIALIZED VIEW PATTERN\n")

# Create a materialized view (summary table)
print("Creating materialized summary...")
cursor.execute("DROP TABLE IF EXISTS daily_sales_summary")

create_summary = """
CREATE TABLE daily_sales_summary AS
SELECT 
    DATE(order_date) as sale_date,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNT(order_id) as order_count,
    SUM(total_amount) as revenue,
    AVG(total_amount) as avg_order_value
FROM orders
WHERE status = 'completed'
GROUP BY DATE(order_date)
"""

cursor.execute(create_summary)
cursor.execute("CREATE INDEX idx_summary_date ON daily_sales_summary(sale_date)")
conn.commit()

# Compare query performance
print("\n📊 Performance Comparison:\n")

# Direct query
direct_query = """
SELECT 
    DATE(order_date) as sale_date,
    SUM(total_amount) as revenue
FROM orders
WHERE status = 'completed'
  AND order_date >= '2024-01-01'
  AND order_date < '2024-02-01'  -- half-open range so all of Jan 31 is included
GROUP BY DATE(order_date)
"""

print("❌ Direct aggregation:")
_, direct_time = time_query(direct_query, conn, "Direct Query", show_results=False)

# Using materialized view
summary_query = """
SELECT 
    sale_date,
    revenue
FROM daily_sales_summary
WHERE sale_date >= '2024-01-01'
  AND sale_date <= '2024-01-31'
"""

print("\n✅ Using materialized view:")
_, summary_time = time_query(summary_query, conn, "Summary Query", show_results=False)

if direct_time > 0 and summary_time > 0:
    print(f"\n🎯 Improvement: {(direct_time / summary_time):.1f}x faster")
    print("\n💡 Trade-off: Requires periodic refresh of summary table")
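
The refresh step mentioned in the trade-off can be sketched as a function that rebuilds the summary inside one transaction, so readers see either the old rows or the new ones. This is one possible approach, not the only one; `orders_demo`, `daily_summary_demo`, and `refresh_summary` are hypothetical names used for the illustration.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE orders_demo ("
    "order_id INTEGER PRIMARY KEY, order_date TEXT, status TEXT, total_amount REAL)"
)
demo.executemany(
    "INSERT INTO orders_demo (order_date, status, total_amount) VALUES (?, ?, ?)",
    [
        ("2024-01-01 09:00:00", "completed", 100.0),
        ("2024-01-01 17:30:00", "completed", 50.0),
        ("2024-01-02 08:15:00", "pending", 70.0),
    ],
)
demo.execute(
    "CREATE TABLE daily_summary_demo ("
    "sale_date TEXT PRIMARY KEY, order_count INTEGER, revenue REAL)"
)
demo.commit()

def refresh_summary(conn):
    """Rebuild the summary table in a single transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_summary_demo")
        conn.execute("""
            INSERT INTO daily_summary_demo (sale_date, order_count, revenue)
            SELECT DATE(order_date), COUNT(*), SUM(total_amount)
            FROM orders_demo
            WHERE status = 'completed'
            GROUP BY DATE(order_date)
        """)

def read_summary(conn):
    return conn.execute(
        "SELECT sale_date, order_count, revenue FROM daily_summary_demo ORDER BY sale_date"
    ).fetchall()

refresh_summary(demo)
first = read_summary(demo)

# A new completed order arrives; the summary is stale until the next refresh.
demo.execute(
    "INSERT INTO orders_demo (order_date, status, total_amount) "
    "VALUES ('2024-01-02 10:00:00', 'completed', 25.0)"
)
demo.commit()
stale = read_summary(demo)

refresh_summary(demo)
second = read_summary(demo)
print(first, stale, second, sep="\n")
demo.close()
```

A full rebuild is the simplest option; for large tables, refreshing only the most recent dates is a common refinement.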

---

## 🏭 Part 4: Production Best Practices

Writing SQL for production requires different considerations than ad-hoc analysis.

### 4.1 Query Monitoring and Profiling

In [None]:
# Create query performance monitoring
print("📊 QUERY PERFORMANCE MONITORING\n")

# Create a query log table
cursor.execute("DROP TABLE IF EXISTS query_log")
cursor.execute("""
CREATE TABLE query_log (
    query_id INTEGER PRIMARY KEY AUTOINCREMENT,
    query_text TEXT,
    execution_time DECIMAL(10,4),
    rows_affected INTEGER,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

def log_query(query, conn):
    """Execute query with logging"""
    start = time.time()
    result = pd.read_sql(query, conn)
    execution_time = time.time() - start
    
    # Log the query
    cursor.execute(
        "INSERT INTO query_log (query_text, execution_time, rows_affected) VALUES (?, ?, ?)",
        (query[:100], execution_time, len(result))
    )
    conn.commit()
    return result, execution_time

# Run some test queries
test_queries = [
    "SELECT COUNT(*) FROM orders",
    "SELECT * FROM customers WHERE segment = 'Premium' LIMIT 10",
    "SELECT category, COUNT(*) FROM products GROUP BY category"
]

print("Running test queries...")
for query in test_queries:
    _, exec_time = log_query(query, conn)
    print(f"  ✓ Query executed in {exec_time:.4f}s")

# Analyze query performance
performance_report = """
SELECT 
    query_text,
    execution_time,
    rows_affected,
    timestamp
FROM query_log
ORDER BY execution_time DESC
"""

print("\n📈 Query Performance Report:")
report = pd.read_sql(performance_report, conn)
display(report)

### 4.2 Connection Pooling and Resource Management

In [None]:
# Demonstrate connection pooling pattern
print("🔌 CONNECTION POOLING PATTERN\n")

from contextlib import contextmanager
import queue

class ConnectionPool:
    """Simple connection pool implementation"""
    def __init__(self, database, max_connections=5):
        self.database = database
        self.max_connections = max_connections
        self.pool = queue.Queue(maxsize=max_connections)
        
        # Initialize pool with connections
        for _ in range(max_connections):
            conn = sqlite3.connect(database)
            self.pool.put(conn)
    
    @contextmanager
    def get_connection(self):
        """Get connection from pool"""
        conn = self.pool.get()
        try:
            yield conn
        finally:
            # Return connection to pool
            self.pool.put(conn)
    
    def close_all(self):
        """Close all connections"""
        while not self.pool.empty():
            conn = self.pool.get()
            conn.close()

# Example usage
pool = ConnectionPool('production.db', max_connections=3)

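print("Using connection pool:")
for i in range(5):
    # Use a separate name so the notebook's main `conn` is not rebound
    # to a pooled connection that close_all() later closes
    with pool.get_connection() as pooled_conn:
        result = pd.read_sql("SELECT COUNT(*) as cnt FROM orders", pooled_conn)
        print(f"  Query {i+1}: {result['cnt'].iloc[0]:,} orders")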

pool.close_all()
print("\n✅ Connection pool prevents connection exhaustion!")

### 4.3 SQL Injection Prevention

In [None]:
# Demonstrate SQL injection prevention
print("🔒 SQL INJECTION PREVENTION\n")

# Vulnerable code - DON'T DO THIS!
def vulnerable_query(user_input):
    # Direct string concatenation - DANGEROUS!
    query = f"SELECT * FROM customers WHERE email = '{user_input}'"
    return query

# Safe code - DO THIS!
def safe_query(user_input, conn):
    # Parameterized query - SAFE!
    query = "SELECT * FROM customers WHERE email = ?"
    return pd.read_sql(query, conn, params=(user_input,))

# Example malicious input
malicious_input = "admin@example.com' OR '1'='1"

print("❌ VULNERABLE Query:")
vulnerable = vulnerable_query(malicious_input)
print(f"  {vulnerable}")
print("  ⚠️ This would return ALL customers!")

print("\n✅ SAFE Query:")
print("  Using parameterized query with ?")
print("  The malicious input is treated as a literal string")

# Best practices
print("\n📝 SQL Injection Prevention Rules:")
print("1. ALWAYS use parameterized queries (? or :param)")
print("2. NEVER concatenate user input into SQL strings")
print("3. Validate and sanitize all inputs")
print("4. Use stored procedures where appropriate")
print("5. Apply principle of least privilege to database users")
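
The difference between the two approaches is easiest to see by running both against a small table. The sketch below uses an in-memory database with three made-up customers (`customers_demo` is an illustrative name) and the same malicious input as above.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE customers_demo (customer_id INTEGER PRIMARY KEY, email TEXT)")
demo.executemany(
    "INSERT INTO customers_demo (email) VALUES (?)",
    [("a@example.com",), ("b@example.com",), ("c@example.com",)],
)

malicious_input = "admin@example.com' OR '1'='1"

# Vulnerable: the input becomes part of the SQL text, so OR '1'='1' is executed
vulnerable_sql = f"SELECT email FROM customers_demo WHERE email = '{malicious_input}'"
leaked = demo.execute(vulnerable_sql).fetchall()

# Safe: the input is bound as a value and compared as one literal string
safe = demo.execute(
    "SELECT email FROM customers_demo WHERE email = ?", (malicious_input,)
).fetchall()

print(f"vulnerable query returned {len(leaked)} rows")
print(f"parameterized query returned {len(safe)} rows")
demo.close()
```

The string-built query matches every row because the injected condition is always true, while the parameterized query matches none because no customer has that literal email.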

---

## 📊 Part 5: Database Statistics and Maintenance

Keep your database healthy with regular maintenance.
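
As a small illustration of what `ANALYZE` produces, the sketch below runs it on an in-memory table and reads back the statistics SQLite stores in `sqlite_stat1`, followed by an integrity check. The table `orders_demo` and index `idx_demo_status` are illustrative names.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE orders_demo (order_id INTEGER PRIMARY KEY, status TEXT)")
demo.executemany(
    "INSERT INTO orders_demo (status) VALUES (?)",
    [("completed" if i % 4 else "pending",) for i in range(1000)],
)
demo.execute("CREATE INDEX idx_demo_status ON orders_demo(status)")
demo.commit()

# ANALYZE gathers per-index statistics that the query planner can consult
demo.execute("ANALYZE")
stats = demo.execute(
    "SELECT tbl, idx, stat FROM sqlite_stat1 WHERE idx = 'idx_demo_status'"
).fetchall()

# integrity_check returns a single row containing 'ok' when no problems are found
integrity = demo.execute("PRAGMA integrity_check").fetchone()[0]

print("stats:", stats)
print("integrity:", integrity)
demo.close()
```

The `stat` column is a space-separated string describing the index size and how many rows share each key prefix, which the planner uses to judge how selective the index is.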

In [None]:
# Analyze database statistics
print("📊 DATABASE STATISTICS\n")

# Tables and the number of indexes on each
size_query = """
SELECT 
    name as table_name,
    (SELECT COUNT(*) FROM sqlite_master WHERE type='index' AND tbl_name=m.name) as index_count
FROM sqlite_master m
WHERE type = 'table'
  AND name NOT LIKE 'sqlite_%'
ORDER BY name
"""

table_stats = pd.read_sql(size_query, conn)

# Add row counts as integers (table names come from sqlite_master, not user input)
table_stats['row_count'] = [
    int(pd.read_sql(f'SELECT COUNT(*) as cnt FROM "{name}"', conn)['cnt'].iloc[0])
    for name in table_stats['table_name']
]

print("Table Statistics:")
display(table_stats)

# Database maintenance commands
print("\n🔧 MAINTENANCE OPERATIONS:\n")

# Analyze tables for query optimizer
print("1. Updating statistics...")
cursor.execute("ANALYZE")
print("   ✓ Statistics updated")

# Vacuum to reclaim space (VACUUM cannot run inside an open transaction)
print("\n2. Vacuuming database...")
conn.commit()
cursor.execute("VACUUM")
print("   ✓ Space reclaimed")

# Check integrity
print("\n3. Checking integrity...")
integrity = cursor.execute("PRAGMA integrity_check").fetchone()
print(f"   ✓ Integrity: {integrity[0]}")

print("\n✅ Database maintenance complete!")

---

## 🎯 Practice Exercises

Apply what you've learned to optimize real queries!

### Exercise 1: Optimize a Slow Dashboard Query

This query powers an executive dashboard but takes too long.

In [None]:
# Slow dashboard query
dashboard_query = """
SELECT 
    c.segment,
    p.category,
    COUNT(DISTINCT o.customer_id) as customers,
    COUNT(DISTINCT o.order_id) as orders,
    SUM(oi.quantity * oi.unit_price) as revenue
FROM order_items oi
JOIN orders o ON oi.order_id = o.order_id
JOIN customers c ON o.customer_id = c.customer_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date >= '2024-01-01'
GROUP BY c.segment, p.category
ORDER BY revenue DESC
"""

print("TODO: Optimize this query")
print("Hints:")
print("1. Check what indexes exist")
print("2. Consider creating a summary table")
print("3. Think about filtering early")

### Exercise 2: Design Indexes for Common Queries

Given these common query patterns, design optimal indexes.

In [None]:
# Common query patterns
queries = [
    "SELECT * FROM orders WHERE customer_id = ? AND status = 'completed'",
    "SELECT * FROM products WHERE category = ? ORDER BY price DESC",
    "SELECT * FROM events WHERE customer_id = ? AND event_timestamp >= ?",
    "SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id"
]

print("TODO: Design indexes for these queries")
print("Consider:")
print("- Single vs composite indexes")
print("- Column order in composite indexes")
print("- Covering indexes where beneficial")

---

## 🎓 Key Takeaways

1. **Query Plans Are Your Friend**:
   - Always EXPLAIN before optimizing
   - Look for full table scans (SCAN, or SCAN TABLE in older SQLite versions)
   - Prefer index searches (SEARCH ... USING INDEX) over scans

2. **Index Strategy**:
   - Index foreign keys and WHERE clause columns
   - Consider composite indexes for multi-column filters
   - Remember: indexes slow down writes

3. **Query Optimization**:
   - Filter early, join late
   - Avoid N+1 queries
   - Use summary tables (the materialized view pattern) for complex aggregations

4. **Production Best Practices**:
   - Always use parameterized queries
   - Implement connection pooling
   - Monitor and log slow queries
   - Regular maintenance (ANALYZE, VACUUM)

5. **Performance Metrics**:
   - Measure before and after optimization
   - Consider both read and write performance
   - Profile in production-like environment

---

## 🚀 Next Steps

You're now ready to:
- Optimize slow queries in production
- Design efficient database schemas
- Implement monitoring and maintenance
- Scale to millions of rows

Remember: **Performance is a feature!** ⚡

In [None]:
# Cleanup
conn.close()
print("✅ Database connection closed. Great work on production SQL optimization!")