In [0]:
from pyspark.sql.functions import (
    col, to_timestamp, date_format, hour, dayofweek, month, when, lit, 
    sin, cos, trim, upper
)
from pyspark.sql.types import DoubleType, IntegerType
import math

# --- Setup & Configuration ---
# Registers the "run_date" text widget with its default value.
# NOTE(review): the widget value is not read anywhere in the visible part of
# this notebook -- confirm whether it is still needed.
dbutils.widgets.text("run_date", "2025-12-09")

# Fully qualified input (Bronze) and output (Silver) table names.
source_table = "final_project.bronze.crimes_bronze"
target_table = "final_project.silver.crimes_silver"

# Primary_Type values that are labelled "Violent" in Crime_Category.
violent_crimes = [
    "HOMICIDE",
    "CRIM SEXUAL ASSAULT",
    "ROBBERY",
    "ASSAULT",
    "BATTERY",
    "KIDNAPPING",
    "SEX OFFENSE",
    "CRIMINAL SEXUAL ASSAULT",
]

# Primary_Type values that are labelled "Property" in Crime_Category.
property_crimes = [
    "BURGLARY",
    "THEFT",
    "MOTOR VEHICLE THEFT",
    "ARSON",
    "DECEPTIVE PRACTICE",
    "CRIMINAL DAMAGE",
]

# Bronze -> Silver transformation.
# Reads `source_table`, de-duplicates and cleans the rows, derives calendar,
# category and cyclical features, then overwrites `target_table` as a Delta
# table partitioned by District. Any failure is logged and re-raised so the
# notebook/job run is marked as failed instead of silently succeeding.
try:
    print(f"Reading from Bronze: {source_table}")
    df = spark.read.table(source_table)
    initial_count = df.count()
    print(f"Initial rows: {initial_count}")

    # ==============================================================================
    # STEP 1: Cleaning & Quality Checks
    # ==============================================================================
    print("Performing cleaning...")

    # 1. Deduplication based on unique identifiers
    df_clean = df.dropDuplicates(["ID", "Case_Number"])

    # 2. Casting types and Filtering Invalid Coordinates
    # Chicago Bounding Box approx: Lat 41.6-42.1, Lon -87.95 to -87.5
    # (bounds are exclusive). Rows whose coordinates are NULL -- including
    # values that fail the cast to double -- are dropped as well, because the
    # comparison evaluates to NULL and filter() keeps only TRUE rows.
    df_clean = df_clean \
        .withColumn("Latitude", col("Latitude").cast(DoubleType())) \
        .withColumn("Longitude", col("Longitude").cast(DoubleType())) \
        .withColumn("District", col("District").cast(IntegerType())) \
        .filter(
            (col("Latitude") > 41.6) & (col("Latitude") < 42.1) &
            (col("Longitude") > -87.95) & (col("Longitude") < -87.5)
        )

    # 3. Text Standardization (upper-casing and trimming whitespace)
    df_clean = df_clean.withColumn("Location_Description", trim(upper(col("Location_Description"))))

    # ==============================================================================
    # STEP 2: Feature Engineering (Human Readable)
    # ==============================================================================
    print("Performing Feature Engineering...")

    # Parsing Timestamp (12-hour clock with AM/PM marker)
    df_transformed = df_clean.withColumn("parsed_timestamp", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a"))

    # Extracting Date Components
    # crime_date / crime_time are formatted strings; dayofweek() returns
    # 1 = Sunday ... 7 = Saturday, so {1, 7} is the weekend.
    df_transformed = df_transformed \
        .withColumn("crime_date", date_format(col("parsed_timestamp"), "yyyy-MM-dd")) \
        .withColumn("crime_time", date_format(col("parsed_timestamp"), "HH:mm:ss")) \
        .withColumn("crime_hour", hour(col("parsed_timestamp"))) \
        .withColumn("crime_month", month(col("parsed_timestamp"))) \
        .withColumn("day_of_week", dayofweek(col("parsed_timestamp"))) \
        .withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), 1).otherwise(0))

    # Time of Day Binning (Morning, Afternoon, Evening, Night)
    #   05-11 Morning, 12-16 Afternoon, 17-20 Evening, everything else Night.
    # NOTE(review): when parsed_timestamp is NULL, crime_hour is NULL and the
    # row falls through to "Night" (and is_weekend = 0) -- confirm intended.
    df_transformed = df_transformed.withColumn(
        "Time_of_Day",
        when((col("crime_hour") >= 5) & (col("crime_hour") < 12), "Morning")
        .when((col("crime_hour") >= 12) & (col("crime_hour") < 17), "Afternoon")
        .when((col("crime_hour") >= 17) & (col("crime_hour") < 21), "Evening")
        .otherwise("Night")
    )

    # Crime Severity Grouping (Violent vs Property)
    # Primary_Type is matched as-is (no trim/upper), so values must match the
    # list entries exactly; anything else, including NULL, becomes "Other".
    df_transformed = df_transformed.withColumn(
        "Crime_Category",
        when(col("Primary_Type").isin(violent_crimes), "Violent")
        .when(col("Primary_Type").isin(property_crimes), "Property")
        .otherwise("Other")
    )

    # ==============================================================================
    # STEP 3: Advanced ML Preparation (Cyclical Features)
    # ==============================================================================
    print("Calculating Cyclical Features for ML...")

    # Converting linear time (0-23 hours, 1-12 months) into circular
    # dimensions (sin/cos). This helps models understand that 23:00 is close
    # to 00:00 and that December is close to January.
    df_transformed = df_transformed \
        .withColumn("hour_sin", sin(col("crime_hour") * (2 * math.pi / 24))) \
        .withColumn("hour_cos", cos(col("crime_hour") * (2 * math.pi / 24))) \
        .withColumn("month_sin", sin(col("crime_month") * (2 * math.pi / 12))) \
        .withColumn("month_cos", cos(col("crime_month") * (2 * math.pi / 12)))

    # Dropping raw/temporary columns
    df_final = df_transformed.drop("Location", "Date", "parsed_timestamp")

    # ==============================================================================
    # STEP 4: Data Storage (Silver Layer)
    # ==============================================================================
    print(f"Writing to Silver: {target_table}")

    # Partitioning by District optimizes downstream queries for specific areas.
    # mode("overwrite") + overwriteSchema replaces both data and schema of the
    # existing table on every run.
    df_final.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("District") \
        .option("overwriteSchema", "true") \
        .saveAsTable(target_table)

    # Re-read the table so the reported count reflects what was persisted.
    final_count = spark.read.table(target_table).count()
    print(f"Silver transformation successful. Final records: {final_count}")

except Exception as e:
    print(f"Error during Silver transformation: {str(e)}")
    # Re-raise after logging: swallowing the exception would let the notebook
    # (and any job/workflow running it) finish as "succeeded" even though the
    # Silver table was not (fully) written.
    raise