In [2]:
from pyspark.sql import SparkSession
import delta

builder = (
    SparkSession.builder
    .appName("DeltaWithPip")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()



In [3]:
base_silver_path = r"C:\Users\User\Desktop\E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines\silver_layer\silver_data"
base_gold_path  = r"C:\Users\User\Desktop\E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines\Gold_layer\Gold_data"
# Load Silver tables
silver_customers = spark.read.format("delta").load(f"{base_silver_path}\\customers\\data")
silver_products  = spark.read.format("delta").load(f"{base_silver_path}\\products\\data")
silver_orders    = spark.read.format("delta").load(f"{base_silver_path}\\orders\\data")
silver_deliveries= spark.read.format("delta").load(f"{base_silver_path}\\deliveries\\data")

Creating initial DIM_customers 

In [4]:
from pyspark.sql import functions as f 
from pyspark.sql.window import Window

window = Window.partitionBy("customer_id").orderBy(f.col("last_updated").desc())

# adding surrogate keys and SCD2 columns
gold_dim_customers = silver_customers.withColumn("row_number",f.row_number().over(window))\
                                     .withColumn("is_current",f.when(f.col("row_number")==1,"Y").otherwise("N"))\
                                     .withColumn("start_date", f.col("last_updated")) \
                                     .withColumn("end_date", f.lit(None)) \
                                     .withColumn("customer_sk", f.monotonically_increasing_id()) \
                                     .drop("row_number")

gold_dim_customers = gold_dim_customers.select(
    "customer_sk",
    "customer_id",
    "first_name",
    "last_name",
    "full_name",
    "email",
    "phone",
    "address",
    "city",
    "country",
    "signup_date",
    "start_date",
    "end_date",
    "is_current",
    "last_updated",
    "ingestion_timestamp"
)


start_date / end_date ‚Üí responsible for SCD2 (tracking history)

is_current ‚Üí can be used for SCD1-style updates (identifying the current/active record)

Why end date for SCD-2 ?

let me justify lets keep start_date aside for now we will more specify end_date here ok so whats my idea is at initial load the end date is NUL so when the new data comes in the previous batch data end date will be updated to current time stamp and new data end date will be null this will work iteratively

In [5]:
gold_dim_customers.show()


+-----------+--------------------+----------+---------+-----------------+--------------------+----------+--------------------+-------------------+--------------------+-------------------+-------------------+--------+----------+-------------------+--------------------+
|customer_sk|         customer_id|first_name|last_name|        full_name|               email|     phone|             address|               city|             country|        signup_date|         start_date|end_date|is_current|       last_updated| ingestion_timestamp|
+-----------+--------------------+----------+---------+-----------------+--------------------+----------+--------------------+-------------------+--------------------+-------------------+-------------------+--------+----------+-------------------+--------------------+
|          0|000c883c-da36-44c...|  Courtney|   Nelson|  Courtney Nelson|courtney.nelson@g...|8111079488|  5558 Amanda Meadow|         New Lauren|              Cyprus|2025-02-24 00:00:00|2025-0

Creating DIM_products

In [6]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Define window per product to order by last_updated (latest first)
product_window = Window.partitionBy("product_id").orderBy(f.col("last_updated").desc())


# define Surrogate Keys and SCD2 columns
gold_dim_products = silver_products.withColumn("row_number", f.row_number().over(product_window)) \
                                   .withColumn("is_current", f.when(f.col("row_number") == 1, "Y").otherwise("N")) \
                                   .withColumn("start_date", f.col("last_updated")) \
                                   .withColumn("end_date", f.lit(None)) \
                                   .withColumn("product_sk", f.monotonically_increasing_id()) \
                                   .drop("row_number")


gold_dim_products = gold_dim_products.select(
    "product_sk",
    "product_id",
    "name",
    "category",
    "price",
    "stock_quantity",
    "created_at",
    "start_date",
    "end_date",
    "is_current",
    "last_updated",
    "ingestion_timestamp"
)

row_number() ‚Üí Marks the latest record per product

is_current ‚Üí Y for latest, N for historical

start_date ‚Üí When this version became active

end_date ‚Üí NULL for now, will be updated during incremental loads

product_sk ‚Üí Unique surrogate key

In [7]:
gold_dim_products.show()

+----------+--------------------+-----------------+---------------+-------+--------------+----------+-------------------+--------+----------+-------------------+--------------------+
|product_sk|          product_id|             name|       category|  price|stock_quantity|created_at|         start_date|end_date|is_current|       last_updated| ingestion_timestamp|
+----------+--------------------+-----------------+---------------+-------+--------------+----------+-------------------+--------+----------+-------------------+--------------------+
|         0|0001ffa9-0541-4e3...|     With Edition|         Beauty|1159.12|           300|2025-06-18|2025-09-08 22:54:54|    NULL|         Y|2025-09-08 22:54:54|2025-09-26 18:36:...|
|         1|00139db0-ae53-457...|  Teacher Edition|Home Appliances|1032.01|           207|2025-05-06|2025-09-08 22:54:54|    NULL|         Y|2025-09-08 22:54:54|2025-09-26 18:36:...|
|         2|004403ca-a79a-413...| Environment Lite|          Books|1117.05|          

Creating dim_orders tables

partitionBy("order_id") ‚Üí Groups rows per order

orderBy(last_updated.desc()) ‚Üí Latest order update first

In [8]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Window per order to get latest update first
order_window = Window.partitionBy("order_id").orderBy(f.col("last_updated").desc())

# define Surrogate Keys and SCD2 columns
gold_dim_orders = silver_orders.withColumn("row_number", f.row_number().over(order_window)) \
                               .withColumn("is_current", f.when(f.col("row_number") == 1, "Y").otherwise("N")) \
                               .withColumn("start_date", f.col("last_updated")) \
                               .withColumn("end_date", f.lit(None)) \
                               .withColumn("order_sk", f.monotonically_increasing_id()) \
                               .drop("row_number")


gold_dim_orders = gold_dim_orders.select(
    "order_sk",
    "order_id",
    "transaction_id",
    "customer_id",
    "product_id",
    "quantity",
    "total_amount",
    "order_date",
    "payment_method",
    "order_status",
    "start_date",
    "end_date",
    "is_current",
    "last_updated",
    "ingestion_timestamp"
)


In [9]:
gold_dim_orders.show()

+--------+--------------------+--------------------+-------------+------------+--------+------------+-------------------+--------------+------------+-------------------+--------+----------+-------------------+--------------------+
|order_sk|            order_id|      transaction_id|  customer_id|  product_id|quantity|total_amount|         order_date|payment_method|order_status|         start_date|end_date|is_current|       last_updated| ingestion_timestamp|
+--------+--------------------+--------------------+-------------+------------+--------+------------+-------------------+--------------+------------+-------------------+--------+----------+-------------------+--------------------+
|       0|00051a32-68e1-4e2...|a19b9d6a-94e6-49d...|CUST-B0A096BD|PRD-6A61527D|       3|     3477.36|2024-10-07 00:00:00|        PayPal|     Pending|2025-09-08 23:02:13|    NULL|         Y|2025-09-08 23:02:13|2025-09-26 18:36:...|
|       1|00207f17-5b07-42d...|a3a7d724-45d5-4ef...|CUST-B14FCDE7|PRD-CDEF1D

Creating dim_deliveries

In [10]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Window per delivery to get latest update first
delivery_window = Window.partitionBy("delivery_id").orderBy(f.col("last_updated").desc())

gold_dim_deliveries = silver_deliveries.withColumn("row_number", f.row_number().over(delivery_window)) \
                                       .withColumn("is_current", f.when(f.col("row_number") == 1, "Y").otherwise("N")) \
                                       .withColumn("start_date", f.col("last_updated")) \
                                       .withColumn("end_date", f.lit(None)) \
                                       .withColumn("delivery_sk", f.monotonically_increasing_id()) \
                                       .drop("row_number")


gold_dim_deliveries = gold_dim_deliveries.select(
    "delivery_sk",
    "delivery_id",
    "order_id",
    "transaction_id",
    "customer_id",
    "customer_name",
    "product_id",
    "product_name",
    "total_amount",
    "payment_method",
    "delivery_address",
    "delivery_city",
    "delivery_country",
    "delivery_date",
    "start_date",
    "end_date",
    "is_current",
    "last_updated",
    "ingestion_timestamp"
)


In [11]:
gold_dim_deliveries.show()

+-----------+--------------------+--------------------+--------------------+-------------+------------------+------------+--------------------+------------+--------------+--------------------+---------------+--------------------+-------------------+-------------------+--------+----------+-------------------+--------------------+
|delivery_sk|         delivery_id|            order_id|      transaction_id|  customer_id|     customer_name|  product_id|        product_name|total_amount|payment_method|    delivery_address|  delivery_city|    delivery_country|      delivery_date|         start_date|end_date|is_current|       last_updated| ingestion_timestamp|
+-----------+--------------------+--------------------+--------------------+-------------+------------------+------------+--------------------+------------+--------------+--------------------+---------------+--------------------+-------------------+-------------------+--------+----------+-------------------+--------------------+
|      

SCD1 for Customers

DeltaTable.isDeltaTable: Checks if the Gold table already exists.

If yes ‚Üí we‚Äôll merge incremental changes.

If no ‚Üí this is the initial load.

silver_customers_df: This is the incoming batch from Silver.

gold_customers_df: Existing Gold data, only if the table exists.

This sets up the foundation for SCD1 (updates) and SCD2 (history).

In [12]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Load silver customers (latest incoming batch)
silver_customers = spark.read.format("delta").load(f"{base_silver_path}\\customers\\data")

# Add SCD metadata columns (for consistency across dimensions)
incoming_customers = (
    silver_customers
    .withColumn("start_date", current_timestamp())
    .withColumn("end_date", lit(None).cast("timestamp"))
    .withColumn("is_current", lit(True))
)

# Deduplicate on customer_id ‚Üí keep the latest record by last_updated
w = Window.partitionBy("customer_id").orderBy(desc("last_updated"))
deduped_customers = (
    incoming_customers
    .withColumn("rn", row_number().over(w))
    .filter("rn = 1")
    .drop("rn")
)

# Path for Gold dimension
dim_customer_path = f"{base_gold_path}\\dim_customer\\data"

# Perform SCD1 merge (overwrite)
if DeltaTable.isDeltaTable(spark, dim_customer_path):
    dim_customer = DeltaTable.forPath(spark, dim_customer_path)

    (
        dim_customer.alias("t")
        .merge(
            deduped_customers.alias("s"),
            "t.customer_id = s.customer_id"
        )
        .whenMatchedUpdateAll()    # Overwrite existing record
        .whenNotMatchedInsertAll() # Insert new record if not exists
        .execute()
    )
else:
    # First time load ‚Üí just write the table
    deduped_customers.write.format("delta").mode("overwrite").save(dim_customer_path)


In [13]:
df = spark.read.format("delta").load(f"{base_gold_path}\\dim_customer\\data")
df.createOrReplaceTempView("dim_customers")

spark.sql("SELECT * FROM dim_customers").show()


+--------------------+----------+---------+----------------+--------------------+----------+--------------------+-----------------+--------------------+-------------------+-------------------+--------------------+-----------+--------------------+--------+----------+
|         customer_id|first_name|last_name|       full_name|               email|     phone|             address|             city|             country|        signup_date|       last_updated| ingestion_timestamp|customer_sk|          start_date|end_date|is_current|
+--------------------+----------+---------+----------------+--------------------+----------+--------------------+-----------------+--------------------+-------------------+-------------------+--------------------+-----------+--------------------+--------+----------+
|0018de11-c0a5-440...|     Grant|   Martin|    Grant Martin|grant.martin@hotm...|1034249893|312 Patrick Locks...|   West Billyport|               Egypt|2024-01-11 00:00:00|2025-09-08 22:53:33|2025-09

Upsert stands for update data is data already exists or insert if data doesnt exist

how does we know if data already exists i.e how does we know when to udate and when to insert ?

if the primary keys mateches i.e customer_id matches and last_updated in silver table is greater than last updated in gold the data already exists so needs to be updated

SCD - 1 + SCD2 for products

This PySpark snippet handles the incremental loading of the products table from the Silver layer into the Gold layer of our data lakehouse.
Unlike traditional SCD1, we are preserving historical versions of product data to allow stakeholders to query past updates (e.g., price changes) while still identifying the latest state.

In [14]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import lit, current_timestamp
import os

base_gold_path = r"C:\Users\User\Desktop\E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines\Gold_layer\Gold_data"
base_silver_path = r"C:\Users\User\Desktop\E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines\silver_layer\silver_data"

# === Paths ===
dim_product_path = os.path.join(base_gold_path, "dim_products", "data")
silver_product_path = os.path.join(base_silver_path, "products", "data")

# === Load source (incoming) products from Silver ===
silver_products = spark.read.format("delta").load(silver_product_path)

# ‚úÖ Deduplicate strictly: pick only the latest row per product_id
window = Window.partitionBy("product_id").orderBy(
    F.col("last_updated").desc_nulls_last(),
    F.col("ingestion_timestamp").desc_nulls_last()
)

deduped_products = (
    silver_products
    .withColumn("row_num", F.row_number().over(window))
    .filter(F.col("row_num") == 1)  # keep only the latest
    .drop("row_num")
)

# ‚úÖ Extra safeguard: enforce uniqueness
incoming_products = deduped_products.dropDuplicates(["product_id"]).withColumn(
    "start_date", current_timestamp()
).withColumn(
    "end_date", lit(None).cast("timestamp")
).withColumn(
    "is_current", lit(True)
)

# === Upsert into Gold dim_products (SCD2 logic) ===
if DeltaTable.isDeltaTable(spark, dim_product_path):
    dim_products = DeltaTable.forPath(spark, dim_product_path)

    (
        dim_products.alias("t")
        .merge(
            incoming_products.alias("s"),
            "t.product_id = s.product_id AND t.is_current = true"
        )
        # 1Ô∏è‚É£ Expire old record if any attribute changed
        .whenMatchedUpdate(
            condition="""
                t.name <> s.name OR
                t.category <> s.category OR
                t.price <> s.price OR
                t.stock_quantity <> s.stock_quantity
            """,
            set={
                "end_date": "current_timestamp()",
                "is_current": "false"
            }
        )
        # 2Ô∏è‚É£ Insert new record (new product or updated attributes)
        .whenNotMatchedInsertAll()
        .execute()
    )

    print("‚úÖ dim_products updated with SCD2 logic")

else:
    # First time load
    incoming_products.write.format("delta").mode("overwrite").save(dim_product_path)
    print("‚úÖ dim_products created for the first time")



‚úÖ dim_products updated with SCD2 logic


In [15]:
df = spark.read.format("delta").load(f"{base_gold_path}\\dim_products\\data")
df.createOrReplaceTempView("dim_prod" \
"" \
"ucts")

spark.sql("SELECT * FROM dim_products").show()

+--------------------+---------------+---------------+-------+--------------+----------+-------------------+--------------------+-----------+--------------------+--------+----------+
|          product_id|           name|       category|  price|stock_quantity|created_at|       last_updated| ingestion_timestamp| product_sk|          start_date|end_date|is_current|
+--------------------+---------------+---------------+-------+--------------+----------+-------------------+--------------------+-----------+--------------------+--------+----------+
|00031aa3-fad1-438...|    Senior Plus|           Toys|1828.38|           306|2025-08-31|2025-09-08 22:54:53|2025-09-26 18:36:...|34359738368|2025-09-26 18:39:...|    NULL|      true|
|0004580a-beaf-4fc...|       Idea Pro|         Sports|1178.23|           162|2024-07-15|2025-09-08 22:54:50|2025-09-26 18:36:...|42949672960|2025-09-26 18:39:...|    NULL|      true|
|00112a40-703d-4ce...|  Increase Lite|Home Appliances| 654.79|            81|2024-02-

In [16]:
df = spark.read.format("delta").load(f"{base_gold_path}\\dim_products\\data")

df.filter("is_current = false").show(truncate=False)


+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+----------+
|product_id  |name               |category|price  |stock_quantity|created_at|last_updated       |ingestion_timestamp       |product_sk |start_date               |end_date                  |is_current|
+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+----------+
|PRD-F06146B3|Clothing - Over 498|Clothing|1583.01|519           |2025-09-02|2025-09-08 22:27:07|2025-09-26 16:54:22.711362|60129569417|2025-09-26 16:55:28.44499|2025-09-26 18:39:08.093926|false     |
+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+-------

SCD2 for orders

only SCD2 for order the below is reason 

Orders are immutable business transactions ‚Äî fields like order_id, transaction_id, customer_id, payment_method, and order_date cannot change. Using SCD1 would overwrite history and break audit trails. SCD2 ensures we capture order lifecycle changes (e.g., status updates) while preserving full history

In [17]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

dim_order_path = f"{base_gold_path}\\dim_orders\\data"

# Load source (incoming) data
silver_orders = spark.read.format("delta").load(f"{base_silver_path}\\orders\\data")

# Deduplicate by keeping latest record per order_id (based on last_updated or ingestion_date)
w = Window.partitionBy("order_id").orderBy(F.col("last_updated").desc())

incoming_orders = (
    silver_orders
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)    # keep only the latest
    .drop("rn")
    .withColumn("start_date", F.current_timestamp())
    .withColumn("end_date", F.lit(None).cast("timestamp"))
    .withColumn("is_current", F.lit(True))
)

