# Silver Layer — Dimensions & Fact Stream (Databricks)

**Goal:** Clean and standardise the Bronze sales file, then produce Silver-ready dimension tables and a fact stream for Gold modelling.

**Outputs (Delta):** `DimDate`, `DimCategory`, `DimProduct`, `FactStream`

In [0]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# project modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

### PySpark imports & utility functions

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
# Add the project root (two directories above the current working directory)
# to the Python path so the shared transformation utilities can be imported.
import os
import sys

# Normalise to an absolute path (collapses the "../.." segments) and only add
# it when missing, so re-running this cell does not keep appending duplicate
# entries to sys.path.
project_path = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
if project_path not in sys.path:
    sys.path.append(project_path)
from utils.transformations import *

## 1) Source ingestion & transformation (Auto Loader / streaming)

In [0]:
# Read the Bronze parquet files incrementally as a stream (Auto Loader).
# - cloudFiles.schemaLocation: directory where Auto Loader persists the
#   inferred schema between runs.
# - cloudFiles.schemaEvolutionMode: Auto Loader options carry the
#   `cloudFiles.` prefix; the un-prefixed "schemaEvolutionMode" key is not a
#   documented Auto Loader option, so the prefixed key is used here.
df_sales = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "abfss://silver@storagepigments.dfs.core.windows.net/pos_prd_sales/checkpoint")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("abfss://bronze@storagepigments.dfs.core.windows.net/pos_prd_sales")
)

In [0]:
# Normalise placeholder values of `itemization_type` to a real NULL.
# A value counts as a placeholder when it is already NULL, when it is the
# literal text "null" (any casing, surrounding whitespace ignored), or when
# nothing is left after stripping whitespace / non-breaking spaces.
raw_itemization_type = col("itemization_type")
is_placeholder_type = (
    raw_itemization_type.isNull()
    | (lower(trim(raw_itemization_type)) == "null")
    | (regexp_replace(raw_itemization_type, r"[\s\u00A0]+", "") == "")
)

df_sales = df_sales.withColumn(
    "itemization_type",
    when(is_placeholder_type, None).otherwise(raw_itemization_type),
)


In [0]:
# Times with a single-digit hour (e.g. "9:05:00") get a leading zero so that
# they match the "HH:mm:ss" pattern used for parsing below.
hour_padded_time = when(
    col("time_col").rlike(r"^\d:\d{2}:\d{2}$"),
    concat(lit("0"), col("time_col")),
).otherwise(col("time_col"))

df_sales = (
    df_sales
    .drop("_rescued_data")
    # ----------------------
    # Date/time conversions
    # ----------------------
    # date_col: parsed with the day-first "d/MM/yyyy" pattern.
    .withColumn("date_col", to_date(col("date_col"), "d/MM/yyyy"))
    # time_col: parsed, then re-rendered as an "HH:mm:ss" string.
    .withColumn(
        "time_col",
        date_format(to_timestamp(hour_padded_time, "HH:mm:ss"), "HH:mm:ss"),
    )
    # updated_at: parsed, then re-rendered as a "yyyy-MM-dd HH:mm:ss" string.
    .withColumn(
        "updated_at",
        date_format(to_timestamp(col("updated_at")), "yyyy-MM-dd HH:mm:ss"),
    )
    # --------------------
    # Numeric conversions
    # --------------------
    .withColumn("qty", col("qty").cast("int"))
)

# Currency columns: strip "$" and "," before casting to double.
for amount_col in ("product_sales", "discounts", "net_sales"):
    df_sales = df_sales.withColumn(
        amount_col,
        regexp_replace(col(amount_col), "[$,]", "").cast("double"),
    )

In [0]:
df_sales = (
    df_sales
    # ---------------------------
    # Itemization Type override
    # ---------------------------
    # Branches are evaluated top to bottom; the first matching branch wins and
    # rows matching none of them keep their current itemization_type.
    .withColumn(
        "itemization_type",
        # |product_sales| equals |discounts| and the quantity is positive.
        when(
            ((abs(col("product_sales")) - abs(col("discounts"))) == 0)
            & (col("qty") > 0),
            "Cash Payment"
        )
        .when(col("item_name").like("Order%"), "Physical Good")
        .when(col("itemization_type").isNull() & (col("item_name") != "Custom Amount"), "Booking")
        .when(col("item_name") == "Custom Amount", "Physical Good")
        .when(
            # Explicit parentheses: in Python `&` binds tighter than `|`, so
            # without them the gift-card exclusion only applied to the
            # blank-string case and never to the NULL case.
            (col("itemization_type").isNull() | (trim(col("itemization_type")) == ""))
            & ~col("item_name").like("%Gift%Card%"),
            "Service"
        )
        .otherwise(col("itemization_type"))
    )
)

