In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Databricks secret scope that holds the storage account key: iotanalyticsdatavricksscope

# Bronze container (blob endpoint): https://iotsamstorage.blob.core.windows.net/bronze

In [0]:
%sql
-- Create the `bronze` schema in the `iotsamdatabricks` catalog if it does not exist yet.
-- Safe to re-run: IF NOT EXISTS makes the statement a no-op when the schema is present.
CREATE SCHEMA IF NOT EXISTS iotsamdatabricks.bronze

In [0]:
%sql
-- Create the volume `iot_bronze_data` in `iotsamdatabricks.bronze` if it does not exist yet.
-- NOTE(review): no cell visible in this notebook reads or writes through this volume, and its
-- name is identical to the Delta table the streaming write targets — confirm both are intended.
CREATE VOLUME IF NOT EXISTS iotsamdatabricks.bronze.iot_bronze_data

In [0]:

# Authenticate Spark against the `iotsamstorage` ADLS Gen2 account. The account
# key is pulled from a Databricks secret scope at call time and is never bound
# to a notebook variable.
account_key_conf = "fs.azure.account.key.iotsamstorage.dfs.core.windows.net"
spark.conf.set(
    account_key_conf,
    dbutils.secrets.get(scope="iotanalyticsdatavricksscope", key="storagekeyiot"),
)

# Root of the `bronze` container; the streaming read in the next cell loads from here.
bronze_path = "abfss://bronze@iotsamstorage.dfs.core.windows.net/"

# Optional one-off batch inspection of the raw files (left disabled):
# df = spark.read.json(bronze_path)
# df.show(5, truncate=False)
# df.display()

In [0]:
# Explicit schema applied to the incoming JSON device readings. Every field is
# nullable, and `timestamp` is kept as a string here (nothing in this cell parses it).
iot_schema = StructType([
    StructField("deviceId", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("pressure", DoubleType(), True),
])

# Auto Loader ("cloudFiles") streaming read of the JSON files under `bronze_path`.
# `cloudFiles.inferColumnTypes` is intentionally not set: it only influences
# schema inference, and the schema is supplied explicitly via `.schema(...)`.
# NOTE(review): the schema location below sits under `checkpoints/` inside the
# same container root that is being loaded, so non-hidden checkpoint files may
# be discovered as input — confirm, and consider moving stream state outside
# the source path (a relocation resets stream progress, so plan it deliberately).
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option(
        "cloudFiles.schemaLocation",
        "abfss://bronze@iotsamstorage.dfs.core.windows.net/checkpoints/bronze_stream/schema",
    )
    .schema(iot_schema)
    .load(bronze_path)
)

In [0]:
# Composite key built from the device id and its reported timestamp string.
# `concat` yields NULL when either input is NULL.
device_event_key = concat(col("deviceId"), lit("_"), col("timestamp"))

# Attach ingestion metadata to every record before it is written to bronze.
df = (
    df
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("ingestion_date", current_date())
    .withColumn("device_timestamp_id", device_event_key)
    .withColumn("source_system", lit("azure_stream_analytics"))
)

In [0]:
# Fully qualified Delta table the bronze stream appends to. It is reused in the
# status message so the log always names the table that is actually written.
bronze_table = "iotsamdatabricks.bronze.iot_bronze_data"

# Start the streaming append into the bronze table, partitioned by ingestion
# date and triggered every 10 seconds. The returned StreamingQuery handle is
# kept so the stream can be inspected (`bronze_query.status`) or stopped
# (`bronze_query.stop()`) from later cells.
# NOTE(review): the checkpoint directory lives under the same container root
# that the read stream loads from — confirm checkpoint files are not picked up
# as source data. The path is left unchanged here to preserve stream progress.
bronze_query = (
    df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://bronze@iotsamstorage.dfs.core.windows.net/checkpoints")
    .option("mergeSchema", "true")
    .partitionBy("ingestion_date")
    .trigger(processingTime="10 seconds")
    .toTable(bronze_table)
)

# The query runs asynchronously; these lines only confirm that it was started.
print("👌 Bronze Streaming Layer Done And Active!")
print(f"Output Table: {bronze_table}")
print("Processing Trigger: Every 10 seconds")
print("Stream is running continuously...")

In [0]:

# Render `df` with the notebook's display helper.
# NOTE(review): `df` originates from `spark.readStream`, so this presumably opens a
# separate live view alongside the table write — confirm it is wanted outside debugging.
df.display()