In [1]:
import sys, os
from pathlib import Path

def add_project_root() -> Path:
    """
    Locate the repository root and make it importable.

    Walks upward from the current working directory (inclusive) and picks the
    first directory that has an ``etl`` sub-directory and an ``sql`` entry.
    That directory is prepended to ``sys.path`` (only when not already listed),
    so ``from etl... import ...`` resolves regardless of where the notebook sits.

    Returns
    -------
    Path
        The detected project root.

    Raises
    ------
    RuntimeError
        If no directory between the cwd and the filesystem root qualifies.
    """
    start = Path.cwd()
    for candidate in [start, *start.parents]:
        looks_like_root = (candidate / "etl").is_dir() and (candidate / "sql").exists()
        if not looks_like_root:
            continue
        root_str = str(candidate)
        if root_str not in sys.path:
            sys.path.insert(0, root_str)
        print("✓ project root:", candidate)
        return candidate
    raise RuntimeError("Could not find project root with 'etl' and 'sql'.")

# Resolve the repo root once; keeping it as the last expression displays it.
PROJECT_ROOT = add_project_root()
PROJECT_ROOT

✓ project root: C:\Users\keith\Documents\ipeds_etl\ipeds_etl


WindowsPath('C:/Users/keith/Documents/ipeds_etl/ipeds_etl')

In [2]:
from sqlalchemy.engine import make_url
from etl.config import settings
from etl.db import ping, list_ipeds_schemas

# Connection sanity check: show the target DB (password masked), prove we can
# reach it, and list the IPEDS schemas the loader can see.
masked_db_url = make_url(settings.DATABASE_URL).render_as_string(hide_password=True)
print("DB URL:", masked_db_url)
print("Ping:", ping())
print("Schemas:", list_ipeds_schemas())

DB URL: postgresql+psycopg2://ipeds_loader:***@localhost:5432/ipeds_db
Ping: ('ipeds_db', 'ipeds_loader')
Schemas: ['ipeds_core', 'ipeds_dim', 'ipeds_raw', 'ipeds_vw', 'public']


In [3]:
# Run configuration: the IPEDS endpoint to process and the survey years to load.
ENDPOINT = "directory"
YEARS = [2022]  # extend as needed, e.g. [2020, 2021, 2022, 2023]

In [4]:
# -----------------------------
# Core table creation (DDL)
# -----------------------------
from sqlalchemy import text
from etl.db import get_sqlalchemy_engine
from etl.registry import get_endpoint_config

def ensure_core_table(endpoint: str) -> None:
    """
    Ensure a typed, de-duplicated *core* table exists for an endpoint.

    What this creates:
    - A table named: ipeds_core.{endpoint}
    - Columns and SQL types come from etl/registry.py (single source of truth)
    - PRIMARY KEY (unitid, year) to prevent duplicates for a given year
    - An index on ``year``, plus an index on ``state_abbr`` only when the
      endpoint's schema actually has that column (so endpoints without state
      information don't fail on index creation)

    Why this is in code (and not a .sql file):
    - Keeps schema definition close to the ETL mapping logic
    - Lets us evolve columns per endpoint in one place (the registry)
    - Safe to run repeatedly: all DDL uses IF NOT EXISTS (idempotent)

    Parameters
    ----------
    endpoint : str
        Registry key of the endpoint; also used as the table name and as the
        prefix of the index names.

    Raises
    ------
    ValueError
        If the registry schema lacks ``unitid`` or ``year``, which the primary
        key (and the later upsert's ON CONFLICT target) depend on.
    """
    # Pull endpoint definition (columns + types, mapper, etc.)
    cfg = get_endpoint_config(endpoint)
    schema = cfg["schema"]
    table = f"ipeds_core.{endpoint}"

    # Fail fast with a readable message instead of a Postgres error mid-DDL.
    missing_pk = [c for c in ("unitid", "year") if c not in schema]
    if missing_pk:
        raise ValueError(
            f"Registry schema for {endpoint!r} is missing primary-key column(s): "
            f"{', '.join(missing_pk)}"
        )

    # Turn {"unitid": "INTEGER NOT NULL", ...} into "unitid INTEGER NOT NULL, ..."
    cols_block = ",\n        ".join(f"{col} {sqltype}" for col, sqltype in schema.items())

    # Build the statements as a list rather than one string split on ';' —
    # splitting would break if a registry type/CHECK/DEFAULT contained a ';'.
    # - PRIMARY KEY matches our upsert conflict target later.
    # - year index: common filter (dashboards by year).
    statements = [
        f"""CREATE TABLE IF NOT EXISTS {table} (
        {cols_block},
        PRIMARY KEY (unitid, year)
    )""",
        f"CREATE INDEX IF NOT EXISTS {endpoint}_year_idx ON {table} (year)",
    ]
    # state_abbr index: common filter/aggregation (slicing by state), but only
    # meaningful — and only valid — if the endpoint has that column.
    if "state_abbr" in schema:
        statements.append(
            f"CREATE INDEX IF NOT EXISTS {endpoint}_state_idx ON {table} (state_abbr)"
        )

    eng = get_sqlalchemy_engine()

    # All DDL runs in a single transaction.
    with eng.begin() as cx:
        for stmt in statements:
            cx.execute(text(stmt))

    print(f"✓ ensured core table {table}")


# Create (or confirm) the core table for the endpoint chosen in the config cell.
ensure_core_table(endpoint=ENDPOINT)