# Path check and Merge
if DeltaTable.isDeltaTable(spark, dim_order_path):
    dim_orders = DeltaTable.forPath(spark, dim_order_path)

    (
        dim_orders.alias("t")
        .merge(
            incoming_orders.alias("s"),
            "t.order_id = s.order_id AND t.is_current = true"
        )
        # 1. Expire old row if order_status changed
        .whenMatchedUpdate(
            condition="t.order_status <> s.order_status",
            set={
                "end_date": "current_timestamp()",
                "is_current": "false"
            }
        )
        # 2. Insert new version (new order OR updated status)
        .whenNotMatchedInsertAll()
        .execute()
    )

else:
    incoming_orders.write.format("delta").mode("overwrite").save(dim_order_path)



In [18]:
df = spark.read.format("delta").load(f"{base_gold_path}\\dim_orders\\data")
df.createOrReplaceTempView("dim_orders")

spark.sql("SELECT * FROM dim_orders").show()

+--------------------+--------------------+-------------+------------+--------+------------+-------------------+--------------+------------+-------------------+--------------------+--------------------+--------+----------+
|            order_id|      transaction_id|  customer_id|  product_id|quantity|total_amount|         order_date|payment_method|order_status|       last_updated| ingestion_timestamp|          start_date|end_date|is_current|
+--------------------+--------------------+-------------+------------+--------+------------+-------------------+--------------+------------+-------------------+--------------------+--------------------+--------+----------+
|00078015-05aa-490...|151f1a94-8fb0-457...|CUST-59736113|PRD-1330AC1A|       1|      527.93|2024-09-15 00:00:00|          Cash|   Delivered|2025-09-08 23:02:04|2025-09-26 18:36:...|2025-09-26 18:39:...|    NULL|      true|
|0018477f-bc4f-431...|38bd5b12-b0eb-4f6...|CUST-D6EFBB86|PRD-ADC3F420|       4|     7323.52|2023-11-11 00:00

