Skip to content

Commit

Permalink
Add extensive tests for StorageListener and the new code in StorageUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Aug 1, 2014
1 parent e080b9e commit 6970bc8
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ class StorageStatusListener extends SparkListener {

/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
val filteredStatus = executorIdToStorageStatus.get(execId)
filteredStatus.foreach { storageStatus =>
executorIdToStorageStatus.get(execId).foreach { storageStatus =>
updatedBlocks.foreach { case (blockId, updatedStatus) =>
if (updatedStatus.storageLevel == StorageLevel.NONE) {
storageStatus.removeBlock(blockId)
Expand Down
36 changes: 20 additions & 16 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,21 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
*/
def blocks: Map[BlockId, BlockStatus] = _nonRddBlocks ++ rddBlocks

/** Return the RDD blocks stored in this block manager. */
/**
* Return the RDD blocks stored in this block manager.
*
* Note that this is somewhat expensive, as it involves cloning the underlying maps and then
* concatenating them together. Much faster alternatives exist for common operations such as
* getting the memory, disk, and off-heap memory sizes occupied by this RDD.
*/
def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }

/** Return the blocks that belong to the given RDD stored in this block manager. */
def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
_rddBlocks.get(rddId).getOrElse(Map.empty)
}

/** Add the given block to this storage status. */
/** Add the given block to this storage status. If it already exists, overwrite it. */
def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
blockId match {
case RDDBlockId(rddId, _) =>
Expand Down Expand Up @@ -162,15 +168,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
* Return the number of blocks stored in this block manager in O(RDDs) time.
* Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
*/
def numBlocks: Int = {
_nonRddBlocks.size + _rddBlocks.values.map(_.size).reduceOption(_ + _).getOrElse(0)
}
def numBlocks: Int = _nonRddBlocks.size + numRddBlocks

/**
* Return the number of RDD blocks stored in this block manager in O(RDDs) time.
* Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
*/
def numRddBlocks: Int = _rddBlocks.keys.map(numRddBlocksById).reduceOption(_ + _).getOrElse(0)
def numRddBlocks: Int = _rddBlocks.values.map(_.size).reduceOption(_ + _).getOrElse(0)

/**
* Return the number of blocks that belong to the given RDD in O(1) time.
Expand All @@ -182,32 +186,32 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
/** Return the memory used by this block manager. */
def memUsed: Long = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)

/** Return the memory used by the given RDD in this block manager. */
def memUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)

/** Return the memory remaining in this block manager. */
def memRemaining: Long = maxMem - memUsed

/** Return the disk space used by this block manager. */
def diskUsed: Long = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)

/** Return the disk space used by the given RDD in this block manager. */
def diskUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)

/** Return the off-heap space used by this block manager. */
def offHeapUsed: Long = blocks.values.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)

/** Return the off-heap space used by the given RDD in this block manager. */
/** Return the memory used by the given RDD in this block manager in O(1) time. */
def memUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)

/** Return the disk space used by the given RDD in this block manager in O(1) time. */
def diskUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)

/** Return the off-heap space used by the given RDD in this block manager in O(1) time. */
def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)

/** Return the storage level, if any, used by the given RDD in this block manager. */
def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)

/**
* Helper function to update the given RDD's storage information based on the
* (possibly negative) changes in memory, disk, and off-heap memory usages.
* Helper function to update the given RDD's storage information based on the (possibly
* negative) changes in memory, disk, and off-heap memory usages. This is exposed for testing.
*/
private def updateRddStorageInfo(
private[spark] def updateRddStorageInfo(
rddId: Int,
changeInMem: Long,
changeInDisk: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
*/
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing

def storageStatusList = storageStatusListener.storageStatusList

Expand Down
125 changes: 107 additions & 18 deletions core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@ import org.scalatest.FunSuite
*/
class StorageSuite extends FunSuite {
private val memAndDisk = StorageLevel.MEMORY_AND_DISK
private val memOnly = StorageLevel.MEMORY_ONLY
private val diskOnly = StorageLevel.DISK_ONLY

// For testing add/update/removeBlock (for non-RDD blocks)
// For testing add, update, and remove (for non-RDD blocks)
private def storageStatus1: StorageStatus = {
val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
assert(status.blocks.isEmpty)
assert(status.rddBlocks.isEmpty)
assert(status.memUsed === 0)
assert(status.memUsed === 0L)
assert(status.memRemaining === 1000L)
assert(status.diskUsed === 0)
status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 0L))
assert(status.diskUsed === 0L)
assert(status.offHeapUsed === 0L)
status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L, 1L))
status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L, 1L))
status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 1L))
status
}

