# Week 3 Notebook: SQL Parameterization and Input Safety

This notebook is built for defensive learning. It contrasts insecure patterns with secure alternatives and explains *why* each change matters.

## How to read this notebook

- Code comments explain both low-level behavior and big-picture security concepts.
- Each insecure example is followed by a secure implementation.
- All demos run locally with a temporary in-memory database and local temp files.

Use this for class/lab environments only, never against real systems.

In [None]:
# Standard library imports only, so students can run this notebook without extra installs.
# sqlite3: lightweight database engine included with Python.
# subprocess: used to show command-construction risks beyond SQL.
# tempfile + pathlib: used for safe, self-contained file-path demos.
import sqlite3
import subprocess
import tempfile
from pathlib import Path

In [None]:
def build_demo_db():
    # We create an in-memory DB so every notebook run starts from a known clean state.
    # This removes setup friction and prevents accidental persistence of test data.
    conn = sqlite3.connect(":memory:")

    # Row factory returns dict-like rows (sqlite3.Row), which are easier to inspect in teaching demos.
    conn.row_factory = sqlite3.Row

    # executescript lets us define schema + seed data in one deterministic block.
    # The table models a simple authentication/account context.
    conn.executescript(
        """
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username TEXT NOT NULL UNIQUE,
            password TEXT NOT NULL,
            role TEXT NOT NULL,
            failed_logins INTEGER NOT NULL DEFAULT 0
        );

        INSERT INTO users (username, password, role, failed_logins) VALUES
            ('admin', 'admin_pw', 'admin', 0),
            ('alice', 'alice_pw', 'student', 1),
            ('bob', 'bob_pw', 'student', 3),
            ('carol', 'carol_pw', 'ta', 0);
        """
    )

    return conn


def print_rows(rows):
    # Shared display helper so each demo focuses on query logic, not print formatting noise.
    for row in rows:
        print(dict(row))
    print(f"row_count={len(rows)}")


# Initialize one connection used across demos below.
conn = build_demo_db()

## Demo 1: Login query built with string interpolation (insecure)

Big-picture issue: if user-controlled input is merged into SQL text, the input can become SQL syntax and alter query meaning.

In [None]:
def insecure_login(conn, username, password):
    # BAD PATTERN:
    # This f-string injects raw input directly into SQL text.
    # If username/password includes quote/comment/operator payloads,
    # the SQL parser may interpret that payload as query logic.
    query = f"""
    SELECT username, role
    FROM users
    WHERE username = '{username}' AND password = '{password}'
    """

    # By the time we execute, the query string is already potentially compromised.
    rows = conn.execute(query).fetchall()
    return rows, query


# Normal behavior with expected credentials.
rows, query = insecure_login(conn, "alice", "alice_pw")
print("Normal login query:")
print(query.strip())
print_rows(rows)

# Injection payload designed to short-circuit auth logic in a vulnerable query.
attack_password = "' OR '1'='1' --"
rows, query = insecure_login(conn, "not_a_user", attack_password)
print("\nInjected login query:")
print(query.strip())
print_rows(rows)

### Secure replacement: positional parameters (`?`)

Big-picture fix: separate *query structure* from *user data*. The SQL engine handles escaping/binding so user text stays data, not code.

In [None]:
def secure_login_positional(conn, username, password):
    # GOOD PATTERN:
    # Query text uses placeholders for values.
    # Input is passed separately in the second argument to execute().
    query = """
    SELECT username, role
    FROM users
    WHERE username = ? AND password = ?
    """

    # Database binds values safely; payload characters are treated as literal text.
    rows = conn.execute(query, (username, password)).fetchall()
    return rows, query


rows, query = secure_login_positional(conn, "not_a_user", attack_password)
print("Parameterized query text:")
print(query.strip())
print("Bound values:", ("not_a_user", attack_password))
print_rows(rows)

## Demo 2: `LIKE` search and named parameters

Named parameters improve readability when many values are involved or when query templates are reused.

