In [0]:
# ADLS Gen2 configuration: authenticate to the storage account with OAuth
# client credentials (service principal).

storage_account = "appleretailecommercesa"
application_id = "5a622d34-353e-4fcc-98e8-964f0432f4df"
tenant_id = "995db1e6-31fc-44f9-bee8-638c7294755d"

# The credential must not live in the notebook source: anyone with read access
# to the notebook (or its version history) could otherwise use it.
# It is fetched from a Databricks secret scope at run time instead.
# NOTE(review): the scope and key names below are placeholders - point them at
# the real secret scope before running, and rotate the value that was
# previously committed in this cell.
# Despite its name, this value is passed below as the OAuth *client secret*.
storage_account_key = dbutils.secrets.get(scope="adls-oauth", key="sp-client-secret")

# Every setting is scoped to this one storage account's DFS endpoint.
adls_host = f"{storage_account}.dfs.core.windows.net"

spark.conf.set(f"fs.azure.account.auth.type.{adls_host}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{adls_host}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(f"fs.azure.account.oauth2.client.id.{adls_host}", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{adls_host}", storage_account_key)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{adls_host}",
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
)

In [0]:
# all imports
from pyspark.sql import Window
from pyspark.sql.functions import *
from delta import DeltaTable

In [0]:
# Root of the gold container. The value keeps its trailing slash so that any
# other cell building paths from it sees exactly the same string as before.
gold_path = f"abfss://gold@{storage_account}.dfs.core.windows.net/"


def read_gold_table(table_name):
    """Load one Delta table from the gold container as a DataFrame.

    The trailing slash of ``gold_path`` is stripped before the table name is
    appended, so the URI contains a single separator instead of the doubled
    ``...net//<table>`` that plain concatenation produces.
    """
    return spark.read.format("delta").load(f"{gold_path.rstrip('/')}/{table_name}")


category_df = read_gold_table("Category")
products_df = read_gold_table("Products")
sales_df = read_gold_table("Sales")
stores_df = read_gold_table("Stores")
warranty_df = read_gold_table("Warranty")

In [0]:
# Preview the first 10 rows of each source table, in the order they were loaded.
for source_df in (category_df, products_df, sales_df, stores_df, warranty_df):
    source_df.limit(10).display()

In [0]:
# Print the schema of each source table, in the order they were loaded.
for source_df in (category_df, products_df, sales_df, stores_df, warranty_df):
    source_df.printSchema()

## Fact and Dimension Tables

In [0]:
# Dimension table creation
#
# Surrogate keys are assigned with row_number() over the natural key instead of
# monotonically_increasing_id(). The latter derives its values from partition
# IDs, and these DataFrames are lazy: they are re-evaluated when joined into the
# fact table and again when each dimension is written. Keys that depend on
# partitioning are therefore not guaranteed to agree between the fact table and
# the stored dimensions, nor to stay the same from one run to the next.


def with_surrogate_key(source_df, key_col, natural_key_col):
    """Return ``source_df`` with a dense surrogate key column added.

    Keys are numbered 1..N in ascending order of ``natural_key_col``.

    Parameters:
        source_df: DataFrame holding one row per dimension member.
        key_col: name of the surrogate key column to add.
        natural_key_col: column that defines the numbering order.

    Notes:
        * The window has no partitioning, so Spark moves every row to a single
          partition to number them. This assumes the dimensions are small -
          TODO confirm.
        * The numbering is only repeatable when ``natural_key_col`` is unique;
          ties are ordered arbitrarily.
        * row_number() yields an integer column; it is cast to long so the
          column type matches what monotonically_increasing_id() produced and
          existing Delta tables can be overwritten without a schema change.
    """
    key_window = Window.orderBy(natural_key_col)
    return source_df.withColumn(key_col, row_number().over(key_window).cast("long"))


# --- Dimension: Product ---
dim_product = (
    with_surrogate_key(products_df, "product_key", "product_id")
    .select("product_key", "product_id", "product_name", "category_id", "launch_date", "price")
)

# --- Dimension: Category ---
dim_category = (
    with_surrogate_key(category_df, "category_key", "category_id")
    .select("category_key", "category_id", "category_name")
)

# --- Dimension: Store ---
dim_store = (
    with_surrogate_key(stores_df, "store_key", "store_id")
    .select("store_key", "store_id", "store_name", "city", "country")
)

dim_category.display()
dim_product.display()
dim_store.display()

In [0]:
# Fact table creation (one row per sale)

# Reduce warranty data to one row per sale that has at least one claim.
# Joining the raw warranty table would repeat a sale row once per claim when a
# sale has several claims, which would inflate quantity and revenue in every
# KPI aggregated from the fact table.
claimed_sales = (
    warranty_df
    .filter(col("claim_id").isNotNull())
    .select("sale_id")
    .distinct()
    .withColumn("has_claim", lit(1))
)

# Flag each sale: 1 when a claim exists, 0 otherwise.
# Only the has_claim flag is carried over from the warranty data.
fact_sales_temp = (
    sales_df
    .join(claimed_sales, "sale_id", "left")
    .withColumn("has_claim", coalesce(col("has_claim"), lit(0)))
)

# Resolve natural keys to surrogate keys. The joins are inner joins, so a sale
# whose product, store or category is missing from the dimensions is dropped.
fact_sales = (
    fact_sales_temp
    # price and category_id come from the product dimension
    .join(dim_product.select("product_key", "product_id", "category_id", "price"), "product_id", "inner")
    .join(dim_store.select("store_key", "store_id"), "store_id", "inner")
    .join(dim_category.select("category_key", "category_id"), "category_id", "inner")
    .withColumn("sale_date", to_date("sale_date"))
    .withColumn("revenue", col("price") * col("quantity"))
    .select("sale_id", "sale_date", "store_key", "product_key", "category_key", "quantity", "price", "revenue", "has_claim")
)

