Skip to content

Commit

Permalink
Filtered driver block manager from peer list, and also consolidated t…
Browse files Browse the repository at this point in the history
…he use of <driver> in BlockManager.
  • Loading branch information
tdas committed Sep 12, 2014
1 parent 7598f91 commit 4a20531
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class BlockManagerId private (

def port: Int = port_

def isDriver = (executorId == "<driver>")

override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
* from the executors, but not from the driver.
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
// TODO: Consolidate usages of <driver>
import context.dispatcher
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || info.blockManagerId.executorId != "<driver>"
removeFromDriver || !info.blockManagerId.isDriver
}
Future.sequence(
requiredBlockManagers.map { bm =>
Expand Down Expand Up @@ -212,7 +211,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
val minSeenTime = now - slaveTimeout
val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
Expand All @@ -232,7 +231,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
blockManagerId.executorId == "<driver>" && !isLocal
blockManagerId.isDriver && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
true
Expand Down Expand Up @@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
tachyonSize: Long) {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.executorId == "<driver>" && !isLocal) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
Expand Down Expand Up @@ -405,7 +404,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

/** Get the list of the peers of the given block manager */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet.toArray
val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
val selfIndex = blockManagerIds.indexOf(blockManagerId)
if (selfIndex == -1) {
logError("Self index for " + blockManagerId + " not found")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(bm.isDriver, "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")
Expand Down
Loading

0 comments on commit 4a20531

Please sign in to comment.