# 🏪 LakehouseMart Data Pipeline

## Overview
This Spark Declarative Pipeline implements a complete **Medallion Architecture** (Bronze → Silver → Gold) for an e-commerce analytics platform. It ingests raw customer, product, and order data from cloud storage, applies data quality checks and Change Data Capture (CDC) transformations, and produces analytics-ready datasets for business intelligence.

---

## 📊 Pipeline Architecture

### **Bronze Layer** (Raw Data Ingestion)
Streaming tables that ingest raw JSON files from cloud storage using Auto Loader:
* **bronze_customers** - Raw customer data with schema inference and evolution
* **bronze_products** - Raw product catalog data
* **bronze_orders** - Raw order transaction data

### **Silver Layer** (Cleaned & CDC-Enabled)
Streaming tables with data quality constraints and CDC processing:
* **silver_customers** - Validated customer records (non-null constraints on key fields)
* **silver_customers_cdc** - Current customer state (SCD Type 1)
* **silver_products_cdc** - Historical product changes (SCD Type 2)
* **silver_orders_cdc** - Historical order lifecycle tracking (SCD Type 2)

### **Gold Layer** (Analytics-Ready Views)
Materialized views optimized for business reporting and analytics:
* **gold_mv_daily_order_kpis** - Daily sales metrics (revenue, AOV, cancellation rate)
* **gold_mv_customer_360** - Unified customer profiles with lifetime value
* **gold_mv_revenue_by_product_country** - Geographic product performance
* **gold_mv_cancellation_funnel** - Cancellation analysis by payment method, country, and time
* **gold_mv_customer_360_enhanced** - Customer 360 with product mix and first/last touch

---

## 🔄 Data Flow

```
┌─────────────────────────────────┐
│   Cloud Storage (JSON Files)    │
│  /Volumes/.../raw_files/        │
└────────────┬────────────────────┘
             │
             ▼
┌─────────────────────────────────┐
│      BRONZE LAYER               │
│  • bronze_customers             │
│  • bronze_products              │
│  • bronze_orders                │
│ (Auto Loader + Schema Inference)│
└────────────┬────────────────────┘
             │
             ▼
┌─────────────────────────────────┐
│      SILVER LAYER               │
│  • silver_customers (validated) │
│  • silver_customers_cdc (Type 1)│
│  • silver_products_cdc (Type 2) │
│  • silver_orders_cdc (Type 2)   │
│  (Data Quality + CDC)           │
└────────────┬────────────────────┘
             │
             ▼
┌─────────────────────────────────┐
│      GOLD LAYER                 │
│  • gold_mv_daily_order_kpis     │
│  • gold_mv_customer_360         │
│  • gold_mv_revenue_by_product   │
│  • gold_mv_cancellation_funnel  │
│  • gold_mv_customer_360_enhanced│
│  (Business Metrics & KPIs)      │
└─────────────────────────────────┘
```

---

## 🎯 Key Features

✅ **Auto Loader** - Incremental file ingestion with schema inference  
✅ **CDC Processing** - Track changes with SCD Type 1 (current state) and Type 2 (historical tracking)  
✅ **Data Quality** - Constraint-based validation with row-level enforcement  
✅ **Business Metrics** - Pre-aggregated KPIs for fast analytics  
✅ **Geo Analytics** - Country-level product and cancellation insights  
✅ **Customer 360 Enhanced** - Product mix, first/last order, and lifecycle metrics  

---

## 📈 Use Cases

* **Sales Operations** - Monitor daily revenue, order volume, and cancellation trends
* **Customer Analytics** - Build customer 360 profiles with lifetime value metrics and product mix
* **Merchandising** - Analyze product performance across geographies
* **Operations/CX** - Identify friction points in payment and fulfillment processes
* **Finance/FP&A** - Support financial planning with historical revenue tracking

---

## 📋 Pipeline Tables Summary

| Layer | Table Name | Type | Purpose |
|-------|-----------|------|------|
| Bronze | bronze_customers | Streaming Table | Raw customer ingestion |
| Bronze | bronze_products | Streaming Table | Raw product ingestion |
| Bronze | bronze_orders | Streaming Table | Raw order ingestion |
| Silver | silver_customers | Streaming Table | Validated customers |
| Silver | silver_customers_cdc | Streaming Table | Current customer state (SCD Type 1) |
| Silver | silver_products_cdc | Streaming Table | Product history (SCD Type 2) |
| Silver | silver_orders_cdc | Streaming Table | Order history (SCD Type 2) |
| Gold | gold_mv_daily_order_kpis | Materialized View | Daily sales KPIs |
| Gold | gold_mv_customer_360 | Materialized View | Customer profiles + LTV |
| Gold | gold_mv_revenue_by_product_country | Materialized View | Geo product performance |
| Gold | gold_mv_cancellation_funnel | Materialized View | Cancellation analytics |
| Gold | gold_mv_customer_360_enhanced | Materialized View | Customer 360 with product mix & lifecycle |

---

## 🚀 Getting Started

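1. **Configure Pipeline Settings**: Ensure the pipeline's source paths point to the correct raw-files volume (`/Volumes/db_ws_east_us/raw/raw_files/`)
2. **Verify Source Data**: Check that the customer, product, and order JSON files exist in that volume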
3. **Run Pipeline**: Start with a full refresh to initialize all tables
4. **Monitor**: Check data quality metrics and CDC processing status
5. **Query Gold Layer**: Use materialized views for analytics and reporting
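
For step 2, a small pre-flight check can confirm that the three files the bronze tables read are present before you start a run. This is a minimal sketch that assumes the `/Volumes/...` path is reachable as a local file path where it runs (as it typically is from a Databricks notebook on Unity Catalog compute); the helper name `missing_sources` is hypothetical, and the file list is copied from the bronze table definitions in this notebook.

```python
from pathlib import Path

# Assumption: /Volumes paths are visible as local file paths where this runs.
# File list copied from the bronze streaming table definitions in this notebook.
BASE = "/Volumes/db_ws_east_us/raw/raw_files"
SOURCES = [
    "customers/customers_large.json",
    "products/products_large.json",
    "orders/orders_large.json",
]


def missing_sources(base=BASE, sources=SOURCES):
    """Hypothetical helper: return the source files that do not exist under `base`."""
    return [rel for rel in sources if not (Path(base) / rel).is_file()]


if __name__ == "__main__":
    missing = missing_sources()
    if missing:
        print("Missing source files:", missing)
    else:
        print("All source files present")
```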

In [0]:
/*
👤 **Bronze Customers Streaming Table**

This streaming table ingests raw customer data from cloud storage using Auto Loader:
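- Reads JSON from the customers folder of the raw volume (currently the single file `customers_large.json`), with `multiLine => true` so that each file is parsed as one JSON document (for example, a top-level array of records) rather than one record per line.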
- Automatically infers schema and handles schema evolution as new fields arrive.
- Adds ingestion_timestamp to track when each record was loaded.
- Provides the foundation for downstream silver-layer transformations and CDC processing.

Use this table as the raw landing zone for all customer data before applying quality checks and transformations.
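
Auto Loader's defining property is that each file is ingested once: it records which files it has processed in the stream's checkpoint and, on each update, reads only files it has not seen. The plain-Python sketch below imitates that behaviour, with an in-memory set standing in for the checkpoint and a whole-file JSON read standing in for `multiLine => true`. The function `ingest_new_files` is hypothetical and is not how Auto Loader is implemented.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def ingest_new_files(source_dir, processed):
    """Hypothetical helper: return rows from JSON files not ingested yet.

    `processed` is a set of file paths that stands in for the stream checkpoint.
    Each file is read as one JSON document (an object or an array of objects),
    similar to multiLine => true.
    """
    rows = []
    for path in sorted(Path(source_dir).glob("*.json")):
        if str(path) in processed:
            continue  # already ingested by an earlier run
        doc = json.loads(path.read_text())
        records = doc if isinstance(doc, list) else [doc]
        stamp = datetime.now(timezone.utc)  # one ingestion time per file in this sketch
        rows.extend({**record, "ingestion_timestamp": stamp} for record in records)
        processed.add(str(path))
    return rows
```

Calling it twice on the same folder returns nothing the second time; adding a file and calling again returns only that file's records.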
*/

CREATE OR REFRESH STREAMING TABLE bronze_customers
AS SELECT *, current_timestamp AS ingestion_timestamp FROM STREAM(read_files("/Volumes/db_ws_east_us/raw/raw_files/customers/customers_large.json", format => "json", multiLine => true))

In [0]:
/*
📦 **Bronze Products Streaming Table**

This streaming table ingests raw product catalog data from cloud storage using Auto Loader:
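- Reads JSON from the products folder of the raw volume (currently the single file `products_large.json`), with `multiLine => true` so that each file is parsed as one JSON document (for example, a top-level array of records) rather than one record per line.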
- Automatically infers schema and handles schema evolution as new products or attributes are added.
- Adds ingestion_timestamp to track when each record was loaded.
- Provides the foundation for downstream CDC processing to track product changes over time.

Use this table as the raw landing zone for all product catalog data before applying CDC and historical tracking.
*/

CREATE OR REFRESH STREAMING TABLE bronze_products
AS SELECT *, current_timestamp AS ingestion_timestamp FROM STREAM(read_files("/Volumes/db_ws_east_us/raw/raw_files/products/products_large.json", format => "json", multiLine => true))

In [0]:
/*
🛒 **Bronze Orders Streaming Table**

This streaming table ingests raw order transaction data from cloud storage using Auto Loader:
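- Reads JSON from the orders folder of the raw volume (currently the single file `orders_large.json`), with `multiLine => true` so that each file is parsed as one JSON document (for example, a top-level array of records) rather than one record per line.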
- Automatically infers schema and handles schema evolution as new order fields arrive.
- Adds ingestion_timestamp to track when each record was loaded.
- Provides the foundation for downstream CDC processing to track order lifecycle changes.

Use this table as the raw landing zone for all order transactions before applying CDC and building analytics-ready datasets.
*/

CREATE OR REFRESH STREAMING TABLE bronze_orders
AS SELECT *, current_timestamp AS ingestion_timestamp FROM STREAM(read_files("/Volumes/db_ws_east_us/raw/raw_files/orders/orders_large.json", format => "json", multiLine => true))

In [0]:
/*
👤 **Silver Customers Table**

This streaming table cleans and validates customer data by enforcing key data quality constraints:
- Ensures all customer records have non-null IDs, emails, first and last names, and segment assignments.
- Drops any records failing these checks to maintain a high-quality, reliable customer dataset.

Use this table as the trusted source for downstream analytics, customer 360 profiles, and operational reporting.
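
Each `EXPECT ... ON VIOLATION DROP ROW` constraint behaves like a filter, and the pipeline reports how many records failed each expectation in its data quality metrics. The plain-Python sketch below mirrors the five constraints above; the function name `apply_drop_row_expectations` and the returned failure counter are illustrative assumptions, not a pipeline API.

```python
from collections import Counter

# Constraint name -> column that must be non-null (mirrors the SQL above).
REQUIRED = {
    "valid_customer_id": "customer_id",
    "valid_email": "email",
    "valid_first_name": "first_name",
    "valid_last_name": "last_name",
    "valid_segment": "segment",
}


def apply_drop_row_expectations(rows):
    """Hypothetical helper: keep rows passing every constraint, count failures per constraint."""
    kept, failures = [], Counter()
    for row in rows:
        failed = [name for name, col in REQUIRED.items() if row.get(col) is None]
        failures.update(failed)
        if not failed:
            kept.append(row)
    return kept, failures
```

A row that breaks two constraints is counted against both, but it is dropped only once.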
*/

CREATE OR REFRESH STREAMING TABLE silver_customers
(
  CONSTRAINT valid_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_first_name EXPECT (first_name IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_last_name EXPECT (last_name IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_segment EXPECT (segment IS NOT NULL) ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM(LIVE.bronze_customers)

In [0]:
/*
👤 **Silver Customers CDC Table**

This streaming table applies Change Data Capture (CDC) to maintain an up-to-date, deduplicated customer dataset:
- Handles inserts, updates, and deletes from the silver_customers table using SCD Type 1 logic (latest state only).
- Uses customer_id as the unique key and signup_ts for sequencing changes.
- Drops technical columns and CDC operation metadata for a clean, analytics-ready output.

Use this table as the trusted source for current customer attributes in downstream analytics, reporting, and customer 360 views.
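
Conceptually, `APPLY CHANGES ... STORED AS SCD TYPE 1` keeps one row per key: the change with the highest `SEQUENCE BY` value wins, and if that change is a `DELETE` the key disappears. The plain-Python sketch below shows this over in-memory dicts; `apply_scd1` is a hypothetical helper, integers stand in for timestamps, and the engine's handling of sequence ties and late data across updates is not modelled.

```python
def apply_scd1(changes, key, seq, drop=("operation", "_rescued_data", "ingestion_timestamp")):
    """Hypothetical helper: return {key: latest row} after upserts and deletes."""
    latest = {}
    for change in changes:
        k = change[key]
        # Keep only the newest event per key, whatever order the events arrive in.
        if k not in latest or change[seq] > latest[k][seq]:
            latest[k] = change
    current = {}
    for k, change in latest.items():
        if change.get("operation") == "DELETE":
            continue  # newest event is a delete: the key is removed
        # Mirrors COLUMNS * EXCEPT (...): strip CDC and technical columns.
        current[k] = {c: v for c, v in change.items() if c not in drop}
    return current
```

The sketch also makes one requirement visible: the `SEQUENCE BY` column has to increase with each change to a key, because in this sketch an update carrying the same value as the row it replaces does not win.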
*/

CREATE OR REFRESH STREAMING TABLE silver_customers_cdc;

APPLY CHANGES INTO
  silver_customers_cdc
FROM
  stream(LIVE.silver_customers)
KEYS
  (customer_id)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  signup_ts
COLUMNS * EXCEPT
  (operation, _rescued_data, ingestion_timestamp)
STORED AS
  SCD TYPE 1;

In [0]:
/*
📦 **Silver Products CDC Table**

This streaming table applies Change Data Capture (CDC) to maintain a complete history of product changes over time:
- Handles inserts, updates, and deletes from the bronze_products table using SCD Type 2 logic (historical tracking).
- Uses product_id as the unique key and event_ts for sequencing changes.
- Preserves every historical version of each product, with `__START_AT`/`__END_AT` validity columns for point-in-time queries.
- Drops technical columns and CDC operation metadata for a clean, analytics-ready output.

Use this table to track product attribute changes over time, analyze pricing history, and support historical reporting for merchandising and inventory analytics.
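
`STORED AS SCD TYPE 2` keeps every version of a product: each new change closes the key's open version and starts a new one, and the pipeline adds `__START_AT` and `__END_AT` columns taken from the `SEQUENCE BY` value (here `event_ts`), with `__END_AT` null on the current version. The plain-Python sketch below builds such versions from a list of changes. It is a simplification under stated assumptions: a `DELETE` closes the open version without opening a new one, only the `operation` column is dropped, and corner cases the engine handles (sequence ties, late data across updates) are ignored. `apply_scd2` is a hypothetical helper.

```python
def apply_scd2(changes, key, seq):
    """Hypothetical helper: build SCD Type 2 versions with __START_AT / __END_AT."""
    versions = []      # all versions, in creation order
    open_version = {}  # key -> index of its open (current) version in `versions`
    for change in sorted(changes, key=lambda c: c[seq]):
        k = change[key]
        if k in open_version:
            # A newer change closes the previous version at this sequence value.
            versions[open_version.pop(k)]["__END_AT"] = change[seq]
        if change.get("operation") == "DELETE":
            continue  # assumption: a delete only closes the current version
        row = {c: v for c, v in change.items() if c != "operation"}
        row.update({"__START_AT": change[seq], "__END_AT": None})
        open_version[k] = len(versions)
        versions.append(row)
    return versions
```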
*/

CREATE OR REFRESH STREAMING TABLE silver_products_cdc;

APPLY CHANGES INTO
  silver_products_cdc
FROM
  stream(LIVE.bronze_products)
KEYS
  (product_id)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  event_ts
COLUMNS * EXCEPT
  (operation, _rescued_data, ingestion_timestamp)
STORED AS
  SCD TYPE 2;

In [0]:
/*
🛒 **Silver Orders CDC Table**

This streaming table applies Change Data Capture (CDC) to maintain a complete history of order changes over time:
- Handles inserts, updates, and deletes from the bronze_orders table using SCD Type 2 logic (historical tracking).
- Uses order_id as the unique key and order_ts for sequencing changes.
- Preserves every historical version of each order, with `__START_AT`/`__END_AT` validity columns for point-in-time queries.
- Drops technical columns and CDC operation metadata for a clean, analytics-ready output.

Use this table to track order lifecycle changes, analyze order modifications, cancellations, and support historical reporting for sales and operations analytics.
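
Because this table keeps every version of an order, a query that needs each order's current status should keep only rows whose `__END_AT` is null; counting all rows counts an order once per version. The plain-Python sketch below shows the effect on a cancellation rate using hypothetical rows in the shape an SCD Type 2 target has (the `Placed` status and the integer timestamps are made up for illustration).

```python
# Hypothetical SCD Type 2 rows: order O1 was placed and later cancelled.
ORDER_VERSIONS = [
    {"order_id": "O1", "status": "Placed", "__START_AT": 1, "__END_AT": 4},
    {"order_id": "O1", "status": "Cancelled", "__START_AT": 4, "__END_AT": None},
    {"order_id": "O2", "status": "Delivered", "__START_AT": 2, "__END_AT": None},
]


def current_rows(rows):
    """Keep only open versions, the equivalent of WHERE __END_AT IS NULL."""
    return [r for r in rows if r["__END_AT"] is None]


def cancellation_rate(rows):
    """Share of rows whose status is 'Cancelled', rounded to two decimals."""
    if not rows:
        return None
    cancelled = sum(1 for r in rows if r["status"] == "Cancelled")
    return round(cancelled / len(rows), 2)


print(cancellation_rate(ORDER_VERSIONS))                # all versions: 0.33
print(cancellation_rate(current_rows(ORDER_VERSIONS)))  # current only: 0.5
```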
*/

CREATE OR REFRESH STREAMING TABLE silver_orders_cdc;

APPLY CHANGES INTO
  silver_orders_cdc
FROM
  stream(LIVE.bronze_orders)
KEYS
  (order_id)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  order_ts
COLUMNS * EXCEPT
  (operation, _rescued_data, ingestion_timestamp)
STORED AS
  SCD TYPE 2;

In [0]:
/*
📊 **Daily Order KPIs Materialized View**

This materialized view provides daily sales and operational metrics for business analysis, including:
- 💰 **Gross revenue per day**
- 🛒 **Total number of orders**
- ❌ **Number of cancelled orders**
- 📈 **Average order value (AOV)**
- 🔄 **Order cancellation rate**

Use this view to monitor sales performance, identify trends, and support decision-making for Sales Operations and Finance teams.
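
The KPI arithmetic is simple once each order is one row: revenue is `quantity * unit_price`, AOV is revenue divided by the order count, and cancellation rate is cancelled orders divided by all orders. The plain-Python sketch below computes the same columns per day; the helper `daily_kpis` and its dict-based input are hypothetical, with field names taken from the SQL.

```python
from collections import defaultdict


def daily_kpis(orders):
    """Hypothetical helper: per-day orders, cancellations, revenue, AOV and cancellation rate.

    Assumes one row per order, which is what COUNT(*) in the SQL treats as an order.
    """
    by_day = defaultdict(list)
    for o in orders:
        by_day[o["order_date"]].append(o)
    kpis = {}
    for day in sorted(by_day):
        rows = by_day[day]
        revenue = sum(r["quantity"] * r["unit_price"] for r in rows)
        cancelled = sum(1 for r in rows if r["status"] == "Cancelled")
        kpis[day] = {
            "orders": len(rows),
            "cancelled_orders": cancelled,
            "gross_revenue": round(revenue, 2),
            "avg_order_value": round(revenue / len(rows), 2),
            "cancellation_rate": round(cancelled / len(rows), 2),
        }
    return kpis
```

One caveat when comparing numbers: Python's `round()` rounds exact halves to even, while Spark SQL's `ROUND` rounds half up, so the two can disagree on exact ties.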
*/

CREATE OR REFRESH MATERIALIZED VIEW gold_mv_daily_order_kpis
COMMENT 'Daily revenue, order count, AOV, cancellation rate'
AS
WITH base AS (
  SELECT
    CAST(order_ts AS DATE) AS order_date,
    status,
    quantity,
    unit_price,
    (quantity * unit_price) AS line_revenue
  FROM silver_orders_cdc
  WHERE order_id IS NOT NULL
    AND __END_AT IS NULL  -- current SCD Type 2 version only, so each order counts once
)
SELECT
  order_date,
  COUNT(*) AS orders,
  SUM(CASE WHEN status = 'Cancelled' THEN 1 ELSE 0 END) AS cancelled_orders,
  ROUND(SUM(line_revenue), 2) AS gross_revenue,
  CASE WHEN COUNT(*) > 0 THEN ROUND(SUM(line_revenue)/COUNT(*), 2) END AS avg_order_value,
  CASE WHEN COUNT(*) > 0 THEN
    ROUND(SUM(CASE WHEN status='Cancelled' THEN 1 ELSE 0 END) * 1.0 / COUNT(*), 2)
  END AS cancellation_rate
FROM base
GROUP BY order_date
ORDER BY order_date;

In [0]:
/*
👤 **Customer 360 Profile & Value Materialized View**

This materialized view provides a unified, up-to-date profile for each customer, including:
- 🏷️ **Current attributes**: name, email, country, segment
- 🛒 **Lifetime order count**
- 💰 **Lifetime revenue**

Use this view to power marketing, customer experience, and analytics with a complete customer snapshot and value metrics.
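
The `LEFT JOIN` keeps every current customer even when they have no orders, and `COALESCE` replaces the resulting nulls with zeros. The plain-Python sketch below reproduces that shape; the helper `customer_360` and the dict-based inputs are hypothetical, with field names taken from the SQL.

```python
def customer_360(customers, orders):
    """Hypothetical helper: attach lifetime order count and revenue to every customer.

    Customers with no orders are kept with zeros (LEFT JOIN + COALESCE(..., 0)).
    """
    totals = {}  # customer_id -> [order_count, revenue]
    for o in orders:
        if o["customer_id"] is None:
            continue  # mirrors WHERE customer_id IS NOT NULL
        t = totals.setdefault(o["customer_id"], [0, 0.0])
        t[0] += 1
        t[1] += o["quantity"] * o["unit_price"]
    profiles = []
    for c in customers:
        count, revenue = totals.get(c["customer_id"], (0, 0.0))  # the COALESCE step
        profiles.append({**c, "order_count_lifetime": count, "revenue_lifetime": revenue})
    return profiles
```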
*/

CREATE OR REFRESH MATERIALIZED VIEW gold_mv_customer_360
COMMENT 'Current customer profile with lifetime value metrics'
AS
WITH orders AS (
  SELECT
    customer_id,
    COUNT(*) AS order_cnt,
    SUM(quantity * unit_price) AS revenue
  FROM silver_orders_cdc
  WHERE customer_id IS NOT NULL
    AND __END_AT IS NULL  -- current SCD Type 2 version only, so each order counts once
  GROUP BY customer_id
)
SELECT
  c.customer_id,
  c.first_name,
  c.last_name,
  c.email,
  c.country,
  c.segment,
  COALESCE(o.order_cnt, 0) AS order_count_lifetime,
  COALESCE(o.revenue,   0) AS revenue_lifetime
FROM silver_customers_cdc AS c
LEFT JOIN orders AS o
  ON c.customer_id = o.customer_id
WHERE c.customer_id IS NOT NULL;

In [0]:
/*
🌍 **Revenue by Product & Country Materialized View**

This materialized view provides a comprehensive breakdown of sales performance by product and country, including:
- 🏷️ **Product attributes**: name, category, sub-category, brand
- 🌐 **Country**: geographic segmentation
- 🛒 **Order count**: total orders per product-country pair
- 💰 **Revenue**: total sales revenue per product-country pair

Use this view to support Geo Merchandising, Financial Planning & Analysis (FP&A), and to identify top-performing products in each region.
*/

CREATE OR REFRESH MATERIALIZED VIEW gold_mv_revenue_by_product_country
COMMENT 'Revenue and order counts by product, by country'
AS
WITH agg AS (
  SELECT
    o.product_id,
    o.country,
    COUNT(*) AS orders,
    ROUND(SUM(o.quantity*o.unit_price), 2) AS revenue
  FROM silver_orders_cdc AS o
  WHERE o.product_id IS NOT NULL
    AND o.__END_AT IS NULL  -- current version of each order only
  GROUP BY o.product_id, o.country
)
SELECT
  a.product_id,
  p.product_name,
  p.category,
  p.sub_category,
  p.brand,
  a.country,
  a.orders,
  a.revenue
FROM agg a
LEFT JOIN silver_products_cdc AS p
  ON a.product_id = p.product_id
  AND p.__END_AT IS NULL;  -- current product version only, avoiding one row per historical version

In [0]:
/*
🔎 **Cancellation Funnel Materialized View (Operations / CX)**

This materialized view helps you:
- 📉 **Track cancellation rates** by payment method, country, and month
- 🌍 **Identify friction points** across geographies and payment channels
- 🛑 **Spot trends** in order cancellations to improve customer experience

Use this view to monitor operational health, optimize payment flows, and reduce cancellations.
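
The funnel groups orders by a `yyyy-MM` month key plus payment method and country, then divides cancellations by orders in each group. The plain-Python sketch below does the same; the helper `cancellation_funnel` is hypothetical, and it assumes `order_ts` arrives as an ISO-8601 string, which may not match the real raw data.

```python
from collections import defaultdict
from datetime import datetime


def cancellation_funnel(orders):
    """Hypothetical helper: orders, cancellations and rate per (month, payment_method, country).

    Assumes order_ts is an ISO-8601 string such as "2024-03-05T10:00:00".
    """
    groups = defaultdict(lambda: [0, 0])  # key -> [orders, cancelled]
    for o in orders:
        month = datetime.fromisoformat(o["order_ts"]).strftime("%Y-%m")  # like 'yyyy-MM'
        counts = groups[(month, o["payment_method"], o["country"])]
        counts[0] += 1
        counts[1] += int(o["status"] == "Cancelled")
    return {
        key: {"orders": n, "cancelled_orders": c, "cancellation_rate": round(c / n, 2)}
        for key, (n, c) in sorted(groups.items())
    }
```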
*/

CREATE OR REFRESH MATERIALIZED VIEW gold_mv_cancellation_funnel
COMMENT 'Cancellation rates across payment_method, country, and time'
AS
SELECT
  DATE_FORMAT(DATE_TRUNC('MONTH', CAST(o.order_ts AS DATE)), 'yyyy-MM') AS order_month,
  o.payment_method,
  o.country,
  COUNT(*) AS orders,
  SUM(CASE WHEN o.status='Cancelled' THEN 1 ELSE 0 END) AS cancelled_orders,
  CASE WHEN COUNT(*)>0 THEN
    ROUND(SUM(CASE WHEN o.status='Cancelled' THEN 1 ELSE 0 END) * 1.0 / COUNT(*), 2)
  END AS cancellation_rate
FROM silver_orders_cdc AS o
WHERE o.order_id IS NOT NULL
  AND o.__END_AT IS NULL  -- current version of each order only
GROUP BY DATE_FORMAT(DATE_TRUNC('MONTH', CAST(o.order_ts AS DATE)), 'yyyy-MM'), o.payment_method, o.country
ORDER BY order_month;

In [0]:
/*
👤 **Customer 360 with Product Mix & First/Last Touch Materialized View**

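This materialized view provides a comprehensive, analytics-ready profile for each customer with at least one order for a known product (customers without a product mix are filtered out), including:
- 🏷️ **Current customer attributes**: name, email, signup timestamp, country, segment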
- 💰 **Lifetime value (LTV)**: total revenue and order count across all time
- 🛒 **Product mix**: detailed breakdown of categories and brands purchased, with revenue and quantity per group
- 🕒 **First and last order timestamps**: customer lifecycle insights

Use this view to power marketing, personalization, customer analytics, and segmentation based on purchase behavior and value.
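
`COLLECT_LIST(named_struct(...))` gathers one struct per (category, brand) group for each customer, and `SORT_ARRAY` then orders the structs field by field, so the mix is sorted by category first, then brand. Python tuples compare the same way, so the plain-Python sketch below uses `(category, brand, revenue, qty)` tuples as stand-ins for the structs. The helpers `product_mix` and `first_last_touch` are hypothetical, the `products` argument is assumed to map `product_id` to its current category and brand, and category and brand are assumed non-null (Python cannot order `None` against strings).

```python
from collections import defaultdict


def product_mix(orders, products):
    """Hypothetical helper: per customer, a sorted list of (category, brand, revenue, qty)."""
    totals = defaultdict(lambda: [0.0, 0])  # (customer, category, brand) -> [revenue, qty]
    for o in orders:
        p = products.get(o["product_id"])
        if p is None:
            continue  # inner JOIN: orders whose product is unknown are skipped
        t = totals[(o["customer_id"], p["category"], p["brand"])]
        t[0] += o["quantity"] * o["unit_price"]
        t[1] += o["quantity"]
    mix = defaultdict(list)
    for (customer, category, brand), (revenue, qty) in totals.items():
        mix[customer].append((category, brand, round(revenue, 2), qty))
    return {customer: sorted(items) for customer, items in mix.items()}


def first_last_touch(orders):
    """Hypothetical helper: per customer, (earliest, latest) order_ts, like MIN/MAX."""
    touch = {}
    for o in orders:
        ts = o["order_ts"]
        first, last = touch.get(o["customer_id"], (ts, ts))
        touch[o["customer_id"]] = (min(first, ts), max(last, ts))
    return touch
```

A customer whose orders all reference unknown products still gets timestamps from `first_last_touch` but no entry from `product_mix`; in the view, the `m.product_mix IS NOT NULL` filter removes such customers.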
*/

CREATE OR REFRESH MATERIALIZED VIEW gold_mv_customer_360_enhanced
COMMENT 'Customer profile + LTV + product mix + first/last touch'
AS
WITH o AS (
  SELECT
    o.order_id,
    o.customer_id,
    o.product_id,
    o.order_ts,
    o.quantity,
    o.unit_price,
    (o.quantity * o.unit_price) AS revenue
  FROM silver_orders_cdc AS o
  WHERE o.customer_id IS NOT NULL AND o.product_id IS NOT NULL
    AND o.__END_AT IS NULL  -- current version of each order only
),
op AS (
  SELECT
    o.customer_id,
    p.category,
    p.brand,
    ROUND(SUM(o.revenue), 2) AS cat_brand_revenue,
    SUM(o.quantity) AS cat_brand_qty
  FROM o
  JOIN silver_products_cdc AS p
    ON o.product_id = p.product_id
    AND p.__END_AT IS NULL  -- current product version only, so revenue is not multiplied per version
  GROUP BY o.customer_id, p.category, p.brand
),
mix AS (
  -- Represent product mix as arrays of structs (category, brand, revenue, qty)
  SELECT
    customer_id,
    SORT_ARRAY(COLLECT_LIST(named_struct('category', category, 'brand', brand, 'revenue', cat_brand_revenue, 'qty', cat_brand_qty))) AS product_mix
  FROM op
  GROUP BY customer_id
),
ltv AS (
  SELECT
    customer_id,
    ROUND(SUM(revenue), 2) AS revenue_lifetime,
    COUNT(*) AS order_count,
    MIN(order_ts) AS first_order_ts,
    MAX(order_ts) AS last_order_ts
  FROM o
  GROUP BY customer_id
)
SELECT
  c.customer_id,
  c.first_name, c.last_name, c.email, c.signup_ts, c.country, c.segment,
  l.revenue_lifetime, l.order_count, l.first_order_ts, l.last_order_ts,
  m.product_mix
FROM silver_customers_cdc AS c
LEFT JOIN ltv l  ON c.customer_id = l.customer_id
LEFT JOIN mix m  ON c.customer_id = m.customer_id
WHERE c.customer_id IS NOT NULL AND m.product_mix IS NOT NULL
ORDER BY last_name;