# 03 - Delta Live Tables Pipeline with SFTP

This notebook implements a complete DLT pipeline that:
1. Reads data from the source SFTP server using Auto Loader
2. Processes and transforms the data through bronze → silver → gold layers
3. Writes the processed data back to the target SFTP server using a custom SFTP data source

**Note:** Run this notebook as a DLT pipeline in Databricks; the `@dlt.table` definitions are not meant to be executed cell by cell.

In [None]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import *
from ingest import SFTPDataSource

## Configuration

In [None]:
# Load configuration from config table
config_df = spark.table("sftp_demo.config.connection_params")
config_dict = {row.key: row.value for row in config_df.collect()}

# Source SFTP configuration
SOURCE_HOST = config_dict["source_host"]
SOURCE_USERNAME = config_dict["source_username"]
SOURCE_CONNECTION = "sftp_demo.source_sftp_connection"

# Target SFTP configuration
TARGET_HOST = config_dict["target_host"]
TARGET_USERNAME = config_dict["target_username"]
SSH_KEY_PATH = config_dict["ssh_key_path"]

target_sftp_config = {
    "host": TARGET_HOST,
    "username": TARGET_USERNAME,
    "private_key_path": SSH_KEY_PATH,
    "port": 22
}
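
A missing row in the config table surfaces in the cell above as a bare `KeyError` on the first failed lookup. A minimal sketch of an up-front check, assuming the key names used above; `REQUIRED_KEYS` and `validate_config` are hypothetical helpers, not part of this notebook or of the `ingest` module, and the sample values are made up:

```python
# Hypothetical helper: report every missing or empty connection parameter at once.
REQUIRED_KEYS = (
    "source_host",
    "source_username",
    "target_host",
    "target_username",
    "ssh_key_path",
)


def validate_config(config):
    """Return the config unchanged, or raise listing all missing/empty keys."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"Missing connection parameters: {', '.join(missing)}")
    return config


# Stand-in for config_dict; these values are placeholders for illustration only.
sample_config = {
    "source_host": "source.example.com",
    "source_username": "ingest_user",
    "target_host": "target.example.com",
    "target_username": "export_user",
    "ssh_key_path": "/path/to/key",
}
validated = validate_config(sample_config)
```

Calling `validate_config(config_dict)` right after loading the table would fail fast with a single message naming every absent key.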

## Bronze Layer: Raw Data Ingestion

Use Auto Loader to read CSV files from the source SFTP server.

In [None]:
@dlt.table(
    name="bronze_customers",
    comment="Raw customer data from source SFTP",
    table_properties={"quality": "bronze"}
)
def bronze_customers():
    """Ingest raw customer data from SFTP using AutoLoader"""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.connectionName", SOURCE_CONNECTION)
        .option("header", "true")
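        # Auto Loader infers CSV columns as strings unless inferColumnTypes is set
        .option("cloudFiles.inferColumnTypes", "true")
        .load(f"sftp://{SOURCE_HOST}/customers.csv")
        .withColumn("ingestion_timestamp", F.current_timestamp())
        # input_file_name() is not supported on Unity Catalog compute; use file metadata
        .withColumn("source_file", F.col("_metadata.file_path"))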
    )

In [None]:
@dlt.table(
    name="bronze_orders",
    comment="Raw order data from source SFTP",
    table_properties={"quality": "bronze"}
)
def bronze_orders():
    """Ingest raw order data from SFTP using AutoLoader"""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.connectionName", SOURCE_CONNECTION)
        .option("header", "true")
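        # Auto Loader infers CSV columns as strings unless inferColumnTypes is set
        .option("cloudFiles.inferColumnTypes", "true")
        .load(f"sftp://{SOURCE_HOST}/orders.csv")
        .withColumn("ingestion_timestamp", F.current_timestamp())
        # input_file_name() is not supported on Unity Catalog compute; use file metadata
        .withColumn("source_file", F.col("_metadata.file_path"))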
    )

## Silver Layer: Cleaned and Validated Data

Apply data quality rules and transformations.

In [None]:
@dlt.table(
    name="silver_customers",
    comment="Cleaned and validated customer data",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_email", "email IS NOT NULL AND email LIKE '%@%'")
@dlt.expect("valid_signup_date", "signup_date IS NOT NULL")
def silver_customers():
    """Clean and validate customer data"""
    return (
        dlt.read_stream("bronze_customers")
        .select(
            "customer_id",
            F.trim(F.col("name")).alias("name"),
            F.lower(F.trim(F.col("email"))).alias("email"),
            F.upper(F.trim(F.col("country"))).alias("country"),
            F.to_date(F.col("signup_date")).alias("signup_date"),
            F.current_timestamp().alias("processed_timestamp")
        )
        .dropDuplicates(["customer_id"])
    )

In [None]:
@dlt.table(
    name="silver_orders",
    comment="Cleaned and validated order data",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_quantity", "quantity > 0")
def silver_orders():
    """Clean and validate order data"""
    return (
        dlt.read_stream("bronze_orders")
        .select(
            "order_id",
            "customer_id",
            F.trim(F.col("product")).alias("product"),
            F.col("quantity").cast("int").alias("quantity"),
            F.col("amount").cast("decimal(10,2)").alias("amount"),
            F.to_date(F.col("order_date")).alias("order_date"),
            F.current_timestamp().alias("processed_timestamp")
        )
        .dropDuplicates(["order_id"])
    )
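
The silver tables mix two kinds of expectation: `expect_or_drop` removes failing rows from the table, while `expect` keeps them and only records the violation in the pipeline's data quality metrics. A plain-Python sketch of that difference on a few made-up rows; `apply_expectations` is a hypothetical illustration of the semantics, not how DLT evaluates expectations internally:

```python
def apply_expectations(rows, drop_rules, warn_rules):
    """Mimic expect_or_drop (drop_rules) and expect (warn_rules) on dict rows."""
    kept = []
    violations = {name: 0 for name in {**drop_rules, **warn_rules}}
    for row in rows:
        failed_drop = [name for name, rule in drop_rules.items() if not rule(row)]
        failed_warn = [name for name, rule in warn_rules.items() if not rule(row)]
        for name in failed_drop + failed_warn:
            violations[name] += 1
        if not failed_drop:  # rows failing only a warn rule are still kept
            kept.append(row)
    return kept, violations


# Same rules as silver_customers, written as Python predicates.
drop_rules = {
    "valid_customer_id": lambda r: r["customer_id"] is not None,
    "valid_email": lambda r: r["email"] is not None and "@" in r["email"],
}
warn_rules = {
    "valid_signup_date": lambda r: r["signup_date"] is not None,
}

rows = [
    {"customer_id": 1, "email": "a@example.com", "signup_date": "2024-01-05"},
    {"customer_id": 2, "email": "b@example.com", "signup_date": None},
    {"customer_id": None, "email": "c@example.com", "signup_date": "2024-02-01"},
    {"customer_id": 4, "email": "not-an-email", "signup_date": "2024-03-01"},
]

kept, violations = apply_expectations(rows, drop_rules, warn_rules)
```

Here the row with the missing `signup_date` survives, while the rows with a null `customer_id` or a malformed email do not.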

## Gold Layer: Business-Level Aggregations

Create enriched datasets for analytics.

In [None]:
@dlt.table(
    name="gold_customer_orders",
    comment="Enriched customer order data with aggregations",
    table_properties={"quality": "gold"}
)
def gold_customer_orders():
    """Create enriched customer order dataset"""
    # Stream-static join: a stream-stream left join would require watermarks
    customers = dlt.read("silver_customers")
    orders = dlt.read_stream("silver_orders")
    
    return (
        orders
        .join(customers, "customer_id", "left")
        .select(
            "order_id",
            "customer_id",
            "name",
            "email",
            "country",
            "product",
            "quantity",
            "amount",
            "order_date",
            F.current_timestamp().alias("processed_timestamp")
        )
    )

In [None]:
@dlt.table(
    name="gold_customer_summary",
    comment="Customer summary metrics",
    table_properties={"quality": "gold"}
)
def gold_customer_summary():
    """Calculate customer summary metrics"""
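    # Batch read: a streaming aggregation without a watermark cannot be
    # written in append mode, so this table is recomputed from the full input
    return (
        dlt.read("gold_customer_orders")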
        .groupBy("customer_id", "name", "email", "country")
        .agg(
            F.count("order_id").alias("total_orders"),
            F.sum("amount").alias("total_amount"),
            F.avg("amount").alias("avg_order_amount"),
            F.min("order_date").alias("first_order_date"),
            F.max("order_date").alias("last_order_date")
        )
        .withColumn("processed_timestamp", F.current_timestamp())
    )

## Write to Target SFTP

Use the custom SFTP data source to write the processed data back to the target SFTP server.

In [None]:
def write_to_sftp(df, remote_path):
    """
    Write DataFrame to target SFTP using custom data source
    
    Args:
        df: DataFrame to write
        remote_path: Remote file path on SFTP
    """
    # Collect to the driver as a Pandas DataFrame (batch export only;
    # the data must fit in driver memory)
    pdf = df.toPandas()
    
    # Write to SFTP
    writer = SFTPDataSource.create_writer(target_sftp_config)
    with writer.session():
        writer.write_dataframe(
            pdf,
            remote_path,
            format="csv",
            header=True
        )
    
    print(f"Data written to SFTP: {remote_path}")
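
`write_dataframe` belongs to the custom `ingest` module, so its internals are not shown in this notebook. As a rough sketch of the serialization step such a writer presumably performs before upload, the following uses only the standard library to turn rows into CSV bytes with a header; `rows_to_csv_bytes` is a hypothetical name, the sample rows are made up, and the SFTP upload itself is left out:

```python
import csv
import io


def rows_to_csv_bytes(rows, fieldnames):
    """Serialize dict rows to UTF-8 CSV bytes with a header line."""
    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue().encode("utf-8")


# Made-up sample rows shaped like the customer summary.
payload = rows_to_csv_bytes(
    [{"customer_id": 1, "total_orders": 3}, {"customer_id": 2, "total_orders": 1}],
    fieldnames=["customer_id", "total_orders"],
)
```

Building the payload in memory keeps the open SFTP session short, at the cost of holding the whole file in driver memory, which matches the `toPandas()` approach above.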

In [None]:
@dlt.table(
    name="target_customer_summary",
    comment="Customer summary data for SFTP export",
    table_properties={"quality": "gold"}
)
def target_customer_summary():
    """Prepare customer summary for SFTP export"""
    return dlt.read("gold_customer_summary")

In [None]:
# Export gold layer data to target SFTP
# This would typically be triggered after the DLT pipeline completes

def export_to_sftp():
    """
    Export processed data to target SFTP
    Run this after DLT pipeline completes
    """
    # Read gold layer data
    customer_summary_df = spark.table("sftp_demo.gold.gold_customer_summary")
    customer_orders_df = spark.table("sftp_demo.gold.gold_customer_orders")
    
    # Write to target SFTP
    write_to_sftp(customer_summary_df, "/customer_summary.csv")
    write_to_sftp(customer_orders_df, "/customer_orders.csv")
    
    print("Export to target SFTP completed successfully")

# Uncomment to run export after pipeline
# export_to_sftp()

## Verification: Read from Target SFTP

Verify that data was successfully written to target SFTP.

In [None]:
def verify_target_sftp():
    """
    Verify data in target SFTP
    """
    writer = SFTPDataSource.create_writer(target_sftp_config)
    
    with writer.session():
        files = writer.list_files(".")
        print("Files in target SFTP:")
        for f in files:
            print(f"  - {f}")
    
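    # Read the file back with a batch CSV read (not Auto Loader) to verify.
    # target_connection is kept for reference; the reader below does not use it.
    target_connection = "sftp_demo.target_sftp_connection"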
    
    verification_df = (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"sftp://{TARGET_HOST}/customer_summary.csv")
    )
    
    print("\nSample data from target SFTP:")
    verification_df.show(10)
    
    print(f"\nTotal records in target: {verification_df.count()}")

# Uncomment to run verification
# verify_target_sftp()
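
Printing a sample and a count confirms that a file exists, not that it matches what was exported. One way to tighten the check, sketched here in plain Python on rows that have already been collected, is to compare row counts and an order-independent digest. `rows_digest` and `verify_export` are hypothetical helpers, the sample rows are made up, and collecting both sides to the driver is assumed to be acceptable for files of this size:

```python
import hashlib


def rows_digest(rows):
    """Order-independent SHA-256 digest over rows given as tuples of values."""
    encoded = sorted("\x1f".join(str(value) for value in row) for row in rows)
    return hashlib.sha256("\x1e".join(encoded).encode("utf-8")).hexdigest()


def verify_export(exported_rows, read_back_rows):
    """Return (ok, message), comparing counts first and then content digests."""
    if len(exported_rows) != len(read_back_rows):
        return False, (
            f"row count mismatch: {len(exported_rows)} != {len(read_back_rows)}"
        )
    if rows_digest(exported_rows) != rows_digest(read_back_rows):
        return False, "row contents differ"
    return True, "export verified"


exported = [(1, "alice", 3), (2, "bob", 1)]
read_back = [(2, "bob", 1), (1, "alice", 3)]  # same rows, different order
ok, message = verify_export(exported, read_back)
```

Values are compared as strings, since a CSV round trip does not preserve types exactly; decimals and timestamps may still need normalizing before the comparison.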

## Pipeline Execution Summary

To run this complete pipeline:

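1. **Create DLT Pipeline in Databricks:**
   - Go to Delta Live Tables → Create Pipeline
   - Set notebook path to this notebook
   - Configure target catalog `sftp_demo` and target schema `gold`, so that tables land where `export_to_sftp()` reads them (`sftp_demo.gold.*`)
   - Leave the storage location unset: pipelines that publish to a Unity Catalog catalog manage table storage and checkpoints themselves
   - Choose the pipeline mode: triggered for scheduled runs, continuous for always-on ingestion

2. **Pipeline Configuration:**
   ```json
   {
     "name": "SFTP Data Pipeline",
     "catalog": "sftp_demo",
     "target": "gold",
     "continuous": false,
     "development": true
   }
   ```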

3. **Start Pipeline:**
   - Click "Start" to run the pipeline
   - Monitor data flow: Bronze → Silver → Gold
   - Check data quality metrics

4. **Export to Target SFTP:**
   - After pipeline completes, run `export_to_sftp()`
   - Verify with `verify_target_sftp()`

### Data Flow:
```
Source SFTP (customers.csv, orders.csv)
    ↓ AutoLoader
Bronze Layer (raw data)
    ↓ Cleaning & Validation
Silver Layer (validated data)
    ↓ Enrichment & Aggregation
Gold Layer (business metrics)
    ↓ Custom SFTP Data Source
Target SFTP (customer_summary.csv, customer_orders.csv)
```

### Success Criteria:
- ✓ Bronze tables contain all source records
- ✓ Silver tables have clean, validated data
- ✓ Gold tables contain enriched business metrics
- ✓ Data quality expectations passed
- ✓ Target SFTP contains exported files
- ✓ Data lineage visible in DLT UI