In [0]:
from pyspark.sql.functions import concat_ws, sum, desc, col, round, lead, coalesce, lit
from pyspark.sql import Window

In [0]:
# Inputs from the mta_silver schema. The incident table is later joined to the
# category table on ict_sk and to the date table on dte_sk.
major_incident_df = spark.read.table("mta_silver.fct_major_incident")
dim_inc_category_df = spark.read.table("mta_silver.dim_inc_category")
date_df = spark.read.table("mta_silver.dim_date")

In [0]:
# NOTE: sum and round below are the pyspark.sql.functions versions imported at
# the top of the notebook, not the Python builtins.

# Quarter-level window with no ordering: an aggregate over it gives every row
# of a quarter the same quarter-wide value.
window_qtr = Window.partitionBy("qtr")

# Ranking window for the lead() comparison: rows of one quarter, largest
# incident count first. ict_nk and ict_category act as tie-breakers so that
# categories with equal counts always appear in the same order. Ordering by
# inc_count alone leaves the "next" row arbitrary on ties, which lets the
# per-row difference change from one run to the next.
window_pct_increase = Window.partitionBy("qtr").orderBy(
    col("inc_count").desc(),
    col("ict_nk"),
    col("ict_category"),
)

# Step 1: attach category and date attributes, build the "<year>_<quarter>"
# key, compute the quarter total over the joined rows, then collapse to one
# row per (category, quarter) with the summed incident count.
# qtr_total_incidents is constant within a quarter, so keeping it in the
# grouping key does not split any group.
qtr_inc_by_cat_t1_df = (
    major_incident_df
    .join(dim_inc_category_df, major_incident_df.ict_sk == dim_inc_category_df.ict_sk, "inner")
    .join(date_df, major_incident_df.dte_sk == date_df.dte_sk, "inner")
    .withColumn("qtr", concat_ws("_", "dte_year", "dte_quarter"))
    .withColumn("qtr_total_incidents", sum(col("inc_count")).over(window_qtr))
    .select(
        "ict_nk",
        "ict_category",
        "qtr",
        "inc_count",
        "qtr_total_incidents",
    )
    .groupBy(
        "ict_nk",
        "ict_category",
        "qtr",
        "qtr_total_incidents",
    )
    .agg(
        sum("inc_count").alias("inc_count")
    )
    .orderBy("qtr", desc("inc_count"))
)

# Step 2: derive the share and the gap to the next-ranked category.
#  - qtr_inc_fraction: the category's share of the quarter total, expressed as
#    a percentage (x100) and rounded to 2 decimals.
#  - qic_qtr_inc_percent_diff: this row's percentage minus the percentage of
#    the following row in window_pct_increase. lead() yields null for the last
#    row of each quarter, which coalesce() turns into 0.
qtr_inc_by_cat_t2_df = (
    qtr_inc_by_cat_t1_df
    .withColumn(
        "qtr_inc_fraction",
        round((col("inc_count") / col("qtr_total_incidents") * 100), 2),
    )
    .withColumn(
        "qic_qtr_inc_percent_diff",
        coalesce(
            round(col("qtr_inc_fraction") - lead("qtr_inc_fraction").over(window_pct_increase), 2),
            lit(0),
        ),
    )
)

In [0]:
# Final projection for the report table: ict_nk keeps its name, every other
# column is exposed under its qic_-prefixed name, in the order listed here.
_qic_column_aliases = [
    ("ict_category", "qic_inc_category"),
    ("qtr", "qic_qtr"),
    ("inc_count", "qic_inc_count"),
    ("qtr_total_incidents", "qic_qtr_total_incidents"),
    ("qtr_inc_fraction", "qic_qtr_inc_percent"),
    ("qic_qtr_inc_percent_diff", "qic_qtr_inc_percent_lead_diff"),
]

qtr_inc_by_cat_final_df = qtr_inc_by_cat_t2_df.select(
    col("ict_nk"),
    *[col(source_name).alias(report_name) for source_name, report_name in _qic_column_aliases]
)

In [0]:
# Persist the report as a Delta table in overwrite mode, with the mergeSchema
# writer option set to "true".
(
    qtr_inc_by_cat_final_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("mta_gold.rpt_qtr_incidents_by_category")
)

In [0]:
%sql
-- Preview the contents of the report table written by the preceding cell.
SELECT
  *
FROM
  mta_gold.rpt_qtr_incidents_by_category;

In [0]:
# Exit the notebook with the string "Success" as its exit value.
# NOTE(review): presumably read by an orchestrating job or parent notebook — confirm.
dbutils.notebook.exit("Success")