In [36]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import os
from pyspark.sql.functions import col
from datetime import date,timedelta
import pandas as pd
import datetime as dt

In [4]:
spark = SparkSession.builder.appName("Build Presentation Layer").getOrCreate()

In [30]:
base_path = r"C:\Users\vasudha.tanniru\Documents\GitHub\data_projects\retail_data_warehouse"
presentation_path = os.path.join(base_path, "warehouse", "presentation")
core_path = os.path.join(base_path,"warehouse","core")

In [11]:
start_date = date(2016,1,1)
end_date = date(2018,12,31)

In [12]:
start_date

datetime.date(2016, 1, 1)

In [13]:
dates = pd.date_range(start=start_date,end=end_date)


In [15]:
dim_date_pd = pd.DataFrame({"date":dates})

In [23]:
dim_date_pd["date_key"] = dim_date_pd["date"].dt.strftime("%Y%m%d").astype(int)
dim_date_pd["year"] = dim_date_pd["date"].dt.year
dim_date_pd["month"] = dim_date_pd["date"].dt.month
dim_date_pd["month_name"] = dim_date_pd["date"].dt.month_name()
dim_date_pd["day"] = dim_date_pd["date"].dt.day
dim_date_pd["day_name"] = dim_date_pd["date"].dt.day_name()
dim_date_pd["week_of_year"] = dim_date_pd["date"].dt.isocalendar().week
dim_date_pd["quarter"] = dim_date_pd["date"].dt.quarter
dim_date_pd["is_weekend"] = dim_date_pd["day_name"].isin(["Saturday","Sunday"])

In [24]:
dim_date = spark.createDataFrame(dim_date_pd)

In [25]:
dim_date = dim_date.select( "date_key", "date", "year", "quarter", "month", "month_name",
    "day", "day_name", "week_of_year", "is_weekend"
)

In [26]:
dim_date.write.mode("overwrite").parquet(os.path.join(presentation_path, "dim_date"))

In [32]:
core_customers = spark.read.parquet(os.path.join(core_path,"core_customers"))

In [33]:
dim_customer = (core_customers.dropDuplicates(["customer_id"]))

In [34]:
dim_customer = dim_customer.withColumn("customer_sk",f.abs(f.xxhash64("customer_id")).cast("long"))

In [37]:
load_date = dt.date.today().strftime("%Y-%m-%d")

In [38]:
dim_customer = (
                dim_customer
                .withColumn("effective_date",f.to_date(f.lit(load_date)))
                .withColumn("end_date",f.to_date(f.lit("9999-12-31")))
                .withColumn("is_current",f.lit("Y"))
)

In [39]:
dim_customer = dim_customer.select ( "customer_sk",
    "customer_id",
    "customer_unique_id",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state",
    "effective_date",
    "end_date",
    "is_current")

In [40]:
dim_customer.write.mode("overwrite").parquet(os.path.join(presentation_path,"dim_customer"))

In [41]:
dim_customer.show()

+-------------------+--------------------+--------------------+------------------------+-------------------+--------------+--------------+----------+----------+
|        customer_sk|         customer_id|  customer_unique_id|customer_zip_code_prefix|      customer_city|customer_state|effective_date|  end_date|is_current|
+-------------------+--------------------+--------------------+------------------------+-------------------+--------------+--------------+----------+----------+
|1315512242788300289|00050bf6e01e69d5c...|e3cf594a99e810f58...|                   98700|               ijui|            RS|    2025-10-31|9999-12-31|         Y|
|3618006856839986048|000598caf2ef41174...|7e0516b486e92ed3f...|                   35540|           oliveira|            MG|    2025-10-31|9999-12-31|         Y|
|1844536163949507448|0013cd8e350a7cc76...|334fed5abcee3aa96...|                    3585|          sao paulo|            SP|    2025-10-31|9999-12-31|         Y|
|7233620935377480045|0015bc9fd2d53

In [42]:
core_sellers = spark.read.parquet(os.path.join(core_path,"core_sellers"))

In [43]:
dim_seller = core_sellers.dropDuplicates(["seller_id"])

In [44]:
dim_seller = core_sellers.withColumn("seller_sk",f.abs(f.xxhash64("seller_id")).cast("long"))

In [45]:
dim_seller = (
                dim_seller
                .withColumn("effective_date",f.to_date(f.lit(load_date)))
                .withColumn("end_date",f.to_date(f.lit("9999-12-31")))
                .withColumn("is_current",f.lit("Y"))
)

In [46]:
dim_seller = dim_seller.select(
    "seller_sk",
    "seller_id",
    "seller_zip_code_prefix",
    "seller_city",
    "seller_state",
    "effective_date",
    "end_date",
    "is_current"
)

In [47]:
dim_seller.write.mode("overwrite").parquet(os.path.join(presentation_path,"dim_seller"))

In [48]:
dim_seller.show(5)
print("Row count:", dim_seller.count())


