In [0]:
# Declare the two text widgets this notebook is parameterised by (each with an
# empty-string default), then read back the values currently set on them.
for _widget_name in ("silver_path", "gold_path"):
    dbutils.widgets.text(_widget_name, "")

silver_path, gold_path = (
    dbutils.widgets.get(_name) for _name in ("silver_path", "gold_path")
)

In [0]:
# Example paths for interactive runs; uncomment to override the widget values.
# silver_path = "abfss://silver@adlsairqualitypoc.dfs.core.windows.net/aqi"
# gold_path   = "abfss://gold@adlsairqualitypoc.dfs.core.windows.net/aqi"

In [0]:
# NOTE: this wildcard import shadows Python built-ins such as `max` with the
# Spark column functions of the same name. Later cells rely on that, e.g.
# max("is_outlier") inside .agg(), so do not narrow it without updating them.
from pyspark.sql.functions import *
from delta.tables import DeltaTable

# Progress marker written to the notebook / job output.
print("Starting FINAL Production Gold processing...")

In [0]:
# Read the silver Delta table, then add two columns derived from event_ts:
#   date    - calendar date of the event
#   hour_ts - event timestamp truncated to the hour
silver_source = spark.read.format("delta").load(silver_path)

event_date = to_date("event_ts")
event_hour = date_trunc("hour", col("event_ts"))

silver_df = silver_source.withColumn("date", event_date).withColumn("hour_ts", event_hour)

In [0]:
# Dimensions - deterministic keys built with SHA-256.
dim_date_path = f"{gold_path}/dim_date"
dim_pollutant_path = f"{gold_path}/dim_pollutant"
dim_location_path = f"{gold_path}/dim_location"

# Location: one row per distinct (country, state, city, station); the key is
# the SHA-256 of those four values joined with "||".
location_columns = ["country", "state", "city", "station"]
location_hash = sha2(concat_ws("||", *location_columns), 256)
dim_location = (
    silver_df
    .select(*location_columns)
    .distinct()
    .select(*location_columns, location_hash.alias("location_key"))
)

# Pollutant: one row per distinct pollutant_id, keyed by its SHA-256.
dim_pollutant = (
    silver_df
    .select("pollutant_id")
    .distinct()
    .select("pollutant_id", sha2(col("pollutant_id"), 256).alias("pollutant_key"))
)

# Date: one row per distinct date, with its year / month / day-of-month parts.
dim_date = (
    silver_df
    .select("date")
    .distinct()
    .select(
        "date",
        year("date").alias("year"),
        month("date").alias("month"),
        dayofmonth("date").alias("day"),
    )
)

# Full refresh dimensions (small tables — safe).
# NOTE(review): overwrite means each dimension ends up holding only the keys
# present in the silver data read by this run.
for dim_frame, dim_target in (
    (dim_location, dim_location_path),
    (dim_pollutant, dim_pollutant_path),
    (dim_date, dim_date_path),
):
    dim_frame.write.format("delta").mode("overwrite").save(dim_target)

print("Dimensions updated.")

In [0]:
# Fact table - grain: station + pollutant + event_ts.
fact_path = f"{gold_path}/fact_air_quality"

# Key expressions: SHA-256 over the location columns joined with "||", and
# SHA-256 of pollutant_id.
fact_location_key = sha2(concat_ws("||", "country", "state", "city", "station"), 256)
fact_pollutant_key = sha2(col("pollutant_id"), 256)

fact_df = silver_df.select(
    fact_location_key.alias("location_key"),
    fact_pollutant_key.alias("pollutant_key"),
    "event_ts",
    "date",
    "hour_ts",
    "pollutant_avg",
    "is_outlier",
    "ingestion_ts",
)

fact_table_exists = DeltaTable.isDeltaTable(spark, fact_path)

