In [1]:
server = "ubuntuserver010.westus2.cloudapp.azure.com:9092"
topic = "week7"
offset = "latest" #"earliest"

#Read the raw stream from Kafka.
#  Besides "subscribe", the Kafka source also accepts e.g.
#    "subscribe": "topic1,topic2"   -> several topics at once
#    "subscribePattern": "topic.*"  -> every topic matching a pattern
kafkaDF = (
  spark.readStream
    .format("kafka")
    .options(**{
      "kafka.bootstrap.servers": server,
      "subscribe": topic,
      "startingOffsets": offset,
    })
    .load()
)

#Keep only the Kafka key and value columns, each cast to a string
sensorsRawDF = kafkaDF.selectExpr(
  *["CAST(%s AS STRING)" % kafkaColumn for kafkaColumn in ("key", "value")]
)

In [2]:
#REFERENCE DOCUMENTATION
#https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
#https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
#https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html
#https://docs.azuredatabricks.net/_static/notebooks/transform-complex-data-types-python.html

from pyspark.sql.types import *
from pyspark.sql.functions import from_json, avg

rootPath = "abfss://pbchamp@uwbigdatatechnologies.dfs.core.windows.net/Week7"
archivePath = rootPath + "/archive"
invalidSensorPath = rootPath + "/invalidsensors"

#Empty directories so each run of this cell starts from scratch. Removing an
#output directory also removes any streaming checkpoint stored inside it.
dbutils.fs.rm(archivePath, True)
dbutils.fs.rm(invalidSensorPath, True)

#Archive key/value stream data to disk as parquet.
#The checkpoint lives in a "_checkpoint" sub-folder rather than in the output
#directory itself, so Spark's checkpoint files are not mixed in with the
#parquet data files (which breaks readers that list the directory).
#Spark's file listing skips paths starting with "_", and the rm above still
#clears the sub-folder together with the output.
archiveQuery = (
  sensorsRawDF
    .writeStream
    .format("parquet")
    .option("path", archivePath)
    .option("checkpointLocation", archivePath + "/_checkpoint")
    .start()
)

#Schema of the JSON document carried in the Kafka value:
#  server (string), timestamp (timestamp) and a nested "sensors" struct
#  holding the integer readings sensor1 .. sensor10 (all fields nullable).
jsonSchema = StructType([
  StructField("server", StringType()),
  StructField("timestamp", TimestampType()),
  StructField("sensors", StructType([
    StructField("sensor%d" % sensorIndex, IntegerType())
    for sensorIndex in range(1, 11)
  ])),
])

#Parse the JSON value and flatten its top-level fields into columns
#(server, timestamp, sensors)
sensorsDF = (
  sensorsRawDF
    .select(from_json("value", jsonSchema).alias("tempsensors"))
    .select("tempsensors.*")
)

#Create invalid records filter (a Spark SQL expression string).
#A record is invalid when any of sensor1 .. sensor10 is NULL or <= 0.
#The explicit "is null" test matters: under SQL three-valued logic
#"NULL <= 0" evaluates to NULL. Without it, a record with a missing reading
#(e.g. a field absent from the JSON, or JSON that from_json could not parse)
#would match neither this filter nor its negation, and would silently vanish
#from both the invalid and the valid streams.
#The whole expression is wrapped in parentheses so it can be negated safely
#by prefixing "not " (NOT binds tighter than OR in Spark SQL).
invalidSensorsFilter = "(" + " or ".join(
  "sensors.sensor{0} is null or sensors.sensor{0} <= 0".format(sensorIndex)
  for sensorIndex in range(1, 11)
) + ")"

#Copy invalid records into a dataframe
invalidSensorsDF = sensorsDF.filter(invalidSensorsFilter)

#Write invalid records to disk as parquet.
#The checkpoint lives in a "_checkpoint" sub-folder rather than in the output
#directory itself, so Spark's checkpoint files are not mixed in with the
#parquet data files. Spark's file listing skips paths starting with "_".
#Removing invalidSensorPath also removes this sub-folder.
invalidSensorsQuery = (
  invalidSensorsDF
    .writeStream
    .format("parquet")
    .option("path", invalidSensorPath)
    .option("checkpointLocation", invalidSensorPath + "/_checkpoint")
    .start()
)