+-------------------+--------------------+----------------------+-------------+------------+--------------+----------+----------+
|          seller_sk|           seller_id|seller_zip_code_prefix|  seller_city|seller_state|effective_date|  end_date|is_current|
+-------------------+--------------------+----------------------+-------------+------------+--------------+----------+----------+
|8430555222707066092|f6122bc84774df1b3...|                  6436|      barueri|          SP|    2025-10-31|9999-12-31|         Y|
|7089285264522450137|8132b9bd16876e1b0...|                 88058|florianopolis|          SC|    2025-10-31|9999-12-31|         Y|
|7089285264522450137|8132b9bd16876e1b0...|                 88058|florianopolis|          SC|    2025-10-31|9999-12-31|         Y|
|4451240133718641123|a254c682cc01e119f...|                 16500|   cafelandia|          SP|    2025-10-31|9999-12-31|         Y|
| 981196736987199410|cb9fb4ca75d7ba843...|                  3551|    sao paulo|          S

In [62]:
core_products = spark.read.parquet(os.path.join(core_path,"core_products"))

In [63]:
dim_product = core_products.dropDuplicates(["product_id"])

In [64]:
dim_product = dim_product.withColumn("product_sk",f.abs(f.xxhash64("product_id")).cast("long"))

In [68]:
dim_product = dim_product.withColumn(
    "product_sk",
    f.abs(f.xxhash64("product_id")).cast("long")
)

In [69]:
dim_product.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-------------------+--------------+----------+----------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|         product_sk|effective_date|  end_date|is_current|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+-------------------+--------------+----------+----------+
|00066f42aeeb9f300...|           perfumaria|                 53|                       596|                 6|             300|               20|               16|              16|6126185559987931235|    2025-10-31|9999-12-31|         Y|
|00088930e925c41fd...|           automotivo|    

In [71]:
dim_product = dim_product.select(
    "product_sk",
    "product_id",
    "product_category_name",
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
    "effective_date",
    "end_date",
    "is_current"
)

In [72]:
dim_product.write.mode("overwrite").parquet(os.path.join(presentation_path, "dim_product"))

In [73]:
core_orders = spark.read.parquet(os.path.join(core_path,"core_orders"))

In [74]:
dim_customer_curr = dim_customer.filter(F.col("is_current") == "Y").select("customer_id", "customer_sk")
dim_seller_curr = dim_seller.filter(F.col("is_current") == "Y").select("seller_id", "seller_sk")
dim_product_curr = dim_product.filter(F.col("is_current") == "Y").select("product_id", "product_sk")

In [75]:
fact_order_items = (core_orders
                   .join(dim_customer_curr,on="customer_id",how="left")
                   .join(dim_seller_curr,on="seller_id",how="left")
                   .join(dim_product_curr,on="product_id",how="left")
                   )

In [76]:
fact_order_items = fact_order_items.withColumn(
                    "order_date_key",
                    f.date_format("order_purchase_timestamp","yyyyMMdd").cast("int")
)

In [77]:
fact_order_items = fact_order_items.select(
    "order_id",
    "order_date_key",
    "customer_sk",
    "seller_sk",
    "product_sk",
    "price",
    "freight_value",
    "total_payment_value",
    "order_status",
    "avg_review_score"
)




In [78]:
fact_order_items.write.mode("overwrite").parquet(os.path.join(presentation_path, "fact_order_items"))

In [79]:
fact_order_items.show(5)
print("Row count:", fact_order_items.count())


+--------------------+--------------+-------------------+-------------------+-------------------+-----+-------------+-------------------+------------+----------------+
|            order_id|order_date_key|        customer_sk|          seller_sk|         product_sk|price|freight_value|total_payment_value|order_status|avg_review_score|
+--------------------+--------------+-------------------+-------------------+-------------------+-----+-------------+-------------------+------------+----------------+
|60550084e6b4c0cb8...|      20180221|4357242971434401747|2217727985610372786|4675076524623699621| 39.9|        26.89|              66.79|   delivered|             4.0|
|6bb1e842418aac0c9...|      20180211|5620935395720227588|5949656149277273052|3574052695948403021| 15.9|        16.79|              32.69|   delivered|             4.0|
|1f6405caa14a2debb...|      20171226| 763048609861940720|3201134900678085194|8621891304593567076|133.0|        15.68|             148.68|   delivered|          

In [82]:
fact_orders = (
    fact_order_items
    .groupBy("order_id")
    .agg(
        f.first("order_date_key").alias("order_date_key"),
        f.first("customer_sk").alias("customer_sk"),
        f.countDistinct("product_sk").alias("num_products"),
        f.sum("price").alias("total_price"),
        f.sum("freight_value").alias("total_freight"),
        f.sum("total_payment_value").alias("total_payment_value"),
        f.avg("avg_review_score").alias("avg_review_score"),
        f.first("order_status").alias("order_status")
    )
)

In [83]:
fact_orders = fact_orders.withColumn(
    "total_order_value", f.col("total_price") + f.col("total_freight")
)

In [84]:
fact_orders.write.mode("overwrite").parquet(os.path.join(presentation_path, "fact_orders"))

In [85]:
fact_orders.show(5)
print("Row count:", fact_orders.count())


