In [0]:
from pyspark.sql.functions import col, sum as _sum, avg as _avg, countDistinct, approx_count_distinct
from pyspark.sql.types import IntegerType, DoubleType
import json

# Load the pipeline configuration from DBFS.
# The encoding is pinned to UTF-8 (the JSON interchange encoding) so parsing
# does not depend on the driver's locale default.
with open("/dbfs/FileStore/configs/config.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# Delta table locations: Silver is the streaming input, Gold is the output.
# A missing key raises KeyError here, before any stream is started.
silver_path = config["delta_tables"]["silver_path"]
gold_path = config["delta_tables"]["gold_path"]

# Open the Silver Delta table as a streaming source.
silver_df = spark.readStream.format("delta").load(silver_path)

# Build the Gold input in one chain: drop rows that repeat the same
# (InvoiceNo, StockCode, CustomerID) key, then cast the numeric columns
# used by the aggregations (a no-op if they already have these types).
# NOTE(review): streaming dropDuplicates without a watermark keeps every key
# seen so far in state - confirm that unbounded growth is acceptable here.
dedup_keys = ["InvoiceNo", "StockCode", "CustomerID"]
gold_input_df = (
    silver_df
    .dropDuplicates(dedup_keys)
    .withColumn("Quantity", col("Quantity").cast(IntegerType()))
    .withColumn("UnitPrice", col("UnitPrice").cast(DoubleType()))
)

# Per-country Gold metrics.
#   TotalSales      - sum of line revenue (Quantity * UnitPrice)
#   TotalQuantity   - sum of Quantity
#   UniqueCustomers - approximate distinct CustomerID count
#   AvgOrderValue   - TotalSales divided by the approximate number of distinct
#                     invoices (orders)
#
# AvgOrderValue was previously TotalSales / TotalQuantity, which is the
# average revenue per *unit*, not per order, and divides by a zero or negative
# net quantity whenever returns cancel out purchases. An order is identified
# by InvoiceNo; the approximate distinct count is used to match the existing
# UniqueCustomers metric.
# NOTE(review): this changes the aggregates of a stateful streaming query -
# confirm whether the existing checkpoint must be reset before redeploying.
line_revenue = col("Quantity") * col("UnitPrice")

gold_agg_df = gold_input_df.groupBy("Country").agg(
    _sum(line_revenue).alias("TotalSales"),
    _sum("Quantity").alias("TotalQuantity"),
    approx_count_distinct("CustomerID").alias("UniqueCustomers"),
    (_sum(line_revenue) / approx_count_distinct("InvoiceNo")).alias("AvgOrderValue"),
)

# Optional notebook preview of the aggregate: display(gold_agg_df)

# Stream the per-country aggregates into the Gold Delta table.
# The checkpoint lives in a "_checkpoint" directory under the table path.
gold_checkpoint = gold_path + "/_checkpoint"

gold_writer = gold_agg_df.writeStream.format("delta")
gold_writer = gold_writer.option("checkpointLocation", gold_checkpoint)
# Complete output mode: the original author notes it is required for
# aggregations.
gold_writer = gold_writer.outputMode("complete")

# Start the query; the handle is kept so the stream can be inspected or stopped.
gold_query = gold_writer.start(gold_path)


