### Import Libraries

In [0]:
from pyspark.sql.functions import schema_of_json, current_timestamp, input_file_name, lit, col, to_json, from_json, to_timestamp, to_utc_timestamp
from pyspark.sql.types import *
import re

### Silver Layer

In [0]:
# Three-part name (catalog, schema, table) of the bronze source table.
bronze_catalog_name, bronze_schema_name, bronze_table_name = (
    "weather",
    "01_bronze",
    "weather_sensor_measurements_raw",
)

# Three-part name (catalog, schema, table) of the silver target table.
silver_catalog_name, silver_schema_name, silver_table_name = (
    "weather",
    "02_silver",
    "weather_sensor_measurements",
)

# Create the silver catalog, schema and checkpoint volume when they are missing.
# Each statement uses IF NOT EXISTS, so re-running this cell is harmless.
silver_schema_path = f"{silver_catalog_name}.{silver_schema_name}"
for ddl in (
    f"CREATE CATALOG IF NOT EXISTS {silver_catalog_name}",
    f"CREATE SCHEMA IF NOT EXISTS {silver_schema_path}",
    f"CREATE VOLUME IF NOT EXISTS {silver_schema_path}.checkpoints",
):
    spark.sql(ddl)

# Checkpoint directory for the silver stream, inside the "checkpoints" volume.
checkpoint_dir = f"/Volumes/{silver_catalog_name}/{silver_schema_name}/checkpoints/{silver_table_name}"

In [0]:
# DDL for the silver table: a Delta table partitioned by `device`, created only
# when it does not exist yet (CREATE TABLE IF NOT EXISTS).
#
# NOTE: the text of this statement is read again, line by line, further down in
# this notebook to build the list of columns selected from the stream. Keep
# every column on its own line in the form `name TYPE`.
create_silver_table_sql_statement = f"""
    CREATE TABLE IF NOT EXISTS {silver_catalog_name}.{silver_schema_name}.{silver_table_name} (
        uuid STRING PRIMARY KEY,
        device STRING,
        latitude DOUBLE,
        longitude DOUBLE,
        request_timestamp TIMESTAMP,
        humidity DOUBLE,
        temperature DOUBLE,
        pressure DOUBLE,
        measurement_timestamp TIMESTAMP,
        ingestion_timestamp TIMESTAMP,
        processing_timestamp TIMESTAMP
    )
    USING DELTA
    PARTITIONED BY (device);
"""

# Run the DDL against the current Spark session.
spark.sql(create_silver_table_sql_statement)

In [0]:
# Incrementally read the bronze table as a stream, addressed by its table name.
#
# Auto Loader ("cloudFiles") is intended for ingesting raw files from a storage
# path and does not list "delta" among its file formats, so no cloudFiles
# format or options are configured for this table read.
# Without a starting version/timestamp option, a Delta stream is documented to
# begin with the table's existing data before picking up new commits.
bronze_table_fqn = f"{bronze_catalog_name}.{bronze_schema_name}.{bronze_table_name}"
streaming_df = spark.readStream.table(bronze_table_fqn)

# Row-level quality predicates applied to the bronze stream.
has_measurement_time = col("measurement_timestamp").cast("long") != 0
has_pressure = col("pressure").isNotNull()

# Keep rows that pass both predicates, stamp them with the processing time and
# keep a single row per uuid.
# NOTE(review): no watermark is set before dropDuplicates, so Spark presumably
# has to remember every uuid it has seen as deduplication state - confirm that
# this growth is acceptable for the expected data volume.
processing_df = (
    streaming_df
    .where(has_measurement_time)
    .where(has_pressure)
    .withColumn("processing_timestamp", current_timestamp())
    .dropDuplicates(["uuid"])
)
    

def _column_names_from_ddl(ddl):
    """Return the column names declared in a CREATE TABLE statement.

    The statement is scanned line by line. A line counts as a column
    definition when its second whitespace-separated token starts with one of
    the recognised SQL type keywords; the first token is then taken as the
    column name. Anything after the type (for example ``PRIMARY KEY`` or a
    trailing comma) is ignored, so columns that carry constraints are picked
    up as well. Columns whose type is not in the recognised set are skipped.

    Args:
        ddl: Text of the CREATE TABLE statement, one column per line.

    Returns:
        List of column names in the order they appear in the statement.
    """
    known_types = {
        'STRING', 'FLOAT', 'DOUBLE', 'BIGINT', 'BOOLEAN', 'TIMESTAMP', 'DECIMAL'}
    names = []
    for line in ddl.splitlines():
        tokens = line.split()
        if len(tokens) < 2:
            # Blank lines and lone brackets cannot be column definitions.
            continue
        # Leading upper-case run of the type token, e.g. "DECIMAL(10,2)," -> "DECIMAL".
        type_keyword = re.match(r'[A-Z]*', tokens[1]).group(0)
        if type_keyword in known_types:
            names.append(tokens[0])
    return names


# Columns of the silver table, derived from its DDL so that the selected stream
# columns line up with the table targeted by the MERGE.
column_names = _column_names_from_ddl(create_silver_table_sql_statement)

filtered_processing_df = processing_df.select(*column_names)

# Per-micro-batch sink: upsert the batch into the silver Delta table with MERGE.
def upsertToDelta(microBatchOutputDF, batchId):
    """Merge one micro-batch into the silver table, matching rows on ``uuid``.

    Rows whose uuid already exists in the target are updated, all other rows
    are inserted.

    Args:
        microBatchOutputDF: DataFrame holding the rows of the current micro-batch.
        batchId: Identifier of the micro-batch; not used by this function.
    """
    # Expose the batch under a fixed view name that the MERGE statement reads from.
    microBatchOutputDF.createOrReplaceTempView("streaming_updates")

    merge_statement = f"""
    MERGE INTO {silver_catalog_name}.{silver_schema_name}.{silver_table_name} t
    USING streaming_updates s
    ON 
        s.uuid = t.uuid
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """

    # Execute through the batch DataFrame's own session.
    # In Databricks Runtime 10.5 and below, use
    # microBatchOutputDF._jdf.sparkSession().sql(...) instead.
    microBatchOutputDF.sparkSession.sql(merge_statement)

# Start the streaming query: each micro-batch of the filtered stream is handed
# to upsertToDelta, and checkpoint_dir is used as the checkpoint location.
writer = (
    filtered_processing_df.writeStream
    .queryName("weather_sensor_processing_stream")
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)


In [0]:
# for q in spark.streams.active:
#     if q.name == "weather_sensor_processing_stream":
#         q.stop()

In [0]:
# spark.sql(f"""
#     OPTIMIZE {silver_catalog_name}.{silver_schema_name}.{silver_table_name}
#     ZORDER BY (measurement_timestamp);
#     """
# )