From af0c1daea8a22bca3b7826322205c887370ce247 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 10 Sep 2014 19:54:31 -0700 Subject: [PATCH] Added replication unit tests to BlockManagerSuite --- .../spark/storage/BlockManagerSuite.scala | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index e251660dae5de..2444b79ca3734 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1228,4 +1228,119 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } + + test("block replication - 2x") { + testReplication(2) + } + + test("block replication - 3x") { + testReplication(3) + } + + test("block replication - 4x") { + testReplication(4) + } + + /** + * Test replication of blocks with different storage levels (various combinations of + * memory, disk & serialization). For each storage level, this function tests every store + * whether the block is present and also tests the master whether its knowledge of blocks + * is correct. Then it also drops the block from memory of each store (using LRU) and + * again checks whether the master's knowledge gets updated. + */ + def testReplication(replicationFactor: Int) { + import org.apache.spark.storage.StorageLevel._ + + assert(replicationFactor > 1, + s"ReplicationTester cannot test replication factor $replicationFactor") + + // storage levels to test with the given replication factor + val storageLevels = { + Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map { + level => StorageLevel( + level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, replicationFactor) + } + } + + val storeSize = 10000 + val blockSize = 1000 + + // As many stores as the replication factor + val stores = (1 to replicationFactor).map { + i => makeBlockManager(storeSize, s"store$i") + } + + try { + storageLevels.foreach {storageLevel => + + // Put the block into one of the stores + val blockId = new TestBlockId( + "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase) + stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) + + // Assert that master know two locations for the block + assert(master.getLocations(blockId).size === replicationFactor, + s"master did not have $replicationFactor locations for $blockId") + + // Test state of the store for the block + stores.foreach { testStore => + val testStoreName = testStore.blockManagerId.executorId + assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") + assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), + s"master does not have status for ${blockId.name} in $testStoreName") + + val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) + + // Assert that block status in the master for this store has expected storage level + assert( + blockStatus.storageLevel.useDisk === storageLevel.useDisk && + blockStatus.storageLevel.useMemory === storageLevel.useMemory && + blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && + blockStatus.storageLevel.deserialized === storageLevel.deserialized, + s"master does not know correct storage level for ${blockId.name} in $testStoreName") + + // Assert that the block status in the master for this store has correct memory usage info + assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, + s"master does not know size of ${blockId.name} stored in memory of $testStoreName") + + + // If the block is supposed to be in memory, then drop the copy of the block in + // this store test whether master is updated with zero memory usage this store + if (storageLevel.useMemory) { + // Force the block to be dropped by adding a number of dummy blocks + (1 to 10).foreach { i => + testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) + } + (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } + + val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) + + // Assert that the block status in the master either does not exist (block removed + // from every store) or has zero memory usage for this store + assert( + newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in memory of $testStoreName" + ) + } + + // If the block is supposed to be in disk (after dropping or otherwise, then + // test whether master has correct disk usage for this store + if (storageLevel.useDisk) { + assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in disk of $testStoreName" + ) + } + } + master.removeBlock(blockId) + } + + } finally { + stores.foreach { _.stop() } + master.stop() // to make sure that this master cannot used further in the unit tests + master = null + } + } } +