In [0]:
# --------------------
# Category override
# --------------------
# Ordered (condition, category) rules. They are folded into a single
# when(...).when(...) chain below, so the first matching rule wins and the
# order of this list is significant.
item_name_col = col("item_name")
category_rules = [
    ((col("net_sales") < 0) | (col("qty") < 0), "Refund"),
    (col("itemization_type") == "Booking", "Service"),
    (item_name_col == "Custom Amount", "Figurines"),
    (item_name_col.like("%Gift%Card%"), "Gift Card"),

    # Item-name specific mappings
    (item_name_col.isin("Imperfect", "Damaged Item"), "Figurines"),
    (item_name_col == "Soy Milk", "Beverages"),
    (item_name_col == "Booking", "Service"),
    (item_name_col == "Bottle - Sting", "Beverages"),
    (item_name_col.like("Drink%"), "Beverages"),
    (item_name_col.like("Energy Drink%"), "Beverages"),
    (item_name_col == "Extra - Colour Tube", "Accessories"),
    (item_name_col == "Return to Paint", "Service"),
    (item_name_col.like("Tea %"), "Beverages"),
    (item_name_col.isin("Extra Paints", "Take-home kit", "Glow Serum"), "Accessories"),
    (item_name_col.isin("Gloss Varnish", "Gloss", "Repeat painting"), "Service"),

    # Any remaining category outside the three listed ones becomes "Figurines"
    (~col("category").isin("Accessories", "Service", "Beverages"), "Figurines"),
]

category_expr = None
for rule_condition, rule_category in category_rules:
    if category_expr is None:
        category_expr = when(rule_condition, rule_category)
    else:
        category_expr = category_expr.when(rule_condition, rule_category)

# Rows matching no rule keep their existing category.
df_sales = df_sales.withColumn("category", category_expr.otherwise(col("category")))

In [0]:
# ------------------
# Clean item name
# ------------------
# Step 1: strip a trailing " - DIY [Plaster ]Figurine Painting[ Kit]" suffix.
diy_suffix_pattern = r"\s*-\s*DIY\s+(Plaster\s+)?Figurine\s+Painting(\s+Kit)?\s*$"
df_sales = df_sales.withColumn(
    "item_name",
    regexp_replace(col("item_name"), diy_suffix_pattern, ""),
)

# Step 2: relabel "Custom Amount" rows as "New Product".
df_sales = df_sales.withColumn(
    "item_name",
    when(col("item_name") == "Custom Amount", "New Product").otherwise(col("item_name")),
)

In [0]:
# Write the transformed sales stream to the Silver layer as a Delta table.
# trigger(availableNow=True) replaces the deprecated one-shot trigger
# (once=True): it still processes everything available when the query starts
# and then stops, but may split the backlog into several micro-batches.
(
    df_sales.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storagepigments.dfs.core.windows.net/pos_prd_sales/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@storagepigments.dfs.core.windows.net/pos_prd_sales/data")
    .toTable("pigments_cata.silver.pos_prd_sales")
)
    

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f8535780500>

## 2) Build Dimension & Fact Tables (Streaming)

### `DimDate`

In [0]:
# Build the Date dimension using the shared helpers from utils.transformations.
# NOTE(review): `dim` presumably projects/renames the listed columns and
# `add_date_key` presumably derives `date_key` from `date` — confirm in utils.
schema_name = name_changed()
date_source = schema_name.dim(df_sales, ["date"])

schema_key = table_key()
date_keyed = schema_key.add_date_key(date_source)

# Final Date dimension schema, in output order
date_dim_columns = ["date_key", "date", "day", "month", "year", "weekday", "updated_at"]