#From valid records calculate averages for sensors 1 and 2 by server.
#The invalid-records filter is parenthesised before it is negated. NOT binds
#tighter than OR in Spark SQL, so "not " + "a <= 0 or b <= 0 ..." would parse
#as "(not a <= 0) or b <= 0 ...". That form checks only sensor1 and lets
#invalid readings into the averages.
#Filtering happens before the projection so the predicate can still refer to
#the nested "sensors" column directly.
sensorAvgByServerDF = (
  sensorsDF
    .filter("not (" + invalidSensorsFilter + ")")
    .select("server", "sensors.*")
    .groupBy("server")
    .agg(
      avg("sensor1").alias("Sensor1Avg"),
      avg("sensor2").alias("Sensor2Avg")
    )
)

#Display averages
display(sensorAvgByServerDF)

server,Sensor1Avg,Sensor2Avg
c17-3c0s5,25.0,36.0
c23-4c2s2,33.0,42.0
c2-5c0s6,25.0,38.0
c13-1c1s2,28.0,40.5
c15-6c1s5,28.5,41.5
c22-6c2s4,30.5,39.0
c17-4c0s7,26.0,38.0
c7-6c2s0,31.83333333333333,40.0
c3-5c0s3,26.0,38.0
c0-5c0s5,29.0,42.0


In [3]:
#############
### BONUS ###
#############

#https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

#CREATED KAFKA TOPICS windowAvg1 and windowAvg2
#bin/kafka-topics.sh --create --bootstrap-server ubuntuserver010.westus2.cloudapp.azure.com:9092 --replication-factor 1 --partitions 1 --topic windowAvg1
#bin/kafka-topics.sh --create --bootstrap-server ubuntuserver010.westus2.cloudapp.azure.com:9092 --replication-factor 1 --partitions 1 --topic windowAvg2

from pyspark.sql.functions import window, col

#From valid records calculate sliding-window averages of sensor 1 per server
#(5 minute windows, sliding every 1 minute), shaped for the Kafka sink:
#"key" = server name, "value" = the average as a string.
#The invalid-records filter is parenthesised before it is negated, because NOT
#binds tighter than OR in Spark SQL. Without the parentheses only sensor1
#would be checked, and invalid readings would leak into the averages.
#NOTE(review): the "window" column is neither key nor value. The Kafka sink
#only writes key/value (plus optional topic/headers/partition), so the window
#bounds are presumably not published, and consumers cannot tell which window
#a value belongs to. Consider encoding them, e.g. with to_json(struct(...)).
#NOTE(review): there is no watermark and the sink runs in "complete" mode, so
#the aggregation state (one row per server and window) keeps growing.
sensorAvgByServerWindowDF1 = (
  sensorsDF
    .filter("not (" + invalidSensorsFilter + ")")
    .select("server", "timestamp", "sensors.*")
    .groupBy(
      col("server").alias("key"),
      window("timestamp", "5 minutes", "1 minutes"))
    .agg(avg("sensor1").cast("string").alias("value"))
)

#Kafka topic and checkpoint for the sensor 1 windowed averages.
#The bootstrap servers (`server`) come from the Kafka source cell. The topic
#gets its own name so this cell does not overwrite that cell's `topic`.
windowAvg1Topic = "windowAvg1"
windowAvg1Path = rootPath + "/windowAvg1"

#Write sensor 1 windowed averages to the Kafka topic
#  (view via bin/kafka-console-consumer.sh --bootstrap-server ubuntuserver010.westus2.cloudapp.azure.com:9092 --topic windowAvg1 --from-beginning)
#In "complete" mode the whole result table is written again on every trigger.
sensorAgg2Query = (
  sensorAvgByServerWindowDF1
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", server)
    .option("topic", windowAvg1Topic)
    .option("checkpointLocation", windowAvg1Path)
    .outputMode("complete")
    .start()
)

In [4]:
#TEST all/invalid data without streaming

#path = archivePath = rootPath + "/archive"
#path = invalidSensorPath = rootPath + "/invalidsensors"

#df = spark.read.parquet(path + "/*.parquet")
#display(df)