In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

CATALOG = "fmucd_capstone"
BRONZE_TABLE = f"{CATALOG}.bronze.bronze_fmucd_raw"

SILVER_SCHEMA = "silver"
DIM_BUILDING = f"{CATALOG}.{SILVER_SCHEMA}.dim_building_scd2"
DIM_ASSET = f"{CATALOG}.{SILVER_SCHEMA}.dim_asset_scd2"
FACT_WO = f"{CATALOG}.{SILVER_SCHEMA}.fact_work_orders"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SILVER_SCHEMA}")

# ------------------------------------------------------------
# 1) Read Bronze
# ------------------------------------------------------------
b = spark.table(BRONZE_TABLE)

# Helper for consistent timestamps
load_ts = F.current_timestamp()
load_dt = F.current_date()

# ------------------------------------------------------------
# 2) Building Dimension (SCD2)
# ------------------------------------------------------------
src_building = (
    b.select(
        F.col("BuildingID").alias("building_id"),
        F.col("UniversityID").alias("university_id"),
        F.col("Country").alias("country"),
        F.col("State_Province").alias("state_province"),
        F.col("BuildingName").alias("building_name"),
        F.col("Size").cast("double").alias("building_size"),
        F.col("Type").alias("building_type"),
        F.floor(F.col("BuiltYear").cast("double")).cast("int").alias("built_year"),
        F.col("FCI").cast("double").alias("fci"),
        F.expr("try_cast(CRV as decimal(18,2))").alias("crv"),
        F.expr("try_cast(DMC as decimal(18,2))").alias("dmc"),
    )
    .dropDuplicates(["building_id"])
)

# Hash to detect changes (only on descriptive attributes)
src_building = src_building.withColumn(
    "attr_hash",
    F.sha2(
        F.concat_ws("||",
            F.coalesce(F.col("university_id").cast("string"), F.lit("")),
            F.coalesce(F.col("country"), F.lit("")),
            F.coalesce(F.col("state_province"), F.lit("")),
            F.coalesce(F.col("building_name"), F.lit("")),
            F.coalesce(F.col("building_size").cast("string"), F.lit("")),
            F.coalesce(F.col("building_type"), F.lit("")),
            F.coalesce(F.col("built_year").cast("string"), F.lit("")),
            F.coalesce(F.col("fci").cast("string"), F.lit("")),
            F.coalesce(F.col("crv").cast("string"), F.lit("")),
            F.coalesce(F.col("dmc").cast("string"), F.lit("")),
        ),
        256
    )
)

# Add SCD2 columns for insert rows
src_building_ins = (
    src_building
    .withColumn("Start_Date", load_dt)
    .withColumn("End_Date", F.lit(None).cast("date"))
    .withColumn("Active_Flag", F.lit(True))
    .withColumn("Load_Date", load_ts)
    .withColumn("Last_Updated_Date", load_ts)
    .withColumn("building_sk", F.xxhash64("building_id", F.col("Start_Date").cast("string")))
)