In [19]:
df = spark.read.format("delta").load(f"{base_gold_path}\\dim_products\\data")

df.filter("is_current = false").show(truncate=False)

+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+----------+
|product_id  |name               |category|price  |stock_quantity|created_at|last_updated       |ingestion_timestamp       |product_sk |start_date               |end_date                  |is_current|
+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+----------+
|PRD-F06146B3|Clothing - Over 498|Clothing|1583.01|519           |2025-09-02|2025-09-08 22:27:07|2025-09-26 16:54:22.711362|60129569417|2025-09-26 16:55:28.44499|2025-09-26 18:39:08.093926|false     |
+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+-------

A Hybrid approach of SCD1 and SCD2 for deliveries

In [20]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# === 1. Load Silver Deliveries ===
silver_deliveries = spark.read.format("delta").load(f"{base_silver_path}\\deliveries\\data")

# Load Customer Dimension (SCD1 lookup)
dim_customers = spark.read.format("delta").load(f"{base_gold_path}\\dim_customer\\data")

# Deduplicate deliveries (latest per delivery_id)
w = Window.partitionBy("delivery_id").orderBy(F.desc("last_updated"))
deduped_deliveries = (
    silver_deliveries
    .withColumn("rn", F.row_number().over(w))
    .filter("rn = 1")
    .drop("rn")
)

