In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import datetime,timedelta
from zoneinfo import ZoneInfo

In [0]:
spark.conf.set(
    "fs.azure.account.key.bmwstorageacc.dfs.core.windows.net",
    dbutils.secrets.get(scope = "bmwanalytics", key = "bmwstorevalut")
)

In [0]:
PIPELINE_VERSION = "V-"+datetime.now(ZoneInfo("Asia/Kolkata")).strftime("%Y%m%d-%H%M%S")
storage = "abfss://bmwstorage@bmwstorageacc.dfs.core.windows.net"
sliver_path = f"{storage}/sliver/bmw_sales/sales"
gold_path = f"{storage}/gold/bmw_sales/sales/"
try:
    version = [x for x in dbutils.fs.ls(sliver_path) if x.isDir() and x.name.startswith("V-")]
    if len(version) > 0:
        latest = sorted(version, key=lambda x: x.name)[-1]
        PIPELINE_VERSION = latest.name
except Exception as e:
    print(e)

In [0]:
sliver = spark.read.format("delta").load(f"{sliver_path}/{PIPELINE_VERSION}")
display(sliver)

In [0]:
base = (
    sliver
    .withColumn("year", F.col("year").cast("int"))
    .withColumn("price", F.col("price").cast("double"))
    .withColumn("sales_volume", F.col("sales_volume").cast("long"))
    .withColumn("mileage", F.col("mileage").cast("double"))
    .withColumn("engine_size", F.col("engine_size").cast("double"))
    .withColumn("revenue_usd", (F.col("price") * F.col("sales_volume")).cast("double"))
)

# -----------------------------
# 1) ONE Gold Fact Table (single grain)
# Grain = Year + Region + Model + Fuel + Transmission + Sales_Classification + Price_Band
# -----------------------------
gold_one = (
    base.groupBy(
        "year", "region", "model", "fuel_type", "transmission",
        "sales_classification", "price_brand"
    )
    .agg(
        F.sum("sales_volume").alias("Sales_Volume"),
        F.sum("revenue_usd").alias("Revenue_USD"),
        F.avg("price").alias("Avg_Price_USD"),
        F.avg("mileage").alias("Avg_Mileage_KM"),
        F.avg("engine_size").alias("Avg_Engine_Size_L"),
    )
)

# -----------------------------
# 2) YoY (at Region+Model level)  ✅ filter-safe for Region→Model drilldown
# -----------------------------
w_yoy = Window.partitionBy("region", "model").orderBy(F.col("year").asc())

gold_one = (
    gold_one
    .withColumn("Prev_Year_Sales_Volume", F.lag("Sales_Volume", 1).over(w_yoy))
    .withColumn("Prev_Year_Revenue", F.lag("Revenue_USD", 1).over(w_yoy))
    .withColumn(
        "YoY_Sales_Growth_Pct",
        F.when(
            (F.col("Prev_Year_Sales_Volume").isNull()) | (F.col("Prev_Year_Sales_Volume") == 0),
            F.lit(None).cast("double")
        ).otherwise(
            F.round(((F.col("Sales_Volume") - F.col("Prev_Year_Sales_Volume")) / F.col("Prev_Year_Sales_Volume")) * 100, 2)
        )
    )
    .withColumn(
        "YoY_Revenue_Growth_Pct",
        F.when(
            (F.col("Prev_Year_Revenue").isNull()) | (F.col("Prev_Year_Revenue") == 0),
            F.lit(None).cast("double")
        ).otherwise(
            F.round(((F.col("Revenue_USD") - F.col("Prev_Year_Revenue")) / F.col("Prev_Year_Revenue")) * 100, 2)
        )
    )
)

# -----------------------------
# 3) Shares (these replace separate Gold tables)
# -----------------------------

# 3A) Model Market Share % (within year)
w_year = Window.partitionBy("year")
w_year_model = Window.partitionBy("year", "model")

gold_one = (
    gold_one
    .withColumn("Year_Total_Sales", F.sum("Sales_Volume").over(w_year))
    .withColumn("Year_Model_Sales", F.sum("Sales_Volume").over(w_year_model))
    .withColumn(
        "Model_Market_Share_Pct",
        F.when(F.col("Year_Total_Sales") == 0, F.lit(None).cast("double"))
         .otherwise(F.round((F.col("Year_Model_Sales") / F.col("Year_Total_Sales")) * 100, 2))
    )
)

