## Deduplicate pollution measurements and model them as a fact table in the silver layer
- Source table: *bronze openaq_measurements*
- Target table: *silver fct_pollution*






### Imports and common variables

In [0]:
from pyspark.sql.functions import col, current_timestamp, concat, concat_ws, sha2, split # type: ignore
from delta.tables import * # type: ignore

# Unity Catalog location of this pipeline: the catalog is fixed, while the schema names are
# supplied through notebook widgets / job parameters (typical values shown on the right).
catalog = "air_polution_analytics_dev"
landing_schema = dbutils.widgets.get("landing_schema")  # e.g. "00_landing"
silver_schema = dbutils.widgets.get("silver_schema")    # e.g. "02_silver"
bronze_schema = dbutils.widgets.get("bronze_schema")    # e.g. "01_bronze"

# Bronze table read by the stream, and the silver fact table it populates.
source_table, target_table = "openaq_measurements", "fct_pollution"

# Streaming metadata lives in the `metadata` volume of the silver schema,
# one directory per OpenAQ target table.
base_path = f"/Volumes/{catalog}/{silver_schema}/metadata"
metadata_path = "/".join((base_path, "openaq", target_table))

### Create the deduplicated fact table, the metadata volume and the checkpoint directory

In [0]:
# DDL for the silver fact table. Each dimension column is declared as a foreign key to its
# silver dimension table. Databricks does not enforce PRIMARY KEY constraints: RELY only lets
# the optimizer assume `id` is unique, so uniqueness has to be guaranteed by the load logic.
# NOTE(review): `value` is stored as FLOAT (single precision) — confirm this matches the
# precision of the bronze `value` column.
create_table = f"""
    create table if not exists {catalog}.{silver_schema}.{target_table} (
        id STRING NOT NULL,
        date_int INT NOT NULL REFERENCES {catalog}.{silver_schema}.dim_calendar(date_int),
        time_int INT NOT NULL REFERENCES {catalog}.{silver_schema}.dim_time(time_int),
        value FLOAT NOT NULL,
        location_id INT NOT NULL REFERENCES {catalog}.{silver_schema}.dim_pollution_locations(location_id),
        sensor_id INT NOT NULL REFERENCES {catalog}.{silver_schema}.dim_pollution_sensors(sensor_id),
        PRIMARY KEY (id) RELY
    )
    COMMENT 'Fact table for pollution measurements'
cluster by (date_int);
"""
# Unity Catalog volume that holds streaming metadata (checkpoints) for the silver schema.
create_volume = f"create volume if not exists {catalog}.{silver_schema}.metadata;"

# Run the DDL in order (table first, then volume), then ensure the checkpoint directory exists.
for ddl_statement in (create_table, create_volume):
    spark.sql(ddl_statement)
dbutils.fs.mkdirs(metadata_path)

### Read the bronze table as a stream, define the deduplicating merge and start the write stream
Each record gets a deterministic `id` (a SHA-256 hash of `sensor_id`, `location_id` and `datetime_from`), which is used as the deduplication key.

In [0]:
def upsertToDelta(microBatchOutputDF, batchId):
    """Insert the not-yet-seen records of one micro-batch into the silver fact table.

    Used as the ``foreachBatch`` sink of the bronze -> silver stream. Records whose ``id``
    already exists in the target are skipped; records with a new ``id`` are inserted once.

    Args:
        microBatchOutputDF: micro-batch DataFrame with the target table's columns.
        batchId: micro-batch id supplied by Structured Streaming (not used here).
    """
    tableDeduped = DeltaTable.forName(spark, f"{catalog}.{silver_schema}.{target_table}")

    # An insert-only MERGE deduplicates only against rows already in the target: if the same
    # id occurs several times within this micro-batch, every copy would be inserted.
    # Drop in-batch duplicates first so `id` stays unique in the target.
    batchDeduped = microBatchOutputDF.dropDuplicates(["id"])

    (
        tableDeduped.alias("t")
        .merge(batchDeduped.alias("s"), "s.id = t.id")
        .whenNotMatchedInsertAll()
        .execute()
    )
    
df = (spark.readStream
    .table(f"{catalog}.{bronze_schema}.{source_table}")
    # Deterministic dedup key: SHA-256 of sensor, location and measurement start time.
    # concat_ws with a separator keeps the parts distinguishable; plain concat() gives e.g.
    # sensor 1 / location 23 and sensor 12 / location 3 the same id for the same timestamp,
    # and the insert-only merge would then silently drop one of the two measurements.
    # NOTE: ids produced by the previous concat() recipe differ from these; rebuild the
    # target table (and reset the checkpoint) so that all rows share one key recipe.
    .withColumn("id", sha2(concat_ws("|", col("sensor_id"), col("location_id"), col("datetime_from")), 256))
    .selectExpr(
        "id",
        # yyyyMMdd / HHmm integers matching dim_calendar.date_int / dim_time.time_int
        "year(datetime_from) * 10000 + month(datetime_from) * 100 + day(datetime_from) as date_int",
        "cast(date_format(datetime_from, 'HHmm') as int) as time_int",
        "value",
        "location_id",
        "sensor_id"
    )
)

query = (df.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .trigger(availableNow=True)
  # NOTE(review): despite the `_schema` suffix this is the stream's checkpoint; moving it
  # would likely make the stream reprocess the bronze table from the start.
  .option("checkpointLocation", f"{metadata_path}/_schema")
  .start()
)
# availableNow processes all data currently in bronze and then stops. Block until it does,
# so a failing micro-batch raises here and the silver table is complete when the cell ends.
query.awaitTermination()