From cff023c4cde102834c8a0fb12d7d8500f33675e8 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 7 Apr 2014 13:32:17 -0700
Subject: [PATCH] Fixed issues based on Andrew's comments.

---
 .../apache/spark/broadcast/HttpBroadcast.scala     |  6 ++----
 .../spark/storage/BlockManagerMaster.scala         |  2 +-
 .../spark/storage/BlockManagerSlaveActor.scala     | 11 +++++------
 .../apache/spark/storage/DiskBlockManager.scala    |  1 -
 .../org/apache/spark/ContextCleanerSuite.scala     | 17 ++++++++---------
 .../spark/storage/BlockManagerSuite.scala          | 14 +++++++-------
 .../apache/spark/util/JsonProtocolSuite.scala      |  2 +-
 7 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 02158afa972a2..51399bb980fcd 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -43,10 +43,8 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
   val blockId = BroadcastBlockId(id)
 
   /*
-   * Broadcasted data is also stored in the BlockManager of the driver.
-   * The BlockManagerMaster
-   * does not need to be told about this block as not only
-   * need to know about this data block.
+   * Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
+   * does not need to be told about this block, since only the driver needs to know about it.
    */
   HttpBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index d939c5da96967..4191f4e4c71e4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -193,7 +193,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
    * updated block statuses. This is useful when the master is not informed of the given block
    * by all block managers.
    */
-  def getMatcinghBlockIds(
+  def getMatchingBlockIds(
       filter: BlockId => Boolean,
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index fc22f54ceb9d8..6d4db064dff58 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -39,18 +39,18 @@ class BlockManagerSlaveActor(
   // Operations that involve removing blocks may be slow and should be done asynchronously
   override def receive = {
     case RemoveBlock(blockId) =>
-      doAsync[Boolean]("removing block", sender) {
+      doAsync[Boolean]("removing block " + blockId, sender) {
         blockManager.removeBlock(blockId)
         true
       }
 
     case RemoveRdd(rddId) =>
-      doAsync[Int]("removing RDD", sender) {
+      doAsync[Int]("removing RDD " + rddId, sender) {
         blockManager.removeRdd(rddId)
       }
 
     case RemoveShuffle(shuffleId) =>
-      doAsync[Boolean]("removing shuffle", sender) {
+      doAsync[Boolean]("removing shuffle " + shuffleId, sender) {
         if (mapOutputTracker != null) {
           mapOutputTracker.unregisterShuffle(shuffleId)
         }
@@ -58,7 +58,7 @@ class BlockManagerSlaveActor(
       }
 
     case RemoveBroadcast(broadcastId, tellMaster) =>
-      doAsync[Int]("removing RDD", sender) {
+      doAsync[Int]("removing broadcast " + broadcastId, sender) {
         blockManager.removeBroadcast(broadcastId, tellMaster)
       }
 
@@ -72,8 +72,7 @@ class BlockManagerSlaveActor(
   private def doAsync[T](actionMessage: String, responseActor: ActorRef)(body: => T) {
     val future = Future {
       logDebug(actionMessage)
-      val response = body
-      response
+      body
     }
     future.onSuccess { case response =>
       logDebug("Done " + actionMessage + ", response is " + response)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 47a1a6d4a5869..866c52150a48e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,7 +47,6 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
   private var shuffleSender : ShuffleSender = null
-
   addShutdownHook()
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 345bee6930c49..c828e4ebcf924 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -166,8 +166,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     sc.stop()
 
     val conf2 = new SparkConf()
-      .setMaster("local[4]")
-      //.setMaster("local-cluster[2, 1, 512]")
+      .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
     sc = new SparkContext(conf2)
@@ -180,7 +179,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     val shuffleIds = 0 until sc.newShuffleId
     val broadcastIds = 0L until numBroadcasts
-    val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) 
+    val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
     runGC()
     intercept[Exception] {
       preGCTester.assertCleanup()(timeout(1000 millis))
     }
@@ -391,22 +390,22 @@ class CleanerTester(
     toBeCleanedBroadcstIds.isEmpty
 
   private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
-    blockManager.master.getMatcinghBlockIds( _ match {
-      case RDDBlockId(rddId, _) => true
+    blockManager.master.getMatchingBlockIds( _ match {
+      case RDDBlockId(`rddId`, _) => true
       case _ => false
     }, askSlaves = true)
   }
 
   private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = {
-    blockManager.master.getMatcinghBlockIds( _ match {
-      case ShuffleBlockId(shuffleId, _, _) => true
+    blockManager.master.getMatchingBlockIds( _ match {
+      case ShuffleBlockId(`shuffleId`, _, _) => true
       case _ => false
     }, askSlaves = true)
   }
 
   private def getBroadcastBlocks(broadcastId: Long): Seq[BlockId] = {
-    blockManager.master.getMatcinghBlockIds( _ match {
-      case BroadcastBlockId(broadcastId, _) => true
+    blockManager.master.getMatchingBlockIds( _ match {
+      case BroadcastBlockId(`broadcastId`, _) => true
       case _ => false
     }, askSlaves = true)
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 970b4f70ee6d7..fb1920bd47fb1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -862,29 +862,29 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       securityMgr, mapOutputTracker)
     val list = List.fill(2)(new Array[Byte](10))
 
-    // Tell master. By LRU, only list2 and list3 remains.
+    // Insert some blocks
     store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // getLocations and getBlockStatus should yield the same locations
-    assert(store.master.getMatcinghBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
-    assert(store.master.getMatcinghBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
 
-    // Tell master. By LRU, only list2 and list3 remains.
+    // Insert some more blocks
     store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
     store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
     // getLocations and getBlockStatus should yield the same locations
-    assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
-    assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
+    assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)
 
     val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
     blockIds.foreach { blockId =>
       store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     }
-    val matchedBlockIds = store.master.getMatcinghBlockIds(_ match {
+    val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
       case RDDBlockId(1, _) => true
       case _ => false
     }, askSlaves = true)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 7af43f0cc5276..79075a7eb847c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -108,7 +108,7 @@ class JsonProtocolSuite extends FunSuite {
     // BlockId
     testBlockId(RDDBlockId(1, 2))
     testBlockId(ShuffleBlockId(1, 2, 3))
-    testBlockId(BroadcastBlockId(1L, ""))
+    testBlockId(BroadcastBlockId(1L, "insert_words_of_wisdom_here"))
     testBlockId(TaskResultBlockId(1L))
     testBlockId(StreamBlockId(1, 2L))
   }
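
A note on the doAsync pattern in BlockManagerSlaveActor: each remove request is handled off the actor thread by wrapping the work in a Future, so a slow block, RDD, shuffle, or broadcast cleanup cannot stall the actor's mailbox, and the result (or failure) is reported when the Future completes. The sketch below is a standalone approximation, not the actual actor code: it targets the Scala 2.10-era Future callbacks this patch uses (onSuccess/onFailure were later deprecated and removed), and it replaces the responseActor plumbing with println so it runs without an actor system.

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object DoAsyncDemo extends App {
      // Run the body asynchronously and report its outcome, mirroring the
      // shape of BlockManagerSlaveActor.doAsync after this patch.
      def doAsync[T](actionMessage: String)(body: => T): Unit = {
        val future = Future {
          println("Started " + actionMessage)
          body  // the last expression is the Future's value; no temp val needed
        }
        future.onSuccess { case response =>
          println("Done " + actionMessage + ", response is " + response)
        }
        future.onFailure { case t =>
          println("Error " + actionMessage + ": " + t)
        }
      }

      doAsync[Boolean]("removing block rdd_1_0") { true }
      Thread.sleep(500)  // crude: give the callbacks time to fire before exit
    }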
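
The ContextCleanerSuite hunk replaces local[4] with local-cluster[2, 1, 512]: two worker processes, one core each, 512 MB each. Unlike local mode, where everything shares the driver JVM, this pseudo-cluster puts blocks on separate executor processes, which is what the askSlaves = true paths tested elsewhere in this patch need. A minimal sketch of that configuration (assuming a Spark test classpath; the conf values are the ones from the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    object LocalClusterDemo extends App {
      // local-cluster[N, C, M]: N worker processes, C cores per worker,
      // M megabytes of memory per worker.
      val conf = new SparkConf()
        .setMaster("local-cluster[2, 1, 512]")
        .setAppName("ContextCleanerSuite")
        // Make cleanup block until executors confirm, keeping tests deterministic.
        .set("spark.cleaner.referenceTracking.blocking", "true")
      val sc = new SparkContext(conf)
      try {
        println(sc.parallelize(1 to 100, 4).count())  // exercise the executors
      } finally {
        sc.stop()
      }
    }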
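
The subtlest fix is in CleanerTester. In a Scala pattern, a lowercase name such as rddId introduces a new binding that shadows the method parameter, so case RDDBlockId(rddId, _) matched every RDD block regardless of id; backticks turn the name into a stable identifier that is compared with ==. A self-contained sketch with simplified stand-ins for the Spark block id classes:

    // Simplified stand-ins for org.apache.spark.storage.{BlockId, RDDBlockId}.
    sealed trait BlockId
    case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId

    object PatternDemo extends App {
      val rddId = 1
      val blocks: Seq[BlockId] = Seq(RDDBlockId(1, 0), RDDBlockId(2, 0))

      // Buggy: lowercase rddId is a fresh binding, so every RDD block matches.
      val buggy = blocks.filter {
        case RDDBlockId(rddId, _) => true
        case _ => false
      }

      // Fixed: backticks compare against the enclosing rddId with ==.
      val fixed = blocks.filter {
        case RDDBlockId(`rddId`, _) => true
        case _ => false
      }

      println(buggy.size)  // 2 -- the bug this patch fixes
      println(fixed.size)  // 1 -- only blocks of RDD 1
    }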