# Enrich with latest customer info
enriched_deliveries = (
    deduped_deliveries.alias("d")
    .join(
        dim_customers.filter("is_current = true").alias("c"),
        F.col("d.customer_id") == F.col("c.customer_id"),
        "left"
    )
    .select(
        "d.delivery_id",
        "d.order_id",
        "d.transaction_id",
        "d.customer_id",
        "d.product_id",
        "d.total_amount",
        "d.payment_method",
        F.col("c.address").alias("delivery_address"),
        F.col("c.city").alias("delivery_city"),
        F.col("c.country").alias("delivery_country"),
        "d.delivery_date",
        "d.delivery_status",
        "d.last_updated",
        "d.ingestion_timestamp"
    )
    .withColumn("start_date", F.current_timestamp())
    .withColumn("end_date", F.lit(None).cast("timestamp"))
    .withColumn("is_current", F.lit(True))
)

# === 2. Path for Gold Deliveries ===
dim_deliveries_path = f"{base_gold_path}\\dim_deliveries\\data"

if DeltaTable.isDeltaTable(spark, dim_deliveries_path):
    gold_table = DeltaTable.forPath(spark, dim_deliveries_path)

    (
        gold_table.alias("tgt")
        .merge(
            enriched_deliveries.alias("src"),
            "tgt.delivery_id = src.delivery_id AND tgt.is_current = true"
        )
        # --- Case 1: Status changed ‚Üí expire old record (SCD2) ---
        .whenMatchedUpdate(
            condition="tgt.delivery_status <> src.delivery_status",
            set={
                "end_date": "current_timestamp()",
                "is_current": "false"
            }
        )
        # --- Case 2: Other dynamic fields changed (SCD1 overwrite) ---
        .whenMatchedUpdate(
            condition="tgt.delivery_status = src.delivery_status",  # üëà ensures no conflict with above
            set={
                "payment_method": "src.payment_method",
                "total_amount": "src.total_amount",
                "delivery_address": "src.delivery_address",
                "delivery_city": "src.delivery_city",
                "delivery_country": "src.delivery_country",
                "last_updated": "src.last_updated",
                "ingestion_timestamp": "src.ingestion_timestamp"
            }
        )
        # --- Case 3: New delivery OR status change ‚Üí insert ---
        .whenNotMatchedInsertAll()
        .execute()
    )

