In [0]:
# ====================================================================================
# CAPSTONE PROJECT: A PRODUCTION-STYLE MEDALLION PIPELINE FOR USED CAR ANALYTICS
#
# This script implements the Bronze, Silver, and Gold layers to ensure data quality,
# reusability, and governance for all five automotive analytics use cases.
# ====================================================================================


# ------------------------------------------------------------------------------------
# SECTION 1: IMPORTS
# ------------------------------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when, split, current_timestamp, avg, count, desc, corr

# ------------------------------------------------------------------------------------
# SECTION 2: CONFIGURATION
# ------------------------------------------------------------------------------------
class Config:
    """Table names shared by every stage of the medallion pipeline.

    Every setting is a plain class attribute, so it can be read from the class
    itself or from an instance (``Config().SOURCE_TABLE``).
    """

    # --- Input ----------------------------------------------------------------
    # Raw source table registered in the Databricks catalog.
    SOURCE_TABLE = "workspace.default.used_cars_data"

    # --- Bronze / Silver ------------------------------------------------------
    # Unqualified names: the Delta tables are created in the session's current schema.
    BRONZE_TABLE_NAME = "bronze_used_cars_raw"
    SILVER_TABLE_NAME = "silver_used_cars_cleaned"

    # --- Gold -----------------------------------------------------------------
    # One business-ready table per analytics use case, keyed by use-case id.
    GOLD_TABLES = dict(
        market_demand="gold_market_demand_by_brand",
        price_analysis="gold_price_analysis_by_segment",
        brand_efficiency="gold_brand_efficiency_profile",
        feature_correlation="gold_price_feature_correlation",
        safety_hotspots="gold_high_wear_hotspots",
    )

# ------------------------------------------------------------------------------------
# SECTION 3: PIPELINE FUNCTIONS (BRONZE, SILVER, GOLD)
# ------------------------------------------------------------------------------------

def create_bronze_layer(spark: SparkSession, config: Config):
    """Copy the raw source table into the managed Bronze Delta table.

    Bronze keeps the source rows unchanged except for two things:
    awkward column names are renamed (see ``column_renames``), and an
    ``ingestion_timestamp`` audit column is appended. The table is
    overwritten on every run.

    Args:
        spark: Active Spark session used to read and write tables.
        config: Pipeline configuration supplying the source and Bronze table names.
    """
    print("--- PIPELINE STEP: Creating Bronze Layer ---")
    df_raw = spark.table(config.SOURCE_TABLE)

    # Spark reads a '.' in a column name as struct-field access, so "S.No." would
    # need backtick quoting in every downstream expression. The other source
    # columns already use underscores and need no renaming.
    # withColumnRenamed is a no-op when the source column is absent.
    column_renames = {"S.No.": "S_No"}
    df_renamed = df_raw
    for old_name, new_name in column_renames.items():
        df_renamed = df_renamed.withColumnRenamed(old_name, new_name)

    # Record when this batch landed, for auditing/lineage.
    df_bronze = df_renamed.withColumn("ingestion_timestamp", current_timestamp())

    df_bronze.write.format("delta").mode("overwrite").saveAsTable(config.BRONZE_TABLE_NAME)
    print(f"Bronze table '{config.BRONZE_TABLE_NAME}' created successfully.")

