From 70d8533b3bcc63c472d6405c6dc440203c230338 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 30 Apr 2015 15:56:52 -0700 Subject: [PATCH] Remove BatchInfo.numRecords and a few renames --- .../org/apache/spark/streaming/scheduler/BatchInfo.scala | 7 ------- .../spark/streaming/ui/StreamingJobProgressListener.scala | 4 ++-- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 9d21be50a566f..92dc113f397ca 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -58,11 +58,4 @@ case class BatchInfo( */ def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay) .map(x => x._1 + x._2).headOption - - /** - * The number of recorders received by the receivers in this batch. - */ - def numRecords: Long = receivedBlockInfo.map { case (_, infos) => - infos.map(_.numRecords).sum - }.sum } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 7ccc0dfd5cbe6..863fff415e386 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -182,8 +182,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) * Return all of the event rates for each receiver in each batch. */ def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized { - val latestBatches = retainedBatches.map { batchInfo => - (batchInfo.batchTime.milliseconds, batchInfo.receiverNumRecords) + val latestBatches = retainedBatches.map { batchUIData => + (batchUIData.batchTime.milliseconds, batchUIData.receiverNumRecords) } (0 until numReceivers).map { receiverId => val eventRates = latestBatches.map {