In [0]:
# =========================================================================
# LAPD DLT Pipeline - Silver to Snowflake Gold Layer
# =========================================================================

import os

import dlt
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

# =========================================================================
# SNOWFLAKE CONNECTION CONFIGURATION
# =========================================================================

def _snowflake_setting(key, default):
    """Return environment variable ``LAPD_SNOWFLAKE_<key>`` or *default*.

    Lets a deployment inject connection settings (the password in particular)
    without editing this file. An unset or empty variable falls back to
    *default*, so runs that set nothing keep the previous configuration.
    """
    return os.environ.get(f"LAPD_SNOWFLAKE_{key}") or default


# Options handed to the Spark Snowflake connector by every table below.
# Each value can be overridden with LAPD_SNOWFLAKE_<KEY> (URL, USER, PASSWORD,
# DATABASE, SCHEMA, WAREHOUSE, ROLE).
# NOTE(security): the password default is a plaintext credential kept only for
# backward compatibility. Provide LAPD_SNOWFLAKE_PASSWORD (ideally from a
# secret store), then delete the literal below and rotate the password.
SNOWFLAKE_OPTIONS = {
    "sfUrl": _snowflake_setting("URL", "RCVOYBU-ZSC95331.snowflakecomputing.com"),
    "sfUser": _snowflake_setting("USER", "LAPD_USER"),
    "sfPassword": _snowflake_setting("PASSWORD", "LAPD"),
    "sfDatabase": _snowflake_setting("DATABASE", "LAPD_DB"),
    "sfSchema": _snowflake_setting("SCHEMA", "DW"),
    "sfWarehouse": _snowflake_setting("WAREHOUSE", "LAPD_WH"),
    "sfRole": _snowflake_setting("ROLE", "LAPD_ROLE"),
}

# =========================================================================
# LOCATION DIMENSION
# =========================================================================

@dlt.table(
    name="snowflake_location_dim",
    comment="Location dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake",
    },
)
def location_dim():
    """Build the location dimension from silver and publish it to Snowflake.

    Keeps one row per distinct (Area, Area_Name, Reported_District_No,
    Location, Cross_Street) combination whose Area is not null.
    ``Location_ID`` is a 1-based row number over those five columns in
    ascending order.

    Returns:
        The DataFrame that was written (overwrite) to Snowflake table
        ``LOCATION_DIM``.

    NOTE(review): Latitude/Longitude are not part of the dedup key, so when one
    business key has several coordinate pairs the retained pair is whichever
    row Spark keeps — confirm that is acceptable.
    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that these functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    NOTE(review): dlt.read() is documented for datasets defined in this
    pipeline; confirm it resolves the external name lapd.silver.lapd_silver.
    """
    business_keys = ["Area", "Area_Name", "Reported_District_No", "Location", "Cross_Street"]
    source = dlt.read("lapd.silver.lapd_silver")

    distinct_locations = (
        source.select(*business_keys, "Latitude", "Longitude")
        .filter(col("Area").isNotNull())
        .dropDuplicates(business_keys)
    )
    numbered = distinct_locations.withColumn(
        "Location_ID", row_number().over(Window.orderBy(*business_keys))
    )

    location_out = numbered.select(
        col("Location_ID").cast("int"),
        col("Area").cast("int"),
        col("Area_Name"),
        col("Reported_District_No").cast("int").alias("Reporting_District"),
        col("Location"),
        col("Cross_Street"),
        col("Latitude").cast("double"),
        col("Longitude").cast("double"),
        lit("DATABRICKS_DLT").alias("DI_Source_ID"),
        current_timestamp().alias("DI_Load_Dt"),
    )

    # Publish to Snowflake, replacing the previous contents of the table.
    (
        location_out.write.format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "LOCATION_DIM")
        .mode("overwrite")
        .save()
    )
    return location_out

# =========================================================================
# CRIME TYPE DIMENSION
# =========================================================================

