# 🔄 Delta Live Tables: Production ETL Pipeline
*Build reliable, maintainable data pipelines with declarative ETL*

---

## 🎯 Learning Objectives

By the end of this demo, you'll understand:
- ✅ **Delta Live Tables (DLT)** concepts and benefits
- ✅ **Declarative pipeline** development approach
- ✅ **Data quality expectations** and monitoring
- ✅ **Production deployment** best practices
- ✅ **DLT vs Jobs comparison** and when to use each

---

## 🏗️ What We'll Build

**Production F1 ETL Pipeline:**
```
🔄 DLT Pipeline:
├── 📊 Live Tables (batch processing)
├── 🌊 Streaming Tables (real-time ingestion)
├── ✅ Data Quality Expectations
├── 📈 Automatic Lineage Tracking
└── 🔍 Pipeline Monitoring Dashboard
```

### 💡 DLT Pipeline Creation:
1. **Navigate** to Workflows → Delta Live Tables
2. **Create** new pipeline with F1 transformation logic
3. **Define** data quality expectations
4. **Configure** compute and scheduling
5. **Monitor** pipeline execution and lineage

### 🎯 Key Benefits:
- **Automatic dependency resolution** (no manual ordering)
- **Built-in data quality** checks that warn, drop, or fail on bad records
- **Schema evolution** handling
- **Live monitoring** and alerting
- **Cost optimization** with auto-scaling


## 🏗️ Pipeline Architecture

**Managed F1 Data Pipeline with DLT:**
```
📁 Volume Files           🥉 DLT Bronze              🥈 DLT Silver              🥇 DLT Gold
┌─────────────────┐      ┌─────────────────┐       ┌─────────────────┐       ┌─────────────────┐
│ drivers.csv     │  →   │ dlt_bronze_     │   →   │ dlt_silver_     │   →   │ dlt_gold_driver_│
│ results.csv     │      │ drivers         │       │ drivers_clean   │       │ stats           │
└─────────────────┘      │ dlt_bronze_     │       │ dlt_silver_     │       │ dlt_gold_top_   │
                         │ results         │       │ results_clean   │       │ performers      │
                         └─────────────────┘       └─────────────────┘       └─────────────────┘
```

**🔥 Key Features:**
- ⚡ **Declarative syntax** - focus on what, not how
- 🎯 **Data quality expectations** - automatic monitoring
- 🔄 **Automatic dependency management** - smart execution order
- 📊 **Built-in monitoring** - pipeline health and lineage

## 📋 Delta Live Tables Overview

**Delta Live Tables (DLT)** is Databricks' framework for building reliable, maintainable, and testable data processing pipelines.

### 🌟 Key Benefits:

#### 🎯 **Declarative Development**
- Write **what** you want, not **how** to compute it
- Automatic dependency resolution and execution order
- Focus on business logic, not infrastructure

#### 🔍 **Built-in Data Quality**
- **Expectations** define data quality rules
- **Drop** or flag bad records instead of failing the whole pipeline
- **Monitoring** with automatic quality metrics

#### ⚡ **Managed Operations**
- **Auto-scaling** compute based on workload
- **Error recovery** and automatic retries
- **Incremental processing** for efficiency

#### 📊 **Observability**
- **Lineage tracking** shows data flow
- **Performance metrics** such as rows processed and update durations, recorded in the pipeline event log
- **Data freshness** monitoring

In [None]:
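import dlt
# The wildcard import is relied on below: it shadows Python's built-in
# sum, min, max and round with the Spark column functions used in the gold layer
from pyspark.sql.functions import *
from pyspark.sql.types import *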

## 🥉 Bronze Layer: Raw Data Ingestion

Bronze tables in DLT ingest raw data with minimal transformation.
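Auto Loader (`cloudFiles`) remembers which files it has already ingested, so each pipeline update reads only newly arrived files. The toy model below sketches that bookkeeping with a plain Python set standing in for the stream checkpoint; it is an illustration under that assumption, not Auto Loader's implementation, and the file names are hypothetical.

```python
from collections.abc import Iterable


class ToyFileTracker:
    """Toy stand-in for a stream checkpoint: remembers which files were ingested."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    def next_batch(self, listing: Iterable[str]) -> list[str]:
        # Only files not yet recorded in the "checkpoint" are processed this trigger
        new_files = sorted(f for f in listing if f not in self.seen)
        self.seen.update(new_files)
        return new_files


tracker = ToyFileTracker()
first = tracker.next_batch(["drivers_2023.csv", "drivers_2024.csv"])
second = tracker.next_batch(["drivers_2023.csv", "drivers_2024.csv", "drivers_2025.csv"])
print(first)
print(second)
```

Re-listing the same directory on the second trigger yields only the file that was not seen before, which is what makes bronze ingestion incremental.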

In [None]:
@dlt.table(
    name="dlt_bronze_drivers",
    comment="DLT Bronze: Raw driver data from Volume CSV files",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def bronze_drivers():
    """
    Ingest raw driver data from Volume CSV files.
    DLT automatically handles schema inference and evolution.
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", "/Volumes/main/default/f1_raw_data/dlt_schema/drivers/")
        .option("header", "true")
        .load("/Volumes/main/default/f1_raw_data/drivers.csv")
        .select(
            col("driverId").cast("int").alias("driver_id"),
            col("driverRef").alias("driver_ref"),
            col("number").alias("car_number"),
            col("code").alias("driver_code"),
            col("forename").alias("first_name"),
            col("surname").alias("last_name"),
            col("dob").alias("date_of_birth"),
            col("nationality"),
            col("url").alias("wiki_url"),
            current_timestamp().alias("ingested_at"),
            lit("dlt_bronze_pipeline").alias("ingestion_method")
        )
    )

In [None]:
@dlt.table(
    name="dlt_bronze_results",
    comment="DLT Bronze: Raw race results data from Volume CSV files",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def bronze_results():
    """
    Ingest raw race results data from Volume CSV files.
    Includes all original columns for complete data preservation.
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", "/Volumes/main/default/f1_raw_data/dlt_schema/results/")
        .option("header", "true")
        .load("/Volumes/main/default/f1_raw_data/results.csv")
        .select(
            col("resultId").cast("int").alias("result_id"),
            col("raceId").cast("int").alias("race_id"),
            col("driverId").cast("int").alias("driver_id"),
            col("constructorId").cast("int").alias("constructor_id"),
            col("number").alias("car_number"),
            col("grid").cast("int").alias("grid_position"),
            col("position").alias("finish_position_text"),
            col("positionOrder").cast("int").alias("position_order"),
            col("points").cast("double").alias("points_scored"),
            col("laps").cast("int").alias("laps_completed"),
            col("time").alias("race_time"),
            col("milliseconds").alias("race_time_ms"),
            col("statusId").cast("int").alias("status_id"),
            current_timestamp().alias("ingested_at"),
            lit("dlt_bronze_pipeline").alias("ingestion_method")
        )
    )

## 🥈 Silver Layer: Clean and Validated Data

Silver tables implement data quality expectations and transformations.

In [None]:
@dlt.table(
    name="dlt_silver_drivers_clean",
    comment="DLT Silver: Cleaned and validated driver data with quality expectations"
)
@dlt.expect_or_drop("valid_driver_id", "driver_id IS NOT NULL")
@dlt.expect_or_drop("valid_names", "first_name IS NOT NULL AND last_name IS NOT NULL")
@dlt.expect("reasonable_birth_date", "date_of_birth >= '1900-01-01' AND date_of_birth <= current_date()")
def silver_drivers_clean():
    """
    Clean and validate driver data with data quality expectations.
    - Drop records with missing critical fields
    - Warn on suspicious birth dates
    - Enrich with calculated fields
    """
    # No separate filter: expect_or_drop removes (and counts) rows with a null driver_id
    return (
        dlt.read("dlt_bronze_drivers")
        .select(
            col("driver_id"),
            col("driver_ref"),
            col("car_number"),
            col("driver_code"),
            col("first_name"),
            col("last_name"),
            concat(col("first_name"), lit(" "), col("last_name")).alias("full_name"),
            # Clean and convert date of birth
            when(col("date_of_birth") != "\\N", 
                 to_date(col("date_of_birth"), "yyyy-MM-dd")
            ).alias("birth_date"),
            col("nationality"),
            # Calculate current age in whole years (months_between / 12 accounts for leap years)
            when(col("date_of_birth") != "\\N",
                 floor(months_between(current_date(), to_date(col("date_of_birth"), "yyyy-MM-dd")) / 12)
            ).alias("current_age"),
            col("wiki_url"),
            col("ingested_at"),
            current_timestamp().alias("processed_at"),
            lit("silver_quality_processed").alias("processing_stage")
        )
    )
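Dividing a day count by 365 ignores leap days, so close to a birthday it can overstate someone's age by a year. A stdlib comparison of that approximation against an exact calendar calculation (the dates are illustrative); in Spark SQL, `floor(months_between(current_date(), birth_date) / 12)` is one way to avoid the drift.

```python
from datetime import date


def naive_age(birth: date, today: date) -> int:
    # Mirrors floor(datediff(today, birth) / 365)
    return (today - birth).days // 365


def exact_age(birth: date, today: date) -> int:
    # Subtract one year if this year's birthday has not happened yet
    had_birthday = (today.month, today.day) >= (birth.month, birth.day)
    return today.year - birth.year - (0 if had_birthday else 1)


birth, today = date(1985, 1, 7), date(2024, 1, 6)  # one day before the 39th birthday
print(naive_age(birth, today))
print(exact_age(birth, today))
```

The naive version reports 39 here because nine leap days have accumulated, while the exact version correctly reports 38.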

In [None]:
@dlt.table(
    name="dlt_silver_results_clean",
    comment="DLT Silver: Cleaned race results with data quality validations"
)
@dlt.expect_or_drop("valid_result_id", "result_id IS NOT NULL")
@dlt.expect_or_drop("valid_race_and_driver", "race_id IS NOT NULL AND driver_id IS NOT NULL")
@dlt.expect("valid_points", "points_scored >= 0")
@dlt.expect("reasonable_laps", "laps_completed >= 0 AND laps_completed <= 200")
def silver_results_clean():
    """
    Clean and validate race results with comprehensive quality checks.
    - Ensure critical IDs are present
    - Validate points and lap counts
    - Convert position data to proper types
    """
    # No separate filter: expect_or_drop removes (and counts) rows with a null result_id
    return (
        dlt.read("dlt_bronze_results")
        .select(
            col("result_id"),
            col("race_id"),
            col("driver_id"),
            col("constructor_id"),
            col("car_number"),
            col("grid_position"),
            # Clean finish position - handle DNF, DNS, etc.
            when(col("finish_position_text").rlike("^[0-9]+$"), 
                 col("finish_position_text").cast("int")
            ).alias("finish_position"),
            col("finish_position_text"),
            col("position_order"),
            col("points_scored"),
            col("laps_completed"),
            col("race_time"),
            # Convert milliseconds if numeric
            when(col("race_time_ms").rlike("^[0-9]+$"),
                 col("race_time_ms").cast("bigint")
            ).alias("race_duration_ms"),
            col("status_id"),
            # Add derived boolean fields (Python True/False literals)
            when(col("points_scored") > 0, True).otherwise(False).alias("scored_points"),
            when(col("finish_position_text") == "1", True).otherwise(False).alias("race_winner"),
            when(col("finish_position_text").isin("1", "2", "3"), True).otherwise(False).alias("podium_finish"),
            col("ingested_at"),
            current_timestamp().alias("processed_at"),
            lit("silver_quality_processed").alias("processing_stage")
        )
    )
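The `rlike("^[0-9]+$")` guard keeps non-numeric values (such as Ergast's `\N` for unclassified finishers) out of the integer cast. The same parsing and flag logic in plain Python, as an illustrative sketch with made-up inputs:

```python
import re
from typing import Optional

NUMERIC = re.compile(r"^[0-9]+$")


def parse_finish(position_text: str) -> Optional[int]:
    # Same rule as the Spark when(...rlike...) expression: only all-digit strings become ints
    return int(position_text) if NUMERIC.match(position_text) else None


def result_flags(position_text: str, points: float) -> dict:
    # Derived booleans mirroring scored_points, race_winner and podium_finish
    return {
        "finish_position": parse_finish(position_text),
        "scored_points": points > 0,
        "race_winner": position_text == "1",
        "podium_finish": position_text in ("1", "2", "3"),
    }


print(result_flags("1", 25.0))
print(result_flags("\\N", 0.0))
```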

## 🥇 Gold Layer: Analytics-Ready Aggregations

Gold tables provide business-ready analytics with complex aggregations.

In [None]:
@dlt.table(
    name="dlt_gold_driver_stats",
    comment="DLT Gold: Comprehensive driver career statistics and performance metrics"
)
@dlt.expect("drivers_with_races", "total_races > 0")
def gold_driver_stats():
    """
    Calculate comprehensive driver career statistics.
    Aggregates from clean silver tables to create analytics-ready data.
    """
    drivers = dlt.read("dlt_silver_drivers_clean")
    results = dlt.read("dlt_silver_results_clean")
    
    return (
        drivers.alias("d")
        .join(results.alias("r"), col("d.driver_id") == col("r.driver_id"), "inner")
        .groupBy(
            col("d.driver_id"),
            col("d.full_name"),
            col("d.nationality"),
            col("d.current_age"),
            col("d.birth_date")
        )
        .agg(
            count("r.result_id").alias("total_races"),
            sum("r.points_scored").alias("career_points"),
            sum(when(col("r.race_winner"), 1).otherwise(0)).alias("wins"),
            sum(when(col("r.podium_finish"), 1).otherwise(0)).alias("podiums"),
            sum(when(col("r.scored_points"), 1).otherwise(0)).alias("points_finishes"),
            avg("r.finish_position").alias("avg_finish_position"),
            min("r.finish_position").alias("best_finish"),
            max("r.finish_position").alias("worst_finish"),
            sum("r.laps_completed").alias("total_laps"),
            # Performance ratios
            round(sum("r.points_scored") / count("r.result_id"), 2).alias("points_per_race"),
            round(sum(when(col("r.race_winner"), 1).otherwise(0)) * 100.0 / count("r.result_id"), 2).alias("win_percentage"),
            round(sum(when(col("r.podium_finish"), 1).otherwise(0)) * 100.0 / count("r.result_id"), 2).alias("podium_percentage"),
            # Data lineage
            current_timestamp().alias("calculated_at"),
            lit("dlt_gold_aggregation").alias("calculation_method")
        )
        .filter(col("total_races") >= 1)  # Only drivers with actual race participation
    )
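The ratio columns are plain arithmetic over each driver's grouped results. A stdlib sketch of the same formulas for one driver, using hypothetical race rows; Spark's `avg`, `min` and `max` skip nulls (non-numeric finishes), which the sketch mirrors by filtering out `None`.

```python
from statistics import mean
from typing import Optional

# Hypothetical results for one driver: (finish_position, points, winner, podium)
rows: list[tuple[Optional[int], float, bool, bool]] = [
    (1, 25.0, True, True),
    (3, 15.0, False, True),
    (None, 0.0, False, False),  # non-numeric finish -> null finish_position
    (5, 10.0, False, False),
]

total_races = len(rows)
career_points = sum(r[1] for r in rows)
wins = sum(1 for r in rows if r[2])
podiums = sum(1 for r in rows if r[3])
finishes = [r[0] for r in rows if r[0] is not None]  # like Spark aggregates, ignore nulls

stats = {
    "total_races": total_races,
    "career_points": career_points,
    "wins": wins,
    "podiums": podiums,
    "avg_finish_position": mean(finishes),
    "best_finish": min(finishes),
    "worst_finish": max(finishes),
    "points_per_race": round(career_points / total_races, 2),
    "win_percentage": round(wins * 100.0 / total_races, 2),
    "podium_percentage": round(podiums * 100.0 / total_races, 2),
}
print(stats)
```

Note that the percentages divide by all races, including the one without a numeric finish, while the finish-position statistics use only classified finishes.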

In [None]:
@dlt.table(
    name="dlt_gold_top_performers",
    comment="DLT Gold: Top performing drivers across different performance categories"
)
@dlt.expect("performance_categories", "performance_tier IS NOT NULL")
def gold_top_performers():
    """
    Create performance tiers and identify top performers.
    Builds on driver stats to create business-friendly categorizations.
    """
    return (
        dlt.read("dlt_gold_driver_stats")
        .select(
            col("driver_id"),
            col("full_name"),
            col("nationality"),
            col("total_races"),
            col("career_points"),
            col("wins"),
            col("podiums"),
            col("points_per_race"),
            col("win_percentage"),
            col("podium_percentage"),
            # Create performance tiers
            when(col("wins") >= 20, "F1 Legend")
            .when(col("wins") >= 5, "Race Winner")
            .when(col("podiums") >= 10, "Podium Regular")
            .when(col("career_points") >= 100, "Points Scorer")
            .when(col("total_races") >= 20, "Veteran")
            .otherwise("Rookie").alias("performance_tier"),
            # Excellence indicators
            when(col("win_percentage") >= 25, "Elite Winner")
            .when(col("podium_percentage") >= 50, "Consistent Podium")
            .when(col("points_per_race") >= 5, "Strong Performer")
            .otherwise("Developing").alias("consistency_rating"),
            # Experience categorization
            when(col("total_races") >= 200, "Ultra Veteran")
            .when(col("total_races") >= 100, "Veteran")
            .when(col("total_races") >= 50, "Experienced")
            .otherwise("Newcomer").alias("experience_level"),
            col("calculated_at"),
            current_timestamp().alias("categorized_at")
        )
        .filter(col("total_races") >= 5)  # Focus on drivers with meaningful careers
    )
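A `when(...).when(...).otherwise(...)` chain returns the first branch whose condition holds, so rule order matters: a driver with 25 wins also has more than 10 podiums but lands in "F1 Legend". A plain-Python mirror of the `performance_tier` rules, with hypothetical inputs:

```python
def performance_tier(wins: int, podiums: int, career_points: float, total_races: int) -> str:
    # Checked top to bottom; the first matching rule wins, like Spark's when() chain
    rules = [
        (wins >= 20, "F1 Legend"),
        (wins >= 5, "Race Winner"),
        (podiums >= 10, "Podium Regular"),
        (career_points >= 100, "Points Scorer"),
        (total_races >= 20, "Veteran"),
    ]
    for matched, tier in rules:
        if matched:
            return tier
    return "Rookie"


print(performance_tier(wins=25, podiums=60, career_points=1500, total_races=150))
print(performance_tier(wins=2, podiums=12, career_points=300, total_races=80))
print(performance_tier(wins=0, podiums=0, career_points=0, total_races=6))
```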

## 📊 Data Quality Expectations Explained

DLT provides powerful data quality features through **expectations**:

### 🎯 Expectation Types:

#### 1. `@dlt.expect()`
```python
@dlt.expect("reasonable_birth_date", "date_of_birth >= '1900-01-01'")
```
- **Behavior:** Records violation but continues processing
- **Use case:** Data quality monitoring and alerts
- **Result:** Violating records included in output with quality metrics tracked

#### 2. `@dlt.expect_or_drop()`
```python
@dlt.expect_or_drop("valid_driver_id", "driver_id IS NOT NULL")
```
- **Behavior:** Drops records that fail the expectation
- **Use case:** Critical data quality requirements
- **Result:** Only valid records in output table

#### 3. `@dlt.expect_or_fail()`
```python
@dlt.expect_or_fail("critical_data", "COUNT(*) > 0")
```
- **Behavior:** Stops pipeline execution if expectation fails
- **Use case:** Critical business rules that cannot be violated
- **Result:** Pipeline failure with clear error message
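To make the three behaviors concrete, here is a toy model in plain Python: each expectation is a predicate applied per record, and the action decides whether failing records are kept (warn), removed (drop), or abort the run (fail). It only imitates the semantics, not how DLT evaluates expectations, and the sample records are made up.

```python
from typing import Callable

Record = dict
Check = Callable[[Record], bool]


class ExpectationFailed(Exception):
    pass


def apply_expectation(records: list[Record], name: str, check: Check, action: str):
    """Return (output_records, metrics) for action in {'warn', 'drop', 'fail'}."""
    passed = [r for r in records if check(r)]
    failed = [r for r in records if not check(r)]
    if action == "fail" and failed:
        raise ExpectationFailed(f"expectation {name!r} violated by {len(failed)} record(s)")
    # warn keeps every record; drop keeps only the passing ones
    output = passed if action == "drop" else records
    metrics = {"name": name, "passed_records": len(passed), "failed_records": len(failed)}
    return output, metrics


def valid_id(record: Record) -> bool:
    return record["driver_id"] is not None


drivers = [
    {"driver_id": 1, "first_name": "Lewis"},
    {"driver_id": None, "first_name": "Unknown"},
]

kept, m_warn = apply_expectation(drivers, "valid_driver_id", valid_id, "warn")
dropped, m_drop = apply_expectation(drivers, "valid_driver_id", valid_id, "drop")
print(len(kept), len(dropped), m_drop)
```

In both non-failing modes the metrics record the same pass/fail counts; only the output differs.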

### 📈 Quality Monitoring:
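- **Pipeline UI** shows pass/fail counts for each expectation on every dataset
- **Event log** records expectation metrics for each update, so you can query **historical trends**
- **Alerting** on quality is built on top, e.g. SQL alerts over event-log queries (built-in pipeline notifications cover update and flow failures)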
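Expectation results land in the pipeline event log: `flow_progress` events carry a JSON array under `details.flow_progress.data_quality.expectations` with per-expectation `passed_records` and `failed_records`. The sketch below sums those counts in plain Python over hypothetical event rows shaped that way; in practice you would query the event log with SQL (for example through the `event_log()` table-valued function). Treat the exact field layout as an assumption to confirm against the event log documentation.

```python
import json
from collections import defaultdict

# Hypothetical event-log rows; `details` is a JSON string, as in the real event log
events = [
    {"event_type": "flow_progress", "details": json.dumps({"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_driver_id", "dataset": "dlt_silver_drivers_clean", "passed_records": 850, "failed_records": 2},
    ]}}})},
    {"event_type": "flow_progress", "details": json.dumps({"flow_progress": {"data_quality": {"expectations": [
        {"name": "valid_driver_id", "dataset": "dlt_silver_drivers_clean", "passed_records": 12, "failed_records": 0},
    ]}}})},
    {"event_type": "user_action", "details": json.dumps({"user_action": {}})},
]


def expectation_totals(rows: list[dict]) -> dict[tuple[str, str], dict[str, int]]:
    totals = defaultdict(lambda: {"passed": 0, "failed": 0})
    for row in rows:
        if row["event_type"] != "flow_progress":
            continue  # only flow_progress events carry data-quality metrics
        details = json.loads(row["details"])
        expectations = details.get("flow_progress", {}).get("data_quality", {}).get("expectations", [])
        for e in expectations:
            key = (e["dataset"], e["name"])
            totals[key]["passed"] += e["passed_records"]
            totals[key]["failed"] += e["failed_records"]
    return dict(totals)


for (dataset, name), counts in expectation_totals(events).items():
    print(dataset, name, counts)
```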

## 🚀 DLT Pipeline Creation Guide

### 📋 How to Create a DLT Pipeline:

#### 1. Navigate to Delta Live Tables 🔄
- Click **"Workflows"** in the left sidebar
- Click **"Delta Live Tables"** tab
- Click **"Create Pipeline"** button

#### 2. Configure Pipeline Settings ⚙️
```
Pipeline Name: "F1 Data Pipeline with DLT"
Description: "Managed ETL pipeline for Formula 1 analytics with data quality"
```

#### 3. Source Configuration 📝
- **Source Type:** `Notebook`
- **Notebook Path:** Select this notebook (`05_Delta_Live_Pipeline.ipynb`)
- **Source:** Your workspace location

#### 4. Target Configuration 🎯
```
Target Catalog: main
Target Schema: default
Storage Location: Managed (Unity Catalog)
```

#### 5. Compute Configuration ⚡
```
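Cluster Mode: Serverless (recommended; Databricks manages and scales compute, no worker counts to set)
Classic compute alternative: Min Workers 1, Max Workers 5 (enhanced autoscaling)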
```

#### 6. Advanced Settings 🎛️
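- **Pipeline Mode:** `Triggered` (each update processes available data and then stops; run manually or on a schedule) or `Continuous` (keeps running and processes new data as it arrives)
- **Channel:** `Current` (stable runtime, recommended); `Preview` lets you test upcoming runtime changes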
- **Edition:** `Advanced` (for expectations and monitoring)
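The same choices can also be expressed as pipeline settings JSON (the pipeline UI offers a JSON view). The sketch below builds such a document in Python; the field names are assumptions based on the DLT pipeline settings reference and should be checked against current documentation, and the notebook path is a hypothetical placeholder.

```python
import json

# Assumed DLT pipeline settings fields; the notebook path is a hypothetical placeholder
settings = {
    "name": "F1 Data Pipeline with DLT",
    "libraries": [{"notebook": {"path": "/Workspace/Users/<you>/05_Delta_Live_Pipeline"}}],
    "catalog": "main",
    "schema": "default",   # older pipelines use "target" for the schema
    "serverless": True,    # serverless: no cluster or worker settings
    "continuous": False,   # False = triggered mode
    "channel": "CURRENT",
    "development": True,   # development mode disables automatic retries while iterating
}

print(json.dumps(settings, indent=2))
```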

## 🔄 DLT vs Jobs: When to Use Each

### 🏗️ **Use Delta Live Tables When:**

✅ **Complex ETL with dependencies**
- Multiple transformation layers (Bronze → Silver → Gold)
- Automatic dependency resolution needed
- Schema evolution and data quality critical

✅ **Data quality is paramount**
- Need built-in expectations and monitoring
- Automatic dropping or flagging of bad records
- Quality metrics and alerting required

✅ **Streaming and incremental processing**
- Near real-time data processing
- Change data capture (CDC) patterns
- Efficient incremental updates

✅ **Team collaboration on pipelines**
- Declarative code is easier to understand
- Built-in lineage and documentation
- Standardized patterns across teams

### ⚙️ **Use Jobs When:**

✅ **Simple, scheduled tasks**
- Single notebook execution
- Basic data refresh operations
- Notification and reporting workflows

✅ **Custom orchestration logic**
- Complex conditional workflows
- Integration with external systems
- Custom retry and error handling

✅ **Ad-hoc or exploratory processing**
- One-time data migration
- Experimental data processing
- Quick fixes and patches

### 📊 **Feature Comparison:**

| **Feature** | **Delta Live Tables** | **Jobs** |
|-------------|----------------------|----------|
| **Dependency Management** | ✅ Automatic | ⚙️ Manual |
| **Data Quality** | ✅ Built-in expectations | ⚙️ Custom code |
| **Streaming** | ✅ Native support | ⚙️ Structured streaming |
| **Monitoring** | ✅ Automatic dashboards | ⚙️ Custom monitoring |
| **Cost** | 💰 Higher DBU rate (varies by edition) | 💰 Standard Jobs compute rate |
| **Flexibility** | 🎯 Declarative patterns | 🔧 Full control |

## 📈 Advanced DLT Features

### 🔄 **Change Data Capture (CDC)**
```python
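import dlt
from pyspark.sql.functions import col, expr

# Declare the target table, then let apply_changes upsert and delete into it
dlt.create_streaming_table("customers_cdc")

dlt.apply_changes(
    target="customers_cdc",
    source="customers_raw",
    keys=["customer_id"],
    sequence_by=col("update_timestamp"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "update_timestamp"],
    stored_as_scd_type=1
)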
```
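With `stored_as_scd_type=1`, `apply_changes` keeps one current row per key: the change with the highest `sequence_by` value wins, and a winning delete removes the row. A plain-Python model of that rule over hypothetical change events, including one that arrives out of order; it illustrates the semantics only and ignores details such as SCD Type 2 history.

```python
from typing import Any

# Hypothetical change feed: (customer_id, update_timestamp, operation, payload)
changes: list[tuple[int, int, str, dict[str, Any]]] = [
    (1, 1, "INSERT", {"name": "Ana", "city": "Lisbon"}),
    (2, 1, "INSERT", {"name": "Ben", "city": "Oslo"}),
    (1, 3, "UPDATE", {"name": "Ana", "city": "Porto"}),
    (1, 2, "UPDATE", {"name": "Ana", "city": "Madrid"}),  # late arrival, older sequence
    (2, 4, "DELETE", {}),
]


def apply_scd1(events):
    latest = {}
    for key, seq, op, payload in events:
        # Keep only the change with the highest sequence value for each key
        if key not in latest or seq > latest[key][0]:
            latest[key] = (seq, op, payload)
    # Keys whose latest change is a delete disappear from the target
    return {k: p for k, (seq, op, p) in latest.items() if op != "DELETE"}


print(apply_scd1(changes))
```

The late "Madrid" update is ignored because a newer change for customer 1 was already applied.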

### 📊 **Pipeline Dependencies**
```python
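import dlt

# DLT infers that downstream_table depends on both upstream tables from these reads
@dlt.table
def downstream_table():
    # "id" is an illustrative join key; use the key the two tables actually share
    return dlt.read("upstream_table_1").join(dlt.read("upstream_table_2"), on="id")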
```
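Because each table declares what it reads through `dlt.read(...)`, DLT can build a dependency graph and derive a valid run order, running independent tables in parallel. `graphlib.TopologicalSorter` from the Python standard library (3.9+) shows the idea on this notebook's six tables; it is an analogy for DLT's planner, not its implementation.

```python
from graphlib import TopologicalSorter

# table -> set of tables it reads, taken from the dlt.read(...) calls in this notebook
reads = {
    "dlt_bronze_drivers": set(),
    "dlt_bronze_results": set(),
    "dlt_silver_drivers_clean": {"dlt_bronze_drivers"},
    "dlt_silver_results_clean": {"dlt_bronze_results"},
    "dlt_gold_driver_stats": {"dlt_silver_drivers_clean", "dlt_silver_results_clean"},
    "dlt_gold_top_performers": {"dlt_gold_driver_stats"},
}

sorter = TopologicalSorter(reads)
sorter.prepare()
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tables whose inputs are all complete
    waves.append(ready)
    sorter.done(*ready)

for i, wave in enumerate(waves):
    print(f"wave {i}: {wave}")  # tables in the same wave could run in parallel
```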

### 🎯 **Custom Expectations**
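Expectations are checked row by row, so an aggregate rule such as freshness needs a one-row table to check:

```python
import dlt
from pyspark.sql.functions import max as spark_max

@dlt.table(comment="Freshness check: newest update_time in source_data")
@dlt.expect_or_fail("freshness_check", "latest_update > current_timestamp() - INTERVAL 1 DAY")
def source_data_freshness():
    return dlt.read("source_data").agg(spark_max("update_time").alias("latest_update"))
```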
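A freshness rule compares one aggregated value, the newest timestamp, against a cutoff. The logic in plain Python with hypothetical timestamps; note that a row-level version would wrongly fail the `recent` data, because one of its rows is 30 hours old.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(update_times: list[datetime], now: datetime, max_age: timedelta = timedelta(days=1)) -> bool:
    # Aggregate first (newest timestamp), then apply the rule to that single value
    return bool(update_times) and max(update_times) > now - max_age


now = datetime(2024, 6, 1, 12, 0, tzinfo=timezone.utc)
recent = [now - timedelta(hours=30), now - timedelta(hours=2)]
stale = [now - timedelta(days=3), now - timedelta(days=2)]
print(is_fresh(recent, now), is_fresh(stale, now))
```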

## ✅ Delta Live Tables Complete!

**🎉 Outstanding! You've mastered Delta Live Tables fundamentals!**

### What You've Accomplished:
- ✅ **Built DLT pipeline** with Bronze, Silver, and Gold layers
- ✅ **Implemented data quality expectations** for automatic monitoring
- ✅ **Used declarative transformations** with automatic dependencies
- ✅ **Learned DLT vs Jobs** comparison and use cases
- ✅ **Explored advanced features** (CDC, streaming, quality monitoring)

### 🏗️ Your DLT Pipeline Architecture:
```
📁 Volume CSV Files
    ↓ (Auto Loader)
🥉 DLT Bronze Tables (Raw ingestion)
    ↓ (Quality expectations)
🥈 DLT Silver Tables (Clean & validated)  
    ↓ (Business aggregations)
🥇 DLT Gold Tables (Analytics ready)
```

### 📊 Tables Created:
- **Bronze:** `dlt_bronze_drivers`, `dlt_bronze_results`
- **Silver:** `dlt_silver_drivers_clean`, `dlt_silver_results_clean`  
- **Gold:** `dlt_gold_driver_stats`, `dlt_gold_top_performers`

In [None]:
# Summarize the tables this pipeline defines (descriptive only: DLT creates them when the pipeline runs, not when this cell executes)
print("🔄 Delta Live Tables Pipeline Summary")
print("=" * 45)

dlt_tables = [
    "🥉 dlt_bronze_drivers - Raw driver data ingestion",
    "🥉 dlt_bronze_results - Raw results data ingestion", 
    "🥈 dlt_silver_drivers_clean - Validated driver data",
    "🥈 dlt_silver_results_clean - Validated results data",
    "🥇 dlt_gold_driver_stats - Driver career statistics",
    "🥇 dlt_gold_top_performers - Performance categorization"
]

for table in dlt_tables:
    print(table)

print("\n📊 Data Quality: Built-in expectations and monitoring")
print("🔄 Dependencies: Automatic resolution and execution order")
print("⚡ Compute: Serverless managed infrastructure")

## 🚀 Next Steps

Ready to explore AI-powered features and advanced analytics?

### Immediate Actions:
1. **🔄 Create Your DLT Pipeline:**
   - Go to Workflows → Delta Live Tables → Create Pipeline
   - Use this notebook as the source
   - Configure with Serverless compute

2. **📊 Monitor Pipeline Execution:**
   - Watch automatic dependency resolution
   - Check data quality expectation results
   - Explore generated lineage graphs

3. **➡️ Next Notebook:** [06_AI_Agent_Bricks.ipynb](06_AI_Agent_Bricks.ipynb)
   - Explore AI Agents and intelligent applications
   - Build F1 Q&A chatbots with your data

### 🎯 Best Practices Checklist:
- ✅ **Start simple** with basic Bronze → Silver → Gold
- ✅ **Add expectations gradually** as you understand your data
- ✅ **Use descriptive table names** for clarity
- ✅ **Document transformations** with comments
- ✅ **Monitor data quality** trends over time
- ✅ **Test expectations** before production deployment

### 💡 Pro Tips:
- **🔧 Start with `@dlt.expect()`** to understand data patterns
- **📊 Use DLT dashboards** for quality monitoring
- **⚡ Leverage Serverless** for cost-effective execution
- **🔄 Design for incremental processing** from day one

**🔄 Your F1 pipeline is ready to deploy and monitor! 🚀**