else:
    # First-time load
    (
        enriched_deliveries.write
        .format("delta")
        .mode("overwrite")
        .save(dim_deliveries_path)
    )


In [21]:
df = spark.read.format("delta").load(f"{base_gold_path}\\dim_deliveries\\data")
df.createOrReplaceTempView("dim_deliveries")

spark.sql("SELECT * FROM dim_deliveries").show()

+--------------------+--------------------+--------------------+-------------+------------+------------+--------------+--------------------+-------------------+--------------------+-------------------+---------------+-------------------+--------------------+--------------------+--------+----------+
|         delivery_id|            order_id|      transaction_id|  customer_id|  product_id|total_amount|payment_method|    delivery_address|      delivery_city|    delivery_country|      delivery_date|delivery_status|       last_updated| ingestion_timestamp|          start_date|end_date|is_current|
+--------------------+--------------------+--------------------+-------------+------------+------------+--------------+--------------------+-------------------+--------------------+-------------------+---------------+-------------------+--------------------+--------------------+--------+----------+
|000b95ac-25ba-4f4...|        ORD-77F64D26|    TXN-1A6E9CB7D56E|CUST-9FC6134D|PRD-C973F156|      198

In [22]:
df = spark.read.format("delta").load(f"{base_gold_path}\\dim_products\\data")

