# Processing Sensor Data from Kafka with Structured Streaming

The intention of this example is to explore the main aspects of the Structured Streaming API.

We will: 
 - use the Kafka `source` to consume events from the `sensor-raw` topic in Kafka
 - implement the application logic using the Dataset API
 - use the `memory` sink to visualize the data
 - use the `kafka` sink to publish our results to a different topic and make them available downstream.
 - have some fun!  

## Common Definitions
We define a series of parameters used throughout the notebook.

In [ ]:
val sourceTopic = "sensor-raw"
val targetTopic = "sensor-processed"
val kafkaBootstrapServer = "172.17.0.2:9092" // local
// val kafkaBootstrapServer = "10.2.2.191:1025" // fast-data-ec2

sourceTopic: String = sensor-raw
targetTopic: String = sensor-processed
kafkaBootstrapServer: String = 172.17.0.2:9092
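The cell above switches environments by commenting lines in and out. A minimal sketch, assuming you would rather read the broker address from an environment variable. `KAFKA_BOOTSTRAP_SERVERS` and `resolveBootstrapServer` are hypothetical names, not part of the original notebook:

```scala
// Hypothetical helper: take the bootstrap server from the environment,
// falling back to the local Docker address used in this notebook.
def resolveBootstrapServer(env: Map[String, String]): String =
  env.getOrElse("KAFKA_BOOTSTRAP_SERVERS", "172.17.0.2:9092")

// In the notebook this value could replace the hard-coded kafkaBootstrapServer.
val kafkaBootstrapServerFromEnv: String = resolveBootstrapServer(sys.env)
```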


## Read a stream from Kafka
We use the Kafka source to subscribe to `sourceTopic`, which contains the raw sensor data.
With `startingOffsets` set to `latest`, a new query only consumes events published after it starts. The result is a streaming DataFrame that we use to operate on the underlying data.

In [ ]:
val rawData = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServer)
      .option("subscribe", sourceTopic)
      .option("startingOffsets", "latest")
      .load()

rawData: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]


In [ ]:
rawData.isStreaming

res51: Boolean = true


In [ ]:
rawData.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## Declare the schema of the data in the stream
We need to declare the schema of the data in the stream in order to parse it.

We use a case class to define the schema. It's much more convenient than using the SQL types directly.

In [ ]:
case class SensorData(id: String, ts: Long, temp: Double, hum: Double)

defined class SensorData


In [ ]:
import org.apache.spark.sql.Encoders
val schema = Encoders.product[SensorData].schema

import org.apache.spark.sql.Encoders
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(ts,LongType,false), StructField(temp,DoubleType,false), StructField(hum,DoubleType,false))
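To make that convenience concrete, the sketch below writes the same schema by hand with SQL types and compares it with the one derived from the case class. It is self-contained (so it redeclares `SensorData`) and assumes Spark SQL is on the classpath; `handWritten` and `derived` are illustrative names:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._

case class SensorData(id: String, ts: Long, temp: Double, hum: Double)

// The schema spelled out with SQL types. The encoder marks the String field
// nullable and the primitive (Long/Double) fields non-nullable.
val handWritten = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("ts", LongType, nullable = false),
  StructField("temp", DoubleType, nullable = false),
  StructField("hum", DoubleType, nullable = false)
))

// The one-liner used in the notebook.
val derived = Encoders.product[SensorData].schema
```

Four fields are already verbose by hand; the case class also gives us a typed `Dataset[SensorData]` later on.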


## Parse the Data
The actual payload is contained in the binary `value` field of each Kafka record (see the schema above).
We first cast that field to a string and then use Spark's JSON support (`from_json`) to transform our incoming data into a structured streaming `Dataset`.


In [ ]:
val rawValues = rawData.selectExpr("CAST(value AS STRING)").as[String]
val jsonValues = rawValues.select(from_json($"value", schema) as "record")
val sensorData = jsonValues.select("record.*").as[SensorData]

rawValues: org.apache.spark.sql.Dataset[String] = [value: string]
jsonValues: org.apache.spark.sql.DataFrame = [record: struct<id: string, ts: bigint ... 2 more fields>]
sensorData: org.apache.spark.sql.Dataset[SensorData] = [id: string, ts: bigint ... 2 more fields]
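The three parsing steps can be tried without Kafka on a single hand-made record. A minimal sketch, assuming a local Spark 3.x session and a payload shaped like `SensorData`; the JSON string, the `fakeKafka` DataFrame and the app name are invented for illustration:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.from_json

case class SensorData(id: String, ts: Long, temp: Double, hum: Double)

val spark = SparkSession.builder().master("local[*]").appName("sensor-parse-sketch").getOrCreate()
import spark.implicits._

val schema = Encoders.product[SensorData].schema

// Stand-in for the Kafka source: binary key and value columns, like the real topic.
val payload = """{"id":"office","ts":1536775496835,"temp":21.5,"hum":40.0}"""
val fakeKafka = Seq(("office".getBytes(UTF_8), payload.getBytes(UTF_8))).toDF("key", "value")

// Same steps as the streaming cell: decode bytes, parse JSON, flatten the struct.
val parsed = fakeKafka
  .selectExpr("CAST(value AS STRING)").as[String]
  .select(from_json($"value", schema) as "record")
  .select("record.*").as[SensorData]
  .collect()
```

Because this is a batch DataFrame, `collect()` returns immediately, which makes it a convenient way to check a schema before wiring it into the stream.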


In [ ]:
sensorData.printSchema()

root
 |-- id: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- temp: double (nullable = true)
 |-- hum: double (nullable = true)



## Explore the data stream
To view the streaming data, we will use the `memory` sink and query the resulting table to get samples of the data.

In [ ]:
val visualizationQuery = sensorData.writeStream
  .queryName("visualization")    // this query name will be the SQL table name
  .outputMode("append")
  .format("memory")
  .start()

visualizationQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6853c5a6


## Explore the Data
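The `memory` sink stores the query output in an in-memory table (similar to a temporary view) that we can query using Spark SQL. Because the whole output is kept in the driver's memory, this sink is meant for debugging with small data volumes.
The result of the query is a static `DataFrame` that contains a snapshot of the data.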

In [ ]:
val sampleDataset = sparkSession.sql("select * from visualization")

sampleDataset: org.apache.spark.sql.DataFrame = [id: string, ts: bigint ... 2 more fields]


In [ ]:
// This is a static Dataset!
sampleDataset.isStreaming

res62: Boolean = false


### Our dataset is backed by the streaming data: each action we execute re-reads the in-memory table and delivers the latest data.

In [ ]:
sampleDataset.count

res64: Long = 150


In [ ]:
sampleDataset.count

res66: Long = 203
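This snapshot-and-refresh behavior can be reproduced locally. The sketch swaps Kafka for a directory of JSON files (an assumption made only so it runs without a broker), uses a hypothetical query name `memorySketch`, and calls `processAllAvailable()` so the counts are deterministic instead of depending on timing:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path, StandardCopyOption}
import org.apache.spark.sql.{Encoders, SparkSession}

case class SensorData(id: String, ts: Long, temp: Double, hum: Double)

val spark = SparkSession.builder().master("local[*]").appName("memory-sink-sketch").getOrCreate()

// A watched directory of JSON files stands in for the Kafka topic.
val inputDir: Path = Files.createTempDirectory("sensor-json")
val stagingDir: Path = Files.createTempDirectory("sensor-staging")

// Write a file elsewhere, then move it in one step so the stream never sees a partial file.
def publish(name: String, lines: Seq[String]): Unit = {
  val staged = stagingDir.resolve(name)
  Files.write(staged, lines.mkString("\n").getBytes(UTF_8))
  Files.move(staged, inputDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
}

publish("batch-1.json", Seq(
  """{"id":"office","ts":1000,"temp":21.0,"hum":40.0}""",
  """{"id":"office","ts":2000,"temp":21.5,"hum":41.0}"""))

val query = spark.readStream
  .schema(Encoders.product[SensorData].schema)
  .json(inputDir.toString)
  .writeStream
  .queryName("memorySketch")   // name of the in-memory table
  .outputMode("append")
  .format("memory")
  .start()

query.processAllAvailable()                             // wait until the first file is processed
val snapshot = spark.sql("select * from memorySketch")  // a static DataFrame
val firstCount = snapshot.count()

publish("batch-2.json", Seq("""{"id":"lab","ts":3000,"temp":19.0,"hum":55.0}"""))
query.processAllAvailable()
val secondCount = snapshot.count()                      // same DataFrame, re-read by the new action

query.stop()
```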


## Visualize the Data
We will build a custom live-updating chart by periodically querying the in-memory table for the latest data.

In [ ]:
val dummy = Seq((System.currentTimeMillis, 0.1), (System.currentTimeMillis, 0.1))

val chart = CustomPlotlyChart(dummy,
                  layout=s"{title: 'sensor data sample'}",
                  dataOptions="""{type: 'line'}""",
                  dataSources="{x: '_1', y: '_2' }")
chart

dummy: Seq[(Long, Double)] = List((1536775496835,0.1), (1536775496835,0.1))
chart: notebook.front.widgets.charts.CustomPlotlyChart[Seq[(Long, Double)]] = <CustomPlotlyChart widget>
res68: notebook.front.widgets.charts.CustomPlotlyChart[Seq[(Long, Double)]] = <CustomPlotlyChart widget>


## Async update of our visualization
We will use a plain old `Thread` to periodically query our in-memory table and update the chart accordingly.


In [ ]:
@volatile var running = true

running: Boolean = true


In [ ]:
import scala.concurrent.duration._
import scala.annotation.tailrec

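val updater = new Thread() {
  @tailrec
  def visualize(): Unit = {
    // ts is in epoch milliseconds: keep the last minute of readings from the "office" sensor
    val lastMinute = System.currentTimeMillis - 1.minute.toMillis
    val data = sampleDataset.where($"ts" > lastMinute and $"id" === "office").as[SensorData]
                            // x: seconds within the current hour, y: temperature
                            .map { case SensorData(id, ts, temp, hum) => (ts / 1000 % 3600, temp) }
                            .collect
    chart.applyOn(data)
    // refresh once per second until `running` is set to false
    if (running) {
      Thread.sleep(1.second.toMillis)
      visualize()
    } else ()
  }

  override def run(): Unit = visualize()
}.start() // start() returns Unit, so `updater` is just (); stop the loop with `running = false`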


import scala.concurrent.duration._
import scala.annotation.tailrec
updater: Unit = ()
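A raw `Thread` plus a `@volatile` flag works, but the same refresh loop can be expressed with a `ScheduledExecutorService`, which returns a handle that can be cancelled. A sketch under that assumption; `PeriodicRefresher` is a hypothetical helper, and `refresh` stands in for the "query the table and call `chart.applyOn`" step above:

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs `refresh` every `periodMillis` ms on a single background thread.
final class PeriodicRefresher(periodMillis: Long)(refresh: () => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val handle: ScheduledFuture[_] =
    scheduler.scheduleWithFixedDelay(() => refresh(), 0L, periodMillis, TimeUnit.MILLISECONDS)

  // Cancel future runs, then wait briefly for an in-flight run to finish.
  def stop(): Unit = {
    handle.cancel(false)
    scheduler.shutdown()
    scheduler.awaitTermination(1L, TimeUnit.SECONDS)
  }
}

// Demo with a counter instead of a chart update.
val ticks = new AtomicInteger(0)
val refresher = new PeriodicRefresher(100L)(() => ticks.incrementAndGet())
Thread.sleep(350L)
refresher.stop()
```

In the notebook, `refresh` would wrap the body of `visualize()` without the recursion. Note that if a run throws, the executor suppresses all further runs, so catching exceptions inside `refresh` may be worthwhile.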


In [ ]:
// visualizationQuery.stop()

In [ ]:
running = false

running: Boolean = false


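# Improve the data with sliding windows

We compute a moving average of the temperature per sensor, using 30-second windows that slide every 10 seconds. The 30-second watermark bounds how late an event may arrive and still be guaranteed to be counted: once the watermark passes the end of a window, Spark finalizes that window and can drop its state.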

In [ ]:
import org.apache.spark.sql.types._

import org.apache.spark.sql.types._


In [ ]:
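// ts is in epoch milliseconds; convert to whole seconds so it can be cast to a timestamp
val toSeconds = udf((ts:Long) => ts/1000)
val tempBySensorMovingAverage = sensorData.withColumn("timestamp", toSeconds($"ts").cast(TimestampType))
                                          .withWatermark("timestamp", "30 seconds")  // accept events up to 30 s late
                                          .groupBy($"id", window($"timestamp", "30 seconds", "10 seconds"))  // 30 s windows, every 10 s
                                          .agg(avg($"temp"))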

toSeconds: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))
tempBySensorMovingAverage: org.apache.spark.sql.DataFrame = [id: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]
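The `toSeconds` UDF can be replaced by a built-in function, which is generally easier for Spark's optimizer to work with and keeps millisecond precision (the UDF's `Long` division truncates to whole seconds). A sketch assuming Spark 3.1 or later, where `timestamp_seconds` is available, and a made-up one-row input:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, timestamp_seconds}

val spark = SparkSession.builder().master("local[*]").appName("ts-conversion-sketch").getOrCreate()
import spark.implicits._

// Illustrative input: one reading with an epoch-millisecond timestamp.
val readings = Seq(("office", 1536775496835L)).toDF("id", "ts")

// ts / 1000 is a fractional number of seconds, so the milliseconds survive the conversion.
val withTimestamp = readings.withColumn("timestamp", timestamp_seconds(col("ts") / 1000))

val converted = withTimestamp.select("timestamp").as[java.sql.Timestamp].head()
```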


In [ ]:
tempBySensorMovingAverage.printSchema

root
 |-- id: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- avg(temp): double (nullable = true)
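With a 30-second window sliding every 10 seconds, each event falls into three overlapping windows, which is why `window` is a struct of `start` and `end`. The sketch below models that assignment in plain Scala. It is a simplified model of `window(...)` with the default zero start offset, not Spark's implementation, and `TimeWindow`/`slidingWindows` are hypothetical names:

```scala
// Half-open time window [start, end), in epoch seconds.
final case class TimeWindow(start: Long, end: Long)

// Every sliding window that contains `ts`, earliest first.
def slidingWindows(ts: Long, windowSec: Long, slideSec: Long): Seq[TimeWindow] = {
  require(windowSec > 0 && slideSec > 0 && slideSec <= windowSec, "invalid window/slide")
  val lastStart = ts - Math.floorMod(ts, slideSec)            // latest window start <= ts
  val maxOverlapping = (windowSec + slideSec - 1) / slideSec  // ceil(windowSec / slideSec)
  (0L until maxOverlapping)
    .map(i => lastStart - i * slideSec)                       // candidate starts, latest first
    .reverse
    .map(start => TimeWindow(start, start + windowSec))
    .filter(w => w.start <= ts && ts < w.end)
    .toList
}

val windowsFor25 = slidingWindows(ts = 25L, windowSec = 30L, slideSec = 10L)
// List(TimeWindow(0,30), TimeWindow(10,40), TimeWindow(20,50))
```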



In [ ]:
val windowedSensorQuery = tempBySensorMovingAverage.writeStream
  .queryName("movingAverage")    // this query name will be the table name
  .outputMode("append")  
  .format("memory")
  .start()

windowedSensorQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@60135bc2
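Because this query uses `append` mode, a window only appears in the `movingAverage` table once the watermark has passed its end, so the table lags behind the live data by roughly the watermark delay. A simplified model of that rule in plain Scala (hypothetical helpers, not Spark's code; Spark also advances the watermark between micro-batches, so in practice it trails the latest event slightly more):

```scala
// Simplified model: watermark = max event time seen - allowed delay (epoch seconds).
def watermark(maxEventTimeSec: Long, delaySec: Long): Long = maxEventTimeSec - delaySec

// In append mode, a window is emitted once the watermark has reached its end.
def isFinal(windowEndSec: Long, watermarkSec: Long): Boolean = windowEndSec <= watermarkSec

// Illustrative numbers: latest event at t = 1000 s, 30 s delay as in withWatermark above.
val wm = watermark(maxEventTimeSec = 1000L, delaySec = 30L)
val finalWindowEnds = Seq(960L, 970L, 980L).filter(end => isFinal(end, wm))
```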


### Get the data from the in-memory table

In [ ]:
val movingAvgDF = sparkSession.sql("select * from movingAverage")

movingAvgDF: org.apache.spark.sql.DataFrame = [id: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]


In [ ]:
movingAvgDF

res37: org.apache.spark.sql.DataFrame = [id: string, window: struct<start: timestamp, end: timestamp> ... 1 more field]


### Chart the Moving Average Data

In [ ]:
import org.apache.spark.sql.functions._
// cut-off in epoch seconds; despite its name, `lastMinute` covers the last 5 minutes
val lastMinute: Long = System.currentTimeMillis/1000 - 5.minute.toSeconds
val mAvgSample = movingAvgDF.select($"window.start".cast(LongType) as "timestamp", $"avg(temp)" as "temp")
                   .where($"timestamp" > lastMinute and $"id" === "office")
                   .orderBy($"timestamp")
                   .as[(Long, Double)]
                   .collect().map{case (ts, v) => (ts % 3600, v)}  // x: seconds within the hour


CustomPlotlyChart(mAvgSample,
                  layout=s"{title: 'moving average sensor data'}",
                  dataOptions="""{type: 'line'}""",
                  dataSources="{x: '_1', y: '_2'}")

import org.apache.spark.sql.functions._
lastMinute: Long = 1536766758
mAvgSample: Array[(Long, Double)] = Array((2520,6.72), (2530,6.860357142857143), (2540,6.848541666666667), (2550,7.002833333333333), (2560,7.156666666666666), (2570,7.207), (2580,7.148000000000001), (2590,7.156833333333333))
res41: notebook.front.widgets.charts.CustomPlotlyChart[Array[(Long, Double)]] = <CustomPlotlyChart widget>
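The x-axis value `ts % 3600` is the number of seconds within the hour, so the line wraps around at every hour boundary. One possible alternative, sketched under the assumptions that UTC clock labels are acceptable and that the chart widget handles string x-values (not verified here), is to format the epoch seconds as a time of day; `clockLabel` is a hypothetical helper:

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Formats epoch seconds as a UTC time of day, e.g. "15:39:18".
val clockFormat: DateTimeFormatter =
  DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneOffset.UTC)

def clockLabel(epochSeconds: Long): String =
  clockFormat.format(Instant.ofEpochSecond(epochSeconds))

// In the notebook: .map { case (ts, v) => (clockLabel(ts), v) } instead of (ts % 3600, v)
val label = clockLabel(1536766758L)
```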


In [ ]:
// stop the ancillary visualization queries
windowedSensorQuery.stop()
visualizationQuery.stop()
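Instead of stopping each query by name in code, the active queries can be looked up through `sparkSession.streams`. A sketch of a hypothetical `stopAllExcept` helper, exercised here on two throwaway queries over Spark's built-in `rate` source (the query names are made up):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("stop-queries-sketch").getOrCreate()

// Stops every active streaming query whose name is not in `keep` and returns what it stopped.
// Unnamed queries have a null name, hence the Option wrapper.
def stopAllExcept(session: SparkSession, keep: Set[String]): Seq[String] = {
  val toStop = session.streams.active.filterNot(q => Option(q.name).exists(keep.contains))
  toStop.foreach(_.stop())
  toStop.map(q => Option(q.name).getOrElse(q.id.toString)).toSeq
}

// Two throwaway queries to exercise the helper.
val q1 = spark.readStream.format("rate").load().writeStream.queryName("scratchA").format("memory").start()
val q2 = spark.readStream.format("rate").load().writeStream.queryName("scratchB").format("memory").start()

val stopped = stopAllExcept(spark, keep = Set("scratchB"))
```

In the notebook, `stopAllExcept(sparkSession, Set("kafkaWriter"))` would stop every visualization query while leaving a query named `kafkaWriter` running.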

## Write our moving average data to our `sensor-processed` topic

In [ ]:
// the Kafka sink expects a `value` column (string or binary) and an optional `key`
val kafkaFormat = tempBySensorMovingAverage
  .select($"id", $"window.start".cast(LongType) as "timestamp", $"avg(temp)" as "temp")
  .select($"id" as "key", to_json(struct($"id", $"timestamp", $"temp")) as "value")

kafkaFormat: org.apache.spark.sql.DataFrame = [key: string, value: string]
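To check what a record on the target topic will look like, the same projection can be run on a hand-made row in a batch DataFrame. A sketch assuming a local Spark session; the row values are illustrative, not taken from the stream:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}

val spark = SparkSession.builder().master("local[*]").appName("kafka-format-sketch").getOrCreate()
import spark.implicits._

// One row shaped like the first projection: id, window start (epoch seconds), average temperature.
val sample = Seq(("office", 1536766920L, 6.72)).toDF("id", "timestamp", "temp")

// Same second projection as kafkaFormat: sensor id as key, JSON document as value.
val preview = sample
  .select($"id" as "key", to_json(struct($"id", $"timestamp", $"temp")) as "value")
  .as[(String, String)]
  .collect()
```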


In [ ]:
val kafkaWriterQuery = kafkaFormat.writeStream
  .queryName("kafkaWriter")    // identifies the query in progress reports; only the memory sink turns it into a table
  .outputMode("append")
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServer)
  .option("topic", targetTopic)
  .option("checkpointLocation", "/tmp/spark/checkpoint2")  // stores offsets and window state for recovery
  // `failOnDataLoss` is a Kafka *source* option; set it on readStream if needed
  .start()

kafkaWriterQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@b08f00f


## View Progress

In [ ]:
val progress = kafkaWriterQuery.recentProgress

progress: Array[org.apache.spark.sql.streaming.StreamingQueryProgress] =
Array({
  "id" : "7d259da6-e9d7-4745-b80c-1c191005fd11",
  "runId" : "a5cc9f69-984f-4a96-960b-9862e08890d7",
  "name" : "kafkaWriter",
  "timestamp" : "2018-09-12T15:45:32.615Z",
  "batchId" : 9048,
  "numInputRows" : 55,
  "processedRowsPerSecond" : 30.95104108047271,
  "durationMs" : {
    "addBatch" : 1707,
    "getBatch" : 7,
    "queryPlanning" : 36,
    "triggerExecution" : 1777
  },
  "eventTime" : {
    "avg" : "2018-09-12T13:54:12.036Z",
    "max" : "2018-09-12T13:54:13.000Z",
    "min" : "2018-09-12T13:54:11.000Z",
    "watermark" : "2018-09-12T13:53:41.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4336,
    "numRowsUpdated" : 159,
    "memoryUsedBytes" : 1271863
  } ],
  "sources" : [ {
    "d...

In [ ]:
progress.map(entry => (entry.inputRowsPerSecond, entry.processedRowsPerSecond))

res45: Array[(Double, Double)] = Array((NaN,10.378681626928472), (1503.6211699164346,1928.545909253305), (33.771773906861,38.46153846153846), (35.36977491961415,53.88854868340478), (35.6280193236715,35.24492234169654), (35.39823008849557,32.11991434689507), (35.0132625994695,43.16546762589928), (35.5297157622739,37.59398496240601), (35.714285714285715,34.46033810143043), (34.65982028241335,38.35227272727273), (35.81460674157304,33.842070338420704), (35.526315789473685,23.872679045092838), (35.55750658472344,60.13363028953229), (34.53343130051433,33.837293016558675), (35.536602700781806,16.594756057085963))
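These pairs are easier to judge as averages: if the processing rate stays below the input rate, the query is falling behind. A sketch with hypothetical `RateSummary`/`summarize` helpers that skip `NaN` entries such as the first one above:

```scala
// Average input and processing rates over a set of progress reports.
final case class RateSummary(avgInput: Double, avgProcessed: Double) {
  def fallingBehind: Boolean = avgProcessed < avgInput
}

def summarize(rates: Seq[(Double, Double)]): RateSummary = {
  // Mean of the non-NaN values, or NaN when there are none.
  def mean(xs: Seq[Double]): Double = {
    val finite = xs.filterNot(_.isNaN)
    if (finite.isEmpty) Double.NaN else finite.sum / finite.size
  }
  RateSummary(mean(rates.map(_._1)), mean(rates.map(_._2)))
}

// In the notebook (assuming `progress` from the cell above):
// summarize(progress.map(p => (p.inputRowsPerSecond, p.processedRowsPerSecond)).toSeq)
val demo = summarize(Seq((Double.NaN, 10.0), (30.0, 40.0), (40.0, 20.0)))
```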
