Skip to content

Commit

Permalink
Added replication unit tests to BlockManagerSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Sep 11, 2014
1 parent 558962a commit af0c1da
Showing 1 changed file with 115 additions and 0 deletions.
115 changes: 115 additions & 0 deletions core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1228,4 +1228,119 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
}

test("block replication - 2x") {
testReplication(2)
}

test("block replication - 3x") {
testReplication(3)
}

test("block replication - 4x") {
testReplication(4)
}

/**
* Test replication of blocks with different storage levels (various combinations of
* memory, disk & serialization). For each storage level, this function tests every store
* whether the block is present and also tests the master whether its knowledge of blocks
* is correct. Then it also drops the block from memory of each store (using LRU) and
* again checks whether the master's knowledge gets updated.
*/
def testReplication(replicationFactor: Int) {
import org.apache.spark.storage.StorageLevel._

assert(replicationFactor > 1,
s"ReplicationTester cannot test replication factor $replicationFactor")

// storage levels to test with the given replication factor
val storageLevels = {
Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
level => StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, replicationFactor)
}
}

val storeSize = 10000
val blockSize = 1000

// As many stores as the replication factor
val stores = (1 to replicationFactor).map {
i => makeBlockManager(storeSize, s"store$i")
}

try {
storageLevels.foreach {storageLevel =>

// Put the block into one of the stores
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)

// Assert that master know two locations for the block
assert(master.getLocations(blockId).size === replicationFactor,
s"master did not have $replicationFactor locations for $blockId")

// Test state of the store for the block
stores.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")

val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)

// Assert that block status in the master for this store has expected storage level
assert(
blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
blockStatus.storageLevel.deserialized === storageLevel.deserialized,
s"master does not know correct storage level for ${blockId.name} in $testStoreName")

// Assert that the block status in the master for this store has correct memory usage info
assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
s"master does not know size of ${blockId.name} stored in memory of $testStoreName")


// If the block is supposed to be in memory, then drop the copy of the block in
// this store test whether master is updated with zero memory usage this store
if (storageLevel.useMemory) {
// Force the block to be dropped by adding a number of dummy blocks
(1 to 10).foreach { i =>
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
}
(1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") }

val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)

// Assert that the block status in the master either does not exist (block removed
// from every store) or has zero memory usage for this store
assert(
newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
s"after dropping, master does not know size of ${blockId.name} " +
s"stored in memory of $testStoreName"
)
}

// If the block is supposed to be in disk (after dropping or otherwise, then
// test whether master has correct disk usage for this store
if (storageLevel.useDisk) {
assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
s"after dropping, master does not know size of ${blockId.name} " +
s"stored in disk of $testStoreName"
)
}
}
master.removeBlock(blockId)
}

} finally {
stores.foreach { _.stop() }
master.stop() // to make sure that this master cannot used further in the unit tests
master = null
}
}
}

0 comments on commit af0c1da

Please sign in to comment.