diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 8309b82d0bbbd..a3ee1213200e9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -184,7 +184,12 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging /** * Pushes a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock( + blockId: StreamBlockId, + arrayBuffer: ArrayBuffer[T], + metadata: Any, + level: StorageLevel + ) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) actor ! ReportBlock(blockId, arrayBuffer.size, metadata) } @@ -192,7 +197,12 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging /** * Pushes a block (as bytes) into the block manager. */ - def pushBlock(blockId: StreamBlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock( + blockId: StreamBlockId, + bytes: ByteBuffer, + metadata: Any, + level: StorageLevel + ) { env.blockManager.putBytes(blockId, bytes, level) actor ! ReportBlock(blockId, -1 , metadata) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala index 86427ca171489..9a3cd8058e338 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala @@ -105,8 +105,13 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi val latestBatchInfos = allBatches.reverse.take(batchInfoLimit) val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) (0 until numNetworkReceivers).map { receiverId => - val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty)) - val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration) + val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo => + batchInfo.get(receiverId).getOrElse(Array.empty) + } + val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo => + // calculate records per second for each batch + blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration + } val distributionOption = Distribution(recordsOfParticularReceiver) (receiverId, distributionOption) }.toMap @@ -231,16 +236,24 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging { val numBatches = listener.completedBatches.size val lastCompletedBatch = listener.lastCompletedBatch val table = if (numBatches > 0) { - val processingDelayQuantilesRow = - Seq("Processing Time", msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))) ++ - getQuantiles(listener.processingDelayDistribution) - val schedulingDelayQuantilesRow = - Seq("Scheduling Delay", msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))) ++ - getQuantiles(listener.schedulingDelayDistribution) - val totalDelayQuantilesRow = - Seq("Total Delay", msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))) ++ - getQuantiles(listener.totalDelayDistribution) - + val processingDelayQuantilesRow = { + Seq( + "Processing Time", + msDurationToString(lastCompletedBatch.flatMap(_.processingDelay)) + ) ++ getQuantiles(listener.processingDelayDistribution) + } + val schedulingDelayQuantilesRow = { + Seq( + "Scheduling Delay", + msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay)) + ) ++ getQuantiles(listener.schedulingDelayDistribution) + } + val totalDelayQuantilesRow = { + Seq( + "Total Delay", + msDurationToString(lastCompletedBatch.flatMap(_.totalDelay)) + ) ++ getQuantiles(listener.totalDelayDistribution) + } val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile", "Median", "75th percentile", "Maximum") val dataRows: Seq[Seq[String]] = Seq( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala index 401ee1fe63590..51b11a29cbd98 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -45,7 +45,7 @@ private[spark] object UIUtils { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala index 1ca8b259c9622..a4e42189f3309 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -25,7 +25,7 @@ import scala.util.Random class UISuite extends FunSuite with BeforeAndAfterAll { - test("Testing") { + ignore("Testing") { runStreaming(1000000) } @@ -58,12 +58,4 @@ class FunctionBasedInputDStream[T: ClassTag]( def stop(): Unit = {} def compute(validTime: Time): Option[RDD[T]] = function(ssc, validTime) -} - - - - - - - - +} \ No newline at end of file