✓ ensured core table ipeds_core.directory


In [5]:
# -----------------------------
# Raw → Core load (UPSERT)
# -----------------------------
from typing import List, Dict, Any
from sqlalchemy import text
from etl.db import get_sqlalchemy_engine
from etl.mappers.directory import map_directory_row
from etl.registry import get_endpoint_config

def _resolve_row_mapper(endpoint: str, cfg: Any):
    """Pick the raw-dict → core-dict mapper to use for ``endpoint``."""
    # 'directory' keeps using the explicitly imported mapper (the verified path).
    if endpoint == "directory":
        return map_directory_row
    # NOTE(review): assumes the registry config stores the row mapper under a
    # "mapper" key (the DDL cell's comment says the config carries a mapper) —
    # confirm the key name against etl/registry.py.
    mapper = cfg.get("mapper") if hasattr(cfg, "get") else None
    if not callable(mapper):
        raise ValueError(
            f"No row mapper registered for endpoint {endpoint!r}; "
            "refusing to apply the directory mapper to a different payload."
        )
    return mapper


def load_core_for_year(endpoint: str, year: int) -> int:
    """
    Move one year's worth of data from ipeds_raw → ipeds_core for a given endpoint.

    Pipeline steps (per year):
    1) Read raw JSONB pages, flatten them to individual JSON objects (1 per institution)
       using jsonb_array_elements + LATERAL.
    2) Map/clean each raw JSON dict with the endpoint's mapper (the directory
       mapper for "directory"; otherwise the mapper found in the registry config).
    3) UPSERT into ipeds_core.{endpoint} on (unitid, year), updating non-PK columns.

    Parameters
    ----------
    endpoint : str
        Registry key of the endpoint; selects ipeds_raw.{endpoint}_raw as the
        source and ipeds_core.{endpoint} as the target.
    year : int
        Survey year to load (matched against the raw table's ``year`` column).

    Returns
    -------
    int : number of rows written (length of the mapped list); 0 when the raw
          table has no rows for ``year``.

    Raises
    ------
    ValueError
        If ``endpoint`` is not "directory" and no callable mapper can be found
        in its registry config.
    """
    cfg = get_endpoint_config(endpoint)
    table = f"ipeds_core.{endpoint}"

    # Preserve a deterministic column order for the INSERT.
    # This must match what the mapper outputs.
    schema_cols: List[str] = list(cfg["schema"].keys())

    # Resolve the mapper before touching the DB so a misconfiguration fails fast.
    row_mapper = _resolve_row_mapper(endpoint, cfg)

    # --- 1) Flatten raw JSON payloads into row-wise JSON (one dict per record) ---
    # We read from ipeds_raw.{endpoint}_raw where each row is a *page* with a JSONB
    # array called payload. jsonb_array_elements() expands that array into "elem".
    # CROSS JOIN LATERAL is the idiomatic way to iterate JSON arrays in Postgres.
    eng = get_sqlalchemy_engine()
    flatten_sql = text(f"""
        SELECT elem::jsonb AS row
        FROM ipeds_raw.{endpoint}_raw r
        CROSS JOIN LATERAL jsonb_array_elements(r.payload) AS elem
        WHERE r.year = :y
        ORDER BY r.page_number
    """)
    with eng.connect() as cx:
        raw_rows: List[Dict[str, Any]] = [r[0] for r in cx.execute(flatten_sql, {"y": year}).fetchall()]

    # --- 2) Map/clean raw dicts to our core schema ---
    # The mapper is expected to return a dict with every key in schema_cols.
    # If you add fields to the registry, update the mapper to provide those keys.
    mapped: List[Dict[str, Any]] = [row_mapper(d) for d in raw_rows]

    # Nothing to write: executing the INSERT with an empty parameter list would
    # run it once with no bound values and fail, instead of being a no-op.
    if not mapped:
        print(f"⚠ {endpoint}: no raw rows for year {year}; nothing upserted into {table}")
        return 0

    # --- 3) Build an UPSERT that updates all non-PK columns when the row already exists ---
    # We exclude PK columns from the SET list (unitid, year) since those define the row.
    set_cols = [c for c in schema_cols if c not in ("unitid", "year")]

    # VALUES uses named parameters like :inst_name; we feed a list[dict] to cx.execute().
    # ON CONFLICT target must match the table's PRIMARY KEY.
    # We do a full-field overwrite on conflict — this is fine because the mapper
    # always produces authoritative values for that (unitid, year).
    insert_sql = text(f"""
        INSERT INTO {table} ({", ".join(schema_cols)})
        VALUES ({", ".join(":"+c for c in schema_cols)})
        ON CONFLICT (unitid, year) DO UPDATE
        SET {", ".join(f"{c}=EXCLUDED.{c}" for c in set_cols)}
    """)

    # --- 4) Execute in a single transaction for atomicity ---
    with eng.begin() as cx:
        # Passing the whole list makes SQLAlchemy run an executemany().
        cx.execute(insert_sql, mapped)

    print(f"✓ {endpoint}: upserted {len(mapped)} rows for year {year} into {table}")
    return len(mapped)


# Driver: run the raw → core upsert for each configured year and tally rows written.
total = 0
for y in YEARS:
    rows_written = load_core_for_year(ENDPOINT, y)
    total += rows_written
print("TOTAL rows processed:", total)

✓ directory: upserted 6256 rows for year 2022 into ipeds_core.directory
TOTAL rows processed: 6256