def create_silver_layer(spark: SparkSession, config: Config):
    """Clean, validate, and enrich the Bronze data into the Silver Delta table.

    Steps, in order:
      1. Strip unit suffixes from Mileage / Engine / Power, turn the literal
         string "null" into a real NULL, and cast the values to double.
      2. Derive a ``Brand`` column from ``Name``.
      3. Drop rows that are NULL in any critical column.
      4. Project and cast the final Silver schema, then overwrite the table.

    Args:
        spark: Active Spark session used to read and write tables.
        config: Pipeline configuration supplying the Bronze and Silver table names.
    """
    print("--- PIPELINE STEP: Creating Silver Layer ---")
    df_bronze = spark.table(config.BRONZE_TABLE_NAME)

    # 1. Each column below holds "<number><unit>" text. Remove the unit, then map
    #    the "null" placeholder to NULL *before* casting, so that placeholder is
    #    never handed to the cast.
    # NOTE(review): both "kmpl" and "km/kg" are stripped into the same Mileage
    # column, so downstream averages mix the two units.
    unit_suffixes = {
        "Mileage": r"(\s*kmpl|\s*km/kg)",
        "Engine": r"\s*CC",
        "Power": r"\s*bhp",
    }
    df_silver = df_bronze
    for column_name, suffix_pattern in unit_suffixes.items():
        stripped = regexp_replace(col(column_name), suffix_pattern, "")
        df_silver = df_silver.withColumn(
            column_name,
            when(stripped == "null", None).otherwise(stripped).cast("double"),
        )

    # 2. The brand is normally the first word of Name. A multi-word brand would be
    #    truncated by that rule ("Land Rover ..." -> "Land"), so known multi-word
    #    brands are matched by prefix first.
    multi_word_brands = ("Land Rover",)
    brand = split(col("Name"), " ")[0]
    for brand_name in multi_word_brands:
        brand = when(col("Name").startswith(brand_name), brand_name).otherwise(brand)
    df_silver = df_silver.withColumn("Brand", brand)

    # 3. Apply data quality filters: drop rows with nulls in critical columns.
    cols_to_check = ["Kilometers_Driven", "Mileage", "Engine", "Power", "Seats", "Price"]
    df_silver = df_silver.na.drop(subset=cols_to_check)

    # 4. Select and cast columns for the final clean table.
    df_silver = df_silver.select(
        "Brand", "Name", "Location", "Year", "Kilometers_Driven", "Fuel_Type", "Transmission",
        "Owner_Type", col("Seats").cast("integer"), "Mileage", "Engine", "Power", col("Price").cast("double")
    )

    df_silver.write.format("delta").mode("overwrite").saveAsTable(config.SILVER_TABLE_NAME)
    print(f"Silver table '{config.SILVER_TABLE_NAME}' created. This is our single source of truth.")

def _show_gold_table(df):
    """Preview *df* with the notebook ``display`` helper, or ``DataFrame.show`` if absent.

    ``display`` is not a Python or PySpark builtin; it is injected by notebook
    runtimes such as Databricks. Without this fallback, running the file as a
    plain script would raise NameError right after the first Gold table is written.
    """
    try:
        render = display  # noqa: F821 - provided by the notebook runtime
    except NameError:
        df.show(truncate=False)
    else:
        render(df)


def _publish_gold_table(df, table_name, preview_rows=None):
    """Overwrite the managed Delta table *table_name* with *df*, then preview it.

    Args:
        df: Gold DataFrame to persist.
        table_name: Target managed table name; it is overwritten on every run.
        preview_rows: If given, only this many rows are previewed. The full
            DataFrame is always written.
    """
    df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    _show_gold_table(df if preview_rows is None else df.limit(preview_rows))


def create_gold_tables(spark: SparkSession, config: Config):
    """Build, save, and preview every business-ready Gold table from the Silver table.

    Each use case is aggregated from Silver and written (overwrite) to the table
    named in ``config.GOLD_TABLES``.

    Args:
        spark: Active Spark session used to read and write tables.
        config: Pipeline configuration supplying the Silver and Gold table names.
    """
    print("--- PIPELINE STEP: Creating Gold Layer Tables ---")
    silver_df = spark.table(config.SILVER_TABLE_NAME)

    # --- Case 1: Market Demand Analysis ---
    print("  -> Generating Gold Table: Market Demand by Brand")
    market_demand_df = (
        silver_df.groupBy("Brand")
        .agg(count("*").alias("Number_of_Listings"))
        .orderBy(desc("Number_of_Listings"))
    )
    _publish_gold_table(market_demand_df, config.GOLD_TABLES["market_demand"], preview_rows=15)

    # --- Case 2: Smart Pricing Analysis ---
    print("  -> Generating Gold Table: Price Analysis by Segment")
    price_df = (
        silver_df.groupBy("Transmission")
        .agg(avg("Price").alias("Average_Price_Lakhs"))
        .orderBy(desc("Average_Price_Lakhs"))
    )
    _publish_gold_table(price_df, config.GOLD_TABLES["price_analysis"])

    # --- Case 3: Brand Efficiency Analysis ---
    print("  -> Generating Gold Table: Brand Efficiency Profile")
    # Only brands with more than this many listings are profiled, so averages
    # are not driven by a handful of cars. Counting inside the same aggregation
    # avoids a self-join. NULL brands are excluded explicitly: an equi-join on
    # Brand would never have matched them.
    min_listings = 20
    brand_efficiency_df = (
        silver_df.filter(col("Brand").isNotNull())
        .groupBy("Brand")
        .agg(
            count("*").alias("_listing_count"),
            avg("Mileage").alias("Average_Mileage_kmpl"),
            avg("Power").alias("Average_Power_bhp"),
        )
        .filter(col("_listing_count") > min_listings)
        .drop("_listing_count")
        .orderBy(desc("Average_Mileage_kmpl"))
    )
    _publish_gold_table(brand_efficiency_df, config.GOLD_TABLES["brand_efficiency"])

    # --- Case 4: Feature Correlation Analysis ---
    print("  -> Generating Gold Table: Price Feature Correlation")
    features = ["Year", "Kilometers_Driven", "Mileage", "Engine", "Power"]
    # Compute every Pearson coefficient in a single aggregation (one Spark job)
    # instead of one job per feature.
    corr_row = silver_df.select(*[corr(f, "Price").alias(f) for f in features]).first()
    # Explicit schema: corr() can return NULL (e.g. a constant column), which
    # would break type inference if every value were NULL.
    feature_corr_df = spark.createDataFrame(
        [(f, corr_row[f]) for f in features],
        "Feature string, Correlation_with_Price double",
    )
    _publish_gold_table(feature_corr_df, config.GOLD_TABLES["feature_correlation"])

    # --- Case 5: Safety (High-Wear) Hotspots Analysis ---
    print("  -> Generating Gold Table: High-Wear Vehicle Hotspots")
    # A car counts as high-wear if it is older than YEAR_THRESHOLD OR has more
    # than KMS_THRESHOLD kilometers on the odometer.
    YEAR_THRESHOLD = 2010
    KMS_THRESHOLD = 100000
    risky_cars_df = silver_df.filter((col("Year") < YEAR_THRESHOLD) | (col("Kilometers_Driven") > KMS_THRESHOLD))
    hotspots_df = (
        risky_cars_df.groupBy("Location")
        .agg(count("*").alias("High_Risk_Car_Count"))
        .orderBy(desc("High_Risk_Car_Count"))
    )
    _publish_gold_table(hotspots_df, config.GOLD_TABLES["safety_hotspots"])

