# Delta Lake Pipeline Example
This notebook demonstrates a complete data pipeline using Delta Lake with Bronze, Silver, and Gold layers.

## 1. Setup and Configuration

In [None]:
import os
import socket

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

In [None]:
# MinIO connection settings, read from environment variables.
# The fallback credentials are the values this notebook previously hard-coded;
# they are kept only so existing setups keep working. Override them by
# exporting AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY before starting the kernel.
MINIO_HOST = os.environ.get("MINIO_HOST")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "minio")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "password")

if not MINIO_HOST:
    # MINIO_HOST has no fallback, so surface the problem here instead of
    # leaving it to show up later as an opaque storage error.
    print("⚠ MINIO_HOST is not set; the S3A endpoint for MinIO is unconfigured.")

# The secret key is intentionally never printed.
print(f"MinIO Host: {MINIO_HOST}")
print(f"AWS Access Key: {AWS_ACCESS_KEY_ID}")

## 2. Create Spark Session

In [None]:
# Spark master URL: taken from the environment, falling back to the
# in-cluster service address.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark-master-service:7077")

# Address executors use to reach the driver: the IP that this machine's own
# hostname resolves to (`socket` is imported in the notebook's import cell).
# NOTE(review): assumes the hostname resolves to a routable address rather
# than a loopback one -- confirm for the target environment.
DRIVER_HOST = socket.gethostbyname(socket.gethostname())

print(f"Spark Master URL: {SPARK_MASTER_URL}")
print(f"Driver Host: {DRIVER_HOST}")

