In [0]:
-- Bronze layer: incrementally ingests the raw CSV files found under the landing
-- volume via cloud_files (Auto Loader), keeping every source column as-is.
-- Two audit columns are appended to each row:
--   input_file_path     - path of the file the row was read from
--   ingestion_timestamp - time at which the row was ingested
-- NOTE(review): no 'header' option is passed to cloud_files, while the silver
-- table selects columns by name (VendorID, ...); confirm the CSV header row is
-- being picked up as column names.
CREATE OR REFRESH STREAMING TABLE bronze_taxi
COMMENT 'Raw taxi trip data ingested from the source system operational data'
TBLPROPERTIES ('quality' = 'bronze')
AS
SELECT *,
       _metadata.file_path AS input_file_path,
       CURRENT_TIMESTAMP AS ingestion_timestamp
FROM cloud_files(
  '/Volumes/ishwaritaxi/taxilanding/volume2',
  'csv',
  map('cloudFiles.inferColumnTypes', 'true')
);

In [0]:
-- Silver layer: projects the trip columns used downstream from the bronze
-- stream and applies data-quality expectations.
--   * valid_vendor / valid_datetime declare ON VIOLATION DROP ROW, so rows
--     failing them are removed from this table.
--   * valid_passenger / valid_distance / valid_fare / valid_total declare no
--     ON VIOLATION action: failing rows are kept and the violation is only
--     recorded in the pipeline's data-quality metrics.
CREATE OR REFRESH STREAMING TABLE silver_taxi_clean (
  CONSTRAINT valid_vendor EXPECT (VendorID IS NOT NULL) ON VIOLATION DROP ROW,
  -- Both timestamps must be present and the pickup must precede the dropoff.
  CONSTRAINT valid_datetime EXPECT (
    tpep_pickup_datetime IS NOT NULL
    AND tpep_dropoff_datetime IS NOT NULL
    AND tpep_pickup_datetime < tpep_dropoff_datetime
  ) ON VIOLATION DROP ROW,
  CONSTRAINT valid_passenger EXPECT (passenger_count >= 1),
  CONSTRAINT valid_distance EXPECT (trip_distance > 0),
  CONSTRAINT valid_fare EXPECT (fare_amount >= 0),
  CONSTRAINT valid_total EXPECT (total_amount >= 0)
)
COMMENT 'Cleaned Yellow Taxi Trip Data'
TBLPROPERTIES ('quality' = 'silver')
AS
SELECT VendorID,
       tpep_pickup_datetime,
       tpep_dropoff_datetime,
       passenger_count,
       trip_distance,
       RatecodeID,
       payment_type,
       fare_amount,
       tip_amount,
       tolls_amount,
       total_amount
-- LIVE. qualifier matches the APPLY CHANGES flow in this pipeline and keeps the
-- reference resolvable when the pipeline runs in the legacy publishing mode.
FROM STREAM(LIVE.bronze_taxi);

In [0]:
-- Target table for the APPLY CHANGES flow below. No columns or query are
-- declared here; the flow is what populates the table.
CREATE OR REFRESH STREAMING TABLE silver_taxi
COMMENT 'SCD Type 1 taxi data'
TBLPROPERTIES ('quality' = 'silver');

-- COMMAND ----------

-- Upserts cleaned trips into silver_taxi, keeping one current row per key
-- (SCD Type 1).
-- The key is the composite (vendor, pickup time, dropoff time). Keying on
-- VendorID alone would keep a single row per vendor and discard every other
-- trip, leaving nothing meaningful for per-vendor aggregation downstream.
-- NOTE(review): the source carries no trip id or version column, so rows with
-- an identical key tie on the sequencing value. Confirm this key matches the
-- intended trip grain, and consider sequencing by an ingestion timestamp if one
-- is made available in silver_taxi_clean.
APPLY CHANGES INTO LIVE.silver_taxi
FROM STREAM(LIVE.silver_taxi_clean)
KEYS (VendorID, tpep_pickup_datetime, tpep_dropoff_datetime)
SEQUENCE BY tpep_pickup_datetime
STORED AS SCD TYPE 1;

In [0]:
-- Gold layer: one row per vendor with trip count, distance / fare / tip /
-- revenue totals, and the earliest pickup and latest dropoff timestamps.
--
-- Defined as a materialized view with a batch read of silver_taxi rather than
-- a streaming table over STREAM(silver_taxi): silver_taxi is the target of an
-- APPLY CHANGES flow, and such a target cannot be used as the source of a
-- streaming table. Reading it in batch also removes the streaming read that
-- raised DIFFERENT_DELTA_TABLE_READ_BY_STREAMING_SOURCE, so there is no
-- checkpoint for this table to delete by hand.
-- NOTE(review): if the pipeline already created gold_taxi_data as a streaming
-- table, a one-time full refresh (or dropping the old table) may be needed for
-- the type change to take effect; confirm on the first run.
CREATE OR REFRESH MATERIALIZED VIEW gold_taxi_data
COMMENT 'Aggregated Gold Layer for Taxi Trip Data'
TBLPROPERTIES ('quality' = 'gold')
AS
SELECT
    VendorID,
    COUNT(*) AS total_trips,
    SUM(trip_distance) AS total_distance,
    SUM(fare_amount) AS total_fares,
    SUM(tip_amount) AS total_tips,
    SUM(total_amount) AS total_revenue,
    MIN(tpep_pickup_datetime) AS first_trip,
    MAX(tpep_dropoff_datetime) AS last_trip
FROM
    LIVE.silver_taxi
GROUP BY
    VendorID;