Expand All @@ -49,16 +52,18 @@ class StorageSuite extends FunSuite {
assert(status.memUsed === 30L)
assert(status.memRemaining === 970L)
assert(status.diskUsed === 60L)
assert(status.offHeapUsed === 3L)
}

test("storage status update non-RDD blocks") {
val status = storageStatus1
status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L, 0L))
status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L, 1L))
status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L, 0L))
assert(status.blocks.size === 3)
assert(status.memUsed === 160L)
assert(status.memRemaining === 840L)
assert(status.diskUsed === 140L)
assert(status.offHeapUsed === 2L)
}

test("storage status remove non-RDD blocks") {
Expand All @@ -70,17 +75,18 @@ class StorageSuite extends FunSuite {
assert(status.memUsed === 10L)
assert(status.memRemaining === 990L)
assert(status.diskUsed === 20L)
assert(status.offHeapUsed === 1L)
}

// For testing add/update/remove/contains/getBlock and numBlocks
// For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
private def storageStatus2: StorageStatus = {
val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
assert(status.rddBlocks.isEmpty)
status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L, 0L))
status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 1L))
status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L, 1L))
status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L, 1L))
status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L, 0L))
status
Expand Down Expand Up @@ -109,6 +115,19 @@ class StorageSuite extends FunSuite {
assert(status.diskUsedByRDD(0) === 20L)
assert(status.diskUsedByRDD(1) === 200L)
assert(status.diskUsedByRDD(2) === 80L)
assert(status.offHeapUsedByRdd(0) === 1L)
assert(status.offHeapUsedByRdd(1) === 1L)
assert(status.offHeapUsedByRdd(2) === 1L)
assert(status.rddStorageLevel(0) === Some(memAndDisk))
assert(status.rddStorageLevel(1) === Some(memAndDisk))
assert(status.rddStorageLevel(2) === Some(memAndDisk))

// Verify default values for RDDs that don't exist
assert(status.rddBlocksById(10).isEmpty)
assert(status.memUsedByRDD(10) === 0L)
assert(status.diskUsedByRDD(10) === 0L)
assert(status.offHeapUsedByRdd(10) === 0L)
assert(status.rddStorageLevel(10) === None)
}

test("storage status update RDD blocks") {
Expand All @@ -127,6 +146,9 @@ class StorageSuite extends FunSuite {
assert(status.diskUsedByRDD(0) === 0L)
assert(status.diskUsedByRDD(1) === 200L)
assert(status.diskUsedByRDD(2) === 1060L)
assert(status.offHeapUsedByRdd(0) === 0L)
assert(status.offHeapUsedByRdd(1) === 1L)
assert(status.offHeapUsedByRdd(2) === 0L)
}

test("storage status remove RDD blocks") {
Expand All @@ -150,6 +172,9 @@ class StorageSuite extends FunSuite {
assert(status.diskUsedByRDD(0) === 20L)
assert(status.diskUsedByRDD(1) === 0L)
assert(status.diskUsedByRDD(2) === 20L)
assert(status.offHeapUsedByRdd(0) === 1L)
assert(status.offHeapUsedByRdd(1) === 0L)
assert(status.offHeapUsedByRdd(2) === 0L)
}

test("storage status containsBlock") {
Expand Down Expand Up @@ -182,23 +207,87 @@ class StorageSuite extends FunSuite {
assert(status.blocks.get(RDDBlockId(100, 0)) === status.getBlock(RDDBlockId(100, 0)))
}

test("storage status numBlocks") {
test("storage status num[Rdd]Blocks") {
val status = storageStatus2
assert(status.blocks.size === status.numBlocks)
assert(status.rddBlocks.size === status.numRddBlocks)
status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L, 100L))
status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L, 100L))
assert(status.blocks.size === status.numBlocks)
status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
assert(status.blocks.size === status.numBlocks)
status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 100L))
assert(status.blocks.size === status.numBlocks)
// update a block that doesn't exist
status.updateBlock(RDDBlockId(100, 99), BlockStatus(memAndDisk, 0L, 0L, 100L))
assert(status.rddBlocks.size === status.numRddBlocks)
assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L, 400L))
status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L, 100L))
status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L, 100L))
assert(status.blocks.size === status.numBlocks)
status.removeBlock(RDDBlockId(0, 0))
assert(status.rddBlocks.size === status.numRddBlocks)
assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
assert(status.rddBlocksById(100).size === status.numRddBlocksById(100))
status.removeBlock(RDDBlockId(4, 0))
status.removeBlock(RDDBlockId(10, 10))
assert(status.blocks.size === status.numBlocks)
assert(status.rddBlocks.size === status.numRddBlocks)
assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
// remove a block that doesn't exist
status.removeBlock(RDDBlockId(1000, 999))
assert(status.blocks.size === status.numBlocks)
assert(status.rddBlocks.size === status.numRddBlocks)
assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
}

