# Data Transformation: Lakeflow Spark Declarative Pipelines

**Lakeflow Spark Declarative Pipelines** (formerly Delta Live Tables, or DLT) is a declarative framework for building reliable, maintainable, and testable data pipelines. With declarative pipelines, you focus on **what** transformations you want, not **how** to execute them.

## What You'll Learn

✅ Create declarative pipelines with streaming tables and materialized views  
✅ Use the new multi-file editor for organized pipeline development  
✅ Implement incremental data processing with flows  
✅ Apply data quality expectations automatically  
✅ Build Bronze-Silver-Gold architecture declaratively  

---

## Why Declarative Pipelines?

**Traditional Approach:**
- Manual checkpointing and state management
- Custom incremental processing logic
- Manual data quality checks
- Complex error handling

**Declarative Approach:**
- Automatic checkpointing and recovery
- Built-in incremental processing
- Declarative data quality expectations
- Simplified error handling and monitoring

---

## Use Case: Production-Ready IoT Pipeline

We'll build a complete IoT data pipeline that:
- Ingests raw sensor data incrementally
- Cleans and validates data with expectations
- Enriches with dimensional data
- Creates business-level aggregations
- Handles late-arriving data automatically

---

## Table of Contents

1. [Understanding Declarative Pipelines](#understanding)
2. [Multi-File Editor Setup](#multi-file)
3. [Streaming Tables vs Materialized Views](#tables-vs-views)
4. [Building the Bronze Layer](#bronze)
5. [Building the Silver Layer](#silver)
6. [Building the Gold Layer](#gold)
7. [Data Quality Expectations](#expectations)
8. [Complete Pipeline Example](#complete-example)

---

**References:**
- [Multi-File Editor](https://docs.databricks.com/aws/en/ldp/multi-file-editor)
- [Streaming Tables](https://docs.databricks.com/aws/en/ldp/streaming-tables)
- [Flows](https://docs.databricks.com/aws/en/ldp/flows)
- [Materialized Views](https://docs.databricks.com/aws/en/ldp/materialized-views)
- [Load Data](https://docs.databricks.com/aws/en/ldp/load)


## 1. Understanding Declarative Pipelines <a id="understanding"></a>

### What are Lakeflow Spark Declarative Pipelines?

Declarative pipelines let you define **what** data transformations you want, and the framework automatically handles:
- **Incremental processing** - Only process new/changed data
- **Dependency management** - Automatically determine execution order
- **Data quality** - Built-in validation with expectations
- **Monitoring** - Automatic lineage and observability
- **Recovery** - Checkpoint management and error handling
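
To make the dependency-management point concrete, here is a minimal plain-Python sketch of how an execution order can be derived from table references. It is not the framework's implementation: the `dependencies` map is written by hand here (an assumption for illustration), whereas the framework infers it from the tables each definition reads.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each table -> the tables it reads from.
# Table names match the IoT pipeline built later in this notebook.
dependencies = {
    "sensor_bronze": set(),
    "sensor_silver": {"sensor_bronze"},
    "sensor_enriched": {"sensor_silver"},
    "factory_kpis_gold": {"sensor_enriched"},
    "device_health_gold": {"sensor_enriched"},
}

# static_order() yields every table after all of its upstream tables.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

The two gold tables depend only on `sensor_enriched`, so their relative order is not fixed; only the upstream-before-downstream constraint is.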

### Key Concepts

**Streaming Tables:**
- For append-only, incremental data ingestion
- Each input row processed only once
- Low-latency streaming transformations
- Ideal for bronze and silver layers

**Materialized Views:**
- For stateful aggregations and joins
- Automatically recompute when source data changes
- Always return correct, consistent results
- Ideal for gold layer aggregations

**Flows:**
- Define how data moves between tables
- Support multiple sources writing to one target
- Automatic incremental processing
- Identified by name, which ties each flow to its own streaming checkpoint

### Benefits

✅ **Less code** - Framework handles boilerplate  
✅ **Automatic scaling** - Adapts to data volume  
✅ **Built-in quality** - Expectations track data quality  
✅ **Production-ready** - Monitoring, alerts, lineage included  
✅ **Incremental by default** - Efficient processing  

### Architecture

```
Source Data (Volumes/Kafka/Tables)
         ↓
Streaming Tables (Bronze) - Ingest raw data
         ↓
Streaming Tables (Silver) - Clean & enrich
         ↓
Materialized Views (Gold) - Aggregate metrics
```


## 2. Multi-File Editor Setup <a id="multi-file"></a>

### What is the Multi-File Editor?

The **Lakeflow Pipelines Editor** is an integrated development environment for building pipelines. It provides:
- **Multiple file support** - Organize code into modules
- **Visual pipeline graph** - See data flow and dependencies
- **Data preview** - Inspect data at any pipeline stage
- **Selective execution** - Test individual transformations
- **Debugging tools** - Identify and fix issues quickly

### Creating a Pipeline

**Step 1: Create New ETL Pipeline**

1. Click **New** at the top of the sidebar
2. Select **ETL pipeline**
3. Provide a unique name: `iot_sensor_pipeline`
4. Specify default catalog: `default`
5. Specify default schema: `db_crash_course`
6. Choose to start with sample code in **Python** or **SQL**
7. Click **Create**

**Step 2: Explore the Editor**

You'll see:
- **File browser** (left) - Organize pipeline code into multiple files
- **Code editor** (center) - Write pipeline definitions
- **Pipeline graph** (bottom) - Visual DAG showing data flow
- **Data preview** - Preview results at each stage

### File Organization Best Practices

```
iot_sensor_pipeline/
├── bronze/
│   ├── sensor_ingest.py
│   └── inspection_ingest.py
├── silver/
│   ├── sensor_clean.py
│   └── inspection_enriched.py
├── gold/
│   ├── factory_kpis.py
│   └── device_health.py
└── common/
    └── expectations.py
```

### Benefits of Multi-File Organization:

✅ **Modularity** - Separate concerns into logical files  
✅ **Reusability** - Share common functions across files  
✅ **Collaboration** - Multiple developers work on different files  
✅ **Testing** - Test individual components  
✅ **Maintenance** - Easier to understand and update  

**Reference:** [Multi-File Editor Documentation](https://docs.databricks.com/aws/en/ldp/multi-file-editor)


## 3. Streaming Tables vs Materialized Views <a id="tables-vs-views"></a>

### Streaming Tables

**Use for:** Incremental, append-only data processing

**Characteristics:**
- Each row processed exactly once
- Low-latency streaming
- Supports stateful operations with watermarks
- Best for bronze and silver layers

**When to use:**
- Data ingestion from files, Kafka, etc.
- Append-only transformations
- Stateful streaming (with bounded state)
- ETL jobs that process new data only

**Example use cases:**
- Ingest CSV files from cloud storage
- Read from Kafka topics
- Clean and filter streaming data

---

### Materialized Views

**Use for:** Stateful aggregations that need to recompute

**Characteristics:**
- Always return correct, up-to-date results
- Automatically recompute when sources change
- Support complex joins and aggregations
- Best for gold layer aggregations

**When to use:**
- Aggregations that need to update (not just append)
- Joins with slowly changing dimensions
- Views that need to be always correct
- Gold layer business metrics

**Example use cases:**
- Daily/hourly aggregations
- Complex joins across multiple tables
- Business KPIs and reports

---

### Comparison Table

| Feature | Streaming Table | Materialized View |
|---------|----------------|-------------------|
| **Processing** | Incremental (each row once) | Recomputes as needed |
| **Joins** | Stream-snapshot (point-in-time) | Always correct |
| **Updates** | Append-only | Updates/deletes |
| **Latency** | Low (continuous) | Higher (batch) |
| **Use case** | Ingestion, ETL | Aggregations, reporting |

**Reference:** 
- [Streaming Tables](https://docs.databricks.com/aws/en/ldp/streaming-tables)
- [Materialized Views](https://docs.databricks.com/aws/en/ldp/materialized-views)
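
The contrast in the table can be sketched in plain Python. This is a toy model under stated assumptions, not how the framework stores state: `StreamingTableSketch` and `materialized_view_sketch` are hypothetical names, and a list index stands in for the streaming checkpoint.

```python
from collections import defaultdict


class StreamingTableSketch:
    """Append-only: each source row is picked up once and never revisited."""

    def __init__(self):
        self.rows = []
        self.processed = 0  # stands in for a streaming checkpoint

    def update(self, source):
        new_rows = source[self.processed:]
        self.rows.extend(new_rows)
        self.processed = len(source)
        return len(new_rows)


def materialized_view_sketch(rows):
    """Recompute the average temperature per device from the current rows."""
    totals = defaultdict(lambda: [0.0, 0])
    for device_id, temperature in rows:
        totals[device_id][0] += temperature
        totals[device_id][1] += 1
    return {device: round(s / n, 2) for device, (s, n) in totals.items()}


source = [("d1", 70.0), ("d2", 80.0)]
table = StreamingTableSketch()
first_update = table.update(source)    # picks up both rows
source.append(("d1", 90.0))
second_update = table.update(source)   # picks up only the new row
view = materialized_view_sketch(table.rows)
print(first_update, second_update, view)
```

The average for `d1` changes when the third row arrives, which is why an aggregate is modeled as a view that is recomputed rather than as rows that are appended.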


## 4. Building the Bronze Layer <a id="bronze"></a>

### Bronze Layer: Streaming Tables for Data Ingestion

The bronze layer ingests raw data using **streaming tables** that process new files incrementally.

### Python Example: Ingest Sensor Data


In [None]:
"""
Python pipeline code - save as bronze/sensor_ingest.py
"""

from pyspark import pipelines as dp

# Configuration (would typically come from pipeline settings)
CATALOG = "default"
SCHEMA = "db_crash_course"

@dp.table(
    name="sensor_bronze",
    comment="Raw sensor readings ingested from CSV files in volumes"
)
def sensor_bronze():
    """
    Ingest sensor data from volumes using Auto Loader.
    Auto Loader automatically handles:
    - Schema inference and evolution
    - Checkpoint management
    - Exactly-once processing
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "true")
        .load(f"/Volumes/{CATALOG}/{SCHEMA}/sensor_data/")
    )

# SQL Equivalent:
sql_example = """
CREATE OR REFRESH STREAMING TABLE sensor_bronze
  COMMENT 'Raw sensor readings ingested from CSV files'
AS SELECT * FROM STREAM read_files(
  '/Volumes/default/db_crash_course/sensor_data/',
  format => 'csv',
  header => 'true'
);
"""

print("Streaming table for bronze layer sensor ingestion:")
print(sql_example)


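### Python Example: Ingest Inspection Data


In [None]:
"""
Save as bronze/inspection_ingest.py
"""

from pyspark import pipelines as dp

# Same configuration as bronze/sensor_ingest.py, repeated so this file stands on its own
CATALOG = "default"
SCHEMA = "db_crash_course"
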
@dp.table(
    name="inspection_bronze",
    comment="Raw inspection records from volumes"
)
def inspection_bronze():
    """Ingest inspection data with Auto Loader."""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "true")
        .load(f"/Volumes/{CATALOG}/{SCHEMA}/inspection_data/")
    )

print("""
✅ Bronze Layer Benefits with Streaming Tables:

1. **Auto Loader** handles file discovery automatically
2. **Schema inference** detects column types
3. **Schema evolution** handles new columns gracefully
4. **Exactly-once** processing guaranteed
5. **Checkpointing** automatic - no manual state management

The framework monitors the source location and processes new files as they arrive!
""")


## 5. Building the Silver Layer <a id="silver"></a>

### Silver Layer: Data Quality and Enrichment

The silver layer cleans, validates, and enriches data using streaming tables.

### Data Quality with Expectations

**Expectations** are declarative data quality rules that:
- Validate data automatically
- Track quality metrics over time
- Handle violations with specified actions (fail, drop, warn)
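
The three actions can be illustrated with a small plain-Python simulation. This is only a sketch of the semantics described above: `apply_expectations` is a hypothetical helper, and rules are Python lambdas here, whereas pipeline expectations are written as SQL boolean expressions.

```python
def apply_expectations(rows, warn=None, drop=None, fail=None):
    """Evaluate rules per row; return the kept rows and violation counts."""
    warn, drop, fail = warn or {}, drop or {}, fail or {}
    violations = {name: 0 for name in (*warn, *drop, *fail)}
    kept = []
    for row in rows:
        # fail: a single violation stops processing
        for name, rule in fail.items():
            if not rule(row):
                raise ValueError(f"expectation {name!r} failed for {row}")
        # warn: count the violation but keep the row
        for name, rule in warn.items():
            if not rule(row):
                violations[name] += 1
        # drop: count the violation and discard the row
        dropped = False
        for name, rule in drop.items():
            if not rule(row):
                violations[name] += 1
                dropped = True
        if not dropped:
            kept.append(row)
    return kept, violations


rows = [
    {"device_id": "d1", "temperature": 72, "air_pressure": 14.7},
    {"device_id": "d2", "temperature": 400, "air_pressure": 14.1},
    {"device_id": "d3", "temperature": 70, "air_pressure": 0},
]
kept, violations = apply_expectations(
    rows,
    warn={"valid_temperature_range": lambda r: -50 <= r["temperature"] <= 150},
    drop={"positive_air_pressure": lambda r: r["air_pressure"] > 0},
)
print([r["device_id"] for r in kept], violations)
```

Row `d2` violates only the warn rule, so it is counted and kept; row `d3` violates the drop rule, so it is counted and removed.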

### Python Example: Clean and Validate Sensor Data


In [None]:
"""
Save as silver/sensor_clean.py
"""

from pyspark import pipelines as dp
from pyspark.sql.functions import col, when, abs as abs_func, current_timestamp

@dp.table(
    name="sensor_silver",
    comment="Cleaned and validated sensor readings"
)
# Data quality expectations: record violations but keep the rows
@dp.expect_all({
    "valid_device_id": "device_id IS NOT NULL",
    "valid_timestamp": "timestamp IS NOT NULL",
    "valid_temperature_range": "temperature BETWEEN -50 AND 150"
})
# Drop rows that fail this expectation
@dp.expect_or_drop("positive_air_pressure_after_fix", "air_pressure > 0")
def sensor_silver():
    """
    Clean sensor bronze data:
    - Fix negative air pressure values
    - Validate data quality with expectations
    - Add derived fields
    """
    return (
        spark.readStream.table("sensor_bronze")  # Read from bronze streaming table
        # Fix negative air pressure (data quality issue)
        .withColumn("air_pressure",
                   when(col("air_pressure") < 0, abs_func(col("air_pressure")))
                   .otherwise(col("air_pressure")))
        # Add processing timestamp
        .withColumn("processed_at", current_timestamp())
    )

print("""
✅ Data Quality Expectations Applied:

- valid_device_id: Ensures device_id is not null
- valid_timestamp: Ensures timestamp is present  
- valid_temperature_range: Validates temperature is realistic (-50 to 150°F)
- positive_air_pressure_after_fix: Drops rows with invalid pressure after fix

Failed expectations are logged and tracked automatically!
""")


### SQL Example: Same Transformation in SQL


In [None]:
-- Save as silver/sensor_clean.sql

CREATE OR REFRESH STREAMING TABLE sensor_silver (
  CONSTRAINT valid_device_id EXPECT (device_id IS NOT NULL),
  CONSTRAINT valid_timestamp EXPECT (timestamp IS NOT NULL),
  CONSTRAINT valid_temperature_range EXPECT (temperature BETWEEN -50 AND 150),
  CONSTRAINT positive_air_pressure EXPECT OR DROP (air_pressure > 0)
)
COMMENT 'Cleaned and validated sensor readings'
AS SELECT
  device_id,
  trip_id,
  factory_id,
  model_id,
  timestamp,
  CASE 
    WHEN air_pressure < 0 THEN ABS(air_pressure)
    ELSE air_pressure
  END as air_pressure,
  temperature,
  rotation_speed,
  airflow_rate,
  delay,
  density,
  current_timestamp() as processed_at
FROM STREAM(sensor_bronze);

-- The SQL version provides the same functionality with declarative syntax.


### Enrich with Dimension Tables (Stream-Snapshot Join)


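In [None]:
"""
Save as silver/sensor_enriched.py
"""

from pyspark import pipelines as dp
from pyspark.sql.functions import col, when

# Configuration repeated so this file stands on its own
CATALOG = "default"
SCHEMA = "db_crash_course"

@dp.table(
    name="sensor_enriched",
    comment="Sensor data enriched with dimension tables"
)
def sensor_enriched():
    """
    Enrich sensor data with factory, model, and device dimensions.
    
    Note: This is a stream-snapshot join. The dimension tables are
    read as snapshots when the stream starts. Rows that were already
    processed are not recomputed when a dimension changes later; new
    dimension values show up only after the stream is restarted or refreshed.
    """
    from pyspark.sql.functions import round as spark_round
    
    # Read streaming data
    sensors = spark.readStream.table("sensor_silver")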
    
    # Read dimension tables as static snapshots
    dim_factories = spark.read.table(f"{CATALOG}.{SCHEMA}.dim_factories")
    dim_models = spark.read.table(f"{CATALOG}.{SCHEMA}.dim_models")
    dim_devices = spark.read.table(f"{CATALOG}.{SCHEMA}.dim_devices")
    
    # Join with dimensions
    enriched = (
        sensors
        .join(dim_devices.select("device_id", "installation_date", "status"), 
              "device_id", "left")
        .join(dim_factories.select("factory_id", "factory_name", "region", "city"),
              "factory_id", "left")
        .join(dim_models.select("model_id", "model_name", "model_family", "model_category"),
              "model_id", "left")
    )
    
    # Add business calculations
    return (
        enriched
        .withColumn("temperature_celsius",
                   spark_round((col("temperature") - 32) * 5 / 9, 2))
        .withColumn("temperature_zone",
                   when(col("temperature") > 85, "Critical")
                   .when(col("temperature") > 75, "Warning")
                   .otherwise("Normal"))
        .withColumn("risk_score",
                   spark_round(
                       (col("temperature") / 100 * 0.4) +
                       (col("rotation_speed") / 1000 * 0.3) +
                       (col("density") / 5 * 0.3),
                       2
                   ))
    )

print("Stream-snapshot join: Dimensions are snapshotted at stream start time.")
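
The derived columns in `sensor_enriched` are plain arithmetic, so they can be checked outside Spark. The sketch below mirrors the three expressions as ordinary Python functions (the function names are hypothetical). One caveat: Spark's `round` rounds half up, while Python's `round` works on binary floats, so results can differ for values that sit exactly on a rounding midpoint.

```python
def to_celsius(temp_f):
    """Mirror of the temperature_celsius column."""
    return round((temp_f - 32) * 5 / 9, 2)


def temperature_zone(temp_f):
    """Mirror of the temperature_zone column; thresholds are exclusive."""
    if temp_f > 85:
        return "Critical"
    if temp_f > 75:
        return "Warning"
    return "Normal"


def risk_score(temp_f, rotation_speed, density):
    """Weighted sum used for risk_score; the weights 0.4 + 0.3 + 0.3 add up to 1."""
    return round(
        (temp_f / 100 * 0.4) + (rotation_speed / 1000 * 0.3) + (density / 5 * 0.3),
        2,
    )


# Worked example: 80 degrees F, 500 rpm, density 2.5
# 0.8 * 0.4 + 0.5 * 0.3 + 0.5 * 0.3 = 0.32 + 0.15 + 0.15 = 0.62
print(to_celsius(212), temperature_zone(80), risk_score(80, 500, 2.5))
```

A reading at 100 degrees F, 1000 rpm, and density 5 scores exactly 1.0, which gives a reference point for the `health_status` thresholds used in the gold layer.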


### Using Multiple Flows to Write to a Single Target


In [None]:
"""
Example: Multiple sources writing to one table
"""

from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Create the target streaming table first
dp.create_streaming_table(
    name="sensor_all_regions",
    comment="Combined sensor data from all regions"
)

# The region column is added by the join with dim_factories, so each flow
# reads sensor_enriched (sensor_silver has no region column).

# Flow 1: US West data
@dp.append_flow(target="sensor_all_regions")
def append_us_west():
    return (
        spark.readStream.table("sensor_enriched")
        .filter(col("region") == "West")
    )

# Flow 2: US East data
@dp.append_flow(target="sensor_all_regions")
def append_us_east():
    return (
        spark.readStream.table("sensor_enriched")
        .filter(col("region") == "East")
    )

# Flow 3: International data (rows with a NULL region match none of the flows)
@dp.append_flow(target="sensor_all_regions")
def append_international():
    return (
        spark.readStream.table("sensor_enriched")
        .filter(~col("region").isin(["West", "East"]))
    )

print("""
✅ Multiple Flows Pattern:

Benefits:
- Add new sources without full refresh
- Process regions independently
- Better parallelization
- Easier to maintain

Each flow has its own checkpoint - isolated failure handling!
""")

# Reference: https://docs.databricks.com/aws/en/ldp/flows
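
The idea that each flow keeps its own checkpoint can be sketched in plain Python. This is a toy model, not the framework's mechanism: `AppendFlowSketch` is a hypothetical class and a list offset stands in for the checkpoint.

```python
class AppendFlowSketch:
    """One flow: reads unseen source rows, keeps matching ones, appends to a target."""

    def __init__(self, name, predicate):
        self.name = name
        self.predicate = predicate
        self.offset = 0  # per-flow checkpoint: how much of the source was consumed

    def run(self, source, target):
        batch = source[self.offset:]
        self.offset = len(source)
        matched = [row for row in batch if self.predicate(row)]
        target.extend(matched)
        return len(matched)


source = [
    {"id": 1, "region": "West"},
    {"id": 2, "region": "East"},
    {"id": 3, "region": "EMEA"},
]
flows = [
    AppendFlowSketch("append_us_west", lambda r: r["region"] == "West"),
    AppendFlowSketch("append_us_east", lambda r: r["region"] == "East"),
    AppendFlowSketch("append_international", lambda r: r["region"] not in ("West", "East")),
]

target = []
first_run = [flow.run(source, target) for flow in flows]

source.append({"id": 4, "region": "West"})
second_run = [flow.run(source, target) for flow in flows]
print(first_run, second_run, [row["id"] for row in target])
```

Because every flow tracks its own offset, a new flow added later would start from the beginning of the source without forcing the existing flows to reprocess anything.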


## 6. Building the Gold Layer <a id="gold"></a>

### Gold Layer: Materialized Views for Aggregations

Use **materialized views** for aggregations that need to update (not just append).

### Materialized View: Factory KPIs


In [None]:
"""
Save as gold/factory_kpis.py
"""

from pyspark import pipelines as dp
from pyspark.sql.functions import (
    avg, col, count, countDistinct, when,
    max as spark_max,  # aliased to avoid shadowing Python's built-in max
    round as spark_round,
)

@dp.materialized_view(
    name="factory_kpis_gold",
    comment="Factory-level KPIs and performance metrics"
)
def factory_kpis_gold():
    """
    Aggregate factory-level metrics.
    Uses materialized view so aggregations update when new data arrives.
    """
    return (
        spark.read.table("sensor_enriched")  # Read from silver
        .groupBy("factory_id", "factory_name", "region", "city")
        .agg(
            countDistinct("device_id").alias("total_devices"),
            count("*").alias("total_readings"),
            spark_round(avg("temperature"), 2).alias("avg_temperature"),
            spark_round(spark_max("temperature"), 2).alias("max_temperature"),
            spark_round(avg("risk_score"), 2).alias("avg_risk_score"),
            count(when(col("temperature_zone") == "Critical", 1)).alias("critical_readings")
        )
    )

# SQL Equivalent:
sql_mv = """
CREATE OR REFRESH MATERIALIZED VIEW factory_kpis_gold
COMMENT 'Factory-level KPIs and performance metrics'
AS SELECT
  factory_id,
  factory_name,
  region,
  city,
  COUNT(DISTINCT device_id) as total_devices,
  COUNT(*) as total_readings,
  ROUND(AVG(temperature), 2) as avg_temperature,
  MAX(temperature) as max_temperature,
  ROUND(AVG(risk_score), 2) as avg_risk_score,
  COUNT(CASE WHEN temperature_zone = 'Critical' THEN 1 END) as critical_readings
FROM sensor_enriched
GROUP BY factory_id, factory_name, region, city;
"""

print("Materialized views automatically recompute when source data changes!")


### Materialized View: Device Health Metrics


In [None]:
"""
Save as gold/device_health.py
"""

from pyspark import pipelines as dp
from pyspark.sql.functions import avg, col, count, stddev, when, round as spark_round

@dp.materialized_view(
    name="device_health_gold",
    comment="Device-level health metrics and status"
)
def device_health_gold():
    """
    Calculate per-device health metrics.
    Updates automatically as new sensor data arrives.
    """
    return (
        spark.read.table("sensor_enriched")
        .groupBy("device_id", "factory_name", "model_name", "model_category")
        .agg(
            count("*").alias("reading_count"),
            spark_round(avg("temperature"), 2).alias("avg_temperature"),
            spark_round(stddev("temperature"), 2).alias("temp_stddev"),
            spark_round(avg("risk_score"), 2).alias("avg_risk_score"),
            count(when(col("temperature_zone") == "Critical", 1)).alias("critical_count")
        )
        .withColumn("health_status",
                   when(col("avg_risk_score") > 0.7, "Poor")
                   .when(col("avg_risk_score") > 0.5, "Fair")
                   .otherwise("Good"))
    )

print("Materialized view handles incremental aggregation updates automatically!")
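
To see what one row of `device_health_gold` contains, the same metrics can be computed for a single device in plain Python. This is a sketch with a hypothetical helper name; it relies on Spark's `stddev` being the sample standard deviation, which matches `statistics.stdev`. Returning `None` for a single reading is an assumption made here for illustration.

```python
from statistics import mean, stdev


def device_health(readings):
    """readings: list of (temperature, risk_score) tuples for one device."""
    temps = [t for t, _ in readings]
    avg_risk = round(mean(r for _, r in readings), 2)
    # Thresholds are applied to the rounded average, as in the pipeline code
    if avg_risk > 0.7:
        status = "Poor"
    elif avg_risk > 0.5:
        status = "Fair"
    else:
        status = "Good"
    return {
        "reading_count": len(readings),
        "avg_temperature": round(mean(temps), 2),
        # Sample standard deviation needs at least two readings
        "temp_stddev": round(stdev(temps), 2) if len(temps) > 1 else None,
        "avg_risk_score": avg_risk,
        "health_status": status,
    }


summary = device_health([(70.0, 0.4), (80.0, 0.6), (90.0, 0.8)])
print(summary)
```

With temperatures 70, 80, and 90 the sample standard deviation is 10, and an average risk of 0.6 falls in the "Fair" band because it is above 0.5 but not above 0.7.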


## 7. Data Quality Expectations <a id="expectations"></a>

### Three Types of Expectations

**1. `expect` (Track Violations):**
- Logs failed records but doesn't drop them
- Good for monitoring non-critical issues


In [None]:
@dp.table(name="sensor_with_tracking")
@dp.expect("valid_temperature", "temperature IS NOT NULL")  # Track but don't drop
def sensor_with_tracking():
    return spark.readStream.table("sensor_bronze")

print("expect: Violations are logged in metrics but rows pass through")

# ---

print("\n2. expect_or_drop (Drop Invalid Rows):")
print("- Drops rows that fail validation")
print("- Good for critical data quality rules")

sensor_strict = """
@dp.table(name="sensor_strict")
@dp.expect_all_or_drop({
    "required_fields": "device_id IS NOT NULL AND timestamp IS NOT NULL",
    "valid_values": "air_pressure > 0 AND temperature > -50"
})
def sensor_strict():
    return spark.readStream.table("sensor_bronze")
"""

print("\nexpect_or_drop: Invalid rows are dropped, violations tracked")

# ---

print("\n3. expect_or_fail (Stop Pipeline on Violations):")
print("- Pipeline update fails as soon as a row violates the expectation")
print("- Good for critical data pipelines")

sensor_critical = """
@dp.table(name="sensor_critical")
@dp.expect_or_fail("no_nulls", "device_id IS NOT NULL AND timestamp IS NOT NULL")
def sensor_critical():
    return spark.readStream.table("sensor_bronze")
"""

print("\nexpect_or_fail: Pipeline stops if violations occur")


### Complete Example with Expectations


In [None]:
"""
Comprehensive example with multiple expectation types
"""

@dp.table(
    name="sensor_silver_validated",
    comment="Fully validated sensor data with comprehensive quality checks"
)
# Track these issues (don't drop)
@dp.expect_all({
    "valid_device": "device_id IS NOT NULL",
    "has_timestamp": "timestamp IS NOT NULL",
    "reasonable_temp": "temperature BETWEEN -50 AND 150"
})
# Drop rows with critical failures
@dp.expect_all_or_drop({
    "positive_pressure": "air_pressure > 0",
    "positive_rotation": "rotation_speed >= 0"
})
# Fail the update if any row violates this rule (there is no violation threshold)
@dp.expect_or_fail("sufficient_data_quality", "air_pressure > 0")
def sensor_silver_validated():
    """Silver table with comprehensive data quality checks."""
    return (
        spark.readStream.table("sensor_bronze")
        .withColumn("air_pressure",
                   when(col("air_pressure") < 0, abs_func(col("air_pressure")))
                   .otherwise(col("air_pressure")))
    )

print("""
✅ Multi-level Data Quality:

expect: Track but don't block (monitoring)
expect_or_drop: Drop bad rows (cleaning)
expect_or_fail: Stop pipeline (critical issues)

All expectations are tracked in pipeline metrics!
""")


## 8. Complete Pipeline Example <a id="complete-example"></a>

### Full IoT Pipeline with All Layers

Here's a complete pipeline definition you can use as a template. Save this as a pipeline in the multi-file editor:


In [None]:
"""
Complete pipeline: iot_pipeline.py
Run this in the Lakeflow Pipelines Editor
"""

from pyspark import pipelines as dp
from pyspark.sql.functions import col, when, abs as abs_func, round as spark_round, current_timestamp

# ========================================
# BRONZE LAYER - Data Ingestion
# ========================================

@dp.table(
    name="sensor_bronze",
    comment="Raw sensor data ingested from volumes"
)
def sensor_bronze():
    """Ingest sensor CSV files incrementally with Auto Loader."""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "true")
        .load("/Volumes/default/db_crash_course/sensor_data/")
    )

@dp.table(
    name="inspection_bronze",
    comment="Raw inspection data from volumes"
)
def inspection_bronze():
    """Ingest inspection CSV files incrementally."""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "true")
        .load("/Volumes/default/db_crash_course/inspection_data/")
    )

# ========================================
# SILVER LAYER - Cleaning & Validation
# ========================================

@dp.table(
    name="sensor_silver",
    comment="Cleaned and validated sensor data"
)
# Track violations but keep the rows
@dp.expect_all({
    "valid_device_id": "device_id IS NOT NULL",
    "valid_timestamp": "timestamp IS NOT NULL"
})
# Drop rows that violate either rule
@dp.expect_all_or_drop({
    "valid_temperature": "temperature BETWEEN -50 AND 150",
    "positive_pressure": "air_pressure > 0"
})
def sensor_silver():
    """Clean sensor data and apply quality checks."""
    return (
        spark.readStream.table("sensor_bronze")
        .withColumn("air_pressure",
                   when(col("air_pressure") < 0, abs_func(col("air_pressure")))
                   .otherwise(col("air_pressure")))
        .withColumn("processed_at", current_timestamp())
    )

print("Complete pipeline structure defined declaratively!")


### Continue: Silver Layer Enrichment


In [None]:
# ========================================
# SILVER LAYER - Enrichment
# ========================================

@dp.table(
    name="sensor_enriched",
    comment="Sensor data enriched with dimensions and business metrics"
)
def sensor_enriched():
    """
    Enrich with dimensions and add calculated fields.
    Uses stream-snapshot joins.
    """
    # Stream from silver
    sensors = spark.readStream.table("sensor_silver")
    
    # Snapshot dimensions
    factories = spark.read.table("default.db_crash_course.dim_factories")
    models = spark.read.table("default.db_crash_course.dim_models")
    devices = spark.read.table("default.db_crash_course.dim_devices")
    
    # Join and calculate
    enriched = (
        sensors
        .join(devices, "device_id", "left")
        .join(factories, "factory_id", "left")
        .join(models, "model_id", "left")
    )
    
    return (
        enriched
        .withColumn("temperature_celsius",
                   spark_round((col("temperature") - 32) * 5 / 9, 2))
        .withColumn("temperature_zone",
                   when(col("temperature") > 85, "Critical")
                   .when(col("temperature") > 75, "Warning")
                   .otherwise("Normal"))
        .withColumn("risk_score",
                   spark_round(
                       (col("temperature") / 100 * 0.4) +
                       (col("rotation_speed") / 1000 * 0.3) +
                       (col("density") / 5 * 0.3),
                       2
                   ))
    )

print("Silver layer: Cleaned, validated, and enriched!")


### Continue: Gold Layer Materialized Views


In [None]:
# ========================================
# GOLD LAYER - Business Aggregations
# ========================================

@dp.materialized_view(
    name="factory_kpis_gold",
    comment="Factory-level KPIs for dashboards"
)
def factory_kpis_gold():
    """Aggregate metrics by factory."""
    from pyspark.sql.functions import avg, count, countDistinct
    
    return (
        spark.read.table("sensor_enriched")
        .groupBy("factory_id", "factory_name", "region")
        .agg(
            countDistinct("device_id").alias("total_devices"),
            count("*").alias("total_readings"),
            spark_round(avg("temperature"), 2).alias("avg_temperature"),
            spark_round(avg("risk_score"), 2).alias("avg_risk_score"),
            count(when(col("temperature_zone") == "Critical", 1)).alias("critical_readings")
        )
    )

@dp.materialized_view(
    name="device_health_gold",
    comment="Device health metrics and status"
)
def device_health_gold():
    """Per-device health aggregations."""
    from pyspark.sql.functions import avg, count, stddev
    
    return (
        spark.read.table("sensor_enriched")
        .groupBy("device_id", "factory_name", "model_name")
        .agg(
            count("*").alias("reading_count"),
            spark_round(avg("temperature"), 2).alias("avg_temperature"),
            spark_round(avg("risk_score"), 2).alias("avg_risk_score")
        )
        .withColumn("health_status",
                   when(col("avg_risk_score") > 0.7, "Poor")
                   .when(col("avg_risk_score") > 0.5, "Fair")
                   .otherwise("Good"))
    )

print("Gold layer: Materialized views for business metrics!")


### How to Deploy This Pipeline

**Step 1: Create Pipeline in UI**

1. Click **New** → **ETL Pipeline**
2. Name: `iot_sensor_pipeline`
3. Set catalog: `default`
4. Set schema: `db_crash_course`

**Step 2: Add the Code**

Option A: Single File
- Paste the complete code above into the main pipeline file

Option B: Multi-File (Recommended)
- Create `bronze/sensor_ingest.py` - Bronze tables
- Create `silver/sensor_clean.py` - Silver tables
- Create `gold/aggregations.py` - Gold views
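- Tables defined in different files reference each other by name; add Python imports only for shared helper code such as `common/expectations.py`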

**Step 3: Configure Pipeline Settings**

- **Target**: Catalog and schema where tables are created
- **Cluster**: Compute resources (auto-scaling recommended)
- **Continuous vs Triggered**: 
  - Continuous: Always running, low latency
  - Triggered: Run on schedule or manual trigger

**Step 4: Start the Pipeline**

1. Click **Start** in the pipeline editor
2. Monitor progress in the pipeline graph
3. View data quality metrics
4. Inspect data at each stage

**Step 5: Monitor and Maintain**

- View expectations dashboard for data quality
- Check pipeline event log for errors
- Review lineage graph
- Set up alerts for failures

**Reference:** [Multi-File Editor](https://docs.databricks.com/aws/en/ldp/multi-file-editor)


In [None]:
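# ========================================
# SILVER LAYER - Enrichment (compact variant)
# ========================================
# Note: this cell redefines sensor_enriched and factory_kpis_gold in a shorter
# form. A pipeline can define each table name only once, so use either these
# definitions or the ones above, not both.

@dp.table(
    name="sensor_enriched",
    comment="Sensor data enriched with dimensions"
)
def sensor_enriched():
    """Enrich with dimensional data."""
    sensors = spark.readStream.table("sensor_silver")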
    
    # Read dimensions as snapshots
    factories = spark.read.table("default.db_crash_course.dim_factories")
    models = spark.read.table("default.db_crash_course.dim_models")
    
    enriched = (
        sensors
        .join(factories.select("factory_id", "factory_name", "region"), "factory_id", "left")
        .join(models.select("model_id", "model_name", "model_category"), "model_id", "left")
    )
    
    return enriched.withColumn(
        "risk_score",
        spark_round((col("temperature") / 100 * 0.4) + (col("rotation_speed") / 1000 * 0.6), 2)
    )

# ========================================
# GOLD LAYER - Materialized Views
# ========================================

@dp.materialized_view(name="factory_kpis_gold")
def factory_kpis_gold():
    """Factory-level aggregations."""
    from pyspark.sql.functions import avg, count, countDistinct
    
    return (
        spark.read.table("sensor_enriched")
        .groupBy("factory_name", "region")
        .agg(
            countDistinct("device_id").alias("device_count"),
            spark_round(avg("temperature"), 2).alias("avg_temp"),
            spark_round(avg("risk_score"), 2).alias("avg_risk")
        )
    )

print("✅ Complete pipeline definition - ready to deploy!")


### SQL Version: Complete Pipeline

You can also define the entire pipeline in SQL:


In [None]:
-- Complete SQL pipeline
-- Save as pipeline.sql

-- BRONZE: Ingest sensor data
CREATE OR REFRESH STREAMING TABLE sensor_bronze
AS SELECT * FROM STREAM read_files(
  '/Volumes/default/db_crash_course/sensor_data/',
  format => 'csv',
  header => 'true'
);

-- SILVER: Clean and validate
CREATE OR REFRESH STREAMING TABLE sensor_silver (
  CONSTRAINT valid_device EXPECT (device_id IS NOT NULL),
  CONSTRAINT valid_temp EXPECT (temperature BETWEEN -50 AND 150),
  CONSTRAINT positive_pressure EXPECT OR DROP (air_pressure > 0)
)
AS SELECT
  device_id,
  factory_id,
  model_id,
  timestamp,
  CASE WHEN air_pressure < 0 THEN ABS(air_pressure) ELSE air_pressure END as air_pressure,
  temperature,
  rotation_speed,
  density,
  current_timestamp() as processed_at
FROM STREAM(sensor_bronze);

-- SILVER: Enrich
CREATE OR REFRESH STREAMING TABLE sensor_enriched
AS SELECT
  s.*,
  f.factory_name,
  f.region,
  m.model_name,
  m.model_category,
  ROUND((s.temperature - 32) * 5 / 9, 2) as temperature_celsius,
  CASE
    WHEN s.temperature > 85 THEN 'Critical'
    WHEN s.temperature > 75 THEN 'Warning'
    ELSE 'Normal'
  END as temperature_zone
FROM STREAM(sensor_silver) s
LEFT JOIN default.db_crash_course.dim_factories f ON s.factory_id = f.factory_id
LEFT JOIN default.db_crash_course.dim_models m ON s.model_id = m.model_id;

-- GOLD: Factory aggregations
CREATE OR REFRESH MATERIALIZED VIEW factory_kpis_gold
AS SELECT
  factory_name,
  region,
  COUNT(DISTINCT device_id) as device_count,
  ROUND(AVG(temperature), 2) as avg_temperature,
  COUNT(CASE WHEN temperature_zone = 'Critical' THEN 1 END) as critical_readings
FROM sensor_enriched
GROUP BY factory_name, region;

-- SQL pipeline: concise, declarative, production-ready!
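
The constraints on `sensor_silver` behave differently: a plain `EXPECT` only records violations, while a drop-on-violation constraint also removes the offending row. The plain-Python sketch below mimics that behavior on a few in-memory rows. It is an illustration only, not how the pipeline engine is implemented; the helper `apply_expectations` and the sample rows are hypothetical.

```python
# Plain-Python illustration of warn-only vs. drop expectations.
# `apply_expectations` and the sample rows are hypothetical; the real
# checks run inside the pipeline engine.

def apply_expectations(rows, warn_rules, drop_rules):
    """Return (kept_rows, violation_counts) for the given rules."""
    violations = {name: 0 for name in list(warn_rules) + list(drop_rules)}
    kept = []
    for row in rows:
        # Warn-only rules: count the violation but keep the row.
        for name, rule in warn_rules.items():
            if not rule(row):
                violations[name] += 1
        # Drop rules: count the violation and exclude the row.
        drop = False
        for name, rule in drop_rules.items():
            if not rule(row):
                violations[name] += 1
                drop = True
        if not drop:
            kept.append(row)
    return kept, violations


rows = [
    {"device_id": "d1", "temperature": 70.0, "air_pressure": 101.3},
    {"device_id": None, "temperature": 72.0, "air_pressure": 99.8},
    {"device_id": "d3", "temperature": 400.0, "air_pressure": 0.0},
]

warn_rules = {
    "valid_device": lambda r: r["device_id"] is not None,
    "valid_temp": lambda r: -50 <= r["temperature"] <= 150,
}
drop_rules = {"positive_pressure": lambda r: r["air_pressure"] > 0}

kept, violations = apply_expectations(rows, warn_rules, drop_rules)
print(len(kept))
print(violations)
```

The row with a missing `device_id` is kept because its rule only warns; the row with zero pressure is removed because its rule drops.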


## Advanced Patterns

### Pattern 1: Backfilling Historical Data


In [None]:
# Use once=True to insert historical data a single time
@dp.append_flow(
    target="sensor_silver",
    once=True  # Runs only once unless full refresh
)
def backfill_historical():
    """Backfill historical sensor data from archive."""
    return (
        spark.read
        .format("delta")
        .load("/Volumes/default/db_crash_course/archive/historical_sensors/")
    )

print("""
✅ Backfill Pattern:

- once=True: Runs one time only (runs again after a full refresh)
- Won't reprocess on incremental updates
- Perfect for loading historical data
- Combines with streaming for complete dataset
""")

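The run-once behavior can be pictured with a small plain-Python simulation. This is a sketch under simplifying assumptions: `Target`, `run_update`, and `full_refresh` are hypothetical names, not part of the pipelines API, and the streaming source is reduced to whatever rows are passed to each update.

```python
# Hypothetical simulation of a one-time backfill flow and a recurring
# streaming flow appending to the same target. None of these names are
# part of the pipelines API.

class Target:
    def __init__(self):
        self.rows = []
        self.completed_once_flows = set()


def run_update(target, once_flows, new_stream_rows):
    """Simulate one pipeline update."""
    for name, load in once_flows.items():
        if name not in target.completed_once_flows:
            target.rows.extend(load())           # backfill runs the first time only
            target.completed_once_flows.add(name)
    target.rows.extend(new_stream_rows)          # streaming flow appends new rows


def full_refresh(target):
    """Simulate a full refresh: clear data and forget completed once-flows."""
    target.rows.clear()
    target.completed_once_flows.clear()


once_flows = {"backfill_historical": lambda: ["hist-1", "hist-2"]}
target = Target()

run_update(target, once_flows, ["new-1"])
run_update(target, once_flows, ["new-2"])
rows_after_two_updates = list(target.rows)
print(rows_after_two_updates)

full_refresh(target)
run_update(target, once_flows, ["new-3"])
rows_after_refresh = list(target.rows)
print(rows_after_refresh)
```

The historical rows appear once across the first two updates and are loaded again only after the simulated full refresh.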

### Pattern 2: Change Data Capture (CDC)


In [None]:
# Handle CDC data with the AUTO CDC API.
# Assumes `dp` is the pipelines module used in the earlier cells and that
# `devices_cdc` is a streaming source of change records in this pipeline.

# SCD Type 1: keep only the current state of each device.
dp.create_streaming_table(name="devices_current")

dp.create_auto_cdc_flow(
    target="devices_current",
    source="devices_cdc",
    keys=["device_id"],
    sequence_by="update_timestamp",
    stored_as_scd_type=1,  # Type 1: Keep current state only
    # INSERTs and UPDATEs are applied as upserts. To apply DELETEs, also pass
    # apply_as_deletes with a condition that identifies delete records.
)

# SCD Type 2: keep the full change history in a separate target.
dp.create_streaming_table(name="devices_historical")

dp.create_auto_cdc_flow(
    target="devices_historical",
    source="devices_cdc",
    keys=["device_id"],
    sequence_by="update_timestamp",
    stored_as_scd_type=2,  # Type 2: Keep full history
)

print("""
✅ CDC Pattern:

SCD Type 1: Keep current state (overwrites)
SCD Type 2: Keep full history (versioning)

Framework handles:
- Deduplication
- Ordering by sequence
- Merge operations
- History tracking (Type 2)
""")

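To show what the two SCD types produce, here is a minimal plain-Python sketch that applies a small, out-of-order change feed both ways. The functions `scd_type1` and `scd_type2` and the sample events are hypothetical and only mirror the idea; the `__START_AT` and `__END_AT` names follow the validity columns Databricks adds to Type 2 targets.

```python
# Plain-Python illustration of SCD Type 1 vs. Type 2 semantics.
# `scd_type1`, `scd_type2`, and the events are hypothetical examples.

def scd_type1(events, key, sequence_by):
    """Keep only the record with the highest sequence value per key."""
    current = {}
    for event in events:
        k = event[key]
        if k not in current or event[sequence_by] > current[k][sequence_by]:
            current[k] = event
    return current


def scd_type2(events, key, sequence_by):
    """Keep every version per key, with validity intervals."""
    by_key = {}
    for event in events:
        by_key.setdefault(event[key], []).append(event)

    history = []
    for versions in by_key.values():
        # Order by the sequence column so late arrivals land in the right place.
        ordered = sorted(versions, key=lambda e: e[sequence_by])
        for cur, nxt in zip(ordered, ordered[1:] + [None]):
            row = dict(cur)
            row["__START_AT"] = cur[sequence_by]
            row["__END_AT"] = nxt[sequence_by] if nxt else None  # None = current
            history.append(row)
    return history


events = [
    {"device_id": "d1", "status": "active", "update_timestamp": 1},
    {"device_id": "d1", "status": "retired", "update_timestamp": 3},
    {"device_id": "d1", "status": "maintenance", "update_timestamp": 2},  # arrives late
    {"device_id": "d2", "status": "active", "update_timestamp": 1},
]

current = scd_type1(events, "device_id", "update_timestamp")
history = scd_type2(events, "device_id", "update_timestamp")

print(current["d1"]["status"])
for row in history:
    print(row["device_id"], row["status"], row["__START_AT"], row["__END_AT"])
```

Type 1 ends with one row per device, while Type 2 keeps all three versions of `d1`, ordered by `update_timestamp` rather than by arrival order.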

### Pattern 3: Watermarks for Stateful Streaming


In [None]:
# Use watermarks for time-windowed aggregations
@dp.table(name="sensor_hourly_aggregates")
def sensor_hourly_aggregates():
    """
    Hourly aggregations with watermark for handling late data.
    Watermark: Wait up to 2 hours for late-arriving data.
    """
    from pyspark.sql.functions import avg, col, count, window
    
    return (
        dp.read_stream("sensor_silver")
        .withWatermark("timestamp", "2 hours")  # Handle late data up to 2 hours
        .groupBy(
            window(col("timestamp"), "1 hour"),  # 1-hour windows
            "factory_id"
        )
        .agg(
            count("*").alias("reading_count"),
            avg("temperature").alias("avg_temperature"),
            avg("rotation_speed").alias("avg_rotation_speed")
        )
    )

print("""
✅ Watermark Pattern:

- Handles late-arriving data gracefully
- Prevents unbounded state growth
- Lets windowed aggregations finalize results and discard old state
- Balances latency vs completeness

Example: withWatermark("timestamp", "2 hours")
Waits up to 2 hours for late data before finalizing windows.
""")

# Reference: https://docs.databricks.com/aws/en/ldp/streaming-tables
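
The effect of a 2-hour watermark on 1-hour windows can be traced with a small plain-Python simulation. This is a simplified sketch: the real engine advances the watermark per micro-batch rather than per event, and `aggregate_with_watermark`, `window_start`, and the sample readings are hypothetical.

```python
# Simplified simulation of watermark-based late-data handling.
# All names and readings here are hypothetical illustrations.
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_start(ts, size):
    """Return the start of the tumbling window that contains ts."""
    return EPOCH + ((ts - EPOCH) // size) * size


def aggregate_with_watermark(events, window_size, delay):
    """events: (event_time, temperature) tuples in arrival order."""
    windows = {}          # window start -> temperatures accepted so far
    dropped = []          # events that arrived after their window closed
    max_event_time = None
    for event_time, temp in events:
        start = window_start(event_time, window_size)
        if max_event_time is not None:
            watermark = max_event_time - delay
            # A window is closed once its end is at or before the watermark.
            if start + window_size <= watermark:
                dropped.append((event_time, temp))
                continue
        windows.setdefault(start, []).append(temp)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return windows, dropped


day = datetime(2024, 1, 1)
events = [
    (day.replace(hour=10, minute=5), 70.0),
    (day.replace(hour=10, minute=40), 72.0),
    (day.replace(hour=13, minute=10), 75.0),   # pushes the watermark to 11:10
    (day.replace(hour=10, minute=50), 90.0),   # 10:00 window already closed
    (day.replace(hour=11, minute=30), 71.0),   # late, but window still open
]

windows, dropped = aggregate_with_watermark(
    events, window_size=timedelta(hours=1), delay=timedelta(hours=2)
)
print(sorted(windows))
print(dropped)
```

The 10:50 reading is discarded because its window ended before the watermark, while the 11:30 reading is late but still accepted. That is the latency versus completeness trade-off the delay controls.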


## Summary

In this notebook, you learned:

✅ **Declarative Pipelines** - Define what, not how  
✅ **Multi-file editor** - Organize code into modules  
✅ **Streaming tables** - Incremental, append-only processing  
✅ **Materialized views** - Auto-updating aggregations  
✅ **Flows** - Multiple sources to one target  
✅ **Expectations** - Declarative data quality  
✅ **Advanced patterns** - Backfilling, CDC, watermarks  

### Key Takeaways:

1. **Declarative > Imperative**: Focus on transformations, not orchestration
2. **Streaming tables** for bronze/silver (append-only)
3. **Materialized views** for gold (aggregations that update)
4. **Expectations** provide automatic data quality tracking
5. **Multi-file editor** enables better code organization
6. **Flows** allow multiple sources to write to one target
7. **Framework handles**: Checkpointing, recovery, lineage, monitoring

### Streaming Tables vs Materialized Views

| Use Case | Use This |
|----------|----------|
| Ingest files | Streaming table |
| Clean/filter | Streaming table |
| Append-only transforms | Streaming table |
| Aggregations | Materialized view |
| Joins with updates | Materialized view |
| Results that must always match current source data | Materialized view |

### Production Benefits:

**Automatic:**
- Incremental processing
- Checkpoint management  
- Schema evolution
- Data quality tracking
- Lineage visualization
- Error recovery

**Built-in:**
- Monitoring dashboard
- Expectations metrics
- Event logs
- Performance insights

### Creating Your First Pipeline:

1. **UI: New → ETL Pipeline**
2. **Choose Python or SQL**
3. **Define tables/views** with decorators (Python) or `CREATE OR REFRESH` statements (SQL)
4. **Add expectations** for quality
5. **Start pipeline** and monitor
6. **Iterate** based on metrics

### Best Practices:

**Organization:**
- Use multi-file editor for large pipelines
- Separate bronze/silver/gold into different files
- Create reusable helper functions
- Document expectations and business logic

**Performance:**
- Use streaming tables for append-only data
- Use materialized views for aggregations
- Add watermarks for windowed operations
- Partition appropriately (framework optimizes automatically)

**Quality:**
- Start with `expect` to understand data
- Add `expect_or_drop` for known issues
- Use `expect_or_fail` for critical rules
- Monitor expectations dashboard

**Operations:**
- Use continuous mode for real-time
- Use triggered mode for batch
- Set up alerts for failures
- Review event logs regularly

### Next Steps:

- Create your first declarative pipeline for the IoT dataset
- Explore the pipeline graph visualization
- Review expectations dashboard after running
- Try different expectation strategies
- Add CDC for slowly changing dimensions
- Implement windowed aggregations with watermarks

---

**Additional Resources:**
- [Lakeflow Pipelines Documentation](https://docs.databricks.com/aws/en/ldp/)
- [Multi-File Editor](https://docs.databricks.com/aws/en/ldp/multi-file-editor)
- [Streaming Tables](https://docs.databricks.com/aws/en/ldp/streaming-tables)
- [Materialized Views](https://docs.databricks.com/aws/en/ldp/materialized-views)
- [Flows](https://docs.databricks.com/aws/en/ldp/flows)
