# Data Warehouse Delta Loader

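This notebook builds the data warehouse layer: it reads the silver **Delta Lake** tables, derives the dimension and fact tables, and writes each of them to both MySQL and the gold **Delta Lake** container.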

**Important**: Ensure that the Delta Lake tables are properly configured for downstream analytics and reporting.

In [None]:
# pip install -r requirements-delta.txt
# python -m ipykernel install --user --name=python3

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    date_format,
    dayofmonth,
    dayofweek,
    min as spark_min,
    month,
    quarter,
    sum as spark_sum,
    year,
)
from delta import *

# Directory that holds the jar files listed below (placeholder value).
jar_dir = "/path/to/your/jars"

# Jar files passed to spark-submit through --jars, in the order they are listed:
# Azure storage jars, Jetty jars and the MySQL JDBC driver.
required_jars = [
    "hadoop-azure-3.3.6.jar",
    "azure-storage-8.6.6.jar",
    "jetty-client-9.4.43.v20210629.jar",
    "jetty-http-9.4.43.v20210629.jar",
    "jetty-io-9.4.43.v20210629.jar",
    "mysql-connector-j-9.3.0.jar",
    "jetty-util-9.4.43.v20210629.jar",
    "jetty-util-ajax-9.4.43.v20210629.jar",
]
jars_csv = ",".join(f"{jar_dir}/{jar_name}" for jar_name in required_jars)

# PYSPARK configuration for Azure Blob Storage integration with Delta Lake.
# The value is a single space-separated argument string: the comma-separated
# jar list, the Delta Lake package coordinate, and the trailing "pyspark-shell".
os.environ['PYSPARK_SUBMIT_ARGS'] = " ".join([
    f"--jars {jars_csv}",
    "--packages io.delta:delta-spark_2.12:3.0.0",
    "pyspark-shell",
])

In [None]:
# Create (or reuse) the Spark session with the Delta Lake SQL extension and catalog.
session_options = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    # Allow schema evolution
    "spark.databricks.delta.schema.autoMerge.enabled": "true",
    # Disable vectorized reader to avoid type conflicts
    "spark.sql.parquet.enableVectorizedReader": "false",
}

# Apply the options one by one, in the order declared above.
session_builder = SparkSession.builder.appName("DW Data Load - Delta Lake")
for option_name, option_value in session_options.items():
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()

print("Spark session with Delta Lake support initialized")

In [None]:
# MySQL connection for data warehouse.
#
# Connection details are read from environment variables so that real
# credentials do not have to be stored in the notebook:
#   MYSQL_HOST      -> host part of the JDBC URL
#   MYSQL_USERNAME  -> JDBC "user" property
#   MYSQL_PASSWORD  -> JDBC "password" property
# When a variable is not set, the original placeholder value is used, so the
# cell behaves exactly as before in an unconfigured environment.
mysql_host = os.environ.get("MYSQL_HOST", "YOUR_MYSQL_HOST")
mysql_url = f"jdbc:mysql://{mysql_host}:3306/store_dw"
mysql_props = {
    "user": os.environ.get("MYSQL_USERNAME", "YOUR_MYSQL_USERNAME"),
    "password": os.environ.get("MYSQL_PASSWORD", "YOUR_MYSQL_PASSWORD"),
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Deliberately prints no connection values, so the password never reaches cell output.
print("MySQL connection properties configured")

In [None]:
# Configure Azure storage access keys.
#
# The account keys are read from environment variables so that real secrets do
# not have to be stored in the notebook:
#   AZURE_SILVER_STORAGE_KEY -> key for the silver storage account
#   AZURE_GOLD_STORAGE_KEY   -> key for the gold storage account
# When a variable is not set, the original placeholder value is used.
# The account names in the configuration keys are left as literals.

# Silver access key (source: silver-delta)
spark.conf.set(
    "fs.azure.account.key.YOUR_SILVER_STORAGE_ACCOUNT.dfs.core.windows.net",
    os.environ.get("AZURE_SILVER_STORAGE_KEY", "YOUR_AZURE_SILVER_STORAGE_KEY")
)

# Gold access key (target: gold-delta)
spark.conf.set(
    "fs.azure.account.key.YOUR_GOLD_STORAGE_ACCOUNT.dfs.core.windows.net",
    os.environ.get("AZURE_GOLD_STORAGE_KEY", "YOUR_AZURE_GOLD_STORAGE_KEY")
)

print("Azure storage credentials configured")

In [None]:
# Base URIs of the two Delta Lake containers used by the cells below:
# the silver container is read from, the gold container is written to.
silver_delta_base = "abfss://silver-delta@YOUR_SILVER_STORAGE_ACCOUNT.dfs.core.windows.net"
gold_delta_base = "abfss://gold-delta@YOUR_GOLD_STORAGE_ACCOUNT.dfs.core.windows.net"

# One print call, one message per line.
print(
    f"Source (Silver Delta): {silver_delta_base}",
    f"Target (Gold Delta): {gold_delta_base}",
    "Building Data Warehouse with Delta Lake format...",
    sep="\n",
)

In [None]:
# 1. DimProduct - product dimension built from the silver Products table
print("Processing DimProduct...")

products = spark.read.load(f"{silver_delta_base}/Products", format="delta")

# Keep the product key, its name (exposed as ProductName) and the two foreign keys.
dim_product = products.select(
    "ProductID",
    products["Name"].alias("ProductName"),
    "CategoryID",
    "SellerID",
)

# Persist to both targets, replacing existing contents: MySQL first, then gold Delta.
dim_product.write.mode("overwrite").jdbc(mysql_url, "DimProduct", properties=mysql_props)
dim_product.write.save(f"{gold_delta_base}/DimProduct", format="delta", mode="overwrite")

# The count is a separate Spark action executed after both writes.
record_count = dim_product.count()
print(f"DimProduct: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 2. DimCategory - category dimension built from the silver ProductCategories table
print("Processing DimCategory...")

categories = spark.read.load(f"{silver_delta_base}/ProductCategories", format="delta")

# Only the key and the display name are carried into the dimension.
dim_category = categories.select("CategoryID", "CategoryName")

# Persist to both targets, replacing existing contents: MySQL first, then gold Delta.
dim_category.write.mode("overwrite").jdbc(mysql_url, "DimCategory", properties=mysql_props)
dim_category.write.save(f"{gold_delta_base}/DimCategory", format="delta", mode="overwrite")

# The count is a separate Spark action executed after both writes.
record_count = dim_category.count()
print(f"DimCategory: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 3. DimSeller - seller dimension built from the silver Sellers table
print("📋 Processing DimSeller...")

sellers = spark.read.load(f"{silver_delta_base}/Sellers", format="delta")

# Keep the seller key and expose the seller's Name column as SellerName.
dim_seller = sellers.select(
    "SellerID",
    sellers["Name"].alias("SellerName"),
)

# Persist to both targets, replacing existing contents: MySQL first, then gold Delta.
dim_seller.write.mode("overwrite").jdbc(mysql_url, "DimSeller", properties=mysql_props)
dim_seller.write.save(f"{gold_delta_base}/DimSeller", format="delta", mode="overwrite")

# The count is a separate Spark action executed after both writes.
record_count = dim_seller.count()
print(f"DimSeller: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 4. DimCustomer - customer dimension built from the silver Customers table
print("Processing DimCustomer...")

customers = spark.read.load(f"{silver_delta_base}/Customers", format="delta")

# Keep the customer key and expose the customer's Name column as CustomerName.
dim_customer = customers.select(
    "CustomerID",
    customers["Name"].alias("CustomerName"),
)

# Persist to both targets, replacing existing contents: MySQL first, then gold Delta.
dim_customer.write.mode("overwrite").jdbc(mysql_url, "DimCustomer", properties=mysql_props)
dim_customer.write.save(f"{gold_delta_base}/DimCustomer", format="delta", mode="overwrite")

# The count is a separate Spark action executed after both writes.
record_count = dim_customer.count()
print(f"DimCustomer: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 5. DimOrderStatus - order status dimension built from the silver OrderStatus table
print("Processing DimOrderStatus...")

order_status = spark.read.load(f"{silver_delta_base}/OrderStatus", format="delta")

# Only the status key and its name are carried into the dimension.
dim_order_status = order_status.select("StatusID", "StatusName")

# Persist to both targets, replacing existing contents: MySQL first, then gold Delta.
dim_order_status.write.mode("overwrite").jdbc(mysql_url, "DimOrderStatus", properties=mysql_props)
dim_order_status.write.save(f"{gold_delta_base}/DimOrderStatus", format="delta", mode="overwrite")

# The count is a separate Spark action executed after both writes.
record_count = dim_order_status.count()
print(f"DimOrderStatus: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 6. DimDate - Delta Lake Version (from Orders)
# Builds one row per distinct order creation date found in the silver Orders table.
print("Processing DimDate...")

orders = spark.read.format("delta").load(f"{silver_delta_base}/Orders")

# Every attribute is derived from the same column, so name it once.
created_at = col("CreatedAt")

dim_date = (
    orders
    .select(
        # Integer key in yyyyMMdd form, e.g. 20250131.
        date_format(created_at, "yyyyMMdd").cast("int").alias("DateKey"),
        created_at.cast("date").alias("Date"),
        dayofmonth(created_at).alias("Day"),
        month(created_at).alias("Month"),
        quarter(created_at).alias("Quarter"),
        year(created_at).alias("Year"),
        dayofweek(created_at).alias("DayOfWeek")
    )
    # An order whose CreatedAt is null (or cannot be formatted) would otherwise
    # add a dimension row with a NULL DateKey; drop such rows.
    .where(col("DateKey").isNotNull())
    .distinct()
)

# Write to MySQL
dim_date.write.jdbc(mysql_url, "DimDate", mode="overwrite", properties=mysql_props)

# Write to Delta Lake (gold-delta)
dim_date.write.format("delta").mode("overwrite").save(f"{gold_delta_base}/DimDate")

record_count = dim_date.count()
print(f"DimDate: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 7. DimReason - reason dimension built from the silver Reasons table
print("Processing DimReason...")

reasons = spark.read.load(f"{silver_delta_base}/Reasons", format="delta")

# Carry the reason key together with its type and description.
dim_reason = reasons.select("ReasonID", "ReasonType", "ReasonDescription")

# Persist to both targets, replacing existing contents: MySQL first, then gold Delta.
dim_reason.write.mode("overwrite").jdbc(mysql_url, "DimReason", properties=mysql_props)
dim_reason.write.save(f"{gold_delta_base}/DimReason", format="delta", mode="overwrite")

# The count is a separate Spark action executed after both writes.
record_count = dim_reason.count()
print(f"  ✅ DimReason: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 8. FactSales - Delta Lake Version
# One row per order item belonging to an order that has at least one payment.
print("📋 Processing FactSales...")

# Read all required tables from Delta Lake
order_items = spark.read.format("delta").load(f"{silver_delta_base}/OrderItems").alias("oi")
orders = spark.read.format("delta").load(f"{silver_delta_base}/Orders").alias("o")
payments = spark.read.format("delta").load(f"{silver_delta_base}/Payments").alias("p")
products = spark.read.format("delta").load(f"{silver_delta_base}/Products").alias("pr")

# Only consider orders that have a payment.
# Payments are collapsed to exactly one row per order before joining: joining
# the raw Payments table would repeat every order item once per payment row
# and inflate Quantity, Revenue and Profit for orders with several payments.
# NOTE(review): the earliest payment timestamp is kept as CreatedAt; with one
# payment per order this is the same value as before - confirm that "earliest"
# is the desired choice when an order has multiple payments.
paid_orders = (
    payments
    .groupBy("OrderID")
    .agg(spark_min("CreatedAt").alias("CreatedAt"))
    .alias("po")
)

# Typed measures reused by several output columns.
quantity = col("oi.Quantity").cast("int")
unit_price = col("pr.Price").cast("double")
unit_cost = col("pr.Cost").cast("double")

# Join OrderItems with Orders, keep paid orders only, and attach product attributes
fact_sales = (
    order_items
    .join(orders, col("oi.OrderID") == col("o.OrderID"))
    .join(paid_orders, col("oi.OrderID") == col("po.OrderID"), "inner")
    .join(products, col("oi.ProductID") == col("pr.ProductID"))
    .select(
        col("oi.OrderItemID"),
        col("oi.OrderID"),
        col("oi.ProductID"),
        col("pr.SellerID"),
        col("o.CustomerID"),
        col("pr.CategoryID"),
        date_format(col("o.CreatedAt"), "yyyyMMdd").cast("int").alias("OrderDateKey"),
        col("o.StatusID"),
        quantity.alias("Quantity"),
        unit_price.alias("CurrentPrice"),  # Use Price from Product
        unit_cost.alias("Cost"),           # Use Cost from Product
        (quantity * unit_price).alias("Revenue"),
        ((quantity * unit_price) - (quantity * unit_cost)).alias("Profit"),
        col("po.CreatedAt").alias("CreatedAt")  # payment timestamp, not order timestamp
    )
)

# Write to MySQL
fact_sales.write.jdbc(mysql_url, "FactSales", mode="overwrite", properties=mysql_props)

# Write to Delta Lake (gold-delta)
fact_sales.write.format("delta").mode("overwrite").save(f"{gold_delta_base}/FactSales")

record_count = fact_sales.count()
print(f"  ✅ FactSales: {record_count} records written to MySQL and Delta Lake")

In [None]:
# 9. FactOrderReason - links reasons to the order items of status 4/5 orders
print("📋 Processing FactOrderReason...")

# Source tables from the silver container
reasons = spark.read.load(f"{silver_delta_base}/Reasons", format="delta")
order_items = spark.read.load(f"{silver_delta_base}/OrderItems", format="delta")
products = spark.read.load(f"{silver_delta_base}/Products", format="delta")
orders = spark.read.load(f"{silver_delta_base}/Orders", format="delta")

# Keep only orders with status 4 or 5 (cancelled/returned orders)
filtered_orders = orders.where(orders["StatusID"].isin(4, 5))

# Join conditions, named after the two sides they connect
reason_to_order = reasons["OrderID"] == filtered_orders["OrderID"]
reason_to_item = reasons["OrderID"] == order_items["OrderID"]
item_to_product = order_items["ProductID"] == products["ProductID"]

# Reasons -> filtered Orders -> OrderItems -> Products, all inner joins
joined = (
    reasons
    .join(filtered_orders, on=reason_to_order, how="inner")
    .join(order_items, on=reason_to_item, how="inner")
    .join(products, on=item_to_product, how="inner")
)

fact_order_reason = joined.select(
    order_items["OrderItemID"],
    reasons["ReasonID"],
    reasons["OrderID"],
    products["SellerID"],
    # Integer date key in yyyyMMdd form, taken from the order's creation timestamp
    date_format(filtered_orders["CreatedAt"], "yyyyMMdd").cast("int").alias("OrderDateKey"),
    filtered_orders["StatusID"],
)

# Persist to both targets, replacing existing contents: MySQL first, then gold Delta.
fact_order_reason.write.mode("overwrite").jdbc(mysql_url, "FactOrderReason", properties=mysql_props)
fact_order_reason.write.save(f"{gold_delta_base}/FactOrderReason", format="delta", mode="overwrite")

# The count is a separate Spark action executed after both writes.
record_count = fact_order_reason.count()
print(f"  ✅ FactOrderReason: {record_count} records written to MySQL and Delta Lake")

In [None]:
# Verification: Check all Delta tables in gold-delta
# Each table is read back and counted. A table that cannot be read is reported
# and remembered, so the closing summary only claims success when every table
# was verified.
print("\n🔍 Verifying Gold Delta Lake tables...\n")

gold_tables = [
    "DimProduct", "DimCategory", "DimSeller", "DimCustomer", 
    "DimOrderStatus", "DimDate", "DimReason", 
    "FactSales", "FactOrderReason"
]

total_records = 0
failed_tables = []
for table in gold_tables:
    try:
        df = spark.read.format("delta").load(f"{gold_delta_base}/{table}")
        count = df.count()
    except Exception as e:
        # Best-effort check: keep going with the remaining tables, but record
        # the failure and show the exception type plus the first line of its
        # message (capped at 200 characters) so the cause can be identified.
        failed_tables.append(table)
        message_lines = str(e).splitlines()
        first_line = message_lines[0] if message_lines else ""
        print(f"  ❌ {table}: Error - {type(e).__name__}: {first_line[:200]}")
    else:
        total_records += count
        print(f"  ✅ {table}: {count} records")

print(f"\n📊 Total records in Gold Delta Lake: {total_records:,}")
if failed_tables:
    print(
        f"\n⚠️ Data Warehouse load incomplete: {len(failed_tables)} of "
        f"{len(gold_tables)} tables could not be verified: {', '.join(failed_tables)}"
    )
else:
    print("\n🎉 Data Warehouse load completed!")
    print("✨ All data available in both MySQL and Delta Lake (gold-delta)")
print("🔄 Delta Lake provides ACID transactions, time travel, and schema evolution")

In [None]:
# Optional: print a short Delta Lake cheat sheet, show the DimProduct history,
# then shut the Spark session down.
feature_notes = [
    "🌟 Delta Lake Features for Data Warehouse:",
    "\n📚 Time Travel Examples:",
    "   - spark.read.format('delta').option('versionAsOf', 0).load(path)",
    "   - spark.read.format('delta').option('timestampAsOf', '2025-01-01').load(path)",
    "\n🔄 ACID Transactions:",
    "   - All dimension and fact table loads are atomic",
    "   - Safe concurrent access for BI tools and analytics",
    "\n⚡ Performance Optimizations:",
    "   - OPTIMIZE command for file compaction",
    "   - Z-ORDER for multi-dimensional clustering",
    "   - Data skipping with statistics",
    "\n🔍 Management Commands:",
    "   - DESCRIBE HISTORY delta.`/path/to/table`",
    "   - OPTIMIZE delta.`/path/to/table` ZORDER BY (column)",
    "   - VACUUM delta.`/path/to/table` RETAIN 168 HOURS",
]
# One note per output line.
for note in feature_notes:
    print(note)

# Show sample table history; any failure is reported as a note instead of raised.
history_columns = ["version", "timestamp", "operation", "operationParameters"]
try:
    print("\n📋 Sample: DimProduct table history")
    history_df = spark.sql(f"DESCRIBE HISTORY delta.`{gold_delta_base}/DimProduct`")
    history_df.select(*history_columns).show(n=5, truncate=False)
except Exception as e:
    print(f"   Note: History not available yet: {e}")

# Stop Spark session
print("\n🛑 Stopping Spark session...")
spark.stop()
print("✅ Data warehouse processing completed successfully!")