Skip to content

Commit

Permalink
Change replication logic to correctly refetch peers from master on fa…
Browse files Browse the repository at this point in the history
…ilure and on new worker addition.
  • Loading branch information
tdas committed Sep 12, 2014
1 parent d081bf6 commit 03de02d
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 70 deletions.
104 changes: 69 additions & 35 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ private[spark] class BlockManager(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
private val cachedPeers = new ArrayBuffer[BlockManagerId]
private val cachedPeers = new mutable.HashSet[BlockManagerId]
private var lastPeerFetchTime = 0L

initialize()

/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
Expand Down Expand Up @@ -791,52 +792,85 @@ private[spark] class BlockManager(
/**
* Get peer block managers in the system.
*/
private def getPeers(numPeers: Int): Seq[BlockManagerId] = cachedPeers.synchronized {
val currentTime = System.currentTimeMillis
// If cache is empty or has insufficient number of peers, fetch from master
if (cachedPeers.isEmpty || numPeers > cachedPeers.size) {
cachedPeers.clear()
cachedPeers ++= master.getPeers(blockManagerId)
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
lastPeerFetchTime = System.currentTimeMillis
}
if (numPeers > cachedPeers.size) {
// if enough peers cannot be provided, return all of them
logDebug(s"Not enough peers - cached peers = ${cachedPeers.size}, required peers = $numPeers")
cachedPeers
} else {
cachedPeers.take(numPeers)
/**
 * Get the peer block managers of this block manager, using a locally cached list that is
 * refreshed from the master when needed.
 *
 * The cache is re-fetched via `master.getPeers` when it is empty, when `forceFetch` is true,
 * or when more than `spark.storage.cachedPeersTtl` milliseconds (default 1000) have elapsed
 * since the last fetch.
 *
 * @param forceFetch if true, re-fetch the peers from the master regardless of cache state
 * @return a snapshot copy of the cached peers; mutating it does not affect the cache
 */
private def getPeers(forceFetch: Boolean): mutable.Set[BlockManagerId] = {
  val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
  // Declared as a def so that lastPeerFetchTime is only read where it is evaluated,
  // i.e. inside the synchronized block below.
  def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl

  cachedPeers.synchronized {
    if (cachedPeers.isEmpty || forceFetch || timeout) {
      cachedPeers.clear()
      cachedPeers ++= master.getPeers(blockManagerId)
      lastPeerFetchTime = System.currentTimeMillis
      logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
    }
    // Copy while still holding the lock: handing out the shared mutable set would let
    // callers iterate it while another thread clears and refills it.
    cachedPeers.clone()
  }
}

/**
* Replicate block to another node.
*/
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersReplicatedTo = new mutable.HashSet[BlockManagerId]
val peersFailedToReplicateTo = new mutable.HashSet[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
val selectedPeers = getPeers(level.replication - 1)
if (selectedPeers.size < level.replication - 1) {
logWarning(s"Failed to replicate block to ${level.replication - 1} peer(s) " +
s"as only ${selectedPeers.size} peer(s) were found")
val startTime = System.nanoTime

var forceFetchPeers = false
var failures = 0
var done = false

// Get a random peer
def getRandomPeer(): Option[BlockManagerId] = {
  // Candidates are the known peers minus those already replicated to and those that failed.
  val candidates =
    (getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo).toSeq
  if (candidates.isEmpty) {
    None
  } else {
    // Pick one candidate uniformly at random.
    Some(candidates(Random.nextInt(candidates.size)))
  }
}
selectedPeers.foreach { peer =>
val start = System.nanoTime
data.rewind()
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} bytes. " +
s"To node: $peer")

try {
blockTransferService.uploadBlockSync(
peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
} catch {
case e: Exception =>
logError(s"Failed to replicate block to $peer", e)
cachedPeers.synchronized { cachedPeers.clear() }
// Choose random peers one at a time and try uploading the block to each of them.
// If replication fails (e.g., the target peer is down), force the list of cached peers
// to be re-fetched from the driver and then pick another random peer for replication.
// The peer for which replication failed is also temporarily blacklisted.
while (!done) {
getRandomPeer() match {
case Some(peer) =>
try {
val onePeerStartTime = System.nanoTime
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
blockTransferService.uploadBlockSync(
peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
.format((System.nanoTime - onePeerStartTime) / 1e6))
peersReplicatedTo += peer
forceFetchPeers = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true
}
} catch {
case e: Exception =>
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
failures += 1
forceFetchPeers = true
peersFailedToReplicateTo += peer
if (failures > maxReplicationFailures) {
done = true
}
}
case None =>
// no peer left to replicate to
done = true
}

logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
.format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
}
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logError(s"Replicated $blockId of ${data.limit()} bytes to only " +
s"${peersReplicatedTo.size} peer(s) instead of ${numPeersToReplicateTo} " +
s"in ${(System.nanoTime - startTime) / 1e6} ms")
} else {
logDebug(s"Successfully replicated $blockId of ${data.limit()} bytes to " +
s"${peersReplicatedTo.size} peer(s) in ${(System.nanoTime - startTime) / 1e6} ms")
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#

# Set everything to be logged to the file core/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.rootCategory=DEBUG, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=target/unit-tests.log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
conf.set("spark.driver.port", boundPort.toString)
conf.set("spark.storage.unrollFraction", "0.4")
conf.set("spark.storage.unrollMemoryThreshold", "512")

conf.set("spark.core.connection.ack.wait.timeout", "1")
conf.set("spark.storage.cachedPeersTtl", "10")
master = new BlockManagerMaster(
actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
conf, true)
Expand Down Expand Up @@ -1300,59 +1301,55 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
testReplication(5, storageLevels)
}

test("block replication with addition and removal of executors") {
test("block replication with addition and deletion of executors") {
val blockSize = 1000
val storeSize = 10000
val allStores = new ArrayBuffer[BlockManager]()


try {
val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
allStores ++= initialStores

// 2x replication works
initialStores(0).putSingle("a1", new Array[Byte](blockSize), StorageLevel.MEMORY_AND_DISK_2)
assert(master.getLocations("a1").size === 2)
def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) {
initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
assert(master.getLocations(blockId).size === expectedNumLocations)
master.removeBlock(blockId)
}

// 3x replication should only replicate 2x
initialStores(0).putSingle(
"a2", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
assert(master.getLocations("a2").size === 2)
// 2x replication should work, 3x replication should only replicate 2x
testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
testPut("a2", StorageLevel(true, true, false, true, 3), 2)

// Add another store, 3x replication should work now, 4x replication should only replicate 3x
val newStore1 = makeBlockManager(storeSize, s"newstore1")
allStores += newStore1
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
testPut("a3", StorageLevel(true, true, false, true, 3), 3)
}
testPut("a4",StorageLevel(true, true, false, true, 4), 3)

// 3x replication should work now
initialStores(0).putSingle(
"a3", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
assert(master.getLocations("a3").size === 3)

// 4x replication should only replicate 3x
initialStores(0).putSingle(
"a4", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4))
assert(master.getLocations("a4").size === 3)

// Add another store, 4x replication should work now
val newStore2 = makeBlockManager(storeSize, s"newstore2")
allStores += newStore2
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
testPut("a5", StorageLevel(true, true, false, true, 4), 4)
}

// 4x replication should work now
initialStores(0).putSingle(
"a5", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4))
assert(master.getLocations("a5").size === 4)

// Remove all the stores and add new stores
(initialStores ++ Seq(newStore1, newStore2)).map { store =>
store.blockManagerId.executorId
}.foreach { execId =>
master.removeExecutor(execId)
// Remove all but the 1st store, 2x replication should fail
(initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach {
store =>
master.removeExecutor(store.blockManagerId.executorId)
store.stop()
}
testPut("a6", StorageLevel.MEMORY_AND_DISK_2, 1)

// Add new stores and test if replication works
// Add new stores, 3x replication should work
val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") }
allStores ++= newStores

newStores(0).putSingle(
"a6", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
assert(master.getLocations("a6").size === 3)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
testPut("a7", StorageLevel(true, true, false, true, 3), 3)
}
} finally {
allStores.foreach { _.stop() }
}
Expand Down

0 comments on commit 03de02d

Please sign in to comment.