# 3B) Fuel + Transmission Share % (within year)
w_year_fuel_trans = Window.partitionBy("year", "fuel_type", "transmission")

gold_one = (
    gold_one
    .withColumn("Year_FuelTrans_Sales", F.sum("Sales_Volume").over(w_year_fuel_trans))
    .withColumn(
        "FuelTrans_Sales_Share_Pct",
        F.when(F.col("Year_Total_Sales") == 0, F.lit(None).cast("double"))
         .otherwise(F.round((F.col("Year_FuelTrans_Sales") / F.col("Year_Total_Sales")) * 100, 2))
    )
)

# 3C) Sales Classification distribution share %
#    - By Region (within year+region)
w_year_region = Window.partitionBy("year", "region")
w_year_region_class = Window.partitionBy("year", "region", "sales_classification")

gold_one = (
    gold_one
    .withColumn("Year_Region_Sales", F.sum("Sales_Volume").over(w_year_region))
    .withColumn("Year_Region_Class_Sales", F.sum("Sales_Volume").over(w_year_region_class))
    .withColumn(
        "SalesClass_Share_By_Region_Pct",
        F.when(F.col("Year_Region_Sales") == 0, F.lit(None).cast("double"))
         .otherwise(F.round((F.col("Year_Region_Class_Sales") / F.col("Year_Region_Sales")) * 100, 2))
    )
)

#    - By Model (within year+model)
w_year_model_total = Window.partitionBy("year", "model")
w_year_model_class = Window.partitionBy("year", "model", "sales_classification")

gold_one = (
    gold_one
    .withColumn("Year_Model_Total_Sales", F.sum("Sales_Volume").over(w_year_model_total))
    .withColumn("Year_Model_Class_Sales", F.sum("Sales_Volume").over(w_year_model_class))
    .withColumn(
        "SalesClass_Share_By_Model_Pct",
        F.when(F.col("Year_Model_Total_Sales") == 0, F.lit(None).cast("double"))
         .otherwise(F.round((F.col("Year_Model_Class_Sales") / F.col("Year_Model_Total_Sales")) * 100, 2))
    )
)

#    - By Fuel Type (within year+fuel_type)
w_year_fuel = Window.partitionBy("year", "fuel_type")
w_year_fuel_class = Window.partitionBy("year", "fuel_type", "sales_classification")

gold_one = (
    gold_one
    .withColumn("Year_Fuel_Total_Sales", F.sum("Sales_Volume").over(w_year_fuel))
    .withColumn("Year_Fuel_Class_Sales", F.sum("Sales_Volume").over(w_year_fuel_class))
    .withColumn(
        "SalesClass_Share_By_Fuel_Pct",
        F.when(F.col("Year_Fuel_Total_Sales") == 0, F.lit(None).cast("double"))
         .otherwise(F.round((F.col("Year_Fuel_Class_Sales") / F.col("Year_Fuel_Total_Sales")) * 100, 2))
    )
)

# -----------------------------
# 4) Premium + High Sales Combo Flag (client wants this)
# -----------------------------
gold_one = (
    gold_one
    .withColumn(
        "Is_Premium_HighSales_Combo",
        F.when((F.col("price_brand") == "High") & (F.col("sales_classification") == "High"), F.lit(1)).otherwise(F.lit(0))
    )
)

# Optional: drop helper totals if you don't want them in final output
gold_one_final = gold_one.drop(
    "Year_Total_Sales", "Year_Model_Sales", "Year_FuelTrans_Sales",
    "Year_Region_Sales", "Year_Region_Class_Sales",
    "Year_Model_Total_Sales", "Year_Model_Class_Sales",
    "Year_Fuel_Total_Sales", "Year_Fuel_Class_Sales"
)

gold_one_final_filtered = (
    gold_one_final
    .filter(
        (F.col("Prev_Year_Sales_Volume").isNotNull()) &
        (F.col("Prev_Year_Revenue").isNotNull()) &
        (F.col("YoY_Sales_Growth_Pct").isNotNull()) &
        (F.col("YoY_Revenue_Growth_Pct").isNotNull()) &
        (F.col("YoY_Sales_Growth_Pct") > 0) &
        (F.col("YoY_Revenue_Growth_Pct") > 0) &
        (F.col("Sales_Volume") > 0) &
        (F.col("Revenue_USD") > 0)
    )
)

display(gold_one_final_filtered)



In [0]:
(
    gold_one_final_filtered
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(gold_path + f"/sales_and_growth/{PIPELINE_VERSION}")
)