Permalink
Browse files

StreamingListener

  • Loading branch information...
youngbink committed Mar 22, 2018
1 parent f203d01 commit 108ff87ad82410595105f95934b5c782bc5f8d15
Showing with 22 additions and 11 deletions.
  1. +22 −11 src/main/scala/io/bespin/scala/spark/streaming/EventCount.scala
@@ -17,12 +17,14 @@
package io.bespin.scala.spark.streaming
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import org.apache.log4j._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ManualClockWrapper, Minutes, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.apache.spark.util.LongAccumulator
import org.rogach.scallop._
@@ -54,6 +56,8 @@ object EventCount {
val batchDuration = Minutes(1)
val ssc = new StreamingContext(spark.sparkContext, batchDuration)
val batchListener = new StreamingContextBatchCompletionListener(ssc, 24)
ssc.addStreamingListener(batchListener)
val rdds = buildMockStream(ssc.sparkContext, args.input())
val inputData: mutable.Queue[RDD[String]] = mutable.Queue()
@@ -75,23 +79,30 @@ object EventCount {
for (rdd <- rdds) {
inputData += rdd
ManualClockWrapper.advanceManualClock(ssc, batchDuration.milliseconds)
// Sleep a bit to make sure the batch is processed.
while (inputData.length > 100) {
Thread.sleep(50L)
}
ManualClockWrapper.advanceManualClock(ssc, batchDuration.milliseconds, 50L)
}
waitForAccumulator(numCompletedRDDs, 24) { () =>
batchListener.waitUntilCompleted(() =>
ssc.stop()
}
)
}
def waitForAccumulator(accum: LongAccumulator, target: Long)(cleanUpFunc: () => Unit): Unit = {
while (accum.value < target) {
Thread.sleep(50L)
class StreamingContextBatchCompletionListener(val ssc: StreamingContext, val limit: Int) extends StreamingListener {
def waitUntilCompleted(cleanUpFunc: () => Unit): Unit = {
while (!sparkExSeen) {}
cleanUpFunc()
}
val numBatchesExecuted = new AtomicInteger(0)
@volatile var sparkExSeen = false
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val curNumBatches = numBatchesExecuted.incrementAndGet()
log.info(s"${curNumBatches} batches have been executed")
if (curNumBatches == limit) {
sparkExSeen = true
}
}
cleanUpFunc()
}
def buildMockStream(sc: SparkContext, directoryName: String): Array[RDD[String]] = {

0 comments on commit 108ff87

Please sign in to comment.