diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index d48b51aa69565..36aae958019ce 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -351,15 +351,6 @@ abstract class DStream[T: ClassTag] ( dependencies.foreach(_.clearMetadata(time)) } - /* Adds metadata to the Stream while it is running. - * This method should be overwritten by sublcasses of InputDStream. - */ - private[streaming] def addMetadata(metadata: Any) { - if (metadata != null) { - logInfo("Dropping Metadata: " + metadata.toString) - } - } - /** * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of * this stream. This is an internal method that should not be called directly. This is diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 0dc6704603f82..8da4309daf4ca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -17,23 +17,23 @@ package org.apache.spark.streaming.dstream -import java.util.concurrent.ArrayBlockingQueue import java.nio.ByteBuffer +import java.util.concurrent.ArrayBlockingQueue -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.Await import scala.concurrent.duration._ import scala.reflect.ClassTag -import akka.actor.{Props, Actor} +import akka.actor.{Actor, Props} import akka.pattern.ask -import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} -import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.rdd.{RDD, BlockRDD} +import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} -import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} +import org.apache.spark.streaming._ +import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver} +import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -48,8 +48,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { - // This is an unique identifier that is used to match the network receiver with the - // corresponding network input stream. + /** Keeps all received blocks information */ + private val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] + + /** This is an unique identifier for the network input stream. */ val id = ssc.getNewNetworkStreamId() /** @@ -64,23 +66,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte def stop() {} + /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */ override def compute(validTime: Time): Option[RDD[T]] = { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // master failure if (validTime >= graph.startTime) { - val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime) + val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id) + receivedBlockInfo(validTime) = blockInfo + val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } } + + /** Get information on received blocks. */ + private[streaming] def getReceivedBlockInfo(time: Time) = { + receivedBlockInfo(time) + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This + * implementation overrides the default implementation to clear received + * block information. + */ + private[streaming] override def clearMetadata(time: Time) { + super.clearMetadata(time) + val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration)) + receivedBlockInfo --= oldReceivedBlocks.keys + logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", ")) + } } private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) +private[streaming] case class ReportBlock(blockId: StreamBlockId, numRecords: Long, metadata: Any) extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage @@ -156,21 +180,20 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging actor ! ReportError(e.toString) } - /** * Pushes a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) - actor ! ReportBlock(blockId, metadata) + actor ! ReportBlock(blockId, arrayBuffer.size, metadata) } /** * Pushes a block (as bytes) into the block manager. */ - def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock(blockId: StreamBlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { env.blockManager.putBytes(blockId, bytes, level) - actor ! ReportBlock(blockId, metadata) + actor ! ReportBlock(blockId, -1 , metadata) } /** A helper actor that communicates with the NetworkInputTracker */ @@ -188,8 +211,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging } override def receive() = { - case ReportBlock(blockId, metadata) => - tracker ! AddBlocks(streamId, Array(blockId), metadata) + case ReportBlock(blockId, numRecords, metadata) => + tracker ! AddBlocks(ReceivedBlockInfo(streamId, blockId, numRecords, metadata)) case ReportError(msg) => tracker ! DeregisterReceiver(streamId, msg) case StopReceiver(msg) => @@ -211,7 +234,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging class BlockGenerator(storageLevel: StorageLevel) extends Serializable with Logging { - case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) + case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 7f3cd2f8eb1fd..9c69a2a4e21f5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time */ case class BatchInfo( batchTime: Time, + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]], submissionTime: Long, processingStartTime: Option[Long], processingEndTime: Option[Long] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index c7306248b1950..80888c755c6bc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -147,7 +147,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => - jobScheduler.runJobs(time, graph.generateJobs(time)) + jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) ) // Restart the timer @@ -159,7 +159,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { private def generateJobs(time: Time) { SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { - case Success(jobs) => jobScheduler.runJobs(time, jobs) + case Success(jobs) => + val receivedBlockInfo = graph.getNetworkInputStreams.map { stream => + val streamId = stream.id + val receivedBlockInfo = stream.getReceivedBlockInfo(time) + (streamId, receivedBlockInfo) + }.toMap + jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventActor ! DoCheckpoint(time) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index de675d3c7fb94..ae99454cf8b86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -82,14 +82,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } } - def runJobs(time: Time, jobs: Seq[Job]) { - if (jobs.isEmpty) { - logInfo("No jobs added for time " + time) + def submitJobSet(jobSet: JobSet) { + if (jobSet.jobs.isEmpty) { + logInfo("No jobs added for time " + jobSet.time) } else { - val jobSet = new JobSet(time, jobs) - jobSets.put(time, jobSet) + jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => executor.execute(new JobHandler(job))) - logInfo("Added jobs for time " + time) + logInfo("Added jobs for time " + jobSet.time) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index fcf303aee6cd7..a69d74362173e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time * belong to the same batch. */ private[streaming] -case class JobSet(time: Time, jobs: Seq[Job]) { +case class JobSet( + time: Time, + jobs: Seq[Job], + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty + ) { private val incompleteJobs = new HashSet[Job]() private val submissionTime = System.currentTimeMillis() // when this jobset was submitted @@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) { def toBatchInfo: BatchInfo = { new BatchInfo( time, + receivedBlockInfo, submissionTime, if (processingStartTime >= 0 ) Some(processingStartTime) else None, if (processingEndTime >= 0 ) Some(processingEndTime) else None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index e4fa163f2e069..74a7644d1c7ad 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -17,26 +17,33 @@ package org.apache.spark.streaming.scheduler -import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} -import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} -import org.apache.spark.{SparkException, Logging, SparkEnv} -import org.apache.spark.SparkContext._ - -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import scala.concurrent.duration._ +import scala.collection.mutable.{HashMap, SynchronizedQueue, SynchronizedMap} import akka.actor._ -import akka.pattern.ask -import akka.dispatch._ -import org.apache.spark.storage.BlockId -import org.apache.spark.streaming.{Time, StreamingContext} + +import org.apache.spark.{Logging, SparkEnv, SparkException} +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StreamBlockId +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver} import org.apache.spark.util.AkkaUtils +/** Information about block received by the network receiver */ +case class ReceivedBlockInfo( + streamId: Int, + blockId: StreamBlockId, + numRecords: Long, + metadata: Any + ) + +/** + * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate + * with each other. + */ private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) +private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage @@ -53,9 +60,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverExecutor() val receiverInfo = new HashMap[Int, ActorRef] - val receivedBlockIds = new HashMap[Int, Queue[BlockId]] + val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] + with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] val timeout = AkkaUtils.askTimeout(ssc.conf) - + val listenerBus = ssc.scheduler.listenerBus // actor is created when generator starts. // This not being null means the tracker has been started and not stopped @@ -87,15 +95,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } /** Return all the blocks received from a receiver. */ - def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { - val queue = receivedBlockIds.synchronized { - receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]()) - } - val result = queue.synchronized { - queue.dequeueAll(x => true) - } - logInfo("Stream " + receiverId + " received " + result.size + " blocks") - result.toArray + def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = { + val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true) + logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks") + receivedBlockInfo.toArray + } + + private def getReceivedBlockInfoQueue(streamId: Int) = { + receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo]) } /** Actor to receive messages from the receivers. */ @@ -110,17 +117,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { + sender.path.address) sender ! true } - case AddBlocks(streamId, blockIds, metadata) => { - val tmp = receivedBlockIds.synchronized { - if (!receivedBlockIds.contains(streamId)) { - receivedBlockIds += ((streamId, new Queue[BlockId])) - } - receivedBlockIds(streamId) - } - tmp.synchronized { - tmp ++= blockIds - } - networkInputStreamMap(streamId).addMetadata(metadata) + case AddBlocks(receivedBlockInfo) => { + getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo } case DeregisterReceiver(streamId, msg) => { receiverInfo -= streamId diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 461ea3506477f..0c1edff9c8616 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -23,6 +23,7 @@ import org.apache.spark.util.Distribution /** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent +case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent @@ -34,14 +35,14 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen * computation. */ trait StreamingListener { - /** - * Called when processing of a batch has completed - */ + + /** Called when a batch of jobs has been submitted for processing. */ + def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } + + /** Called when processing of a batch of jobs has completed. */ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } - /** - * Called when processing of a batch has started - */ + /** Called when processing of a batch of jobs has started. */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala index e9f8d21faab45..b574cb103f766 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -17,40 +17,86 @@ package org.apache.spark.streaming.ui -import scala.collection.mutable.SynchronizedQueue +import scala.collection.mutable.{HashMap, Queue} import scala.xml.Node import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.servlet.ServletContextHandler -import org.apache.spark.Logging -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListener, StreamingListenerBatchCompleted} +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.scheduler._ import org.apache.spark.ui.{ServerInfo, SparkUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{Distribution, Utils} +import java.util.Locale -private[spark] class StreamingUIListener() extends StreamingListener { +private[spark] class StreamingUIListener(conf: SparkConf) extends StreamingListener { - private val batchInfos = new SynchronizedQueue[BatchInfo] - private val maxBatchInfos = 100 + private val waitingBatchInfos = new HashMap[Time, BatchInfo] + private val runningBatchInfos = new HashMap[Time, BatchInfo] + private val completedaBatchInfos = new Queue[BatchInfo] + private val batchInfoLimit = conf.getInt("spark.steaming.ui.maxBatches", 100) - override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted) { - batchInfos.enqueue(batchStarted.batchInfo) - if (batchInfos.size > maxBatchInfos) batchInfos.dequeue() + override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized { + runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo } - def processingDelayDistribution = extractDistribution(_.processingDelay) + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized { + runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo + waitingBatchInfos.remove(batchStarted.batchInfo.batchTime) + } + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized { + waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime) + runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) + completedaBatchInfos.enqueue(batchCompleted.batchInfo) + if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() + } + + def numNetworkReceivers: Int = synchronized { + completedaBatchInfos.headOption.map(_.receivedBlockInfo.size).getOrElse(0) + } + + def waitingBatches: Seq[BatchInfo] = synchronized { + waitingBatchInfos.values.toSeq + } + + def runningBatches: Seq[BatchInfo] = synchronized { + runningBatchInfos.values.toSeq + } - def schedulingDelayDistribution = extractDistribution(_.schedulingDelay) + def completedBatches: Seq[BatchInfo] = synchronized { + completedaBatchInfos.toSeq + } - def totalDelay = extractDistribution(_.totalDelay) + def processingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.processingDelay) + } - def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { - Distribution(batchInfos.flatMap(getMetric(_)).map(_.toDouble)) + def schedulingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.schedulingDelay) } - def numBatchInfos = batchInfos.size + def totalDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.totalDelay) + } + + def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized { + val allBatcheInfos = waitingBatchInfos.values.toSeq ++ + runningBatchInfos.values.toSeq ++ completedaBatchInfos + val latestBatchInfos = allBatcheInfos.sortBy(_.batchTime)(Time.ordering).reverse.take(batchInfoLimit) + val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) + (0 until numNetworkReceivers).map { receiverId => + val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty)) + val distributionOption = Distribution(blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble)) + (receiverId, distributionOption) + }.toMap + } + + private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } } private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { @@ -62,7 +108,7 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) private val port = conf.getInt("spark.streaming.ui.port", StreamingUI.DEFAULT_PORT) private val securityManager = sc.env.securityManager - private val listener = new StreamingUIListener() + private val listener = new StreamingUIListener(conf) private val handlers: Seq[ServletContextHandler] = { Seq( createServletHandler("/", @@ -89,23 +135,19 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) private def render(request: HttpServletRequest): Seq[Node] = { - val batchStatsTable = generateBatchStatsTable() - val content = batchStatsTable + val content = generateBatchStatsTable() ++ generateNetworkStatsTable() UIUtils.headerStreamingPage(content, "", appName, "Spark Streaming Overview") } - private def generateBatchStatsTable(): Seq[Node] = { - def getQuantiles(timeDistributionOption: Option[Distribution]) = { - timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) } - } - val numBatches = listener.numBatchInfos + private def generateBatchStatsTable(): Seq[Node] = { + val numBatches = listener.completedBatches.size val table = if (numBatches > 0) { val processingDelayQuantilesRow = "Processing Times" +: getQuantiles(listener.processingDelayDistribution) val schedulingDelayQuantilesRow = - "Scheduling Delay:" +: getQuantiles(listener.processingDelayDistribution) + "Scheduling Delay:" +: getQuantiles(listener.schedulingDelayDistribution) val totalDelayQuantilesRow = - "End-to-end Delay:" +: getQuantiles(listener.totalDelay) + "End-to-end Delay:" +: getQuantiles(listener.totalDelayDistribution) val headerRow = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") @@ -119,11 +161,86 @@ private[spark] class StreamingUI(ssc: StreamingContext) extends Logging { None } + val batchCounts = + + + val batchStats = + + val content =

Batch Processing Statistics

++ -
{table.getOrElse("No statistics have been generated yet.")}
+
{batchCounts}
++ +
{batchStats}
+ content } + + private def generateNetworkStatsTable(): Seq[Node] = { + val receivedRecordDistributions = listener.receivedRecordsDistributions + val numNetworkReceivers = receivedRecordDistributions.size + val table = if (receivedRecordDistributions.size > 0) { + val headerRow = Seq("Receiver", "Min", "25th percentile", + "Median", "75th percentile", "Max") + val dataRows = (0 until numNetworkReceivers).map { receiverId => + val receiverName = s"Receiver-$receiverId" + val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => + d.getQuantiles().map(r => numberToString(r.toLong) + " records/batch") + }.getOrElse { + Seq("-", "-", "-", "-", "-") + } + receiverName +: receivedRecordStats + } + Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true)) + } else { + None + } + + val content = +