In [None]:
def insecure_username_search(conn, keyword):
    # BAD PATTERN: keyword is merged into SQL text inside the LIKE expression.
    # If keyword closes the quote and appends SQL, query semantics change.
    query = f"SELECT username, role FROM users WHERE username LIKE '%{keyword}%'"
    rows = conn.execute(query).fetchall()
    return rows, query


rows, query = insecure_username_search(conn, "ali")
print("Insecure search query (normal):")
print(query)
print_rows(rows)

# This payload attempts to escape out of LIKE and force a true predicate.
attack_keyword = "%' OR 1=1 --"
rows, query = insecure_username_search(conn, attack_keyword)
print("\nInsecure search query (injected):")
print(query)
print_rows(rows)

In [None]:
def secure_username_search_named(conn, keyword):
    # GOOD PATTERN: SQL contains a named placeholder.
    query = "SELECT username, role FROM users WHERE username LIKE :pattern"

    # Wildcards belong in the value, not by concatenating raw SQL fragments.
    params = {"pattern": f"%{keyword}%"}

    # execute(query, params) keeps grammar fixed and substitutes safely.
    rows = conn.execute(query, params).fetchall()
    return rows, query, params


rows, query, params = secure_username_search_named(conn, attack_keyword)
print("Named-parameter query:")
print(query)
print("Params:", params)
print_rows(rows)

## Demo 3: Dynamic `IN (...)` list

Common mistake: manually quoting and joining user values into an `IN` clause.

Secure pattern: generate placeholders for list length, then bind values separately.

In [None]:
def insecure_roles_query(conn, roles):
    # BAD PATTERN: wraps each role in single quotes manually, then string-joins into SQL.
    # If any role contains quote/control text, it can terminate the literal and inject logic.
    role_list = ",".join(f"'{r}'" for r in roles)
    query = f"SELECT username, role FROM users WHERE role IN ({role_list})"
    rows = conn.execute(query).fetchall()
    return rows, query


rows, query = insecure_roles_query(conn, ["student", "ta"])
print("Insecure IN query (normal):")
print(query)
print_rows(rows)

attack_role = "student') OR 1=1 --"
rows, query = insecure_roles_query(conn, [attack_role])
print("\nInsecure IN query (injected):")
print(query)
print_rows(rows)

In [None]:
def secure_roles_query(conn, roles):
    # Edge-case handling: empty input list should return no rows safely.
    # Returning a query with WHERE 1=0 keeps semantics explicit for students.
    if not roles:
        return [], "SELECT username, role FROM users WHERE 1=0", ()

    # Build exactly one placeholder per role value.
    # Example: roles length 3 -> '?,?,?'.
    placeholders = ",".join("?" for _ in roles)

    # Query grammar is still deterministic, only placeholder count changes.
    query = f"SELECT username, role FROM users WHERE role IN ({placeholders})"

    # Values are bound as data, preventing clause manipulation.
    rows = conn.execute(query, roles).fetchall()
    return rows, query, tuple(roles)


rows, query, params = secure_roles_query(conn, [attack_role])
print("Safe IN query:")
print(query)
print("Params:", params)
print_rows(rows)

## Demo 4: Dynamic sort (`ORDER BY`)

Important nuance: placeholders parameterize values, not SQL identifiers (column names, sort keywords).

For identifiers, use an allowlist of pre-approved SQL fragments.

In [None]:
def insecure_sort(conn, sort_expression):
    # BAD PATTERN: sort_expression is pasted directly into ORDER BY.
    # This can allow injection of additional SQL syntax.
    query = f"SELECT username, role, failed_logins FROM users ORDER BY {sort_expression}"
    rows = conn.execute(query).fetchall()
    return rows, query


rows, query = insecure_sort(conn, "failed_logins DESC, username ASC")
print("Insecure ORDER BY query:")
print(query)
print_rows(rows)

# Teaching note: ORDER BY ? does NOT mean "bind a column expression".
# It binds a value literal, so the sort intent is not achieved.
rows = conn.execute(
    "SELECT username, role, failed_logins FROM users ORDER BY ?",
    ("failed_logins DESC",),
).fetchall()
print("\nORDER BY ? does not safely switch columns; it treats the value as data.")
print_rows(rows)