if fact_table_exists:
    # Table already exists: upsert on (location_key, pollutant_key, event_ts).
    # Matching rows are fully updated, unmatched source rows are inserted.
    merge_condition = """
        t.location_key = s.location_key AND
        t.pollutant_key = s.pollutant_key AND
        t.event_ts = s.event_ts
    """
    delta_table = DeltaTable.forPath(spark, fact_path)
    fact_merge = (
        delta_table.alias("t")
        .merge(fact_df.alias("s"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
    )
    fact_merge.execute()
else:
    # No Delta table at fact_path yet: create it, partitioned by date.
    fact_writer = fact_df.write.format("delta").partitionBy("date")
    fact_writer.save(fact_path)

print("Fact table updated.")

In [0]:
def _load_delta(path):
    """Return a DataFrame over the Delta table stored at ``path``."""
    return spark.read.format("delta").load(path)


# Incremental scope: the distinct dates present in this run's silver data,
# exposed to SQL as the temp view "affected_dates".
affected_dates = silver_df.select("date").distinct()
affected_dates.createOrReplaceTempView("affected_dates")

# Re-read the fact table so downstream cells see its state after the merge.
fact_df_full = _load_delta(fact_path)

# Load the dimension tables written earlier in the notebook.
dim_location_df = _load_delta(dim_location_path)
dim_pollutant_df = _load_delta(dim_pollutant_path)
dim_date_df = _load_delta(dim_date_path)

In [0]:
# Daily state aggregation.
agg_daily_state_path = f"{gold_path}/agg_daily_state"

# Per (date, state, pollutant_id): mean of pollutant_avg, number of fact rows,
# and the max of is_outlier (presumably a boolean flag, i.e. "any outlier").
# Defined lazily over the whole fact table; `max` here is the Spark function
# brought in by the wildcard import, not the Python built-in.
daily_state = (
    fact_df_full
    .join(dim_location_df, "location_key")
    .join(dim_pollutant_df, "pollutant_key")
    .groupBy("date","state","pollutant_id")
    .agg(
        avg("pollutant_avg").alias("avg_aqi"),
        count("*").alias("reading_count"),
        max("is_outlier").alias("has_outlier")
    )
)

if not DeltaTable.isDeltaTable(spark, agg_daily_state_path):
    # First run: materialise every date, partitioned by date.
    daily_state.write.format("delta").partitionBy("date").save(agg_daily_state_path)
else:
    # Incremental run: replace only the dates touched by this batch.
    spark.sql(f"""
        DELETE FROM delta.`{agg_daily_state_path}`
        WHERE date IN (SELECT date FROM affected_dates)
    """)
    # Re-append exactly the dates that were just deleted. Appending the full
    # aggregate would duplicate every date that is not in affected_dates,
    # because those rows were never removed from the target table.
    daily_state_affected = daily_state.join(
        spark.table("affected_dates"), "date", "left_semi"
    )
    daily_state_affected.write.format("delta").mode("append").save(agg_daily_state_path)

print("Daily state aggregation updated.")

In [0]:
# Daily city aggregation.
agg_daily_city_path = f"{gold_path}/agg_daily_city"

# Per (date, state, city, pollutant_id): mean of pollutant_avg, number of fact
# rows, and the max of is_outlier (presumably a boolean flag, i.e. "any
# outlier"). Defined lazily over the whole fact table; `max` here is the Spark
# function brought in by the wildcard import, not the Python built-in.
daily_city = (
    fact_df_full
    .join(dim_location_df, "location_key")
    .join(dim_pollutant_df, "pollutant_key")
    .groupBy("date","state","city","pollutant_id")
    .agg(
        avg("pollutant_avg").alias("avg_aqi"),
        count("*").alias("reading_count"),
        max("is_outlier").alias("has_outlier")
    )
)

if not DeltaTable.isDeltaTable(spark, agg_daily_city_path):
    # First run: materialise every date, partitioned by date.
    daily_city.write.format("delta").partitionBy("date").save(agg_daily_city_path)
else:
    # Incremental run: replace only the dates touched by this batch.
    spark.sql(f"""
        DELETE FROM delta.`{agg_daily_city_path}`
        WHERE date IN (SELECT date FROM affected_dates)
    """)
    # Re-append exactly the dates that were just deleted. Appending the full
    # aggregate would duplicate every date that is not in affected_dates,
    # because those rows were never removed from the target table.
    daily_city_affected = daily_city.join(
        spark.table("affected_dates"), "date", "left_semi"
    )
    daily_city_affected.write.format("delta").mode("append").save(agg_daily_city_path)

print("Daily city aggregation updated.")

In [0]:
# Hourly state aggregation.
agg_hourly_state_path = f"{gold_path}/agg_hourly_state"

# Per (hour_ts, state, pollutant_id): mean of pollutant_avg, number of fact
# rows, and the max of is_outlier (presumably a boolean flag, i.e. "any
# outlier"). Defined lazily over the whole fact table; `max` here is the Spark
# function brought in by the wildcard import, not the Python built-in.
hourly_state = (
    fact_df_full
    .join(dim_location_df, "location_key")
    .join(dim_pollutant_df, "pollutant_key")
    .groupBy("hour_ts","state","pollutant_id")
    .agg(
        avg("pollutant_avg").alias("avg_aqi"),
        count("*").alias("reading_count"),
        max("is_outlier").alias("has_outlier")
    )
)

if not DeltaTable.isDeltaTable(spark, agg_hourly_state_path):
    # First run: materialise every hour (table is not partitioned).
    hourly_state.write.format("delta").save(agg_hourly_state_path)
else:
    # Incremental run: replace only the hours that fall on affected dates.
    spark.sql(f"""
        DELETE FROM delta.`{agg_hourly_state_path}`
        WHERE to_date(hour_ts) IN (SELECT date FROM affected_dates)
    """)
    # Re-append exactly the hours that were just deleted, using the same
    # to_date(hour_ts) predicate as the DELETE. Appending the full aggregate
    # would duplicate every hour whose date is not in affected_dates.
    affected_dates_df = spark.table("affected_dates")
    hourly_state_affected = hourly_state.join(
        affected_dates_df,
        to_date(hourly_state["hour_ts"]) == affected_dates_df["date"],
        "left_semi",
    )
    hourly_state_affected.write.format("delta").mode("append").save(agg_hourly_state_path)

print("Hourly state aggregation updated.")

In [0]:
# Threshold breaches: fact readings whose pollutant_avg exceeds the limit,
# enriched with location and pollutant attributes. Rebuilt in full each run.
threshold_path = f"{gold_path}/fact_threshold_breach"

breach_limit = 200
breach_columns = ["state", "city", "pollutant_id", "event_ts", "pollutant_avg"]

# Inner joins: fact rows without a matching dimension row are dropped.
fact_with_dims = (
    fact_df_full
    .join(dim_location_df, "location_key")
    .join(dim_pollutant_df, "pollutant_key")
)

threshold_breach = (
    fact_with_dims
    .where(col("pollutant_avg") > breach_limit)
    .select(*breach_columns)
)

breach_writer = threshold_breach.write.format("delta").mode("overwrite")
breach_writer.save(threshold_path)

print("Threshold breach fact updated.")
print("Gold processed successfully.")