In [0]:
from pyspark.sql.functions import col, trim, initcap, lower, to_timestamp, row_number
from pyspark.sql.window import Window

# Root directory that holds the raw JSON landing files.
base_path = "/Volumes/cetpa_external_catalog/ldp_schema/raw"

# Bronze layer: load each raw JSON file without any transformation.
# All three sources are loaded before any table is written.
orders_bronze = spark.read.format("json").load(f"{base_path}/orders/00.json")
customers_bronze = spark.read.format("json").load(f"{base_path}/customers/00.json")
status_bronze = spark.read.format("json").load(f"{base_path}/status/00.json")

# Persist the raw data as Delta tables, replacing any previous contents.
orders_bronze.write.saveAsTable("bronze_orders", format="delta", mode="overwrite")
customers_bronze.write.saveAsTable("bronze_customers", format="delta", mode="overwrite")
status_bronze.write.saveAsTable("bronze_status", format="delta", mode="overwrite")


# Silver orders: one row per order_id, with order_timestamp parsed from its
# string form into a timestamp column.
# NOTE(review): the pattern quotes the trailing 'Z', so it is matched as a
# literal character rather than as a zone designator — presumably the values
# are then interpreted in the session time zone; confirm that this is intended.
_orders_deduped = spark.table("bronze_orders").dropDuplicates(["order_id"])
orders_silver = _orders_deduped.withColumn(
    "order_timestamp",
    to_timestamp("order_timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'"),
)
orders_silver.write.saveAsTable("silver_orders", format="delta", mode="overwrite")

# Silver customers: one row per customer_id with normalised text fields.
# The cleaning expressions are independent of one another, so they are named
# up front and then applied as in-place column replacements.
_customer_name = initcap(trim(col("name")))      # trim, then capitalise each word
_customer_email = lower(trim(col("email")))      # trim, then lower-case
_customer_ts = to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'")

_customers_deduped = spark.table("bronze_customers").dropDuplicates(["customer_id"])
customers_silver = (
    _customers_deduped
    .withColumn("name", _customer_name)
    .withColumn("email", _customer_email)
    .withColumn("timestamp", _customer_ts)
)
customers_silver.write.saveAsTable("silver_customers", format="delta", mode="overwrite")

# Silver status: every bronze status row is kept (no de-duplication here);
# the status text is trimmed and status_timestamp is parsed into a timestamp.
# Both replacements touch different columns, so their order does not matter.
_status_raw = spark.table("bronze_status")
_status_trimmed = _status_raw.withColumn("order_status", trim(col("order_status")))
status_silver = _status_trimmed.withColumn(
    "status_timestamp",
    to_timestamp(col("status_timestamp"), "yyyy-MM-dd'T'HH:mm:ss'Z'"),
)
status_silver.write.saveAsTable("silver_status", format="delta", mode="overwrite")


# Gold: the most recent status row for each order.
# Rows are ranked within each order_id from newest to oldest status_timestamp;
# only the first-ranked row is kept and the helper rank column is removed.
_newest_first = col("status_timestamp").desc()
window_spec = Window.partitionBy("order_id").orderBy(_newest_first)

_status_ranked = status_silver.withColumn("rn", row_number().over(window_spec))
gold_latest_order_status = _status_ranked.where(col("rn") == 1).drop("rn")

gold_latest_order_status.write.saveAsTable(
    "gold_latest_order_status", format="delta", mode="overwrite"
)


# Gold fact table: one row per order, enriched with customer attributes and
# the latest known order status.
#
# The inputs are read back from the persisted tables instead of reusing the
# in-memory DataFrames. Those DataFrames are lazy plans over the bronze tables
# and would be re-executed here; dropDuplicates keeps an arbitrary row per key
# and row_number over tied timestamps picks an arbitrary row, so a re-execution
# is not guaranteed to select the same rows that were already written to
# silver_orders, silver_customers and gold_latest_order_status. Reading the
# tables keeps the fact table consistent with them and avoids recomputation.
#
# Left joins keep every order even when no customer or status row matches;
# the customer/status columns are null in that case.
fact_orders = (
    spark.table("silver_orders").alias("o")
    .join(spark.table("silver_customers").alias("c"), "customer_id", "left")
    .join(spark.table("gold_latest_order_status").alias("s"), "order_id", "left")
    .select(
        col("o.order_id"),
        col("o.order_timestamp"),
        # Joining on the column name yields a single customer_id column,
        # so it is referenced without a table alias.
        col("customer_id"),
        col("c.name").alias("customer_name"),
        col("c.city"),
        col("c.state"),
        col("o.notifications"),
        col("s.order_status"),
        col("s.status_timestamp").alias("latest_status_timestamp"),
    )
)
fact_orders.write.format("delta").mode("overwrite").saveAsTable("gold_fact_orders")
