From 3821ab971bcc85b182288f9039bf38da0acedece Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 22 Sep 2014 01:42:21 +0530 Subject: [PATCH] Fixes based on PR comments. --- .../apache/spark/storage/BlockManager.scala | 2 +- .../storage/BlockManagerMasterActor.scala | 14 +-- .../spark/storage/BlockManagerSuite.scala | 105 ++++++++++-------- 3 files changed, 62 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 465c72ef685c4..dec8d93b98226 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -792,7 +792,7 @@ private[spark] class BlockManager( * Get peer block managers in the system. */ private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = { - val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds + val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl cachedPeers.synchronized { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 6ab75b4d1e609..6a06257ed0c08 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -404,17 +404,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus /** Get the list of the peers of the given block manager */ private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = { - val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray - val selfIndex = blockManagerIds.indexOf(blockManagerId) - if (selfIndex == -1) { - logError("Self index for " + blockManagerId 
+ " not found") - Seq.empty + val blockManagerIds = blockManagerInfo.keySet + if (blockManagerIds.contains(blockManagerId)) { + blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq } else { - // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2 - // Then this code will return the list [ id3 id4 id5 id1 ] - Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i => - blockManagerIds((selfIndex + i + 1) % blockManagerIds.size) - } + Seq.empty } } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6a739cea43623..6313be27b2568 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -63,6 +63,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter val securityMgr = new SecurityManager(conf) val mapOutputTracker = new MapOutputTrackerMaster(conf) val shuffleManager = new HashShuffleManager(conf) + + // List of block managers created during a unit test, so that all of them can be stopped + // after the unit test.
val allStores = new ArrayBuffer[BlockManager] // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test @@ -1241,7 +1244,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) } - test("get peers with store addition and removal") { + test("get peers with addition and removal of block managers") { val numStores = 4 val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") } val storeIds = stores.map { _.blockManagerId }.toSet @@ -1313,11 +1316,17 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter testReplication(5, storageLevels) } - test("block replication with addition and deletion of executors") { + test("block replication with addition and deletion of block managers") { val blockSize = 1000 val storeSize = 10000 val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") } + /** + * Test whether inserting a block with replication achieves the expected replication. + * Since this function can be called on the same block id repeatedly through an `eventually`, + * it must leave the block manager and master in the same state as + * it was before attempting to insert the block.
+ */ def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) { try { initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel) @@ -1345,7 +1354,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter } // Remove all but the 1st store, 2x replication should fail - (initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach { + (initialStores.tail ++ Seq(newStore1, newStore2)).foreach { store => master.removeExecutor(store.blockManagerId.executorId) store.stop() @@ -1394,57 +1403,57 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter s"master did not have ${storageLevel.replication} locations for $blockId") // Test state of the stores that contain the block - stores.filter(testStore => blockLocations.contains(testStore.blockManagerId.executorId)) + stores.filter { testStore => blockLocations.contains(testStore.blockManagerId.executorId) } .foreach { testStore => - val testStoreName = testStore.blockManagerId.executorId - assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") - assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), - s"master does not have status for ${blockId.name} in $testStoreName") - - val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) - - // Assert that block status in the master for this store has expected storage level - assert( - blockStatus.storageLevel.useDisk === storageLevel.useDisk && - blockStatus.storageLevel.useMemory === storageLevel.useMemory && - blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && - blockStatus.storageLevel.deserialized === storageLevel.deserialized, - s"master does not know correct storage level for ${blockId.name} in $testStoreName") - - // Assert that the block status in the master for this store has correct memory usage info - assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize 
>= blockSize, - s"master does not know size of ${blockId.name} stored in memory of $testStoreName") - - - // If the block is supposed to be in memory, then drop the copy of the block in - // this store test whether master is updated with zero memory usage this store - if (storageLevel.useMemory) { - // Force the block to be dropped by adding a number of dummy blocks - (1 to 10).foreach { i => - testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) - } - (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } + val testStoreName = testStore.blockManagerId.executorId + assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName") + assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName), + s"master does not have status for ${blockId.name} in $testStoreName") - val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) + val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId) - // Assert that the block status in the master either does not exist (block removed - // from every store) or has zero memory usage for this store + // Assert that block status in the master for this store has expected storage level assert( - newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in memory of $testStoreName" - ) - } + blockStatus.storageLevel.useDisk === storageLevel.useDisk && + blockStatus.storageLevel.useMemory === storageLevel.useMemory && + blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap && + blockStatus.storageLevel.deserialized === storageLevel.deserialized, + s"master does not know correct storage level for ${blockId.name} in $testStoreName") + + // Assert that the block status in the master for this store has correct memory usage info + assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize, + 
s"master does not know size of ${blockId.name} stored in memory of $testStoreName") + + + // If the block is supposed to be in memory, then drop the copy of the block in + // this store and test whether master is updated with zero memory usage for this store + if (storageLevel.useMemory) { + // Force the block to be dropped by adding a number of dummy blocks + (1 to 10).foreach { i => + testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER) + } + (1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") } + + val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId) + + // Assert that the block status in the master either does not exist (block removed + // from every store) or has zero memory usage for this store + assert( + newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in memory of $testStoreName" + ) + } - // If the block is supposed to be in disk (after dropping or otherwise, then - // test whether master has correct disk usage for this store - if (storageLevel.useDisk) { - assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, - s"after dropping, master does not know size of ${blockId.name} " + - s"stored in disk of $testStoreName" - ) + // If the block is supposed to be in disk (after dropping or otherwise), then + // test whether master has correct disk usage for this store + if (storageLevel.useDisk) { + assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize, + s"after dropping, master does not know size of ${blockId.name} " + + s"stored in disk of $testStoreName" + ) + } } - } master.removeBlock(blockId) } }