def scd2_merge(src_df, target_table, natural_key_col, sk_col):
    """
    Generic SCD2 MERGE:
    - matches on natural key AND Active_Flag = true
    - if hash differs: close current record
    - always insert new when not matched
    """
    if not spark.catalog.tableExists(target_table):
        (
            src_df.write.format("delta")
            .mode("overwrite")
            .saveAsTable(target_table)
        )
        return

    tgt = DeltaTable.forName(spark, target_table)

    # Close changed active rows
    (
        tgt.alias("t")
        .merge(
            src_df.alias("s"),
            f"t.{natural_key_col} = s.{natural_key_col} AND t.Active_Flag = true"
        )
        .whenMatchedUpdate(
            condition="t.attr_hash <> s.attr_hash",
            set={
                "End_Date": load_dt,
                "Active_Flag": F.lit(False),
                "Last_Updated_Date": load_ts
            }
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

# Merge Building dim
scd2_merge(src_building_ins, DIM_BUILDING, "building_id", "building_sk")

# ------------------------------------------------------------
# 3) Asset Dimension (SCD2) - System/SubSystem/Component
# ------------------------------------------------------------
src_asset = (
    b.select(
        F.col("SystemCode").alias("system_code"),
        F.col("SystemDescription").alias("system_description"),
        F.col("SubsystemCode").alias("subsystem_code"),
        F.col("SubsystemDescription").alias("subsystem_description"),
        F.col("DescriptiveCode").alias("component_code"),
        F.col("ComponentDescription").alias("component_description"),
    )
    .dropDuplicates(["system_code", "subsystem_code", "component_code"])
)

src_asset = src_asset.withColumn(
    "asset_nk",
    F.concat_ws("::",
        F.coalesce("system_code", F.lit("")),
        F.coalesce("subsystem_code", F.lit("")),
        F.coalesce("component_code", F.lit(""))
    )
)

src_asset = src_asset.withColumn(
    "attr_hash",
    F.sha2(
        F.concat_ws("||",
            F.coalesce(F.col("system_description"), F.lit("")),
            F.coalesce(F.col("subsystem_description"), F.lit("")),
            F.coalesce(F.col("component_description"), F.lit("")),
        ),
        256
    )
)

src_asset_ins = (
    src_asset
    .withColumn("Start_Date", load_dt)
    .withColumn("End_Date", F.lit(None).cast("date"))
    .withColumn("Active_Flag", F.lit(True))
    .withColumn("Load_Date", load_ts)
    .withColumn("Last_Updated_Date", load_ts)
    .withColumn("asset_sk", F.xxhash64("asset_nk", F.col("Start_Date").cast("string")))
)

scd2_merge(src_asset_ins, DIM_ASSET, "asset_nk", "asset_sk")

# ------------------------------------------------------------
# 4) Fact Work Orders
# ------------------------------------------------------------
dim_b = spark.table(DIM_BUILDING).filter("Active_Flag = true").select("building_id", "building_sk")
dim_a = spark.table(DIM_ASSET).filter("Active_Flag = true").select("asset_nk", "asset_sk")

fact = (
    b.select(
        F.col("WOID").alias("wo_id"),
        F.col("WODescription").alias("wo_description"),
        F.expr("try_cast(WOPriority as int)").alias("wo_priority"),
        F.expr("try_cast(WOStartDate as date)").alias("wo_start_date"),
        F.expr("try_cast(WOEndDate as date)").alias("wo_end_date"),
        F.expr("try_cast(WODuration as int)").alias("wo_duration_days"),
        F.col("PPM_UPM").alias("maintenance_type"),

        F.col("LaborCost").cast("decimal(18,2)").alias("labor_cost"),
        F.col("MaterialCost").cast("decimal(18,2)").alias("material_cost"),
        F.col("OtherCost").cast("decimal(18,2)").alias("other_cost"),
        F.col("TotalCost").cast("decimal(18,2)").alias("total_cost"),
        F.col("LaborHours").cast("double").alias("labor_hours"),

        F.col("MinTemp_C").cast("double").alias("min_temp_c"),
        F.col("MaxTemp_C").cast("double").alias("max_temp_c"),
        F.col("AtmosphericPressure_hPa").cast("double").alias("atmospheric_pressure_hpa"),
        F.col("Humidity_pct").cast("double").alias("humidity_pct"),
        F.col("WindSpeed_mps").cast("double").alias("wind_speed_mps"),
        F.col("WindDegree").cast("double").alias("wind_degree"),
        F.col("Precipitation_mm").cast("double").alias("precipitation_mm"),
        F.col("Snow_mm").cast("double").alias("snow_mm"),
        F.col("Cloudness_pct").cast("double").alias("cloudness_pct"),

        F.col("BuildingID").alias("building_id"),
        F.col("SystemCode").alias("system_code"),
        F.col("SubsystemCode").alias("subsystem_code"),
        F.col("DescriptiveCode").alias("component_code"),

        F.col("_ingest_ts").alias("bronze_ingest_ts"),
        F.col("_source_file").alias("bronze_source_file"),
        F.col("_batch_id").alias("bronze_batch_id")
    )
)

fact = fact.withColumn(
    "asset_nk",
    F.concat_ws("::",
        F.coalesce("system_code", F.lit("")),
        F.coalesce("subsystem_code", F.lit("")),
        F.coalesce("component_code", F.lit(""))
    )
)

# Join surrogate keys
fact = (
    fact
    .join(dim_b, on="building_id", how="left")
    .join(dim_a, on="asset_nk", how="left")
    .withColumn("Load_Date", load_ts)
)

# Deduplicate WOID (keep latest ingest if duplicates)
w = (
    F.window(F.col("bronze_ingest_ts"), "36500 days")  # dummy window; we’ll use max ingest
)
# Simple approach: order by ingest timestamp
from pyspark.sql.window import Window
dedupe_w = Window.partitionBy("wo_id").orderBy(F.col("bronze_ingest_ts").desc())

fact = (
    fact
    .withColumn("_rn", F.row_number().over(dedupe_w))
    .filter(F.col("_rn") == 1)
    .drop("_rn")
)

(
    fact.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(FACT_WO)
)

print("✅ Silver tables created:")
print(" -", DIM_BUILDING)
print(" -", DIM_ASSET)
print(" -", FACT_WO)


In [0]:
%sql
SELECT * FROM fmucd_capstone.silver.dim_building_scd2 LIMIT 5;
SELECT * FROM fmucd_capstone.silver.dim_asset_scd2 LIMIT 5;


In [0]:
%sql
SELECT building_id, COUNT(*) AS active_rows
FROM fmucd_capstone.silver.dim_building_scd2
WHERE Active_Flag = true
GROUP BY building_id
HAVING COUNT(*) > 1;


In [0]:
%sql
SELECT asset_nk, COUNT(*) AS active_rows
FROM fmucd_capstone.silver.dim_asset_scd2
WHERE Active_Flag = true
GROUP BY asset_nk
HAVING COUNT(*) > 1;


In [0]:
%sql
SELECT COUNT(*) AS fact_rows
FROM fmucd_capstone.silver.fact_work_orders;

SELECT
  SUM(CASE WHEN building_sk IS NULL THEN 1 ELSE 0 END) AS missing_building_sk,
  SUM(CASE WHEN asset_sk IS NULL THEN 1 ELSE 0 END) AS missing_asset_sk
FROM fmucd_capstone.silver.fact_work_orders;


In [0]:
%sql
SELECT
  SUM(CASE WHEN wo_start_date IS NULL THEN 1 ELSE 0 END) AS null_start,
  SUM(CASE WHEN wo_end_date IS NULL THEN 1 ELSE 0 END) AS null_end,
  SUM(CASE WHEN wo_end_date < wo_start_date THEN 1 ELSE 0 END) AS end_before_start
FROM fmucd_capstone.silver.fact_work_orders;
