diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 75c87a0553a7a..cd29ac05bdfb2 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -63,6 +63,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private val cleaningThread = new Thread() { override def run() { keepCleaning() }} + /** Whether the cleaning thread will block on cleanup tasks */ + private val blockOnCleanupTasks = sc.conf.getBoolean("spark.cleaner.referenceTracking.blocking", false) + @volatile private var stopped = false /** Attach a listener object to get information of when objects are cleaned. */ @@ -112,9 +115,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { logDebug("Got cleaning task " + task) referenceBuffer -= reference.get task match { - case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = false) - case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = false) - case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = false) + case CleanRDD(rddId) => + doCleanupRDD(rddId, blocking = blockOnCleanupTasks) + case CleanShuffle(shuffleId) => + doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks) + case CleanBroadcast(broadcastId) => + doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 579c963094a78..c8d659d656ef4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -229,7 +229,7 @@ class SparkContext( dagScheduler.start() private[spark] val cleaner: Option[ContextCleaner] = - if (conf.getBoolean("spark.cleaner.automatic", true)) { + if (conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) } else None diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 4c8e718539ec7..e684831c00abc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -218,6 +218,15 @@ private[spark] class BlockManager( } } + /** + * Get the ids of existing blocks that match the given filter. Note that this will + * query the blocks stored in the disk block manager (that the block manager + * may not know of). + */ + def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = { + (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq + } + /** * Tell the master about the current storage status of a block. This will send a block update * message reflecting the current status, *not* the desired storage level in its block info. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 29300de7d6638..d939c5da96967 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -130,7 +130,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log /** Remove all blocks belonging to the given broadcast. */ def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { - val future = askDriverWithReply[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster)) + val future = askDriverWithReply[Future[Seq[Int]]]( + RemoveBroadcast(broadcastId, removeFromMaster)) future.onFailure { case e: Throwable => logError("Failed to remove broadcast " + broadcastId + @@ -156,8 +157,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log } /** - * Return the block's status on all block managers, if any. This can potentially be an - * expensive operation and is used mainly for testing. + * Return the block's status on all block managers, if any. NOTE: This is a + * potentially expensive operation and should only be used for testing. * * If askSlaves is true, this invokes the master to query each block manager for the most * updated block statuses. This is useful when the master is not informed of the given block @@ -184,6 +185,22 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log }.toMap } + /** + * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This + * is a potentially expensive operation and should only be used for testing. + * + * If askSlaves is true, this invokes the master to query each block manager for the most + * updated block statuses. This is useful when the master is not informed of the given block + * by all block managers. + */ + def getMatcinghBlockIds( + filter: BlockId => Boolean, + askSlaves: Boolean): Seq[BlockId] = { + val msg = GetMatchingBlockIds(filter, askSlaves) + val future = askDriverWithReply[Future[Seq[BlockId]]](msg) + Await.result(future, timeout) + } + /** Stop the driver actor, called only on the Spark driver node */ def stop() { if (driverActor != null) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index f238820942e34..69f261b2002a6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -96,6 +96,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus case GetBlockStatus(blockId, askSlaves) => sender ! blockStatus(blockId, askSlaves) + case GetMatchingBlockIds(filter, askSlaves) => + sender ! getMatchingBlockIds(filter, askSlaves) + case RemoveRdd(rddId) => sender ! removeRdd(rddId) @@ -266,8 +269,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } /** - * Return the block's status for all block managers, if any. This can potentially be an - * expensive operation and is used mainly for testing. + * Return the block's status for all block managers, if any. NOTE: This is a + * potentially expensive operation and should only be used for testing. * * If askSlaves is true, the master queries each block manager for the most updated block * statuses. This is useful when the master is not informed of the given block by all block @@ -294,6 +297,32 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus }.toMap } + /** + * Return the ids of blocks present in all the block managers that match the given filter. + * NOTE: This is a potentially expensive operation and should only be used for testing. + * + * If askSlaves is true, the master queries each block manager for the most updated block + * statuses. This is useful when the master is not informed of the given block by all block + * managers. + */ + private def getMatchingBlockIds( + filter: BlockId => Boolean, + askSlaves: Boolean): Future[Seq[BlockId]] = { + import context.dispatcher + val getMatchingBlockIds = GetMatchingBlockIds(filter) + Future.sequence( + blockManagerInfo.values.map { info => + val future = + if (askSlaves) { + info.slaveActor.ask(getMatchingBlockIds)(akkaTimeout).mapTo[Seq[BlockId]] + } else { + Future { info.blocks.keys.filter(filter).toSeq } + } + future + } + ).map(_.flatten.toSeq) + } + private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index afb2c6a12ce67..365e3900731dc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -115,5 +115,8 @@ private[storage] object BlockManagerMessages { case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true) extends ToBlockManagerMaster + case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true) + extends ToBlockManagerMaster + case object ExpireDeadHosts extends ToBlockManagerMaster } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala index 5c91ad36371bc..fc22f54ceb9d8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala @@ -39,28 +39,34 @@ class BlockManagerSlaveActor( // Operations that involve removing blocks may be slow and should be done asynchronously override def receive = { case RemoveBlock(blockId) => - doAsync("removing block", sender) { + doAsync[Boolean]("removing block", sender) { blockManager.removeBlock(blockId) true } case RemoveRdd(rddId) => - doAsync("removing RDD", sender) { + doAsync[Int]("removing RDD", sender) { blockManager.removeRdd(rddId) } case RemoveShuffle(shuffleId) => - doAsync("removing shuffle", sender) { + doAsync[Boolean]("removing shuffle", sender) { + if (mapOutputTracker != null) { + mapOutputTracker.unregisterShuffle(shuffleId) + } blockManager.shuffleBlockManager.removeShuffle(shuffleId) } case RemoveBroadcast(broadcastId, tellMaster) => - doAsync("removing RDD", sender) { + doAsync[Int]("removing RDD", sender) { blockManager.removeBroadcast(broadcastId, tellMaster) } case GetBlockStatus(blockId, _) => sender ! blockManager.getStatus(blockId) + + case GetMatchingBlockIds(filter, _) => + sender ! blockManager.getMatchingBlockIds(filter) } private def doAsync[T](actionMessage: String, responseActor: ActorRef)(body: => T) { @@ -70,7 +76,7 @@ class BlockManagerSlaveActor( response } future.onSuccess { case response => - logDebug("Successful in " + actionMessage + ", response is " + response) + logDebug("Done " + actionMessage + ", response is " + response) responseActor ! response logDebug("Sent response: " + response + " to " + responseActor) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index fcad84669c79a..47a1a6d4a5869 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -47,6 +47,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private var shuffleSender : ShuffleSender = null + addShutdownHook() /** @@ -95,6 +96,15 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD getBlockLocation(blockId).file.exists() } + /** List all the blocks currently stored in disk by the disk manager. */ + def getAllBlocks(): Seq[BlockId] = { + // Get all the files inside the array of array of directories + subDirs.flatten.filter(_ != null).flatMap { dir => + val files = dir.list() + if (files != null) files else Seq.empty + }.map(BlockId.apply) + } + /** Produces a unique block id and File suitable for intermediate results. */ def createTempBlock(): (TempBlockId, File) = { var blockId = new TempBlockId(UUID.randomUUID()) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 4eeeb9aa9c7ab..4cd4cdbd9909d 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -171,8 +171,11 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging { /** Remove all the blocks / files and metadata related to a particular shuffle. */ def removeShuffle(shuffleId: ShuffleId): Boolean = { + // Do not change the ordering of this, if shuffleStates should be removed only + // after the corresponding shuffle blocks have been removed + val cleaned = removeShuffleBlocks(shuffleId) shuffleStates.remove(shuffleId) - removeShuffleBlocks(shuffleId) + cleaned } /** Remove all the blocks / files related to a particular shuffle. */ diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala index e2f6ba80e0dbb..79dcc4a159235 100644 --- a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala @@ -125,7 +125,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that the broadcast file is created, and blocks are persisted only on the driver def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { assert(blockIds.size === 1) - val statuses = bmm.getBlockStatus(blockIds.head) + val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === 1) statuses.head match { case (bm, status) => assert(bm.executorId === "", "Block should only be on the driver") @@ -142,7 +142,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that blocks are persisted in both the executors and the driver def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { assert(blockIds.size === 1) - val statuses = bmm.getBlockStatus(blockIds.head) + val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === numSlaves + 1) statuses.foreach { case (_, status) => assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK) @@ -155,7 +155,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // is true. In the latter case, also verify that the broadcast file is deleted on the driver. def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { assert(blockIds.size === 1) - val statuses = bmm.getBlockStatus(blockIds.head) + val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) val expectedNumBlocks = if (removeFromDriver) 0 else 1 val possiblyNot = if (removeFromDriver) "" else " not" assert(statuses.size === expectedNumBlocks, @@ -197,7 +197,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that blocks are persisted only on the driver def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { blockIds.foreach { blockId => - val statuses = bmm.getBlockStatus(blockIds.head) + val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === 1) statuses.head match { case (bm, status) => assert(bm.executorId === "", "Block should only be on the driver") @@ -211,7 +211,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that blocks are persisted in both the executors and the driver def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { blockIds.foreach { blockId => - val statuses = bmm.getBlockStatus(blockId) + val statuses = bmm.getBlockStatus(blockId, askSlaves = true) if (blockId.field == "meta") { // Meta data is only on the driver assert(statuses.size === 1) @@ -235,7 +235,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { val expectedNumBlocks = if (removeFromDriver) 0 else 1 val possiblyNot = if (removeFromDriver) "" else " not" blockIds.foreach { blockId => - val statuses = bmm.getBlockStatus(blockId) + val statuses = bmm.getBlockStatus(blockId, askSlaves = true) assert(statuses.size === expectedNumBlocks, "Block should%s be unpersisted on the driver".format(possiblyNot)) } diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 9eb434ed0ac0e..345bee6930c49 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -29,16 +29,28 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId} +import org.apache.spark.storage.{BlockId, BroadcastBlockId, RDDBlockId, ShuffleBlockId} class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { implicit val defaultTimeout = timeout(10000 millis) + val conf = new SparkConf() + .setMaster("local[2]") + .setAppName("ContextCleanerSuite") + .set("spark.cleaner.referenceTracking.blocking", "true") before { - sc = new SparkContext("local[2]", "CleanerSuite") + sc = new SparkContext(conf) } + after { + if (sc != null) { + sc.stop() + sc = null + } + } + + test("cleanup RDD") { val rdd = newRDD.persist() val collected = rdd.collect().toList @@ -150,6 +162,40 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo postGCTester.assertCleanup() } + test("automatically cleanup RDD + shuffle + broadcast in distributed mode") { + sc.stop() + + val conf2 = new SparkConf() + .setMaster("local[4]") + //.setMaster("local-cluster[2, 1, 512]") + .setAppName("ContextCleanerSuite") + .set("spark.cleaner.referenceTracking.blocking", "true") + sc = new SparkContext(conf2) + + val numRdds = 10 + val numBroadcasts = 4 // Broadcasts are more costly + val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer + val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer + val rddIds = sc.persistentRdds.keys.toSeq + val shuffleIds = 0 until sc.newShuffleId + val broadcastIds = 0L until numBroadcasts + + val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) + runGC() + intercept[Exception] { + preGCTester.assertCleanup()(timeout(1000 millis)) + } + + // Test that GC triggers the cleanup of all variables after the dereferencing them + val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) + broadcastBuffer.clear() + rddBuffer.clear() + runGC() + postGCTester.assertCleanup() + } + + //------ Helper functions ------ + def newRDD = sc.makeRDD(1 to 10) def newPairRDD = newRDD.map(_ -> 1) def newShuffleRDD = newPairRDD.reduceByKey(_ + _) @@ -192,7 +238,6 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo // Wait until a weak reference object has been GCed while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) { System.gc() - System.runFinalization() Thread.sleep(200) } } @@ -212,6 +257,7 @@ class CleanerTester( val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds + val isDistributed = !sc.isLocal val cleanerListener = new CleanerListener { def rddCleaned(rddId: Int): Unit = { @@ -240,10 +286,9 @@ class CleanerTester( /** Assert that all the stuff has been cleaned up */ def assertCleanup()(implicit waitTimeout: Eventually.Timeout) { try { - eventually(waitTimeout, interval(10 millis)) { + eventually(waitTimeout, interval(100 millis)) { assert(isAllCleanedUp) } - Thread.sleep(100) // to allow async cleanup actions to be completed postCleanupValidate() } finally { logInfo("Resources left from cleaning up:\n" + uncleanedResourcesToString) @@ -255,20 +300,41 @@ class CleanerTester( assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup") // Verify the RDDs have been persisted and blocks are present - assert(rddIds.forall(sc.persistentRdds.contains), - "One or more RDDs have not been persisted, cannot start cleaner test") - assert(rddIds.forall(rddId => blockManager.master.contains(rddBlockId(rddId))), - "One or more RDDs' blocks cannot be found in block manager, cannot start cleaner test") + rddIds.foreach { rddId => + assert( + sc.persistentRdds.contains(rddId), + "RDD " + rddId + " have not been persisted, cannot start cleaner test" + ) + + assert( + !getRDDBlocks(rddId).isEmpty, + "Blocks of RDD " + rddId + " cannot be found in block manager, " + + "cannot start cleaner test" + ) + } // Verify the shuffle ids are registered and blocks are present - assert(shuffleIds.forall(mapOutputTrackerMaster.containsShuffle), - "One or more shuffles have not been registered cannot start cleaner test") - assert(shuffleIds.forall(sid => diskBlockManager.containsBlock(shuffleBlockId(sid))), - "One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test") - - // Verify that the broadcast is in the driver's block manager - assert(broadcastIds.forall(bid => blockManager.getStatus(broadcastBlockId(bid)).isDefined), - "One ore more broadcasts have not been persisted in the driver's block manager") + shuffleIds.foreach { shuffleId => + assert( + mapOutputTrackerMaster.containsShuffle(shuffleId), + "Shuffle " + shuffleId + " have not been registered, cannot start cleaner test" + ) + + assert( + !getShuffleBlocks(shuffleId).isEmpty, + "Blocks of shuffle " + shuffleId + " cannot be found in block manager, " + + "cannot start cleaner test" + ) + } + + // Verify that the broadcast blocks are present + broadcastIds.foreach { broadcastId => + assert( + !getBroadcastBlocks(broadcastId).isEmpty, + "Blocks of broadcast " + broadcastId + "cannot be found in block manager, " + + "cannot start cleaner test" + ) + } } /** @@ -276,41 +342,46 @@ class CleanerTester( * as there is not guarantee on how long it will take clean up the resources. */ private def postCleanupValidate() { - var attempts = 0 - while (attempts < MAX_VALIDATION_ATTEMPTS) { - attempts += 1 - logInfo("Attempt: " + attempts) - try { - // Verify all RDDs have been unpersisted - assert(rddIds.forall(!sc.persistentRdds.contains(_))) - assert(rddIds.forall(rddId => !blockManager.master.contains(rddBlockId(rddId)))) - - // Verify all shuffles have been deregistered and cleaned up - assert(shuffleIds.forall(!mapOutputTrackerMaster.containsShuffle(_))) - assert(shuffleIds.forall(sid => !diskBlockManager.containsBlock(shuffleBlockId(sid)))) - - // Verify all broadcasts have been unpersisted - assert(broadcastIds.forall { bid => - blockManager.master.getBlockStatus(broadcastBlockId(bid)).isEmpty - }) - - return - } catch { - case t: Throwable => - if (attempts >= MAX_VALIDATION_ATTEMPTS) { - throw t - } else { - Thread.sleep(VALIDATION_ATTEMPT_INTERVAL) - } - } + // Verify the RDDs have been persisted and blocks are present + rddIds.foreach { rddId => + assert( + !sc.persistentRdds.contains(rddId), + "RDD " + rddId + " was not cleared from sc.persistentRdds" + ) + + assert( + getRDDBlocks(rddId).isEmpty, + "Blocks of RDD " + rddId + " were not cleared from block manager" + ) + } + + // Verify the shuffle ids are registered and blocks are present + shuffleIds.foreach { shuffleId => + assert( + !mapOutputTrackerMaster.containsShuffle(shuffleId), + "Shuffle " + shuffleId + " was not deregistered from map output tracker" + ) + + assert( + getShuffleBlocks(shuffleId).isEmpty, + "Blocks of shuffle " + shuffleId + " were not cleared from block manager" + ) + } + + // Verify that the broadcast blocks are present + broadcastIds.foreach { broadcastId => + assert( + getBroadcastBlocks(broadcastId).isEmpty, + "Blocks of broadcast " + broadcastId + " were not cleared from block manager" + ) } } private def uncleanedResourcesToString = { s""" - |\tRDDs = ${toBeCleanedRDDIds.mkString("[", ", ", "]")} - |\tShuffles = ${toBeCleanedShuffleIds.mkString("[", ", ", "]")} - |\tBroadcasts = ${toBeCleanedBroadcstIds.mkString("[", ", ", "]")} + |\tRDDs = ${toBeCleanedRDDIds.toSeq.sorted.mkString("[", ", ", "]")} + |\tShuffles = ${toBeCleanedShuffleIds.toSeq.sorted.mkString("[", ", ", "]")} + |\tBroadcasts = ${toBeCleanedBroadcstIds.toSeq.sorted.mkString("[", ", ", "]")} """.stripMargin } @@ -319,11 +390,27 @@ class CleanerTester( toBeCleanedShuffleIds.isEmpty && toBeCleanedBroadcstIds.isEmpty - private def rddBlockId(rddId: Int) = RDDBlockId(rddId, 0) - private def shuffleBlockId(shuffleId: Int) = ShuffleBlockId(shuffleId, 0, 0) - private def broadcastBlockId(broadcastId: Long) = BroadcastBlockId(broadcastId) + private def getRDDBlocks(rddId: Int): Seq[BlockId] = { + blockManager.master.getMatcinghBlockIds( _ match { + case RDDBlockId(rddId, _) => true + case _ => false + }, askSlaves = true) + } + + private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = { + blockManager.master.getMatcinghBlockIds( _ match { + case ShuffleBlockId(shuffleId, _, _) => true + case _ => false + }, askSlaves = true) + } + + private def getBroadcastBlocks(broadcastId: Long): Seq[BlockId] = { + blockManager.master.getMatcinghBlockIds( _ match { + case BroadcastBlockId(broadcastId, _) => true + case _ => false + }, askSlaves = true) + } private def blockManager = sc.env.blockManager - private def diskBlockManager = blockManager.diskBlockManager private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index b47de5eab95a4..970b4f70ee6d7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -262,6 +262,78 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT master.getLocations(rdd(0, 1)) should have size 0 } + test("removing broadcast") { + store = new BlockManager("", actorSystem, master, serializer, 2000, conf, + securityMgr, mapOutputTracker) + val driverStore = store + val executorStore = new BlockManager("executor", actorSystem, master, serializer, 2000, conf, + securityMgr, mapOutputTracker) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + val a4 = new Array[Byte](400) + + val broadcast0BlockId = BroadcastBlockId(0) + val broadcast1BlockId = BroadcastBlockId(1) + val broadcast2BlockId = BroadcastBlockId(2) + val broadcast2BlockId2 = BroadcastBlockId(2, "_") + + // insert broadcast blocks in both the stores + Seq(driverStore, executorStore).foreach { case s => + s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY) + s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY) + s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY) + s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY) + } + + // verify whether the blocks exist in both the stores + Seq(driverStore, executorStore).foreach { case s => + s.getLocal(broadcast0BlockId) should not be (None) + s.getLocal(broadcast1BlockId) should not be (None) + s.getLocal(broadcast2BlockId) should not be (None) + s.getLocal(broadcast2BlockId2) should not be (None) + } + + // remove broadcast 0 block only from executors + master.removeBroadcast(0, removeFromMaster = false, blocking = true) + + // only broadcast 0 block should be removed from the executor store + executorStore.getLocal(broadcast0BlockId) should be (None) + executorStore.getLocal(broadcast1BlockId) should not be (None) + executorStore.getLocal(broadcast2BlockId) should not be (None) + + // nothing should be removed from the driver store + driverStore.getLocal(broadcast0BlockId) should not be (None) + driverStore.getLocal(broadcast1BlockId) should not be (None) + driverStore.getLocal(broadcast2BlockId) should not be (None) + + // remove broadcast 0 block from the driver as well + master.removeBroadcast(0, removeFromMaster = true, blocking = true) + driverStore.getLocal(broadcast0BlockId) should be (None) + driverStore.getLocal(broadcast1BlockId) should not be (None) + + // remove broadcast 1 block from both the stores asynchronously + // and verify all broadcast 1 blocks have been removed + master.removeBroadcast(1, removeFromMaster = true, blocking = false) + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + driverStore.getLocal(broadcast1BlockId) should be (None) + executorStore.getLocal(broadcast1BlockId) should be (None) + } + + // remove broadcast 2 from both the stores asynchronously + // and verify all broadcast 2 blocks have been removed + master.removeBroadcast(2, removeFromMaster = true, blocking = false) + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + driverStore.getLocal(broadcast2BlockId) should be (None) + driverStore.getLocal(broadcast2BlockId2) should be (None) + executorStore.getLocal(broadcast2BlockId) should be (None) + executorStore.getLocal(broadcast2BlockId2) should be (None) + } + executorStore.stop() + driverStore.stop() + store = null + } + test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) store = new BlockManager("", actorSystem, master, serializer, 2000, conf, @@ -785,6 +857,40 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(store.master.getBlockStatus("list6", askSlaves = true).size === 1) } + test("get matching blocks") { + store = new BlockManager("", actorSystem, master, serializer, 1200, conf, + securityMgr, mapOutputTracker) + val list = List.fill(2)(new Array[Byte](10)) + + // Tell master. By LRU, only list2 and list3 remains. + store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + + // getLocations and getBlockStatus should yield the same locations + assert(store.master.getMatcinghBlockIds(_.toString.contains("list"), askSlaves = false).size === 3) + assert(store.master.getMatcinghBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1) + + // Tell master. By LRU, only list2 and list3 remains. + store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false) + + // getLocations and getBlockStatus should yield the same locations + assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1) + assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3) + + val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0)) + blockIds.foreach { blockId => + store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } + val matchedBlockIds = store.master.getMatcinghBlockIds(_ match { + case RDDBlockId(1, _) => true + case _ => false + }, askSlaves = true) + assert(matchedBlockIds.toSet === Set(RDDBlockId(1, 0), RDDBlockId(1, 1))) + } + test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") { store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr, mapOutputTracker) diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 0dd34223787cd..808ddfdcf45d8 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -64,6 +64,13 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { assert(!diskBlockManager.containsBlock(blockId)) } + test("enumerating blocks") { + val ids = (1 to 100).map(i => TestBlockId("test_" + i)) + val files = ids.map(id => diskBlockManager.getFile(id)) + files.foreach(file => writeToFile(file, 10)) + assert(diskBlockManager.getAllBlocks.toSet === ids.toSet) + } + test("block appending") { val blockId = new TestBlockId("test") val newFile = diskBlockManager.getFile(blockId)