In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_unixtime

# Gold-layer inputs.
# Orders are read as a stream; order items are read as a static (batch) table.
# Structured Streaming only supports stream-stream joins in append output mode,
# and every aggregation in this notebook is written in complete mode, so the
# item side has to be a static source (stream-static join).
gld_order_enriched = spark.readStream.table("ecommerce.silver.slv_order_enrich")
gld_order_item_enriched = spark.table("ecommerce.silver.slv_order_item_enrich")

# NOTE(review): order_date is divided by 1000 before from_unixtime, so it is
# presumably epoch milliseconds — confirm against the silver schema.
# Formatting as "yyyy-MM-dd" (session time zone) truncates to the calendar day so
# daily_sale aggregates per day; without a format the value keeps second precision
# and the groupBy below would produce one row per distinct second.
gld_order_enriched = gld_order_enriched.withColumn(
    "order_date", from_unixtime(col("order_date") / 1000, "yyyy-MM-dd")
)

# Revenue per day: join each streamed order to its line items, sum
# price * quantity per order_date, highest-revenue days first.
daily_sale = (
    gld_order_enriched.join(gld_order_item_enriched, ["order_id"], "inner")
    .groupBy("order_date")
    .agg(F.sum(col("price").cast("double") * col("quantity").cast("int")).alias("total_price"))
    .orderBy(col("total_price").desc())
)

daily_sale_query = (
    daily_sale.writeStream
    .format("delta")
    # Fresh checkpoint: the aggregation key changed from per-second timestamps to
    # dates, and complete mode would otherwise keep emitting the stale per-second
    # groups held in the old checkpoint's state store.
    .option("checkpointLocation", "/Volumes/ecommerce/gold/checkpoint/DailySaleData_by_day/")
    .outputMode("complete")
    .trigger(once=True)
    .toTable("ecommerce.gold.daily_sale")
)
# toTable() starts the query asynchronously; block until the single trigger
# completes so later cells (e.g. the %sql inspection cell) see the written table.
daily_sale_query.awaitTermination()



In [0]:

# Lifetime value per customer: sum of price * quantity over all of the customer's
# order items. Uses the streamed orders and the static order-item table defined
# in the first cell (stream-static join, required for complete output mode).
customer_lifetime_value = (
    gld_order_enriched.join(gld_order_item_enriched, ["order_id"], "inner")
    .groupBy("customer_unique_id")
    .agg(F.sum(col("price").cast("double") * col("quantity").cast("int")).alias("life_time_value"))
    .orderBy(col("life_time_value"))
)

customer_lifetime_value_query = (
    customer_lifetime_value.writeStream
    .format("delta")
    .option("checkpointLocation", "/Volumes/ecommerce/gold/checkpoint/customer_lifetime_value/")
    .outputMode("complete")
    .trigger(once=True)
    .toTable("ecommerce.gold.customer_lifetime_value")
)
# toTable() starts the query asynchronously; wait for the single trigger to finish
# so later cells read a fully written table and no two queries overlap.
customer_lifetime_value_query.awaitTermination()




In [0]:

# Top 10 product categories by revenue (sum of price * quantity).
# Note: despite the table name, rows are grouped by product_category_name,
# i.e. this ranks categories, not individual products.
topproducts = (
    gld_order_enriched.join(gld_order_item_enriched, ["order_id"], "inner")
    # Column referenced through the item DataFrame, presumably to avoid ambiguity
    # if both join inputs carry product_category_name — confirm.
    .groupBy(gld_order_item_enriched.product_category_name)
    .agg(F.sum(col("price").cast("double") * col("quantity").cast("int")).alias("total_price"))
    # Descending before limit(10) so the ten highest-revenue categories are kept
    # (an ascending sort would keep the ten lowest).
    .orderBy(col("total_price").desc())
    .limit(10)
)

topproducts_query = (
    topproducts.writeStream
    .format("delta")
    # Every streaming query needs its own checkpoint; sharing another query's
    # checkpoint mixes their offsets and aggregation state.
    .option("checkpointLocation", "/Volumes/ecommerce/gold/checkpoint/topproducts/")
    .outputMode("complete")
    .trigger(once=True)
    .toTable("ecommerce.gold.topproducts")
)
# toTable() starts the query asynchronously; wait for the single trigger to finish
# so the inspection cell below reads a fully written table.
topproducts_query.awaitTermination()

In [0]:
%sql
-- Inspect the top-categories gold table written above.
-- The other gold outputs of this notebook can be inspected the same way:
-- ecommerce.gold.customer_lifetime_value and ecommerce.gold.daily_sale.
SELECT *
FROM ecommerce.gold.topproducts;