Network Input Statistics

++ +
{table.getOrElse("No network receivers")}
+ + content + } + + private def getQuantiles(timeDistributionOption: Option[Distribution]) = { + timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) } + } + + private def numberToString(records: Double): String = { + val trillion = 1e12 + val billion = 1e9 + val million = 1e6 + val thousand = 1e3 + + val (value, unit) = { + if (records >= 2*trillion) { + (records / trillion, "T") + } else if (records >= 2*billion) { + (records / billion, "B") + } else if (records >= 2*million) { + (records / million, "M") + } else if (records >= 2*thousand) { + (records / thousand, "K") + } else { + (records, "") + } + } + "%.1f%s".formatLocal(Locale.US, value, unit) + } } object StreamingUI { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 74e73ebb342fe..723ea18e91dbf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -238,11 +238,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is a server to test the network input stream */ -class TestServer() extends Logging { +class TestServer(portToBind: Int = 0) extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(0) + val serverSocket = new ServerSocket(portToBind) val servingThread = new Thread() { override def run() { @@ -281,7 +281,7 @@ class TestServer() extends Logging { def start() { servingThread.start() } - def send(msg: String) { queue.add(msg) } + def send(msg: String) { queue.put(msg) } def stop() { servingThread.interrupt() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala new file mode 100644 index 0000000000000..204041def7dfc --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -0,0 +1,52 @@ +package org.apache.spark.streaming + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} +import org.apache.spark.streaming.dstream.InputDStream +import scala.reflect.ClassTag +import org.apache.spark.rdd.RDD +import scala.util.Random + +class UISuite extends FunSuite with BeforeAndAfterAll { + + test("Testing") { + runStreaming(1000000) + } + + def runStreaming(duration: Long) { + val ssc = new StreamingContext("local[10]", "test", Seconds(1)) + val servers = (1 to 5).map { i => new TestServer(10000 + i) } + + val inputStream = ssc.union(servers.map(server => ssc.socketTextStream("localhost", server.port))) + inputStream.count.print + + ssc.start() + servers.foreach(_.start()) + val startTime = System.currentTimeMillis() + while (System.currentTimeMillis() - startTime < duration) { + servers.map(_.send(Random.nextString(10) + "\n")) + Thread.sleep(1) + } + ssc.stop() + servers.foreach(_.stop()) + } +} + +class FunctionBasedInputDStream[T: ClassTag]( + ssc_ : StreamingContext, + function: (StreamingContext, Time) => Option[RDD[T]] + ) extends InputDStream[T](ssc_) { + + def start(): Unit = {} + + def stop(): Unit = {} + + def compute(validTime: Time): Option[RDD[T]] = function(ssc, validTime) +} + + + + + + + +