In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Streaming sources from the Silver layer. Each stream gets a 5-minute
# watermark on its own event-time column; the order_items timestamp is renamed
# first so it does not collide with orders.order_timestamp after the join.
orders_df = (
    spark.readStream.table("oms_analytics.silver.orders")
    .withWatermark("order_timestamp", "5 minutes")
)
order_items_df = (
    spark.readStream.table("oms_analytics.silver.order_items")
    .withColumnRenamed("order_timestamp", "order_item_timestamp")
    .withWatermark("order_item_timestamp", "5 minutes")
)

# Inner stream-stream join on order_id.
# NOTE(review): no time-range condition ties order_timestamp to
# order_item_timestamp; confirm whether the join state can be purged
# without one.
joined_df = orders_df.join(order_items_df, on="order_id", how="inner")

# Business key of the fact table; also the grouping key of the aggregation.
fact_key_columns = ["date_id", "customer_id", "product_id"]

# Sum quantity and line_total per key within 5-minute tumbling windows on
# order_timestamp. The window column is only used for grouping and is dropped
# by the select that follows.
windowed_totals_df = joined_df.groupBy(
    *fact_key_columns,
    F.window("order_timestamp", "5 minutes"),
).agg(
    F.sum("quantity").alias("items_sold"),
    F.sum("line_total").alias("sales_amount"),
)

# surrogate_key is "<date_id>_<customer_id>_<product_id>"; process_id and
# gold_load_ts are audit columns stamped on every row.
surrogate_key_col = F.concat_ws(
    "_",
    *[F.col(key).cast("string") for key in fact_key_columns],
)
daily_aggregates_df = (
    windowed_totals_df
    .select(*fact_key_columns, "items_sold", "sales_amount")
    .withColumn("surrogate_key", surrogate_key_col)
    .withColumn("process_id", F.lit("de_nb_102"))
    .withColumn("gold_load_ts", F.current_timestamp())
)


# Azure Data Lake Storage container that holds the streaming checkpoint.
external_location_name = "abfss://orders@omslanding.dfs.core.windows.net"
checkpoint_location_daily_sales_fact = f"{external_location_name}/checkpoints/gold_loader/daily_sales_fact"

# DDL for the Gold fact table; IF NOT EXISTS makes re-running this cell a no-op
# once the table is in place.
daily_sales_fact_ddl = """
    CREATE TABLE IF NOT EXISTS oms_analytics.gold.daily_sales_fact (
    surrogate_key STRING,
    date_id STRING,
    customer_id STRING,
    product_id STRING,
    items_sold DOUBLE,
    sales_amount DOUBLE,
    process_id STRING,
    gold_load_ts TIMESTAMP
) USING DELTA
"""
spark.sql(daily_sales_fact_ddl)

# Define the upsert logic as a separate function named upsert_data
def upsert_data(batch_df, batch_id):
    """Merge one micro-batch of sales totals into oms_analytics.gold.daily_sales_fact.

    Intended as a ``foreachBatch`` sink. The incoming rows are partial totals
    (one per 5-minute window) for a (date_id, customer_id, product_id) key, so
    they are added to the stored totals rather than replacing them.

    Args:
        batch_df: Micro-batch DataFrame with the columns date_id, customer_id,
            product_id, items_sold, sales_amount, surrogate_key, process_id
            and gold_load_ts.
        batch_id: Micro-batch identifier supplied by Structured Streaming;
            required by the ``foreachBatch`` signature but not used here.

    NOTE(review): adding to the stored totals is not idempotent if Spark
    replays a batch after a failure — confirm whether replay protection
    (e.g. Delta idempotent-write options keyed on batch_id) is needed.
    """
    key_columns = ["date_id", "customer_id", "product_id"]

    # A single micro-batch can carry several windows for the same key. Collapse
    # them to one row per key: Delta MERGE fails when multiple source rows match
    # the same target row, and unmatched duplicates would be inserted twice.
    source_df = batch_df.groupBy(*key_columns).agg(
        F.sum("items_sold").alias("items_sold"),
        F.sum("sales_amount").alias("sales_amount"),
        # surrogate_key and process_id are constant within a key group.
        F.first("surrogate_key").alias("surrogate_key"),
        F.first("process_id").alias("process_id"),
        F.max("gold_load_ts").alias("gold_load_ts"),
    )

    target_table = DeltaTable.forName(spark, "oms_analytics.gold.daily_sales_fact")

    merge_condition = " AND ".join(
        f"target.{column} = source.{column}" for column in key_columns
    )

    (
        target_table.alias("target")
        .merge(source_df.alias("source"), merge_condition)
        .whenMatchedUpdate(set={
            # Accumulate instead of overwrite so earlier windows of the same
            # day are kept; coalesce stops a NULL sum from nulling the total.
            "items_sold": "coalesce(target.items_sold, 0) + coalesce(source.items_sold, 0)",
            "sales_amount": "coalesce(target.sales_amount, 0) + coalesce(source.sales_amount, 0)",
            # Keep the audit timestamp in step with the latest change.
            "gold_load_ts": "source.gold_load_ts",
        })
        .whenNotMatchedInsert(values={
            "surrogate_key": "source.surrogate_key",
            "date_id": "source.date_id",
            "customer_id": "source.customer_id",
            "product_id": "source.product_id",
            "items_sold": "source.items_sold",
            "sales_amount": "source.sales_amount",
            "process_id": "source.process_id",
            "gold_load_ts": "source.gold_load_ts",
        })
        .execute()
    )

# Configure the sink: every micro-batch is handed to upsert_data, and the
# query uses the checkpoint location defined above.
stream_writer = (
    daily_aggregates_df.writeStream
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location_daily_sales_fact)
    .foreachBatch(upsert_data)
)

# start() launches the streaming query.
writequery = stream_writer.start()

# Block here until the query stops or fails.
writequery.awaitTermination()