From c5b1d986e89ce8e52cfc9ac25d44be4b8fc5a259 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 3 Apr 2014 18:45:04 -0700
Subject: [PATCH] Address Patrick's comments

---
 .../org/apache/spark/ContextCleaner.scala       |  3 --
 .../org/apache/spark/MapOutputTracker.scala     | 30 +++++++-------
 .../scala/org/apache/spark/SparkContext.scala   |  3 +-
 .../apache/spark/broadcast/Broadcast.scala      | 15 ++++---
 .../spark/broadcast/HttpBroadcast.scala         | 12 +-----
 .../spark/broadcast/TorrentBroadcast.scala      |  7 +---
 .../apache/spark/storage/BlockManager.scala     |  4 +-
 .../spark/storage/BlockManagerMaster.scala      |  9 +++--
 .../storage/BlockManagerMasterActor.scala       |  5 ++-
 .../storage/BlockManagerSlaveActor.scala        | 39 +++++++++++++------
 10 files changed, 70 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index b71b7fa517fd2..7b1e2af1b824f 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -80,7 +80,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Stop the cleaner. */
   def stop() {
     stopped = true
-    cleaningThread.interrupt()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -119,8 +118,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
         }
       }
     } catch {
-      case ie: InterruptedException =>
-        if (!stopped) logWarning("Cleaning thread interrupted")
       case t: Throwable => logError("Error in cleaning thread", t)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index c45c5c90048f3..ee82d9fa7874b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -71,13 +71,18 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
  * (driver and worker) use different HashMap to store its metadata.
  */
 private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
-
   private val timeout = AkkaUtils.askTimeout(conf)
 
-  /** Set to the MapOutputTrackerActor living on the driver */
+  /** Set to the MapOutputTrackerActor living on the driver. */
   var trackerActor: ActorRef = _
 
-  /** This HashMap needs to have different storage behavior for driver and worker */
+  /**
+   * This HashMap has different behavior for the master and the workers.
+   *
+   * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
+   * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
+   * master's corresponding HashMap.
+   */
   protected val mapStatuses: Map[Int, Array[MapStatus]]
 
   /**
@@ -87,7 +92,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   protected var epoch: Long = 0
   protected val epochLock = new AnyRef
 
-  /** Remembers which map output locations are currently being fetched on a worker */
+  /** Remembers which map output locations are currently being fetched on a worker. */
   private val fetching = new HashSet[Int]
 
   /**
@@ -173,7 +178,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  /** Called to get current epoch number */
+  /** Called to get current epoch number. */
   def getEpoch: Long = {
     epochLock.synchronized {
       return epoch
@@ -195,16 +200,13 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }
 
-  /** Unregister shuffle data */
+  /** Unregister shuffle data. */
   def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
   }
 
-  def stop() {
-    sendTracker(StopMapOutputTracker)
-    mapStatuses.clear()
-    trackerActor = null
-  }
+  /** Stop the tracker. */
+  def stop() { }
 }
 
 /**
@@ -219,7 +221,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
 
   /**
    * Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
-   * so that statuses are dropped only by explicit deregistering or by TTL-based cleaning (if set).
+   * so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
    * Other than these two scenarios, nothing should be dropped from this HashMap.
    */
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
@@ -314,7 +316,9 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   }
 
   override def stop() {
-    super.stop()
+    sendTracker(StopMapOutputTracker)
+    mapStatuses.clear()
+    trackerActor = null
     metadataCleaner.cancel()
     cachedSerializedStatuses.clear()
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 13fba1e0dfe5d..316b9f0ed8a04 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -643,7 +644,7 @@ class SparkContext(
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
    * The variable will be sent to each cluster only once.
    */
-  def broadcast[T](value: T) = {
+  def broadcast[T](value: T): Broadcast[T] = {
     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
     cleaner.registerBroadcastForCleanup(bc)
     bc
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index b28e15a6840d9..e8a97d1754901 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -62,16 +62,21 @@ abstract class Broadcast[T](val id: Long) extends Serializable {
   def value: T
 
   /**
-   * Remove all persisted state associated with this broadcast on the executors. The next use
-   * of this broadcast on the executors will trigger a remote fetch.
+   * Delete cached copies of this broadcast on the executors. If the broadcast is used after
+   * this is called, it will need to be re-sent to each executor.
    */
   def unpersist()
 
   /**
-   * Remove all persisted state associated with this broadcast on both the executors and the
-   * driver. Overriding implementations should set isValid to false.
+   * Remove all persisted state associated with this broadcast on both the executors and
+   * the driver.
    */
-  private[spark] def destroy()
+  private[spark] def destroy() {
+    _isValid = false
+    onDestroy()
+  }
+
+  protected def onDestroy()
 
   /**
    * If this broadcast is no longer valid, throw an exception.
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index ec5acf5f23f5f..f4e2e222f4984 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -54,12 +54,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
     HttpBroadcast.unpersist(id, removeFromDriver = false)
   }
 
-  /**
-   * Remove all persisted state associated with this HTTP Broadcast on both the executors
-   * and the driver.
-   */
-  private[spark] def destroy() {
-    _isValid = false
+  protected def onDestroy() {
     HttpBroadcast.unpersist(id, removeFromDriver = true)
   }
 
@@ -91,7 +86,6 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
 
 private[spark] object HttpBroadcast extends Logging {
   private var initialized = false
-
   private var broadcastDir: File = null
   private var compress: Boolean = false
   private var bufferSize: Int = 65536
@@ -101,11 +95,9 @@ private[spark] object HttpBroadcast extends Logging {
 
   // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
   private val files = new TimeStampedHashSet[String]
-  private var cleaner: MetadataCleaner = null
-
   private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
-
   private var compressionCodec: CompressionCodec = null
+  private var cleaner: MetadataCleaner = null
 
   def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
     synchronized {
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 590caa9699dd3..73eeedb8d1f63 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -57,12 +57,7 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
     TorrentBroadcast.unpersist(id, removeFromDriver = false)
   }
 
-  /**
-   * Remove all persisted state associated with this Torrent broadcast on both the executors
-   * and the driver.
-   */
-  private[spark] def destroy() {
-    _isValid = false
+  protected def onDestroy() {
     TorrentBroadcast.unpersist(id, removeFromDriver = true)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 925cee1eb6be7..616d24ccd8b6e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -829,12 +829,12 @@ private[spark] class BlockManager(
   /**
    * Remove all blocks belonging to the given broadcast.
    */
-  def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
+  def removeBroadcast(broadcastId: Long, tellMaster: Boolean) {
     logInfo("Removing broadcast " + broadcastId)
     val blocksToRemove = blockInfo.keys.collect {
       case bid @ BroadcastBlockId(`broadcastId`, _) => bid
     }
-    blocksToRemove.foreach { blockId => removeBlock(blockId, removeFromDriver) }
+    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 4e45bb8452fd8..73074e2188e65 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -109,7 +109,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
-    future onFailure {
+    future.onFailure {
       case e: Throwable => logError("Failed to remove RDD " + rddId, e)
     }
     if (blocking) {
@@ -117,12 +117,12 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
     }
   }
 
-  /** Remove all blocks belonging to the given shuffle. */
+  /** Remove all blocks belonging to the given shuffle asynchronously. */
   def removeShuffle(shuffleId: Int) {
     askDriverWithReply(RemoveShuffle(shuffleId))
   }
 
-  /** Remove all blocks belonging to the given broadcast. */
+  /** Remove all blocks belonging to the given broadcast asynchronously. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean) {
     askDriverWithReply(RemoveBroadcast(broadcastId, removeFromMaster))
   }
@@ -142,7 +142,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /**
-   * Return the block's status on all block managers, if any.
+   * Return the block's status on all block managers, if any. This can potentially be an
+   * expensive operation and is used mainly for testing.
    *
    * If askSlaves is true, this invokes the master to query each block manager for the most
    * updated block statuses. This is useful when the master is not informed of the given block
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 4159fc733a566..3b63bf3f3774d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -168,7 +168,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean) {
     // TODO: Consolidate usages of <driver>
-    val removeMsg = RemoveBroadcast(broadcastId)
+    val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
     blockManagerInfo.values
       .filter { info => removeFromDriver || info.blockManagerId.executorId != "<driver>" }
       .foreach { bm => bm.slaveActor ! removeMsg }
@@ -255,7 +255,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 
   /**
-   * Return the block's status for all block managers, if any.
+   * Return the block's status for all block managers, if any. This can potentially be an
+   * expensive operation and is used mainly for testing.
    *
    * If askSlaves is true, the master queries each block manager for the most updated block
   * statuses. This is useful when the master is not informed of the given block by all block
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 016ade428c68f..2396ca49a7d3f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.storage
 
+import scala.concurrent.Future
+
 import akka.actor.Actor
 
-import org.apache.spark.MapOutputTracker
+import org.apache.spark.{Logging, MapOutputTracker}
 import org.apache.spark.storage.BlockManagerMessages._
 
 /**
@@ -30,25 +32,40 @@
 private[storage] class BlockManagerSlaveActor(
     blockManager: BlockManager,
     mapOutputTracker: MapOutputTracker)
-  extends Actor {
+  extends Actor with Logging {
 
-  override def receive = {
+  import context.dispatcher
 
+  // Operations that involve removing blocks may be slow and should be done asynchronously
+  override def receive = {
     case RemoveBlock(blockId) =>
-      blockManager.removeBlock(blockId)
+      val removeBlock = Future { blockManager.removeBlock(blockId) }
+      removeBlock.onFailure { case t: Throwable =>
+        logError("Error in removing block " + blockId, t)
+      }
 
     case RemoveRdd(rddId) =>
-      val numBlocksRemoved = blockManager.removeRdd(rddId)
-      sender ! numBlocksRemoved
+      val removeRdd = Future { sender ! blockManager.removeRdd(rddId) }
+      removeRdd.onFailure { case t: Throwable =>
+        logError("Error in removing RDD " + rddId, t)
+      }
 
     case RemoveShuffle(shuffleId) =>
-      blockManager.shuffleBlockManager.removeShuffle(shuffleId)
-      if (mapOutputTracker != null) {
-        mapOutputTracker.unregisterShuffle(shuffleId)
+      val removeShuffle = Future {
+        blockManager.shuffleBlockManager.removeShuffle(shuffleId)
+        if (mapOutputTracker != null) {
+          mapOutputTracker.unregisterShuffle(shuffleId)
+        }
+      }
+      removeShuffle.onFailure { case t: Throwable =>
+        logError("Error in removing shuffle " + shuffleId, t)
       }
 
-    case RemoveBroadcast(broadcastId, removeFromDriver) =>
-      blockManager.removeBroadcast(broadcastId, removeFromDriver)
+    case RemoveBroadcast(broadcastId, tellMaster) =>
+      val removeBroadcast = Future { blockManager.removeBroadcast(broadcastId, tellMaster) }
+      removeBroadcast.onFailure { case t: Throwable =>
+        logError("Error in removing broadcast " + broadcastId, t)
+      }
 
     case GetBlockStatus(blockId, _) =>
       sender ! blockManager.getStatus(blockId)
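
Notes on the patterns above. These are standalone sketches for illustration, not part of the diff; every name in them that is not an existing Spark API is hypothetical.

1) Broadcast.destroy() is now a template method: the base class clears _isValid in exactly one place and delegates the actual cleanup to the new onDestroy() hook, which HttpBroadcast and TorrentBroadcast override. A minimal sketch of that shape, assuming simplified stand-in classes (SketchBroadcast, ToyBroadcast) and Scala 2.10-era procedure syntax:

    package org.apache.spark

    // Sketch of the destroy()/onDestroy() split. SketchBroadcast and
    // ToyBroadcast are hypothetical; the real hooks call
    // HttpBroadcast.unpersist / TorrentBroadcast.unpersist with
    // removeFromDriver = true.
    abstract class SketchBroadcast[T](val id: Long) extends Serializable {
      @volatile protected var _isValid = true

      def value: T

      /** Hook for subclasses: remove persisted state on executors and driver. */
      protected def onDestroy()

      /** Invalidate exactly once in the base class, then run the hook. */
      private[spark] def destroy() {
        _isValid = false
        onDestroy()
      }

      /** If this broadcast is no longer valid, throw an exception. */
      protected def assertValid() {
        if (!_isValid) {
          throw new IllegalStateException("Broadcast " + id + " used after destroy()")
        }
      }
    }

    /** A toy subclass: only the cleanup hook differs per implementation. */
    class ToyBroadcast[T](v: T, id: Long) extends SketchBroadcast[T](id) {
      def value: T = { assertValid(); v }
      protected def onDestroy() {
        println("Releasing state for broadcast " + id)
      }
    }

Keeping the invalidation ordering (flag first, cleanup second) in the base class means no implementation can forget to set the flag.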
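2) BlockManagerSlaveActor now wraps each removal in a Future so that a slow disk operation cannot block the actor's mailbox, and attaches an onFailure callback so errors are logged instead of disappearing with the Future. The same pattern in isolation, assuming hypothetical Store/Remove/RemovalActor names and the Akka 2.2 / Scala 2.10-era APIs the patch itself uses:

    import scala.concurrent.Future

    import akka.actor.{Actor, ActorSystem, Props}

    case class Remove(id: Long)

    // Stand-in for BlockManager: removal is slow, so it must not run on the
    // actor's message-processing path.
    class Store {
      def remove(id: Long) { Thread.sleep(100) }
    }

    class RemovalActor(store: Store) extends Actor {
      // The actor's dispatcher doubles as the ExecutionContext for the Futures.
      import context.dispatcher

      override def receive = {
        case Remove(id) =>
          // Run the slow removal off the message-processing path...
          val removal = Future { store.remove(id) }
          // ...and log failures rather than letting them vanish with the Future.
          removal.onFailure { case t: Throwable =>
            println("Error in removing " + id + ": " + t)
          }
      }
    }

    object RemovalDemo extends App {
      val system = ActorSystem("removal-demo")
      val actor = system.actorOf(Props(new RemovalActor(new Store)))
      actor ! Remove(42)
      Thread.sleep(500) // give the Future time to finish before shutting down
      system.shutdown()
    }

Note that in the RemoveRdd case of the patch, the reply to the sender is also sent from inside the Future, so that response becomes asynchronous as well.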
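3) SparkContext.broadcast now declares its return type (Broadcast[T]) explicitly instead of leaving it inferred, which is why the Broadcast import is added, and each new broadcast is registered with the ContextCleaner. A usage sketch against that signature; the master URL, app name, and demo data are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.broadcast.Broadcast

    object BroadcastDemo extends App {
      val conf = new SparkConf().setMaster("local[2]").setAppName("broadcast-demo")
      val sc = new SparkContext(conf)

      // With the explicit signature, this annotation is guaranteed to match.
      val lookup: Broadcast[Map[String, Int]] = sc.broadcast(Map("a" -> 1, "b" -> 2))

      val total = sc.parallelize(Seq("a", "b", "a")).map(lookup.value).reduce(_ + _)
      println(total) // 4

      // Drop executor-side copies; a later use would trigger a re-send.
      lookup.unpersist()
      sc.stop()
    }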
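4) The new mapStatuses doc comment describes the worker side as a cache in which a miss triggers a fetch from the master, with the existing `fetching` set ensuring only one thread performs a given fetch. Roughly this guard, as a sketch with invented StatusCache/remoteFetch names (MapOutputTracker implements the real version over Akka):

    import scala.collection.mutable.{HashMap, HashSet}

    class StatusCache[K, V](remoteFetch: K => V) {
      private val cache = new HashMap[K, V]
      private val fetching = new HashSet[K]

      def get(key: K): V = {
        fetching.synchronized {
          // If another thread is already fetching this key, wait for it.
          while (fetching.contains(key)) {
            fetching.wait()
          }
          cache.get(key) match {
            case Some(v) => return v      // cached all along, or just fetched
            case None => fetching += key  // we do the fetch; others will wait
          }
        }
        try {
          val v = remoteFetch(key) // the remote call happens outside the lock
          fetching.synchronized { cache(key) = v }
          v
        } finally {
          fetching.synchronized {
            fetching -= key
            fetching.notifyAll()
          }
        }
      }
    }

The remote call deliberately runs outside the lock, and the finally block wakes waiters even when the fetch fails, in which case the next caller simply retries.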