Skip to content

Commit

Permalink
Fixed issues based on Andrew's comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Apr 7, 2014
1 parent 4d05314 commit cff023c
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
val blockId = BroadcastBlockId(id)

/*
* Broadcasted data is also stored in the BlockManager of the driver.
* The BlockManagerMaster
* does not need to be told about this block as not only
* need to know about this data block.
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
* does not need to be told about this block as not only need to know about this data block.
*/
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
* updated block statuses. This is useful when the master is not informed of the given block
* by all block managers.
*/
def getMatcinghBlockIds(
def getMatchinghBlockIds(
filter: BlockId => Boolean,
askSlaves: Boolean): Seq[BlockId] = {
val msg = GetMatchingBlockIds(filter, askSlaves)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,26 @@ class BlockManagerSlaveActor(
// Operations that involve removing blocks may be slow and should be done asynchronously
override def receive = {
case RemoveBlock(blockId) =>
doAsync[Boolean]("removing block", sender) {
doAsync[Boolean]("removing block " + blockId, sender) {
blockManager.removeBlock(blockId)
true
}

case RemoveRdd(rddId) =>
doAsync[Int]("removing RDD", sender) {
doAsync[Int]("removing RDD " + rddId, sender) {
blockManager.removeRdd(rddId)
}

case RemoveShuffle(shuffleId) =>
doAsync[Boolean]("removing shuffle", sender) {
doAsync[Boolean]("removing shuffle " + shuffleId, sender) {
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}
blockManager.shuffleBlockManager.removeShuffle(shuffleId)
}

case RemoveBroadcast(broadcastId, tellMaster) =>
doAsync[Int]("removing RDD", sender) {
doAsync[Int]("removing broadcast " + broadcastId, sender) {
blockManager.removeBroadcast(broadcastId, tellMaster)
}

Expand All @@ -72,8 +72,7 @@ class BlockManagerSlaveActor(
private def doAsync[T](actionMessage: String, responseActor: ActorRef)(body: => T) {
val future = Future {
logDebug(actionMessage)
val response = body
response
body
}
future.onSuccess { case response =>
logDebug("Done " + actionMessage + ", response is " + response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private var shuffleSender : ShuffleSender = null


addShutdownHook()

/**
Expand Down
17 changes: 8 additions & 9 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
sc.stop()

val conf2 = new SparkConf()
.setMaster("local[4]")
//.setMaster("local-cluster[2, 1, 512]")
.setMaster("local-cluster[2, 1, 512]")
.setAppName("ContextCleanerSuite")
.set("spark.cleaner.referenceTracking.blocking", "true")
sc = new SparkContext(conf2)
Expand All @@ -180,7 +179,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts

val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
Expand Down Expand Up @@ -391,22 +390,22 @@ class CleanerTester(
toBeCleanedBroadcstIds.isEmpty

private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatcinghBlockIds( _ match {
case RDDBlockId(rddId, _) => true
blockManager.master.getMatchinghBlockIds( _ match {
case RDDBlockId(`rddId`, _) => true
case _ => false
}, askSlaves = true)
}

private def getShuffleBlocks(shuffleId: Int): Seq[BlockId] = {
blockManager.master.getMatcinghBlockIds( _ match {
case ShuffleBlockId(shuffleId, _, _) => true
blockManager.master.getMatchinghBlockIds( _ match {
case ShuffleBlockId(`shuffleId`, _, _) => true
case _ => false
}, askSlaves = true)
}

private def getBroadcastBlocks(broadcastId: Long): Seq[BlockId] = {
blockManager.master.getMatcinghBlockIds( _ match {
case BroadcastBlockId(broadcastId, _) => true
blockManager.master.getMatchinghBlockIds( _ match {
case BroadcastBlockId(`broadcastId`, _) => true
case _ => false
}, askSlaves = true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,29 +862,29 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
securityMgr, mapOutputTracker)
val list = List.fill(2)(new Array[Byte](10))

// Tell master. By LRU, only list2 and list3 remains.
// insert some blocks
store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)

// getLocations and getBlockStatus should yield the same locations
assert(store.master.getMatcinghBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
assert(store.master.getMatcinghBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
assert(store.master.getMatchinghBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
assert(store.master.getMatchinghBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)

// Tell master. By LRU, only list2 and list3 remains.
// insert some more blocks
store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

// getLocations and getBlockStatus should yield the same locations
assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
assert(store.master.getMatcinghBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)
assert(store.master.getMatchinghBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
assert(store.master.getMatchinghBlockIds(_.toString.contains("newlist"), askSlaves = true).size === 3)

val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
val matchedBlockIds = store.master.getMatcinghBlockIds(_ match {
val matchedBlockIds = store.master.getMatchinghBlockIds(_ match {
case RDDBlockId(1, _) => true
case _ => false
}, askSlaves = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class JsonProtocolSuite extends FunSuite {
// BlockId
testBlockId(RDDBlockId(1, 2))
testBlockId(ShuffleBlockId(1, 2, 3))
testBlockId(BroadcastBlockId(1L, "<Insert words of wisdom here>"))
testBlockId(BroadcastBlockId(1L, "insert_words_of_wisdom_here"))
testBlockId(TaskResultBlockId(1L))
testBlockId(StreamBlockId(1, 2L))
}
Expand Down

0 comments on commit cff023c

Please sign in to comment.