
### Structured Streaming with Databricks Delta Tables

In [0]:
import time
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timezone
import uuid

In [0]:
# Infer the column schema once from the static user-data CSV (header row + type
# inference) so the streaming reader can be given an explicit schema.
file_schema = (
    spark.read.format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .load("/databricks-datasets/iot-stream/data-user/userData.csv")
    .limit(10)
    .schema
)

In [0]:
# UDF producing a random 32-character hex id for each row.
# It returns a different value on every call, so it is flagged as non-deterministic;
# otherwise Spark assumes UDFs are deterministic and may re-invoke or de-duplicate calls
# during optimization, which would make the generated ids unstable.
uuidUdf = udf(lambda: uuid.uuid4().hex, StringType()).asNondeterministic()

In [0]:
# Stream the raw IoT user CSV files from the Databricks sample datasets,
# picking up at most one new file per trigger.
iot_event_stream = (spark
                    .readStream
                    .option("maxFilesPerTrigger", 1)
                    .format("csv")
                    .option("header", True)
                    .schema(file_schema)
                    .load("/databricks-datasets/iot-stream/data-user/*.csv")
                    # Tag every row with a random hex id.
                    .withColumn("id", uuidUdf())
                    # Stamp rows with the processing time as evaluated by Spark while the
                    # query runs. A Python-side datetime.now() wrapped in lit() would be
                    # computed once, when this cell is executed, and frozen into the plan,
                    # giving every row of every micro-batch the same timestamp.
                    .withColumn("timestamp", current_timestamp())
                    .repartition(200)
                   )
display(iot_event_stream)

## Set up checkpoint directories for the streaming writes

In [0]:
# Checkpoint locations, one per streaming write in this notebook.
checkpointDir, checkpoint_dir_1, checkpoint_dir_2 = (
    "/local_disk0/tmp/delta-stream_test/1",               # bronze writer
    "/local_disk0/tmp/delta-stream_test/silver_check_1",  # silver writer
    "/local_disk0/tmp/delta-stream_test/gold_check_1",    # gold writer
)

In [0]:
# Continuously append the raw event stream to the `iot_event_bronze` table in
# Delta format, triggering every 10 seconds and tracking progress in `checkpointDir`.
iot_stream = (
    iot_event_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("header", True)
    .option("checkpointLocation", checkpointDir)
    .trigger(processingTime="10 seconds")
    .table("iot_event_bronze")
)

In [0]:
%sql

-- Show the columns and the extended table details of the bronze table.
DESCRIBE TABLE EXTENDED iot_event_bronze;

In [0]:
%sql

-- Preview all columns of the bronze table.
SELECT *
FROM iot_event_bronze;

In [0]:
# List the files stored under the bronze table's directory.
# NOTE(review): assumes the table lives under the default Hive warehouse path — confirm.
display(
    dbutils.fs.ls("dbfs:/user/hive/warehouse/iot_event_bronze")
)


### Clean Bronze-level data for the Silver table

In [0]:
# Declare a 1-day watermark on the `timestamp` column of the raw event stream.
# NOTE(review): the silver path below only drops nulls and writes; confirm that a
# watermark-aware (stateful) operator is intended, otherwise this has little effect.
bronzeClean = iot_event_stream.withWatermark(
    eventTime="timestamp",
    delayThreshold="1 day",
)

In [0]:
# Discard any row that contains a null value.
bronzeClean = bronzeClean.dropna()

# Append the cleaned stream to the `iot_event_silver` table in Delta format,
# triggering every 10 seconds and checkpointing to `checkpoint_dir_1`.
silverStream = (
    bronzeClean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_dir_1)
    .trigger(processingTime="10 seconds")
    .table("iot_event_silver")
)
silverStream

In [0]:
%sql

-- Show the columns and the extended table details of the silver table.
DESCRIBE TABLE EXTENDED iot_event_silver;

In [0]:
# List the files stored under the silver table's directory.
# NOTE(review): assumes the table lives under the default Hive warehouse path — confirm.
display(
    dbutils.fs.ls("dbfs:/user/hive/warehouse/iot_event_silver")
)

In [0]:
%sql

-- Preview all columns of the silver table.
SELECT *
FROM iot_event_silver;


## Streaming Aggregation from Silver to Gold

In [0]:
# Open the `iot_event_silver` table as a Delta streaming source, reading at most
# one file per trigger.
silver_stream = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 1)
    .table("iot_event_silver")
)

In [0]:
def updateGold( batch, batchId ):
  """Upsert `batch` into the merge target bound to the global name `gold`.

  Rows are matched on `date` and `miles_walked`; matched rows are updated with
  all columns from `batch`, unmatched rows are inserted.

  Args:
    batch: object exposing `.alias(...)`, used as the merge source
      (presumably a micro-batch DataFrame from `foreachBatch` — confirm).
    batchId: accepted but not used by this function.

  NOTE(review): `gold` is not defined anywhere in the visible notebook and
  nothing visible calls this function; it must be bound (presumably to a Delta
  table handle — confirm) before this can run.
  """
  target = gold.alias("gold")
  source = batch.alias("batch")
  match_condition = "gold.date = batch.date AND gold.miles_walked = batch.miles_walked"

  upsert = (
    target.merge(source, match_condition)
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
  )
  upsert.execute()

In [0]:
# Aggregate the silver stream into the average weight per gender and rewrite the
# full result to `iot_event_gold` on each trigger (complete output mode, every
# 12 seconds), checkpointing to `checkpoint_dir_2`.
(
    silver_stream
    .withWatermark("timestamp", "1 hour")
    .groupBy("gender")
    .agg(avg("weight").alias("avg_weight"))
    .writeStream
    .trigger(processingTime="12 seconds")
    .outputMode("complete")
    .option("checkpointLocation", checkpoint_dir_2)
    .table("iot_event_gold")
)

In [0]:
%sql

-- Average weight and height per gender from the silver table.
SELECT
  gender,
  AVG(weight) AS AVG_weight,
  AVG(height) AS AVG_height
FROM iot_event_silver
GROUP BY gender
ORDER BY gender DESC, AVG_weight ASC;

In [0]:
%sql
-- Compact the silver table and Z-order its data by the queried columns.
OPTIMIZE iot_event_silver
ZORDER BY gender, height, weight;

-- Re-run the same aggregate query against the optimized table.

SELECT
  gender,
  AVG(weight) AS AVG_weight,
  AVG(height) AS AVG_height
FROM iot_event_silver
GROUP BY gender
ORDER BY gender DESC, AVG_weight ASC;