# Lab 06 — Data Quality & Governance (DashDash) — **All-Python + PostgreSQL** (Codespaces Ready)

*Adapted from the dbt-based lab to run end-to-end in a single Jupyter Notebook using Python (SQLAlchemy, pandas) against PostgreSQL.*  
**Course:** MASY1-GC 3260 · Advanced Data Warehousing & Applications

**How to use (students):**
1. Fill in your PostgreSQL credentials in **Block02-Config** (or set `DATABASE_URL` in the Codespace).
2. Run the notebook top-to-bottom.
3. The notebook will: create/load CSVs → seed into Postgres → build staging **views** → run DQ tests → build marts → compute KPIs → write a stakeholder reply.
4. Input CSVs live in `data/`; run artifacts (RUN_LOG.txt, `dq_failures__*` summaries, sample exports) are saved under `outputs/`.

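**Instructor/Admin:** the `search_path` safety check in **Block02-Config** lets you work in `public` only if your role has CREATE privilege on `public` (instructor mode); everyone else must be in their personal schema. `DB_MODE` (`"student"` or `"admin"`, read from the environment) can be set, but the safety check itself decides based on privileges, not on `DB_MODE`.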

> **AI Disclosure:** This notebook template was generated with GPT-5 Pro on 2025-10-08 22:04 UTC. You are expected to understand and defend the code you submit.

In [None]:
# ------------------------------
# Block01-Setup | Cell01: Dependency installation & timers
# ------------------------------

import datetime
import importlib
import sys
import subprocess
import json
import traceback
from pathlib import Path

# Project-wide bookkeeping: when the project started and how many cells ran.
project_start_time = datetime.datetime.now()
print(f"🚀 Project tracking initialized at {project_start_time:%Y-%m-%d %H:%M:%S}")
total_project_cells_executed = 1  # this cell is the first one counted

# Per-cell timer.
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

# --- Helper function to format time durations consistently ---
def format_duration(delta):
    """Format a duration as an ``HH:MM:SS.mmm`` string.

    *delta* is either a ``datetime.timedelta`` or a plain number of seconds
    (int/float). Sub-millisecond precision is truncated by ``int()``.
    """
    seconds = delta if isinstance(delta, (int, float)) else delta.total_seconds()
    millis = int(seconds * 1000)
    hours, millis = divmod(millis, 3_600_000)
    minutes, millis = divmod(millis, 60_000)
    secs, millis = divmod(millis, 1_000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}"

# Optional rich printer: when `rich` is not installed yet, fall back to a plain
# print() of the first argument only (markup tags are printed literally).
try:
    from rich import print as rprint
except ImportError:
    USE_RICH = False

    def rprint(msg, *args, **kwargs):  # fallback printer
        print(msg)
else:
    USE_RICH = True

# Required packages: pip distribution name -> import names used to probe it.
PKGS = {
    "sqlalchemy": ["sqlalchemy"],
    "psycopg2-binary": ["psycopg2"],
    "pandas": ["pandas"],
    "rich": ["rich"],
    "python-dotenv": ["dotenv"],
    "ipywidgets": ["ipywidgets"],
}

def modules_ready(mods: list) -> bool:
    """Return True if every module name in *mods* can be imported.

    Modules are imported in order and the check stops at the first one that
    raises ImportError. An empty list is trivially ready.
    """
    def _importable(module_name):
        try:
            importlib.import_module(module_name)
        except ImportError:
            return False
        return True

    return all(_importable(module_name) for module_name in mods)

def pip_install(pkg: str) -> bool:
    """Install *pkg* into the running interpreter via ``pip install --quiet``.

    Returns True on success. Returns False (after printing the reason) when
    pip exits non-zero or any other exception occurs; nothing is raised.
    """
    rprint(f"🔧 Installing [bold]{pkg}[/bold]…")
    pip_cmd = [sys.executable, "-m", "pip", "install", "--quiet", pkg]
    try:
        subprocess.check_call(pip_cmd)
        rprint(f"✅ {pkg} installed.\n")
        return True
    except subprocess.CalledProcessError as pip_error:
        rprint(f"❌ Couldn’t install {pkg}: {pip_error}\n")
    except Exception:
        rprint("🚨 Unexpected error:\n" + traceback.format_exc())
    return False

def run_dependency_check() -> bool:
    """Make sure every package in PKGS is importable, installing what is missing.

    Returns:
        True if all packages were already present or were installed
        successfully, False if at least one ``pip install`` failed.

    Side effects: may pip-install packages into the running interpreter. When
    ``rich`` gets installed here, the module-level ``rprint`` is rebound to
    ``rich.print`` and ``USE_RICH`` is set to True.
    """
    # `rprint` and `USE_RICH` are rebound below. Without this declaration the
    # assignment would make `rprint` a local name for the WHOLE function body,
    # so the very first rprint(...) call would raise UnboundLocalError.
    global rprint, USE_RICH

    t0 = datetime.datetime.now()
    rprint(f"🏁 Dependency check started — {t0:%H:%M:%S}\n")
    missing = [pkg for pkg, mods in PKGS.items() if not modules_ready(mods)]
    status_ok = True
    if not missing:
        rprint("🎉 All required packages are already present. No action needed!")
    else:
        rprint(f"📦 Missing packages detected: {json.dumps(missing)}\n")
        failed = [p for p in missing if not pip_install(p)]
        # Freshly installed packages may be invisible to the import system until
        # its finder caches are invalidated. This matters for the imports that
        # follow this check, not only for `rich`.
        importlib.invalidate_caches()
        if "rich" in missing and "rich" not in failed and not USE_RICH:
            rprint = importlib.import_module("rich").print
            USE_RICH = True
        if failed:
            rprint("⚠️ The following packages could NOT be installed automatically:")
            for p in failed:
                rprint(f" • [bold red]{p}[/bold red]")
            rprint(" ➜ Please install them manually.\n")
            status_ok = False
        else:
            rprint("✨ All dependencies successfully installed!")
    duration_delta = datetime.datetime.now() - t0
    rprint(f"\n⏱️ Check finished in [bold]{format_duration(duration_delta)}[/bold] (HH:MM:SS.mmm)\n")
    return status_ok

dependencies_ready = run_dependency_check()

# Imports: only attempted when the dependency check reported success.
if not dependencies_ready:
    print("❌ Critical libraries are missing. Cannot proceed with imports.")
else:
    print("✅ Importing required libraries...")
    import time
    import os
    import re
    import pandas as pd
    from sqlalchemy import create_engine, text
    from sqlalchemy import types as satypes
    from dotenv import load_dotenv
    import psycopg2
    from IPython.display import display, Markdown, HTML
    import ipywidgets as widgets
    print("✅ All libraries imported successfully.")

# Project folders: inputs go to ./data and run artifacts to ./outputs (created if absent).
PROJECT_ROOT = Path.cwd()
DATA_DIR, OUTPUTS_DIR = PROJECT_ROOT / "data", PROJECT_ROOT / "outputs"
for _folder in (DATA_DIR, OUTPUTS_DIR):
    _folder.mkdir(exist_ok=True)

def archive_then_write(path: Path, data: bytes):
    """Write *data* to *path*, first moving any existing file into ``Archived/``.

    An existing file is renamed to
    ``<parent>/Archived/<stem>__<YYYYmmdd_HHMMSS><suffix>`` before the new
    bytes are written. Both steps print the path they touched.

    NOTE: the archive name has one-second resolution, so two archives of the
    same file within one second overwrite each other.
    """
    target = path.resolve()
    if target.exists():
        archive_dir = target.parent / "Archived"
        archive_dir.mkdir(exist_ok=True)
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        archived = archive_dir / f"{target.stem}__{stamp}{target.suffix}"
        target.replace(archived)
        print(f"🔶 Archived existing file to: {archived}")
    target.write_bytes(data)
    print(f"📄 Wrote file: {target}")

# Cell end timing
print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
formatted_cell_duration = format_duration(cell_execution_duration)
formatted_project_duration = format_duration(project_execution_duration)
print(f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}")
print(
    "✅ All Tasks/Processes for this cell were completed successfully."
    if dependencies_ready
    else "❌ Some critical tasks (dependency installation) failed. See logs above."
)
print(f"⏱️ [Block01-Setup] Cell Execution Time: {formatted_cell_duration}")
print(f"⏳ Total Project Execution Duration: {formatted_project_duration} | Cell: {total_project_cells_executed}")
# --- End Cell End Timing & Project Duration ---


### Block02 — Config
Configure the PostgreSQL connection, apply the `search_path` safety check, and define the database helpers used by later cells.

In [None]:
# ------------------------------
# Block02-Config | Cell01: DB config, engine, helpers
# ------------------------------

import datetime
import os
import re
import traceback
from contextlib import contextmanager

# Continue the project timers; re-create them if Block01 has not run in this kernel.
if "project_start_time" not in globals():
    project_start_time = datetime.datetime.now()
total_project_cells_executed = globals().get("total_project_cells_executed", 0) + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

# Rich printer, or a plain print() with the same call signature as a fallback.
try:
    from rich import print as rprint
except Exception:
    def rprint(*args, **kwargs):
        print(*args, **kwargs)

def format_duration(delta):
    """Render *delta* (a timedelta or a number of seconds) as HH:MM:SS.mmm."""
    if isinstance(delta, (int, float)):
        seconds = delta
    else:
        seconds = delta.total_seconds()
    total_ms = int(seconds * 1000)
    hh, total_ms = divmod(total_ms, 60 * 60 * 1000)
    mm, total_ms = divmod(total_ms, 60 * 1000)
    ss, ms = divmod(total_ms, 1000)
    return "%02d:%02d:%02d.%03d" % (hh, mm, ss, ms)

# ------------------------------
# 🔐 Credentials (edit here OR set env DATABASE_URL)
# ------------------------------
DB_MODE = os.getenv("DB_MODE", "student")  # "student" or "admin"
PGHOST = os.getenv("PGHOST", "YOUR_HOST")          # e.g., ep-xyz.us-east-2.aws.neon.tech
PGPORT = int(os.getenv("PGPORT", "5432"))
PGDATABASE = os.getenv("PGDATABASE", "YOUR_DBNAME")
PGUSER = os.getenv("PGUSER", "YOUR_USERNAME")      # Student: usually your NetID-based username
PGPASSWORD = os.getenv("PGPASSWORD", "YOUR_PASSWORD")
PGSSL = os.getenv("PGSSL", "require")              # require / prefer / disable

# If DATABASE_URL is set, it takes precedence.
DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    DATABASE_URL = f"postgresql://{PGUSER}:{PGPASSWORD}@{PGHOST}:{PGPORT}/{PGDATABASE}?sslmode={PGSSL}"

def _mask_url(url: str) -> str:
    return re.sub(r"(postgresql:\/\/[^:]+:)([^@]+)(@)", r"\1******\3", url)

rprint("🔗 Using DB URL: [dim]" + _mask_url(DATABASE_URL) + "[/dim]")

# ------------------------------
# Engine + connection helpers
# ------------------------------
from sqlalchemy import create_engine, text

_engine = None  # built lazily on the first get_engine() call, then reused

def get_engine():
    """Return the shared SQLAlchemy engine, creating it on first use.

    The engine is built from DATABASE_URL with ``pool_pre_ping=True``, so
    pooled connections are checked for liveness before being handed out.
    """
    global _engine
    if _engine is not None:
        return _engine
    _engine = create_engine(DATABASE_URL, echo=False, future=True, pool_pre_ping=True)
    return _engine

@contextmanager
def db_connect():
    """Yield a connection from the shared engine and always close it afterwards.

    Closing returns the connection to the pool, and SQLAlchemy rolls back any
    transaction that was not committed. Callers that write must therefore
    call ``conn.commit()`` themselves.
    """
    connection = get_engine().connect()
    try:
        yield connection
    finally:
        connection.close()

# Safety: set search_path to "$user", public and refuse to continue unless we are
# in the personal schema (or in `public` with CREATE privilege = instructor mode).
def set_search_path_safely():
    """Set ``search_path`` to ``"$user", public`` and verify the active schema.

    Runs one SQL batch that first issues ``SET search_path`` and then, in a
    PL/pgSQL ``DO`` block, raises an exception unless ``current_schema()``
    equals the login role name. The one exception is a role with CREATE on
    ``public`` that is working in ``public`` ("instructor mode"); that case
    only emits a NOTICE.

    Raises:
        The database error produced by the server-side ``RAISE EXCEPTION``
        (surfaced through SQLAlchemy). The caller in this cell catches
        ``Exception``.

    NOTE(review): the ``SET`` applies only to the pooled connection used here.
    Connections checked out later (e.g. by ``DataFrame.to_sql``) are not given
    it explicitly and rely on the server's configured search_path. TODO confirm
    that is acceptable.
    """
    safety_block = """
    SET search_path TO "$user", public;
    DO $$
    DECLARE
      current_schema_name text := current_schema();
      current_user_name   text := current_user;
      is_instructor       boolean := has_schema_privilege(current_user, 'public', 'CREATE');
    BEGIN
      IF current_schema_name <> current_user_name
         AND NOT (is_instructor AND current_schema_name = 'public') THEN
        RAISE EXCEPTION
          'You are in schema "%", not your personal schema "%". Run: SET search_path TO "$user", public;',
          current_schema_name, current_user_name;
      END IF;

      IF is_instructor AND current_schema_name = 'public' THEN
        RAISE NOTICE 'Instructor mode: running as "%" in schema "public".', current_user_name;
      END IF;
    END $$;
    """
    # Commit so the SET is kept for this session (a rolled-back SET is undone).
    with db_connect() as conn:
        conn.execute(text(safety_block))
        conn.commit()

def ensure_schema_exists(schema_name: str):
    with db_connect() as conn:
        conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}";'))
        conn.execute(text(f'GRANT USAGE, CREATE ON SCHEMA "{schema_name}" TO "{schema_name}";'))
        conn.commit()

# Smoke test: open a connection and report date, user and schema. The
# search_path safety check only runs if this succeeded; otherwise it would
# just fail a second time with a less useful error.
ok = True
try:
    with db_connect() as conn:
        val = conn.execute(text("SELECT current_date")).scalar_one()
        user, schema = conn.execute(text("SELECT current_user, current_schema")).first()
    rprint(f"✅ Connected! current_date=[bold]{val}[/bold], user=[bold]{user}[/bold], schema=[bold]{schema}[/bold]")
except Exception:
    ok = False
    rprint(f"[red]❌ Connection failed[/red]\n{traceback.format_exc()}")

if ok:
    try:
        set_search_path_safely()
        rprint("🛡️ search_path safety check applied.")
    except Exception as e:
        ok = False
        rprint(f"[red]❌ search_path safety failed[/red] — {e}")
else:
    rprint("[yellow]⏭️ Skipping search_path safety check (no working connection).[/yellow]")

print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
rprint(" | ".join([
    "🏁 Config Finished",
    f"{cell_core_logic_end_time:%H:%M:%S.%f}",
    f"Cell: [bold]{format_duration(cell_execution_duration)}[/bold]",
    f"Project: [bold]{format_duration(project_execution_duration)}[/bold]",
    f"Cells Run: [bold]{total_project_cells_executed}[/bold]",
]))
rprint(
    "[green]✅ DB config ready.[/green]"
    if ok
    else "[red]⚠️ Some DB tasks failed. Fix credentials or privileges and retry.[/red]"
)
# --- End Cell End Timing & Project Duration ---


### Block03 — Data
Copy the sample CSVs (which contain deliberate defects) into `./data`. CSVs already present in `./data`, such as instructor-provided files, are kept as-is.

In [None]:
# ------------------------------
# Block03-Data | Cell01: Create/load CSVs (with defects)
# ------------------------------

import datetime
import shutil
from pathlib import Path
import pandas as pd

total_project_cells_executed += 1
cell_core_logic_start_time = datetime.datetime.now()
print(f"🚀 Starting Cell Logic... | Timestamp: {cell_core_logic_start_time:%H:%M:%S.%f}")
print("-"*50)

DATA_DIR = Path("data"); DATA_DIR.mkdir(exist_ok=True)
# If the CSVs already exist (e.g., instructor replaced with official files), we keep them.
# Otherwise we COPY in the sample ones bundled with this repo template. Copying
# (rather than renaming with Path.replace) leaves the bundle intact for later
# runs, and it also works when /mnt/data and ./data are on different
# filesystems, where a rename raises OSError.
bundled = {
    "restaurants.csv": '/mnt/data/nyu-lab06-python-postgres/data/restaurants.csv',
    "couriers.csv": '/mnt/data/nyu-lab06-python-postgres/data/couriers.csv',
    "customers.csv": '/mnt/data/nyu-lab06-python-postgres/data/customers.csv',
    "orders.csv": '/mnt/data/nyu-lab06-python-postgres/data/orders.csv',
}

for fname, src in bundled.items():
    dest = DATA_DIR / fname
    src_path = Path(src)
    if dest.exists():
        print(f"✅ Using existing CSV (kept): {dest.resolve()}")
    elif src_path.is_file():
        shutil.copy2(src_path, dest)
        print(f"📄 Copied sample CSV → {dest.resolve()}")
    else:
        # Report instead of crashing; the seed cell's read_csv will still fail on the missing file.
        print(f"⚠️ Sample CSV not found: {src_path} — put {fname} into {DATA_DIR.resolve()} manually.")

# Quick peek: shape and first three rows of each CSV; a missing or broken file
# is reported instead of stopping the cell.
for csv_name in ("restaurants.csv", "couriers.csv", "customers.csv", "orders.csv"):
    csv_path = DATA_DIR / csv_name
    try:
        df = pd.read_csv(csv_path)
        print(f"📊 {csv_name} shape={df.shape} | path={csv_path.resolve()}")
        display(df.head(3))
    except Exception as err:
        print(f"⚠️ Could not preview {csv_name} — {err}")

print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(
    f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}",
    "✅ All Tasks/Processes for this cell were completed successfully.",
    f"⏱️ [Block03-Data] Cell Execution Time: {format_duration(cell_execution_duration)}",
    f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}",
    sep="\n",
)
# --- End Cell End Timing & Project Duration ---


### Block04 — Seed
Load CSVs into PostgreSQL tables in your personal schema.

In [None]:
# ------------------------------
# Block04-Seed | Cell01: Load CSVs into Postgres tables
# ------------------------------

import datetime
from pathlib import Path

import pandas as pd
from sqlalchemy import text
from sqlalchemy import types as satypes

total_project_cells_executed = total_project_cells_executed + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

DATA_DIR = Path("data")
# Raw source tables to (re)load: table name -> {"path": CSV file}.
tables = {
    table_name: {"path": DATA_DIR / f"{table_name}.csv"}
    for table_name in ("restaurants", "couriers", "customers", "orders")
}

# Drop the downstream views before re-seeding. to_sql(if_exists="replace")
# drops the base tables, and PostgreSQL refuses to drop a table while views
# still depend on it.
ddl = [
    "DROP VIEW IF EXISTS stg_orders, stg_restaurants, stg_couriers, stg_customers CASCADE;",
    "DROP VIEW IF EXISTS fct_deliveries, dim_restaurant, dim_courier, dim_customer, kpi_delivery_overview, monitoring_dq_exceptions CASCADE;"
]

# Run each DROP in its own transaction and report failures. In PostgreSQL a
# failed statement aborts the whole transaction, so a silently swallowed error
# would make every later statement fail too, and the final commit would roll
# everything back.
with db_connect() as conn:
    for stmt in ddl:
        try:
            conn.execute(text(stmt))
            conn.commit()
        except Exception as exc:
            conn.rollback()
            print(f"⚠️ Cleanup DDL failed (continuing): {stmt} — {exc}")

# Load every column as TEXT (dtype=str). The staging views do their own casts
# on raw strings (e.g. NULLIF(subtotal,'')::numeric), and NULLIF(col,'')
# errors on a numeric column because '' cannot be cast to a number. Loading
# everything as text also keeps join keys the same type across tables.
# Empty CSV cells still arrive as NULL (pandas reads them as NaN).
# Note: We intentionally do not add PK/constraints here; tests operate at staging layer.
for tname, info in tables.items():
    df = pd.read_csv(info["path"], dtype=str)
    df.to_sql(tname, con=get_engine(), if_exists="replace", index=False)
    print(f"📄 Loaded table: {tname} (rows={len(df)})")

# Verify: report the row count of every loaded table
with db_connect() as conn:
    for tname in tables:
        row_count_sql = text(f"SELECT COUNT(*) FROM {tname}")
        cnt = conn.execute(row_count_sql).scalar_one()
        print(f"✅ {tname}: {cnt} row(s)")

print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(
    f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}",
    "✅ All Tasks/Processes for this cell were completed successfully.",
    f"⏱️ [Block04-Seed] Cell Execution Time: {format_duration(cell_execution_duration)}",
    f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}",
    sep="\n",
)
# --- End Cell End Timing & Project Duration ---


### Block05 — Staging
Create staging **views** that normalize status, dedupe orders, and derive `delivery_minutes` + `on_time_flag`.

In [None]:
# ------------------------------
# Block05-Staging | Cell01: Create staging views
# ------------------------------

import datetime

from sqlalchemy import text

# Cell bookkeeping: bump the project-wide cell counter and stamp the start time.
total_project_cells_executed = total_project_cells_executed + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

# Pass-through staging view over the raw restaurants table (no transformation).
sql_stg_restaurants = """
CREATE OR REPLACE VIEW stg_restaurants AS
SELECT * FROM restaurants;
"""

# Couriers staging: lower-cases vehicle_type so the mart accepted-values test
# ('bike','scooter','car') does not fail on capitalisation; other columns pass through.
# NOTE(review): vehicle_type is not trimmed, so values with stray spaces will
# still fail that test — confirm that is intended.
sql_stg_couriers = """
CREATE OR REPLACE VIEW stg_couriers AS
SELECT
  courier_id,
  courier_name,
  lower(vehicle_type) AS vehicle_type,
  active_from, active_to, region
FROM couriers;
"""

# Pass-through staging view over the raw customers table (no transformation).
sql_stg_customers = """
CREATE OR REPLACE VIEW stg_customers AS
SELECT * FROM customers;
"""

# Orders staging view:
#   * casts the raw text columns (blank strings become NULL first),
#   * keeps one row per order_id (dedup),
#   * normalises status to delivered / canceled / returned / unknown,
#   * derives delivery_minutes (pickup -> dropoff) and on_time_flag
#     (delivered within 45 minutes).
# Dedup keeps the row with the earliest *parsed* order timestamp. Ordering by
# the raw string would sort '' before real timestamps and is only chronological
# for ISO-formatted values. Rows with no timestamp sort last. The same cast
# already appears in the SELECT list, so this adds no new failure mode.
sql_stg_orders = """
CREATE OR REPLACE VIEW stg_orders AS
WITH base AS (
  SELECT
    order_id,
    customer_id,
    restaurant_id,
    courier_id,
    NULLIF(order_timestamp,'')::timestamp    AS order_ts,
    NULLIF(pickup_timestamp,'')::timestamp   AS pickup_ts,
    NULLIF(dropoff_timestamp,'')::timestamp  AS dropoff_ts,
    lower(trim(NULLIF(status,'')))           AS status_norm,
    payment_method,
    NULLIF(subtotal,'')::numeric(10,2)       AS subtotal,
    NULLIF(delivery_fee,'')::numeric(10,2)   AS delivery_fee,
    NULLIF(tip_amount,'')::numeric(10,2)     AS tip_amount,
    NULLIF(distance_km,'')::numeric(6,2)     AS distance_km,
    row_number() OVER (PARTITION BY order_id ORDER BY NULLIF(order_timestamp,'')::timestamp NULLS LAST) AS rn
  FROM orders
),
dedup AS (
  SELECT * FROM base WHERE rn = 1
),
clean AS (
  SELECT
    *,
    CASE
      WHEN status_norm IN ('delivered','canceled','returned') THEN status_norm
      WHEN status_norm IS NULL THEN 'unknown'
      ELSE 'unknown'
    END AS status_final,
    CASE
      WHEN pickup_ts IS NOT NULL AND dropoff_ts IS NOT NULL
        THEN EXTRACT(epoch FROM (dropoff_ts - pickup_ts))/60.0
      ELSE NULL
    END AS delivery_minutes
  FROM dedup
)
SELECT
  order_id, customer_id, restaurant_id, courier_id,
  order_ts, pickup_ts, dropoff_ts,
  status_final AS status,
  payment_method, subtotal, delivery_fee, tip_amount, distance_km,
  delivery_minutes,
  CASE WHEN status_final = 'delivered' AND delivery_minutes <= 45 THEN TRUE ELSE FALSE END AS on_time_flag
FROM clean;
"""

# Create (or replace) all staging views in one transaction.
with db_connect() as conn:
    for view_sql in (sql_stg_restaurants, sql_stg_couriers, sql_stg_customers, sql_stg_orders):
        conn.execute(text(view_sql))
    conn.commit()

print("✅ Staging views created: stg_restaurants, stg_couriers, stg_customers, stg_orders")

print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(
    f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}",
    "✅ All Tasks/Processes for this cell were completed successfully.",
    f"⏱️ [Block05-Staging] Cell Execution Time: {format_duration(cell_execution_duration)}",
    f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}",
    sep="\n",
)
# --- End Cell End Timing & Project Duration ---


### Block06 — Tests (staging)
Run core tests: not_null, unique, accepted_values, relationships, and expression_is_true. Failing rows are stored in `dq_failures__*` tables, and a per-test summary is appended to `outputs/RUN_LOG.txt`.

In [None]:
# ------------------------------
# Block06-Tests | Cell01: Run DQ tests on staging
# ------------------------------

import datetime
import json
from pathlib import Path

import pandas as pd
from sqlalchemy import text

total_project_cells_executed = total_project_cells_executed + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

# Run artifacts live under ./outputs; RUN_LOG.txt is appended to, never truncated.
OUTPUTS_DIR = Path("outputs")
OUTPUTS_DIR.mkdir(exist_ok=True)
RUN_LOG_PATH = OUTPUTS_DIR / "RUN_LOG.txt"

# Define tests analogous to dbt's not_null, unique, accepted_values, relationships, expression_is_true
# Each entry's SQL returns the FAILING rows (zero rows == pass). Severity is only
# reported (console icon + RUN_LOG); it does not stop the notebook.
TESTS = [
  {  # not_null: every staged order has an id
    "name": "stg_orders_order_id_not_null",
    "severity": "error",
    "sql": "SELECT * FROM stg_orders WHERE order_id IS NULL"
  },
  {  # unique: the staging dedup left exactly one row per order_id
    "name": "stg_orders_order_id_unique",
    "severity": "error",
    "sql": "SELECT order_id, COUNT(*) AS cnt FROM stg_orders GROUP BY order_id HAVING COUNT(*) <> 1"
  },
  {  # accepted_values: stg_orders already maps anything else to 'unknown', so this guards the mapping itself
    "name": "stg_orders_status_accepted_values",
    "severity": "warn",
    "sql": "SELECT * FROM stg_orders WHERE status NOT IN ('delivered','canceled','returned','unknown') OR status IS NULL"
  },
  {  # relationships: restaurant_id must exist in stg_restaurants (a NULL restaurant_id also fails)
    "name": "stg_orders_restaurant_fk_relationships",
    "severity": "error",
    "sql": "SELECT o.* FROM stg_orders o LEFT JOIN stg_restaurants r ON o.restaurant_id=r.restaurant_id WHERE r.restaurant_id IS NULL"
  },
  {  # relationships: a non-NULL courier_id must exist in stg_couriers (unassigned orders are allowed)
    "name": "stg_orders_courier_fk_relationships",
    "severity": "error",
    "sql": "SELECT o.* FROM stg_orders o LEFT JOIN stg_couriers c ON o.courier_id=c.courier_id WHERE o.courier_id IS NOT NULL AND c.courier_id IS NULL"
  },
  {  # expression_is_true: dropoff is not before pickup (NULL minutes are tolerated)
    "name": "stg_orders_delivery_minutes_nonnegative",
    "severity": "error",
    "sql": "SELECT * FROM stg_orders WHERE NOT (delivery_minutes IS NULL OR delivery_minutes >= 0)"
  },
]

def run_test_store_failures(test: dict) -> dict:
    """Run one DQ test query and persist its failing rows.

    *test* needs the keys "name", "severity" and "sql"; the SQL must return the
    failing rows. Any previous ``dq_failures__<name>`` table is dropped, and
    when the query returned rows they are written to a fresh table of that name.

    Returns a summary dict with the keys name, severity, failures (row count)
    and failure_table (the table name, or None when the test passed).
    """
    with db_connect() as conn:
        failing_rows = pd.read_sql(text(test["sql"]), conn)
    fail_table = f"dq_failures__{test['name']}"
    has_failures = not failing_rows.empty
    with db_connect() as conn:
        conn.execute(text(f'DROP TABLE IF EXISTS "{fail_table}"'))
        conn.commit()
        if has_failures:
            failing_rows.to_sql(fail_table, con=get_engine(), if_exists="replace", index=False)
    return {
        "name": test["name"],
        "severity": test["severity"],
        "failures": len(failing_rows),
        "failure_table": fail_table if has_failures else None,
    }

results = []
for spec in TESTS:
    outcome = run_test_store_failures(spec)
    results.append(outcome)
    has_failures = outcome["failures"] > 0
    if has_failures and outcome["severity"] == "error":
        level = "❌"
    elif has_failures:
        level = "🔶"
    else:
        level = "✅"
    extra = f" → stored in {outcome['failure_table']}" if outcome["failure_table"] else ""
    print(f"{level} {outcome['name']}: {outcome['failures']} failing rows{extra}")

# Append this run to RUN_LOG.txt: a timestamped header, then one JSON object per test.
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(RUN_LOG_PATH, "a", encoding="utf-8") as log_file:
    log_file.write(f"\n[{ts}] STAGING TESTS\n")
    log_file.writelines(json.dumps(entry) + "\n" for entry in results)

# Display summary
df_summary = pd.DataFrame(results)
display(df_summary)

print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(
    f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}",
    f"⏱️ [Block06-Tests] Cell Execution Time: {format_duration(cell_execution_duration)}",
    f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}",
    sep="\n",
)
# --- End Cell End Timing & Project Duration ---


### Block07 — Marts
Create the `fct_deliveries` view (delivered orders whose restaurant and courier keys resolve) and thin dimension views over staging.

In [None]:
# ------------------------------
# Block07-Marts | Cell01: Create fact and dims
# ------------------------------

import datetime

from sqlalchemy import text

# Cell bookkeeping: bump the project-wide cell counter and stamp the start time.
total_project_cells_executed = total_project_cells_executed + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

# Fact view: delivered orders whose restaurant_id exists in stg_restaurants and
# whose courier_id is NULL or exists in stg_couriers. Non-delivered rows and rows
# with unresolved keys are excluded (monitoring_dq_exceptions lists the reasons).
# NOTE(review): rows with NULL or negative delivery_minutes are NOT excluded, so
# they still feed avg_delivery_minutes in the KPI view — confirm that is intended.
sql_fct = """
CREATE OR REPLACE VIEW fct_deliveries AS
SELECT *
FROM stg_orders o
WHERE o.status = 'delivered'
  AND o.restaurant_id IN (SELECT restaurant_id FROM stg_restaurants)
  AND (o.courier_id IS NULL OR o.courier_id IN (SELECT courier_id FROM stg_couriers));
"""

# Thin dimension views: straight pass-throughs of the staging views.
sql_dim_rest = """CREATE OR REPLACE VIEW dim_restaurant AS SELECT * FROM stg_restaurants;"""
sql_dim_cour = """CREATE OR REPLACE VIEW dim_courier AS SELECT * FROM stg_couriers;"""
sql_dim_cust = """CREATE OR REPLACE VIEW dim_customer AS SELECT * FROM stg_customers;"""

# Create (or replace) the fact and dimension views in one transaction.
with db_connect() as conn:
    for view_sql in (sql_fct, sql_dim_rest, sql_dim_cour, sql_dim_cust):
        conn.execute(text(view_sql))
    conn.commit()

print("✅ Marts created: fct_deliveries, dim_restaurant, dim_courier, dim_customer")

print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(
    f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}",
    "✅ All Tasks/Processes for this cell were completed successfully.",
    f"⏱️ [Block07-Marts] Cell Execution Time: {format_duration(cell_execution_duration)}",
    f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}",
    sep="\n",
)
# --- End Cell End Timing & Project Duration ---


### Block08 — Tests (marts)
Validate fact/dim constraints and accepted values on courier vehicle types.

In [None]:
# ------------------------------
# Block08-Tests | Cell02: Run DQ tests on marts
# ------------------------------

import datetime
import json
from pathlib import Path

import pandas as pd
from sqlalchemy import text

total_project_cells_executed = total_project_cells_executed + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

# Same run-log location as the staging tests; results are appended.
OUTPUTS_DIR = Path("outputs")
OUTPUTS_DIR.mkdir(exist_ok=True)
RUN_LOG_PATH = OUTPUTS_DIR / "RUN_LOG.txt"

# Mart-level DQ tests. As in the staging tests, each SQL returns the FAILING rows.
TESTS_MARTS = [
  {  # not_null on the fact key
    "name": "fct_deliveries_order_id_not_null",
    "severity": "error",
    "sql": "SELECT * FROM fct_deliveries WHERE order_id IS NULL"
  },
  {  # unique fact key (one row per order)
    "name": "fct_deliveries_order_id_unique",
    "severity": "error",
    "sql": "SELECT order_id, COUNT(*) AS cnt FROM fct_deliveries GROUP BY order_id HAVING COUNT(*) <> 1"
  },
  {  # every fact row resolves to a restaurant dimension row
    "name": "fct_deliveries_restaurant_fk_dim",
    "severity": "error",
    "sql": "SELECT f.* FROM fct_deliveries f LEFT JOIN dim_restaurant d ON f.restaurant_id=d.restaurant_id WHERE d.restaurant_id IS NULL"
  },
  {  # every non-NULL courier_id resolves to a courier dimension row
    "name": "fct_deliveries_courier_fk_dim",
    "severity": "error",
    "sql": "SELECT f.* FROM fct_deliveries f LEFT JOIN dim_courier d ON f.courier_id=d.courier_id WHERE f.courier_id IS NOT NULL AND d.courier_id IS NULL"
  },
  {  # vehicle_type (lower-cased in stg_couriers) must be one of bike/scooter/car
    "name": "dim_courier_vehicle_type_accepted_values",
    "severity": "error",
    "sql": "SELECT * FROM dim_courier WHERE vehicle_type NOT IN ('bike','scooter','car') OR vehicle_type IS NULL"
  },
]

def run_test_store_failures(test: dict) -> dict:
    """Execute a DQ test and keep its failing rows in ``dq_failures__<name>``.

    The previous failure table is always dropped first. A new one is created
    only when the test's SQL returns rows.

    Returns {"name", "severity", "failures", "failure_table"}, where
    failure_table is None for a passing test.
    """
    with db_connect() as conn:
        bad = pd.read_sql(text(test["sql"]), conn)
    fail_table = f"dq_failures__{test['name']}"
    with db_connect() as conn:
        conn.execute(text(f'DROP TABLE IF EXISTS "{fail_table}"'))
        conn.commit()
        if len(bad.index) > 0:
            bad.to_sql(fail_table, con=get_engine(), if_exists="replace", index=False)
    summary = {"name": test["name"], "severity": test["severity"], "failures": len(bad)}
    summary["failure_table"] = None if bad.empty else fail_table
    return summary

results = []
for spec in TESTS_MARTS:
    outcome = run_test_store_failures(spec)
    results.append(outcome)
    has_failures = outcome["failures"] > 0
    if has_failures and outcome["severity"] == "error":
        level = "❌"
    elif has_failures:
        level = "🔶"
    else:
        level = "✅"
    extra = f" → stored in {outcome['failure_table']}" if outcome["failure_table"] else ""
    print(f"{level} {outcome['name']}: {outcome['failures']} failing rows{extra}")

# Append this run to RUN_LOG.txt: a timestamped header, then one JSON object per test.
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(RUN_LOG_PATH, "a", encoding="utf-8") as log_file:
    log_file.write(f"\n[{ts}] MART TESTS\n")
    log_file.writelines(json.dumps(entry) + "\n" for entry in results)

df_summary = pd.DataFrame(results)
display(df_summary)

print("-" * 50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(
    f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}",
    f"⏱️ [Block08-Tests] Cell Execution Time: {format_duration(cell_execution_duration)}",
    f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}",
    sep="\n",
)
# --- End Cell End Timing & Project Duration ---


### Block09 — KPI
Compute KPI view and preview values.

In [None]:
# ------------------------------
# Block09-KPI | Cell01: Compute KPI view and preview
# ------------------------------

import datetime
from pathlib import Path

import pandas as pd
from sqlalchemy import text

# Cell bookkeeping: bump the project-wide cell counter and stamp the start time.
total_project_cells_executed = total_project_cells_executed + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

# KPI view — always exactly one row (aggregates without GROUP BY):
#   on_time_rate         share of fct_deliveries rows with on_time_flag TRUE
#   avg_delivery_minutes mean delivery_minutes over fct_deliveries (AVG ignores NULLs)
#   cancel_return_rate   canceled+returned orders / all deduplicated stg_orders rows;
#                        the denominator includes 'unknown' status, NULLIF guards /0
# NOTE(review): if fct_deliveries is empty, the first two columns are NULL.
sql_kpi = """
CREATE OR REPLACE VIEW kpi_delivery_overview AS
WITH d AS (SELECT * FROM fct_deliveries)
SELECT
  AVG(CASE WHEN on_time_flag THEN 1 ELSE 0 END)::numeric(5,4) AS on_time_rate,
  AVG(delivery_minutes)::numeric(6,2)                         AS avg_delivery_minutes,
  (
    SELECT
      (COUNT(*) FILTER (WHERE status IN ('canceled','returned'))::numeric
       / NULLIF(COUNT(*),0))
    FROM stg_orders
  )::numeric(5,4) AS cancel_return_rate
;
"""

with db_connect() as conn:
    conn.execute(text(sql_kpi))
    conn.commit()

df = pd.read_sql(text("SELECT * FROM kpi_delivery_overview"), get_engine())
display(df)


def _kpi_as_float(frame, column, scale=1.0):
    """Return ``frame.loc[0, column] * scale`` as a float, or 0.0 for SQL NULL.

    A NULL KPI (e.g. AVG over an empty fct_deliveries) can come back from
    pandas as None *or* NaN, depending on dtype inference. ``pd.isna`` covers
    both cases; an ``is None`` check would let NaN through and print "nan%".
    """
    value = frame.loc[0, column]
    return 0.0 if pd.isna(value) else float(value) * scale


# Also print a 2-decimal summary (rates shown as percentages)
if not df.empty:
    on_time = _kpi_as_float(df, "on_time_rate", 100)
    adm = _kpi_as_float(df, "avg_delivery_minutes")
    crr = _kpi_as_float(df, "cancel_return_rate", 100)
    print(f"🎯 On-Time Delivery %: {on_time:.2f}% | Avg Minutes: {adm:.2f} | Cancel/Return Rate: {crr:.2f}%")

print("-"*50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}")
print("✅ All Tasks/Processes for this cell were completed successfully.")
print(f"⏱️ [Block09-KPI] Cell Execution Time: {format_duration(cell_execution_duration)}")
print(f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}")
# --- End Cell End Timing & Project Duration ---


### Block10 — Monitoring
Create a `monitoring_dq_exceptions` view with excluded rows and reasons.

In [None]:
# ------------------------------
# Block10-Monitoring | Cell01: Create DQ Exceptions view
# ------------------------------

import datetime

import pandas as pd
from sqlalchemy import text

# Cell bookkeeping: bump the project-wide cell counter and stamp the start time.
total_project_cells_executed = total_project_cells_executed + 1
cell_core_logic_start_time = datetime.datetime.now()
print("🚀 Starting Cell Logic... | Timestamp: " + format(cell_core_logic_start_time, "%H:%M:%S.%f"))
print("-" * 50)

sql_ex = """
CREATE OR REPLACE VIEW monitoring_dq_exceptions AS
WITH base AS (
  SELECT
    order_id,
    customer_id,
    restaurant_id,
    courier_id,
    NULLIF(order_timestamp,'')::timestamp    AS order_ts,
    NULLIF(pickup_timestamp,'')::timestamp   AS pickup_ts,
    NULLIF(dropoff_timestamp,'')::timestamp  AS dropoff_ts,
    lower(trim(NULLIF(status,'')))           AS status_norm,
    row_number() OVER (PARTITION BY order_id ORDER BY order_timestamp) AS rn
  FROM orders
),
dupes AS (
  SELECT order_id, 'duplicate_order' AS reason FROM base WHERE rn > 1
),
bad_rest_fk AS (
  SELECT o.order_id, 'bad_fk_restaurant' AS reason
  FROM orders o
  LEFT JOIN stg_restaurants r ON o.restaurant_id = r.restaurant_id
  WHERE r.restaurant_id IS NULL
),
bad_cour_fk AS (
  SELECT o.order_id, 'bad_fk_courier' AS reason
  FROM orders o
  LEFT JOIN stg_couriers c ON o.courier_id = c.courier_id
  WHERE o.courier_id IS NOT NULL AND c.courier_id IS NULL
),
unknown_status AS (
  SELECT order_id, 'status_unknown' AS reason
  FROM base
  WHERE status_norm IS NULL OR status_norm NOT IN ('delivered','canceled','returned')
)
SELECT DISTINCT order_id, reason
FROM (
  SELECT * FROM dupes
  UNION ALL SELECT * FROM bad_rest_fk
  UNION ALL SELECT * FROM bad_cour_fk
  UNION ALL SELECT * FROM unknown_status
) u
ORDER BY order_id, reason;
"""

with db_connect() as conn:
    conn.execute(text(sql_ex))
    conn.commit()

df = pd.read_sql(text("SELECT * FROM monitoring_dq_exceptions ORDER BY order_id, reason"), get_engine())
display(df)

print("-"*50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}")
print(f"✅ All Tasks/Processes for this cell were completed successfully.")
print(f"⏱️ [Block10-Monitoring] Cell Execution Time: {format_duration(cell_execution_duration)}")
print(f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}")
# --- End Cell End Timing & Project Duration ---


### Block11 — Self‑Solve Test
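Singular test for dropoff/status consistency: canceled or returned orders must not have a dropoff timestamp, and delivered orders must have one. Failing rows are written to `dq_failures__test_dropoff_status_logic`.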

In [None]:
# ------------------------------
# Block11-Tests | Cell03: Self-solve singular logic test
# ------------------------------
# Singular test: an order's dropoff timestamp must agree with its status.
#   - canceled/returned orders must NOT have a dropoff_ts
#   - delivered orders MUST have a dropoff_ts
# Failing rows go to `dq_failures__test_dropoff_status_logic` (dropped first, so a
# passing run leaves no stale table), and the outcome is appended to
# outputs/RUN_LOG.txt as a JSON line.

import datetime, json, pandas as pd
from pathlib import Path
from sqlalchemy import text

total_project_cells_executed += 1
cell_core_logic_start_time = datetime.datetime.now()
print(f"🚀 Starting Cell Logic... | Timestamp: {cell_core_logic_start_time:%H:%M:%S.%f}")
print("-"*50)

OUTPUTS_DIR = Path("outputs"); OUTPUTS_DIR.mkdir(exist_ok=True)
RUN_LOG_PATH = OUTPUTS_DIR / "RUN_LOG.txt"

sql_self = """
SELECT *
FROM stg_orders
WHERE (status IN ('canceled','returned') AND dropoff_ts IS NOT NULL)
   OR (status = 'delivered' AND dropoff_ts IS NULL);
"""

df = pd.read_sql(text(sql_self), get_engine())
fail_table = "dq_failures__test_dropoff_status_logic"
with db_connect() as conn:
    conn.execute(text(f'DROP TABLE IF EXISTS "{fail_table}"'))
    conn.commit()
if not df.empty:
    df.to_sql(fail_table, con=get_engine(), if_exists="replace", index=False)

print(f"{'❌' if not df.empty else '✅'} test_dropoff_status_logic: {len(df)} failing row(s){' → stored in '+fail_table if not df.empty else ''}")

# Plain dict literal for the log record; doubled braces only escape in f-strings
# and here would build a set containing a dict (TypeError: unhashable type).
self_test_result = {
    "name": "test_dropoff_status_logic",
    "severity": "error",
    "failures": len(df),
    "failure_table": fail_table if not df.empty else None,
}

ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(RUN_LOG_PATH, "a", encoding="utf-8") as f:
    f.write(f"\n[{ts}] SELF-SOLVE TEST\n")
    f.write(json.dumps(self_test_result) + "\n")

print("-"*50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}")
print(f"⏱️ [Block11-Tests] Cell Execution Time: {format_duration(cell_execution_duration)}")
print(f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}")
# --- End Cell End Timing & Project Duration ---


### Block12 — Deliverable
Write your stakeholder reply as a Markdown file and export a few key views to CSV (both are saved under `outputs/`).

In [None]:
# ------------------------------
# Block12-Deliverable | Cell01: Write stakeholder reply .md
# ------------------------------
# Reads the KPI view, writes the stakeholder reply Markdown to outputs/, and
# exports a few views as CSVs (each export is best-effort and only warns on error).

import datetime, os, pandas as pd
from sqlalchemy import text
from pathlib import Path

total_project_cells_executed += 1
cell_core_logic_start_time = datetime.datetime.now()
print(f"🚀 Starting Cell Logic... | Timestamp: {cell_core_logic_start_time:%H:%M:%S.%f}")
print("-"*50)


def _kpi_value(frame, column, scale=1.0):
    """Return ``frame.loc[0, column]`` as a float multiplied by *scale*, or 0.0 if missing.

    ``pd.isna`` treats None, NaN and pd.NA alike as missing. ``value or 0`` would
    let NaN through (NaN is truthy) and raise on pd.NA (its truth value is ambiguous).
    """
    value = frame.loc[0, column]
    return 0.0 if pd.isna(value) else float(value) * scale


# Fetch KPI values (all default to 0.0 when the view returns no row)
df = pd.read_sql(text("SELECT * FROM kpi_delivery_overview"), get_engine())
on_time = adm = crr = 0.0
if not df.empty:
    on_time = _kpi_value(df, "on_time_rate", 100.0)
    adm = _kpi_value(df, "avg_delivery_minutes")
    crr = _kpi_value(df, "cancel_return_rate", 100.0)

# Student placeholders
FIRST = os.getenv("STUDENT_FIRST", "First")
LAST = os.getenv("STUDENT_LAST", "Last")
NETID = os.getenv("STUDENT_NETID", "netid1234")

reply_name = f"Lab06_{FIRST}_{LAST}_{NETID}_Reply.md"
# Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12).
generated_at = datetime.datetime.now(datetime.timezone.utc)

md = f"""---
# Stakeholder Reply — DashDash Data Quality & KPIs

**KPI summary:** On-Time Delivery: {on_time:.2f}% · Average Delivery Minutes: {adm:.2f} · Cancel/Return Rate: {crr:.2f}%

**What changed after remediation**
- Deduplicated orders by `order_id` (kept earliest record); removed duplicates from KPI universe.
- Normalized mixed/invalid `status` values to {{delivered|canceled|returned|unknown}} to prevent leakage into dashboards.
- Enforced referential integrity to restaurants/couriers; excluded rows with bad foreign keys from facts.
- Added a DQ Exceptions view to track excluded rows and their reasons.

**Recommendation**
- Proceed with the “10‑Minute Free Delivery Insurance” promo **if** daily on‑time % remains ≥ 85% during soft‑launch; otherwise, pause the offer in regions where scooter/car availability is thin.

_Generated with GPT-5 Pro on {generated_at:%Y-%m-%d %H:%M UTC}. Student verified the numbers and process._
"""

OUTPUTS_DIR = Path("outputs"); OUTPUTS_DIR.mkdir(exist_ok=True)
outpath = OUTPUTS_DIR / reply_name
outpath.write_text(md, encoding="utf-8")
print(f"📄 Wrote stakeholder reply: {outpath.resolve()}")

# Export a few useful views as CSVs for instructors
exports = {
    "stg_orders": OUTPUTS_DIR/"stg_orders_export.csv",
    "fct_deliveries": OUTPUTS_DIR/"fct_deliveries_export.csv",
    "monitoring_dq_exceptions": OUTPUTS_DIR/"monitoring_dq_exceptions_export.csv",
    "kpi_delivery_overview": OUTPUTS_DIR/"kpi_delivery_overview_export.csv",
}
for view, path in exports.items():
    try:
        dfv = pd.read_sql(text(f"SELECT * FROM {view}"), get_engine())
        dfv.to_csv(path, index=False)
        print(f"✅ Exported {view} → {path.resolve()} (rows={len(dfv)})")
    except Exception as e:
        print(f"⚠️ Could not export {view}: {e}")

print("-"*50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}")
print(f"✅ All Tasks/Processes for this cell were completed successfully.")
print(f"⏱️ [Block12-Deliverable] Cell Execution Time: {format_duration(cell_execution_duration)}")
print(f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}")
# --- End Cell End Timing & Project Duration ---


### Block13 — Finish
Print the path to `outputs/RUN_LOG.txt` (creating a placeholder file if no tests have run yet) so you can download and submit the log as needed.

In [None]:
# ------------------------------
# Block13-Finish | Cell01: Finish + show RUN_LOG path
# ------------------------------
# Ensures outputs/RUN_LOG.txt exists (writing a placeholder if nothing has been
# logged yet) and prints its absolute path for download/submission.

import datetime
from pathlib import Path

total_project_cells_executed += 1
cell_core_logic_start_time = datetime.datetime.now()
print(f"🚀 Starting Cell Logic... | Timestamp: {cell_core_logic_start_time:%H:%M:%S.%f}")
print("-"*50)

OUTPUTS_DIR = Path("outputs"); OUTPUTS_DIR.mkdir(exist_ok=True)
runlog = OUTPUTS_DIR / "RUN_LOG.txt"
if not runlog.exists():
    # Explicit UTF-8 so the placeholder matches the encoding the self-solve test
    # cell appends with, independent of the platform's locale default.
    runlog.write_text("No tests run yet.", encoding="utf-8")

print(f"📄 Run log: {runlog.resolve()}")

print("-"*50)
cell_core_logic_end_time = datetime.datetime.now()
cell_execution_duration = cell_core_logic_end_time - cell_core_logic_start_time
project_execution_duration = cell_core_logic_end_time - project_start_time
print(f"🏁 Finished Cell Logic... | Timestamp: {cell_core_logic_end_time:%H:%M:%S.%f}")
print(f"✅ All Tasks/Processes for this cell were completed successfully.")
print(f"⏱️ [Block13-Finish] Cell Execution Time: {format_duration(cell_execution_duration)}")
print(f"⏳ Total Project Execution Duration: {format_duration(project_execution_duration)} | Cell: {total_project_cells_executed}")
# --- End Cell End Timing & Project Duration ---