@dlt.table(
    name="snowflake_crimetype_dim",
    comment="Crime type dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def crimetype_dim():
    """
    Create Crime Type dimension from silver data and write to Snowflake.

    Keeps one row per distinct (Crime_Code, Crime_Code_Desc, Part_1_2)
    combination whose Crime_Code is not null. ``Crime_SK`` is a 1-based row
    number ordered by Crime_Code, then Crime_Code_Desc, then Part_1_2.

    Ordering on every business-key column makes the numbering a total order.
    Ordering on Crime_Code alone left ties (one code with several
    descriptions or Part values) in arbitrary order, so reruns could assign
    different keys. When each code appears once the keys are unchanged.

    Returns:
        The DataFrame written (overwrite) to Snowflake table ``CRIMETYPE_DIM``.

    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that dataset functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    """
    silver_df = dlt.read("lapd.silver.lapd_silver")

    bk_cols = ["Crime_Code", "Crime_Code_Desc", "Part_1_2"]

    crime_df = (
        silver_df
        .select(*bk_cols)
        .filter(col("Crime_Code").isNotNull())
        .dropDuplicates(bk_cols)
    )

    # Full business key (Crime_Code first) => deterministic surrogate keys.
    w = Window.orderBy(*bk_cols)
    crime_df = crime_df.withColumn("Crime_SK", row_number().over(w))

    final_df = crime_df.select(
        col("Crime_SK").cast("int"),
        col("Crime_Code").cast("int"),
        col("Crime_Code_Desc").alias("Crime_code_desc"),
        col("Part_1_2").cast("int"),
        current_timestamp().alias("DI_load_Dt"),
        lit("DATABRICKS_DLT").alias("DI_Source_ID")
    )

    # Write to Snowflake, replacing the previous table contents.
    final_df.write \
        .format("snowflake") \
        .options(**SNOWFLAKE_OPTIONS) \
        .option("dbtable", "CRIMETYPE_DIM") \
        .mode("overwrite") \
        .save()

    return final_df

# =========================================================================
# VICTIM DIMENSION
# =========================================================================

@dlt.table(
    name="snowflake_victim_dim",
    comment="Victim demographics dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake",
    },
)
def victim_dim():
    """Build the victim-demographics dimension and publish it to Snowflake.

    Adds a ``Victim_Age_Category`` band to every silver row, then keeps one
    row per distinct (Victim_Age, Victim_Age_Category, Victim_Sex)
    combination. No rows are filtered out first, so null combinations get a
    row too. ``Victim_SK`` is a 1-based row number ordered by age, sex, then
    category.

    Returns:
        The DataFrame written (overwrite) to Snowflake table ``VICTIM_DIM``.

    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that dataset functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    """
    age = col("Victim_Age")
    # Age bands. 99999 and any age no branch matches (e.g. a null age) become
    # "Unknown"; between() is inclusive on both bounds.
    age_band = (
        when(age == 99999, "Unknown")
        .when(age < 18, "Juvenile")
        .when(age.between(18, 29), "18-29")
        .when(age.between(30, 44), "30-44")
        .when(age.between(45, 59), "45-59")
        .when(age >= 60, "60+")
        .otherwise("Unknown")
    )
    key_cols = ["Victim_Age", "Victim_Age_Category", "Victim_Sex"]

    victims = (
        dlt.read("lapd.silver.lapd_silver")
        .withColumn("Victim_Age_Category", age_band)
        .select(*key_cols)
        .dropDuplicates(key_cols)
    )
    sk_window = Window.orderBy("Victim_Age", "Victim_Sex", "Victim_Age_Category")

    victim_out = victims.withColumn("Victim_SK", row_number().over(sk_window)).select(
        col("Victim_SK").cast("int"),
        col("Victim_Age").cast("int").alias("Victim_age"),
        col("Victim_Age_Category").alias("Victim_age_category"),
        col("Victim_Sex").alias("Victim_sex"),
        current_timestamp().alias("DI_Load_Dt"),
        lit("DATABRICKS_DLT").alias("DI_Source_ID"),
    )

    # Publish to Snowflake, replacing the previous contents of the table.
    (
        victim_out.write.format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "VICTIM_DIM")
        .mode("overwrite")
        .save()
    )
    return victim_out