In [None]:
# Allowlist maps externally visible sort keys to trusted SQL snippets.
# The trusted snippets are authored by developers, not passed from users.
SAFE_SORTS = {
    "username_asc": "username ASC",
    "role_asc": "role ASC, username ASC",
    "failed_desc": "failed_logins DESC, username ASC",
}


def secure_sort(conn, sort_key):
    # Unknown keys fall back to a safe default instead of raising SQL risk.
    sort_clause = SAFE_SORTS.get(sort_key, SAFE_SORTS["username_asc"])

    # String interpolation is acceptable here because sort_clause came from our allowlist.
    query = f"SELECT username, role, failed_logins FROM users ORDER BY {sort_clause}"
    rows = conn.execute(query).fetchall()
    return rows, query, sort_clause


rows, query, chosen_clause = secure_sort(conn, "failed_desc")
print("Allowlist ORDER BY query:")
print(query)
print("Chosen clause:", chosen_clause)
print_rows(rows)

rows, query, chosen_clause = secure_sort(conn, "failed_logins DESC, username ASC")
print("\nUnknown key fallback clause:", chosen_clause)
print_rows(rows)

## Similar basic demo A: command construction

Big-picture transfer: SQL injection is one instance of a broader problem.

Whenever user input is concatenated into an interpreted language (SQL, shell, template engines), input can become control syntax.

In [None]:
def insecure_echo(user_text):
    # BAD PATTERN: shell=True + interpolated command string.
    # Shell metacharacters in user_text can create extra commands.
    cmd = f"echo USER={user_text}"
    output = subprocess.check_output(cmd, shell=True, text=True).strip()
    return cmd, output


def secure_echo(user_text):
    # GOOD PATTERN: pass argument list with shell=False.
    # The process receives exact arguments; shell syntax is not interpreted.
    cmd = ["echo", f"USER={user_text}"]
    output = subprocess.check_output(cmd, shell=False, text=True).strip()
    return cmd, output


attack_text = "alice; echo INJECTED"
cmd, output = insecure_echo(attack_text)
print("Insecure command:", cmd)
print(output)

cmd, output = secure_echo(attack_text)
print("\nSecure command args:", cmd)
print(output)

## Similar basic demo B: path traversal

Big-picture transfer: treating user path input as trusted can break directory boundaries and expose sensitive files.

In [None]:
def setup_demo_files():
    # Build a temporary mini-filesystem for demonstration.
    # root/secret.txt should NOT be readable through the public directory boundary.
    root = Path(tempfile.mkdtemp(prefix="week3_input_safety_"))
    public = root / "public"
    public.mkdir()

    (public / "welcome.txt").write_text("public info\n", encoding="utf-8")
    (root / "secret.txt").write_text("admin-only secret\n", encoding="utf-8")
    return root, public


def insecure_read(base_dir, requested_path):
    # BAD PATTERN: direct path join without canonical boundary check.
    # '../' segments can escape base_dir.
    target = base_dir / requested_path
    return target.read_text(encoding="utf-8").strip()


def secure_read(base_dir, requested_path):
    # Canonicalize both base and target paths.
    base = base_dir.resolve()
    target = (base / requested_path).resolve()

    # Enforce containment: target must be base itself or a descendant of base.
    # If not, reject as traversal attempt.
    if target != base and base not in target.parents:
        raise ValueError("Blocked: path traversal detected")

    return target.read_text(encoding="utf-8").strip()


root, public = setup_demo_files()
print("Safe file read:", insecure_read(public, "welcome.txt"))
print("Insecure traversal read:", insecure_read(public, "../secret.txt"))

try:
    print(secure_read(public, "../secret.txt"))
except ValueError as exc:
    print("Secure traversal check:", exc)

## Recap

- Core defensive principle: keep untrusted input as *data*, never executable *syntax*.
- In SQL, parameterization handles values; allowlists handle identifiers.
- Outside SQL, the same mindset applies to shell commands and filesystem paths.

### Reflection prompts
1. Which demo best illustrates complete mediation, and why?
2. Where does least privilege reduce blast radius if an injection bug still exists?
3. What logging/monitoring signal would help detect each insecure pattern early?