fact_sales.display()

In [0]:
# Persist the star schema to the gold container as Delta tables, replacing any
# previous contents. Order is kept: fact table first, then the dimensions.
star_schema_tables = [
    ("fact_sales", fact_sales),
    ("dim_product", dim_product),
    ("dim_category", dim_category),
    ("dim_store", dim_store),
]

for table_name, table_df in star_schema_tables:
    table_df.write.format("delta").mode("overwrite").save(
        f"abfss://gold@{storage_account}.dfs.core.windows.net/{table_name}"
    )

## KPIs needed for business insights
1. Total Sales Revenue by Category
2. Top 10 Best-Selling Products
3. Average Price per Category
4. Total Sales by Country
5. Monthly Sales Trend
6. Top 5 Stores by Revenue
7. Total Sales Revenue

In [0]:
# 1. Total Sales Revenue by Category
# Aggregate revenue per category key, then attach the category name.
revenue_per_category = fact_sales.groupBy("category_key").agg(
    sum(col("revenue")).alias("total_sales_by_category")
)
category_names = dim_category.select("category_key", "category_name")

total_sales_category_df = (
    revenue_per_category
    .join(category_names, on="category_key", how="left")
    .orderBy("total_sales_by_category", ascending=False)  # highest revenue first
    .select("category_key", "category_name", "total_sales_by_category")
)

total_sales_category_df.display()

In [0]:
# 2. Top 10 Best Selling Products
# Rank products by total units sold and keep only the ten highest.
top_10_best_selling_prods = (
    fact_sales
    .groupBy("product_key")
    .agg(sum(col("quantity")).alias("total_quantity"))
    .join(dim_product.select("product_key", "product_name"), on="product_key", how="left")
    .select("product_key", "product_name", "total_quantity")
    .orderBy("total_quantity", ascending=False)
    # Without this limit every product was returned (and written to the
    # "top 10" KPI table), not just the ten best sellers.
    .limit(10)
)

top_10_best_selling_prods.display()

In [0]:
# 3. Average Price by Category
# The average is taken over fact rows, so every sale line contributes its price
# once; the result is rounded to 2 decimals.
avg_price_per_category_key = fact_sales.groupBy("category_key").agg(
    round(avg(col("price")), 2).alias("avg_price")
)
category_lookup = dim_category.select("category_key", "category_name")

avg_price_cat = (
    avg_price_per_category_key
    .join(category_lookup, on="category_key", how="left")
    .select("category_key", "category_name", "avg_price")
    .orderBy("category_key")
)

avg_price_cat.display()

In [0]:
# 4. Total Sales by Country
# Attach each sale's country through the store dimension, then sum revenue.
store_countries = dim_store.select("store_key", "country")
sales_with_country = fact_sales.join(store_countries, on="store_key", how="left")

total_sales_country = (
    sales_with_country
    .groupBy("country")
    .agg(sum(col("revenue")).alias("total_sales"))
    .select("country", "total_sales")
    .orderBy(col("total_sales").desc())  # largest market first
)

total_sales_country.display()

In [0]:
# 5. Monthly Sales Trend
# Derive a "yyyy-MM" bucket (e.g. 2025-10) for every sale.
sales_by_month = (
    fact_sales
    .withColumn("sale_date", to_date(col("sale_date"), "yyyy-MM-dd"))
    .withColumn("month_year", date_format(col("sale_date"), "yyyy-MM"))
)

# Total revenue per month; the "yyyy-MM" strings sort chronologically.
monthly_sales_trend = (
    sales_by_month
    .groupBy("month_year")
    .agg(sum(col("revenue")).alias("total_revenue"))
    .orderBy(col("month_year"))
)

monthly_sales_trend.display()

In [0]:
# 6. Top 5 Stores by Revenue
# Sum revenue per store key, attach the store name, keep the five largest.
revenue_per_store = fact_sales.groupBy("store_key").agg(
    sum(col("revenue")).alias("total_revenue")
)
store_names = dim_store.select("store_key", "store_name")

top_5_stores = (
    revenue_per_store
    .join(store_names, on="store_key", how="left")
    .select("store_key", "store_name", "total_revenue")
    .orderBy(col("total_revenue").desc())
    .limit(5)
)

top_5_stores.display()

In [0]:
# 7. Total Sales Revenue
# Single-row result: revenue summed over the whole fact table.
grand_total_revenue = sum(col("revenue")).alias("total_sales_revenue")
total_sales_revenue = fact_sales.agg(grand_total_revenue)

total_sales_revenue.display()

In [0]:
# Write KPI DataFrames to the gold layer
# Each KPI is saved as a Delta table under <gold_path>/KPI/, replacing any
# previous contents. Entries are listed in the order they are written.
kpi_outputs = {
    "kpi_total_sales_by_category": total_sales_category_df,
    "kpi_top_10_best_selling_products": top_10_best_selling_prods,
    "kpi_avg_price_per_category": avg_price_cat,
    "kpi_total_sales_by_country": total_sales_country,
    "kpi_monthly_sales_trend": monthly_sales_trend,
    "kpi_top_5_stores_by_revenue": top_5_stores,
    "kpi_total_sales_revenue": total_sales_revenue,
}

for kpi_name, kpi_df in kpi_outputs.items():
    kpi_df.write.format("delta").mode("overwrite").save(f"{gold_path}/KPI/{kpi_name}")