In [0]:
-- DLT pipeline: Bronze → Silver → Gold with expectations.
-- This file is run by the pipeline (not by the SQL warehouse directly).
-- Within Delta Live Tables you attach this notebook as the pipeline's single task;
-- DLT reads these statements, materializes the dependency graph, and schedules
-- streaming/triggered runs on managed clusters without you invoking it from a SQL warehouse.
-- Running it in a warehouse would only execute once, but the pipeline keeps it
-- on a recurring schedule with checkpointing, lineage tracking, and SLA monitoring.

-- BRONZE (choose ONE source):
-- The pipeline treats Bronze as the raw landing zone. Pick exactly one option below
-- based on how you ingested the taxi data.

-- (A) If you created raw.taxi_raw via Auto Loader (01_auto_loader_bronze.sql):
-- Bronze landing table: copies every column of the Auto Loader output unchanged.
-- A streaming table must be defined over a streaming read, so the source is
-- wrapped in STREAM(); this also lets each update process only newly arrived rows.
-- SELECT * is deliberate here: Bronze mirrors the raw schema as-is.
CREATE OR REFRESH STREAMING TABLE raw.taxi_bronze
AS
SELECT *
FROM STREAM(main_nyctaxi.raw.taxi_raw);

-- (B) If you want the lowest-friction samples path:
-- CREATE OR REFRESH STREAMING TABLE raw.taxi_bronze
-- (CONSTRAINT trip_distance_positive EXPECT (trip_distance > 0) ON VIOLATION DROP ROW)
-- AS SELECT * FROM STREAM(samples.nyctaxi.trips);

-- SILVER: typing + pruning.
-- Typing = casting raw strings into strongly typed TIMESTAMP/INT/DOUBLE columns so
-- downstream logic can rely on schema enforcement.
-- Pruning = filtering out obviously bad or irrelevant records before serving
-- them to refined consumers (for example removing null trip distances/fare amounts).
-- The result is a curated stream in the Unity Catalog schema `ref`.
-- Silver table: typed, pruned projection of the Bronze stream.
-- The upstream table is read with STREAM() because a streaming table must be
-- defined over a streaming source.
CREATE OR REFRESH STREAMING TABLE ref.trips_clean
AS
SELECT
  CAST(tpep_pickup_datetime  AS TIMESTAMP)       AS pickup_ts,
  CAST(tpep_dropoff_datetime AS TIMESTAMP)       AS dropoff_ts,
  -- Calendar-date key derived from the pickup timestamp; Gold groups on it.
  DATE(CAST(tpep_pickup_datetime AS TIMESTAMP))  AS pickup_date,
  CAST(passenger_count AS INT)                   AS passenger_count,
  CAST(trip_distance   AS DOUBLE)                AS trip_distance,
  CAST(fare_amount     AS DOUBLE)                AS fare_amount,
  vendor_id,
  pickup_zip,
  dropoff_zip
FROM STREAM(LIVE.raw.taxi_bronze)
-- The filter tests the source columns (before the casts above), because
-- SELECT aliases are not visible in WHERE.
WHERE trip_distance IS NOT NULL
  AND fare_amount IS NOT NULL;
-- `LIVE.` marks the upstream table as a dataset defined in this same pipeline, so DLT
-- can resolve the dependency and update the tables in the right order.
-- The SELECT casts each column to an analytics-friendly type, derives a date key,
-- and drops rows with missing distance or fare so later quality rules have a clean baseline.

-- Expectations on the cleaned stream.
-- This second layer applies declarative data-quality constraints on the curated stream.
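-- If a record fails these expectations it is automatically dropped (ON VIOLATION DROP ROW);
-- an expectation can instead keep the row and only record the violation, or fail the update.
-- Either way the pipeline records the violation counts, giving you an auditable filter before publishing to Gold.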
-- The resulting table lives alongside `trips_clean` in the `ref` schema.
-- Quality-gated copy of ref.trips_clean. Each expectation uses DROP ROW, so a
-- record violating either rule is removed from the output instead of failing
-- the update:
--   vendor_not_null  : vendor_id must be present
--   nonnegative_fare : fare_amount must be >= 0 (zero fares are kept)
-- The upstream table is read with STREAM() because a streaming table must be
-- defined over a streaming source. Columns are listed explicitly (the full
-- set ref.trips_clean produces) so an upstream schema change cannot silently
-- alter this table.
CREATE OR REFRESH STREAMING TABLE ref.trips_valid
(
  CONSTRAINT vendor_not_null  EXPECT (vendor_id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT nonnegative_fare EXPECT (fare_amount >= 0)      ON VIOLATION DROP ROW
)
AS
SELECT
  pickup_ts,
  dropoff_ts,
  pickup_date,
  passenger_count,
  trip_distance,
  fare_amount,
  vendor_id,
  pickup_zip,
  dropoff_zip
FROM STREAM(LIVE.ref.trips_clean);

-- GOLD: BI aggregates as MVs (eligible on Pro/Serverless SQL or inside the pipeline).
-- Gold tables summarize Silver data for business intelligence. Here we build
-- a materialized view so DLT (or Databricks SQL) maintains the aggregation incrementally
-- each time new trips arrive. Materialized views persist their results in Delta storage,
-- refresh automatically, and expose history/time travel like any Delta table—
-- they are more than a one-time snapshot.
-- `LIVE.ref.trips_valid` keeps dependency tracking intact so the pipeline refresh order
-- and lineage graphs show Bronze → Silver → Gold relationships.
-- Gold aggregate: one row per pickup_date summarizing the validated trips.
-- Declared with CREATE OR REFRESH (the form pipeline source files use, and the
-- one every other dataset in this file uses) rather than CREATE OR REPLACE.
CREATE OR REFRESH MATERIALIZED VIEW mart.daily_kpis
AS
SELECT
  pickup_date,
  COUNT(*)                      AS trip_count,
  ROUND(AVG(fare_amount), 2)    AS avg_fare,
  ROUND(AVG(trip_distance), 3)  AS avg_trip_miles,
  ROUND(SUM(fare_amount), 2)    AS total_revenue,
  -- NULLIF turns a zero total distance into NULL, so the ratio is NULL
  -- instead of a division by zero on days with no recorded mileage.
  ROUND(SUM(fare_amount) / NULLIF(SUM(trip_distance), 0), 3) AS revenue_per_mile
FROM LIVE.ref.trips_valid
GROUP BY pickup_date;