# =========================================================================
# DESCENT DIMENSION
# =========================================================================

@dlt.table(
    name="snowflake_descent_dim",
    comment="Victim descent dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake",
    },
)
def descent_dim():
    """Build the victim-descent dimension and publish it to Snowflake.

    Keeps one row per distinct non-null ``Victim_Descent`` code, numbered
    1..n (``Descent_SK``) in ascending code order, and attaches a readable
    ``Descent_Description``. Codes missing from the mapping below are
    labelled "Other".

    Returns:
        The DataFrame written (overwrite) to Snowflake table ``DESCENT_DIM``.

    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that dataset functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    """
    descent_labels = {
        "A": "Asian",
        "B": "Black",
        "C": "Chinese",
        "D": "Cambodian",
        "F": "Filipino",
        "G": "Guamanian",
        "H": "Hispanic/Latino/Mexican",
        "I": "American Indian",
        "J": "Japanese",
        "K": "Korean",
        "L": "Laotian",
        "O": "Other",
        "P": "Pacific Islander",
        "S": "Samoan",
        "U": "Hawaiian",
        "V": "Vietnamese",
        "W": "White",
        "X": "Unknown",
        "Z": "Asian Indian",
    }
    code = col("Victim_Descent")

    # Fold the mapping into a single CASE WHEN ... chain, in mapping order.
    label_expr = None
    for letter, label in descent_labels.items():
        if label_expr is None:
            label_expr = when(code == letter, label)
        else:
            label_expr = label_expr.when(code == letter, label)
    label_expr = label_expr.otherwise("Other")

    distinct_codes = (
        dlt.read("lapd.silver.lapd_silver")
        .select("Victim_Descent")
        .filter(code.isNotNull())
        .dropDuplicates(["Victim_Descent"])
    )
    described = (
        distinct_codes
        .withColumn("Descent_SK", row_number().over(Window.orderBy("Victim_Descent")))
        .withColumn("Descent_Description", label_expr)
    )

    descent_out = described.select(
        col("Descent_SK").cast("int"),
        col("Victim_Descent"),
        col("Descent_Description"),
        current_timestamp().alias("DI_Load_Dt"),
        lit("DATABRICKS_DLT").alias("DI_Source_ID"),
    )

    # Publish to Snowflake, replacing the previous contents of the table.
    (
        descent_out.write.format("snowflake")
        .options(**SNOWFLAKE_OPTIONS)
        .option("dbtable", "DESCENT_DIM")
        .mode("overwrite")
        .save()
    )
    return descent_out

# =========================================================================
# PREMISE DIMENSION
# =========================================================================

