Skip to content

Commit

Permalink
Remove BatchInfo.numRecords and a few renames
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 30, 2015
1 parent 7bbdc0a commit 70d8533
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,4 @@ case class BatchInfo(
*/
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
.map(x => x._1 + x._2).headOption

/**
* The number of recorders received by the receivers in this batch.
*/
def numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
infos.map(_.numRecords).sum
}.sum
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
* Return all of the event rates for each receiver in each batch.
*/
def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
val latestBatches = retainedBatches.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.receiverNumRecords)
val latestBatches = retainedBatches.map { batchUIData =>
(batchUIData.batchTime.milliseconds, batchUIData.receiverNumRecords)
}
(0 until numReceivers).map { receiverId =>
val eventRates = latestBatches.map {
Expand Down

0 comments on commit 70d8533

Please sign in to comment.