Add functionality to query executors for their local BlockStatuses
Not all blocks are reported to the master. In HttpBroadcast and
TorrentBroadcast, for instance, most blocks are not reported to the master.
The lack of a mechanism to get local block statuses on each executor
makes it difficult to test the correctness of un/persisting a broadcast.

This new functionality, though only used for testing at the moment, is
general enough to be used for other things in the future.
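
A minimal usage sketch of the new query (mirroring the tests touched in this commit; `bmm` is
assumed to be a BlockManagerMaster and `blockId` a BlockId stored somewhere in the cluster):

  // Ask every block manager for its local view of the block (askSlaves defaults to true).
  val statuses: Map[BlockManagerId, BlockStatus] = bmm.getBlockStatus(blockId)
  statuses.foreach { case (bmId, status) =>
    println(s"$bmId -> level=${status.storageLevel}, mem=${status.memSize}, disk=${status.diskSize}")
  }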
andrewor14 committed Mar 29, 2014
1 parent 34f436f commit fbfeec8
Showing 10 changed files with 150 additions and 76 deletions.
@@ -17,7 +17,6 @@

package org.apache.spark.network

import java.net._
import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -79,3 +79,5 @@ private object BlockInfo {
private val BLOCK_PENDING: Long = -1L
private val BLOCK_FAILED: Long = -2L
}

private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -209,10 +209,14 @@ private[spark] class BlockManager(
}
}

/**
* Get storage level of local block. If no info exists for the block, return None.
*/
def getLevel(blockId: BlockId): Option[StorageLevel] = blockInfo.get(blockId).map(_.level)
/** Return the status of the block identified by the given ID, if it exists. */
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
BlockStatus(info.level, memSize, diskSize)
}
}

/**
* Tell the master about the current storage status of a block. This will send a block update
@@ -631,10 +635,9 @@ private[spark] class BlockManager(
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) => {
case ByteBufferValues(bytes) =>
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
}
size = res.size
res.data match {
@@ -148,21 +148,30 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}

/**
* Mainly for testing. Ask the driver to query all executors for their storage levels
* regarding this block. This provides an avenue for the driver to learn the storage
* levels of blocks it has not been informed of.
* Return the block's local status on all block managers, if any.
*
* WARNING: This could lead to deadlocks if there are any outstanding messages the
* executors are already expecting from the driver. In this case, while the driver is
* waiting for the executors to respond to its GetStorageLevel query, the executors
* are also waiting for a response from the driver to a prior message.
* If askSlaves is true, this invokes the master to query each block manager for the most
* updated block statuses. This is useful when the master is not informed of the given block
* by all block managers.
*
* The interim solution is to wait for a brief window of time to pass before asking.
* This should suffice, since this mechanism is largely introduced for testing only.
* To avoid potential deadlocks, the use of Futures is necessary, because the master actor
* should not block on waiting for a block manager, which can in turn be waiting for the
* master actor for a response to a prior message.
*/
def askForStorageLevels(blockId: BlockId, waitTimeMs: Long = 1000) = {
Thread.sleep(waitTimeMs)
askDriverWithReply[Map[BlockManagerId, StorageLevel]](AskForStorageLevels(blockId))
def getBlockStatus(
blockId: BlockId,
askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
val msg = GetBlockStatus(blockId, askSlaves)
val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
val (blockManagerIds, futures) = response.unzip
val result = Await.result(Future.sequence(futures), timeout)
if (result == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
status.map { s => (blockManagerId, s) }
}.toMap
}
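
As a hedged illustration of the askSlaves flag (this mirrors the "query block statuses" test
added to BlockManagerSuite below; `master` stands for a BlockManagerMaster, and the block was
stored locally with tellMaster = false):

  // The master was never informed of the block, so its own bookkeeping comes up empty...
  assert(master.getBlockStatus(TestBlockId("list5"), askSlaves = false).isEmpty)
  // ...but asking the slaves returns the block's actual local status.
  assert(master.getBlockStatus(TestBlockId("list5"), askSlaves = true).nonEmpty)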

/** Stop the driver actor, called only on the Spark driver node */
@@ -21,7 +21,7 @@ import java.util.{HashMap => JHashMap}

import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.concurrent.{Await, Future}
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.{Actor, ActorRef, Cancellable}
@@ -93,6 +93,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetStorageStatus =>
sender ! storageStatus

case GetBlockStatus(blockId, askSlaves) =>
sender ! blockStatus(blockId, askSlaves)

case RemoveRdd(rddId) =>
sender ! removeRdd(rddId)

@@ -126,9 +129,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case HeartBeat(blockManagerId) =>
sender ! heartBeat(blockManagerId)

case AskForStorageLevels(blockId) =>
sender ! askForStorageLevels(blockId)

case other =>
logWarning("Got unknown message: " + other)
}
@@ -254,16 +254,30 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}.toArray
}

// For testing. Ask all block managers for the given block's local storage level, if any.
private def askForStorageLevels(blockId: BlockId): Map[BlockManagerId, StorageLevel] = {
val getStorageLevel = GetStorageLevel(blockId)
blockManagerInfo.values.flatMap { info =>
val future = info.slaveActor.ask(getStorageLevel)(akkaTimeout)
val result = Await.result(future, akkaTimeout)
if (result != null) {
// If the block does not exist on the slave, the slave replies None
result.asInstanceOf[Option[StorageLevel]].map { reply => (info.blockManagerId, reply) }
} else None
/**
* Return the block's local status for all block managers, if any.
*
* If askSlaves is true, the master queries each block manager for the most updated block
* statuses. This is useful when the master is not informed of the given block by all block
* managers.
*
* Rather than blocking on the block status query, master actor should simply return a
* Future to avoid potential deadlocks. This can arise if there exists a block manager
* that is also waiting for this master actor's response to a previous message.
*/
private def blockStatus(
blockId: BlockId,
askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
import context.dispatcher
val getBlockStatus = GetBlockStatus(blockId)
blockManagerInfo.values.map { info =>
val blockStatusFuture =
if (askSlaves) {
info.slaveActor.ask(getBlockStatus)(akkaTimeout).mapTo[Option[BlockStatus]]
} else {
Future { info.getStatus(blockId) }
}
(info.blockManagerId, blockStatusFuture)
}.toMap
}

@@ -352,9 +366,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}


private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)

private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
@@ -371,6 +382,8 @@ private[spark] class BlockManagerInfo(
logInfo("Registering block manager %s with %s RAM".format(
blockManagerId.hostPort, Utils.bytesToString(maxMem)))

def getStatus(blockId: BlockId) = Option(_blocks.get(blockId))

def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
}
@@ -41,9 +41,6 @@ private[storage] object BlockManagerMessages {
case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
extends ToBlockManagerSlave

// For testing. Ask the slave for the block's storage level.
case class GetStorageLevel(blockId: BlockId) extends ToBlockManagerSlave


//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
@@ -113,10 +110,10 @@ private[storage] object BlockManagerMessages {

case object GetMemoryStatus extends ToBlockManagerMaster

case object ExpireDeadHosts extends ToBlockManagerMaster

case object GetStorageStatus extends ToBlockManagerMaster

// For testing. Have the master ask all slaves for the given block's storage level.
case class AskForStorageLevels(blockId: BlockId) extends ToBlockManagerMaster
case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
extends ToBlockManagerMaster

case object ExpireDeadHosts extends ToBlockManagerMaster
}
@@ -50,7 +50,7 @@ class BlockManagerSlaveActor(
case RemoveBroadcast(broadcastId, removeFromDriver) =>
blockManager.removeBroadcast(broadcastId, removeFromDriver)

case GetStorageLevel(blockId) =>
sender ! blockManager.getLevel(blockId)
case GetBlockStatus(blockId, _) =>
sender ! blockManager.getStatus(blockId)
}
}
69 changes: 40 additions & 29 deletions core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -107,32 +107,40 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that the broadcast file is created, and blocks are persisted only on the driver
def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
assert(levels.size === 1)
levels.head match { case (bm, level) =>
assert(bm.executorId === "<driver>")
assert(level === StorageLevel.MEMORY_AND_DISK)
val statuses = bmm.getBlockStatus(blockIds.head)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")
}
assert(HttpBroadcast.getFile(blockIds.head.broadcastId).exists)
assert(HttpBroadcast.getFile(blockIds.head.broadcastId).exists, "Broadcast file not found!")
}

// Verify that blocks are persisted in both the executors and the driver
def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
assert(levels.size === numSlaves + 1)
levels.foreach { case (_, level) =>
assert(level === StorageLevel.MEMORY_AND_DISK)
val statuses = bmm.getBlockStatus(blockIds.head)
assert(statuses.size === numSlaves + 1)
statuses.foreach { case (_, status) =>
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store")
assert(status.diskSize === 0, "Block should not be in disk store")
}
}

// Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver
// is true. In the latter case, also verify that the broadcast file is deleted on the driver.
def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
assert(levels.size === (if (removeFromDriver) 0 else 1))
assert(removeFromDriver === !HttpBroadcast.getFile(blockIds.head.broadcastId).exists)
val statuses = bmm.getBlockStatus(blockIds.head)
val expectedNumBlocks = if (removeFromDriver) 0 else 1
val possiblyNot = if (removeFromDriver) "" else " not"
assert(statuses.size === expectedNumBlocks,
"Block should%s be unpersisted on the driver".format(possiblyNot))
assert(removeFromDriver === !HttpBroadcast.getFile(blockIds.head.broadcastId).exists,
"Broadcast file should%s be deleted".format(possiblyNot))
}

testUnpersistBroadcast(numSlaves, httpConf, getBlockIds, afterCreation,
@@ -158,28 +166,32 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that blocks are persisted only on the driver
def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
blockIds.foreach { blockId =>
val levels = bmm.askForStorageLevels(blockId, waitTimeMs = 0)
assert(levels.size === 1)
levels.head match { case (bm, level) =>
assert(bm.executorId === "<driver>")
assert(level === StorageLevel.MEMORY_AND_DISK)
val statuses = bmm.getBlockStatus(blockId)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store on the driver")
assert(status.diskSize === 0, "Block should not be in disk store on the driver")
}
}
}

// Verify that blocks are persisted in both the executors and the driver
def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
blockIds.foreach { blockId =>
val levels = bmm.askForStorageLevels(blockId, waitTimeMs = 0)
val statuses = bmm.getBlockStatus(blockId)
if (blockId.field == "meta") {
// Meta data is only on the driver
assert(levels.size === 1)
levels.head match { case (bm, _) => assert(bm.executorId === "<driver>") }
assert(statuses.size === 1)
statuses.head match { case (bm, _) => assert(bm.executorId === "<driver>") }
} else {
// Other blocks are on both the executors and the driver
assert(levels.size === numSlaves + 1)
levels.foreach { case (_, level) =>
assert(level === StorageLevel.MEMORY_AND_DISK)
assert(statuses.size === numSlaves + 1)
statuses.foreach { case (_, status) =>
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
assert(status.memSize > 0, "Block should be in memory store")
assert(status.diskSize === 0, "Block should not be in disk store")
}
}
}
@@ -189,12 +201,11 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// is true.
def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
val expectedNumBlocks = if (removeFromDriver) 0 else 1
var waitTimeMs = 1000L
val possiblyNot = if (removeFromDriver) "" else " not"
blockIds.foreach { blockId =>
// Allow a second for the messages triggered by unpersist to propagate to prevent deadlocks
val levels = bmm.askForStorageLevels(blockId, waitTimeMs)
assert(levels.size === expectedNumBlocks)
waitTimeMs = 0L
val statuses = bmm.getBlockStatus(blockId)
assert(statuses.size === expectedNumBlocks,
"Block should%s be unpersisted on the driver".format(possiblyNot))
}
}

@@ -267,7 +267,7 @@ class CleanerTester(
"One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test")

// Verify that the broadcast is in the driver's block manager
assert(broadcastIds.forall(bid => blockManager.getLevel(broadcastBlockId(bid)).isDefined),
assert(broadcastIds.forall(bid => blockManager.getStatus(broadcastBlockId(bid)).isDefined),
"One ore more broadcasts have not been persisted in the driver's block manager")
}

@@ -291,7 +291,7 @@ class CleanerTester(

// Verify all broadcasts have been unpersisted
assert(broadcastIds.forall { bid =>
blockManager.master.askForStorageLevels(broadcastBlockId(bid)).isEmpty
blockManager.master.getBlockStatus(broadcastBlockId(bid)).isEmpty
})

return
@@ -745,6 +745,46 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(!store.get("list5").isDefined, "list5 was in store")
}

test("query block statuses") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)
val list = List.fill(2)(new Array[Byte](200))

// Tell the master. By LRU, only list2 and list3 remain.
store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)

// getLocations and getBlockStatus should yield the same locations
assert(store.master.getLocations("list1").size === 0)
assert(store.master.getLocations("list2").size === 1)
assert(store.master.getLocations("list3").size === 1)
assert(store.master.getBlockStatus("list1", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list2", askSlaves = false).size === 1)
assert(store.master.getBlockStatus("list3", askSlaves = false).size === 1)
assert(store.master.getBlockStatus("list1", askSlaves = true).size === 0)
assert(store.master.getBlockStatus("list2", askSlaves = true).size === 1)
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)

// This time don't tell the master and see what happens. By LRU, only list5 and list6 remain.
store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)

// getLocations should return nothing because the master is not informed
// getBlockStatus without asking slaves should have the same result
// getBlockStatus with asking slaves, however, should present the actual block statuses
assert(store.master.getLocations("list4").size === 0)
assert(store.master.getLocations("list5").size === 0)
assert(store.master.getLocations("list6").size === 0)
assert(store.master.getBlockStatus("list4", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list5", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list6", askSlaves = false).size === 0)
assert(store.master.getBlockStatus("list4", askSlaves = true).size === 0)
assert(store.master.getBlockStatus("list5", askSlaves = true).size === 1)
assert(store.master.getBlockStatus("list6", askSlaves = true).size === 1)
}

test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)