Skip to content

Commit

Permalink
Minor style updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Apr 3, 2014
1 parent aef4dd5 commit 53be2c5
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,25 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
/**
* Pushes a block (as an ArrayBuffer filled with data) into the block manager.
*/
def pushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
def pushBlock(
blockId: StreamBlockId,
arrayBuffer: ArrayBuffer[T],
metadata: Any,
level: StorageLevel
) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId, arrayBuffer.size, metadata)
}

/**
* Pushes a block (as bytes) into the block manager.
*/
def pushBlock(blockId: StreamBlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
def pushBlock(
blockId: StreamBlockId,
bytes: ByteBuffer,
metadata: Any,
level: StorageLevel
) {
env.blockManager.putBytes(blockId, bytes, level)
actor ! ReportBlock(blockId, -1 , metadata)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,13 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
(0 until numNetworkReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty))
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map(_.map(_.numRecords).sum.toDouble * 1000 / batchDuration)
val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
batchInfo.get(receiverId).getOrElse(Array.empty)
}
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
// calculate records per second for each batch
blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
}
val distributionOption = Distribution(recordsOfParticularReceiver)
(receiverId, distributionOption)
}.toMap
Expand Down Expand Up @@ -231,16 +236,24 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
val numBatches = listener.completedBatches.size
val lastCompletedBatch = listener.lastCompletedBatch
val table = if (numBatches > 0) {
val processingDelayQuantilesRow =
Seq("Processing Time", msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))) ++
getQuantiles(listener.processingDelayDistribution)
val schedulingDelayQuantilesRow =
Seq("Scheduling Delay", msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))) ++
getQuantiles(listener.schedulingDelayDistribution)
val totalDelayQuantilesRow =
Seq("Total Delay", msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))) ++
getQuantiles(listener.totalDelayDistribution)

val processingDelayQuantilesRow = {
Seq(
"Processing Time",
msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))
) ++ getQuantiles(listener.processingDelayDistribution)
}
val schedulingDelayQuantilesRow = {
Seq(
"Scheduling Delay",
msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))
) ++ getQuantiles(listener.schedulingDelayDistribution)
}
val totalDelayQuantilesRow = {
Seq(
"Total Delay",
msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))
) ++ getQuantiles(listener.totalDelayDistribution)
}
val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
"Median", "75th percentile", "Maximum")
val dataRows: Seq[Seq[String]] = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private[spark] object UIUtils {
<script type="text/JavaScript">
<!--
function timedRefresh(timeoutPeriod) {
setTimeout("location.reload(true);",timeoutPeriod);
setTimeout("location.reload(true);",timeoutPeriod);
}
// -->
</script>
Expand Down
12 changes: 2 additions & 10 deletions streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.util.Random

class UISuite extends FunSuite with BeforeAndAfterAll {

test("Testing") {
ignore("Testing") {
runStreaming(1000000)
}

Expand Down Expand Up @@ -58,12 +58,4 @@ class FunctionBasedInputDStream[T: ClassTag](
def stop(): Unit = {}

def compute(validTime: Time): Option[RDD[T]] = function(ssc, validTime)
}








}

0 comments on commit 53be2c5

Please sign in to comment.