In [118]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, monotonically_increasing_id, when, broadcast)
from notebookUtils import readCsvAsDataframe
from nb import check_fact_dim_join

In [109]:
# Obtain the Spark session for this notebook (application name "factsales2").
spark = (
    SparkSession.builder
    .appName("factsales2")
    .getOrCreate()
)

In [110]:
# Load the raw sales extract through the project's CSV helper.
fact_sales = readCsvAsDataframe(
    spark,
    "fact_sales_data_v2.csv",
)

In [111]:
# Inspect the loaded frame; in the recorded run this rendered only the schema repr.
display(fact_sales)

DataFrame[ProductCategory: string, ProductName: string, Brand: string, StoreRegion: string, StoreName: string, StoreType: string, SalesRep: string, Department: string, EmployeeRole: string, UnitsSold: double, UnitPrice: double, Discount: double, SaleDate: date]

In [112]:
# Replace missing values up front: numeric measures default to 0 and a
# missing product name becomes the placeholder "N/A".
fact_sales2 = fact_sales.na.fill(
    {
        "UnitsSold": 0,
        "UnitPrice": 0,
        "ProductName": "N/A",
        "Discount": 0,
    }
)

In [113]:
# Preview the first 30 rows after null replacement.
fact_sales2.show(30)

+---------------+-----------+------+-----------+---------+---------+--------------+-----------+---------------+---------+---------+--------+----------+
|ProductCategory|ProductName| Brand|StoreRegion|StoreName|StoreType|      SalesRep| Department|   EmployeeRole|UnitsSold|UnitPrice|Discount|  SaleDate|
+---------------+-----------+------+-----------+---------+---------+--------------+-----------+---------------+---------+---------+--------+----------+
|      Furniture|    T-shirt|BrandB|       East|   StoreX|Franchise|   Martha Long|Electronics|        Cashier|     12.0|     -1.0|     5.0|2022-12-14|
|       Clothing|     Tablet|BrandC|       East|   StoreZ|Franchise|   Martha Long|       Home|Sales Associate|      0.0|   272.49|     0.0|2023-02-24|
|       Clothing|     Tablet|BrandA|      South|   StoreX|   Retail| Emily Vazquez|    Apparel|        Cashier|      0.0|   484.75|    15.0|2025-03-24|
|    Electronics| Smartphone|BrandB|       West|   StoreY|   Outlet|Charles Fields|    A

In [114]:
# Negative unit prices are replaced with 0; every other price is kept as is.
unit_price = col("UnitPrice")
non_negative_price = when(unit_price < 0, 0).otherwise(unit_price)
fact_sales3 = fact_sales2.withColumn("UnitPrice", non_negative_price)

In [115]:
# Product dimension, read from its parquet directory.
dim_product = (
    spark.read
    .parquet("spark-warehouse/dim_product")
)

In [119]:
# Look up ProductKey by matching on product name, category and brand.
# The left join keeps every fact row; rows without a match get a null ProductKey.
# broadcast() hints Spark to use a broadcast join for the dimension side.
product_match = (
    (fact_sales3.ProductName == dim_product.Title)
    & (fact_sales3.ProductCategory == dim_product.Category)
    & (fact_sales3.Brand == dim_product.Brand)
)
product_joined = fact_sales3.join(broadcast(dim_product), product_match, "left")
fact_sales4 = product_joined.select(fact_sales3["*"], dim_product["ProductKey"])

