<div style="display: flex; justify-content: space-between; align-items: center; padding: 8px 16px; background: #F8F9FA; border-bottom: 2px solid #E0E0E0; margin: 0; line-height: 1;">
    <div style="font-size: 14px; color: #666;">
        <span style="font-weight: bold; color: #333;">{SOURCE_PLATFORM} → Databricks Migration</span>
        <span style="margin-left: 8px; color: #999;">|</span>
        <span style="margin-left: 8px;">03 - Execute</span>
    </div>
    <div style="display: flex; align-items: center; gap: 8px;">
        <img src="https://cdn.simpleicons.org/snowflake/29B5E8" width="24" height="24"/>
        <span style="color: #999; font-size: 16px;">→</span>
        <img src="https://cdn.simpleicons.org/databricks/FF3621" width="24" height="24"/>
    </div>
</div>


<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img
    src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png"
    alt="Databricks Learning"
  >
</div>

# Pipeline and Orchestration

## Overview

After migrating data and converting SQL, you must rebuild your data pipelines and scheduling in Databricks. This lesson covers Lakeflow Jobs, Spark Declarative Pipelines, and integration with external orchestrators for end-to-end workflow automation.

## Learning Objectives

By the end of this lesson, you will be able to:
- Map {SOURCE_PLATFORM} scheduling concepts to Databricks orchestration
- Build multi-task workflows with Lakeflow Jobs
- Create declarative data pipelines with Spark Declarative Pipelines (DLT)
- Integrate with external orchestrators (Airflow, Azure Data Factory)
- Implement monitoring, alerting, and error handling

## Databricks Orchestration Options

Databricks provides several orchestration tools. Choose among them based on the workload and on the orchestration tooling you already operate.

<br />
<div class="mermaid">
flowchart TB
    subgraph LAKEFLOW["Lakeflow"]
        JOBS["<b>Lakeflow Jobs</b><br/><i>Workflow orchestration</i>"]
        CONNECT["<b>Lakeflow Connect</b><br/><i>Managed ingestion</i>"]
    end
    subgraph PIPELINES["Data Pipelines"]
        DLT["<b>Spark Declarative Pipelines</b><br/><i>Declarative ETL</i>"]
        SS["<b>Structured Streaming</b><br/><i>Real-time processing</i>"]
    end
    subgraph EXTERNAL["External Orchestrators"]
        AIRFLOW["<b>Apache Airflow</b><br/><i>DAG-based orchestration</i>"]
        ADF["<b>Azure Data Factory</b><br/><i>Azure-native</i>"]
        OTHER["<b>Other</b><br/><i>Prefect, Dagster, etc.</i>"]
    end
    JOBS --> DLT
    JOBS --> SS
    JOBS --> CONNECT
    EXTERNAL --> JOBS
    style LAKEFLOW fill:#e8f5e9,stroke:#4caf50
    style PIPELINES fill:#e3f2fd,stroke:#1976d2
    style EXTERNAL fill:#fff3e0,stroke:#ff9800
</div>
<script type="module"> import mermaid from "https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.esm.min.mjs"; mermaid.initialize({ startOnLoad: true, theme: "default" }); </script>

### Tool Comparison

| Tool | Best For | Complexity |
|------|----------|------------|
| **Lakeflow Jobs** | General workflow orchestration | Low |
| **Spark Declarative Pipelines** | Declarative ETL with quality controls | Medium |
| **Structured Streaming** | Real-time event processing | Medium-High |
| **Apache Airflow** | Complex DAGs, cross-platform orchestration | High |
| **Azure Data Factory** | Azure-native, hybrid workflows | Medium |

## Lakeflow Jobs

Lakeflow Jobs is the primary orchestration tool in Databricks, supporting multi-task workflows, dependencies, retries, and scheduling.

<br />
<div class="mermaid">
flowchart LR
    subgraph JOB["Lakeflow Job"]
        T1["<b>Task 1</b><br/><i>Extract</i>"]
        T2["<b>Task 2</b><br/><i>Transform</i>"]
        T3["<b>Task 3</b><br/><i>Load</i>"]
        T4["<b>Task 4</b><br/><i>Validate</i>"]
    end
    T1 --> T2 --> T3 --> T4
    style JOB fill:#fff,stroke:#FF3621,stroke-width:2px
    style T1 fill:#e3f2fd,stroke:#1976d2
    style T2 fill:#fff3e0,stroke:#ff9800
    style T3 fill:#e8f5e9,stroke:#4caf50
    style T4 fill:#e8f5e9,stroke:#4caf50
</div>
<script type="module"> import mermaid from "https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.esm.min.mjs"; mermaid.initialize({ startOnLoad: true, theme: "default" }); </script>

### Job Features

| Feature | Description |
|---------|-------------|
| **Task types** | Notebooks, Python scripts, SQL, JARs, DLT pipelines |
| **Dependencies** | Sequential, parallel, conditional execution |
| **Triggers** | Cron schedule, file arrival, continuous |
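| **Parameters** | Job and task parameters; pass values between tasks with task values; read them in notebooks through widgets |
| **Clusters** | Job clusters (ephemeral) or all-purpose (shared) |
| **Retries** | Automatic retry on failure with a configurable retry count and interval |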
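
To make the dependency model concrete, the following sketch derives an execution order from `depends_on`-style declarations using Python's standard `graphlib` module. The task names mirror the diagram above. The `execution_waves` helper is hypothetical: it only illustrates how sequential and parallel execution fall out of a dependency graph, and it is not how Lakeflow Jobs schedules tasks internally.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves. Tasks in the same wave have all of their
    upstream tasks finished, so they could run in parallel."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Mapping of task name -> set of upstream task names (illustrative only)
tasks = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "validate": {"load"},
}

print(execution_waves(tasks))
# [['extract'], ['transform'], ['load'], ['validate']]
```

Two tasks that share no dependency land in the same wave, which corresponds to parallel execution in a job.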

### Creating a Lakeflow Job

<div class="code-block" data-language="python">
# Create a multi-task job programmatically with the Databricks SDK for Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterSpec
from databricks.sdk.service.jobs import (
    CronSchedule,
    JobCluster,
    NotebookTask,
    Task,
    TaskDependency,
)

# Reads credentials from the environment or a configuration profile
w = WorkspaceClient()

# Create a job with three sequential tasks that share one job cluster
job = w.jobs.create(
    name="ETL_Pipeline_Migration",
    tasks=[
        Task(
            task_key="extract",
            notebook_task=NotebookTask(
                notebook_path="/Repos/team/project/01_extract",
                base_parameters={"source_table": "customers"}
            ),
            job_cluster_key="etl_cluster"
        ),
        Task(
            task_key="transform",
            # The SDK expects TaskDependency objects, not plain dicts
            depends_on=[TaskDependency(task_key="extract")],
            notebook_task=NotebookTask(
                notebook_path="/Repos/team/project/02_transform"
            ),
            job_cluster_key="etl_cluster"
        ),
        Task(
            task_key="load",
            depends_on=[TaskDependency(task_key="transform")],
            notebook_task=NotebookTask(
                notebook_path="/Repos/team/project/03_load"
            ),
            job_cluster_key="etl_cluster"
        )
    ],
    job_clusters=[
        JobCluster(
            job_cluster_key="etl_cluster",
            new_cluster=ClusterSpec(
                spark_version="14.3.x-scala2.12",
                node_type_id="i3.xlarge",  # AWS node type; adjust for your cloud
                num_workers=2
            )
        )
    ],
    schedule=CronSchedule(
        quartz_cron_expression="0 0 6 * * ?",  # Daily at 6 AM (Quartz syntax)
        timezone_id="America/New_York"
    )
)

print(f"Created job: {job.job_id}")
</div>

<link href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism.min.css" rel="stylesheet" />
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-python.min.js"></script>

<script>
(function() {
    document.querySelectorAll('.code-block').forEach(function(block) {
        var lang = block.getAttribute('data-language') || 'python';
        var code = block.textContent.trim();
        var id = 'code-' + Math.random().toString(36).substr(2, 9);
        
        block.innerHTML = 
            '<div style="position:relative;margin:16px 0;">' +
                '<button class="copy-btn" style="position:absolute;top:8px;right:8px;padding:4px 12px;font-size:12px;background:#ddd;color:#333;border:1px solid #ccc;border-radius:4px;cursor:pointer;z-index:10;">Copy</button>' +
                '<pre style="background:#f8f8f8;border-radius:8px;padding:16px;padding-top:40px;overflow-x:auto;margin:0;border:1px solid #e0e0e0;"><code id="' + id + '" class="language-' + lang + '" style="font-family:Consolas,Monaco,monospace;font-size:14px;"></code></pre>' +
            '</div>';
        
        var codeEl = document.getElementById(id);
        codeEl.textContent = code;
        Prism.highlightElement(codeEl);
        
        block.querySelector('.copy-btn').onclick = function() {
            var t = document.createElement('textarea');
            t.value = code;
            document.body.appendChild(t);
            t.select();
            document.execCommand('copy');
            document.body.removeChild(t);
            this.textContent = '✓ Copied!';
            setTimeout(() => this.textContent = 'Copy', 2000);
        };
    });
})();
</script>
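
The `schedule` in the job definition above uses Quartz cron syntax, which adds a leading seconds field and requires a `?` placeholder in one of the two day fields. Many source schedulers use 5-field Unix cron instead. The sketch below shows one way to translate common Unix expressions. The `unix_to_quartz` helper is hypothetical and covers only simple cases: it rejects step values in the day-of-week field and expressions that set both day fields.

```python
import re


def unix_to_quartz(expr: str) -> str:
    """Convert a 5-field Unix cron expression to 6-field Quartz syntax."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}: {expr!r}")
    minute, hour, day_of_month, month, day_of_week = fields

    if day_of_week == "*":
        # Quartz requires '?' in one of the two day fields.
        day_of_week = "?"
    elif day_of_month == "*":
        if "/" in day_of_week:
            raise ValueError("step values in day-of-week are not handled")
        day_of_month = "?"
        # Unix numbers Sunday as 0 (or 7); Quartz numbers Sunday as 1.
        day_of_week = re.sub(
            r"\d+", lambda m: str(int(m.group()) % 7 + 1), day_of_week
        )
    else:
        raise ValueError("cannot set both day-of-month and day-of-week")

    # Prepend the seconds field, fixed at 0.
    return " ".join(["0", minute, hour, day_of_month, month, day_of_week])


print(unix_to_quartz("0 6 * * *"))     # 0 0 6 * * ?
print(unix_to_quartz("30 2 * * 1-5"))  # 0 30 2 ? * 2-6
```

Check the converted expression and its time zone against the source schedule before enabling the trigger, since the source platform's cron dialect may differ from plain Unix cron.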

## Spark Declarative Pipelines (DLT)

Spark Declarative Pipelines (formerly Delta Live Tables, abbreviated DLT throughout this lesson) provides declarative ETL with built-in data quality controls, automatic dependency management, and Delta Lake optimization.

<br />
<div class="mermaid">
flowchart LR
    subgraph DLT["Spark Declarative Pipeline"]
        RAW["<b>Bronze</b><br/><i>@dlt.table</i>"]
        CLEAN["<b>Silver</b><br/><i>@dlt.table</i><br/><i>expectations</i>"]
        AGG["<b>Gold</b><br/><i>@dlt.table</i>"]
    end
    SOURCE["<b>Source</b><br/><i>Files, Streams</i>"] --> RAW
    RAW --> CLEAN --> AGG
    AGG --> CONSUME["<b>Consumers</b><br/><i>BI, ML, Apps</i>"]
    style DLT fill:#fff,stroke:#FF3621,stroke-width:2px
    style RAW fill:#cd7f32,stroke:#8b4513,color:#fff
    style CLEAN fill:#c0c0c0,stroke:#808080,color:#000
    style AGG fill:#ffd700,stroke:#b8860b,color:#000
</div>
<script type="module"> import mermaid from "https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.esm.min.mjs"; mermaid.initialize({ startOnLoad: true, theme: "default" }); </script>

### DLT Features

| Feature | Description |
|---------|-------------|
| **Expectations** | Data quality rules with actions (warn, drop, fail) |
| **Auto-dependency** | Automatic lineage and execution order |
| **Incremental** | Efficient processing of new/changed data |
| **CDC support** | `APPLY CHANGES INTO` for SCD Type 1 and 2 |
| **Monitoring** | Built-in data quality metrics and lineage |
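
The three expectation actions differ in what happens to a row that violates a rule. The following plain-Python sketch simulates those semantics outside of a pipeline, assuming that warn corresponds to `@dlt.expect`, drop to `@dlt.expect_or_drop`, and fail to `@dlt.expect_or_fail`. The `Expectation` and `apply_expectations` names are hypothetical and are not part of the `dlt` module.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Expectation:
    name: str
    predicate: Callable[[dict], bool]
    action: str  # "warn", "drop", or "fail"


def apply_expectations(rows, expectations):
    """Return (kept_rows, violation_counts) after applying each rule."""
    violations = {e.name: 0 for e in expectations}
    kept = []
    for row in rows:
        keep = True
        for e in expectations:
            if e.predicate(row):
                continue
            violations[e.name] += 1
            if e.action == "fail":
                # A failing rule aborts processing entirely.
                raise ValueError(f"expectation {e.name!r} failed for {row}")
            if e.action == "drop":
                keep = False  # the row is excluded from the output
            # "warn" only records the violation; the row is kept.
        if keep:
            kept.append(row)
    return kept, violations


orders = [
    {"order_id": 1, "amount": 25.0, "order_date": "2024-01-01"},
    {"order_id": 2, "amount": -5.0, "order_date": "2024-01-02"},
    {"order_id": 3, "amount": 10.0, "order_date": None},
]
rules = [
    Expectation("valid_amount", lambda r: r["amount"] > 0, "drop"),
    Expectation("valid_date", lambda r: r["order_date"] is not None, "warn"),
]

kept, violations = apply_expectations(orders, rules)
print([r["order_id"] for r in kept])  # [1, 3]
print(violations)                     # {'valid_amount': 1, 'valid_date': 1}
```

Order 2 is dropped because it violates a drop rule, while order 3 is kept and only counted as a violation.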

### DLT Pipeline Example

<div class="code-block" data-language="python">
import dlt
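# Explicit imports; note that this `sum` is the Spark aggregate, not Python's built-in
from pyspark.sql.functions import col, count, current_timestamp, sum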

# Bronze: Raw ingestion
@dlt.table(
    name="bronze_orders",
    comment="Raw orders from source system"
)
def bronze_orders():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load("/landing/orders/")
    )

# Silver: Cleansed with expectations
@dlt.table(
    name="silver_orders",
    comment="Cleansed orders with quality rules"
)
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_date", "order_date IS NOT NULL")
def silver_orders():
    return (
        dlt.read_stream("bronze_orders")
        .select(
            col("order_id"),
            col("customer_id"),
            col("order_date").cast("date"),
            col("amount").cast("decimal(18,2)"),
            current_timestamp().alias("processed_at")
        )
    )

# Gold: Business aggregation
@dlt.table(
    name="gold_daily_sales",
    comment="Daily sales summary"
)
def gold_daily_sales():
    return (
        dlt.read("silver_orders")
        .groupBy("order_date")
        .agg(
            count("*").alias("order_count"),
            sum("amount").alias("total_amount")
        )
    )
</div>

<link href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism.min.css" rel="stylesheet" />
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-python.min.js"></script>

<script>
(function() {
    document.querySelectorAll('.code-block').forEach(function(block) {
        var lang = block.getAttribute('data-language') || 'python';
        var code = block.textContent.trim();
        var id = 'code-' + Math.random().toString(36).substr(2, 9);
        
        block.innerHTML = 
            '<div style="position:relative;margin:16px 0;">' +
                '<button class="copy-btn" style="position:absolute;top:8px;right:8px;padding:4px 12px;font-size:12px;background:#ddd;color:#333;border:1px solid #ccc;border-radius:4px;cursor:pointer;z-index:10;">Copy</button>' +
                '<pre style="background:#f8f8f8;border-radius:8px;padding:16px;padding-top:40px;overflow-x:auto;margin:0;border:1px solid #e0e0e0;"><code id="' + id + '" class="language-' + lang + '" style="font-family:Consolas,Monaco,monospace;font-size:14px;"></code></pre>' +
            '</div>';
        
        var codeEl = document.getElementById(id);
        codeEl.textContent = code;
        Prism.highlightElement(codeEl);
        
        block.querySelector('.copy-btn').onclick = function() {
            var t = document.createElement('textarea');
            t.value = code;
            document.body.appendChild(t);
            t.select();
            document.execCommand('copy');
            document.body.removeChild(t);
            this.textContent = '✓ Copied!';
            setTimeout(() => this.textContent = 'Copy', 2000);
        };
    });
})();
</script>

## External Orchestrator Integration

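If you already use an external orchestrator such as Apache Airflow or Azure Data Factory, you can keep it as the scheduler and have it trigger Databricks work through the Jobs REST API, the Databricks SDK, or an orchestrator-specific connector.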

<br />
<div class="mermaid">
flowchart TB
    subgraph EXTERNAL["External Orchestrator"]
        SCHEDULE["<b>Schedule</b><br/><i>Cron, Event</i>"]
        DAG["<b>DAG / Pipeline</b><br/><i>Tasks, Dependencies</i>"]
    end
    subgraph DATABRICKS["Databricks"]
        API["<b>Jobs API</b><br/><i>REST / SDK</i>"]
        JOBS["<b>Lakeflow Jobs</b>"]
        DLT["<b>DLT Pipelines</b>"]
    end
    SCHEDULE --> DAG
    DAG -->|"Trigger"| API
    API --> JOBS
    API --> DLT
    style EXTERNAL fill:#fff3e0,stroke:#ff9800
    style DATABRICKS fill:#e8f5e9,stroke:#4caf50
</div>
<script type="module"> import mermaid from "https://cdn.jsdelivr.net/npm/mermaid@10/dist/mermaid.esm.min.mjs"; mermaid.initialize({ startOnLoad: true, theme: "default" }); </script>

### Integration Options

| Orchestrator | Integration Method |
|--------------|--------------------|
| **Apache Airflow** | `DatabricksRunNowOperator`, `DatabricksSubmitRunOperator` |
| **Azure Data Factory** | Databricks Linked Service, Notebook Activity |
| **AWS Step Functions** | Lambda + Databricks SDK |
| **Prefect / Dagster** | `prefect-databricks` and `dagster-databricks` integration packages, or the Databricks SDK |
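
For orchestrators without a dedicated connector, triggering a job comes down to one authenticated HTTP call. The sketch below builds, but does not send, a request to the Jobs API `run-now` endpoint using only the standard library. The workspace URL, token, and job ID are placeholders, and `build_run_now_request` is a hypothetical helper. Check the API version and parameter fields against the Jobs API reference for your workspace.

```python
import json
import urllib.request


def build_run_now_request(
    host: str, token: str, job_id: int, notebook_params: dict[str, str]
) -> urllib.request.Request:
    """Build (but do not send) a Jobs API run-now request."""
    body = {"job_id": job_id, "notebook_params": notebook_params}
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.1/jobs/run-now",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_run_now_request(
    host="https://example.cloud.databricks.com",  # placeholder workspace URL
    token="<personal-access-token>",              # placeholder credential
    job_id=12345,                                 # placeholder job ID
    notebook_params={"run_date": "2024-01-01", "source_schema": "bronze"},
)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request)
```

In practice, load the token from a secret store rather than embedding it in orchestrator code.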

### Airflow Integration Example

<div class="code-block" data-language="python">
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from datetime import datetime

with DAG(
    dag_id="etl_migration_pipeline",
    schedule="0 6 * * *",  # Airflow 2.4+; older releases use schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    
    # Trigger existing Databricks job
    run_etl_job = DatabricksRunNowOperator(
        task_id="run_etl_job",
        databricks_conn_id="databricks_default",
        job_id=12345,  # Lakeflow Job ID
        notebook_params={
            "run_date": "{{ ds }}",
            "source_schema": "bronze"
        }
    )
    
    # Single-task DAG; chain downstream tasks with the >> operator as needed
    run_etl_job
</div>

<link href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism.min.css" rel="stylesheet" />
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-python.min.js"></script>

<script>
(function() {
    document.querySelectorAll('.code-block').forEach(function(block) {
        var lang = block.getAttribute('data-language') || 'python';
        var code = block.textContent.trim();
        var id = 'code-' + Math.random().toString(36).substr(2, 9);
        
        block.innerHTML = 
            '<div style="position:relative;margin:16px 0;">' +
                '<button class="copy-btn" style="position:absolute;top:8px;right:8px;padding:4px 12px;font-size:12px;background:#ddd;color:#333;border:1px solid #ccc;border-radius:4px;cursor:pointer;z-index:10;">Copy</button>' +
                '<pre style="background:#f8f8f8;border-radius:8px;padding:16px;padding-top:40px;overflow-x:auto;margin:0;border:1px solid #e0e0e0;"><code id="' + id + '" class="language-' + lang + '" style="font-family:Consolas,Monaco,monospace;font-size:14px;"></code></pre>' +
            '</div>';
        
        var codeEl = document.getElementById(id);
        codeEl.textContent = code;
        Prism.highlightElement(codeEl);
        
        block.querySelector('.copy-btn').onclick = function() {
            var t = document.createElement('textarea');
            t.value = code;
            document.body.appendChild(t);
            t.select();
            document.execCommand('copy');
            document.body.removeChild(t);
            this.textContent = '✓ Copied!';
            setTimeout(() => this.textContent = 'Copy', 2000);
        };
    });
})();
</script>

## Monitoring and Alerting

Monitor migrated pipelines at the job, data quality, and compute levels so that failures surface before they affect downstream consumers.

### Monitoring Options

| Component | Tool | Metrics |
|-----------|------|--------|
| **Job runs** | Lakeflow Jobs UI | Duration, status, failures |
| **DLT quality** | DLT Event Log | Expectation pass/fail rates |
| **Cluster metrics** | Compute metrics UI, Spark UI | CPU, memory, shuffle |
| **Custom alerts** | Databricks Alerts | SQL-based conditions |
| **External** | Datadog, Splunk | Unified observability |

### Alert Configuration

<div class="code-block" data-language="sql">
-- Create alert for job failures
-- (Configure via Databricks SQL Alerts UI)

-- Query to detect failed job runs in the last hour
-- (Verify table and column names against the system tables in your workspace)
SELECT 
    job_id,
    run_id,
    result_state,
    termination_code,
    period_end_time
FROM system.lakeflow.job_run_timeline
WHERE result_state = 'FAILED'
  AND period_end_time > current_timestamp() - INTERVAL 1 HOUR
ORDER BY period_end_time DESC;
</div>

<link href="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/themes/prism.min.css" rel="stylesheet" />
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-sql.min.js"></script>

<script>
(function() {
    document.querySelectorAll('.code-block').forEach(function(block) {
        var lang = block.getAttribute('data-language') || 'sql';
        var code = block.textContent.trim();
        var id = 'code-' + Math.random().toString(36).substr(2, 9);
        
        block.innerHTML = 
            '<div style="position:relative;margin:16px 0;">' +
                '<button class="copy-btn" style="position:absolute;top:8px;right:8px;padding:4px 12px;font-size:12px;background:#ddd;color:#333;border:1px solid #ccc;border-radius:4px;cursor:pointer;z-index:10;">Copy</button>' +
                '<pre style="background:#f8f8f8;border-radius:8px;padding:16px;padding-top:40px;overflow-x:auto;margin:0;border:1px solid #e0e0e0;"><code id="' + id + '" class="language-' + lang + '" style="font-family:Consolas,Monaco,monospace;font-size:14px;"></code></pre>' +
            '</div>';
        
        var codeEl = document.getElementById(id);
        codeEl.textContent = code;
        Prism.highlightElement(codeEl);
        
        block.querySelector('.copy-btn').onclick = function() {
            var t = document.createElement('textarea');
            t.value = code;
            document.body.appendChild(t);
            t.select();
            document.execCommand('copy');
            document.body.removeChild(t);
            this.textContent = '✓ Copied!';
            setTimeout(() => this.textContent = 'Copy', 2000);
        };
    });
})();
</script>
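
When alerts are evaluated outside Databricks SQL, for example in an external observability tool, the same condition can be applied to run records fetched by any means. The sketch below assumes each record is a dictionary with `result_state` and `end_time` keys. That record shape and the `recent_failures` helper are hypothetical and do not mirror a specific API response.

```python
from datetime import datetime, timedelta, timezone


def recent_failures(runs, now, window=timedelta(hours=1)):
    """Return failed runs that ended inside the lookback window, newest first."""
    cutoff = now - window
    failed = [
        r for r in runs
        if r["result_state"] == "FAILED" and r["end_time"] > cutoff
    ]
    return sorted(failed, key=lambda r: r["end_time"], reverse=True)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
runs = [
    {"job_id": 1, "run_id": 10, "result_state": "FAILED",
     "end_time": now - timedelta(minutes=5)},
    {"job_id": 1, "run_id": 9, "result_state": "SUCCEEDED",
     "end_time": now - timedelta(minutes=30)},
    {"job_id": 2, "run_id": 7, "result_state": "FAILED",
     "end_time": now - timedelta(hours=3)},
    {"job_id": 3, "run_id": 4, "result_state": "FAILED",
     "end_time": now - timedelta(minutes=45)},
]

alerts = recent_failures(runs, now)
print([r["run_id"] for r in alerts])  # [10, 4]
```

Passing `now` explicitly keeps the check deterministic, which makes the alert logic straightforward to unit test.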

## Migration Pattern: {SOURCE_PLATFORM} to Databricks

Map your existing {SOURCE_PLATFORM} scheduling concepts to Databricks equivalents.

### Concept Mapping

| {SOURCE_PLATFORM} Concept | Databricks Equivalent |
|---------------------------|----------------------|
| Scheduled Task | Lakeflow Job with cron trigger |
| Stored Procedure | Notebook task or SQL Procedure |
| Task Dependencies | Job task dependencies |
| Streams/Change Tracking | Auto Loader, CDC |
| Alerts | Databricks Alerts |

### Migration Steps

1. **Inventory** - Catalog all scheduled jobs and dependencies
2. **Convert logic** - Port SQL/procedures (see [3.4 - SQL and Code Conversion]($./3.4 - SQL and Code Conversion))
3. **Create jobs** - Build Lakeflow Jobs or DLT pipelines
4. **Configure triggers** - Set schedules matching source
5. **Enable monitoring** - Set up alerts and dashboards
6. **Test end-to-end** - Validate outputs match
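
For the final validation step, one simple approach is to compare a row count and an order-independent checksum of the same table as produced by the source pipeline and by the migrated one. The sketch below works on rows already fetched into Python dictionaries. The `table_fingerprint` helper and the sample rows are hypothetical, and large tables would need the equivalent aggregation pushed down to SQL on each platform.

```python
import hashlib


def table_fingerprint(rows, columns):
    """Return (row_count, digest); the digest does not depend on row order."""
    row_digests = sorted(
        hashlib.sha256(
            "\x1f".join(repr(row[c]) for c in columns).encode("utf-8")
        ).hexdigest()
        for row in rows
    )
    combined = hashlib.sha256("".join(row_digests).encode("ascii")).hexdigest()
    return len(row_digests), combined


columns = ["order_date", "order_count", "total_amount"]
source_rows = [
    {"order_date": "2024-01-01", "order_count": 2, "total_amount": "35.00"},
    {"order_date": "2024-01-02", "order_count": 1, "total_amount": "10.00"},
]
target_rows = list(reversed(source_rows))  # same data, different row order

match = table_fingerprint(source_rows, columns) == table_fingerprint(target_rows, columns)
print(match)  # True
```

Because the digest is built from `repr` of each value, normalize types and formats (for example decimals and timestamps) on both sides before comparing, or equal values may appear to differ.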

## Summary

### Orchestration Checklist

- [ ] Inventory all source scheduled jobs and dependencies
- [ ] Choose orchestration tool (Lakeflow Jobs, DLT, external)
- [ ] Convert job logic to notebooks/pipelines
- [ ] Configure triggers and schedules
- [ ] Set up monitoring and alerting
- [ ] Test in parallel with source before cutover
- [ ] Document runbooks for operations

### Key Principles

| Principle | Why It Matters |
|-----------|----------------|
| **Use Lakeflow Jobs for orchestration** | Native integration, monitoring, retries |
| **Adopt DLT for ETL** | Declarative, quality controls, auto-optimization |
| **Monitor proactively** | Catch failures before business impact |
| **Test thoroughly** | Run parallel before decommissioning source |

### Next Steps

With pipelines migrated, proceed to cutover planning:

- [**Module 04 - Cutover**]($../04 - Cutover/4.1 - Cutover Planning Overview) - Transition planning and execution

<div style="color: #FF3621; font-weight: bold; font-size: 2em; margin-bottom: 12px;">COURSE DEVELOPER (remove before publishing)</div>

### Template Customization

**Placeholders to replace:**
- `{SOURCE_PLATFORM}` - Source platform name (Snowflake, BigQuery, Redshift, Teradata)

**Platform-specific additions required:**
- Add platform-specific scheduling concept mapping (e.g., Snowflake Tasks, dbt Cloud)
- Include platform-specific CDC/streaming patterns
- Add platform-specific monitoring equivalents
- Document any native connectors available

&copy; 2026 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a>.<br/><br/><a href="https://databricks.com/privacy-policy" target="_blank">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use" target="_blank">Terms of Use</a> | <a href="https://help.databricks.com/" target="_blank">Support</a>
