In [0]:
from pyspark.sql.functions import col, explode

# Load the silver-layer Delta tables (all three paths point at the `silver` container).
SILVER_ROOT = "abfss://silver@nishantchurndata.dfs.core.windows.net"
products_df = spark.read.format("delta").load(f"{SILVER_ROOT}/products_fact")
carts_df = spark.read.format("delta").load(f"{SILVER_ROOT}/sales_fact")
users_df = spark.read.format("delta").load(f"{SILVER_ROOT}/Customers_data_df")


# Build one row per cart line, enriched with customer and product attributes.
# Both joins are LEFT joins from carts, so every cart row is kept even when the
# user or product lookup finds no match (the looked-up columns are then null).
sales_df = (
    carts_df
    .join(users_df, carts_df.user_id == users_df.user_id, "left")
    # Note the differing key spelling: `product_id` on carts vs `product_Id` on products.
    .join(products_df, carts_df.product_id == products_df.product_Id, "left")
    .select(
        col("cart_id").alias("transaction_id"),
        # Take the key from the cart (left) side: users_df.user_id is null for
        # carts without a matching user, which would silently drop the customer id.
        carts_df.user_id.alias("customer_id"),
        carts_df.quantity.alias("quantity"),
        products_df.price.alias("unit_price"),
        products_df.title.alias("product_name"),
        users_df.username.alias("customer_name"),
        col("date").alias("transaction_date")
    )
)
sales_df.display()


In [0]:
from pyspark.sql.functions import col, explode, sum, avg, to_date

# NOTE: `sum` and `avg` here are the pyspark.sql.functions aggregates imported
# above, not the Python builtins.

# Revenue of a single sales row: units sold times the unit price.
line_revenue = col("quantity") * col("unit_price")

# Per-product rollup: units sold, revenue and mean unit price per product name.
product_metrics = [
    sum(col("quantity")).alias("total_units_sold"),
    sum(line_revenue).alias("total_revenue"),
    avg(col("unit_price")).alias("average_price"),
]
product_sales_df = sales_df.groupBy("product_name").agg(*product_metrics)

# Per-day rollup (transaction_date converted with to_date), intended as
# input for forecasting.
sale_day = to_date(col("transaction_date")).alias("date")
daily_metrics = [
    sum("quantity").alias("total_units_sold"),
    sum(line_revenue).alias("total_revenue"),
]
daily_sales_df = sales_df.groupBy(sale_day).agg(*daily_metrics)



In [0]:
# Reference path only (not executable code): abfss://bronze@nishantchurndata.dfs.core.windows.net/products

In [0]:
# Persist both aggregates to the gold container as Delta files.
# mode("overwrite") replaces whatever a previous run left at each path.
(
    product_sales_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://gold@nishantchurndata.dfs.core.windows.net/product_sales")
)
(
    daily_sales_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://gold@nishantchurndata.dfs.core.windows.net/daily_sales")
)


In [0]:
# Register the per-product aggregate as a managed Delta table.
# The source must be `product_sales_df` (the aggregate), not `products_df`
# (the raw silver products table), so the table contents match its name.
product_sales_df.write.format("delta").mode("overwrite") \
        .saveAsTable("gold.analytics_products.product_sales_df")

In [0]:
# Register the per-day aggregate as a Delta table, replacing any existing contents.
(
    daily_sales_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold.analytics_products.daily_sales_df")
)