# In [0]:
# Databricks Notebook: 02_Silver_Layer_Transformation
# Purpose: Clean, normalize, and enrich EV range data from Bronze to Silver

# ------------------------------------------------------------------------------
# CELL 1: Imports
# ------------------------------------------------------------------------------
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

print("✅ Libraries imported")

# ------------------------------------------------------------------------------
# CELL 2: Define paths
# ------------------------------------------------------------------------------
# Mount-point locations of the Bronze (input) and Silver (output) Delta tables.
bronze_path, silver_path = (
    "/mnt/cti/bronze/ev_data",
    "/mnt/cti/silver/ev_data",
)

for _layer_name, _layer_path in (("Bronze", bronze_path), ("Silver", silver_path)):
    print(f"📂 {_layer_name} path: {_layer_path}")

# ------------------------------------------------------------------------------
# CELL 3: Read Bronze layer
# ------------------------------------------------------------------------------
print("\n📖 Reading from Bronze layer...")

# Load the Bronze Delta table, report its size, and show a sample plus its schema.
df_bronze = spark.read.load(bronze_path, format="delta")

bronze_count = df_bronze.count()
print(f"✅ Loaded {bronze_count} records from Bronze")

print("\n📋 Sample Bronze data:")
bronze_sample = df_bronze.limit(10)
display(bronze_sample)

print("\n📜 Bronze schema:")
df_bronze.printSchema()

# Helper: quick column-exists check
def has_col(df, col_name: str) -> bool:
    """Return True when *col_name* appears in ``df.columns``.

    The comparison is an exact, case-sensitive string match against the
    DataFrame's top-level column names.
    """
    column_names = df.columns
    return any(name == col_name for name in column_names)

# ------------------------------------------------------------------------------
# CELL 4: Basic normalization / standardization
# ------------------------------------------------------------------------------
print("\n🧹 Standardizing key text fields (Make, Model)...")

df_silver = df_bronze

# (source column, cleaned column, casing function), applied in this order.
# Every value is trimmed first; Make/Model are upper-cased, City/County are
# title-cased. A rule is skipped when its source column is absent.
_text_cleaning_rules = (
    ("Make", "Make_Clean", F.upper),
    ("Model", "Model_Clean", F.upper),
    ("City", "City_Clean", F.initcap),
    ("County", "County_Clean", F.initcap),
)

for _src_col, _clean_col, _casing in _text_cleaning_rules:
    if not has_col(df_silver, _src_col):
        continue
    df_silver = df_silver.withColumn(_clean_col, _casing(F.trim(F.col(_src_col))))

print("✅ Text fields standardized (where present)")

# ------------------------------------------------------------------------------
# CELL 5: Range-related features (if Electric_Range exists)
# ------------------------------------------------------------------------------
print("\n🔢 Deriving range-related fields...")

range_col = "Electric_Range" if has_col(df_silver, "Electric_Range") else None

if range_col is not None:
    # Integer copy of the range. A null source value stays null (casting null
    # yields null). Negative values are NOT removed here; see range_for_analysis.
    df_silver = df_silver.withColumn(
        "Electric_Range_Int",
        F.col(range_col).cast("int")
    )

    range_int = F.col("Electric_Range_Int")

    # Flag if range is zero (a null range is flagged False, not null)
    df_silver = df_silver.withColumn(
        "range_is_zero",
        F.when(range_int == 0, F.lit(True)).otherwise(F.lit(False))
    )

    # Range usable for analysis (negatives and nulls become null)
    df_silver = df_silver.withColumn(
        "range_for_analysis",
        F.when(range_int >= 0, range_int)
         .otherwise(F.lit(None).cast("int"))
    )

    # Simple range bucket for analysis.
    # The null check must be explicit: every `<=` comparison against null is
    # null, so without it unknown ranges fell through to .otherwise() and were
    # labelled "400+ km".
    # NOTE(review): labels say "km" -- confirm the unit of Electric_Range in the
    # source data before relying on these labels.
    df_silver = df_silver.withColumn(
        "range_bucket_km",
        F.when(range_int.isNull() | (range_int <= 0), F.lit("0 or Unknown"))
         .when(range_int <= 100, F.lit("1–100 km"))
         .when(range_int <= 200, F.lit("101–200 km"))
         .when(range_int <= 300, F.lit("201–300 km"))
         .when(range_int <= 400, F.lit("301–400 km"))
         .otherwise(F.lit("400+ km"))
    )

    print("✅ Range fields and buckets created")
