From fbfeec80cfb7a1bd86847fa22f641d9b9ad7480f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 28 Mar 2014 18:33:11 -0700
Subject: [PATCH] Add functionality to query executors for their local BlockStatuses

Not all blocks are reported to the master. In HttpBroadcast and
TorrentBroadcast, for instance, most blocks are not reported to the master.
The lack of a mechanism to get local block statuses on each executor makes it
difficult to test the correctness of un/persisting a broadcast. This new
functionality, though only used for testing at the moment, is general enough
to be used for other things in the future.
---
 .../spark/network/ConnectionManager.scala    |  1 -
 .../org/apache/spark/storage/BlockInfo.scala |  2 +
 .../apache/spark/storage/BlockManager.scala  | 15 ++--
 .../spark/storage/BlockManagerMaster.scala   | 33 +++++----
 .../storage/BlockManagerMasterActor.scala    | 47 ++++++++-----
 .../spark/storage/BlockManagerMessages.scala | 11 ++-
 .../storage/BlockManagerSlaveActor.scala     |  4 +-
 .../org/apache/spark/BroadcastSuite.scala    | 69 +++++++++++--------
 .../apache/spark/ContextCleanerSuite.scala   |  4 +-
 .../spark/storage/BlockManagerSuite.scala    | 40 +++++++++++
 10 files changed, 150 insertions(+), 76 deletions(-)
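As a rough illustration of the driver-side query this enables, consider the
sketch below. It is illustrative only: it assumes code running in the
org.apache.spark package (SparkEnv and BlockManagerMaster are package-private)
with an active SparkContext `sc`, and it uses the getBlockStatus API added in
the diffs that follow.

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.BroadcastBlockId

    // Broadcast a value, then ask every block manager for its local view of the block.
    val broadcast = sc.broadcast(Array.fill(1000)(1))
    val blockId = BroadcastBlockId(broadcast.id)
    val master = SparkEnv.get.blockManager.master
    val statuses = master.getBlockStatus(blockId, askSlaves = true)
    statuses.foreach { case (bmId, status) =>
      println(s"${bmId.executorId}: level=${status.storageLevel}, " +
        s"mem=${status.memSize}, disk=${status.diskSize}")
    }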
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index a75130cba2a2e..bb3abf1d032d1 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network
 
-import java.net._
 import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index c8f397609a0b4..ef924123a3b11 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -79,3 +79,5 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
+
+private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a88eb1315a37b..dd2dbd1c8a397 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -209,10 +209,14 @@ private[spark] class BlockManager(
     }
   }
 
-  /**
-   * Get storage level of local block. If no info exists for the block, return None.
-   */
-  def getLevel(blockId: BlockId): Option[StorageLevel] = blockInfo.get(blockId).map(_.level)
+  /** Return the status of the block identified by the given ID, if it exists. */
+  def getStatus(blockId: BlockId): Option[BlockStatus] = {
+    blockInfo.get(blockId).map { info =>
+      val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
+      val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
+      BlockStatus(info.level, memSize, diskSize)
+    }
+  }
 
   /**
    * Tell the master about the current storage status of a block. This will send a block update
@@ -631,10 +635,9 @@ private[spark] class BlockManager(
         diskStore.putValues(blockId, iterator, level, askForBytes)
       case ArrayBufferValues(array) =>
         diskStore.putValues(blockId, array, level, askForBytes)
-      case ByteBufferValues(bytes) => {
+      case ByteBufferValues(bytes) =>
         bytes.rewind()
         diskStore.putBytes(blockId, bytes, level)
-      }
     }
     size = res.size
     res.data match {
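For illustration, a local caller of the new BlockManager.getStatus might read
the result as follows (a sketch; `blockManager` and `blockId` stand in for
whatever instances are in scope):

    blockManager.getStatus(blockId) match {
      case Some(status) =>
        // memSize and diskSize are nonzero only for the stores that currently hold the block.
        println(s"level=${status.storageLevel}, " +
          s"inMemory=${status.memSize > 0}, onDisk=${status.diskSize > 0}")
      case None =>
        println("block is not stored on this block manager")
    }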
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 5c9ea88d6b1a4..f61aa1d6bc0fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -148,21 +148,30 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /**
-   * Mainly for testing. Ask the driver to query all executors for their storage levels
-   * regarding this block. This provides an avenue for the driver to learn the storage
-   * levels of blocks it has not been informed of.
+   * Return the block's local status on all block managers, if any.
    *
-   * WARNING: This could lead to deadlocks if there are any outstanding messages the
-   * executors are already expecting from the driver. In this case, while the driver is
-   * waiting for the executors to respond to its GetStorageLevel query, the executors
-   * are also waiting for a response from the driver to a prior message.
+   * If askSlaves is true, this invokes the master to query each block manager for the most
+   * updated block statuses. This is useful when the master is not informed of the given block
+   * by all block managers.
    *
-   * The interim solution is to wait for a brief window of time to pass before asking.
-   * This should suffice, since this mechanism is largely introduced for testing only.
+   * To avoid potential deadlocks, the use of Futures is necessary, because the master actor
+   * should not block on waiting for a block manager, which can in turn be waiting for the
+   * master actor for a response to a prior message.
    */
-  def askForStorageLevels(blockId: BlockId, waitTimeMs: Long = 1000) = {
-    Thread.sleep(waitTimeMs)
-    askDriverWithReply[Map[BlockManagerId, StorageLevel]](AskForStorageLevels(blockId))
+  def getBlockStatus(
+      blockId: BlockId,
+      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
+    val msg = GetBlockStatus(blockId, askSlaves)
+    val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+    val (blockManagerIds, futures) = response.unzip
+    val result = Await.result(Future.sequence(futures), timeout)
+    if (result == null) {
+      throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
+    }
+    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
+    blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
+      status.map { s => (blockManagerId, s) }
+    }.toMap
   }
 
   /** Stop the driver actor, called only on the Spark driver node */
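The askSlaves flag is the crux of the new API: with askSlaves = false the
master answers only from the block statuses it has been told about, while
askSlaves = true additionally queries every executor. A sketch of the
difference, assuming a BlockManagerMaster named `master` and a block stored
with tellMaster = false (as broadcast blocks are):

    // The master's own bookkeeping misses blocks stored with tellMaster = false,
    // but they show up once the executors themselves are asked.
    val registered = master.getBlockStatus(blockId, askSlaves = false)
    val actual = master.getBlockStatus(blockId, askSlaves = true)
    assert(registered.keySet.subsetOf(actual.keySet))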
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 3271d4f1375ef..2d9445425b879 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -21,7 +21,7 @@ import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
 import scala.collection.JavaConversions._
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 import scala.concurrent.duration._
 
 import akka.actor.{Actor, ActorRef, Cancellable}
@@ -93,6 +93,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetStorageStatus =>
       sender ! storageStatus
 
+    case GetBlockStatus(blockId, askSlaves) =>
+      sender ! blockStatus(blockId, askSlaves)
+
     case RemoveRdd(rddId) =>
       sender ! removeRdd(rddId)
 
@@ -126,9 +129,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case HeartBeat(blockManagerId) =>
       sender ! heartBeat(blockManagerId)
 
-    case AskForStorageLevels(blockId) =>
-      sender ! askForStorageLevels(blockId)
-
     case other =>
       logWarning("Got unknown message: " + other)
   }
@@ -254,16 +254,30 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     }.toArray
   }
 
-  // For testing. Ask all block managers for the given block's local storage level, if any.
-  private def askForStorageLevels(blockId: BlockId): Map[BlockManagerId, StorageLevel] = {
-    val getStorageLevel = GetStorageLevel(blockId)
-    blockManagerInfo.values.flatMap { info =>
-      val future = info.slaveActor.ask(getStorageLevel)(akkaTimeout)
-      val result = Await.result(future, akkaTimeout)
-      if (result != null) {
-        // If the block does not exist on the slave, the slave replies None
-        result.asInstanceOf[Option[StorageLevel]].map { reply => (info.blockManagerId, reply) }
-      } else None
+  /**
+   * Return the block's local status for all block managers, if any.
+   *
+   * If askSlaves is true, the master queries each block manager for the most updated block
+   * statuses. This is useful when the master is not informed of the given block by all block
+   * managers.
+   *
+   * Rather than blocking on the block status query, the master actor should simply return a
+   * Future to avoid potential deadlocks. These can arise if there exists a block manager
+   * that is also waiting for this master actor's response to a previous message.
+   */
+  private def blockStatus(
+      blockId: BlockId,
+      askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
+    import context.dispatcher
+    val getBlockStatus = GetBlockStatus(blockId)
+    blockManagerInfo.values.map { info =>
+      val blockStatusFuture =
+        if (askSlaves) {
+          info.slaveActor.ask(getBlockStatus)(akkaTimeout).mapTo[Option[BlockStatus]]
+        } else {
+          Future { info.getStatus(blockId) }
+        }
+      (info.blockManagerId, blockStatusFuture)
     }.toMap
   }
 
@@ -352,9 +366,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 }
-
-private[spark] case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long)
-
 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
     timeMs: Long,
@@ -371,6 +382,8 @@ private[spark] class BlockManagerInfo(
   logInfo("Registering block manager %s with %s RAM".format(
     blockManagerId.hostPort, Utils.bytesToString(maxMem)))
 
+  def getStatus(blockId: BlockId) = Option(_blocks.get(blockId))
+
   def updateLastSeenMs() {
     _lastSeenMs = System.currentTimeMillis()
   }
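The futures matter because the master actor must never block inside its
receive loop: a block manager could itself be waiting on the master, and both
would stall. The waiting therefore happens on the calling side. A hypothetical
helper with the same shape as the collection step in
BlockManagerMaster.getBlockStatus above:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.FiniteDuration
    import org.apache.spark.storage.{BlockManagerId, BlockStatus}

    // The master actor replies immediately with one Future per block manager;
    // only the requesting (non-actor) thread blocks while those futures complete.
    def collectStatuses(
        futures: Map[BlockManagerId, Future[Option[BlockStatus]]],
        timeout: FiniteDuration): Map[BlockManagerId, BlockStatus] = {
      futures.flatMap { case (id, future) =>
        Await.result(future, timeout).map(status => id -> status)
      }
    }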
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 9a29c39a28ab1..afb2c6a12ce67 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -41,9 +41,6 @@ private[storage] object BlockManagerMessages {
   case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
     extends ToBlockManagerSlave
 
-  // For testing. Ask the slave for the block's storage level.
-  case class GetStorageLevel(blockId: BlockId) extends ToBlockManagerSlave
-
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
@@ -113,10 +110,10 @@ private[storage] object BlockManagerMessages {
 
   case object GetMemoryStatus extends ToBlockManagerMaster
 
-  case object ExpireDeadHosts extends ToBlockManagerMaster
-
   case object GetStorageStatus extends ToBlockManagerMaster
 
-  // For testing. Have the master ask all slaves for the given block's storage level.
-  case class AskForStorageLevels(blockId: BlockId) extends ToBlockManagerMaster
+  case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
+    extends ToBlockManagerMaster
+
+  case object ExpireDeadHosts extends ToBlockManagerMaster
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 85b8ec40c0ea3..016ade428c68f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -50,7 +50,7 @@ class BlockManagerSlaveActor(
     case RemoveBroadcast(broadcastId, removeFromDriver) =>
      blockManager.removeBroadcast(broadcastId, removeFromDriver)
 
-    case GetStorageLevel(blockId) =>
-      sender ! blockManager.getLevel(blockId)
+    case GetBlockStatus(blockId, _) =>
+      sender ! blockManager.getStatus(blockId)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
index 9e600f1e91aa2..d28496e316a34 100644
--- a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -107,22 +107,26 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // Verify that the broadcast file is created, and blocks are persisted only on the driver
     def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       assert(blockIds.size === 1)
-      val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
-      assert(levels.size === 1)
-      levels.head match { case (bm, level) =>
-        assert(bm.executorId === "")
-        assert(level === StorageLevel.MEMORY_AND_DISK)
+      val statuses = bmm.getBlockStatus(blockIds.head)
+      assert(statuses.size === 1)
+      statuses.head match { case (bm, status) =>
+        assert(bm.executorId === "", "Block should only be on the driver")
+        assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
+        assert(status.memSize > 0, "Block should be in memory store on the driver")
+        assert(status.diskSize === 0, "Block should not be in disk store on the driver")
       }
-      assert(HttpBroadcast.getFile(blockIds.head.broadcastId).exists)
+      assert(HttpBroadcast.getFile(blockIds.head.broadcastId).exists, "Broadcast file not found!")
     }
 
     // Verify that blocks are persisted in both the executors and the driver
     def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       assert(blockIds.size === 1)
-      val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
-      assert(levels.size === numSlaves + 1)
-      levels.foreach { case (_, level) =>
-        assert(level === StorageLevel.MEMORY_AND_DISK)
+      val statuses = bmm.getBlockStatus(blockIds.head)
+      assert(statuses.size === numSlaves + 1)
+      statuses.foreach { case (_, status) =>
+        assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
+        assert(status.memSize > 0, "Block should be in memory store")
+        assert(status.diskSize === 0, "Block should not be in disk store")
       }
     }
 
@@ -130,9 +134,13 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
    // is true. In the latter case, also verify that the broadcast file is deleted on the driver.
     def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       assert(blockIds.size === 1)
-      val levels = bmm.askForStorageLevels(blockIds.head, waitTimeMs = 0)
-      assert(levels.size === (if (removeFromDriver) 0 else 1))
-      assert(removeFromDriver === !HttpBroadcast.getFile(blockIds.head.broadcastId).exists)
+      val statuses = bmm.getBlockStatus(blockIds.head)
+      val expectedNumBlocks = if (removeFromDriver) 0 else 1
+      val possiblyNot = if (removeFromDriver) "" else " not"
+      assert(statuses.size === expectedNumBlocks,
+        "Block should%s be unpersisted on the driver".format(possiblyNot))
+      assert(removeFromDriver === !HttpBroadcast.getFile(blockIds.head.broadcastId).exists,
+        "Broadcast file should%s be deleted".format(possiblyNot))
     }
 
     testUnpersistBroadcast(numSlaves, httpConf, getBlockIds, afterCreation,
@@ -158,11 +166,13 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // Verify that blocks are persisted only on the driver
     def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       blockIds.foreach { blockId =>
-        val levels = bmm.askForStorageLevels(blockId, waitTimeMs = 0)
-        assert(levels.size === 1)
-        levels.head match { case (bm, level) =>
-          assert(bm.executorId === "")
-          assert(level === StorageLevel.MEMORY_AND_DISK)
+        val statuses = bmm.getBlockStatus(blockId)
+        assert(statuses.size === 1)
+        statuses.head match { case (bm, status) =>
+          assert(bm.executorId === "", "Block should only be on the driver")
+          assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
+          assert(status.memSize > 0, "Block should be in memory store on the driver")
+          assert(status.diskSize === 0, "Block should not be in disk store on the driver")
         }
       }
     }
@@ -170,16 +180,18 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // Verify that blocks are persisted in both the executors and the driver
     def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       blockIds.foreach { blockId =>
-        val levels = bmm.askForStorageLevels(blockId, waitTimeMs = 0)
+        val statuses = bmm.getBlockStatus(blockId)
         if (blockId.field == "meta") {
           // Meta data is only on the driver
-          assert(levels.size === 1)
-          levels.head match { case (bm, _) => assert(bm.executorId === "") }
+          assert(statuses.size === 1)
+          statuses.head match { case (bm, _) => assert(bm.executorId === "") }
         } else {
           // Other blocks are on both the executors and the driver
-          assert(levels.size === numSlaves + 1)
-          levels.foreach { case (_, level) =>
-            assert(level === StorageLevel.MEMORY_AND_DISK)
+          assert(statuses.size === numSlaves + 1)
+          statuses.foreach { case (_, status) =>
+            assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
+            assert(status.memSize > 0, "Block should be in memory store")
+            assert(status.diskSize === 0, "Block should not be in disk store")
           }
         }
       }
@@ -189,12 +201,11 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
     // is true.
     def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
       val expectedNumBlocks = if (removeFromDriver) 0 else 1
-      var waitTimeMs = 1000L
+      val possiblyNot = if (removeFromDriver) "" else " not"
       blockIds.foreach { blockId =>
-        // Allow a second for the messages triggered by unpersist to propagate to prevent deadlocks
-        val levels = bmm.askForStorageLevels(blockId, waitTimeMs)
-        assert(levels.size === expectedNumBlocks)
-        waitTimeMs = 0L
+        val statuses = bmm.getBlockStatus(blockId)
+        assert(statuses.size === expectedNumBlocks,
+          "Block should%s be unpersisted on the driver".format(possiblyNot))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 6a12cb6603700..3d95547b20fc1 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -267,7 +267,7 @@ class CleanerTester(
       "One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test")
 
     // Verify that the broadcast is in the driver's block manager
-    assert(broadcastIds.forall(bid => blockManager.getLevel(broadcastBlockId(bid)).isDefined),
+    assert(broadcastIds.forall(bid => blockManager.getStatus(broadcastBlockId(bid)).isDefined),
       "One ore more broadcasts have not been persisted in the driver's block manager")
   }
 
@@ -291,7 +291,7 @@ class CleanerTester(
 
     // Verify all broadcasts have been unpersisted
     assert(broadcastIds.forall { bid =>
-      blockManager.master.askForStorageLevels(broadcastBlockId(bid)).isEmpty
+      blockManager.master.getBlockStatus(broadcastBlockId(bid)).isEmpty
     })
 
     return
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 1f5bcca64fc39..bddbd381c2665 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -745,6 +745,46 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(!store.get("list5").isDefined, "list5 was in store")
   }
 
+  test("query block statuses") {
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
+    val list = List.fill(2)(new Array[Byte](200))
+
+    // Tell the master. By LRU, only list2 and list3 remain.
+    store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+    store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+
+    // getLocations and getBlockStatus should yield the same locations
+    assert(store.master.getLocations("list1").size === 0)
+    assert(store.master.getLocations("list2").size === 1)
+    assert(store.master.getLocations("list3").size === 1)
+    assert(store.master.getBlockStatus("list1", askSlaves = false).size === 0)
+    assert(store.master.getBlockStatus("list2", askSlaves = false).size === 1)
+    assert(store.master.getBlockStatus("list3", askSlaves = false).size === 1)
+    assert(store.master.getBlockStatus("list1", askSlaves = true).size === 0)
+    assert(store.master.getBlockStatus("list2", askSlaves = true).size === 1)
+    assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
+
+    // This time don't tell the master and see what happens. By LRU, only list5 and list6 remain.
+    store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+    store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+    store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+
+    // getLocations should return nothing because the master is not informed
+    // getBlockStatus without asking slaves should have the same result
+    // getBlockStatus with asking slaves, however, should present the actual block statuses
+    assert(store.master.getLocations("list4").size === 0)
+    assert(store.master.getLocations("list5").size === 0)
+    assert(store.master.getLocations("list6").size === 0)
+    assert(store.master.getBlockStatus("list4", askSlaves = false).size === 0)
+    assert(store.master.getBlockStatus("list5", askSlaves = false).size === 0)
+    assert(store.master.getBlockStatus("list6", askSlaves = false).size === 0)
+    assert(store.master.getBlockStatus("list4", askSlaves = true).size === 0)
+    assert(store.master.getBlockStatus("list5", askSlaves = true).size === 1)
+    assert(store.master.getBlockStatus("list6", askSlaves = true).size === 1)
+  }
+
   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
     store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
       securityMgr, mapOutputTracker)
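The LRU assumption in the test above ("only list2 and list3 remain", "only
list5 and list6 remain") follows from block sizes: each block is a list of two
200-byte arrays, which lands in the few-hundred-byte range once JVM object
overhead is counted, so a 1200-byte store fits only two such blocks and evicts
the oldest. A quick way to sanity-check that estimate from code inside the
org.apache.spark package (SizeEstimator is the package-private utility the
MemoryStore uses; exact numbers are JVM-dependent):

    import org.apache.spark.util.SizeEstimator

    val list = List.fill(2)(new Array[Byte](200))
    // Typically a few hundred bytes per block, so three of them exceed a 1200-byte store.
    println(SizeEstimator.estimate(list))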