In [0]:
from pyspark.sql.functions import sum as _sum, avg as _avg

# Silver-layer inputs for the vehicle-centric gold table.
sap_silver = spark.table("enterprise_modernization.silver.silver_sap")
crm_silver = spark.table("enterprise_modernization.silver.silver_crm")
fleet_silver = spark.table("enterprise_modernization.silver.silver_fleet")

# Roll fleet records up to one row per vehicle_id: the sum of fault_detected
# and the mean odometer reading.
fleet_metrics = [
    _sum("fault_detected").alias("fault_count"),
    _avg("odometer").alias("avg_odometer"),
]
fleet_agg = fleet_silver.groupBy("vehicle_id").agg(*fleet_metrics)

# CRM is the driving side of both left joins, so every CRM row is kept even
# when SAP or the fleet rollup has no matching vehicle_id (those columns are
# NULL in that case).
crm_sap_left = crm_silver.join(sap_silver, "vehicle_id", "left")
gold_df = crm_sap_left.join(fleet_agg, "vehicle_id", "left")

# Replace the contents of the gold table with the freshly joined result.
gold_writer = gold_df.write.mode("overwrite")
gold_writer.saveAsTable("enterprise_modernization.gold.customer_vehicle_fleet")


In [0]:
from pyspark.sql import functions as F

# ==============================
# 1. Load Silver Tables
# ==============================
# Tuple order below must match the unpacking order: crm, sap, fleet.
crm, sap, fleet = (
    spark.table(silver_table)
    for silver_table in (
        "enterprise_modernization.silver.silver_crm_1",
        "enterprise_modernization.silver.silver_sap_1",
        "enterprise_modernization.silver.silver_fleet_1",
    )
)

# ==============================
# 2. Select Only Required Columns & Rename Overlaps
# ==============================
# Each mapping is {silver column name: gold column name}. The crm_/sap_/fleet_
# prefix records which source the attribute came from; Car_ID is the join key
# and is carried through under its original name. Dict order is the output
# column order.
crm_renames = {
    "Manufacturer": "crm_Manufacturer",
    "Model": "crm_Model",
    "Fuel_type": "crm_Fuel_type",
    "Year_of_manufacture": "crm_Year",
    "Mileage": "crm_Mileage",
    "Price": "crm_Price",
    "Engine_size": "crm_Engine_Size",
    "Vehicle_Age": "crm_Vehicle_Age",
}

sap_renames = {
    "Dealer_ID": "sap_Dealer_ID",
    "Region": "sap_Region",
    "Payment_Mode": "sap_Payment_Mode",
    "Invoice_Amount": "sap_Invoice_Amount",
    "Discount_Percent": "sap_Discount_Percent",
    "Discount_Value": "sap_Discount_Value",
    "GST_Amount": "sap_GST_Amount",
    "Net_Sale": "sap_Net_Sale",
    "Effective_Amount": "sap_Effective_Amount",
}

fleet_renames = {
    "Total_Km": "fleet_Total_Km",
    "Fuel_Consumption": "fleet_Fuel_Consumption",
    "Accidents_Count": "fleet_Accidents_Count",
    "Insurance_Claim": "fleet_Insurance_Claim",
    "Fleet_Type": "fleet_Fleet_Type",
    "Maintenance_Cost": "fleet_Maintenance_Cost",
    "Is_Commercial": "fleet_Is_Commercial",
    "Has_Insurance_Claim": "fleet_Has_Insurance_Claim",
}


def _select_renamed(source_df, renames):
    """Return Car_ID plus every column in `renames`, aliased to its mapped name."""
    aliased = [F.col(old_name).alias(new_name) for old_name, new_name in renames.items()]
    return source_df.select("Car_ID", *aliased)


crm_sel = _select_renamed(crm, crm_renames)
sap_sel = _select_renamed(sap, sap_renames)
fleet_sel = _select_renamed(fleet, fleet_renames)

# ==============================
# 3. Join Cleaned DataFrames
# ==============================
# Both joins are inner joins on Car_ID, so only cars present in all three
# sources (CRM, SAP and fleet) reach the gold table.
crm_sap_inner = crm_sel.join(sap_sel, on="Car_ID", how="inner")
gold_joined = crm_sap_inner.join(fleet_sel, on="Car_ID", how="inner")