df.filter("is_current = false").show(truncate=False)

+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+----------+
|product_id  |name               |category|price  |stock_quantity|created_at|last_updated       |ingestion_timestamp       |product_sk |start_date               |end_date                  |is_current|
+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+----------+
|PRD-F06146B3|Clothing - Over 498|Clothing|1583.01|519           |2025-09-02|2025-09-08 22:27:07|2025-09-26 16:54:22.711362|60129569417|2025-09-26 16:55:28.44499|2025-09-26 18:39:08.093926|false     |
+------------+-------------------+--------+-------+--------------+----------+-------------------+--------------------------+-----------+-------------------------+--------------------------+-------

In [23]:
import os
import shutil
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, lit

# === Define Paths Explicitly ===
dim_orders_path = r"C:/Users/User/Desktop/E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines/Gold_layer/Gold_data/dim_orders/data"
dim_customer_path = r"C:/Users/User/Desktop/E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines/Gold_layer/Gold_data/dim_customer/data"
dim_products_path = r"C:/Users/User/Desktop/E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines/Gold_layer/Gold_data/dim_products/data"
dim_deliveries_path = r"C:/Users/User/Desktop/E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines/Gold_layer/Gold_data/dim_deliveries/data"
fact_sales_path = r"C:/Users/User/Desktop/E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines/Gold_layer/Gold_data/fact_sales/data"