test("storage status updateRddStorageInfo") {
val status = storageStatus2
// Positive delta
status.updateRddStorageInfo(0, 1000L, 1000L, 1000L, memOnly)
status.updateRddStorageInfo(1, 2000L, 2000L, 2000L, diskOnly)
status.updateRddStorageInfo(2, 3000L, 3000L, 3000L, memAndDisk)
assert(status.memUsedByRDD(0) === 1010L)
assert(status.memUsedByRDD(1) === 2100L)
assert(status.memUsedByRDD(2) === 3030L)
assert(status.diskUsedByRDD(0) === 1020L)
assert(status.diskUsedByRDD(1) === 2200L)
assert(status.diskUsedByRDD(2) === 3080L)
assert(status.offHeapUsedByRdd(0) === 1001L)
assert(status.offHeapUsedByRdd(1) === 2001L)
assert(status.offHeapUsedByRdd(2) === 3001L)
assert(status.rddStorageLevel(0) === Some(memOnly))
assert(status.rddStorageLevel(1) === Some(diskOnly))
assert(status.rddStorageLevel(2) === Some(memAndDisk))

// Negative delta
status.updateRddStorageInfo(0, -100L, -100L, -100L, memOnly)
status.updateRddStorageInfo(1, -200L, -200L, -200L, diskOnly)
status.updateRddStorageInfo(2, -300L, -300L, -300L, memAndDisk)
assert(status.memUsedByRDD(0) === 910L)
assert(status.memUsedByRDD(1) === 1900L)
assert(status.memUsedByRDD(2) === 2730L)
assert(status.diskUsedByRDD(0) === 920L)
assert(status.diskUsedByRDD(1) === 2000L)
assert(status.diskUsedByRDD(2) === 2780L)
assert(status.offHeapUsedByRdd(0) === 901L)
assert(status.offHeapUsedByRdd(1) === 1801L)
assert(status.offHeapUsedByRdd(2) === 2701L)

// Negative delta so large that the RDDs are no longer persisted
status.updateRddStorageInfo(0, -10000L, -10000L, -10000L, memOnly)
status.updateRddStorageInfo(1, -20000L, -20000L, -20000L, diskOnly)
status.updateRddStorageInfo(2, -30000L, -30000L, -30000L, memAndDisk)
assert(status.memUsedByRDD(0) === 0L)
assert(status.memUsedByRDD(1) === 0L)
assert(status.memUsedByRDD(2) === 0L)
assert(status.diskUsedByRDD(0) === 0L)
assert(status.diskUsedByRDD(1) === 0L)
assert(status.diskUsedByRDD(2) === 0L)
assert(status.offHeapUsedByRdd(0) === 0L)
assert(status.offHeapUsedByRdd(1) === 0L)
assert(status.offHeapUsedByRdd(2) === 0L)
}

// For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
Expand Down
Loading

0 comments on commit 6970bc8

Please sign in to comment.