# ==============================
# 4. Derive Analytical KPIs
# ==============================
def _safe_ratio(numerator, denominator, scale):
    """Build a rounded ratio column that is NULL when the divisor is 0 or NULL.

    Args:
        numerator: Name of the column to divide.
        denominator: Name of the column to divide by.
        scale: Number of decimal places passed to F.round.

    Returns:
        A Column equal to round(numerator / denominator, scale), or NULL when
        the denominator is 0 or NULL.
    """
    divisor = F.col(denominator)
    # The division lives inside the WHEN branch so it is only evaluated for
    # non-zero divisors. With spark.sql.ansi.enabled=true a bare `/` raises a
    # divide-by-zero error and fails the job; with ANSI off it already yields
    # NULL, so results are unchanged in that mode.
    return F.round(F.when(divisor != 0, F.col(numerator) / divisor), scale)


gold_final = (
    gold_joined
    .withColumn("Discount_to_Invoice_Ratio", _safe_ratio("sap_Discount_Value", "sap_Invoice_Amount", 4))
    .withColumn("Maintenance_to_Sale_Ratio", _safe_ratio("fleet_Maintenance_Cost", "sap_Net_Sale", 4))
    .withColumn("Fuel_Efficiency_Normalized", _safe_ratio("fleet_Fuel_Consumption", "crm_Engine_Size", 3))
    # Age buckets: > 15 is "High", > 8 is "Medium", everything else is "Low".
    # NOTE(review): a NULL crm_Vehicle_Age matches neither condition and is
    # therefore labelled "Low" -- confirm that is the intended default.
    .withColumn("Age_Risk_Score",
        F.when(F.col("crm_Vehicle_Age") > 15, "High")
         .when(F.col("crm_Vehicle_Age") > 8, "Medium")
         .otherwise("Low")
    )
    # (net sale - maintenance cost) / (invoice amount + 1). The +1 is
    # presumably there to keep the divisor non-zero for a zero invoice amount;
    # it is kept as-is so existing values of this KPI do not change.
    .withColumn("Profitability_Index", F.round(
        (F.col("sap_Net_Sale") - F.col("fleet_Maintenance_Cost")) / (F.col("sap_Invoice_Amount") + F.lit(1)), 3)
    )
    # Audit columns stamped at query execution time.
    .withColumn("Data_Load_Date", F.current_date())
    .withColumn("Updated_Timestamp_Gold", F.current_timestamp())
)

# ==============================
# 5. Write to Gold Table
# ==============================
# Overwrite mode replaces the table's rows; overwriteSchema additionally lets
# the table schema be replaced when the column set has changed.
gold_table_writer = gold_final.write.mode("overwrite").option("overwriteSchema", "true")
gold_table_writer.saveAsTable("enterprise_modernization.gold.gold_car_sales_analytics")

print("‚úÖ Gold layer successfully created: enterprise_modernization.gold.gold_car_sales_analytics")


In [0]:
# Summary of the gold KPIs per (manufacturer, region) pair.
# Each spec is (source column, output column, decimal places for the average).
average_specs = [
    ("crm_Price", "Avg_Price", 2),
    ("sap_Net_Sale", "Avg_Sale", 2),
    ("fleet_Maintenance_Cost", "Avg_Maint_Cost", 2),
    ("Profitability_Index", "Avg_Profitability", 3),
]

# Distinct car count first, then the rounded averages in spec order.
summary_metrics = [F.countDistinct("Car_ID").alias("Total_Cars")]
summary_metrics += [
    F.round(F.avg(source_col), decimals).alias(output_col)
    for source_col, output_col, decimals in average_specs
]

summary_by_group = gold_final.groupBy("crm_Manufacturer", "sap_Region").agg(*summary_metrics)

# Audit columns stamped at query execution time.
aggregated_summary = (
    summary_by_group
    .withColumn("Data_Load_Date", F.current_date())
    .withColumn("Updated_Timestamp_Gold", F.current_timestamp())
)

# Replace the summary table (rows and, if it changed, schema) with this run's output.
summary_writer = aggregated_summary.write.option("overwriteSchema", "true").mode("overwrite")
summary_writer.saveAsTable("enterprise_modernization.gold.gold_summary_aggregates")

print("üìä Aggregated summary table created: enterprise_modernization.gold.gold_summary_aggregates")
