# Streaming Data with Kafka - With added Custom Listener

This notebook extends our Kafka job with a custom listener that lets us visualize the lifecycle of the streaming job.

As we have already explored the application logic of this job, we can head towards the bottom of the notebook, where we implement a custom `StreamingListener` and register it with the `StreamingContext`.

Then, when we start our streaming job, we can observe the different streaming events reported in a reactive table widget in the notebook.


## Our streaming dataset consists of sensor information: a sensorId, a timestamp, and a value.
For the sake of simplicity in this self-contained example, we generate a randomized dataset using a scenario that simulates a real IoT use case.
The timestamp is the time of execution, and each record arrives from "the field" as a string of comma-separated values.

We also add a bit of real-world chaos to the data: due to weather conditions, some sensors publish corrupt data.
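The notebook does not include the producer that publishes these records to Kafka. As a rough sketch of what the generated data could look like, the helper below builds `sensorId,timestamp,value` strings and corrupts about 10% of them. The name `makeRecord`, the id range, the corruption rate, and the assumption that `value` is a normalized reading in [0, 1) are illustrative choices, not part of the original job.

```scala
import scala.util.Random

// Hypothetical generator, not the producer used by the original job.
// Produces "sensorId,timestamp,value" lines, where value is assumed
// to be a normalized reading in [0, 1).
def makeRecord(rnd: Random, sensorCount: Int, now: Long): String = {
  val sensorId = rnd.nextInt(sensorCount)
  val value = rnd.nextDouble()
  if (rnd.nextDouble() < 0.1) {
    // Corrupt record (~10%): either a missing field or a non-numeric value
    if (rnd.nextBoolean()) s"$sensorId,$now"
    else s"$sensorId,$now,ERR"
  } else {
    s"$sensorId,$now,$value"
  }
}

val rnd = new Random(42)
val batch = (1 to 1000).map(_ => makeRecord(rnd, 100, System.currentTimeMillis()))
```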

In [ ]:
val topic = "iot-data"
val workDir = "/tmp/learningsparkstreaming/"
val referenceFile = "sensor-records.parquet"
val targetFile = "enrichedIoTStream.parquet"
val unknownSensorsTargetFile = "unknownSensorsStream.parquet"
val kafkaBootstrapServer = "127.0.0.1:9092"

# Load the reference data from a Parquet file
We also cache the data to keep it in memory and improve the performance of our streaming application.

In [ ]:
val sensorRef = sparkSession.read.parquet(s"$workDir/$referenceFile")
sensorRef.cache()

(Parquet files preserve the schema information, which we can retrieve from the DataFrame)

In [ ]:
sensorRef.schema

## We create our Streaming Context

In [ ]:
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

val streamingContext = new StreamingContext(sparkContext, Seconds(2))

## Our stream source will be a Direct Kafka Stream


In [ ]:
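// Kafka 0.8 integration (spark-streaming-kafka-0-8): a receiver-less direct stream
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._

// Parameter names follow the Kafka 0.8 consumer configuration
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> kafkaBootstrapServer,
  "group.id" -> "iot-data-group",
  "auto.offset.reset" -> "largest" // start from the latest available offsets
)

val topics = Set(topic)
// Each element of the stream is a (key, value) pair of Strings
@transient val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     streamingContext, kafkaParams, topics)

// Alternative with the Kafka 0.10 integration (spark-streaming-kafka-0-10). It needs these
// imports instead of the ones above, Map[String, Object] parameters ("bootstrap.servers" plus
// key/value deserializers), and it yields ConsumerRecord[String, String] elements:
// import org.apache.kafka.clients.consumer.ConsumerRecord
// import org.apache.spark.streaming.kafka010._
// import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
// import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
// @transient val stream = KafkaUtils.createDirectStream[String, String](
//   streamingContext,
//   PreferConsistent,
//   Subscribe[String, String](topics, kafkaParams)
// )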



# Providing Schema information for our streaming data
Now that we have a DStream of fresh data arriving in 2-second batch intervals, we can focus on the gist of this example.
First, we want to define and apply a schema to the data we are receiving.
In Scala, we can define a schema with a `case class`.

In [ ]:
case class SensorData(sensorId: Int, timestamp: Long, value: Double)

Now we apply that schema to the DStream, using the `flatMap` function.

We use `flatMap` instead of `map` because the incoming data might be incomplete or corrupted.
If we used `map`, we would have to produce a resulting value for every incoming record,
which is something we cannot do for invalid records.
With `flatMap` in combination with `Option`, we can represent valid records as `Some(recordValue)` and invalid records as `None`.
By virtue of `flatMap`, the inner `Option` container gets flattened, and our resulting stream contains only the valid `recordValue`s.

During the parsing of the comma-separated records, we not only protect ourselves against missing fields, but also parse the numeric values to their expected types. The surrounding `Try` captures any `NumberFormatException` that might arise from invalid records.

In [ ]:
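import scala.util.Try

// Each Kafka message is a (key, value) pair; only the value carries the record
val schemaStream = stream.flatMap { case (_, record) =>
  val fields = record.split(",")
  if (fields.size == 3) {
    // Try turns a NumberFormatException into None
    Try(SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)).toOption
  } else {
    None // incomplete record
  }
}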
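To see the `flatMap`/`Option` pattern in isolation, here is a sketch that applies the same parsing logic to a plain Scala `List` instead of a DStream. The `parse` helper and the sample records are illustrative only.

```scala
import scala.util.Try

case class SensorData(sensorId: Int, timestamp: Long, value: Double)

// Same logic as the DStream transformation: Some(record) if valid, None otherwise
def parse(record: String): Option[SensorData] = {
  val fields = record.split(",")
  if (fields.size == 3) {
    Try(SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)).toOption
  } else None
}

val raw = List(
  "1,1500000000000,0.42", // valid
  "2,1500000000000",      // missing field
  "3,1500000000000,ERR",  // non-numeric value
  "x,1500000000000,0.10"  // non-numeric id
)

// flatMap unwraps the Somes and drops the Nones
val parsed = raw.flatMap(r => parse(r))
// parsed == List(SensorData(1, 1500000000000L, 0.42))
```

Only the first record survives: the second fails the field-count check, and the other two are turned into `None` by `Try`.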

# Enrich the streaming data, without dropping records.
With the schema stream in place, we can proceed to transform the underlying RDDs into DataFrames.

As in the previous notebook, we are going to use the reference data to add the specific sensor information.
Previously, we used the default `join`, which is an inner join that requires the join key to be present on both sides of the join.
This caused us to drop all records whose sensor id is not in the reference data. Given that new sensors might become available, or misconfigured sensors might send an incorrect id, we would like to preserve all records so that we can reconcile them at a later stage. To do so, we use a right outer join with the streaming data on the right-hand side: every streaming record is kept, and the reference columns are `null` for unknown sensors.
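The difference between the two joins can be sketched without Spark. Below, a `Map` stands in for the reference DataFrame and a `List` for one micro-batch; `SensorRef`, `Reading`, and the sample values are hypothetical (the real reference data may have more columns). The inner join silently loses sensor 99, while the right outer join keeps it with an empty reference side.

```scala
// Hypothetical stand-ins for the reference data and one micro-batch
case class SensorRef(sensorId: Int, sensorType: String, minRange: Double, maxRange: Double)
case class Reading(sensorId: Int, timestamp: Long, value: Double)

val reference = Map(
  1 -> SensorRef(1, "temperature", -20.0, 50.0),
  2 -> SensorRef(2, "humidity", 0.0, 100.0)
)
val readings = List(Reading(1, 1000L, 0.5), Reading(2, 1000L, 0.25), Reading(99, 1000L, 0.7))

// Inner join: readings whose id is missing from the reference data are dropped
val inner = readings.flatMap(r => reference.get(r.sensorId).map(ref => (ref, r)))

// Right outer join: every reading survives; the reference side is optional
val rightOuter: List[(Option[SensorRef], Reading)] =
  readings.map(r => (reference.get(r.sensorId), r))

// Same split as filtering on `sensorType` being null / not null in the DataFrame version
val (known, unknown) = rightOuter.partition { case (ref, _) => ref.isDefined }
```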

As before, we perform the enrichment within the general-purpose output operation `foreachRDD`.

In [ ]:
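// `import x.implicits._` requires a stable identifier, hence the val
val stableSparkSession = sparkSession
import stableSparkSession.implicits._
import org.apache.spark.sql.SaveMode.Append
import org.apache.spark.sql.functions.broadcast

schemaStream.foreachRDD { rdd =>
  val sensorDF = rdd.toDF()
  // Right outer join: keep every streaming record, known or unknown.
  // Spark's broadcast hash join cannot build on the preserved side of an outer join,
  // so this broadcast hint on the right side is likely not honored.
  val sensorWithInfo = sensorRef.join(broadcast(sensorDF), Seq("sensorId"), "rightouter")
  // Records without a matching reference row have null reference columns
  val unknownSensors = sensorWithInfo.filter($"sensorType".isNull)
  val knownSensors = sensorWithInfo.filter($"sensorType".isNotNull)
  // Scale the normalized value back to the sensor's physical range
  val denormalizedSensorData =
    knownSensors.withColumn("dnvalue", $"value" * ($"maxRange" - $"minRange") + $"minRange")
  val sensorRecords = denormalizedSensorData.drop("value", "maxRange", "minRange")
  sensorRecords.write.format("parquet").mode(Append).save(s"$workDir/$targetFile")
  unknownSensors.write.format("parquet").mode(Append).save(s"$workDir/$unknownSensorsTargetFile")
}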
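Assuming `value` arrives normalized to [0, 1], the `dnvalue` column maps it back to the sensor's physical range with `value * (maxRange - minRange) + minRange`. The following plain Scala sketch applies the same arithmetic to a hypothetical sensor with range [-20, 50]:

```scala
// Same formula as the `dnvalue` column: scale a [0, 1] reading to [minRange, maxRange]
def denormalize(value: Double, minRange: Double, maxRange: Double): Double =
  value * (maxRange - minRange) + minRange

val low = denormalize(0.0, -20.0, 50.0)  // -20.0, the lower bound
val mid = denormalize(0.5, -20.0, 50.0)  // 15.0, the midpoint
val high = denormalize(1.0, -20.0, 50.0) // 50.0, the upper bound
```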

# Custom Streaming Listener
This sample shows how to implement a custom `StreamingListener` that receives updates about the events and progress of our streaming application.
We have opted for a UI display: this custom listener produces a notebook widget that reactively receives and displays the data notified through the `StreamingListener` interface.
That way, we can visually explore the execution lifecycle of this Spark Streaming job.

In [ ]:
import org.apache.spark.streaming.scheduler._
class NotebookTableStreamingListener() extends StreamingListener {
  case class TableEntry(timestamp: Long, operation: String, target: String, duration: Option[Long])
  object TableEntry {
    def now() = System.currentTimeMillis()
    // Convenience constructor that stamps the entry with the current time
    def apply(operation: String, target: String, duration: Option[Long] = None): TableEntry = {
      new TableEntry(now(), operation, target, duration)
    }
  }
  val dummyEntry = Seq(TableEntry("-","-"))
  val table = new notebook.front.widgets.charts.TableChart[Seq[TableEntry]](dummyEntry)
  var entries: List[TableEntry] = List() 
  val EventLimit = 40
  def add(tableEntry: TableEntry) = {
    entries = (tableEntry :: entries).take(EventLimit)
    table.applyOn(entries)
  }
  
  def batchName(batchInfo: BatchInfo):String = {
    "batch-" + batchInfo.batchTime
  }
  def shortOutputOperationDescription(outOp: OutputOperationInfo) : String = {
    outOp.description.split("\n").headOption.getOrElse("-")
  }
    
  /** Called when the streaming has been started */
  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
    add(TableEntry("stream started", "-"))
  }

  /** Called when a receiver has been started */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit =  {
    add(TableEntry("receiver started", receiverStarted.receiverInfo.name))
  }

  /** Called when a receiver has reported an error */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    add(TableEntry("receiver error", receiverError.receiverInfo.lastError))
  }

  /** Called when a receiver has been stopped */
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
    add(TableEntry("receiver stopped", receiverStopped.receiverInfo.name))
  }

  /** Called when a batch of jobs has been submitted for processing. */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    add(TableEntry("batch submitted", batchName(batchSubmitted.batchInfo)))
  }

  /** Called when processing of a batch of jobs has started.  */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    add(TableEntry("batch started", batchName(batchStarted.batchInfo)))
  }

  /** Called when processing of a batch of jobs has completed. */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    add(TableEntry("batch completed", batchName(batchCompleted.batchInfo), batchCompleted.batchInfo.totalDelay))
  }

  /** Called when processing of a job of a batch has started. */
  override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
    add(TableEntry("output operation started", shortOutputOperationDescription(outputOperationStarted.outputOperationInfo)))
  }

  /** Called when processing of a job of a batch has completed. */
  override def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
    add(TableEntry("output operation completed", shortOutputOperationDescription(outputOperationCompleted.outputOperationInfo), outputOperationCompleted.outputOperationInfo.duration))
  }

}
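The listener's `add` method keeps only the most recent `EventLimit` entries, newest first, by prepending to an immutable `List` and truncating with `take`. The sketch below extracts that bookkeeping, plus the first-line trimming used by `shortOutputOperationDescription`, into Spark-free code that can be tried on its own; `BoundedLog` and `firstLine` are hypothetical names.

```scala
// Spark-free version of the listener's `add` bookkeeping:
// prepend the newest entry, then keep at most `limit` entries.
class BoundedLog[A](limit: Int) {
  private var entries: List[A] = Nil
  def add(entry: A): List[A] = {
    entries = (entry :: entries).take(limit)
    entries
  }
  def snapshot: List[A] = entries
}

// Same idea as `shortOutputOperationDescription`: keep only the first line
def firstLine(description: String): String =
  description.split("\n").headOption.getOrElse("-")

val log = new BoundedLog[String](3)
(1 to 5).foreach(i => log.add(s"event-$i"))
// log.snapshot == List("event-5", "event-4", "event-3")
```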


## Create an instance of the listener that we just defined

In [ ]:
val customTableListener = new NotebookTableStreamingListener()

## Add the listener to the streaming context so that it can receive callbacks from the different lifecycle points

In [ ]:
streamingContext.addStreamingListener(customTableListener)

## We add the table widget to our notebook to render it

In [ ]:
customTableListener.table

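## Start the streaming context to begin processing
Watch the table for updates about the execution of our streaming context. Since the direct Kafka stream does not use receivers, the receiver events will not show up; instead, we should see batch and output-operation events every 2 seconds.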

In [ ]:
streamingContext.start()

In [ ]:
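// Only run this cell when you want the streaming process to end.
// `stop(false)` stops the streaming job but keeps the underlying SparkContext alive.
// Note that a stopped StreamingContext cannot be restarted.
streamingContext.stop(false)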