+--------------------+--------------+-------------------+------------+-----------+-------------+-------------------+----------------+------------+------------------+
|            order_id|order_date_key|        customer_sk|num_products|total_price|total_freight|total_payment_value|avg_review_score|order_status| total_order_value|
+--------------------+--------------+-------------------+------------+-----------+-------------+-------------------+----------------+------------+------------------+
|00018f77f2f0320c5...|      20170426|8926991648247086088|           1|      239.9|        19.93|             259.83|             4.0|   delivered|            259.83|
|00042b26cf59d7ce6...|      20170204| 140999376642671625|           1|      199.9|        18.14|             218.04|             5.0|   delivered|218.04000000000002|
|00054e8431b9d7675...|      20171210|2520559504703609822|           1|       19.9|        11.85|              31.75|             4.0|   delivered|             31.75|
|000

In [88]:
# Count total rows
print("fact_order_items:", spark.read.parquet(os.path.join(presentation_path, "fact_order_items")).count())
print("fact_orders:", spark.read.parquet(os.path.join(presentation_path, "fact_orders")).count())

# Check for null SKs (join key validation)
fact_order_items = spark.read.parquet(os.path.join(presentation_path, "fact_order_items"))
fact_order_items.select([
    f.count(f.when(f.col("customer_sk").isNull(), 1)).alias("null_customer_sk"),
    f.count(f.when(f.col("seller_sk").isNull(), 1)).alias("null_seller_sk"),
    f.count(f.when(f.col("product_sk").isNull(), 1)).alias("null_product_sk"),
    f.count(f.when(f.col("order_date_key").isNull(), 1)).alias("null_date_key")
]).show()


fact_order_items: 116129
fact_orders: 98665
+----------------+--------------+---------------+-------------+
|null_customer_sk|null_seller_sk|null_product_sk|null_date_key|
+----------------+--------------+---------------+-------------+
|               0|             0|              0|            0|
+----------------+--------------+---------------+-------------+



In [89]:

# Verify number of unique orders in fact_order_items == number of rows in fact_orders
unique_orders = fact_order_items.select("order_id").distinct().count()
print(f"Unique orders in fact_order_items: {unique_orders}")
print(f"Rows in fact_orders: {fact_orders.count()}")


Unique orders in fact_order_items: 98665
Rows in fact_orders: 98665


In [90]:
missing_customers = (
    fact_order_items.select("customer_sk").distinct()
    .join(dim_customer.select("customer_sk"), on="customer_sk", how="left_anti")
    .count()
)
print("Missing customer_sk in dim_customer:", missing_customers)

Missing customer_sk in dim_customer: 0


In [92]:

fact_orders.groupBy(f.year(f.to_date(f.col("order_date_key").cast("string"), "yyyyMMdd"))).count().show()


fact_order_items.groupBy("product_sk").agg(f.sum("price").alias("total_sales")).orderBy(f.desc("total_sales")).show(5)


+-------------------------------------------------------+-----+
|year(to_date(CAST(order_date_key AS STRING), yyyyMMdd))|count|
+-------------------------------------------------------+-----+
|                                                   2018|53775|
|                                                   2016|  311|
|                                                   2017|44579|
+-------------------------------------------------------+-----+

+-------------------+------------------+
|         product_sk|       total_sales|
+-------------------+------------------+
|4651157491581199490|102455.96000000004|
|3834202346976227443|           63885.0|
|7588894187677470592| 54730.20000000012|
|3164455214278113927|48899.340000000004|
| 607073365047566499| 47214.50999999999|
+-------------------+------------------+
only showing top 5 rows


In [93]:
fact_orders.printSchema()
fact_order_items.printSchema()


root
 |-- order_id: string (nullable = true)
 |-- order_date_key: integer (nullable = true)
 |-- customer_sk: long (nullable = true)
 |-- num_products: long (nullable = false)
 |-- total_price: double (nullable = true)
 |-- total_freight: double (nullable = true)
 |-- total_payment_value: double (nullable = true)
 |-- avg_review_score: double (nullable = true)
 |-- order_status: string (nullable = true)
 |-- total_order_value: double (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- order_date_key: integer (nullable = true)
 |-- customer_sk: long (nullable = true)
 |-- seller_sk: long (nullable = true)
 |-- product_sk: long (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- total_payment_value: double (nullable = true)
 |-- order_status: string (nullable = true)
 |-- avg_review_score: double (nullable = true)



In [94]:
# Join one fact with dims
sample = (
    fact_orders
    .join(dim_customer, on="customer_sk", how="inner")
    .join(dim_date, fact_orders.order_date_key == dim_date.date_key)
    .select("customer_city", "year", "total_order_value")
)
sample.show(5)


+--------------------+----+-----------------+
|       customer_city|year|total_order_value|
+--------------------+----+-----------------+
|           barbacena|2018|           135.59|
|               irece|2018|            360.4|
|             ribeira|2018|           168.48|
|               belem|2017|87.75999999999999|
|sao jose do rio p...|2018|           513.22|
+--------------------+----+-----------------+
only showing top 5 rows
