In [0]:
# access configuration settings
storage_account_name = "dlaqimddev"
client_id = dbutils.secrets.get(scope="key-vault-secrets", key="client-id")
client_secret = dbutils.secrets.get(scope="key-vault-secrets", key="client-secret")
tenant_id = dbutils.secrets.get(scope="key-vault-secrets", key="tenant-id")

# Configure OAuth 2.0 connection
spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net", client_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", client_secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, col
from delta.tables import DeltaTable

In [0]:
# source and target paths
silver_path = f"abfss://silver@{storage_account_name}.dfs.core.windows.net/cpcb_aqi"
gold_path = f"abfss://gold@{storage_account_name}.dfs.core.windows.net/cpcb_aqi/"

In [0]:
# 1. load the cleaned silver data
df_silver = spark.read.format("delta").load(silver_path)

display(df_silver.limit(10))

In [0]:
# 2. DIMENSION: location
# extract unique geographic attributes
dim_location = df_silver.select("country", "state", "city", "station", "longitude", "latitude").distinct().withColumn("location_key", monotonically_increasing_id())

display(dim_location.limit(10))

# location
dim_location.write.format("delta").mode("overwrite").save(gold_path + "dim_location")

In [0]:
# 3. DIMENSION: pollutant
# extract unique pollutant ids
dim_pollutant = df_silver.select("pollutant_id").distinct().withColumn("pollutant_key", monotonically_increasing_id())

display(dim_pollutant.limit(10))

# pollutant
dim_pollutant.write.format("delta").mode("overwrite").save(gold_path + "dim_pollutant")

In [0]:
# 4. FACT: aqi measurement
# join back to dimensions to replace strings with surrogate keys
fact_aqi = df_silver.join(dim_location, ["country", "state", "city", "station", "longitude", "latitude"], how="left").join(dim_pollutant, ["pollutant_id"],how="left").select(
    col("location_key"),
    col("pollutant_key"),
    col("last_update").alias("measurement_time"),
    col("avg_value").alias("aqi_avg"),
    col("max_value").alias("aqi_max"),
    col("min_value").alias("aqi_min")
)

display(fact_aqi.limit(10))

## 1. Load the new data you just processed (Incremental batch)
new_fact_data = fact_aqi

## 2. Check if the Gold table already exists
if DeltaTable.isDeltaTable(spark, gold_path + "fact_aqi"):
    gold_fact_table = DeltaTable.forPath(spark, gold_path + "fact_aqi")
    
    # 3. Perform the Merge (Upsert)
    gold_fact_table.alias("old").merge(
        new_fact_data.alias("new"),
        # Define the unique key: Same Location, Same Pollutant, Same Time
        "old.location_key = new.location_key AND old.pollutant_key = new.pollutant_key AND old.measurement_time = new.measurement_time"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
    print("New data successfully merged into Gold table")
else:
    # 4. If table doesn't exist, create it for the first time
    new_fact_data.write.format("delta").mode("overwrite").save(gold_path + "fact_aqi")
    print("New data successfully written to Gold table")

In [0]:
print(f"Total no. of rows read from silver: {df_silver.count()}")
print(f"Total no. of rows extracted for dim_location: {dim_location.count()}")
print(f"Total no. of rows extracted for dim_pollutant: {dim_pollutant.count()}")
print(f"Total no. of rows extracted for fact_aqi: {fact_aqi.count()}")