try:
    # === Load Dimension Tables ===
    print("Loading dimensions...")
    dim_orders = spark.read.format("delta").load(dim_orders_path)
    dim_customer = spark.read.format("delta").load(dim_customer_path)
    dim_products = spark.read.format("delta").load(dim_products_path)
    dim_deliveries = spark.read.format("delta").load(dim_deliveries_path)

    # === Build Fact Table ===
    print("Building new_fact_sales...")
    new_fact_sales = (
        dim_orders.alias("o")
        .join(dim_customer.alias("c"), col("o.customer_id") == col("c.customer_id"), "left")
        .join(dim_products.alias("p"), col("o.product_id") == col("p.product_id"), "left")
        .join(dim_deliveries.alias("d"), col("o.order_id") == col("d.order_id"), "left")
        .select(
            col("c.customer_sk").alias("customer_sk"),
            col("p.product_sk").alias("product_sk"),
            col("o.customer_id"),
            col("o.order_id"),
            col("o.transaction_id"),
            col("d.delivery_id"),
            col("o.quantity"),
            col("o.total_amount"),
            col("p.stock_quantity"),
            col("c.signup_date"),
            col("o.order_date"),
            col("d.delivery_date"),
            col("c.full_name"),
            col("p.name").alias("product_name"),
            col("p.category"),
            col("o.payment_method"),
            col("o.order_status"),
            col("d.delivery_status"),
            col("o.last_updated"),
            col("o.ingestion_timestamp"),
        )
        .withColumn("start_date", current_timestamp())
        .withColumn("end_date", lit(None).cast("timestamp"))
        .withColumn("is_current", lit(True))
        .withColumn("load_timestamp", current_timestamp())
    )

    # === Merge / Create fact_sales ===
    if not DeltaTable.isDeltaTable(spark, fact_sales_path):
        print("Creating fact_sales table...")
        new_fact_sales.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(fact_sales_path)

        # Register in Hive Metastore
        spark_sql_path = fact_sales_path.replace("\\", "/")
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS fact_sales
            USING DELTA
            LOCATION '{spark_sql_path}'
        """)
        print("fact_sales table created.")
    else:
        print("Merging new fact rows into fact_sales...")
        fact_sales = DeltaTable.forPath(spark, fact_sales_path)
        (fact_sales.alias("t")
         .merge(
             new_fact_sales.alias("s"),
             "t.order_id = s.order_id AND t.transaction_id = s.transaction_id AND t.is_current = true"
         )
         .whenMatchedUpdate(
             condition=(
                 "t.total_amount <> s.total_amount OR "
                 "t.quantity <> s.quantity OR "
                 "t.order_status <> s.order_status OR "
                 "t.delivery_status <> s.delivery_status"
             ),
             set={
                 "end_date": "current_timestamp()",
                 "is_current": "false"
             }
         )
         .whenNotMatchedInsertAll()
         .execute()
        )
        print("fact_sales merged successfully.")

except Exception as e:
    print(f"Error while building fact_sales: {e}")
    # Auto-fix if schema mismatch
    if os.path.exists(fact_sales_path):
        print("Removing old fact_sales and retrying...")
        shutil.rmtree(fact_sales_path)
        new_fact_sales.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(fact_sales_path)
        spark_sql_path = fact_sales_path.replace("\\", "/")
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS fact_sales
            USING DELTA
            LOCATION '{spark_sql_path}'
        """)
        print("fact_sales recreated with fresh schema.")
    else:
        raise


