In [0]:
# Notebook parameters.
# Each text widget is declared with an empty-string default, so a run that
# does not supply a value yields "" from dbutils.widgets.get().
dbutils.widgets.text("source_path", "")
dbutils.widgets.text("target_path", "")
dbutils.widgets.text("process_date", "")

# Strip stray whitespace that is easy to introduce when typing into a widget.
source_path = dbutils.widgets.get("source_path").strip()
target_path = dbutils.widgets.get("target_path").strip()
process_date = dbutils.widgets.get("process_date").strip()

# Echo the resolved parameters first so the run log shows what was received,
# even when validation below rejects them.
print("Source:", source_path)
print("Target:", target_path)
print("Process Date:", process_date)

# Fail fast with an explicit message instead of letting an empty path reach
# the reader/writer and surface as an unrelated-looking path error.
# NOTE(review): process_date is intentionally not required here because no
# visible cell uses it beyond the print above - confirm whether it should
# drive a filter or partition.
_missing_params = [
    name
    for name, value in (("source_path", source_path), ("target_path", target_path))
    if not value
]
if _missing_params:
    raise ValueError(
        "Missing required notebook parameter(s): " + ", ".join(_missing_params)
    )

In [0]:
from pyspark.sql import functions as F

# Read the input Delta table from the path supplied by the notebook parameter.
silver = spark.read.format("delta").load(source_path)

# Row-level predicates reused by the aggregations below.
is_view = F.col("event_type") == "view"
is_purchase = F.col("event_type") == "purchase"

# One output row per (product_id, brand) with view/purchase counts, purchase
# revenue and a view-to-purchase conversion rate expressed as a percentage.
gold_df = (
    silver
    .groupBy("product_id", "brand")
    .agg(
        # when() without otherwise() yields NULL for non-matching rows, and
        # sum() over only NULLs is NULL. otherwise(0) makes a group with no
        # matching events count as 0 instead of NULL.
        F.sum(F.when(is_view, 1).otherwise(0)).alias("views"),
        F.sum(F.when(is_purchase, 1).otherwise(0)).alias("purchases"),
        # Only purchase rows contribute their price; coalesce turns the
        # "no purchase rows" NULL into 0 while keeping the sum's numeric type.
        F.coalesce(
            F.sum(F.when(is_purchase, F.col("price"))),
            F.lit(0),
        ).alias("revenue"),
    )
    .withColumn(
        "conversion_rate",
        # Guard against division by zero: groups without views get 0.0.
        F.when(
            F.col("views") > 0,
            (F.col("purchases") / F.col("views")) * 100,
        ).otherwise(F.lit(0.0)),
    )
)

# mode("overwrite") replaces whatever is currently stored at target_path.
(
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .save(target_path)
)

print("✅ Gold layer completed")
