Iterate through a filtered set of blocks when updating RDDInfo
This particular commit is the whole point of this PR. In the existing
code we unconditionally iterate through all blocks in all block managers
whenever we want to update an RDDInfo. Now we select just the blocks
of interest in advance, so we no longer build a huge map of every
block and run a groupBy over it.
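For illustration, a condensed sketch of the two strategies, distilled from
the diff below (setup and imports elided; storageStatuses and rddInfos are
the same names the real code uses):

  // Before: build one map over every block in every block manager,
  // then group the whole thing by RDD ID -- O(all blocks) per update.
  val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
  val rddBlockMap = blockMap
    .groupBy { case (k, _) => k.rddId }
    .mapValues(_.values.toArray)

  // After: for each RDDInfo, touch only the storage statuses that hold
  // that RDD's blocks, and only those blocks.
  rddInfos.foreach { rddInfo =>
    val blocks = storageStatuses
      .filter(_.rddIds.contains(rddInfo.id))
      .flatMap(_.rddBlocks(rddInfo.id))
    // ... aggregate sizes over `blocks` only
  }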
andrewor14 committed Jul 31, 2014
1 parent 7b2c4aa commit 8e91921
Showing 4 changed files with 78 additions and 75 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -841,7 +841,7 @@ class SparkContext(config: SparkConf) extends Logging {
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
     val rddInfos = StorageUtils.makeRddInfo(this)
-    StorageUtils.updateRddInfo(getExecutorStorageStatus, rddInfos)
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
     rddInfos.toArray
   }

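The public API is untouched by this change; only the helper's parameter
order differs. A caller still does something like this (illustrative only):

  val infos: Array[RDDInfo] = sc.getRDDStorageInfo
  infos.foreach { info =>
    println(s"${info.name}: ${info.memSize} bytes in memory, ${info.diskSize} on disk")
  }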
57 changes: 30 additions & 27 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -121,36 +121,39 @@ private[spark] object StorageUtils {
     }
   }
 
-  /** Update the given list of RDDInfo with the given list of storage statuses. */
-  def updateRddInfo(storageStatuses: Seq[StorageStatus], rddInfos: Seq[RDDInfo]): Unit = {
-    // Mapping from a block ID -> its status
-    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
-
-    // Mapping from RDD ID -> an array of associated BlockStatuses
-    val rddBlockMap = blockMap
-      .groupBy { case (k, _) => k.rddId }
-      .mapValues(_.values.toArray)
-
-    // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
-    val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
-
-    rddBlockMap.foreach { case (rddId, blocks) =>
-      // Add up memory, disk and Tachyon sizes
-      val persistedBlocks =
-        blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
-      val _storageLevel =
-        if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
+  /**
+   * Update the given list of RDDInfo with the given list of storage statuses.
+   * This method overwrites the old values stored in the RDDInfo's.
+   */
+  def updateRddInfo(
+      rddInfos: Seq[RDDInfo],
+      storageStatuses: Seq[StorageStatus],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Unit = {
+    rddInfos.foreach { rddInfo =>
+      val rddId = rddInfo.id
+
+      // Collect all block statuses that belong to the given RDD
+      val newBlocks = updatedBlocks
+        .collect { case (bid: RDDBlockId, bstatus) => (bid, bstatus) }
+        .filter { case (bid, _) => bid.rddId == rddId }
+      val newBlockIds = newBlocks.map { case (bid, _) => bid }.toSet
+      val oldBlocks = storageStatuses
+        .filter(_.rddIds.contains(rddId))
+        .flatMap(_.rddBlocks(rddId))
+        .filter { case (bid, _) => !newBlockIds.contains(bid) } // avoid duplicates
+      val blocks = (oldBlocks ++ newBlocks).map { case (_, bstatus) => bstatus }
+      val persistedBlocks = blocks.filter { s => s.memSize + s.diskSize + s.tachyonSize > 0 }
+
+      // Assume all blocks belonging to the same RDD have the same storage level
+      val storageLevel = blocks.headOption.map(_.storageLevel).getOrElse(StorageLevel.NONE)
       val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
       val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
       val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
-      rddInfoMap.get(rddId).map { rddInfo =>
-        rddInfo.storageLevel = _storageLevel
-        rddInfo.numCachedPartitions = persistedBlocks.length
-        rddInfo.memSize = memSize
-        rddInfo.diskSize = diskSize
-        rddInfo.tachyonSize = tachyonSize
-        rddInfo
-      }
+      rddInfo.storageLevel = storageLevel
+      rddInfo.numCachedPartitions = persistedBlocks.length
+      rddInfo.memSize = memSize
+      rddInfo.diskSize = diskSize
+      rddInfo.tachyonSize = tachyonSize
     }
   }

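For reference, a minimal usage sketch of the new signature (values are
illustrative; rddInfos and storageStatuses are assumed to be populated,
e.g. like the stock* fixtures in the test suite below):

  // Full refresh: recompute every RDDInfo from the current storage statuses.
  StorageUtils.updateRddInfo(rddInfos, storageStatuses)

  // Incremental refresh: statuses in updatedBlocks take precedence over
  // any stale entries for the same block IDs in storageStatuses.
  val updatedBlocks = Seq(
    (RDDBlockId(0, 0), BlockStatus(StorageLevel.MEMORY_AND_DISK, 100L, 0L, 0L)))
  StorageUtils.updateRddInfo(rddInfos, storageStatuses, updatedBlocks)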
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -49,8 +49,8 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
 
   /** Update each RDD's info to reflect any updates in the RDD's storage status */
-  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Unit = {
-    StorageUtils.updateRddInfo(storageStatusList, _rddInfoMap.values.toSeq)
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
+    StorageUtils.updateRddInfo(_rddInfoMap.values.toSeq, storageStatusList, updatedBlocks)
   }
 
   /**
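For context, the updatedBlocks that the listener now has to pass in come
from task-end events; the wiring looks roughly like this (simplified
sketch of the surrounding listener code, which is not part of this diff):

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val metrics = taskEnd.taskMetrics
    if (metrics != null && metrics.updatedBlocks.isDefined) {
      updateRDDInfo(metrics.updatedBlocks.get)
    }
  }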
90 changes: 45 additions & 45 deletions core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -182,7 +182,7 @@ class StorageSuite extends FunSuite {
test("StorageUtils.updateRddInfo") {
val storageStatuses = stockStorageStatuses
val rddInfos = stockRDDInfos
StorageUtils.updateRddInfo(storageStatuses, rddInfos)
StorageUtils.updateRddInfo(rddInfos, storageStatuses)
assert(rddInfos(0).numCachedPartitions === 5)
assert(rddInfos(0).memSize === 5L)
assert(rddInfos(0).diskSize === 10L)
@@ -191,50 +191,50 @@
     assert(rddInfos(1).diskSize === 6L)
   }
 
-  // test("StorageUtils.rddInfoFromStorageStatus with updated blocks") {
-  //   val storageStatuses = stockStorageStatuses
-  //   val rddInfos = stockRDDInfos
-  //
-  //   // Drop 3 blocks from RDD 0, and cache more of RDD 1
-  //   val updatedBlocks1 = Seq(
-  //     (RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(0, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(0, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 0), BlockStatus(memAndDisk, 100L, 100L, 0L)),
-  //     (RDDBlockId(1, 100), BlockStatus(memAndDisk, 100L, 100L, 0L))
-  //   )
-  //   StorageUtils.rddInfoFromStorageStatus(storageStatuses, rddInfos, updatedBlocks1)
-  //   assert(rddInfos(0).numCachedPartitions === 2)
-  //   assert(rddInfos(0).memSize === 2L)
-  //   assert(rddInfos(0).diskSize === 4L)
-  //   assert(rddInfos(1).numCachedPartitions === 4)
-  //   assert(rddInfos(1).memSize === 202L)
-  //   assert(rddInfos(1).diskSize === 204L)
-  //
-  //   // Actually update storage statuses so we can chain the calls to rddInfoFromStorageStatus
-  //   updatedBlocks1.foreach { case (bid, bstatus) =>
-  //     val statusWithBlock = storageStatuses.find(_.blocks.contains(bid))
-  //     statusWithBlock match {
-  //       case Some(s) => s.updateBlock(bid, bstatus)
-  //       case None => storageStatuses(0).addBlock(bid, bstatus) // arbitrarily pick the first
-  //     }
-  //   }
-  //
-  //   // Drop all of RDD 1
-  //   val updatedBlocks2 = Seq(
-  //     (RDDBlockId(1, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 100), BlockStatus(memAndDisk, 0L, 0L, 0L))
-  //   )
-  //   StorageUtils.rddInfoFromStorageStatus(storageStatuses, rddInfos, updatedBlocks2)
-  //   assert(rddInfos(0).numCachedPartitions === 2)
-  //   assert(rddInfos(0).memSize === 2L)
-  //   assert(rddInfos(0).diskSize === 4L)
-  //   assert(rddInfos(1).numCachedPartitions === 0)
-  //   assert(rddInfos(1).memSize === 0L)
-  //   assert(rddInfos(1).diskSize === 0L)
-  // }
test("StorageUtils.updateRddInfo with updated blocks") {
val storageStatuses = stockStorageStatuses
val rddInfos = stockRDDInfos

// Drop 3 blocks from RDD 0, and cache more of RDD 1
val updatedBlocks1 = Seq(
(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(0, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(0, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 0), BlockStatus(memAndDisk, 100L, 100L, 0L)),
(RDDBlockId(1, 100), BlockStatus(memAndDisk, 100L, 100L, 0L))
)
StorageUtils.updateRddInfo(rddInfos, storageStatuses, updatedBlocks1)
assert(rddInfos(0).numCachedPartitions === 2)
assert(rddInfos(0).memSize === 2L)
assert(rddInfos(0).diskSize === 4L)
assert(rddInfos(1).numCachedPartitions === 4)
assert(rddInfos(1).memSize === 202L)
assert(rddInfos(1).diskSize === 204L)

// Actually update storage statuses so we can chain the calls to rddInfoFromStorageStatus
updatedBlocks1.foreach { case (bid, bstatus) =>
val statusWithBlock = storageStatuses.find(_.blocks.contains(bid))
statusWithBlock match {
case Some(s) => s.updateBlock(bid, bstatus)
case None => storageStatuses(0).addBlock(bid, bstatus) // arbitrarily pick the first
}
}

// Drop all of RDD 1
val updatedBlocks2 = Seq(
(RDDBlockId(1, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 100), BlockStatus(memAndDisk, 0L, 0L, 0L))
)
StorageUtils.updateRddInfo(rddInfos, storageStatuses, updatedBlocks2)
assert(rddInfos(0).numCachedPartitions === 2)
assert(rddInfos(0).memSize === 2L)
assert(rddInfos(0).diskSize === 4L)
assert(rddInfos(1).numCachedPartitions === 0)
assert(rddInfos(1).memSize === 0L)
assert(rddInfos(1).diskSize === 0L)
}

test("StorageUtils.getBlockLocations") {
val storageStatuses = stockStorageStatuses
