In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# ============================================================================
# FACT TABLE: fact_crime (FIXED - Forces 1M rows)
# ============================================================================

@dlt.table(
    name="fact_crime",
    comment="Fact table for crime incidents with all dimension keys"
)
def fact_crime():
    """
    Build the crime fact table by resolving dimension keys for each silver row.

    Grain: one row per crime incident (``dr_no``, exposed as
    ``crime_incident_key``). The grain is enforced by a final
    ``dropDuplicates(["crime_incident_key"])``.

    Every dimension is de-duplicated on its join key before being
    left-joined, so a join can neither multiply nor drop silver rows; an
    incident with no matching dimension row keeps a NULL key instead.
    The original author's expected output is 1,004,991 rows (one-to-one
    with ``crime_silver``).

    Returns:
        DataFrame with columns ``crime_incident_key``, ``date_key``,
        ``time_key``, ``location_key``, ``crime_type_key``, ``weapon_key``,
        ``victim_key``, ``status_key``, ``descent``, ``rpt_dist_no``,
        ``is_arrest_flag``, ``created_by`` and ``created_dt``.
    """

    # ========================================
    # Dimension lookups: de-duplicated on the join key, trimmed to the
    # columns needed, then aliased so later references are unambiguous.
    # ========================================

    # NOTE(review): dim_date is read but never joined; date_key is derived
    # directly from date_occ below. The read is kept as in the original -
    # presumably so the pipeline still depends on dim_date. Confirm.
    dim_date = dlt.read("dim_date").dropDuplicates(["date_key"])

    time_dim = (
        dlt.read("dim_time")
        .dropDuplicates(["time_key"])
        .select("time_key")
        .alias("t")
    )
    # SCD2 dimension read from the catalog table: current versions only,
    # and at most one row per area.
    location_dim = (
        spark.read.table("workspace.crime.dim_location_gold")
        .filter(col("is_current") == lit(True))
        .dropDuplicates(["area"])
        .select("location_key", "area")
        .alias("l")
    )
    crime_type_dim = (
        dlt.read("dim_crime_type")
        .dropDuplicates(["crime_type_key"])
        .select("crime_type_key")
        .alias("c")
    )
    weapon_dim = (
        dlt.read("dim_weapon")
        .dropDuplicates(["weapon_key"])
        .select("weapon_key")
        .alias("w")
    )
    victim_dim = (
        dlt.read("dim_victim")
        .dropDuplicates(["age_group", "sex"])
        .select("victim_key", "age_group", "sex")
        .alias("v")
    )
    status_dim = (
        dlt.read("dim_status")
        .dropDuplicates(["status_code"])
        .select("status_key", "status_code")
        .alias("s")
    )

    # ========================================
    # Derived columns on the silver rows
    # ========================================
    victim_age = col("vict_age_clean")
    age_group = (
        # -1 is bucketed explicitly; any other negative or NULL age also
        # ends up in "Unknown" through otherwise().
        when(victim_age == -1, "Unknown")
        .when((victim_age >= 0) & (victim_age <= 17), "0-17")
        .when((victim_age >= 18) & (victim_age <= 25), "18-25")
        .when((victim_age >= 26) & (victim_age <= 35), "26-35")
        .when((victim_age >= 36) & (victim_age <= 50), "36-50")
        .when(victim_age >= 51, "51+")
        .otherwise("Unknown")
    )

    incidents = (
        dlt.read("crime_silver")
        .select(
            col("dr_no").alias("crime_incident_key"),
            to_date(col("date_occ")).alias("date_occurred"),
            col("time_occ"),
            col("area"),
            col("crm_cd"),
            col("weapon_used_cd_clean").alias("weapon_used_cd"),
            col("status_clean").alias("status_code"),
            col("vict_age_clean"),
            col("vict_sex_clean"),
            col("vict_descent_clean").alias("descent"),
            col("rpt_dist_no"),
        )
        # "yyyyMMdd" already renders without separators, so the formatted
        # string can be cast straight to an integer key.
        .withColumn(
            "date_key",
            date_format(col("date_occurred"), "yyyyMMdd").cast("int"),
        )
        .withColumn("age_group", age_group)
        # when/otherwise (rather than the bare isin) keeps the flag False,
        # not NULL, when the status is NULL.
        .withColumn(
            "is_arrest_flag",
            when(col("status_code").isin("AA", "JA"), True).otherwise(False),
        )
        .alias("f")
    )

    # ========================================
    # Resolve dimension keys (left joins keep every incident)
    # ========================================
    joined = (
        incidents
        .join(time_dim, col("f.time_occ") == col("t.time_key"), "left")
        .join(location_dim, col("f.area") == col("l.area"), "left")
        .join(crime_type_dim, col("f.crm_cd") == col("c.crime_type_key"), "left")
        .join(weapon_dim, col("f.weapon_used_cd") == col("w.weapon_key"), "left")
        .join(
            victim_dim,
            (col("f.age_group") == col("v.age_group"))
            & (col("f.vict_sex_clean") == col("v.sex")),
            "left",
        )
        .join(status_dim, col("f.status_code") == col("s.status_code"), "left")
    )

    # ========================================
    # Final projection plus audit columns
    # ========================================
    return (
        joined
        .select(
            col("f.crime_incident_key"),
            col("f.date_key"),
            col("t.time_key"),
            col("l.location_key"),
            col("c.crime_type_key"),
            col("w.weapon_key"),
            col("v.victim_key"),
            col("s.status_key"),
            col("f.descent"),
            col("f.rpt_dist_no"),
            col("f.is_arrest_flag"),
            lit("SYSTEM").alias("created_by"),
            current_timestamp().alias("created_dt"),
        )
        # CRITICAL: force one row per DR_NO even if silver has duplicates.
        .dropDuplicates(["crime_incident_key"])
    )