Skip to content

Commit

Permalink
Address Patrick's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Apr 4, 2014
1 parent a6460d4 commit c5b1d98
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 57 deletions.
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/** Stop the cleaner. */
def stop() {
stopped = true
cleaningThread.interrupt()
}

/** Register a RDD for cleanup when it is garbage collected. */
Expand Down Expand Up @@ -119,8 +118,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}
} catch {
case ie: InterruptedException =>
if (!stopped) logWarning("Cleaning thread interrupted")
case t: Throwable => logError("Error in cleaning thread", t)
}
}
Expand Down
30 changes: 17 additions & 13 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,18 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
* (driver and worker) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {

private val timeout = AkkaUtils.askTimeout(conf)

/** Set to the MapOutputTrackerActor living on the driver */
/** Set to the MapOutputTrackerActor living on the driver. */
var trackerActor: ActorRef = _

/** This HashMap needs to have different storage behavior for driver and worker */
/**
* This HashMap has different behavior for the master and the workers.
*
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
*/
protected val mapStatuses: Map[Int, Array[MapStatus]]

/**
Expand All @@ -87,7 +92,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
protected var epoch: Long = 0
protected val epochLock = new AnyRef

/** Remembers which map output locations are currently being fetched on a worker */
/** Remembers which map output locations are currently being fetched on a worker. */
private val fetching = new HashSet[Int]

/**
Expand Down Expand Up @@ -173,7 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}

/** Called to get current epoch number */
/** Called to get current epoch number. */
def getEpoch: Long = {
epochLock.synchronized {
return epoch
Expand All @@ -195,16 +200,13 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}

/** Unregister shuffle data */
/** Unregister shuffle data. */
def unregisterShuffle(shuffleId: Int) {
mapStatuses.remove(shuffleId)
}

def stop() {
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerActor = null
}
/** Stop the tracker. */
def stop() { }
}

/**
Expand All @@ -219,7 +221,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)

/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
* so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
* Other than these two scenarios, nothing should be dropped from this HashMap.
*/
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
Expand Down Expand Up @@ -314,7 +316,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}

