Skip to content

Commit

Permalink
Filter out zero blocks, rename variables
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaram committed Jun 5, 2015
1 parent 9d5831a commit 6cfae98
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
@@ -309,20 +309,23 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     if (statuses.nonEmpty) {
       val ordering = Ordering.by[(BlockManagerId, Long), Long](_._2).reverse
       shuffleIdToReduceLocations(shuffleId) = new HashMap[Int, Array[BlockManagerId]]
-      var r = 0
+      var reduceIdx = 0
       // HashMap to add up sizes of all blocks at the same location
       val locs = new HashMap[BlockManagerId, Long]
-      while (r < numReducers) {
-        var i = 0
+      while (reduceIdx < numReducers) {
+        var mapIdx = 0
         locs.clear()
-        while (i < statuses.length) {
-          locs(statuses(i).location) = locs.getOrElse(statuses(i).location, 0L) +
-            statuses(i).getSizeForBlock(r)
-          i = i + 1
+        while (mapIdx < statuses.length) {
+          val blockSize = statuses(mapIdx).getSizeForBlock(reduceIdx)
+          if (blockSize > 0) {
+            locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
+              blockSize
+          }
+          mapIdx = mapIdx + 1
         }
         val topLocs = CollectionUtils.takeOrdered(locs.toIterator, numTopLocs)(ordering)
-        shuffleIdToReduceLocations(shuffleId) += (r -> topLocs.map(_._1).toArray)
-        r = r + 1
+        shuffleIdToReduceLocations(shuffleId) += (reduceIdx -> topLocs.map(_._1).toArray)
+        reduceIdx = reduceIdx + 1
       }
     }
   }
Expand Down

0 comments on commit 6cfae98

Please sign in to comment.