else:
    print("⚠️ Column 'Electric_Range' not found – skipping range feature engineering")

# ------------------------------------------------------------------------------
# CELL 6: EV type / group (if such a column exists)
# ------------------------------------------------------------------------------
print("\n🚗 Deriving EV group (if possible)...")

# Common column names for EV type in WA dataset variants (first present wins)
ev_type_cols = ["Electric_Vehicle_Type", "EV_Type", "ev_group"]

ev_type_col = next((c for c in ev_type_cols if has_col(df_silver, c)), None)

if ev_type_col is not None:
    ev_type_norm = F.upper(F.trim(F.col(ev_type_col)))
    df_silver = df_silver.withColumn(
        "ev_group",
        # Descriptive labels ("Battery ...", "Plug-in ...") are matched by
        # substring. The short codes are matched exactly, so that a source
        # column already holding BEV/PHEV (e.g. a pre-existing "ev_group") is
        # not collapsed into "Other_EV". Null or unrecognised values -> "Other_EV".
        F.when(ev_type_norm.like("%BATTERY%") | (ev_type_norm == "BEV"), F.lit("BEV"))
         .when(ev_type_norm.like("%PLUG-IN%") | (ev_type_norm == "PHEV"), F.lit("PHEV"))
         .otherwise(F.lit("Other_EV"))
    )
    print(f"✅ EV group derived from column '{ev_type_col}'")
else:
    print("⚠️ No EV type column found – 'ev_group' not derived")

# ------------------------------------------------------------------------------
# CELL 7: Geo-related sanity checks (if latitude/longitude exist)
# ------------------------------------------------------------------------------
print("\n🌍 Validating geo coordinates (if present)...")

if all(has_col(df_silver, c) for c in ("latitude", "longitude")):
    # Inclusive range checks; a null coordinate yields a False flag.
    lat_in_range = F.col("latitude").between(-90, 90)
    lon_in_range = F.col("longitude").between(-180, 180)
    df_silver = (
        df_silver
        .withColumn("latitude_valid", F.when(lat_in_range, F.lit(True)).otherwise(F.lit(False)))
        .withColumn("longitude_valid", F.when(lon_in_range, F.lit(True)).otherwise(F.lit(False)))
    )
    print("✅ Geo validation flags added")
else:
    print("⚠️ latitude/longitude not found – skipping geo validation")

# ------------------------------------------------------------------------------
# CELL 8: Deduplicate by record_id (carried from Bronze)
# ------------------------------------------------------------------------------
print("\n🔍 Deduplicating by record_id...")

if has_col(df_silver, "record_id"):
    before_dedup = df_silver.count()

    if has_col(df_silver, "ingestion_timestamp"):
        # dropDuplicates keeps an arbitrary row per key, so the surviving row
        # could differ between runs of this full-refresh job. Keep the most
        # recently ingested row per record_id instead. Rows sharing the same
        # timestamp are still tie-broken arbitrarily.
        latest_first = Window.partitionBy("record_id").orderBy(
            F.col("ingestion_timestamp").desc_nulls_last()
        )
        df_silver = (
            df_silver
            .withColumn("_dedup_rank", F.row_number().over(latest_first))
            .filter(F.col("_dedup_rank") == 1)
            .drop("_dedup_rank")
        )
    else:
        df_silver = df_silver.dropDuplicates(["record_id"])

    after_dedup = df_silver.count()

    print("✅ Deduplication complete:")
    print(f"   Before: {before_dedup}")
    print(f"   After : {after_dedup}")
    print(f"   Removed: {before_dedup - after_dedup} duplicate records")