In [121]:
# Print the physical plan to confirm the product lookup runs as a BroadcastHashJoin.
fact_sales4.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [ProductCategory#2672, ProductName#2711, Brand#2674, StoreRegion#2675, StoreName#2676, StoreType#2677, SalesRep#2678, Department#2679, EmployeeRole#2680, UnitsSold#2712, UnitPrice#2795, Discount#2714, SaleDate#2684, ProductKey#2812L]
   +- BroadcastHashJoin [ProductName#2711, ProductCategory#2672, Brand#2674], [Title#2809, Category#2810, Brand#2811], LeftOuter, BuildRight, false
      :- Project [ProductCategory#2672, ProductName#2711, Brand#2674, StoreRegion#2675, StoreName#2676, StoreType#2677, SalesRep#2678, Department#2679, EmployeeRole#2680, UnitsSold#2712, CASE WHEN (UnitPrice#2713 < 0.0) THEN 0.0 ELSE UnitPrice#2713 END AS UnitPrice#2795, Discount#2714, SaleDate#2684]
      :  +- Project [ProductCategory#2672, coalesce(ProductName#2673, N/A) AS ProductName#2711, Brand#2674, StoreRegion#2675, StoreName#2676, StoreType#2677, SalesRep#2678, Department#2679, EmployeeRole#2680, coalesce(nanvl(UnitsSold#2681, null), 0.

In [122]:
# Store dimension, read from its parquet directory.
dim_store = (
    spark.read
    .parquet("spark-warehouse/dim_store")
)

In [123]:
# Look up StoreKey by matching on region, store name and store type.
# The left join keeps every fact row; rows without a match get a null StoreKey.
# No explicit broadcast hint is given here; the recorded plan still shows a
# BroadcastHashJoin for this lookup.
store_match = (
    (fact_sales4.StoreRegion == dim_store.Region)
    & (fact_sales4.StoreName == dim_store.Name)
    & (fact_sales4.StoreType == dim_store.Type)
)
store_joined = fact_sales4.join(dim_store, store_match, "left")
fact_sales5 = store_joined.select(fact_sales4["*"], dim_store["StoreKey"])

In [124]:
# Print the physical plan after the store lookup has been added.
fact_sales5.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [ProductCategory#2672, ProductName#2711, Brand#2674, StoreRegion#2675, StoreName#2676, StoreType#2677, SalesRep#2678, Department#2679, EmployeeRole#2680, UnitsSold#2712, UnitPrice#2795, Discount#2714, SaleDate#2684, ProductKey#2812L, StoreKey#2869L]
   +- BroadcastHashJoin [StoreRegion#2675, StoreName#2676, StoreType#2677], [Region#2866, Name#2867, Type#2868], LeftOuter, BuildRight, false
      :- Project [ProductCategory#2672, ProductName#2711, Brand#2674, StoreRegion#2675, StoreName#2676, StoreType#2677, SalesRep#2678, Department#2679, EmployeeRole#2680, UnitsSold#2712, UnitPrice#2795, Discount#2714, SaleDate#2684, ProductKey#2812L]
      :  +- BroadcastHashJoin [ProductName#2711, ProductCategory#2672, Brand#2674], [Title#2809, Category#2810, Brand#2811], LeftOuter, BuildRight, false
      :     :- Project [ProductCategory#2672, ProductName#2711, Brand#2674, StoreRegion#2675, StoreName#2676, StoreType#2677, SalesRep#2

In [39]:
# Employee dimension, read from its parquet directory.
dim_employee = (
    spark.read
    .parquet("spark-warehouse/dim_employee")
)

In [49]:
# Look up EmployeeKey by matching on sales rep, department and role.
# Both sides use the same column names, so each column is referenced through
# its own DataFrame to keep the comparison unambiguous.
# The left join keeps every fact row; rows without a match get a null EmployeeKey.
employee_match = (
    (fact_sales5.SalesRep == dim_employee.SalesRep)
    & (fact_sales5.Department == dim_employee.Department)
    & (fact_sales5.EmployeeRole == dim_employee.EmployeeRole)
)
employee_joined = fact_sales5.join(dim_employee, employee_match, "left")
f_sales = employee_joined.select(fact_sales5["*"], dim_employee["EmployeeKey"])

In [50]:
# Preview the first 30 rows with all three surrogate keys attached.
f_sales.show(30)

+---------------+-----------+------+-----------+---------+---------+--------------+-----------+---------------+---------+---------+--------+----------+----------+--------+-----------+
|ProductCategory|ProductName| Brand|StoreRegion|StoreName|StoreType|      SalesRep| Department|   EmployeeRole|UnitsSold|UnitPrice|Discount|  SaleDate|ProductKey|StoreKey|EmployeeKey|
+---------------+-----------+------+-----------+---------+---------+--------------+-----------+---------------+---------+---------+--------+----------+----------+--------+-----------+
|      Furniture|    T-shirt|BrandB|       East|   StoreX|Franchise|   Martha Long|Electronics|        Cashier|     12.0|      0.0|     5.0|2022-12-14|        12|       7|         21|
|       Clothing|     Tablet|BrandC|       East|   StoreZ|Franchise|   Martha Long|       Home|Sales Associate|      0.0|   272.49|     0.0|2023-02-24|         0|       9|         11|
|       Clothing|     Tablet|BrandA|      South|   StoreX|   Retail| Emily Vazqu

In [51]:
# Rows that matched no dimension row carry null keys after the left joins;
# replace those nulls with the sentinel key -1.
f_sales_clean = f_sales.fillna(
    dict.fromkeys(("ProductKey", "EmployeeKey", "StoreKey"), -1)
)

In [52]:
# NetRevenue = UnitsSold * UnitPrice * (1 - Discount / 100);
# the formula treats Discount as a percentage.
gross_revenue = col("UnitsSold") * col("UnitPrice")
discount_factor = 1 - col("Discount") / 100
f_sales2 = f_sales_clean.withColumn("NetRevenue", gross_revenue * discount_factor)

In [58]:
# Remove the descriptive attributes that were only needed for the dimension
# lookups; the measures, the sale date and the surrogate keys remain.
# Column names are passed as plain strings: older PySpark releases raise a
# TypeError when DataFrame.drop() receives more than one Column object,
# whereas string names are accepted by every release.
f_sales_final = f_sales2.drop(
    "ProductName",
    "ProductCategory",
    "Brand",
    "StoreRegion",
    "StoreType",
    "StoreName",
    "EmployeeRole",
    "SalesRep",
    "Department",
)

In [60]:
# Persist the fact table as parquet under the table name "fact_sales",
# replacing any data from a previous run.
(
    f_sales_final.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("fact_sales")
)

### Testing: check that every key in the fact table joins to its dimension table

In [61]:
# Read the stored fact table back from disk for validation
# (presumably the location the saveAsTable call wrote to; confirm the warehouse dir).
df_sales_get = (
    spark.read
    .parquet("spark-warehouse/fact_sales")
)

In [72]:
# Reload the product dimension for the key check below.
dim_product = (
    spark.read
    .parquet("spark-warehouse/dim_product")
)

In [75]:
# Check the fact table's ProductKey values against the product dimension.
# check_fact_dim_join is a project helper; as used here it returns a flag and
# a DataFrame that is shown when the check fails.
is_valid, unmatched_df = check_fact_dim_join(df_sales_get, dim_product, "ProductKey")
if is_valid:
    print("All ProductKey values match.")
else:
    # Message typo fixed ("ome" -> "Some") so it reads like the other key checks.
    print("Some ProductKey values not found.")
    unmatched_df.show()

Validation Passed: All ProductKey values match.


In [76]:
# Reload the store dimension for the key check below.
dim_store = (
    spark.read
    .parquet("spark-warehouse/dim_store")
)

In [91]:
# Check the fact table's StoreKey values against the store dimension and
# show the offending keys when the check fails.
is_valid, unmatched_df = check_fact_dim_join(df_sales_get, dim_store, "StoreKey")
if not is_valid:
    print("Some StoreKey values not found.")
    unmatched_df.show()
else:
    print("All StoreKey values match.")

All StoreKey values match.


In [79]:
# Reload the employee dimension for the key check below.
dim_employee = (
    spark.read
    .parquet("spark-warehouse/dim_employee")
)

In [90]:
# Check the fact table's EmployeeKey values against the employee dimension and
# show the offending keys when the check fails.
is_valid, unmatched_df = check_fact_dim_join(df_sales_get, dim_employee, "EmployeeKey")
if not is_valid:
    print("Some EmployeeKey values not found.")
    unmatched_df.show()
else:
    print("All EmployeeKey values match.")

All EmployeeKey values match.


#### Manually adding a failure-case scenario

In [106]:
# Build one fact row whose keys (99) are meant to be absent from the dimension
# tables, so the validation helper has something to report.
df_sales_invalid = spark.createDataFrame([
    (10, 10, 10, "2020-10-20", 99, 99, 99, 9999)
],["UnitsSold","UnitPrice","Discount", "SaleDate", "ProductKey","StoreKey","EmployeeKey","NetRevenue"])
# unionByName matches columns by name rather than by position, so the test row
# stays aligned even if the stored table's column order ever changes.
df_sales_inv = df_sales_get.unionByName(df_sales_invalid)

In [105]:
# Run the EmployeeKey check on the frame that contains the injected bad row;
# the failure branch is the expected outcome here.
is_valid, unmatched_df = check_fact_dim_join(df_sales_inv, dim_employee, "EmployeeKey")
if not is_valid:
    print("Some EmployeeKey values not found.")
    unmatched_df.show()
else:
    print("All EmployeeKey values match.")

Some EmployeeKey values not found.
+-----------+
|EmployeeKey|
+-----------+
|         99|
+-----------+

