In [2]:
# ============================================================
# PART 1 — LOAD RAW CSVs
# ============================================================

# One reader shared by all raw files: each file has a header row and Spark
# infers the column types from the data.
csv_reader = spark.read.option("header", True).option("inferSchema", True)

df_sales = csv_reader.csv("Files/raw/sales.csv")
df_products = csv_reader.csv("Files/raw/products.csv")
df_stores = csv_reader.csv("Files/raw/stores.csv")
df_customers = csv_reader.csv("Files/raw/customers.csv")

# Preview each raw table in the notebook.
for preview in (df_sales, df_products, df_stores, df_customers):
    display(preview)



# ============================================================
# PART 2 — CLEANING
# ============================================================

# Remove rows that are exact duplicates (identical in every column) from each
# raw table. Rows that merely share a key are NOT removed here.
df_sales, df_products, df_stores, df_customers = [
    frame.dropDuplicates()
    for frame in (df_sales, df_products, df_stores, df_customers)
]


# ------------------------------------------------------------
# HANDLE MISSING PRICES
# Recommended: if UnitPrice is null, fall back to the product's CostPrice.
# ------------------------------------------------------------
from pyspark.sql.functions import coalesce

# Price lookup with at most one row per ProductID. The dropDuplicates() above
# only removes rows identical in *every* column, so two product rows sharing a
# ProductID (e.g. with different CostPrice) would otherwise multiply each
# matching sales row in the left join below and inflate downstream totals.
# NOTE(review): if such conflicting rows exist, which CostPrice survives is
# arbitrary — confirm whether products.csv guarantees unique ProductIDs.
product_prices = (
    df_products
    .select("ProductID", "CostPrice")
    .dropDuplicates(["ProductID"])
)

df_sales = df_sales.alias("s").join(
    product_prices.alias("p"),
    on="ProductID",
    how="left",
).select(
    "OrderID",
    "OrderDate",
    "StoreID",
    "ProductID",
    # Keep the recorded unit price; use CostPrice only when it is null.
    # Price is still null when both are null.
    coalesce("s.UnitPrice", "p.CostPrice").alias("Price"),
    "Quantity",
)

display(df_sales)


# ------------------------------------------------------------
# CONVERT DATE FORMAT TO DATE TYPE
# ------------------------------------------------------------
from pyspark.sql.functions import to_date

# OrderDate arrives as day-first text (pattern dd-MM-yyyy, e.g. 31-12-2023).
# Replace it in place with a DateType column.
# NOTE(review): values that do not match the pattern are not filtered here;
# depending on the session's time-parser settings they may become null or
# raise. Confirm that the raw data uses this format throughout.
order_date_pattern = "dd-MM-yyyy"
parsed_order_date = to_date("OrderDate", order_date_pattern)
df_sales = df_sales.withColumn("OrderDate", parsed_order_date)

display(df_sales)


# ------------------------------------------------------------
# FILTER INVALID IDs (store/product)
# ------------------------------------------------------------
# Keep only sales whose StoreID exists in stores and whose ProductID exists in
# products. A left-semi join is a pure existence check:
# - it never adds or duplicates rows, so the lookup needs no distinct();
# - it keeps df_sales' own column order. An inner join with `on=` would move
#   the key column to the front and reorder the saved `sales` table.
# As with an inner join, rows with a null StoreID or ProductID are dropped.
valid_stores = df_stores.select("StoreID")
df_sales = df_sales.join(valid_stores, on="StoreID", how="left_semi")

valid_products = df_products.select("ProductID")
df_sales = df_sales.join(valid_products, on="ProductID", how="left_semi")

display(df_sales)



# ============================================================
# PART 3 — SAVE CLEAN RAW TABLES TO LAKEHOUSE
# ============================================================
# Each cleaned DataFrame fully replaces its Delta table. overwriteSchema lets
# the table's schema follow the DataFrame's schema.
for frame_to_save, target_table in (
    (df_sales, "sales"),
    (df_products, "products"),
    (df_stores, "stores"),
    # (df_customers, "customers"),  # disabled: customers is not written yet
):
    (
        frame_to_save.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(target_table)
    )



# ============================================================
# PART 4 — ENABLE DELTA SCHEMA MERGE (FOR DIM + FACT TABLES)
# ============================================================
# Session-wide Spark settings; they stay set for later cells in this session.
# NOTE(review): every Delta write in this cell uses mode("overwrite") with
# overwriteSchema=true, and no table is partitioned here. These two settings
# therefore likely have no effect on those writes. Confirm before relying on them.
session_settings = {
    "spark.databricks.delta.schema.autoMerge.enabled": "true",
    "spark.sql.sources.partitionOverwriteMode": "dynamic",
}
for conf_key, conf_value in session_settings.items():
    spark.conf.set(conf_key, conf_value)



# ============================================================
# PART 5 — DIMENSION TABLES
# ============================================================

# ------------------------------
# DIM PRODUCTS
# ------------------------------
# Product attributes, one row per distinct combination of these columns.
# NOTE(review): a ProductID with differing attribute values would still
# appear more than once here; confirm the source guarantees unique IDs.
product_dim_columns = ["ProductID", "ProductName", "Category", "CostPrice"]
df_dim_products = df_products.select(*product_dim_columns).dropDuplicates()

(
    df_dim_products.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dim_products")
)


# ------------------------------
# DIM STORES
# ------------------------------
# Store attributes, one row per distinct combination of these columns.
store_dim_columns = ["StoreID", "StoreName", "City", "State"]
df_dim_stores = df_stores.select(*store_dim_columns).dropDuplicates()

(
    df_dim_stores.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dim_stores")
)


# ------------------------------
# DIM CUSTOMERS
# ------------------------------
# df_dim_customers = df_customers.select(
#     "CustomerID",
#     "CustomerName",
#     "Email",
#     "Phone"
# ).dropDuplicates()

# df_dim_customers.write.format("delta") \
#     .mode("overwrite") \
#     .option("overwriteSchema", "true") \
#     .saveAsTable("dim_customers")



# ============================================================
# PART 6 — FACT TABLE
# ============================================================
from pyspark.sql.functions import col

# One row per cleaned sales line, plus a derived line total
# TotalAmount = Quantity * Price (null if either input is null).
fact_columns = [
    "OrderID",
    "OrderDate",
    "StoreID",
    "ProductID",
    "Quantity",
    "Price",
    "TotalAmount",
]
df_fact_sales = (
    df_sales
    .withColumn("TotalAmount", col("Quantity") * col("Price"))
    .select(*fact_columns)
)

(
    df_fact_sales.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("fact_sales")
)



print("ðŸŽ‰ All dim and fact tables created successfully!")


StatementMeta(, b13c5d4f-ba8b-415c-bef3-fc3bb31f07e0, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 62350739-e0ff-4022-8d2f-c920cb0b6762)

SynapseWidget(Synapse.DataFrame, 60eb267b-c01c-46e5-8147-95e6f7a1a35e)

SynapseWidget(Synapse.DataFrame, 7b933e41-7529-4947-a776-9db652462cad)

SynapseWidget(Synapse.DataFrame, 0ad5bfc4-d8aa-4fd9-87ac-ac0c62b9c1bc)

SynapseWidget(Synapse.DataFrame, ecb9ca85-adac-4b54-bd84-d15f3e97d75c)

SynapseWidget(Synapse.DataFrame, cb2bc011-6856-4d5a-9f50-92ce703bd72b)

SynapseWidget(Synapse.DataFrame, f2c6e160-2ad4-4b8b-bd87-4feb2081f582)

🎉 All dim and fact tables created successfully!