@dlt.table(
    name="snowflake_premis_dim",
    comment="Premise dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def premis_dim():
    """
    Create Premise dimension from silver data and write to Snowflake.

    Keeps one row per distinct (Premis_Code, Premis_Description) pair whose
    Premis_Code is not null. ``Premis_SK`` is a 1-based row number ordered by
    Premis_Code, then Premis_Description.

    Ordering on both business-key columns makes the numbering a total order.
    Ordering on Premis_Code alone left codes with several descriptions in
    arbitrary order, so reruns could assign different keys. When each code
    has one description the keys are unchanged.

    Returns:
        The DataFrame written (overwrite) to Snowflake table ``PREMIS_DIM``.

    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that dataset functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    """
    silver_df = dlt.read("lapd.silver.lapd_silver")

    bk_cols = ["Premis_Code", "Premis_Description"]

    premise_df = (
        silver_df
        .select(*bk_cols)
        .filter(col("Premis_Code").isNotNull())
        .dropDuplicates(bk_cols)
    )

    # Full business key (Premis_Code first) => deterministic surrogate keys.
    w = Window.orderBy(*bk_cols)
    premise_df = premise_df.withColumn("Premis_SK", row_number().over(w))

    final_df = premise_df.select(
        col("Premis_SK").cast("int"),
        col("Premis_Code").cast("int"),
        col("Premis_Description").alias("Premis_desc"),
        current_timestamp().alias("DI_load_Dt"),
        lit("DATABRICKS_DLT").alias("DI_Source_ID")
    )

    # Write to Snowflake, replacing the previous table contents.
    final_df.write \
        .format("snowflake") \
        .options(**SNOWFLAKE_OPTIONS) \
        .option("dbtable", "PREMIS_DIM") \
        .mode("overwrite") \
        .save()

    return final_df

# =========================================================================
# WEAPON DIMENSION
# =========================================================================

@dlt.table(
    name="snowflake_weapon_dim",
    comment="Weapon dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def weapon_dim():
    """
    Create Weapon dimension from silver data and write to Snowflake.

    Keeps one row per distinct (Weapon_Used_Code, Weapon_Desc) pair whose
    Weapon_Used_Code is not null. ``Weapon_SK`` is a 1-based row number
    ordered by Weapon_Used_Code, then Weapon_Desc.

    Ordering on both business-key columns makes the numbering a total order.
    Ordering on Weapon_Used_Code alone left codes with several descriptions
    in arbitrary order, so reruns could assign different keys. When each
    code has one description the keys are unchanged.

    Returns:
        The DataFrame written (overwrite) to Snowflake table ``WEAPON_DIM``.

    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that dataset functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    """
    silver_df = dlt.read("lapd.silver.lapd_silver")

    bk_cols = ["Weapon_Used_Code", "Weapon_Desc"]

    weapon_df = (
        silver_df
        .select(*bk_cols)
        .filter(col("Weapon_Used_Code").isNotNull())
        .dropDuplicates(bk_cols)
    )

    # Full business key (Weapon_Used_Code first) => deterministic surrogate keys.
    w = Window.orderBy(*bk_cols)
    weapon_df = weapon_df.withColumn("Weapon_SK", row_number().over(w))

    final_df = weapon_df.select(
        col("Weapon_SK").cast("int"),
        col("Weapon_Used_Code").cast("int"),
        col("Weapon_Desc"),
        current_timestamp().alias("DI_Load_Dt"),
        lit("DATABRICKS_DLT").alias("DI_Source_ID")
    )

    # Write to Snowflake, replacing the previous table contents.
    final_df.write \
        .format("snowflake") \
        .options(**SNOWFLAKE_OPTIONS) \
        .option("dbtable", "WEAPON_DIM") \
        .mode("overwrite") \
        .save()

    return final_df

# =========================================================================
# ARREST DIMENSION
# =========================================================================

@dlt.table(
    name="snowflake_arrest_dim",
    comment="Arrest status dimension table in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "dimension",
        "target": "snowflake"
    }
)
def arrest_dim():
    """
    Create Arrest dimension from silver data and write to Snowflake.

    Keeps one row per distinct (Arrest_Status, Arrest_Status_Desc) pair whose
    Arrest_Status is not null. ``Arrest_SK`` is a 1-based row number ordered
    by Arrest_Status, then Arrest_Status_Desc.

    Ordering on both business-key columns makes the numbering a total order.
    Ordering on Arrest_Status alone left statuses with several descriptions
    in arbitrary order, so reruns could assign different keys. When each
    status has one description the keys are unchanged.

    Returns:
        The DataFrame written (overwrite) to Snowflake table ``ARREST_DIM``.

    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that dataset functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    """
    silver_df = dlt.read("lapd.silver.lapd_silver")

    bk_cols = ["Arrest_Status", "Arrest_Status_Desc"]

    arrest_df = (
        silver_df
        .select(*bk_cols)
        .filter(col("Arrest_Status").isNotNull())
        .dropDuplicates(bk_cols)
    )

    # Full business key (Arrest_Status first) => deterministic surrogate keys.
    w = Window.orderBy(*bk_cols)
    arrest_df = arrest_df.withColumn("Arrest_SK", row_number().over(w))

    final_df = arrest_df.select(
        col("Arrest_SK").cast("int"),
        col("Arrest_Status").alias("Arrest_status"),
        col("Arrest_Status_Desc").alias("Arrest_status_desc"),
        current_timestamp().alias("DI_Load_Dt"),
        lit("DATABRICKS_DLT").alias("DI_Source_ID")
    )

    # Write to Snowflake, replacing the previous table contents.
    final_df.write \
        .format("snowflake") \
        .options(**SNOWFLAKE_OPTIONS) \
        .option("dbtable", "ARREST_DIM") \
        .mode("overwrite") \
        .save()

    return final_df

# =========================================================================
# FACT CRIME TABLE
# =========================================================================

def _victim_age_category(age):
    """Return the Victim_Age_Category expression for the Column *age*.

    Mirrors the banding in victim_dim so the victim surrogate keys computed
    here match that dimension; keep the two in sync.
    """
    return (
        when(age == 99999, "Unknown")
        .when(age < 18, "Juvenile")
        .when((age >= 18) & (age <= 29), "18-29")
        .when((age >= 30) & (age <= 44), "30-44")
        .when((age >= 45) & (age <= 59), "45-59")
        .when(age >= 60, "60+")
        .otherwise("Unknown")
    )


def _surrogate_lookup(df, bk_cols, order_cols, sk_col, required_col=None):
    """Return the distinct *bk_cols* rows of *df*, numbered 1..n in *sk_col*.

    Steps: optionally drop rows whose *required_col* is null, deduplicate on
    *bk_cols*, then assign ``row_number()`` over *order_cols* ascending.
    """
    keys = df.select(*bk_cols)
    if required_col is not None:
        keys = keys.filter(col(required_col).isNotNull())
    keys = keys.dropDuplicates(bk_cols)
    return keys.withColumn(sk_col, row_number().over(Window.orderBy(*order_cols)))


def _attach_surrogate_key(fact_df, lookup_df, bk_cols, sk_col):
    """Left-join *lookup_df* onto *fact_df* on *bk_cols*, adding only *sk_col*.

    The lookup's key columns are renamed before the join, so the condition
    never compares two references to the same underlying silver column. Both
    frames derive from the same read, which makes ``df1.x == df2.x`` prone to
    self-join ambiguity. Keys are compared with null-safe equality (``<=>``),
    so a fact row whose optional key parts are null still matches the
    dimension row built from the same null values. Because *lookup_df* is
    unique on *bk_cols*, each fact row matches at most one lookup row.
    """
    renamed = lookup_df.select(
        col(sk_col), *[col(c).alias(f"_dim_{c}") for c in bk_cols]
    )
    condition = None
    for c in bk_cols:
        term = col(c).eqNullSafe(col(f"_dim_{c}"))
        condition = term if condition is None else condition & term
    joined = fact_df.join(renamed, condition, "left")
    return joined.drop(*[f"_dim_{c}" for c in bk_cols])


@dlt.table(
    name="snowflake_fact_crime",
    comment="Crime fact table with foreign keys to all dimensions in Snowflake",
    table_properties={
        "quality": "gold",
        "type": "fact",
        "target": "snowflake"
    }
)
def fact_crime():
    """
    Create Fact Crime table by joining silver data with dimension surrogate keys.

    Surrogate keys are recomputed from the silver data, not read back from
    Snowflake. Each lookup applies the dimension's not-null filter and
    deduplicates on its business key. It then numbers rows ordered by the
    dimension's leading key column, with the remaining key columns as
    tie-breakers. The keys are attached to every silver row with null-safe
    left joins on the full business key. This fixes two problems:
      * joining on only part of a key (e.g. Crime_Code without its
        description) could duplicate fact rows;
      * plain ``==`` on nullable key parts (e.g. Cross_Street, Victim_Sex)
        produced null surrogate keys for rows that have a dimension entry.

    Returns:
        The DataFrame written (overwrite) to Snowflake table ``FACT_CRIME``:
        one row per silver row.

    NOTE(review): the Snowflake write is a side effect inside a DLT dataset
    function; DLT documents that dataset functions may be evaluated more than
    once and should be side-effect free — confirm this runs as intended.
    NOTE(review): dlt.read() is documented for datasets defined in this
    pipeline; confirm it resolves the external name lapd.silver.lapd_silver.
    """
    # The victim dimension derives Victim_Age_Category itself, so derive it
    # identically here before it is used as a lookup/join key.
    silver_df = dlt.read("lapd.silver.lapd_silver").withColumn(
        "Victim_Age_Category", _victim_age_category(col("Victim_Age"))
    )

    location_bk = ["Area", "Area_Name", "Reported_District_No", "Location", "Cross_Street"]
    crime_bk = ["Crime_Code", "Crime_Code_Desc", "Part_1_2"]
    victim_bk = ["Victim_Age", "Victim_Age_Category", "Victim_Sex"]
    premise_bk = ["Premis_Code", "Premis_Description"]
    weapon_bk = ["Weapon_Used_Code", "Weapon_Desc"]
    arrest_bk = ["Arrest_Status", "Arrest_Status_Desc"]

    # (business key, row_number ordering, surrogate-key column, non-null filter)
    # Ordering on the full key with the code column first yields the same keys
    # as ordering on the code alone whenever that code is unique, and a
    # deterministic order when it is not.
    dimension_specs = [
        (location_bk, location_bk, "Location_ID", "Area"),
        (crime_bk, crime_bk, "Crime_SK", "Crime_Code"),
        (victim_bk, ["Victim_Age", "Victim_Sex", "Victim_Age_Category"], "Victim_SK", None),
        (["Victim_Descent"], ["Victim_Descent"], "Descent_SK", "Victim_Descent"),
        (premise_bk, premise_bk, "Premis_SK", "Premis_Code"),
        (weapon_bk, weapon_bk, "Weapon_SK", "Weapon_Used_Code"),
        (arrest_bk, arrest_bk, "Arrest_SK", "Arrest_Status"),
    ]

    fact_df = silver_df
    for bk_cols, order_cols, sk_col, required_col in dimension_specs:
        lookup_df = _surrogate_lookup(silver_df, bk_cols, order_cols, sk_col, required_col)
        fact_df = _attach_surrogate_key(fact_df, lookup_df, bk_cols, sk_col)

    # Crime_key: 1-based row number by File_Number.
    # NOTE(review): an un-partitioned window moves every row into a single
    # partition, and the numbering is only reproducible if File_Number is unique.
    w = Window.orderBy("File_Number")
    fact_df = fact_df.withColumn("Crime_key", row_number().over(w))

    fact_df = fact_df.withColumn(
        "Days_to_Report",
        datediff(col("Date_Reported"), col("Date_Occured"))
    )

    fact_df = fact_df.withColumn(
        "DATE_SK",
        date_format(col("Date_Occured"), "yyyyMMdd").cast("int")
    )

    # Strip the colons, then lpad to exactly 4 characters: shorter values are
    # zero-padded ("930" -> "0930") and longer ones are cut to their first 4
    # characters ("12:30:00" -> "123000" -> "1230").
    # NOTE(review): a single-digit-hour "H:MM:SS" value would become "HMMS";
    # confirm the silver Time_Occured format.
    fact_df = fact_df.withColumn(
        "Time_Key",
        lpad(
            regexp_replace(col("Time_Occured"), ":", ""), 4, "0"
        )
    )

    # Select final columns
    final_df = fact_df.select(
        col("Crime_key").cast("int"),
        col("Days_to_Report").cast("int"),
        col("File_Number").alias("File_number"),
        col("Premis_SK").cast("int"),
        col("Weapon_SK").cast("int"),
        col("Crime_SK").cast("int"),
        col("Time_Key").cast("string"),
        col("Arrest_SK").cast("int"),
        col("Location_ID").cast("int"),
        col("Victim_SK").cast("int"),
        col("DATE_SK").cast("int"),
        col("Descent_SK").cast("int")
    )

    # Write to Snowflake, replacing the previous table contents.
    final_df.write \
        .format("snowflake") \
        .options(**SNOWFLAKE_OPTIONS) \
        .option("dbtable", "FACT_CRIME") \
        .mode("overwrite") \
        .save()

    return final_df