Loading dimensions...
Building new_fact_sales...
Merging new fact rows into fact_sales...
Error while building fact_sales: [DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same
target row in the Delta table in possibly conflicting ways. By SQL semantics of Merge,
when multiple source rows match on the same target row, the result may be ambiguous
as it is unclear which source row should be used to update or delete the matching
target row. You can preprocess the source table to eliminate the possibility of
multiple matches. Please refer to
https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
Removing old fact_sales and retrying...
fact_sales recreated with fresh schema.


In [24]:
df = spark.read.format("delta").load(f"C:\\Users\\User\\Desktop\\E-Commerce Data Lakaehouse with AI-Powered Self-Healing Pipelines\\Gold_layer\\Gold_data\\fact_sales\\data")
df.createOrReplaceTempView("fact_sales")

spark.sql("SELECT * FROM fact_sales").show()

+-----------+-----------+-------------+--------------------+--------------------+--------------------+--------+------------+--------------+-------------------+-------------------+-------------------+------------------+--------------------+---------------+--------------+------------+---------------+-------------------+--------------------+--------------------+--------+----------+--------------------+
|customer_sk| product_sk|  customer_id|            order_id|      transaction_id|         delivery_id|quantity|total_amount|stock_quantity|        signup_date|         order_date|      delivery_date|         full_name|        product_name|       category|payment_method|order_status|delivery_status|       last_updated| ingestion_timestamp|          start_date|end_date|is_current|      load_timestamp|
+-----------+-----------+-------------+--------------------+--------------------+--------------------+--------+------------+--------------+-------------------+-------------------+---------------

In [25]:
fact_sales = DeltaTable.forPath(spark, fact_sales_path)

# Convert to DataFrame
fact_sales_df = fact_sales.toDF()

fact_count = fact_sales_df.count()
print(f"Total records in fact table: {fact_count}")

Total records in fact table: 152622


In [26]:
from pyspark.sql.functions import col, when, sum

# Convert to DataFrame first
fact_sales_df = fact_sales.toDF()

null_counts = fact_sales_df.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c + "_nulls")
    for c in fact_sales_df.columns
])

null_counts.show(truncate=False)


+-----------------+----------------+-----------------+--------------+--------------------+-----------------+--------------+------------------+--------------------+-----------------+----------------+-------------------+---------------+------------------+--------------+--------------------+------------------+---------------------+------------------+-------------------------+----------------+--------------+----------------+--------------------+
|customer_sk_nulls|product_sk_nulls|customer_id_nulls|order_id_nulls|transaction_id_nulls|delivery_id_nulls|quantity_nulls|total_amount_nulls|stock_quantity_nulls|signup_date_nulls|order_date_nulls|delivery_date_nulls|full_name_nulls|product_name_nulls|category_nulls|payment_method_nulls|order_status_nulls|delivery_status_nulls|last_updated_nulls|ingestion_timestamp_nulls|start_date_nulls|end_date_nulls|is_current_nulls|load_timestamp_nulls|
+-----------------+----------------+-----------------+--------------+--------------------+-----------------+

In [27]:
fact_sales_df.printSchema()

root
 |-- customer_sk: long (nullable = true)
 |-- product_sk: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- delivery_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- stock_quantity: integer (nullable = true)
 |-- signup_date: timestamp (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- delivery_date: timestamp (nullable = true)
 |-- full_name: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- last_updated: timestamp (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- is_current: boo