Addressed Patrick's comments.
tdas committed Apr 11, 2014
1 parent 89dae36 commit e038b4b
Showing 3 changed files with 13 additions and 13 deletions.
project/MimaBuild.scala: 4 changes (2 additions, 2 deletions)
@@ -75,8 +75,8 @@ object MimaBuild {
excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
excludeSparkClass("streaming.dstream.ReportError") ++
excludeSparkClass("org.apache.spark.streaming.dstream.ReportBlock") ++
excludeSparkClass("org.apache.spark.streaming.dstream.DStream")
excludeSparkClass("streaming.dstream.ReportBlock") ++
excludeSparkClass("streaming.dstream.DStream")
case _ => Seq()
}
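Note: the two rewritten entries drop the org.apache.spark prefix so they match the other excludes in this block. That reading assumes excludeSparkClass itself prepends Spark's root package; a hypothetical sketch of such a helper (not the actual MimaBuild code) would look like:

    // Hypothetical helper: scope an exclusion to Spark's root package so callers
    // pass "streaming.dstream.DStream" instead of the fully qualified class name.
    def excludeSparkClass(className: String) =
      excludeClass("org.apache.spark." + className)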

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala: 10 changes (5 additions, 5 deletions)

@@ -33,7 +33,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
   private val waitingBatchInfos = new HashMap[Time, BatchInfo]
   private val runningBatchInfos = new HashMap[Time, BatchInfo]
   private val completedaBatchInfos = new Queue[BatchInfo]
-  private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100)
+  private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private val receiverInfos = new HashMap[Int, ReceiverInfo]

@@ -82,7 +82,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
     runningBatchInfos.values.toSeq
   }

-  def completedBatches: Seq[BatchInfo] = synchronized {
+  def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
     completedaBatchInfos.toSeq
   }

@@ -99,7 +99,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
   }

   def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
-    val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
+    val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
     val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
     (0 until numNetworkReceivers).map { receiverId =>
       val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
@@ -134,10 +134,10 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
   }

   def lastReceivedBatch: Option[BatchInfo] = {
-    allBatches.lastOption
+    retainedBatches.lastOption
   }

-  private def allBatches: Seq[BatchInfo] = synchronized {
+  private def retainedBatches: Seq[BatchInfo] = synchronized {
     (waitingBatchInfos.values.toSeq ++
       runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
   }
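Note: after the rename, the number of completed batches retained for the streaming UI is controlled by spark.streaming.ui.retainedBatches, read above via ssc.conf.getInt with a default of 100. A minimal usage sketch (the app name, batch interval, and value 200 are illustrative only):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Set the renamed key on the SparkConf before the StreamingContext is created,
    // so the UI listener picks it up; 200 here is an arbitrary example value.
    val conf = new SparkConf()
      .setAppName("RetainedBatchesExample")
      .set("spark.streaming.ui.retainedBatches", "200")
    val ssc = new StreamingContext(conf, Seconds(1))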
streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala: 12 changes (6 additions, 6 deletions)

@@ -33,13 +33,13 @@ private[ui] class StreamingPage(parent: StreamingTab)

   private val listener = parent.listener
   private val startTime = Calendar.getInstance().getTime()
-  private val emptyCellTest = "-"
+  private val emptyCell = "-"

   /** Render the page */
   override def render(request: HttpServletRequest): Seq[Node] = {
     val content =
       generateBasicStats() ++
-      <br></br><h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
+      <br></br><h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
       generateNetworkStatsTable() ++
       generateBatchStatsTable()
     UIUtils.headerSparkPage(
@@ -89,12 +89,12 @@ private[ui] class StreamingPage(parent: StreamingTab)
     val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
       val receiverInfo = listener.receiverInfo(receiverId)
       val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
-      val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCellTest)
+      val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
       val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
       val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
         d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
       }.getOrElse {
-        Seq(emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest)
+        Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
       }
       Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
     }
@@ -112,7 +112,7 @@ private[ui] class StreamingPage(parent: StreamingTab)

   /** Generate stats of batch jobs of the streaming program */
   private def generateBatchStatsTable(): Seq[Node] = {
-    val numBatches = listener.completedBatches.size
+    val numBatches = listener.retainedCompletedBatches.size
     val lastCompletedBatch = listener.lastCompletedBatch
     val table = if (numBatches > 0) {
       val processingDelayQuantilesRow = {
@@ -161,7 +161,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
    * Returns a human-readable string representing a duration such as "5 second 35 ms"
    */
   private def formatDurationOption(msOption: Option[Long]): String = {
-    msOption.map(formatDurationVerbose).getOrElse(emptyCellTest)
+    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
   }

   /** Get quantiles for any time distribution */