Commit 41c9ece

Added more unit tests for BlockManager, DiskBlockManager, and ContextCleaner.

tdas committed Apr 7, 2014
1 parent 6222697 commit 41c9ece
Showing 13 changed files with 355 additions and 72 deletions.
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -63,6 +63,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

/** Whether the cleaning thread will block on cleanup tasks */
private val blockOnCleanupTasks = sc.conf.getBoolean("spark.cleaner.referenceTracking.blocking", false)

@volatile private var stopped = false

/** Attach a listener object to get information of when objects are cleaned. */
@@ -112,9 +115,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
- case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = false)
- case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = false)
- case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = false)
+ case CleanRDD(rddId) =>
+   doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+ case CleanShuffle(shuffleId) =>
+   doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+ case CleanBroadcast(broadcastId) =>
+   doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
}
}
} catch {
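The two settings touched in this file and the next work together: spark.cleaner.referenceTracking controls whether a ContextCleaner is created at all (see the SparkContext change below), while spark.cleaner.referenceTracking.blocking controls whether the cleaning thread blocks on each cleanup task. A minimal usage sketch, assuming a build that includes this commit; the master URL and app name are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: "local[2]" and the app name are illustrative placeholders.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("cleaner-demo")
  .set("spark.cleaner.referenceTracking", "true")           // create a ContextCleaner (the default)
  .set("spark.cleaner.referenceTracking.blocking", "true")  // block on cleanup tasks (default: false)
val sc = new SparkContext(conf)

With blocking left at its default of false, the cleaning thread fires each cleanup task without waiting for it to complete.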
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -229,7 +229,7 @@ class SparkContext(
dagScheduler.start()

private[spark] val cleaner: Option[ContextCleaner] =
if (conf.getBoolean("spark.cleaner.automatic", true)) {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else None

core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -218,6 +218,15 @@ private[spark] class BlockManager(
}
}

/**
* Get the ids of existing blocks that match the given filter. Note that this also
* queries blocks stored by the disk block manager, which the block manager itself
* may not know about.
*/
def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
(blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
}
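A hedged usage sketch for the new method; blockManager stands in for an instance in scope, and BroadcastBlockId is one of the existing BlockId subclasses:

// Sketch: ids of all broadcast blocks this block manager can see, including
// any that exist only on disk. `blockManager` is assumed to be in scope.
val broadcastIds: Seq[BlockId] =
  blockManager.getMatchingBlockIds(_.isInstanceOf[BroadcastBlockId])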

/**
* Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -130,7 +130,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log

/** Remove all blocks belonging to the given broadcast. */
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
- val future = askDriverWithReply[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster))
+ val future = askDriverWithReply[Future[Seq[Int]]](
+   RemoveBroadcast(broadcastId, removeFromMaster))
future.onFailure {
case e: Throwable =>
logError("Failed to remove broadcast " + broadcastId +
@@ -156,8 +157,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}

/**
- * Return the block's status on all block managers, if any. This can potentially be an
- * expensive operation and is used mainly for testing.
+ * Return the block's status on all block managers, if any. NOTE: This is a
+ * potentially expensive operation and should only be used for testing.
*
* If askSlaves is true, this invokes the master to query each block manager for the most
* updated block statuses. This is useful when the master is not informed of the given block
@@ -184,6 +185,22 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}.toMap
}

/**
* Return the ids of existing blocks that match the given filter. NOTE: This
* is a potentially expensive operation and should only be used for testing.
*
* If askSlaves is true, this invokes the master to query each block manager for the most
* updated block statuses. This is useful when the master is not informed of the given block
* by all block managers.
*/
def getMatchingBlockIds(
filter: BlockId => Boolean,
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
val future = askDriverWithReply[Future[Seq[BlockId]]](msg)
Await.result(future, timeout)
}
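A sketch of how a test might call this; master is a BlockManagerMaster assumed to be in scope, and RDDBlockId is an existing BlockId subclass:

// Sketch: ask every block manager (askSlaves = true) for the blocks of RDD 0.
val rdd0Blocks: Seq[BlockId] = master.getMatchingBlockIds({
  case RDDBlockId(rddId, _) => rddId == 0
  case _ => false
}, askSlaves = true)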

/** Stop the driver actor, called only on the Spark driver node */
def stop() {
if (driverActor != null) {
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -96,6 +96,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
case GetBlockStatus(blockId, askSlaves) =>
sender ! blockStatus(blockId, askSlaves)

case GetMatchingBlockIds(filter, askSlaves) =>
sender ! getMatchingBlockIds(filter, askSlaves)

case RemoveRdd(rddId) =>
sender ! removeRdd(rddId)

@@ -266,8 +269,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}

/**
- * Return the block's status for all block managers, if any. This can potentially be an
- * expensive operation and is used mainly for testing.
+ * Return the block's status for all block managers, if any. NOTE: This is a
+ * potentially expensive operation and should only be used for testing.
*
* If askSlaves is true, the master queries each block manager for the most updated block
* statuses. This is useful when the master is not informed of the given block by all block
@@ -294,6 +297,32 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}.toMap
}

/**
* Return the ids of blocks, present in any block manager, that match the given filter.
* NOTE: This is a potentially expensive operation and should only be used for testing.
*
* If askSlaves is true, the master queries each block manager for the most updated block
* statuses. This is useful when the master is not informed of the given block by all block
* managers.
*/
private def getMatchingBlockIds(
filter: BlockId => Boolean,
askSlaves: Boolean): Future[Seq[BlockId]] = {
import context.dispatcher
val getMatchingBlockIds = GetMatchingBlockIds(filter)
Future.sequence(
blockManagerInfo.values.map { info =>
val future =
if (askSlaves) {
info.slaveActor.ask(getMatchingBlockIds)(akkaTimeout).mapTo[Seq[BlockId]]
} else {
Future { info.blocks.keys.filter(filter).toSeq }
}
future
}
).map(_.flatten.toSeq)
}
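The aggregation above leans on Future.sequence, which turns a collection of futures into a future of a collection. A stripped-down model of the same shape, independent of the actor plumbing (not Spark code):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Two per-block-manager results; Seq[Int] stands in for Seq[BlockId].
val perManager = Seq(Future(Seq(1, 2)), Future(Seq(2, 3)))

// Future.sequence yields a Future[Seq[Seq[Int]]]; flattening merges the lists.
val combined: Future[Seq[Int]] = Future.sequence(perManager).map(_.flatten.toSeq)

Await.result(combined, 5.seconds)  // Seq(1, 2, 2, 3)

Note that a block reported by several managers appears once per manager in the combined sequence.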

private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -115,5 +115,8 @@ private[storage] object BlockManagerMessages {
case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
extends ToBlockManagerMaster

case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
extends ToBlockManagerMaster

case object ExpireDeadHosts extends ToBlockManagerMaster
}
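For reference, a small sketch of constructing the new message; the askSlaves default of true comes from the declaration above:

// Sketch: with the second argument omitted, askSlaves defaults to true, so the
// master consults the slave actors as well as its own bookkeeping.
val msg = GetMatchingBlockIds(_.isInstanceOf[BroadcastBlockId])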
core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -39,28 +39,34 @@ class BlockManagerSlaveActor(
// Operations that involve removing blocks may be slow and should be done asynchronously
override def receive = {
case RemoveBlock(blockId) =>
doAsync("removing block", sender) {
doAsync[Boolean]("removing block", sender) {
blockManager.removeBlock(blockId)
true
}

case RemoveRdd(rddId) =>
doAsync("removing RDD", sender) {
doAsync[Int]("removing RDD", sender) {
blockManager.removeRdd(rddId)
}

case RemoveShuffle(shuffleId) =>
doAsync("removing shuffle", sender) {
doAsync[Boolean]("removing shuffle", sender) {
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}
blockManager.shuffleBlockManager.removeShuffle(shuffleId)
}

case RemoveBroadcast(broadcastId, tellMaster) =>
doAsync("removing RDD", sender) {
doAsync[Int]("removing RDD", sender) {
blockManager.removeBroadcast(broadcastId, tellMaster)
}

case GetBlockStatus(blockId, _) =>
sender ! blockManager.getStatus(blockId)

case GetMatchingBlockIds(filter, _) =>
sender ! blockManager.getMatchingBlockIds(filter)
}

private def doAsync[T](actionMessage: String, responseActor: ActorRef)(body: => T) {
@@ -70,7 +76,7 @@
response
}
future.onSuccess { case response =>
logDebug("Successful in " + actionMessage + ", response is " + response)
logDebug("Done " + actionMessage + ", response is " + response)
responseActor ! response
logDebug("Sent response: " + response + " to " + responseActor)
}
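The middle of doAsync is collapsed in this view. As a hedged reconstruction of its overall shape, inferred from the signature and the onSuccess handler above (the Future wrapper and the onFailure branch are assumptions, not verbatim source):

// Hedged reconstruction: run the action in a Future, reply to the asking
// actor on success, and log the error on failure.
private def doAsync[T](actionMessage: String, responseActor: ActorRef)(body: => T) {
  val future = Future {
    logDebug(actionMessage)
    val response = body
    response
  }
  future.onSuccess { case response =>
    logDebug("Done " + actionMessage + ", response is " + response)
    responseActor ! response
    logDebug("Sent response: " + response + " to " + responseActor)
  }
  future.onFailure { case t: Throwable =>
    logError("Error in " + actionMessage, t)
  }
}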
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,6 +47,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private var shuffleSender : ShuffleSender = null


addShutdownHook()

/**
@@ -95,6 +96,15 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
getBlockLocation(blockId).file.exists()
}

/** List all the blocks currently stored on disk by the disk block manager. */
def getAllBlocks(): Seq[BlockId] = {
// Get all the files inside the array of arrays of directories
subDirs.flatten.filter(_ != null).flatMap { dir =>
val files = dir.list()
if (files != null) files else Seq.empty
}.map(BlockId.apply)
}
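A quick sketch of the new accessor from a caller's point of view; diskBlockManager is assumed to be in scope, and isShuffle is one of the predicates BlockId already exposes:

// Sketch: count how many of the blocks currently on disk are shuffle blocks.
val onDisk: Seq[BlockId] = diskBlockManager.getAllBlocks()
val shuffleBlocksOnDisk = onDisk.count(_.isShuffle)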

/** Produces a unique block id and File suitable for intermediate results. */
def createTempBlock(): (TempBlockId, File) = {
var blockId = new TempBlockId(UUID.randomUUID())
core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -171,8 +171,11 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {

/** Remove all the blocks / files and metadata related to a particular shuffle. */
def removeShuffle(shuffleId: ShuffleId): Boolean = {
+ // Do not change the ordering here: shuffleStates should be removed only
+ // after the corresponding shuffle blocks have been removed
+ val cleaned = removeShuffleBlocks(shuffleId)
shuffleStates.remove(shuffleId)
- removeShuffleBlocks(shuffleId)
+ cleaned
}

/** Remove all the blocks / files related to a particular shuffle. */
12 changes: 6 additions & 6 deletions core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -125,7 +125,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that the broadcast file is created, and blocks are persisted only on the driver
def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
- val statuses = bmm.getBlockStatus(blockIds.head)
+ val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
Expand All @@ -142,7 +142,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that blocks are persisted in both the executors and the driver
def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
- val statuses = bmm.getBlockStatus(blockIds.head)
+ val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true)
assert(statuses.size === numSlaves + 1)
statuses.foreach { case (_, status) =>
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
@@ -155,7 +155,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// is true. In the latter case, also verify that the broadcast file is deleted on the driver.
def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
assert(blockIds.size === 1)
- val statuses = bmm.getBlockStatus(blockIds.head)
+ val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true)
val expectedNumBlocks = if (removeFromDriver) 0 else 1
val possiblyNot = if (removeFromDriver) "" else " not"
assert(statuses.size === expectedNumBlocks,
@@ -197,7 +197,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that blocks are persisted only on the driver
def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
blockIds.foreach { blockId =>
- val statuses = bmm.getBlockStatus(blockIds.head)
+ val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === 1)
statuses.head match { case (bm, status) =>
assert(bm.executorId === "<driver>", "Block should only be on the driver")
@@ -211,7 +211,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
// Verify that blocks are persisted in both the executors and the driver
def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) {
blockIds.foreach { blockId =>
- val statuses = bmm.getBlockStatus(blockId)
+ val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
if (blockId.field == "meta") {
// Meta data is only on the driver
assert(statuses.size === 1)
@@ -235,7 +235,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
val expectedNumBlocks = if (removeFromDriver) 0 else 1
val possiblyNot = if (removeFromDriver) "" else " not"
blockIds.foreach { blockId =>
- val statuses = bmm.getBlockStatus(blockId)
+ val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
assert(statuses.size === expectedNumBlocks,
"Block should%s be unpersisted on the driver".format(possiblyNot))
}
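For orientation, the block ids asserted on above follow the broadcast naming scheme; a short sketch (broadcast id 0 and the piece index are placeholders):

// Sketch: a TorrentBroadcast stores a "meta" block plus numbered piece blocks,
// while the bare id names the fully assembled value.
val metaId = BroadcastBlockId(0, "meta")     // name: "broadcast_0_meta"
val pieceId = BroadcastBlockId(0, "piece0")  // name: "broadcast_0_piece0"
val dataId = BroadcastBlockId(0)             // name: "broadcast_0"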