Skip to content

Commit

Permalink
SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed Jul 20, 2014
1 parent 98ab411 commit 43f79e6
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class ShuffleReadMetrics extends Serializable {
/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
var totalBlocksFetched: Int = _
def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched

/**
* Number of remote blocks fetched in this shuffle by this task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import org.apache.spark.util.Utils
private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
def totalBlocks: Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
def fetchWaitTime: Long
Expand Down Expand Up @@ -192,7 +191,7 @@ object BlockFetcherIterator {
}
}
logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
totalBlocks + " blocks")
(numLocal + numRemote) + " blocks")
remoteRequests
}

Expand Down Expand Up @@ -235,7 +234,6 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}

override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def fetchWaitTime: Long = _fetchWaitTime
Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ private[spark] object JsonProtocol {

def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
("Shuffle Finish Time" -> shuffleReadMetrics.shuffleFinishTime) ~
("Total Blocks Fetched" -> shuffleReadMetrics.totalBlocksFetched) ~
("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~
("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
Expand Down Expand Up @@ -548,7 +547,6 @@ private[spark] object JsonProtocol {
def shuffleReadMetricsFromJson(json: JValue): ShuffleReadMetrics = {
val metrics = new ShuffleReadMetrics
metrics.shuffleFinishTime = (json \ "Shuffle Finish Time").extract[Long]
metrics.totalBlocksFetched = (json \ "Total Blocks Fetched").extract[Int]
metrics.remoteBlocksFetched = (json \ "Remote Blocks Fetched").extract[Int]
metrics.localBlocksFetched = (json \ "Local Blocks Fetched").extract[Int]
metrics.fetchWaitTime = (json \ "Fetch Wait Time").extract[Long]
Expand Down

0 comments on commit 43f79e6

Please sign in to comment.