df_dim_date = (
    date_keyed
    .dropna()  # discard rows containing any NULL
    # Standard calendar attributes derived from `date`
    .withColumn("day", dayofmonth(col("date")))
    .withColumn("month", month(col("date")))
    .withColumn("year", year(col("date")))
    .withColumn("weekday", date_format(col("date"), "EEEE"))  # full day name: Monday, Tuesday...
    .withColumn("updated_at", current_timestamp())
    .select(*date_dim_columns)
)

date_key,date,day,month,year,weekday,updated_at
20250216,2025-02-16,16,2,2025,Sunday,2025-12-29T05:21:19.883Z
20240918,2024-09-18,18,9,2024,Wednesday,2025-12-29T05:21:19.883Z
20250629,2025-06-29,29,6,2025,Sunday,2025-12-29T05:21:19.883Z
20240827,2024-08-27,27,8,2024,Tuesday,2025-12-29T05:21:19.883Z
20240612,2024-06-12,12,6,2024,Wednesday,2025-12-29T05:21:19.883Z
20240604,2024-06-04,4,6,2024,Tuesday,2025-12-29T05:21:19.883Z
20240530,2024-05-30,30,5,2024,Thursday,2025-12-29T05:21:19.883Z
20250608,2025-06-08,8,6,2025,Sunday,2025-12-29T05:21:19.883Z
20250215,2025-02-15,15,2,2025,Saturday,2025-12-29T05:21:19.883Z
20250201,2025-02-01,1,2,2025,Saturday,2025-12-29T05:21:19.883Z


