# Databricks Lake Flow - CTAS Examples

This notebook contains all the CREATE TABLE AS SELECT (CTAS) examples from the Lake Flow project. You can run these examples directly on your Databricks cluster to explore Lake Flow capabilities.

## Prerequisites
- Databricks cluster with Delta Lake support
- Databricks Runtime 13.3+ (for Liquid Clustering examples)
- Sample data tables (created in this notebook)

## What You'll Learn
1. Basic CTAS with Delta Tables
2. Partitioned Tables with Advanced Features
3. Streaming Tables with Change Data Capture
4. Materialized Views with Complex Aggregations
5. Liquid Clustering (Databricks Runtime 13.3+)
6. Multi-Hop Architecture (Bronze/Silver/Gold)
7. Advanced Features (Generated Columns, Time Travel, etc.)

## Setup: Create Database and Sample Data

First, let's create a database for our examples and populate it with sample data.

## 🔧 Configure Your Catalog and Schema

**Before running the examples, you need to configure your target catalog and schema:**

### Unity Catalog Setup (Recommended)

1. **Choose your catalog**: This could be:
   - `main` - Default catalog in most workspaces
   - `dev`, `staging`, `prod` - Environment-specific catalogs
   - Your custom catalog name

2. **Choose your schema**: This could be:
   - `lake_flow_demo` - For this demo
   - `analytics`, `data_engineering` - Department-specific
   - Your project-specific schema name

3. **Update the configuration cell below**: Replace `your_catalog` and `your_schema` with your actual values

### Example Configurations:

```sql
-- For main catalog with demo schema
USE CATALOG main;
CREATE SCHEMA IF NOT EXISTS lake_flow_demo;
USE SCHEMA lake_flow_demo;

-- For dev environment
USE CATALOG dev;
CREATE SCHEMA IF NOT EXISTS analytics;
USE SCHEMA analytics;

-- For production with department schema
USE CATALOG prod;
CREATE SCHEMA IF NOT EXISTS data_engineering;
USE SCHEMA data_engineering;
```

### Permissions Required:
- **USE CATALOG** on the catalog
- **USE SCHEMA** on the schema
- **CREATE SCHEMA** on the catalog (only if the schema doesn't exist yet)
- **CREATE TABLE** on the schema (plus **CREATE MATERIALIZED VIEW** for Example 4)

💡 **Tip**: Contact your Databricks admin if you need permissions or help choosing the right catalog/schema.
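
If you administer the workspace yourself, the privileges above can be granted with statements along these lines. This is a sketch only: the group name `data_engineers` is a hypothetical principal, and `main.lake_flow_demo` is just the demo configuration from the examples above (the schema must already exist before granting on it).

```sql
-- Hypothetical principal; replace with a real user, group, or service principal
GRANT USE CATALOG ON CATALOG main TO `data_engineers`;
GRANT CREATE SCHEMA ON CATALOG main TO `data_engineers`;
GRANT USE SCHEMA, CREATE TABLE ON SCHEMA main.lake_flow_demo TO `data_engineers`;

-- Review what has been granted on the schema
SHOW GRANTS ON SCHEMA main.lake_flow_demo;
```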

In [None]:
-- ==================================================
-- CONFIGURE YOUR CATALOG AND SCHEMA HERE
-- ==================================================
-- Replace 'your_catalog' and 'your_schema' with actual values
-- Example: 'main' catalog and 'lake_flow_demo' schema
-- Or: 'dev' catalog and 'analytics' schema

-- Option 1: Use Unity Catalog with specific catalog and schema
USE CATALOG your_catalog;
CREATE SCHEMA IF NOT EXISTS your_schema;
USE SCHEMA your_schema;

-- Option 2: Alternative - Use three-level namespace directly
-- CREATE SCHEMA IF NOT EXISTS your_catalog.your_schema;
-- USE your_catalog.your_schema;

-- Option 3: For workspaces without Unity Catalog (legacy)
-- CREATE DATABASE IF NOT EXISTS lake_flow_demo;
-- USE lake_flow_demo;

-- Verify current namespace
SELECT current_catalog() as current_catalog, current_database() as current_schema;

### Create Sample Orders Table

In [None]:
-- Create sample orders table with realistic data
DROP TABLE IF EXISTS orders;

CREATE TABLE orders (
  order_id INT,
  customer_id INT,
  product_id INT,
  region STRING,
  product_category STRING,
  order_date DATE,
  order_amount DECIMAL(10,2),
  customer_segment STRING,
  payment_method STRING,
  discount_amount DECIMAL(10,2)
) USING DELTA;

-- Insert sample data
INSERT INTO orders VALUES
  (1, 101, 1001, 'North America', 'Electronics', '2024-01-15', 1250.00, 'Premium', 'Credit Card', 125.00),
  (2, 102, 1002, 'Europe', 'Clothing', '2024-01-16', 890.50, 'Standard', 'PayPal', 0.00),
  (3, 103, 1003, 'Asia Pacific', 'Electronics', '2024-01-17', 2100.75, 'Premium', 'Bank Transfer', 210.00),
  (4, 104, 1001, 'North America', 'Home & Garden', '2024-01-18', 450.25, 'Standard', 'Credit Card', 45.00),
  (5, 105, 1004, 'Europe', 'Electronics', '2024-01-19', 1780.00, 'Premium', 'Credit Card', 178.00),
  (6, 101, 1005, 'North America', 'Clothing', '2024-01-20', 320.75, 'Premium', 'PayPal', 32.00),
  (7, 106, 1002, 'Asia Pacific', 'Electronics', '2024-01-21', 950.00, 'Standard', 'Credit Card', 0.00),
  (8, 107, 1003, 'Europe', 'Home & Garden', '2024-01-22', 675.50, 'Standard', 'Bank Transfer', 67.50),
  (9, 108, 1006, 'North America', 'Electronics', '2024-01-23', 1450.25, 'Premium', 'Credit Card', 145.00),
  (10, 102, 1007, 'Europe', 'Clothing', '2024-01-24', 580.00, 'Standard', 'PayPal', 58.00);

-- Verify data
SELECT COUNT(*) as record_count FROM orders;
SELECT * FROM orders LIMIT 5;

### Create Additional Sample Tables

In [None]:
-- Create customers table
DROP TABLE IF EXISTS customers;

CREATE TABLE customers (
  customer_id INT,
  customer_name STRING,
  email STRING,
  registration_date DATE,
  customer_segment STRING,
  preferences MAP<STRING, STRING>
) USING DELTA;

INSERT INTO customers VALUES
  (101, 'John Smith', 'john.smith@email.com', '2023-06-15', 'Premium', map('category', 'Electronics', 'newsletter', 'true')),
  (102, 'Maria Garcia', 'maria.garcia@email.com', '2023-08-22', 'Standard', map('category', 'Clothing', 'newsletter', 'false')),
  (103, 'David Chen', 'david.chen@email.com', '2023-05-10', 'Premium', map('category', 'Electronics', 'newsletter', 'true')),
  (104, 'Sarah Johnson', 'sarah.johnson@email.com', '2023-09-03', 'Standard', map('category', 'Home & Garden', 'newsletter', 'true')),
  (105, 'Ahmed Hassan', 'ahmed.hassan@email.com', '2023-07-18', 'Premium', map('category', 'Electronics', 'newsletter', 'true')),
  (106, 'Lisa Wong', 'lisa.wong@email.com', '2023-10-12', 'Standard', map('category', 'Electronics', 'newsletter', 'false')),
  (107, 'Pierre Dubois', 'pierre.dubois@email.com', '2023-04-25', 'Standard', map('category', 'Home & Garden', 'newsletter', 'true')),
  (108, 'Jennifer Brown', 'jennifer.brown@email.com', '2023-11-08', 'Premium', map('category', 'Electronics', 'newsletter', 'true'));

-- Create product inventory for streaming examples
DROP TABLE IF EXISTS product_inventory_raw;

CREATE TABLE product_inventory_raw (
  product_id INT,
  product_name STRING,
  category STRING,
  current_stock INT,
  price DECIMAL(10,2),
  last_updated TIMESTAMP
) USING DELTA;

INSERT INTO product_inventory_raw VALUES
  (1001, 'Laptop Pro X1', 'Electronics', 45, 1299.99, '2024-01-20 10:30:00'),
  (1002, 'Designer Jeans', 'Clothing', 120, 89.50, '2024-01-20 11:15:00'),
  (1003, 'Smart TV 55"', 'Electronics', 23, 899.99, '2024-01-20 12:00:00'),
  (1004, 'Wireless Headphones', 'Electronics', 78, 199.99, '2024-01-20 13:45:00'),
  (1005, 'Cotton T-Shirt', 'Clothing', 200, 24.99, '2024-01-20 14:30:00'),
  (1006, 'Gaming Monitor', 'Electronics', 15, 449.99, '2024-01-20 15:15:00'),
  (1007, 'Running Shoes', 'Clothing', 65, 129.99, '2024-01-20 16:00:00');

SELECT 'Sample data created successfully' as status;
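
Before building on the sample tables, it can help to confirm that every order points at an existing customer and product. A minimal sketch using anti joins; for the sample rows inserted above, both queries should return no rows.

```sql
-- Orders whose customer_id has no matching customer (expect no rows)
SELECT o.order_id, o.customer_id
FROM orders o
LEFT ANTI JOIN customers c ON o.customer_id = c.customer_id;

-- Orders whose product_id has no matching inventory row (expect no rows)
SELECT o.order_id, o.product_id
FROM orders o
LEFT ANTI JOIN product_inventory_raw p ON o.product_id = p.product_id;
```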

## Example 1: Basic CTAS with Delta Table

This example demonstrates basic CTAS functionality with Delta Lake properties, auto-optimization, and basic aggregations.

## 📍 Table Storage Options

When creating tables with Unity Catalog, you have several storage options:

### 1. Managed Tables (Recommended)
- **No LOCATION specified** - Unity Catalog manages storage automatically
- **Benefits**: Automatic lifecycle management, integrated governance, optimized performance
- **Best for**: Most use cases, especially in governed environments

### 2. Volumes (Files Only, Not Tables)
- **Path**: `/Volumes/catalog/schema/volume_name/...`
- **Note**: Volumes hold non-tabular files (for example raw CSV or JSON landing data); a volume path cannot be used as a table `LOCATION`
- **Best for**: Staging source files that you then load into managed or external tables

### 3. External Tables with Cloud Storage
- **LOCATION**: `s3://bucket/path/` or `abfss://container@account.dfs.core.windows.net/path/`
- **Benefits**: Direct cloud storage integration
- **Best for**: Migration scenarios or specific compliance requirements

### 4. Legacy DBFS (Compatibility)
- **LOCATION**: `/tmp/` or `/mnt/` paths
- **Note**: For workspaces without Unity Catalog

💡 **For this demo**: We'll use managed tables (no LOCATION) for simplicity. Uncomment LOCATION lines if you need external storage.
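
To see which storage option an existing table actually uses, you can inspect its metadata. A small sketch against the `orders` table created earlier:

```sql
-- The "Type" row reads MANAGED or EXTERNAL; "Location" shows where the files live
DESCRIBE TABLE EXTENDED orders;

-- DESCRIBE DETAIL returns the same location as a single-row result
DESCRIBE DETAIL orders;
```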

In [None]:
-- Basic CTAS with Delta Table and Auto-Optimization
DROP TABLE IF EXISTS order_summary_basic;

CREATE OR REPLACE TABLE order_summary_basic
USING DELTA
-- Option 1: Unity Catalog managed table (recommended): no LOCATION clause
-- Option 2: External table in cloud storage under a registered external location
-- LOCATION 's3://bucket/path/order_summary_basic'
-- Option 3: DBFS path for legacy workspaces without Unity Catalog
-- LOCATION '/tmp/lake_flow_demo/order_summary_basic'
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true',
  'delta.enableChangeDataFeed' = 'true'
)
AS
SELECT 
  region,
  product_category,
  COUNT(*) as total_orders,
  SUM(order_amount) as total_revenue,
  AVG(order_amount) as avg_order_value,
  MIN(order_date) as first_order_date,
  MAX(order_date) as last_order_date
FROM orders
GROUP BY region, product_category
ORDER BY total_revenue DESC;

In [None]:
-- Verify the basic table
SELECT * FROM order_summary_basic;

-- Show table properties
SHOW TBLPROPERTIES order_summary_basic;

## Example 2: Partitioned Tables with Advanced Features

This example shows partitioning strategies, window functions, map value lookups on customer preferences, and customer segmentation.

In [None]:
-- Partitioned Table with Advanced Analytics
DROP TABLE IF EXISTS customer_analytics_partitioned;

CREATE OR REPLACE TABLE customer_analytics_partitioned
USING DELTA
-- LOCATION '/tmp/lake_flow_demo/customer_analytics_partitioned'  -- legacy DBFS path; uncomment only on workspaces without Unity Catalog
PARTITIONED BY (customer_segment, region)
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
)
AS
SELECT 
  c.customer_id,
  c.customer_name,
  c.email,
  o.region,
  c.customer_segment,
  
  -- Map lookups on customer preferences (a MAP<STRING, STRING> column, not JSON)
  c.preferences['category'] as preferred_category,
  CASE WHEN c.preferences['newsletter'] = 'true' THEN 'Subscribed' ELSE 'Not Subscribed' END as newsletter_status,
  
  -- Customer analytics with window functions
  COUNT(o.order_id) as total_orders,
  SUM(o.order_amount) as total_spent,
  AVG(o.order_amount) as avg_order_value,
  
  -- Window function: rank customers by spending within their segment and region
  RANK() OVER (
    PARTITION BY c.customer_segment, o.region
    ORDER BY SUM(o.order_amount) DESC
  ) as spending_rank_in_segment,
  
  -- Customer value classification
  CASE 
    WHEN SUM(o.order_amount) > 2000 THEN 'High Value'
    WHEN SUM(o.order_amount) > 1000 THEN 'Medium Value'
    ELSE 'Low Value'
  END as customer_value_tier,
  
  MIN(o.order_date) as first_purchase_date,
  MAX(o.order_date) as last_purchase_date,
  DATEDIFF(MAX(o.order_date), MIN(o.order_date)) as customer_lifetime_days
  
FROM customers c
INNER JOIN orders o ON c.customer_id = o.customer_id
GROUP BY 
  c.customer_id, c.customer_name, c.email, o.region, c.customer_segment,
  c.preferences['category'], c.preferences['newsletter'];

In [None]:
-- Verify partitioned table
SELECT * FROM customer_analytics_partitioned ORDER BY total_spent DESC;

-- Show partition information
SHOW PARTITIONS customer_analytics_partitioned;
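
Partitioning pays off when queries filter on the partition columns. The sketch below filters on both `customer_segment` and `region`, so the engine can skip the other partitions; the `EXPLAIN` output is a way to check for partition filters on the scan.

```sql
-- Both filter columns are partition columns, so non-matching partitions can be skipped
SELECT customer_name, total_orders, total_spent, spending_rank_in_segment
FROM customer_analytics_partitioned
WHERE customer_segment = 'Premium'
  AND region = 'North America'
ORDER BY spending_rank_in_segment;

-- Inspect the physical plan; look for partition filters on the scan node
EXPLAIN
SELECT customer_name, total_spent
FROM customer_analytics_partitioned
WHERE customer_segment = 'Premium' AND region = 'North America';
```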

## Example 3: Streaming Tables with Change Data Capture

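This example builds a Delta table with Change Data Feed enabled, generates surrogate keys, and derives inventory classifications. The CTAS itself runs as a batch statement; Change Data Feed is what lets downstream readers, including streaming jobs, pick up row-level changes incrementally.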

In [None]:
-- Inventory table with Change Data Feed enabled (batch CTAS that streaming readers can consume)
DROP TABLE IF EXISTS product_inventory_streaming;

CREATE OR REPLACE TABLE product_inventory_streaming
USING DELTA
-- LOCATION '/tmp/lake_flow_demo/product_inventory_streaming'  -- legacy DBFS path; uncomment only on workspaces without Unity Catalog
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.autoOptimize.optimizeWrite' = 'true'
)
AS
SELECT 
  -- Surrogate key generation using UUID
  uuid() as inventory_key,
  
  product_id,
  product_name,
  category,
  current_stock,
  price,
  
  -- Inventory status classification
  CASE 
    WHEN current_stock = 0 THEN 'Out of Stock'
    WHEN current_stock < 20 THEN 'Low Stock'
    WHEN current_stock < 50 THEN 'Medium Stock'
    ELSE 'High Stock'
  END as stock_status,
  
  -- Price tier classification
  CASE 
    WHEN price < 50 THEN 'Budget'
    WHEN price < 200 THEN 'Mid-Range'
    WHEN price < 500 THEN 'Premium'
    ELSE 'Luxury'
  END as price_tier,
  
  -- Reorder point calculation (20% of current stock, as a simple placeholder rule)
  current_stock * 0.2 as reorder_point,
  
  -- Inventory value
  current_stock * price as inventory_value,
  
  last_updated,
  current_timestamp() as processed_timestamp
  
FROM product_inventory_raw;

In [None]:
-- Verify the inventory table
SELECT * FROM product_inventory_streaming ORDER BY inventory_value DESC;

-- Make a change in the source table first
-- (the change feed is recorded on product_inventory_streaming, not on product_inventory_raw)
UPDATE product_inventory_raw 
SET current_stock = current_stock - 5, last_updated = current_timestamp() 
WHERE product_id IN (1001, 1003);

-- Append the updated rows; this INSERT is what the change data feed records
INSERT INTO product_inventory_streaming
SELECT 
  uuid() as inventory_key,
  product_id, product_name, category, current_stock, price,
  CASE 
    WHEN current_stock = 0 THEN 'Out of Stock'
    WHEN current_stock < 20 THEN 'Low Stock'
    WHEN current_stock < 50 THEN 'Medium Stock'
    ELSE 'High Stock'
  END as stock_status,
  CASE 
    WHEN price < 50 THEN 'Budget'
    WHEN price < 200 THEN 'Mid-Range'
    WHEN price < 500 THEN 'Premium'
    ELSE 'Luxury'
  END as price_tier,
  current_stock * 0.2 as reorder_point,
  current_stock * price as inventory_value,
  last_updated,
  current_timestamp() as processed_timestamp
FROM product_inventory_raw
WHERE product_id IN (1001, 1003);
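
The rows appended above can be read back through the change data feed with the `table_changes` function. This sketch assumes the INSERT landed as version 1 of `product_inventory_streaming` (version 0 being the CTAS); check `DESCRIBE HISTORY` and adjust the starting version if your table has a different history.

```sql
-- Confirm which version the INSERT above produced
DESCRIBE HISTORY product_inventory_streaming;

-- Read row-level changes starting at version 1 (assumed to be the INSERT)
SELECT product_id, current_stock, stock_status,
       _change_type, _commit_version, _commit_timestamp
FROM table_changes('product_inventory_streaming', 1)
ORDER BY _commit_version, product_id;
```

The `_change_type`, `_commit_version`, and `_commit_timestamp` columns are metadata added by the feed; an append shows up with `_change_type = 'insert'`.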

## Example 4: Materialized Views with Complex Aggregations

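This example shows statistical functions (percentiles, stddev), array aggregations, and conditional aggregations in materialized views. Materialized views require Unity Catalog, and depending on your workspace you may need to run this cell on a SQL warehouse rather than an all-purpose cluster. Note that `STDDEV` is a sample standard deviation, so it returns NULL for groups that contain a single order.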

In [None]:
-- Create Materialized View with Complex Aggregations
DROP MATERIALIZED VIEW IF EXISTS order_analytics_materialized;

CREATE MATERIALIZED VIEW order_analytics_materialized
AS
SELECT 
  region,
  product_category,
  
  -- Basic aggregations
  COUNT(*) as total_orders,
  SUM(order_amount) as total_revenue,
  AVG(order_amount) as avg_order_value,
  
  -- Statistical functions
  STDDEV(order_amount) as revenue_stddev,
  PERCENTILE_APPROX(order_amount, 0.25) as revenue_25th_percentile,
  PERCENTILE_APPROX(order_amount, 0.50) as revenue_median,
  PERCENTILE_APPROX(order_amount, 0.75) as revenue_75th_percentile,
  PERCENTILE_APPROX(order_amount, 0.95) as revenue_95th_percentile,
  
  -- Array aggregations
  COLLECT_LIST(order_amount) as all_order_amounts,
  COLLECT_SET(customer_id) as unique_customers,
  ARRAY_DISTINCT(COLLECT_LIST(payment_method)) as payment_methods_used,
  
  -- Conditional aggregations
  COUNT(CASE WHEN customer_segment = 'Premium' THEN 1 END) as premium_orders,
  COUNT(CASE WHEN customer_segment = 'Standard' THEN 1 END) as standard_orders,
  SUM(CASE WHEN payment_method = 'Credit Card' THEN order_amount ELSE 0 END) as credit_card_revenue,
  SUM(CASE WHEN payment_method = 'PayPal' THEN order_amount ELSE 0 END) as paypal_revenue,
  
  -- Discount analysis
  AVG(discount_amount) as avg_discount,
  SUM(discount_amount) as total_discounts_given,
  COUNT(CASE WHEN discount_amount > 0 THEN 1 END) as orders_with_discounts,
  
  -- Date range
  MIN(order_date) as first_order_date,
  MAX(order_date) as last_order_date,
  DATEDIFF(MAX(order_date), MIN(order_date)) as date_range_days
  
FROM orders
GROUP BY region, product_category;

In [None]:
-- Query the materialized view
SELECT 
  region,
  product_category,
  total_orders,
  ROUND(total_revenue, 2) as total_revenue,
  ROUND(avg_order_value, 2) as avg_order_value,
  ROUND(revenue_median, 2) as median_order_value,
  ROUND(revenue_stddev, 2) as order_value_stddev,
  SIZE(unique_customers) as unique_customer_count,
  payment_methods_used
FROM order_analytics_materialized
ORDER BY total_revenue DESC;
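
A materialized view stores its result, so it does not reflect later changes to `orders` until it is refreshed. A minimal sketch of a manual refresh followed by a spot check:

```sql
-- Recompute the view from the current contents of orders
REFRESH MATERIALIZED VIEW order_analytics_materialized;

-- Re-query a few columns to confirm the refreshed result
SELECT region, product_category, total_orders, total_revenue
FROM order_analytics_materialized
ORDER BY total_revenue DESC;
```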

## Example 5: Liquid Clustering (Databricks Runtime 13.3+)

This example demonstrates Liquid Clustering for advanced performance optimization with multi-dimensional clustering and analytics window functions.

In [None]:
-- Liquid Clustering Example (requires Databricks Runtime 13.3+)
DROP TABLE IF EXISTS sales_clustered;

CREATE OR REPLACE TABLE sales_clustered
USING DELTA
-- LOCATION '/tmp/lake_flow_demo/sales_clustered'  -- legacy DBFS path; uncomment only on workspaces without Unity Catalog
CLUSTER BY (region, product_category, order_date)
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.tuneFileSizesForRewrites' = 'true'
)
AS
SELECT 
  order_id,
  customer_id,
  product_id,
  region,
  product_category,
  order_date,
  order_amount,
  
  -- Advanced window functions for customer analytics
  ROW_NUMBER() OVER (
    PARTITION BY customer_id
    ORDER BY order_date
  ) as order_sequence,
  
  LAG(order_amount) OVER (
    PARTITION BY customer_id
    ORDER BY order_date
  ) as previous_order_amount,
  
  SUM(order_amount) OVER (
    PARTITION BY customer_id
    ORDER BY order_date
    ROWS UNBOUNDED PRECEDING
  ) as running_total,
  
  -- Date/time functions
  DAYOFWEEK(order_date) as day_of_week,
  QUARTER(order_date) as quarter,
  DATE_ADD(order_date, 30) as expected_next_order
  
FROM orders
WHERE order_date >= '2024-01-01';

In [None]:
-- Query the clustered table
SELECT 
  customer_id,
  order_sequence,
  order_date,
  order_amount,
  previous_order_amount,
  running_total,
  region,
  product_category
FROM sales_clustered
ORDER BY customer_id, order_sequence;

-- Show clustering information
DESCRIBE DETAIL sales_clustered;
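
Clustering is applied when data is written or optimized, and the keys can be changed later without recreating the table. The sketch below is optional for the demo; the replacement keys `(customer_id, order_date)` are only an illustration, not a recommendation.

```sql
-- Trigger clustering of existing data according to the current CLUSTER BY keys
OPTIMIZE sales_clustered;

-- Change the clustering keys; this affects data written or optimized afterwards
ALTER TABLE sales_clustered CLUSTER BY (customer_id, order_date);

-- The clusteringColumns field in the output reflects the new keys
DESCRIBE DETAIL sales_clustered;
```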

## Example 6: Multi-Hop Architecture (Bronze → Silver → Gold)

This example demonstrates the medallion architecture with progressive data refinement through Bronze, Silver, and Gold layers.

### Bronze Layer (Raw Data Ingestion)

In [None]:
-- BRONZE LAYER: Raw data ingestion with minimal processing
DROP TABLE IF EXISTS bronze_transactions;

CREATE OR REPLACE TABLE bronze_transactions
USING DELTA
-- LOCATION '/tmp/lake_flow_demo/bronze_transactions'  -- legacy DBFS path; uncomment only on workspaces without Unity Catalog
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true'
)
AS
SELECT 
  order_id,
  customer_id,
  product_id,
  region,
  product_category,
  order_date,
  order_amount,
  customer_segment,
  payment_method,
  discount_amount,
  
  -- Metadata for lineage and auditing
  current_timestamp() as ingestion_timestamp,
  'orders_table' as source_system,
  uuid() as bronze_record_id
  
FROM orders;

### Silver Layer (Cleaned and Enriched Data)

In [None]:
-- SILVER LAYER: Data quality, cleansing, and enrichment
DROP TABLE IF EXISTS silver_transactions;

CREATE OR REPLACE TABLE silver_transactions
USING DELTA
-- LOCATION '/tmp/lake_flow_demo/silver_transactions'  -- legacy DBFS path; uncomment only on workspaces without Unity Catalog
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.autoOptimize.optimizeWrite' = 'true'
)
AS
SELECT 
  order_id,
  customer_id,
  product_id,
  
  -- Data standardization and cleansing
  UPPER(TRIM(region)) as region_clean,
  UPPER(TRIM(product_category)) as product_category_clean,
  order_date,
  
  -- Data validation and quality checks
  CASE 
    WHEN order_amount > 0 THEN order_amount 
    ELSE NULL 
  END as order_amount_validated,
  
  customer_segment,
  payment_method,
  
  CASE 
    WHEN discount_amount >= 0 THEN discount_amount 
    ELSE 0 
  END as discount_amount_validated,
  
  -- Business rule application and enrichment
  -- (applies the same discount rule as discount_amount_validated so the columns agree)
  order_amount - GREATEST(COALESCE(discount_amount, 0), 0) as net_amount,
  
  CASE 
    WHEN order_amount - GREATEST(COALESCE(discount_amount, 0), 0) > 1000 THEN 'High Value'
    WHEN order_amount - GREATEST(COALESCE(discount_amount, 0), 0) > 500 THEN 'Medium Value'
    ELSE 'Low Value'
  END as transaction_tier,
  
  -- Geographic rollup (simplified); trims whitespace to match region_clean
  CASE 
    WHEN UPPER(TRIM(region)) = 'NORTH AMERICA' THEN 'Americas'
    WHEN UPPER(TRIM(region)) = 'EUROPE' THEN 'EMEA'
    WHEN UPPER(TRIM(region)) = 'ASIA PACIFIC' THEN 'APAC'
    ELSE 'Other'
  END as geo_region,
  
  -- Data quality indicators (INVALID_AMOUNT cannot occur here because the WHERE clause below already drops those rows)
  CASE 
    WHEN order_amount IS NULL OR order_amount <= 0 THEN 'INVALID_AMOUNT'
    WHEN customer_id IS NULL THEN 'MISSING_CUSTOMER'
    WHEN product_id IS NULL THEN 'MISSING_PRODUCT'
    ELSE 'VALID'
  END as data_quality_flag,
  
  -- Processing metadata
  current_timestamp() as silver_processed_timestamp,
  bronze_record_id
  
FROM bronze_transactions
WHERE order_amount IS NOT NULL AND order_amount > 0;

### Gold Layer (Business-Ready Aggregated Data)

In [None]:
-- GOLD LAYER: Business-ready aggregated data for analytics
DROP TABLE IF EXISTS gold_regional_summary;

CREATE OR REPLACE TABLE gold_regional_summary
USING DELTA
-- LOCATION '/tmp/lake_flow_demo/gold_regional_summary'  -- legacy DBFS path; uncomment only on workspaces without Unity Catalog
PARTITIONED BY (geo_region)
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
)
AS
SELECT 
  geo_region,
  region_clean as region,
  product_category_clean as product_category,
  
  -- Business metrics and KPIs
  COUNT(*) as total_transactions,
  COUNT(DISTINCT customer_id) as unique_customers,
  COUNT(DISTINCT product_id) as unique_products,
  
  -- Revenue metrics
  SUM(order_amount_validated) as gross_revenue,
  SUM(discount_amount_validated) as total_discounts,
  SUM(net_amount) as net_revenue,
  AVG(net_amount) as avg_transaction_value,
  
  -- Customer segmentation metrics
  COUNT(CASE WHEN customer_segment = 'Premium' THEN 1 END) as premium_transactions,
  COUNT(CASE WHEN customer_segment = 'Standard' THEN 1 END) as standard_transactions,
  
  -- Transaction tier distribution
  COUNT(CASE WHEN transaction_tier = 'High Value' THEN 1 END) as high_value_transactions,
  COUNT(CASE WHEN transaction_tier = 'Medium Value' THEN 1 END) as medium_value_transactions,
  COUNT(CASE WHEN transaction_tier = 'Low Value' THEN 1 END) as low_value_transactions,
  
  -- Payment method analysis
  COUNT(CASE WHEN payment_method = 'Credit Card' THEN 1 END) as credit_card_transactions,
  COUNT(CASE WHEN payment_method = 'PayPal' THEN 1 END) as paypal_transactions,
  COUNT(CASE WHEN payment_method = 'Bank Transfer' THEN 1 END) as bank_transfer_transactions,
  
  -- Data quality metrics (invalid_records is always 0 here because the WHERE clause keeps only VALID rows)
  COUNT(CASE WHEN data_quality_flag = 'VALID' THEN 1 END) as valid_records,
  COUNT(CASE WHEN data_quality_flag != 'VALID' THEN 1 END) as invalid_records,
  
  -- Time range
  MIN(order_date) as period_start,
  MAX(order_date) as period_end,
  
  -- Processing metadata
  current_timestamp() as gold_processed_timestamp
  
FROM silver_transactions
WHERE data_quality_flag = 'VALID'
GROUP BY geo_region, region_clean, product_category_clean;

In [None]:
-- Query the Gold layer for business insights
SELECT 
  geo_region,
  region,
  product_category,
  total_transactions,
  unique_customers,
  ROUND(net_revenue, 2) as net_revenue,
  ROUND(avg_transaction_value, 2) as avg_transaction_value,
  ROUND(100.0 * premium_transactions / total_transactions, 1) as premium_pct,
  ROUND(100.0 * high_value_transactions / total_transactions, 1) as high_value_pct
FROM gold_regional_summary
ORDER BY net_revenue DESC;

-- Compare row counts across the layers (Gold holds one row per region/category group)
SELECT 
  'Bronze Layer' as layer, COUNT(*) as record_count FROM bronze_transactions
UNION ALL
SELECT 
  'Silver Layer' as layer, COUNT(*) as record_count FROM silver_transactions
UNION ALL
SELECT 
  'Gold Layer' as layer, COUNT(*) as record_count FROM gold_regional_summary;
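
Row counts alone do not show that the aggregation preserved the money totals. One way to reconcile the layers, sketched here, is to compare net revenue summed at row level in Silver with the aggregated figure in Gold; the two values should be equal.

```sql
-- Net revenue should match between Silver (row level) and Gold (aggregated)
SELECT
  (SELECT SUM(net_amount) FROM silver_transactions
   WHERE data_quality_flag = 'VALID')                  as silver_net_revenue,
  (SELECT SUM(net_revenue) FROM gold_regional_summary) as gold_net_revenue;

-- Silver rows per quality flag
SELECT data_quality_flag, COUNT(*) as row_count
FROM silver_transactions
GROUP BY data_quality_flag
ORDER BY row_count DESC;
```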

## Example 7: Advanced Features

This example demonstrates generated columns, column mapping, deletion vectors, and time travel capabilities.

In [None]:
-- Advanced Features: Generated Columns and Column Mapping
DROP TABLE IF EXISTS orders_advanced;

CREATE OR REPLACE TABLE orders_advanced (
  order_id INT,
  customer_id INT,
  product_id INT,
  region STRING,
  product_category STRING,
  order_date DATE,
  order_amount DECIMAL(10,2),
  discount_amount DECIMAL(10,2),
  
  -- Generated columns (computed automatically; the expression type must match the column type)
  net_amount DECIMAL(10,2) GENERATED ALWAYS AS (CAST(order_amount - discount_amount AS DECIMAL(10,2))),
  order_year INT GENERATED ALWAYS AS (YEAR(order_date)),
  order_quarter STRING GENERATED ALWAYS AS (CONCAT('Q', QUARTER(order_date), '-', YEAR(order_date))),
  
  -- Metadata columns
  created_timestamp TIMESTAMP DEFAULT current_timestamp(),
  updated_timestamp TIMESTAMP
)
USING DELTA
-- LOCATION '/tmp/lake_flow_demo/orders_advanced'  -- legacy DBFS path; uncomment only on workspaces without Unity Catalog
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.enableDeletionVectors' = 'true',
  'delta.columnMapping.mode' = 'name',
  -- Required for the DEFAULT clause on created_timestamp
  'delta.feature.allowColumnDefaults' = 'supported'
);

In [None]:
-- Insert data into advanced table
INSERT INTO orders_advanced (
  order_id, customer_id, product_id, region, product_category, 
  order_date, order_amount, discount_amount, updated_timestamp
)
SELECT 
  order_id, customer_id, product_id, region, product_category,
  order_date, order_amount, discount_amount, current_timestamp()
FROM orders;

-- Query showing generated columns
SELECT 
  order_id,
  order_date,
  order_amount,
  discount_amount,
  net_amount,  -- Generated column
  order_year,  -- Generated column
  order_quarter, -- Generated column
  region,
  product_category
FROM orders_advanced
ORDER BY order_id;

### Demonstrate Time Travel

In [None]:
-- Show table history
DESCRIBE HISTORY orders_advanced;

-- Make some changes to demonstrate time travel
UPDATE orders_advanced 
SET order_amount = order_amount * 1.1, updated_timestamp = current_timestamp()
WHERE region = 'North America';

-- Delete some records
DELETE FROM orders_advanced WHERE order_amount < 500;

-- Show current state
SELECT COUNT(*) as current_count, AVG(order_amount) as avg_amount 
FROM orders_advanced;

In [None]:
-- Time travel: query the version written by the initial INSERT
-- (version 0 is the empty CREATE TABLE commit; confirm the numbers with DESCRIBE HISTORY)
SELECT COUNT(*) as version_1_count, AVG(order_amount) as avg_amount
FROM orders_advanced VERSION AS OF 1;

-- Show the difference between versions
SELECT 
  'Current' as version, COUNT(*) as record_count, ROUND(AVG(order_amount), 2) as avg_amount
FROM orders_advanced
UNION ALL
SELECT 
  'Version 1' as version, COUNT(*) as record_count, ROUND(AVG(order_amount), 2) as avg_amount
FROM orders_advanced VERSION AS OF 1;

-- Show table history with details
SELECT 
  version,
  timestamp,
  operation,
  operationParameters,
  readVersion
FROM (DESCRIBE HISTORY orders_advanced)
ORDER BY version DESC;
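
Time travel is read-only; to actually undo the UPDATE and DELETE, the table can be restored to an earlier version. The sketch below assumes version 1 is the commit written by the initial INSERT, which you should confirm in the history output before running it.

```sql
-- Roll the table back to the state right after the initial load
-- (version 1 is an assumption; confirm it in DESCRIBE HISTORY first)
RESTORE TABLE orders_advanced TO VERSION AS OF 1;

-- The restore is itself recorded as a new commit
DESCRIBE HISTORY orders_advanced LIMIT 3;

SELECT COUNT(*) as restored_count FROM orders_advanced;
```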

## Summary and Next Steps

You've now seen all 7 Lake Flow CTAS examples in action:

1. ✅ **Basic CTAS with Delta Table** - Auto-optimization and basic aggregations
2. ✅ **Partitioned Tables** - Advanced features with window functions and map lookups
3. ✅ **Streaming Tables** - Change Data Feed and incremental processing
4. ✅ **Materialized Views** - Complex aggregations and statistical functions
5. ✅ **Liquid Clustering** - Advanced performance optimization (Runtime 13.3+)
6. ✅ **Multi-Hop Architecture** - Bronze/Silver/Gold medallion pattern
7. ✅ **Advanced Features** - Generated columns, time travel, and deletion vectors

### Performance Tips:
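- Prefer **Liquid Clustering** over partitioning plus Z-ordering on new tables; clustering keys can be changed later with `ALTER TABLE ... CLUSTER BY`
- Enable **Auto Optimize** (`optimizeWrite`, `autoCompact`) to reduce the number of small files
- Enable **Change Data Feed** so downstream jobs can read only the rows that changed
- Enable **Deletion Vectors** so deletes and updates mark rows as removed instead of rewriting whole data files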

### Next Steps:
1. Experiment with your own data using these patterns
2. Monitor query performance with different clustering strategies
3. Implement incremental data processing workflows
4. Explore advanced Delta Lake features like CDF and time travel