override def stop() {
super.stop()
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerActor = null
metadataCleaner.cancel()
cachedSerializedStatuses.clear()
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
Expand Down Expand Up @@ -643,7 +644,7 @@ class SparkContext(
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T) = {
def broadcast[T](value: T): Broadcast[T] = {
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
cleaner.registerBroadcastForCleanup(bc)
bc
Expand Down
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,21 @@ abstract class Broadcast[T](val id: Long) extends Serializable {
def value: T

/**
* Remove all persisted state associated with this broadcast on the executors. The next use
* of this broadcast on the executors will trigger a remote fetch.
* Delete cached copies of this broadcast on the executors. If the broadcast is used after
* this is called, it will need to be re-sent to each executor.
*/
def unpersist()

/**
* Remove all persisted state associated with this broadcast on both the executors and the
* driver. Overriding implementations should set isValid to false.
* Remove all persisted state associated with this broadcast on both the executors and
* the driver.
*/
private[spark] def destroy()
private[spark] def destroy() {
_isValid = false
onDestroy()
}

protected def onDestroy()

/**
* If this broadcast is no longer valid, throw an exception.
Expand Down
12 changes: 2 additions & 10 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
HttpBroadcast.unpersist(id, removeFromDriver = false)
}

/**
* Remove all persisted state associated with this HTTP Broadcast on both the executors
* and the driver.
*/
private[spark] def destroy() {
_isValid = false
protected def onDestroy() {
HttpBroadcast.unpersist(id, removeFromDriver = true)
}

Expand Down Expand Up @@ -91,7 +86,6 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea

private[spark] object HttpBroadcast extends Logging {
private var initialized = false

private var broadcastDir: File = null
private var compress: Boolean = false
private var bufferSize: Int = 65536
Expand All @@ -101,11 +95,9 @@ private[spark] object HttpBroadcast extends Logging {

// TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
private val files = new TimeStampedHashSet[String]
private var cleaner: MetadataCleaner = null

private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt

private var compressionCodec: CompressionCodec = null
private var cleaner: MetadataCleaner = null

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
TorrentBroadcast.unpersist(id, removeFromDriver = false)
}

/**
* Remove all persisted state associated with this Torrent broadcast on both the executors
* and the driver.
*/
private[spark] def destroy() {
_isValid = false
protected def onDestroy() {
TorrentBroadcast.unpersist(id, removeFromDriver = true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,12 +829,12 @@ private[spark] class BlockManager(
/**
* Remove all blocks belonging to the given broadcast.
*/
def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
def removeBroadcast(broadcastId: Long, tellMaster: Boolean) {
logInfo("Removing broadcast " + broadcastId)
val blocksToRemove = blockInfo.keys.collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, removeFromDriver) }
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,20 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
/** Remove all blocks belonging to the given RDD. */
def removeRdd(rddId: Int, blocking: Boolean) {
val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
future onFailure {
future.onFailure {
case e: Throwable => logError("Failed to remove RDD " + rddId, e)
}
if (blocking) {
Await.result(future, timeout)
}
}

/** Remove all blocks belonging to the given shuffle. */
/** Remove all blocks belonging to the given shuffle asynchronously. */
def removeShuffle(shuffleId: Int) {
askDriverWithReply(RemoveShuffle(shuffleId))
}

/** Remove all blocks belonging to the given broadcast. */
/** Remove all blocks belonging to the given broadcast asynchronously. */
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean) {
askDriverWithReply(RemoveBroadcast(broadcastId, removeFromMaster))
}
Expand All @@ -142,7 +142,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
}

/**
* Return the block's status on all block managers, if any.
* Return the block's status on all block managers, if any. This can potentially be an
* expensive operation and is used mainly for testing.
*
* If askSlaves is true, this invokes the master to query each block manager for the most
* updated block statuses. This is useful when the master is not informed of the given block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
*/
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
// TODO: Consolidate usages of <driver>
val removeMsg = RemoveBroadcast(broadcastId)
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
blockManagerInfo.values
.filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
.foreach { bm => bm.slaveActor ! removeMsg }
Expand Down Expand Up @@ -255,7 +255,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}

/**
* Return the block's status for all block managers, if any.
* Return the block's status for all block managers, if any. This can potentially be an
* expensive operation and is used mainly for testing.
*
* If askSlaves is true, the master queries each block manager for the most updated block
* statuses. This is useful when the master is not informed of the given block by all block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.storage

import scala.concurrent.Future

import akka.actor.Actor

import org.apache.spark.MapOutputTracker
import org.apache.spark.{Logging, MapOutputTracker}
import org.apache.spark.storage.BlockManagerMessages._

/**
Expand All @@ -30,25 +32,40 @@ private[storage]
class BlockManagerSlaveActor(
blockManager: BlockManager,
mapOutputTracker: MapOutputTracker)
extends Actor {
extends Actor with Logging {

override def receive = {
import context.dispatcher

// Operations that involve removing blocks may be slow and should be done asynchronously
override def receive = {
case RemoveBlock(blockId) =>
blockManager.removeBlock(blockId)
val removeBlock = Future { blockManager.removeBlock(blockId) }
removeBlock.onFailure { case t: Throwable =>
logError("Error in removing block " + blockId, t)
}

case RemoveRdd(rddId) =>
val numBlocksRemoved = blockManager.removeRdd(rddId)
sender ! numBlocksRemoved
val removeRdd = Future { sender ! blockManager.removeRdd(rddId) }
removeRdd.onFailure { case t: Throwable =>
logError("Error in removing RDD " + rddId, t)
}

case RemoveShuffle(shuffleId) =>
blockManager.shuffleBlockManager.removeShuffle(shuffleId)
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
val removeShuffle = Future {
blockManager.shuffleBlockManager.removeShuffle(shuffleId)
if (mapOutputTracker != null) {
mapOutputTracker.unregisterShuffle(shuffleId)
}
}
removeShuffle.onFailure { case t: Throwable =>
logError("Error in removing shuffle " + shuffleId, t)
}

case RemoveBroadcast(broadcastId, removeFromDriver) =>
blockManager.removeBroadcast(broadcastId, removeFromDriver)
case RemoveBroadcast(broadcastId, tellMaster) =>
val removeBroadcast = Future { blockManager.removeBroadcast(broadcastId, tellMaster) }
removeBroadcast.onFailure { case t: Throwable =>
logError("Error in removing broadcast " + broadcastId, t)
}

case GetBlockStatus(blockId, _) =>
sender ! blockManager.getStatus(blockId)
Expand Down

0 comments on commit c5b1d98

Please sign in to comment.