else:
    print("⚠️ 'record_id' column not found – no deduplication performed")

# ------------------------------------------------------------------------------
# CELL 9: Preview Silver dataset
# ------------------------------------------------------------------------------
print("\n📋 Sample of Silver candidate data:")

# Columns worth eyeballing. Only those actually present are shown, and they
# appear in the DataFrame's own column order.
_preview_wanted = {
    "record_id",
    "Model_Year",
    "Make",
    "Model",
    "Make_Clean",
    "Model_Clean",
    "Electric_Range",
    "Electric_Range_Int",
    "range_for_analysis",
    "range_bucket_km",
    "ev_group",
    "City_Clean",
    "County_Clean",
    "latitude",
    "longitude",
    "latitude_valid",
    "longitude_valid",
    "ingestion_timestamp",
    "year",
    "month",
    "day",
    "source",
}
_preview_cols = [name for name in df_silver.columns if name in _preview_wanted]

display(df_silver.select(*_preview_cols).limit(20))

# ------------------------------------------------------------------------------
# CELL 10: Write to Silver Delta Lake (full overwrite)
# ------------------------------------------------------------------------------
print("\n💾 Writing to Silver Delta Lake (overwrite mode)...")

# Partition only by columns that exist, matching the optional-column handling of
# the other cells ("day" is deliberately not used at Silver).
_wanted_partitions = ("year", "month")
silver_partition_cols = [c for c in _wanted_partitions if has_col(df_silver, c)]
if len(silver_partition_cols) < len(_wanted_partitions):
    print(f"⚠️ Missing partition column(s) – partitioning by {silver_partition_cols or 'nothing'}")

silver_writer = (
    df_silver
        .write
        .format("delta")
        .mode("overwrite")                  # full refresh to avoid MERGE issues
        # The Silver column set depends on which optional Bronze columns exist,
        # so a rerun can change the schema or partitioning; Delta rejects such
        # an overwrite unless overwriteSchema is enabled.
        .option("overwriteSchema", "true")
)
if silver_partition_cols:
    silver_writer = silver_writer.partitionBy(*silver_partition_cols)

silver_writer.save(silver_path)

print("✅ Silver table written successfully")

# ------------------------------------------------------------------------------
# CELL 11: Enable Change Data Feed on Silver
# ------------------------------------------------------------------------------
print("\n🔄 Enabling Change Data Feed on Silver table...")

# Path-based ALTER TABLE: turns on the Delta Change Data Feed table property.
enable_cdf_sql = f"""
    ALTER TABLE delta.`{silver_path}`
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
spark.sql(enable_cdf_sql)

print("✅ CDC enabled on Silver layer")

# ------------------------------------------------------------------------------
# CELL 12: Verify Silver table and basic stats
# ------------------------------------------------------------------------------
print("\n📊 Verifying Silver layer...")

# Re-read what was actually persisted rather than trusting the in-memory plan.
df_silver_verify = spark.read.load(silver_path, format="delta")

total_silver = df_silver_verify.count()
print(f"✅ Total Silver records: {total_silver}")

# (column, heading, rows to show); 20 is DataFrame.show()'s default row count.
_breakdowns = (
    ("ev_group", "\nBy EV Group:", 20),
    ("range_bucket_km", "\nBy Range Bucket:", 20),
    ("Make_Clean", "\nTop 10 Makes:", 10),
)
for _col_name, _heading, _n_rows in _breakdowns:
    if not has_col(df_silver_verify, _col_name):
        continue
    print(_heading)
    (
        df_silver_verify
        .groupBy(_col_name)
        .count()
        .orderBy(F.col("count").desc())
        .show(_n_rows)
    )

print("\n🎉 Silver Layer Transformation Complete!")