# ------------------------------------------------------------------------------------
# SECTION 4: MAIN ORCHESTRATION BLOCK
# ------------------------------------------------------------------------------------

def main():
    """Run the whole Medallion pipeline: Bronze first, then Silver, then Gold."""
    builder = SparkSession.builder.appName("UsedCarsMedallionPipeline")
    spark = builder.getOrCreate()
    config = Config()

    # Each layer reads the table written by the one before it, so order matters.
    for build_layer in (create_bronze_layer, create_silver_layer, create_gold_tables):
        build_layer(spark, config)

    print("\n--- MEDALLION PIPELINE EXECUTION COMPLETE ---")
    print("All Bronze, Silver, and Gold tables have been created in your schema and are ready for dashboarding.")

if __name__ == "__main__":
    # Script entry point; importing this module does not run the pipeline.
    main()

--- PIPELINE STEP: Creating Bronze Layer ---
Bronze table 'bronze_used_cars_raw' created successfully.
--- PIPELINE STEP: Creating Silver Layer ---
Silver table 'silver_used_cars_cleaned' created. This is our single source of truth.
--- PIPELINE STEP: Creating Gold Layer Tables ---
  -> Generating Gold Table: Market Demand by Brand


Brand,Number_of_Listings
Maruti,1175
Hyundai,1058
Honda,600
Toyota,394
Mercedes-Benz,316
Volkswagen,314
Ford,294
Mahindra,268
BMW,262
Audi,235


  -> Generating Gold Table: Price Analysis by Segment


Transmission,Average_Price_Lakhs
Automatic,19.91491774383084
Manual,5.39544844124703


  -> Generating Gold Table: Brand Efficiency Profile


Brand,Average_Mileage_kmpl,Average_Power_bhp
Maruti,21.65870638297874,74.24561702127691
Renault,20.704758620689663,87.32517241379307
Tata,19.9436612021858,79.59595628415299
Nissan,19.475505617977547,88.37146067415735
Hyundai,19.2108884688091,92.22972589792064
Mini,18.593846153846155,147.43846153846155
Ford,18.59132653061224,96.7479591836735
Honda,18.530333333333285,108.5553666666666
Chevrolet,18.43200000000001,90.6045833333333
Volkswagen,18.3170382165605,94.67159235668792


  -> Generating Gold Table: Price Feature Correlation


Feature,Correlation_with_Price
Year,0.2994754501870621
Kilometers_Driven,-0.0082485362784922
Mileage,-0.3416519550661074
Engine,0.6580472262886926
Power,0.7728428987389121


  -> Generating Gold Table: High-Wear Vehicle Hotspots


Location,High_Risk_Car_Count
Hyderabad,189
Pune,170
Chennai,160
Jaipur,104
Mumbai,100
Kolkata,60
Bangalore,55
Delhi,48
Ahmedabad,28
Coimbatore,26



--- MEDALLION PIPELINE EXECUTION COMPLETE ---
All Bronze, Silver, and Gold tables have been created in your schema and are ready for dashboarding.
