In [0]:
# --- Pipeline configuration -------------------------------------------------
# Kafka connection settings are read from a Databricks secret scope so that no
# credential appears in the notebook source.
SECRET_SCOPE = "IOT_CHALLENGE_SCOPE"

# Each of these entries is fetched from SECRET_SCOPE under a secret key of the
# same name; the topic name is not a secret and is added as a plain literal.
config_dict = {
    secret_name: dbutils.secrets.get(SECRET_SCOPE, secret_name)
    for secret_name in ("API_KEY", "API_SECRET", "BOOTSTRAP")
}
config_dict["TOPIC"] = "IOT_SENSORS_TOPIC"

# Fully qualified (catalog.schema.table) names of the three pipeline tables.
CATALOG = "sandbox_first_catalog"
SCHEMA = "iot_sensors"
LANDING_TABLE = f"{CATALOG}.{SCHEMA}.iot_messages_landing"
BRONZE_TABLE = f"{CATALOG}.{SCHEMA}.iot_messages_bronze"
SILVER_TABLE = f"{CATALOG}.{SCHEMA}.iot_messages_silver"

# One streaming checkpoint directory per stage.
# NOTE(review): these live under /tmp — confirm that location is durable enough
# for the checkpoints; if one is lost, the stage restarts from its starting
# position and may write duplicates.
CHECKPOINT_LANDING = "/tmp/chk_landing"
CHECKPOINT_BRONZE = "/tmp/chk_bronze"
CHECKPOINT_SILVER = "/tmp/chk_silver"

In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [0]:
# Landing stage: stream the raw Kafka records of the configured topic into the
# landing Delta table without transforming them.
landing_df = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", config_dict.get("BOOTSTRAP"))
        .option("subscribe", config_dict.get("TOPIC"))
        # Only applies when the checkpoint holds no offsets yet; afterwards the
        # query resumes from the checkpointed offsets.
        .option("startingOffsets", "earliest")
        # SASL/PLAIN over TLS, authenticating with the API key/secret pair.
        .option("kafka.sasl.mechanism", "PLAIN")
        # NOTE(review): some Databricks runtimes ship a shaded Kafka client and
        # expect the "kafkashaded."-prefixed login module class name here —
        # confirm against the target runtime.
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{config_dict.get("API_KEY")}" password="{config_dict.get("API_SECRET")}";')
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.ssl.endpoint.identification.algorithm", "https")
        .load()
)

# toTable() starts the query in the background and returns a StreamingQuery
# handle immediately. Keep the handle so the run can be awaited below.
landing_query = (
    landing_df.writeStream
        .format("delta")
        # Process everything currently available, then stop on its own.
        .trigger(availableNow=True)
        .option("checkpointLocation", CHECKPOINT_LANDING)
        .toTable(LANDING_TABLE)
)

# Block until the one-shot run has finished. Without this, the next cell can
# start reading LANDING_TABLE before it has been created or fully written.
# awaitTermination() also re-raises any failure of the query in this cell.
landing_query.awaitTermination()

In [0]:
# Bronze stage: stream the landing Delta table into the bronze Delta table.
# No transformation is applied between the read and the write.
bronze_df = (
    spark.readStream
        .format("delta")
        # Explicitly read the table's rows rather than its change data feed.
        .option("readChangeData", "false")
        .table(LANDING_TABLE)
)

# toTable() starts the query in the background and returns a StreamingQuery
# handle immediately. Keep the handle so the run can be awaited below.
bronze_query = (
    bronze_df.writeStream
        .format("delta")
        # Process everything currently available, then stop on its own.
        .trigger(availableNow=True)
        .option("checkpointLocation", CHECKPOINT_BRONZE)
        .toTable(BRONZE_TABLE)
)

# Block until the one-shot run has finished. Without this, the next cell can
# start reading BRONZE_TABLE before it has been created or fully written.
# awaitTermination() also re-raises any failure of the query in this cell.
bronze_query.awaitTermination()

In [0]:
# Schema used to parse the JSON payload carried in the Kafka `value` column.
# Every field is nullable. `ts` is kept as a string; it is not converted to a
# timestamp in this cell.
value_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("sensor_id", StringType()),
        ("device_id", StringType()),
        ("location", StringType()),
        ("ts", StringType()),
        ("temperature", DoubleType()),
        ("humidity", DoubleType()),
        ("lat", DoubleType()),
        ("lon", DoubleType()),
    )
])

# Kafka record columns carried through to silver next to the parsed payload.
kafka_metadata_columns = [
    "key", "topic", "partition", "offset", "timestamp", "timestampType",
]

# Column expression: the `value` column cast to string, parsed with value_schema.
parsed_payload = from_json(col("value").cast("string"), value_schema)

# Silver stage: read bronze as a stream, parse the payload into a struct and
# flatten that struct into top-level columns.
silver_df = (
    spark.readStream
        .format("delta")
        .option("readChangeData", "false")
        .table(BRONZE_TABLE)
        .withColumn("value_struct", parsed_payload)
        .select(*kafka_metadata_columns, "value_struct.*")
)

# NOTE(review): no trigger is set on this writer, unlike the landing and bronze
# writers in this notebook (availableNow), so it uses the default trigger and
# keeps running until stopped — confirm that is intended.
(
    silver_df.writeStream
        .format("delta")
        .option("checkpointLocation", CHECKPOINT_SILVER)
        .toTable(SILVER_TABLE)
)