In [0]:
# Append the Date dimension stream to its Silver Delta table.
# trigger(availableNow=True) replaces the deprecated one-shot trigger
# (once=True): it processes everything available at start and then stops.
(
    df_dim_date.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storagepigments.dfs.core.windows.net/DimDate/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@storagepigments.dfs.core.windows.net/DimDate/data")
    .toTable("pigments_cata.silver.DimDate")
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f84fc4fdb20>

### `DimCategory`

In [0]:
# Build the Category dimension using the shared helpers from utils.transformations.
# NOTE(review): `dim` presumably projects/renames the listed columns and
# `add_cat_id` presumably derives `cat_id` from category and type — confirm in utils.
schema_name = name_changed()
category_source = schema_name.dim(df_sales, ["category", "type"])

schema_key = table_key()
category_keyed = schema_key.add_cat_id(category_source)

# Final Category dimension schema, in output order
category_dim_columns = ["cat_id", "category", "type", "updated_at"]

df_dim_category = (
    category_keyed
    .dropna()  # discard rows containing any NULL
    .withColumn("updated_at", current_timestamp())
    .select(*category_dim_columns)
)

display(df_dim_category)

cat_id,category,type,updated_at
FIPH,Figurines,Physical Good,2025-12-29T05:21:32.882Z
BEPH,Beverages,Physical Good,2025-12-29T05:21:32.882Z
REPH,Refund,Physical Good,2025-12-29T05:21:32.882Z
RESE,Refund,Service,2025-12-29T05:21:32.882Z
SESE,Service,Service,2025-12-29T05:21:32.882Z
ACCA,Accessories,Cash Payment,2025-12-29T05:21:32.882Z
FICA,Figurines,Cash Payment,2025-12-29T05:21:32.882Z
ACPH,Accessories,Physical Good,2025-12-29T05:21:32.882Z
SEBO,Service,Booking,2025-12-29T05:21:32.882Z
SECA,Service,Cash Payment,2025-12-29T05:21:32.882Z


In [0]:
# Append the Category dimension stream to its Silver Delta table.
# trigger(availableNow=True) replaces the deprecated one-shot trigger
# (once=True): it processes everything available at start and then stops.
(
    df_dim_category.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storagepigments.dfs.core.windows.net/DimCategory/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@storagepigments.dfs.core.windows.net/DimCategory/data")
    .toTable("pigments_cata.silver.DimCategory")
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f852cf83a40>

### **`DimProduct`**

In [0]:
# Build the Product dimension using the shared helpers from utils.transformations.
# NOTE(review): `add_product_id` / `add_cat_id` presumably derive the
# `product_id` and `cat_id` keys from the projected columns — confirm in utils.
schema_name = name_changed()
product_source = schema_name.dim(df_sales, ["category", "type", "product_name"])

schema_key = table_key()
# Key order kept as product_id first, then cat_id.
product_keyed = schema_key.add_cat_id(schema_key.add_product_id(product_source))

# Final Product dimension schema, in output order
product_dim_columns = ["product_id", "product_name", "cat_id", "updated_at"]

df_dim_product = (
    product_keyed
    .dropna()  # discard rows containing any NULL
    .withColumn("updated_at", current_timestamp())
    .select(*product_dim_columns)
)


product_id,product_name,cat_id,updated_at
6f19fda1191385082713a7e80399cc31c62090dcff21a37b957f4804c4c83be8,Bear Stick Rainbow Heart,FICA,2025-12-29T05:21:39.122Z
8fa168695b5f42d6bbd050127098783a60c300392aaee5d575e41c2b22d3617e,Capybara on the Toilet,FICA,2025-12-29T05:21:39.122Z
47565921687ef7a3be2603f79cea2a37d518b5457251fd6527802bd52830c274,Raws Furry,FIPH,2025-12-29T05:21:39.122Z
9b6280736979669017b9357e3a8fbe1090a2002f3eb39463ab36bd54bea2c181,Astro Bear,REPH,2025-12-29T05:21:39.122Z
ecdbea0330ddfbd101c03046dc29a9cae0e94f4bd481f7a1e65fd3b7fc18a6e4,Ltd Ed - Headphone Doremon,FIPH,2025-12-29T05:21:39.122Z
60417b7e1bfcdcc937e40f5a09f40c7117e307d1caa181864a297d19c380b601,Shin the Durian,FIPH,2025-12-29T05:21:39.122Z
47f4b4d2768fb961865c495b3937079311f4e8d64468348f1c5f09dfacaf73b7,Pok the Bulb,FICA,2025-12-29T05:21:39.122Z
9e98bf4fc074a67db8a3c0d5326411d791f06ae156ffc3300a296d598bdb5c42,Larva,FIPH,2025-12-29T05:21:39.122Z
a263d21235934d60d062e631ea49d1d55e67e487b7a69d6e5ca8fbc329fd8f1f,Monster - Suli & Mike Small,FICA,2025-12-29T05:21:39.122Z
913a7e09487bea095a93c2f08ca00dee450285c61350a77af65c9f21adaa4a9c,Larva - Small B,REPH,2025-12-29T05:21:39.122Z


In [0]:
# Append the Product dimension stream to its Silver Delta table.
# trigger(availableNow=True) replaces the deprecated one-shot trigger
# (once=True): it processes everything available at start and then stops.
(
    df_dim_product.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storagepigments.dfs.core.windows.net/DimProduct/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@storagepigments.dfs.core.windows.net/DimProduct/data")
    .toTable("pigments_cata.silver.DimProduct")
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f852c4b4200>

### **`FactStream`**

In [0]:
# Build the fact stream using the shared helpers from utils.transformations.
# NOTE(review): `fact_sales` presumably projects/renames the sales columns and
# `apply_all` presumably adds every surrogate key — confirm in utils.
schema_name = name_changed()
fact_source = schema_name.fact_sales(df_sales)

schema_key = table_key()
fact_keyed = schema_key.apply_all(fact_source)

# Final fact schema, in output order
fact_columns = [
    "stream_id",
    "order_id",
    "product_id",
    "cat_id",
    "date_key",
    "transaction_time",
    "quantity",
    "sales_amount",
    "product_price",
    "discount_amount",
    "updated_at",
]

df_fact_stream = (
    fact_keyed
    .dropna()  # discard rows containing any NULL
    .withColumn("updated_at", current_timestamp())
    .select(*fact_columns)
)

stream_id,order_id,product_id,cat_id,date_key,transaction_time,quantity,sales_amount,product_price,discount_amount,updated_at
71beb0449542f36ed97ba27cf8fc3735a4c0d2cac07781ec0e16b3551a5f5a85,pzIfWQYQGTRG2DT2t24FTtbM3i8YY,9f526fc3c74ea8f5e9ade94ce6694d44883fe047b3c79e8a5aa0101cf51a11e6,SECA,20250814,18:30:08,1,0.0,5.0,-5.0,2025-12-29T05:21:44.946Z
71beb0449542f36ed97ba27cf8fc3735a4c0d2cac07781ec0e16b3551a5f5a85,pzIfWQYQGTRG2DT2t24FTtbM3i8YY,9f526fc3c74ea8f5e9ade94ce6694d44883fe047b3c79e8a5aa0101cf51a11e6,SECA,20250814,18:30:08,1,0.0,5.0,-5.0,2025-12-29T05:21:44.946Z
0530506d39afd95f39bae540c412d98a1b5e07637efaec5d2fe4c856f8fa0ab7,Fd2lrnyOD8YYKGeD61kZAPFn7uXZY,0ed67b26a85126d7fb4dd1712430349cfa001c55d5c883a67d70d04ab2863c02,FICA,20250814,18:29:19,1,0.0,45.0,-45.0,2025-12-29T05:21:44.946Z
d36d64ed7bb800e182502d1a6d02349fd2d98135e4e9de97b4f8d6e13778fe22,Fd2lrnyOD8YYKGeD61kZAPFn7uXZY,84358beb1d700818622aa7a9bcc4a8fff2414d2ae8fa5e3faa7a0c1b3858cd75,FICA,20250814,18:29:19,1,0.0,45.0,-45.0,2025-12-29T05:21:44.946Z
d9e1d6b80a104ed5cbaa3c761714bdaa1473622c9bc1c4f7cb1bc6e1820f3b6c,Fd2lrnyOD8YYKGeD61kZAPFn7uXZY,b228ea1e6e7602ceebeaf3883703bf1d4d2132fec8f259c85c484faddedebf68,FICA,20250814,18:29:19,1,0.0,55.0,-55.0,2025-12-29T05:21:44.946Z
e4be775e0ab2f592db42a14969c2a6c6b005cbb708e7c03ff99b09b11f10ded3,xRbawvM8d0gRyM5g91gZLlTeA9fZY,7009c5ce5aa288280fde6eb02bae5b6d7f7794b50adf59172549e10b110dc098,FIPH,20250814,16:51:17,1,36.31,36.31,0.0,2025-12-29T05:21:44.946Z
f35f37ec413f7a51b6e159057c17dbc1db9ec050807873478942a09b82176bb1,VT1VHAt1IRhu54k31DLf2ZsDfQPZY,7d824cf5ecad067fb4156d818a423bfa34f9b41a26ca993baad3c566f0d489d2,FICA,20250814,15:01:30,1,0.0,45.0,-45.0,2025-12-29T05:21:44.946Z
62bbfbb259894d7a6a6b8bbe1e0a8f293969d30282ff805fc9937e9149c1313c,VT1VHAt1IRhu54k31DLf2ZsDfQPZY,61007b9153a09e82ee84dbc17a379070e99550c49db9681c5c5c7c7009aba262,FICA,20250814,15:01:30,1,0.0,50.0,-50.0,2025-12-29T05:21:44.946Z
d3dd6e0e7746a257a7c42632b0c3a760214a27456967e4788ef6571eb2241725,zGfTok6q3WXFVWsJR1HyaoxCvAZZY,e18b8688971993267c1a25db589619f9323e11797934c26d3029ca3a5e9c5acc,FIPH,20250813,19:34:04,1,9.08,34.08,-25.0,2025-12-29T05:21:44.946Z
97ee6825d6ed69ca1346a46dc97aa05ce78828b7adb8a7cf73e94962ea2b12c1,zGfTok6q3WXFVWsJR1HyaoxCvAZZY,e680283359ff779fa10d23c011113d52c955102bd10c2e7e20ed4ce55ff25ccf,FIPH,20250813,19:34:04,1,9.07,34.07,-25.0,2025-12-29T05:21:44.946Z


In [0]:
# Append the fact stream to its Silver Delta table.
# trigger(availableNow=True) replaces the deprecated one-shot trigger
# (once=True): it processes everything available at start and then stops.
(
    df_fact_stream.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://silver@storagepigments.dfs.core.windows.net/FactStream/checkpoint")
    .trigger(availableNow=True)
    .option("path", "abfss://silver@storagepigments.dfs.core.windows.net/FactStream/data")
    .toTable("pigments_cata.silver.FactStream")
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f85207b8b90>