Skip to content

Commit

Permalink
Fixes based on PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Sep 21, 2014
1 parent 08e5646 commit 3821ab9
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ private[spark] class BlockManager(
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl

cachedPeers.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,17 +404,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

/** Get the list of the peers of the given block manager */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
val selfIndex = blockManagerIds.indexOf(blockManagerId)
if (selfIndex == -1) {
logError("Self index for " + blockManagerId + " not found")
Seq.empty
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
} else {
// If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2
// Then this code will return the list [ id3 id4 id5 id1 ]
Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i =>
blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
}
Seq.empty
}
}
}
Expand Down
105 changes: 57 additions & 48 deletions core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
val shuffleManager = new HashShuffleManager(conf)

// List of block manager created during an unit test, so that all of the them can be stopped
// after the unit test.
val allStores = new ArrayBuffer[BlockManager]

// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
Expand Down Expand Up @@ -1241,7 +1244,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
}

test("get peers with store addition and removal") {
test("get peers with addition and removal of block managers") {
val numStores = 4
val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") }
val storeIds = stores.map { _.blockManagerId }.toSet
Expand Down Expand Up @@ -1313,11 +1316,17 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
testReplication(5, storageLevels)
}

test("block replication with addition and deletion of executors") {
test("block replication with addition and deletion of block managers") {
val blockSize = 1000
val storeSize = 10000
val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }

/**
* Function to test whether insert a block with replication achieves the expected replication.
* Since this function can be call on the same block id repeatedly through an `eventually`,
* it needs to be ensured that the method leaves block manager + master in the same state as
* it was before attempting to insert the block.
*/
def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) {
try {
initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
Expand Down Expand Up @@ -1345,7 +1354,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}

// Remove all but the 1st store, 2x replication should fail
(initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach {
(initialStores.tail ++ Seq(newStore1, newStore2)).foreach {
store =>
master.removeExecutor(store.blockManagerId.executorId)
store.stop()
Expand Down Expand Up @@ -1394,57 +1403,57 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
s"master did not have ${storageLevel.replication} locations for $blockId")

// Test state of the stores that contain the block
stores.filter(testStore => blockLocations.contains(testStore.blockManagerId.executorId))
stores.filter { testStore => blockLocations.contains(testStore.blockManagerId.executorId) }
.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")

val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)

// Assert that block status in the master for this store has expected storage level
assert(
blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
blockStatus.storageLevel.deserialized === storageLevel.deserialized,
s"master does not know correct storage level for ${blockId.name} in $testStoreName")

// Assert that the block status in the master for this store has correct memory usage info
assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
s"master does not know size of ${blockId.name} stored in memory of $testStoreName")


// If the block is supposed to be in memory, then drop the copy of the block in
// this store test whether master is updated with zero memory usage this store
if (storageLevel.useMemory) {
// Force the block to be dropped by adding a number of dummy blocks
(1 to 10).foreach { i =>
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
}
(1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") }
val testStoreName = testStore.blockManagerId.executorId
assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")

val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)
val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)

// Assert that the block status in the master either does not exist (block removed
// from every store) or has zero memory usage for this store
// Assert that block status in the master for this store has expected storage level
assert(
newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
s"after dropping, master does not know size of ${blockId.name} " +
s"stored in memory of $testStoreName"
)
}
blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
blockStatus.storageLevel.deserialized === storageLevel.deserialized,
s"master does not know correct storage level for ${blockId.name} in $testStoreName")

// Assert that the block status in the master for this store has correct memory usage info
assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
s"master does not know size of ${blockId.name} stored in memory of $testStoreName")


// If the block is supposed to be in memory, then drop the copy of the block in
// this store test whether master is updated with zero memory usage this store
if (storageLevel.useMemory) {
// Force the block to be dropped by adding a number of dummy blocks
(1 to 10).foreach { i =>
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
}
(1 to 10).foreach { i => testStore.removeBlock(s"dummy-block-$i") }

val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)

// Assert that the block status in the master either does not exist (block removed
// from every store) or has zero memory usage for this store
assert(
newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
s"after dropping, master does not know size of ${blockId.name} " +
s"stored in memory of $testStoreName"
)
}

// If the block is supposed to be in disk (after dropping or otherwise, then
// test whether master has correct disk usage for this store
if (storageLevel.useDisk) {
assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
s"after dropping, master does not know size of ${blockId.name} " +
s"stored in disk of $testStoreName"
)
// If the block is supposed to be in disk (after dropping or otherwise, then
// test whether master has correct disk usage for this store
if (storageLevel.useDisk) {
assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
s"after dropping, master does not know size of ${blockId.name} " +
s"stored in disk of $testStoreName"
)
}
}
}
master.removeBlock(blockId)
}
}
Expand Down

0 comments on commit 3821ab9

Please sign in to comment.