Commit

Clean up broadcast blocks through BlockManager*

andrewor14 committed Mar 26, 2014
1 parent d0edef3 commit 544ac86
Showing 9 changed files with 53 additions and 14 deletions.
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -186,7 +186,7 @@ private[spark] object HttpBroadcast extends Logging {
* and delete the associated broadcast file.
*/
def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
-   //SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
+   SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
if (removeFromDriver) {
val file = new File(broadcastDir, BroadcastBlockId(id).name)
files.remove(file.toString)

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -232,7 +232,7 @@ private[spark] object TorrentBroadcast extends Logging {
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean) = synchronized {
-   //SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
+   SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver)
}

}
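
Both broadcast implementations now make the same previously commented-out call, so block cleanup is driven through the BlockManagerMaster in one place. A minimal sketch of that delegation under stand-in names (StubMaster and StubBroadcast are illustrative, not Spark's API):

    // Sketch only: a stand-in master that records broadcast removals.
    class StubMaster {
      def removeBroadcast(id: Long, removeFromDriver: Boolean): Unit =
        println(s"master: remove broadcast $id (removeFromDriver=$removeFromDriver)")
    }

    object StubBroadcast {
      private val master = new StubMaster
      // Same shape as the diff: unpersist is synchronized, tells the master to
      // drop the blocks, then (HTTP flavor) deletes any driver-side file.
      def unpersist(id: Long, removeFromDriver: Boolean): Unit = synchronized {
        master.removeBroadcast(id, removeFromDriver)
        if (removeFromDriver) println(s"driver: delete local file for broadcast $id")
      }
    }

    StubBroadcast.unpersist(0L, removeFromDriver = true)
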
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -820,10 +820,22 @@ private[spark] class BlockManager(
// from RDD.id to blocks.
logInfo("Removing RDD " + rddId)
val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
-   blocksToRemove.foreach(blockId => removeBlock(blockId, tellMaster = false))
+   blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
}

+ /**
+  * Remove all blocks belonging to the given broadcast.
+  */
+ def removeBroadcast(broadcastId: Long) {
+   logInfo("Removing broadcast " + broadcastId)
+   val blocksToRemove = blockInfo.keys.filter(_.isBroadcast).collect {
+     case bid: BroadcastBlockId if bid.broadcastId == broadcastId => bid
+     case bid: BroadcastHelperBlockId if bid.broadcastId.broadcastId == broadcastId => bid
+   }
+   blocksToRemove.foreach { blockId => removeBlock(blockId) }
+ }

/**
* Remove a block from both memory and disk.
*/
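
The new BlockManager.removeBroadcast has to match two block id shapes: plain broadcast blocks, and the helper blocks TorrentBroadcast stores alongside them, whose broadcast id is nested one level deeper. A self-contained sketch of the same filter-then-collect pattern, with simplified id classes standing in for Spark's:

    // Simplified ids, not Spark's classes: one case per broadcast block kind.
    sealed trait Id { def isBroadcast: Boolean = false }
    case class RddId(rddId: Int, split: Int) extends Id
    case class BcastId(broadcastId: Long) extends Id { override def isBroadcast = true }
    case class BcastHelperId(broadcastId: BcastId, hType: String) extends Id {
      override def isBroadcast = true
    }

    val blocks: Seq[Id] =
      Seq(RddId(0, 0), BcastId(1), BcastHelperId(BcastId(1), "meta"), BcastId(2))

    // Same shape as removeBroadcast: pre-filter broadcast blocks, then collect
    // both id forms, matching the nested id on helper blocks.
    def blocksFor(target: Long): Seq[Id] = blocks.filter(_.isBroadcast).collect {
      case b: BcastId if b.broadcastId == target => b
      case b: BcastHelperId if b.broadcastId.broadcastId == target => b
    }

    println(blocksFor(1)) // List(BcastId(1), BcastHelperId(BcastId(1),meta))
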

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -126,6 +126,13 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging
askDriverWithReply(RemoveShuffle(shuffleId))
}

+ /**
+  * Remove all blocks belonging to the given broadcast.
+  */
+ def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean) {
+   askDriverWithReply(RemoveBroadcast(broadcastId, removeFromMaster))
+ }

/**
* Return the memory status for each block manager, in the form of a map from
* the block manager's id to two long values. The first value is the maximum

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -100,6 +100,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
removeShuffle(shuffleId)
sender ! true

+ case RemoveBroadcast(broadcastId, removeFromDriver) =>
+   removeBroadcast(broadcastId, removeFromDriver)
+   sender ! true

case RemoveBlock(blockId) =>
removeBlockFromWorkers(blockId)
sender ! true
@@ -151,9 +155,15 @@
private def removeShuffle(shuffleId: Int) {
// Nothing to do in the BlockManagerMasterActor data structures
val removeMsg = RemoveShuffle(shuffleId)
-   blockManagerInfo.values.foreach { bm =>
-     bm.slaveActor ! removeMsg
-   }
+   blockManagerInfo.values.foreach { bm => bm.slaveActor ! removeMsg }
}

+ private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
+   // TODO(aor): Consolidate usages of <driver>
+   val removeMsg = RemoveBroadcast(broadcastId)
+   blockManagerInfo.values
+     .filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
+     .foreach { bm => bm.slaveActor ! removeMsg }
+ }
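
The filter above is the interesting part: the message fans out to every registered block manager, but the driver's own is skipped unless removeFromDriver is set. A toy model of the same fan-out, with plain functions standing in for slave actor refs:

    // Plain functions stand in for slave actor refs.
    case class Peer(executorId: String, send: String => Unit)

    val peers = Seq(
      Peer("<driver>", m => println(s"driver got $m")),
      Peer("0", m => println(s"executor 0 got $m")),
      Peer("1", m => println(s"executor 1 got $m")))

    // Same filter as the diff: skip the driver's own block manager unless the
    // caller asked for driver-side removal too.
    def removeBroadcast(id: Long, removeFromDriver: Boolean): Unit = {
      val msg = s"RemoveBroadcast($id)"
      peers
        .filter(p => removeFromDriver || p.executorId != "<driver>")
        .foreach(p => p.send(msg))
    }

    removeBroadcast(1L, removeFromDriver = false) // driver keeps its copy
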

private def removeBlockManager(blockManagerId: BlockManagerId) {

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -22,9 +22,11 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import akka.actor.ActorRef

private[storage] object BlockManagerMessages {

//////////////////////////////////////////////////////////////////////////////////
// Messages from the master to slaves.
//////////////////////////////////////////////////////////////////////////////////

sealed trait ToBlockManagerSlave

// Remove a block from the slaves that have it. This can only be used to remove
@@ -37,10 +39,15 @@
// Remove all blocks belonging to a specific shuffle.
case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave

+ // Remove all blocks belonging to a specific broadcast.
+ case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
+   extends ToBlockManagerSlave


//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
//////////////////////////////////////////////////////////////////////////////////

sealed trait ToBlockManagerMaster

case class RegisterBlockManager(
@@ -57,8 +64,7 @@
var storageLevel: StorageLevel,
var memSize: Long,
var diskSize: Long)
-   extends ToBlockManagerMaster
-   with Externalizable {
+   extends ToBlockManagerMaster with Externalizable {

def this() = this(null, null, null, 0, 0) // For deserialization only
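
UpdateBlockInfo uses var fields and java.io.Externalizable: deserialization first calls the no-arg constructor (hence the `def this() = this(null, null, null, 0, 0)` above), then fills the fields in readExternal. A stripped-down sketch of the pattern with a hypothetical two-field message:

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class BlockUpdate(var blockId: String, var memSize: Long) extends Externalizable {
      def this() = this(null, 0L) // called reflectively during deserialization

      override def writeExternal(out: ObjectOutput): Unit = {
        out.writeUTF(blockId)
        out.writeLong(memSize)
      }

      override def readExternal(in: ObjectInput): Unit = {
        blockId = in.readUTF()
        memSize = in.readLong()
      }
    }
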

@@ -80,7 +86,8 @@
}

object UpdateBlockInfo {
- def apply(blockManagerId: BlockManagerId,
+ def apply(
+     blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,

core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -46,5 +46,8 @@ class BlockManagerSlaveActor(
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}

+ case RemoveBroadcast(broadcastId, _) =>
+   blockManager.removeBroadcast(broadcastId)
}
}
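
The slave matches RemoveBroadcast with a wildcard for removeFromDriver, since the master has already decided whether the driver participates. And because the messages form a sealed trait hierarchy, a non-exhaustive match is flagged at compile time. A compact sketch of both points (RemoveBroadcastMsg is a renamed stand-in, not Spark's class):

    sealed trait ToSlave
    case class RemoveBlock(blockId: String) extends ToSlave
    case class RemoveBroadcastMsg(broadcastId: Long, removeFromDriver: Boolean = true)
      extends ToSlave

    // The slave ignores removeFromDriver (the master already filtered on it),
    // hence the wildcard in the second pattern.
    def receive(msg: ToSlave): Unit = msg match {
      case RemoveBlock(id)           => println(s"slave: dropping block $id")
      case RemoveBroadcastMsg(id, _) => println(s"slave: dropping broadcast $id blocks")
    }

    receive(RemoveBroadcastMsg(1)) // removeFromDriver defaults to true
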
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -461,10 +461,10 @@ private[spark] object Utils extends Logging {
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()

def parseHostPort(hostPort: String): (String, Int) = {
-   {
-     // Check cache first.
-     val cached = hostPortParseResults.get(hostPort)
-     if (cached != null) return cached
+   // Check cache first.
+   val cached = hostPortParseResults.get(hostPort)
+   if (cached != null) {
+     return cached
+   }

val indx: Int = hostPort.lastIndexOf(':')
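
parseHostPort memoizes parsed results in a ConcurrentHashMap and returns early on a hit; the diff only replaces a stray block with an explicit if around the early return. A runnable sketch of the same cache-then-parse shape (simplified: no handling of a missing ':'):

    import java.util.concurrent.ConcurrentHashMap

    val cache = new ConcurrentHashMap[String, (String, Int)]()

    def parseHostPort(hostPort: String): (String, Int) = {
      // Check cache first.
      val cached = cache.get(hostPort)
      if (cached != null) {
        return cached
      }
      val idx = hostPort.lastIndexOf(':')
      val parsed = (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
      cache.putIfAbsent(hostPort, parsed)
      parsed
    }

    println(parseHostPort("spark-master:7077")) // parsed once, cached thereafter
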

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -28,8 +28,8 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext._
- import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId}
import org.apache.spark.rdd.RDD
+ import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId}

class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