In [None]:
# Build (or reuse) the Spark session, wired for Delta Lake and for MinIO
# through the S3A filesystem.
spark = (
    SparkSession.builder
    .appName("Delta Lake Pipeline")
    .master(SPARK_MASTER_URL)
    # Driver networking: advertise the resolved host, bind on all interfaces.
    # The driver and block-manager ports are pinned instead of random --
    # presumably so they can be exposed from the pod; confirm against the deployment.
    .config("spark.driver.host", DRIVER_HOST)
    .config("spark.driver.bindAddress", "0.0.0.0")
    .config("spark.driver.port", "7077")
    .config("spark.blockManager.port", "7078")
    # Delta Lake SQL extension and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A settings for MinIO: custom endpoint, path-style access, static
    # credentials, plain HTTP.
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_HOST)
    .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # Make the same credentials visible in the executors' environment.
    .config("spark.executorEnv.AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID)
    .config("spark.executorEnv.AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY)
    # Delta Lake 3.x is published as `delta-spark`; the `delta-core` artifact
    # name stopped at the 2.x line, so `delta-core_2.12:3.2.0` cannot be resolved.
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .getOrCreate()
)

print(f"✓ Connected to Spark Master: {SPARK_MASTER_URL}")
print(f"✓ Spark Version: {spark.version}")

In [None]:
# Root prefix inside the MinIO bucket; every layer's table path is built
# by appending to it, so it must keep its trailing slash.
base_path = "s3a://datalakehouse/deltalake/"
print("Base path: " + base_path)

## 3. Bronze Layer - Ingest Raw Data

In [None]:
# Stand-in for a freshly arrived raw batch: three hand-written user rows
# turned into a Spark DataFrame.
print("--- Ingesting raw data to Bronze layer ---")

raw_columns = ["id", "name", "location", "timestamp"]
raw_rows = [
    (1, "Alice", "New York", "2025-01-01"),
    (2, "Bob", "Los Angeles", "2025-01-02"),
    (3, "Charlie", "Chicago", "2025-01-03"),
]
raw_data = spark.createDataFrame(raw_rows, schema=raw_columns)

raw_data.show()

In [None]:
# Persist the raw batch as the Bronze Delta table, replacing whatever the
# table currently holds.
bronze_path = base_path + "bronze/users/"
print(f"Writing to: {bronze_path}")

bronze_writer = raw_data.write.format("delta").mode("overwrite")
bronze_writer.save(bronze_path)
print("✓ Bronze table created/updated.")

## 4. Silver Layer - Clean and Enrich Data

In [None]:
# Load the Bronze table back from storage so the Silver step works from the
# persisted data rather than the in-memory frame.
print("--- Processing data for Silver layer ---")
bronze_reader = spark.read.format("delta")
bronze_df = bronze_reader.load(bronze_path)

print("Bronze data:")
bronze_df.show()

In [None]:
# Cleaning step: drop the `location` column.
# The previous `withColumn("name", col("name").alias("user_name"))` never
# renamed anything -- `withColumn` always names the result after its first
# argument -- so it has been removed. The column stays `name`, which the Gold
# aggregation groups by.
silver_df = bronze_df.drop("location")

print("Silver data (cleaned):")
silver_df.show()

In [None]:
# Publish the cleaned rows to the Silver table.
# First run: the table does not exist yet, so it is created from the frame.
# Later runs: rows are upserted by `id` with a Delta MERGE.
silver_path = f"{base_path}silver/users/"
print(f"Writing to: {silver_path}")

if not DeltaTable.isDeltaTable(spark, silver_path):
    silver_df.write.format("delta").mode("overwrite").save(silver_path)
    print("✓ Silver table created.")
else:
    delta_table = DeltaTable.forPath(spark, silver_path)
    (
        delta_table.alias("target")
        .merge(silver_df.alias("source"), "target.id = source.id")
        .whenMatchedUpdateAll()       # ids already present: overwrite all columns
        .whenNotMatchedInsertAll()    # new ids: insert as new rows
        .execute()
    )
    print("✓ Silver table updated with a MERGE operation.")

## 5. Gold Layer - Aggregate for Analytics

In [None]:
# Read the Silver table back from storage.
print("--- Aggregating data for Gold layer ---")
silver_df = spark.read.format("delta").load(silver_path)

# Count rows per user name and expose the count as `user_count`.
# `DataFrame.alias()` only names the DataFrame itself, so the earlier
# `.count().alias("user_count")` left the column called `count`; the column
# has to be renamed explicitly.
# NOTE: a Gold table already written with the old `count` column has a
# different schema; overwriting it requires the `overwriteSchema` write
# option (or removing the old table first).
gold_df = (
    silver_df
    .groupBy("name")
    .count()
    .withColumnRenamed("count", "user_count")
)

print("Gold data (aggregated):")
gold_df.show()

In [None]:
# Publish the aggregate as the Gold Delta table.
# The table is rebuilt in full on every run, so the overwrite also replaces
# the stored schema; without `overwriteSchema`, Delta rejects the write
# whenever the aggregate's columns differ from the existing table's.
gold_path = f"{base_path}gold/user_summary/"
print(f"Writing to: {gold_path}")

(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(gold_path)
)
print("✓ Gold table created/updated.")

## 6. Time Travel - Query Historical Data

In [None]:
# Fetch the Gold table's commit history as a DataFrame and print it in full
# (no column truncation).
print("--- Reading Gold table with Time Travel ---")
gold_table = DeltaTable.forPath(spark, gold_path)
gold_history = gold_table.history()

print("Table history:")
gold_history.show(truncate=False)

In [None]:
# Without a version option, a Delta read returns the current table state.
print("Reading latest gold table:")
latest_gold_df = spark.read.format("delta").load(gold_path)
latest_gold_df.show()

In [None]:
# Time travel: read the version just before the latest one, if there is one.
# The history is collected once; the previous `count()` followed by
# `collect()` evaluated it twice, and the two results could disagree if a
# commit landed in between.
history_rows = gold_history.collect()

if len(history_rows) > 1:
    # Delta returns history newest-first, so index 1 is the commit before the latest.
    old_version = history_rows[1]["version"]
    print(f"Reading an older version ({old_version}) of the gold table:")
    spark.read.format("delta").option("versionAsOf", old_version).load(gold_path).show()
else:
    print("No previous versions available yet. Run this notebook again to see time travel in action!")

## 7. Cleanup

In [None]:
# Stop the Spark session. Any later cell that uses `spark` needs the
# session-creation cell to be re-run first.
spark.stop()
print("✓ Spark session stopped.")