# Chapter 18: Advanced Database Patterns

This notebook brings together everything from the previous two notebooks and
introduces patterns used in real applications: schema migrations, the repository
pattern, aggregate queries, indexes, foreign keys, and a practical task tracker.

## What You Will Learn
- Schema migrations with versioned `ALTER TABLE` patterns
- In-memory databases for fast, isolated testing
- The repository pattern for separating data access from business logic
- Aggregate queries: `COUNT`, `SUM`, `AVG`, `GROUP BY`
- Indexes and query performance
- Foreign keys and relationships
- Practical example: building a simple task tracker

## Schema Migrations: Versioned ALTER TABLE

As your application evolves, the database schema needs to change. A **migration**
applies incremental, versioned changes. Here is a lightweight pattern that
tracks the current schema version in the database itself.

In [None]:
import sqlite3


def get_schema_version(conn: sqlite3.Connection) -> int:
    """Read the current schema version from the database."""
    # user_version is a built-in SQLite pragma
    return conn.execute("PRAGMA user_version").fetchone()[0]


def set_schema_version(conn: sqlite3.Connection, version: int) -> None:
    """Set the schema version (PRAGMA does not accept bound parameters)."""
    # int() guards the f-string against anything that is not a plain integer
    conn.execute(f"PRAGMA user_version = {int(version)}")


# Define migrations as a list of (version, description, sql) tuples
MIGRATIONS: list[tuple[int, str, str]] = [
    (1, "Create users table", """
        CREATE TABLE users (
            id    INTEGER PRIMARY KEY,
            name  TEXT NOT NULL,
            email TEXT NOT NULL UNIQUE
        )
    """),
    (2, "Add created_at column to users", """
        ALTER TABLE users ADD COLUMN created_at TEXT DEFAULT CURRENT_TIMESTAMP
    """),
    (3, "Add is_active column to users", """
        ALTER TABLE users ADD COLUMN is_active INTEGER DEFAULT 1
    """),
]


def migrate(conn: sqlite3.Connection) -> None:
    """Apply all pending migrations."""
    current_version: int = get_schema_version(conn)
    print(f"Current schema version: {current_version}")

    for version, description, sql in MIGRATIONS:
        if version > current_version:
            print(f"  Applying migration {version}: {description}")
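            # executescript() commits any pending transaction before it runs,
            # so it cannot be rolled back together with the version bump.
            # An explicit BEGIN plus execute() keeps each single-statement
            # migration and its new version number in one transaction.
            with conn:
                conn.execute("BEGIN")
                conn.execute(sql)
                set_schema_version(conn, version)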

    final: int = get_schema_version(conn)
    print(f"Schema version after migration: {final}")


# Run migrations on a fresh database
conn: sqlite3.Connection = sqlite3.connect(":memory:")
migrate(conn)

# Running again is a no-op (idempotent)
print("\nRunning migrations again:")
migrate(conn)
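
A useful property to check is what happens when a migration fails halfway. The following is a minimal sketch rather than part of the `migrate` function above: `apply_step` and `column_names` are hypothetical helpers, and the code assumes the default (legacy) transaction handling of Python's `sqlite3` module, where an explicit `BEGIN` inside `with conn:` is committed on success and rolled back on an exception.

```python
import sqlite3


def column_names(conn: sqlite3.Connection, table: str) -> list[str]:
    """Return the column names of a table via PRAGMA table_info."""
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


def apply_step(conn: sqlite3.Connection, version: int, statements: list[str]) -> bool:
    """Hypothetical helper: run one migration step atomically."""
    try:
        with conn:  # commits on success, rolls back on an exception
            conn.execute("BEGIN")
            for statement in statements:
                conn.execute(statement)
            conn.execute(f"PRAGMA user_version = {int(version)}")
    except sqlite3.Error:
        return False
    return True


demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

# The second statement fails (duplicate column), so the first is undone as well
broken_ok = apply_step(demo, 1, [
    "ALTER TABLE users ADD COLUMN age INTEGER",
    "ALTER TABLE users ADD COLUMN name TEXT",
])
columns_after_failure = column_names(demo, "users")
version_after_failure = demo.execute("PRAGMA user_version").fetchone()[0]

# A valid step is committed together with its version number
good_ok = apply_step(demo, 1, ["ALTER TABLE users ADD COLUMN age INTEGER"])
columns_after_success = column_names(demo, "users")
version_after_success = demo.execute("PRAGMA user_version").fetchone()[0]

print(broken_ok, columns_after_failure, version_after_failure)
print(good_ok, columns_after_success, version_after_success)
demo.close()
```

Because the schema change and the version bump share one transaction, a failed step leaves both the table and `user_version` as they were, so the step can be corrected and retried.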

## In-Memory Databases for Testing

In-memory databases suit unit tests well: they are fast, isolated, and
discarded as soon as the connection closes. Each `connect(":memory:")` call
creates a completely independent database.

In [None]:
def create_test_db() -> sqlite3.Connection:
    """Create a fresh test database with schema and sample data."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute("""
        CREATE TABLE products (
            id    INTEGER PRIMARY KEY,
            name  TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER NOT NULL DEFAULT 0
        )
    """)
    conn.executemany(
        "INSERT INTO products (name, price, stock) VALUES (?, ?, ?)",
        [
            ("Widget", 9.99, 100),
            ("Gadget", 24.99, 50),
            ("Doohickey", 4.99, 200),
        ],
    )
    conn.commit()
    return conn


# Simulate test functions
def test_product_count() -> None:
    """Test that the database has the expected number of products."""
    db = create_test_db()
    count: int = db.execute("SELECT COUNT(*) FROM products").fetchone()[0]
    assert count == 3, f"Expected 3 products, got {count}"
    db.close()
    print("  test_product_count: PASSED")


def test_update_stock() -> None:
    """Test that stock updates work correctly."""
    db = create_test_db()
    db.execute("UPDATE products SET stock = stock - 10 WHERE name = ?", ("Widget",))
    db.commit()
    row = db.execute("SELECT stock FROM products WHERE name = ?", ("Widget",)).fetchone()
    assert row["stock"] == 90, f"Expected 90, got {row['stock']}"
    db.close()
    print("  test_update_stock: PASSED")


def test_isolation() -> None:
    """Each test gets its own database -- changes don't leak."""
    db = create_test_db()
    # Widget should still have 100 stock (previous test's changes don't affect us)
    row = db.execute("SELECT stock FROM products WHERE name = ?", ("Widget",)).fetchone()
    assert row["stock"] == 100, f"Expected 100, got {row['stock']}"
    db.close()
    print("  test_isolation: PASSED")


print("Running tests with in-memory databases:")
test_product_count()
test_update_stock()
test_isolation()
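
The isolation claim can also be checked directly. This is a small sketch with illustrative names (`table_names`, `first`, `second`): a table created on one in-memory connection should not be visible on another.

```python
import sqlite3


def table_names(conn: sqlite3.Connection) -> list[str]:
    """List the tables visible on this connection."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")

# Only the first connection gets a table
first.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT)")

first_tables = table_names(first)
second_tables = table_names(second)
print(first_tables)   # the table exists on the first connection
print(second_tables)  # the second connection has no tables at all

first.close()
second.close()
```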

## The Repository Pattern

The **repository pattern** separates data access logic from business logic.
Instead of writing SQL throughout your application, you encapsulate all
database operations behind a clean interface. This makes your code easier
to test and maintain.

In [None]:
from dataclasses import dataclass
from typing import Protocol


@dataclass
class User:
    """Domain model for a user."""
    id: int | None
    name: str
    email: str


class UserRepository(Protocol):
    """Interface for user data access."""
    def add(self, user: User) -> User: ...
    def get_by_id(self, user_id: int) -> User | None: ...
    def get_all(self) -> list[User]: ...
    def update(self, user: User) -> None: ...
    def delete(self, user_id: int) -> bool: ...


class SQLiteUserRepository:
    """SQLite implementation of the UserRepository."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id    INTEGER PRIMARY KEY,
                name  TEXT NOT NULL,
                email TEXT NOT NULL UNIQUE
            )
        """)
        self.conn.commit()

    def _row_to_user(self, row: tuple) -> User:
        return User(id=row[0], name=row[1], email=row[2])

    def add(self, user: User) -> User:
        with self.conn:
            cur = self.conn.execute(
                "INSERT INTO users (name, email) VALUES (?, ?)",
                (user.name, user.email),
            )
        return User(id=cur.lastrowid, name=user.name, email=user.email)

    def get_by_id(self, user_id: int) -> User | None:
        row = self.conn.execute(
            "SELECT id, name, email FROM users WHERE id = ?", (user_id,)
        ).fetchone()
        return self._row_to_user(row) if row else None

    def get_all(self) -> list[User]:
        rows = self.conn.execute("SELECT id, name, email FROM users ORDER BY name").fetchall()
        return [self._row_to_user(row) for row in rows]

    def update(self, user: User) -> None:
        with self.conn:
            self.conn.execute(
                "UPDATE users SET name = ?, email = ? WHERE id = ?",
                (user.name, user.email, user.id),
            )

    def delete(self, user_id: int) -> bool:
        with self.conn:
            cur = self.conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
        return cur.rowcount > 0


# Use the repository -- business logic never sees SQL
db: sqlite3.Connection = sqlite3.connect(":memory:")
repo: SQLiteUserRepository = SQLiteUserRepository(db)

alice: User = repo.add(User(id=None, name="Alice", email="alice@example.com"))
bob: User = repo.add(User(id=None, name="Bob", email="bob@example.com"))
print(f"Added: {alice}")
print(f"Added: {bob}")

# Read
found: User | None = repo.get_by_id(alice.id)
print(f"\nFound by id: {found}")

# Update
alice.name = "Alice Smith"
repo.update(alice)
print(f"After update: {repo.get_by_id(alice.id)}")

# List all
print(f"\nAll users: {repo.get_all()}")

# Delete
deleted: bool = repo.delete(bob.id)
print(f"\nDeleted Bob: {deleted}")
print(f"All users after delete: {repo.get_all()}")

db.close()

## Aggregate Queries: COUNT, SUM, AVG, GROUP BY

SQL aggregate functions summarize data across rows. Combined with `GROUP BY`,
they produce powerful analytical results directly from the database.

In [None]:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row

conn.execute("""
    CREATE TABLE sales (
        id       INTEGER PRIMARY KEY,
        product  TEXT NOT NULL,
        category TEXT NOT NULL,
        quantity INTEGER NOT NULL,
        price    REAL NOT NULL
    )
""")

sales_data: list[tuple[str, str, int, float]] = [
    ("Laptop",      "Electronics", 5,  999.99),
    ("Mouse",       "Electronics", 50, 29.99),
    ("Keyboard",    "Electronics", 30, 79.99),
    ("Desk",        "Furniture",   10, 299.99),
    ("Chair",       "Furniture",   15, 199.99),
    ("Monitor",     "Electronics", 20, 349.99),
    ("Bookshelf",   "Furniture",   8,  149.99),
    ("Headphones",  "Electronics", 40, 59.99),
]

conn.executemany(
    "INSERT INTO sales (product, category, quantity, price) VALUES (?, ?, ?, ?)",
    sales_data,
)
conn.commit()

# Basic aggregates
row = conn.execute("""
    SELECT
        COUNT(*)            AS total_products,
        SUM(quantity)       AS total_units,
        ROUND(AVG(price), 2) AS avg_price,
        MIN(price)          AS cheapest,
        MAX(price)          AS most_expensive
    FROM sales
""").fetchone()

print("Overall statistics:")
print(f"  Products:       {row['total_products']}")
print(f"  Total units:    {row['total_units']}")
print(f"  Average price:  ${row['avg_price']}")
print(f"  Cheapest:       ${row['cheapest']}")
print(f"  Most expensive: ${row['most_expensive']}")

In [None]:
# GROUP BY -- aggregate per category
print("Sales by category:")
print(f"  {'Category':<15} {'Products':>8} {'Units':>8} {'Avg Price':>10} {'Revenue':>12}")
print(f"  {'-'*55}")

for row in conn.execute("""
    SELECT
        category,
        COUNT(*)                      AS num_products,
        SUM(quantity)                 AS total_units,
        ROUND(AVG(price), 2)          AS avg_price,
        ROUND(SUM(quantity * price), 2) AS revenue
    FROM sales
    GROUP BY category
    ORDER BY revenue DESC
"""):
    print(f"  {row['category']:<15} {row['num_products']:>8} "
          f"{row['total_units']:>8} {row['avg_price']:>10.2f} "
          f"{row['revenue']:>12.2f}")

# HAVING -- filter groups after aggregation
# (a $5,000 threshold would let both categories through, so use $10,000)
print("\nCategories with revenue over $10,000:")
for row in conn.execute("""
    SELECT category, ROUND(SUM(quantity * price), 2) AS revenue
    FROM sales
    GROUP BY category
    HAVING revenue > 10000
"""):
    print(f"  {row['category']}: ${row['revenue']:,.2f}")
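
The difference between filtering rows and filtering groups is easy to miss, so here is a small sketch on a made-up `orders` table (the table, its data, and the name `agg_demo` are illustrative assumptions). `WHERE` removes rows before grouping; `HAVING` removes whole groups after the aggregates are computed.

```python
import sqlite3

agg_demo = sqlite3.connect(":memory:")
agg_demo.execute("CREATE TABLE orders (category TEXT NOT NULL, amount INTEGER NOT NULL)")
agg_demo.executemany(
    "INSERT INTO orders (category, amount) VALUES (?, ?)",
    [("A", 10), ("A", 200), ("B", 50), ("B", 60), ("C", 5)],
)

# WHERE drops individual rows first, then the survivors are grouped
where_result = agg_demo.execute("""
    SELECT category, SUM(amount) AS total
    FROM orders
    WHERE amount >= 50
    GROUP BY category
    ORDER BY category
""").fetchall()

# HAVING groups every row first, then drops groups whose total is too small
having_result = agg_demo.execute("""
    SELECT category, SUM(amount) AS total
    FROM orders
    GROUP BY category
    HAVING SUM(amount) >= 100
    ORDER BY category
""").fetchall()

print(where_result)   # [('A', 200), ('B', 110)]
print(having_result)  # [('A', 210), ('B', 110)]
agg_demo.close()
```

Category A shows the difference: its 10-unit order is excluded by `WHERE` but still counts toward the total that `HAVING` inspects.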

## Indexes and Query Performance

An **index** is a data structure that speeds up lookups on specific columns,
similar to a book's index. Without an index, SQLite performs a full table scan.
Use `EXPLAIN QUERY PLAN` to see how SQLite executes a query.

In [None]:
# Check query plan BEFORE adding an index
print("Query plan WITHOUT index:")
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM sales WHERE category = ?",
    ("Electronics",)
).fetchall()
for row in plan:
    print(f"  {row['detail']}")

# Create an index on the category column
conn.execute("CREATE INDEX IF NOT EXISTS idx_sales_category ON sales (category)")

# Check query plan AFTER adding an index
print("\nQuery plan WITH index:")
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM sales WHERE category = ?",
    ("Electronics",)
).fetchall()
for row in plan:
    print(f"  {row['detail']}")

# List all indexes in the database
print("\nAll indexes:")
for row in conn.execute("SELECT name, tbl_name FROM sqlite_master WHERE type='index'"):
    print(f"  {row['name']} on {row['tbl_name']}")

conn.close()
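
An index only helps queries that filter on the indexed column. The sketch below uses a hypothetical helper, `plan_details`, and a cut-down `sales` table to compare the plan for an indexed column with the plan for an unindexed one. The exact wording of the plan text varies between SQLite versions, so the code only looks for the index name.

```python
import sqlite3


def plan_details(conn: sqlite3.Connection, sql: str, params: tuple = ()) -> list[str]:
    """Return the 'detail' column of EXPLAIN QUERY PLAN for a query."""
    rows = conn.execute(f"EXPLAIN QUERY PLAN {sql}", params).fetchall()
    return [row[3] for row in rows]  # columns: id, parent, notused, detail


idx_demo = sqlite3.connect(":memory:")
idx_demo.execute(
    "CREATE TABLE sales (id INTEGER PRIMARY KEY, product TEXT, category TEXT)"
)
idx_demo.execute("CREATE INDEX idx_sales_category ON sales (category)")

# Filtering on the indexed column can use the index
by_category = plan_details(
    idx_demo, "SELECT * FROM sales WHERE category = ?", ("Electronics",)
)
# Filtering on an unindexed column falls back to scanning the table
by_product = plan_details(
    idx_demo, "SELECT * FROM sales WHERE product = ?", ("Mouse",)
)

print(by_category)
print(by_product)
idx_demo.close()
```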

## Foreign Keys and Relationships

Foreign keys enforce referential integrity between tables. SQLite supports
foreign keys but **they are disabled by default**. You must enable them with
`PRAGMA foreign_keys = ON` for each connection.

In [None]:
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row

# Enable foreign key enforcement
conn.execute("PRAGMA foreign_keys = ON")
fk_status = conn.execute("PRAGMA foreign_keys").fetchone()[0]
print(f"Foreign keys enabled: {bool(fk_status)}")

# Create related tables
conn.executescript("""
    CREATE TABLE authors (
        id   INTEGER PRIMARY KEY,
        name TEXT NOT NULL
    );

    CREATE TABLE books (
        id        INTEGER PRIMARY KEY,
        title     TEXT NOT NULL,
        author_id INTEGER NOT NULL,
        FOREIGN KEY (author_id) REFERENCES authors(id)
            ON DELETE CASCADE
    );
""")

# Insert authors
conn.execute("INSERT INTO authors (name) VALUES (?)", ("Alice Walker",))
conn.execute("INSERT INTO authors (name) VALUES (?)", ("Bob Martin",))

# Insert books linked to authors
conn.execute("INSERT INTO books (title, author_id) VALUES (?, ?)",
             ("The Color Purple", 1))
conn.execute("INSERT INTO books (title, author_id) VALUES (?, ?)",
             ("Clean Code", 2))
conn.execute("INSERT INTO books (title, author_id) VALUES (?, ?)",
             ("Clean Architecture", 2))
conn.commit()

# JOIN query: get books with author names
print("\nBooks with authors (JOIN):")
for row in conn.execute("""
    SELECT b.title, a.name AS author
    FROM books b
    JOIN authors a ON b.author_id = a.id
    ORDER BY a.name, b.title
"""):
    print(f"  '{row['title']}' by {row['author']}")

# Foreign key constraint prevents invalid references
try:
    conn.execute("INSERT INTO books (title, author_id) VALUES (?, ?)",
                 ("Ghost Book", 999))  # author_id 999 doesn't exist
except sqlite3.IntegrityError as e:
    print(f"\nFK violation caught: {e}")

# CASCADE delete: removing an author removes their books
conn.execute("DELETE FROM authors WHERE id = ?", (2,))
conn.commit()

remaining = conn.execute("SELECT title FROM books").fetchall()
print(f"\nBooks after deleting Bob Martin (CASCADE): {[r['title'] for r in remaining]}")

conn.close()
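
To see what the pragma actually changes, the sketch below runs the same invalid insert with enforcement off and on. `orphan_insert_accepted` is a hypothetical helper, and it sets the pragma explicitly in both cases rather than relying on the default of a particular SQLite build.

```python
import sqlite3


def orphan_insert_accepted(enforce: bool) -> bool:
    """Return True if a book with a nonexistent author_id can be inserted."""
    fk_demo = sqlite3.connect(":memory:")
    # Set the pragma explicitly both ways instead of trusting the build default
    fk_demo.execute(f"PRAGMA foreign_keys = {'ON' if enforce else 'OFF'}")
    fk_demo.executescript("""
        CREATE TABLE authors (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
        CREATE TABLE books (
            id        INTEGER PRIMARY KEY,
            title     TEXT NOT NULL,
            author_id INTEGER NOT NULL REFERENCES authors(id)
        );
    """)
    try:
        fk_demo.execute(
            "INSERT INTO books (title, author_id) VALUES (?, ?)",
            ("Ghost Book", 999),  # no author has id 999
        )
        return True
    except sqlite3.IntegrityError:
        return False
    finally:
        fk_demo.close()


without_fk = orphan_insert_accepted(enforce=False)
with_fk = orphan_insert_accepted(enforce=True)
print(f"Accepted without enforcement: {without_fk}")
print(f"Accepted with enforcement:    {with_fk}")
```

With enforcement off, the `REFERENCES` clause is parsed but not checked, which is why forgetting the pragma lets orphaned rows in silently.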

## Practical Example: Building a Simple Task Tracker

Let's combine these patterns into a practical application: a task tracker with
tasks grouped by project, a typed domain model, indexes, and aggregate reporting.

In [None]:
import sqlite3
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum


class TaskStatus(Enum):
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    DONE = "done"


@dataclass
class Task:
    """Domain model for a task."""
    id: int | None
    project: str
    title: str
    status: TaskStatus = TaskStatus.TODO
    priority: int = 1  # 1=low, 2=medium, 3=high
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())


class TaskTracker:
    """A simple task tracking application backed by SQLite."""

    def __init__(self, db_path: str = ":memory:") -> None:
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute("PRAGMA foreign_keys = ON")
        self._create_schema()

    def _create_schema(self) -> None:
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS tasks (
                id         INTEGER PRIMARY KEY,
                project    TEXT NOT NULL,
                title      TEXT NOT NULL,
                status     TEXT NOT NULL DEFAULT 'todo',
                priority   INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL
            );

            CREATE INDEX IF NOT EXISTS idx_tasks_project ON tasks (project);
            CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks (status);
        """)

    def _row_to_task(self, row: sqlite3.Row) -> Task:
        return Task(
            id=row["id"],
            project=row["project"],
            title=row["title"],
            status=TaskStatus(row["status"]),
            priority=row["priority"],
            created_at=row["created_at"],
        )

    def add_task(self, task: Task) -> Task:
        """Add a new task and return it with the assigned ID."""
        with self.conn:
            cur = self.conn.execute(
                "INSERT INTO tasks (project, title, status, priority, created_at) "
                "VALUES (?, ?, ?, ?, ?)",
                (task.project, task.title, task.status.value,
                 task.priority, task.created_at),
            )
        task.id = cur.lastrowid
        return task

    def update_status(self, task_id: int, status: TaskStatus) -> None:
        """Change a task's status."""
        with self.conn:
            self.conn.execute(
                "UPDATE tasks SET status = ? WHERE id = ?",
                (status.value, task_id),
            )

    def get_tasks(self, project: str | None = None,
                  status: TaskStatus | None = None) -> list[Task]:
        """Retrieve tasks with optional filters."""
        query: str = "SELECT * FROM tasks WHERE 1=1"
        params: list[str] = []
        # Compare against None so a falsy filter value is never silently ignored
        if project is not None:
            query += " AND project = ?"
            params.append(project)
        if status is not None:
            query += " AND status = ?"
            params.append(status.value)
        query += " ORDER BY priority DESC, created_at"
        rows = self.conn.execute(query, params).fetchall()
        return [self._row_to_task(r) for r in rows]

    def get_summary(self) -> list[dict[str, object]]:
        """Get per-status task counts for each project (one row per project)."""
        rows = self.conn.execute("""
            SELECT
                project,
                SUM(CASE WHEN status = 'todo' THEN 1 ELSE 0 END) AS todo,
                SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) AS in_progress,
                SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) AS done,
                COUNT(*) AS total
            FROM tasks
            GROUP BY project
            ORDER BY project
        """).fetchall()
        return [dict(row) for row in rows]

    def close(self) -> None:
        self.conn.close()


print("TaskTracker class defined successfully.")

In [None]:
# Build and use the task tracker
tracker = TaskTracker()

# Add tasks to different projects
tasks_to_add: list[Task] = [
    Task(id=None, project="Website", title="Design homepage", priority=3),
    Task(id=None, project="Website", title="Implement auth", priority=3),
    Task(id=None, project="Website", title="Write tests", priority=2),
    Task(id=None, project="Website", title="Deploy to staging", priority=1),
    Task(id=None, project="API", title="Design endpoints", priority=3),
    Task(id=None, project="API", title="Add rate limiting", priority=2),
    Task(id=None, project="API", title="Write documentation", priority=1),
    Task(id=None, project="Mobile", title="Setup project", priority=3),
    Task(id=None, project="Mobile", title="Login screen", priority=2),
]

for task in tasks_to_add:
    tracker.add_task(task)

print(f"Added {len(tasks_to_add)} tasks across projects.\n")

# Update some statuses
tracker.update_status(1, TaskStatus.DONE)         # Design homepage
tracker.update_status(2, TaskStatus.IN_PROGRESS)   # Implement auth
tracker.update_status(5, TaskStatus.DONE)          # Design endpoints
tracker.update_status(6, TaskStatus.IN_PROGRESS)   # Add rate limiting
tracker.update_status(8, TaskStatus.DONE)          # Setup project

# Query tasks by project
print("Website tasks:")
for t in tracker.get_tasks(project="Website"):
    priority_label = {1: "LOW", 2: "MED", 3: "HIGH"}[t.priority]
    print(f"  [{t.status.value:12s}] [{priority_label:>4s}] {t.title}")

# Query tasks by status
print("\nAll in-progress tasks:")
for t in tracker.get_tasks(status=TaskStatus.IN_PROGRESS):
    print(f"  {t.project}/{t.title}")

In [None]:
# Project summary report using aggregate queries
print("Project Summary")
print(f"{'Project':<12} {'Todo':>6} {'In Prog':>8} {'Done':>6} {'Total':>7}")
print(f"{'-'*41}")

for summary in tracker.get_summary():
    print(f"{summary['project']:<12} {summary['todo']:>6} "
          f"{summary['in_progress']:>8} {summary['done']:>6} "
          f"{summary['total']:>7}")

# Overall stats
all_tasks: list[Task] = tracker.get_tasks()
done_count: int = sum(1 for t in all_tasks if t.status == TaskStatus.DONE)
total_count: int = len(all_tasks)
completion: float = (done_count / total_count * 100) if total_count else 0.0

print(f"\nOverall completion: {done_count}/{total_count} ({completion:.0f}%)")

tracker.close()
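
The `SUM(CASE WHEN ... END)` idiom in `get_summary` is easier to follow in isolation. The sketch below applies it to a tiny made-up `tasks` table (the data and the name `pivot_demo` are illustrative): each `CASE` yields 1 for rows with the matching status and 0 otherwise, so the sum is a per-status count.

```python
import sqlite3

pivot_demo = sqlite3.connect(":memory:")
pivot_demo.execute("CREATE TABLE tasks (project TEXT NOT NULL, status TEXT NOT NULL)")
pivot_demo.executemany(
    "INSERT INTO tasks (project, status) VALUES (?, ?)",
    [("API", "todo"), ("API", "done"), ("API", "done"), ("Web", "in_progress")],
)

# One output row per project, one counter column per status
summary_rows = pivot_demo.execute("""
    SELECT
        project,
        SUM(CASE WHEN status = 'todo' THEN 1 ELSE 0 END)        AS todo,
        SUM(CASE WHEN status = 'in_progress' THEN 1 ELSE 0 END) AS in_progress,
        SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END)        AS done,
        COUNT(*)                                                AS total
    FROM tasks
    GROUP BY project
    ORDER BY project
""").fetchall()

for project_row in summary_rows:
    print(project_row)
# ('API', 1, 0, 2, 3)
# ('Web', 0, 1, 0, 1)
pivot_demo.close()
```

Doing the pivot in SQL returns one compact row per project instead of shipping every task to Python to be counted there.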

## Summary

### Key Takeaways

| Topic | What You Learned |
|-------|------------------|
| **Schema migrations** | Version schema changes with `PRAGMA user_version` and ordered migration scripts |
| **In-memory testing** | `":memory:"` gives fast, isolated databases for each test |
| **Repository pattern** | Encapsulate SQL behind a clean Python interface with typed domain models |
| **Aggregates** | `COUNT`, `SUM`, `AVG`, `MIN`, `MAX` with `GROUP BY` and `HAVING` |
| **Indexes** | Speed up queries with `CREATE INDEX`; verify with `EXPLAIN QUERY PLAN` |
| **Foreign keys** | `PRAGMA foreign_keys = ON`, `REFERENCES`, `ON DELETE CASCADE` |
| **Task tracker**Combines a schema with indexes, a repository-style class, filtered queries, and aggregate reporting |

### Architecture Guidelines
- **Separate concerns**: Keep SQL in repository classes, business logic outside
- **Use migrations**: Never manually alter a database; script every change
- **Index strategically**: Add indexes on columns you filter or join on
- **Enable foreign keys**: They catch bugs before bad data enters the system
- **Test with in-memory databases**: Fast, isolated, no cleanup needed
- **Use domain models**: Map rows to dataclasses or named tuples for type safety