From 892b9520d828cfa7049e6ec70345b3502b139a8e Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 18 Mar 2014 15:09:24 -0700
Subject: [PATCH] Removed use of BoundedHashMap, and made BlockManagerSlaveActor cleanup shuffle metadata in MapOutputTrackerWorker.

---
 .../org/apache/spark/ContextCleaner.scala       |  19 +++-
 .../scala/org/apache/spark/Dependency.scala     |   4 +-
 .../org/apache/spark/MapOutputTracker.scala     | 106 +++++++++---------
 .../scala/org/apache/spark/SparkEnv.scala       |  25 +++--
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  15 +--
 .../apache/spark/scheduler/DAGScheduler.scala   |   8 +-
 .../apache/spark/scheduler/ResultTask.scala     |  10 +-
 .../spark/scheduler/ShuffleMapTask.scala        |  12 +-
 .../apache/spark/storage/BlockManager.scala     |  14 ++-
 .../storage/BlockManagerSlaveActor.scala        |   6 +-
 .../spark/storage/DiskBlockManager.scala        |   2 +-
 .../apache/spark/storage/ThreadingTest.scala    |   5 +-
 .../apache/spark/ContextCleanerSuite.scala      |  14 ++-
 .../apache/spark/MapOutputTrackerSuite.scala    |   6 +-
 .../spark/storage/BlockManagerSuite.scala       |  91 ++++++++++-----
 .../spark/storage/DiskBlockManagerSuite.scala   |   4 +-
 .../spark/util/WrappedJavaHashMapSuite.scala    |   2 +
 17 files changed, 196 insertions(+), 147 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 7636c6cf64972..5d996ed34dff5 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import org.apache.spark.storage.StorageLevel

 /** Listener class used for testing when any item has been cleaned by the Cleaner class */
 private[spark] trait CleanerListener {
@@ -61,19 +62,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }

   /**
-   * Clean RDD data. Do not perform any time or resource intensive
+   * Schedule cleanup of RDD data. Do not perform any time or resource intensive
    * computation in this function as this is called from a finalize() function.
    */
-  def cleanRDD(rddId: Int) {
+  def scheduleRDDCleanup(rddId: Int) {
     enqueue(CleanRDD(rddId))
     logDebug("Enqueued RDD " + rddId + " for cleaning up")
   }

   /**
-   * Clean shuffle data. Do not perform any time or resource intensive
+   * Schedule cleanup of shuffle data. Do not perform any time or resource intensive
    * computation in this function as this is called from a finalize() function.
    */
-  def cleanShuffle(shuffleId: Int) {
+  def scheduleShuffleCleanup(shuffleId: Int) {
     enqueue(CleanShuffle(shuffleId))
     logDebug("Enqueued shuffle " + shuffleId + " for cleaning up")
   }
@@ -83,6 +84,13 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     listeners += listener
   }

+  /** Unpersists RDD and remove all blocks for it from memory and disk. */
+  def unpersistRDD(rddId: Int, blocking: Boolean) {
+    logDebug("Unpersisted RDD " + rddId)
+    sc.env.blockManager.master.removeRdd(rddId, blocking)
+    sc.persistentRdds.remove(rddId)
+  }
+
   /**
    * Enqueue a cleaning task. Do not perform any time or resource intensive
    * computation in this function as this is called from a finalize() function.
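The ContextCleaner changes above boil down to a produce/consume pattern: finalize() hooks on RDDs and ShuffleDependencies only enqueue a cleanup message, and a separate daemon thread drains the queue and does the actual work (unpersisting blocks, notifying listeners). Below is a condensed, self-contained Scala sketch of that pattern; the message names mirror the patch, but the simplified class and its callback parameters are illustrative stand-ins, not the real ContextCleaner (which also logs, handles exceptions, and notifies CleanerListeners).

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Simplified stand-ins for the cleanup messages used in the patch.
sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanShuffle(shuffleId: Int) extends CleanupTask

// Hypothetical, trimmed-down cleaner: callers hand in the actual cleanup logic.
class SimpleContextCleaner(doCleanRDD: Int => Unit, doCleanShuffle: Int => Unit) {
  private val queue = new LinkedBlockingQueue[CleanupTask]

  // Daemon thread drains the queue, so enqueueing callers never block or do real work.
  private val cleaningThread = new Thread("Spark Context Cleaner (sketch)") {
    override def run() {
      while (!Thread.currentThread.isInterrupted) {
        queue.poll(100, TimeUnit.MILLISECONDS) match {
          case CleanRDD(rddId) => doCleanRDD(rddId)
          case CleanShuffle(shuffleId) => doCleanShuffle(shuffleId)
          case null => // poll timed out, loop again
        }
      }
    }
  }
  cleaningThread.setDaemon(true)

  def start() { cleaningThread.start() }

  /** Cheap enough to call from a finalize() hook: only enqueues a message. */
  def scheduleRDDCleanup(rddId: Int) { queue.put(CleanRDD(rddId)) }
  def scheduleShuffleCleanup(shuffleId: Int) { queue.put(CleanShuffle(shuffleId)) }
}
```

Usage would be along the lines of `cleaner.scheduleRDDCleanup(rdd.id)` from a finalizer; the call returns immediately because the heavy lifting happens on the cleaning thread.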
@@ -115,8 +123,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def doCleanRDD(rddId: Int) {
     try {
       logDebug("Cleaning RDD " + rddId)
-      blockManagerMaster.removeRdd(rddId, false)
-      sc.persistentRdds.remove(rddId)
+      unpersistRDD(rddId, false)
       listeners.foreach(_.rddCleaned(rddId))
       logInfo("Cleaned RDD " + rddId)
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index d24d54576f77a..557d424d7a786 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -56,7 +56,7 @@ class ShuffleDependency[K, V](
   override def finalize() {
     try {
       if (rdd != null) {
-        rdd.sparkContext.cleaner.cleanShuffle(shuffleId)
+        rdd.sparkContext.cleaner.scheduleShuffleCleanup(shuffleId)
       }
     } catch {
       case t: Throwable =>
@@ -64,7 +64,7 @@ class ShuffleDependency[K, V](
         try {
           logError("Error in finalize", t)
         } catch {
-          case _ =>
+          case _ : Throwable =>
            System.err.println("Error in finalize (and could not write to logError): " + t)
         }
       } finally {
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f37a9d41b2237..ffdf9115e1aae 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.concurrent.Await

 import akka.actor._
@@ -34,6 +34,7 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

+/** Actor class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
   extends Actor with Logging {
   def receive = {
@@ -50,7 +51,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
 }

 /**
- * Class that keeps track of the location of the location of the map output of
+ * Class that keeps track of the location of the map output of
  * a stage. This is abstract because different versions of MapOutputTracker
  * (driver and worker) use different HashMap to store its metadata.
  */
@@ -58,20 +59,27 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
   private val timeout = AkkaUtils.askTimeout(conf)

-  // Set to the MapOutputTrackerActor living on the driver
+  /** Set to the MapOutputTrackerActor living on the driver */
   var trackerActor: ActorRef = _

   /** This HashMap needs to have different storage behavior for driver and worker */
   protected val mapStatuses: Map[Int, Array[MapStatus]]

-  // Incremented every time a fetch fails so that client nodes know to clear
-  // their cache of map output locations if this happens.
+  /**
+   * Incremented every time a fetch fails so that client nodes know to clear
+   * their cache of map output locations if this happens.
+   */
   protected var epoch: Long = 0
   protected val epochLock = new java.lang.Object

-  // Send a message to the trackerActor and get its result within a default timeout, or
-  // throw a SparkException if this fails.
-  private def askTracker(message: Any): Any = {
+  /** Remembers which map output locations are currently being fetched on a worker */
+  private val fetching = new HashSet[Int]
+
+  /**
+   * Send a message to the trackerActor and get its result within a default timeout, or
+   * throw a SparkException if this fails.
+   */
+  protected def askTracker(message: Any): Any = {
     try {
       val future = trackerActor.ask(message)(timeout)
       Await.result(future, timeout)
     } catch {
@@ -81,17 +89,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }

-  // Send a one-way message to the trackerActor, to which we expect it to reply with true.
-  private def communicate(message: Any) {
+  /** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
+  protected def sendTracker(message: Any) {
     if (askTracker(message) != true) {
       throw new SparkException("Error reply received from MapOutputTracker")
     }
   }

-  // Remembers which map output locations are currently being fetched on a worker
-  private val fetching = new HashSet[Int]
-
-  // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
+  /**
+   * Called from executors to get the server URIs and
+   * output sizes of the map outputs of a given shuffle
+   */
   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
     val statuses = mapStatuses.get(shuffleId).orNull
     if (statuses == null) {
@@ -150,22 +158,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
     }
   }

-  def stop() {
-    communicate(StopMapOutputTracker)
-    mapStatuses.clear()
-    trackerActor = null
-  }
-
-  // Called to get current epoch number
+  /** Called to get current epoch number */
   def getEpoch: Long = {
     epochLock.synchronized {
       return epoch
     }
   }

-  // Called on workers to update the epoch number, potentially clearing old outputs
-  // because of a fetch failure. (Each worker task calls this with the latest epoch
-  // number on the master at the time it was created.)
+  /**
+   * Called from executors to update the epoch number, potentially clearing old outputs
+   * because of a fetch failure. Each worker task calls this with the latest epoch
+   * number on the master at the time it was created.
+   */
   def updateEpoch(newEpoch: Long) {
     epochLock.synchronized {
       if (newEpoch > epoch) {
@@ -175,24 +179,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       }
     }
   }
-}

-/**
- * MapOutputTracker for the workers. This uses BoundedHashMap to keep track of
- * a limited number of most recently used map output information.
- */
-private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
+  /** Unregister shuffle data */
+  def unregisterShuffle(shuffleId: Int) {
+    mapStatuses.remove(shuffleId)
+  }

-  /**
-   * Bounded HashMap for storing serialized statuses in the worker. This allows
-   * the HashMap stay bounded in memory-usage. Things dropped from this HashMap will be
-   * automatically repopulated by fetching them again from the driver. Its okay to
-   * keep the cache size small as it unlikely that there will be a very large number of
-   * stages active simultaneously in the worker.
-   */
-  protected val mapStatuses = new BoundedHashMap[Int, Array[MapStatus]](
-    conf.getInt("spark.mapOutputTracker.cacheSize", 100), true
-  )
+  def stop() {
+    sendTracker(StopMapOutputTracker)
+    mapStatuses.clear()
+    trackerActor = null
+  }
 }

 /**
@@ -202,7 +199,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
 private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   extends MapOutputTracker(conf) {

-  // Cache a serialized version of the output statuses for each shuffle to send them out faster
+  /** Cache a serialized version of the output statuses for each shuffle to send them out faster */
   private var cacheEpoch = epoch

   /**
@@ -211,7 +208,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
    * by TTL-based cleaning (if set). Other than these two
    * scenarios, nothing should be dropped from this HashMap.
    */
-
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
@@ -232,6 +228,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     }
   }

+  /** Register multiple map output information for the given shuffle */
   def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
     mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
     if (changeEpoch) {
@@ -239,6 +236,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     }
   }

+  /** Unregister map output information of the given shuffle, mapper and block manager */
   def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
     val arrayOpt = mapStatuses.get(shuffleId)
     if (arrayOpt.isDefined && arrayOpt.get != null) {
@@ -254,11 +252,17 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     }
   }

-  def unregisterShuffle(shuffleId: Int) {
+  /** Unregister shuffle data */
+  override def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
     cachedSerializedStatuses.remove(shuffleId)
   }

+  /** Check if the given shuffle is being tracked */
+  def containsShuffle(shuffleId: Int): Boolean = {
+    cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
@@ -295,26 +299,26 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     bytes
   }

-  def contains(shuffleId: Int): Boolean = {
-    cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
-  }
-
   override def stop() {
     super.stop()
     metadataCleaner.cancel()
     cachedSerializedStatuses.clear()
   }

-  override def updateEpoch(newEpoch: Long) {
-    // This might be called on the MapOutputTrackerMaster if we're running in local mode.
-  }

   protected def cleanup(cleanupTime: Long) {
     mapStatuses.clearOldValues(cleanupTime)
     cachedSerializedStatuses.clearOldValues(cleanupTime)
   }
 }

+/**
+ * MapOutputTracker for the workers, which fetches map output information from the driver's
+ * MapOutputTrackerMaster.
+ */ +private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) { + protected val mapStatuses = new HashMap[Int, Array[MapStatus]] +} + private[spark] object MapOutputTracker { private val LOG_BASE = 1.1 diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index fdfd00660377f..f636f6363b34b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -165,18 +165,6 @@ object SparkEnv extends Logging { } } - val blockManagerMaster = new BlockManagerMaster(registerOrLookup( - "BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf)), conf) - val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, - serializer, conf, securityManager) - - val connectionManager = blockManager.connectionManager - - val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) - - val cacheManager = new CacheManager(blockManager) - // Have to assign trackerActor after initialization as MapOutputTrackerActor // requires the MapOutputTracker itself val mapOutputTracker = if (isDriver) { @@ -188,6 +176,19 @@ object SparkEnv extends Logging { "MapOutputTracker", new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster])) + val blockManagerMaster = new BlockManagerMaster(registerOrLookup( + "BlockManagerMaster", + new BlockManagerMasterActor(isLocal, conf)), conf) + + val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, + serializer, conf, securityManager, mapOutputTracker) + + val connectionManager = blockManager.connectionManager + + val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) + + val cacheManager = new CacheManager(blockManager) + val shuffleFetcher = instantiateClass[ShuffleFetcher]( "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher") diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f2e20a108630a..a75bca42257d4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -165,8 +165,7 @@ abstract class RDD[T: ClassTag]( */ def unpersist(blocking: Boolean = true): RDD[T] = { logInfo("Removing RDD " + id + " from persistence list") - sc.env.blockManager.master.removeRdd(id, blocking) - sc.persistentRdds.remove(id) + sc.cleaner.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE this } @@ -1025,14 +1024,6 @@ abstract class RDD[T: ClassTag]( checkpointData.flatMap(_.getCheckpointFile) } - def cleanup() { - logInfo("Cleanup called on RDD " + id) - sc.cleaner.cleanRDD(id) - dependencies.filter(_.isInstanceOf[ShuffleDependency[_, _]]) - .map(_.asInstanceOf[ShuffleDependency[_, _]].shuffleId) - .foreach(sc.cleaner.cleanShuffle) - } - // ======================================================================= // Other internal methods and fields // ======================================================================= @@ -1114,14 +1105,14 @@ abstract class RDD[T: ClassTag]( override def finalize() { try { - cleanup() + sc.cleaner.scheduleRDDCleanup(id) } catch { case t: Throwable => // Paranoia - If logError throws error as well, report to stderr. 
         try {
           logError("Error in finalize", t)
         } catch {
-          case _ =>
+          case _ : Throwable =>
            System.err.println("Error in finalize (and could not write to logError): " + t)
         }
       } finally {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1a5cd82571a08..253b19880c700 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -32,7 +32,6 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}

 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -154,7 +153,7 @@ class DAGScheduler(
   val running = new HashSet[Stage] // Stages we are running right now
   val failed = new HashSet[Stage]  // Stages that must be resubmitted due to fetch failures
   // Missing tasks from each stage
-  val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
+  val pendingTasks = new HashMap[Stage, HashSet[Task[_]]]

   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
@@ -266,7 +265,7 @@ class DAGScheduler(
     : Stage =
   {
     val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
-    if (mapOutputTracker.contains(shuffleDep.shuffleId)) {
+    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
       for (i <- 0 until locs.size) {
@@ -398,6 +397,9 @@ class DAGScheduler(
       stageIdToStage -= stageId
       stageIdToJobIds -= stageId

+      ShuffleMapTask.removeStage(stageId)
+      ResultTask.removeStage(stageId)
+
       logDebug("After removal of stage %d, remaining stages = %d"
         .format(stageId, stageIdToStage.size))
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 59fd630e0431a..083fb895d8696 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -20,17 +20,17 @@ package org.apache.spark.scheduler
 import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}

+import scala.collection.mutable.HashMap
+
 import org.apache.spark._
 import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-import org.apache.spark.util.BoundedHashMap

 private[spark] object ResultTask {
   // A simple map between the stage id to the serialized byte array of a task.
   // Served as a cache for task serialization because serialization can be
   // expensive on the master node if it needs to launch thousands of tasks.
-  val MAX_CACHE_SIZE = 100
-  val serializedInfoCache = new BoundedHashMap[Int, Array[Byte]](MAX_CACHE_SIZE, true)
+  private val serializedInfoCache = new HashMap[Int, Array[Byte]]

   def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
@@ -63,6 +63,10 @@ private[spark] object ResultTask {
     (rdd, func)
   }

+  def removeStage(stageId: Int) {
+    serializedInfoCache.remove(stageId)
+  }
+
   def clearCache() {
     synchronized {
       serializedInfoCache.clear()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index df3a7b9ee37ad..bb2eda79ea249 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -17,24 +17,22 @@ package org.apache.spark.scheduler

-import scala.collection.mutable.HashMap
-
 import java.io._
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}

+import scala.collection.mutable.HashMap
+
 import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.storage._
-import org.apache.spark.util.BoundedHashMap

 private[spark] object ShuffleMapTask {
   // A simple map between the stage id to the serialized byte array of a task.
   // Served as a cache for task serialization because serialization can be
   // expensive on the master node if it needs to launch thousands of tasks.
-  val MAX_CACHE_SIZE = 100
-  val serializedInfoCache = new BoundedHashMap[Int, Array[Byte]](MAX_CACHE_SIZE, true)
+  private val serializedInfoCache = new HashMap[Int, Array[Byte]]

   def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
     synchronized {
@@ -75,6 +73,10 @@ private[spark] object ShuffleMapTask {
     HashMap(set.toSeq: _*)
   }

+  def removeStage(stageId: Int) {
+    serializedInfoCache.remove(stageId)
+  }
+
   def clearCache() {
     synchronized {
       serializedInfoCache.clear()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f2aff78914f96..091df41412f6c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
 import sun.nio.ch.DirectBuffer

-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException, SecurityManager}
+import org.apache.spark._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
@@ -48,8 +48,9 @@ private[spark] class BlockManager(
     val defaultSerializer: Serializer,
     maxMemory: Long,
     val conf: SparkConf,
-    securityManager: SecurityManager)
-  extends Logging {
+    securityManager: SecurityManager,
+    mapOutputTracker: MapOutputTracker
+  ) extends Logging {

   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
@@ -89,7 +90,7 @@ private[spark] class BlockManager(

   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)

-  val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)),
+  val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
     name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)

   // Pending reregistration action being executed asynchronously or null if none
@@ -123,9 +124,10 @@ private[spark] class BlockManager(
    * Construct a BlockManager with a memory limit set based on system properties.
    */
   def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
-    serializer: Serializer, conf: SparkConf, securityManager: SecurityManager) = {
+    serializer: Serializer, conf: SparkConf, securityManager: SecurityManager,
+    mapOutputTracker: MapOutputTracker) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf,
-      securityManager)
+      securityManager, mapOutputTracker)
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 9ff7aacec141a..dfc19591781d0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import akka.actor.Actor

+import org.apache.spark.MapOutputTracker
 import org.apache.spark.storage.BlockManagerMessages._

 /**
@@ -26,7 +27,7 @@ import org.apache.spark.storage.BlockManagerMessages._
  * this is used to remove blocks from the slave's BlockManager.
  */
 private[storage]
-class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
+class BlockManagerSlaveActor(blockManager: BlockManager, mapOutputTracker: MapOutputTracker) extends Actor {
   override def receive = {

     case RemoveBlock(blockId) =>
@@ -38,5 +39,8 @@ class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {

     case RemoveShuffle(shuffleId) =>
       blockManager.shuffleBlockManager.removeShuffle(shuffleId)
+      if (mapOutputTracker != null) {
+        mapOutputTracker.unregisterShuffle(shuffleId)
+      }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index cdee285a1cbd4..a57e6f710305a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -91,7 +91,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   def getFile(blockId: BlockId): File = getFile(blockId.name)

   /** Check if disk block manager has a block */
-  def contains(blockId: BlockId): Boolean = {
+  def containsBlock(blockId: BlockId): Boolean = {
     getBlockLocation(blockId).file.exists()
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 36f2a0fd02724..233754f6eddfd 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -22,9 +22,8 @@ import java.util.concurrent.ArrayBlockingQueue
 import akka.actor._
 import util.Random

-import org.apache.spark.SparkConf
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SecurityManager}
 import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.{SecurityManager, SparkConf}

 /**
  * This class tests the BlockManager and MemoryStore for thread safety and
@@ -100,7 +99,7 @@ private[spark] object ThreadingTest {
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
     val blockManager = new BlockManager(
       "", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
-      new SecurityManager(conf))
+      new SecurityManager(conf), new MapOutputTrackerMaster(conf))
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
     producers.foreach(_.start)
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index cb827b9e955a9..8556888c96e06 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -25,7 +25,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     val rdd = newRDD.persist()
     rdd.count()
     val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
-    cleaner.cleanRDD(rdd.id)
+    cleaner.scheduleRDDCleanup(rdd.id)
     tester.assertCleanup
   }

@@ -33,7 +33,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     val rdd = newShuffleRDD
     rdd.count()
     val tester = new CleanerTester(sc, shuffleIds = Seq(0))
-    cleaner.cleanShuffle(0)
+    cleaner.scheduleShuffleCleanup(0)
     tester.assertCleanup
   }

@@ -106,6 +106,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
     postGCTester.assertCleanup
   }

+  // TODO (TD): Test that cleaned up RDD and shuffle can be recomputed again correctly.
+
   def newRDD = sc.makeRDD(1 to 10)

   def newPairRDD = newRDD.map(_ -> 1)
@@ -173,9 +175,9 @@ class CleanerTester(sc: SparkContext, rddIds: Seq[Int] = Nil, shuffleIds: Seq[In
       "One or more RDDs' blocks cannot be found in block manager, cannot start cleaner test")

     // Verify the shuffle ids are registered and blocks are present
-    assert(shuffleIds.forall(mapOutputTrackerMaster.contains),
+    assert(shuffleIds.forall(mapOutputTrackerMaster.containsShuffle),
       "One or more shuffles have not been registered cannot start cleaner test")
-    assert(shuffleIds.forall(shuffleId => diskBlockManager.contains(shuffleBlockId(shuffleId))),
+    assert(shuffleIds.forall(shuffleId => diskBlockManager.containsBlock(shuffleBlockId(shuffleId))),
       "One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test")
   }

@@ -185,8 +187,8 @@ class CleanerTester(sc: SparkContext, rddIds: Seq[Int] = Nil, shuffleIds: Seq[In
     assert(rddIds.forall(rddId => !blockManager.master.contains(rddBlockId(rddId))))

     // Verify all the shuffle have been deregistered and cleaned up
-    assert(shuffleIds.forall(!mapOutputTrackerMaster.contains(_)))
-    assert(shuffleIds.forall(shuffleId => !diskBlockManager.contains(shuffleBlockId(shuffleId))))
+    assert(shuffleIds.forall(!mapOutputTrackerMaster.containsShuffle(_)))
+    assert(shuffleIds.forall(shuffleId => !diskBlockManager.containsBlock(shuffleBlockId(shuffleId))))
   }

   private def uncleanedResourcesToString = {
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 9091ab9265465..9358099abbe24 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -60,7 +60,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val tracker = new MapOutputTrackerMaster(conf)
     tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.registerShuffle(10, 2)
-    assert(tracker.contains(10))
+    assert(tracker.containsShuffle(10))
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
@@ -86,10 +86,10 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       Array(compressedSize1000, compressedSize10000)))
     tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
       Array(compressedSize10000, compressedSize1000)))
-    assert(tracker.contains(10))
+    assert(tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).nonEmpty)
     tracker.unregisterShuffle(10)
-    assert(!tracker.contains(10))
+    assert(!tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).isEmpty)
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 1036b9f34e9dd..197b1004990ce 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
 import org.scalatest.time.SpanSugar._

-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}

@@ -41,6 +41,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   var oldArch: String = null
   conf.set("spark.authenticate", "false")
   val securityMgr = new SecurityManager(conf)
+  val mapOutputTracker = new MapOutputTrackerMaster(conf)

   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   conf.set("spark.kryoserializer.buffer.mb", "1")
@@ -128,7 +129,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("master + 1 manager interaction") {
-    store = new BlockManager("", actorSystem, master, serializer, 2000, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 2000, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -158,9 +160,10 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("master + 2 managers interaction") {
-    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, securityMgr)
+    store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
+      securityMgr, mapOutputTracker)
     store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf,
-      securityMgr)
+      securityMgr, mapOutputTracker)

     val peers = master.getPeers(store.blockManagerId, 1)
     assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -175,7 +178,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("removing block") {
-    store = new BlockManager("", actorSystem, master, serializer, 2000, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 2000, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -223,7 +227,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("removing rdd") {
-    store = new BlockManager("", actorSystem, master, serializer, 2000, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 2000, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -257,7 +262,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT

   test("reregistration on heart beat") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("", actorSystem, master, serializer, 2000, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 2000, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)

     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -273,7 +279,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("reregistration on block update") {
-    store = new BlockManager("", actorSystem, master, serializer, 2000, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 2000, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)

@@ -292,7 +299,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT

   test("reregistration doesn't dead lock") {
     val heartBeat = PrivateMethod[Unit]('heartBeat)
-    store = new BlockManager("", actorSystem, master, serializer, 2000, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 2000, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = List(new Array[Byte](400))

@@ -329,7 +337,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("in-memory LRU storage") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -348,7 +357,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("in-memory LRU storage with serialization") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -367,7 +377,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("in-memory LRU for partitions of same RDD") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -386,7 +397,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("in-memory LRU for partitions of multiple RDDs") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -409,7 +421,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("on-disk storage") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -422,7 +435,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("disk and memory storage") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -437,7 +451,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("disk and memory storage with getLocalBytes") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -452,7 +467,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("disk and memory storage with serialization") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -467,7 +483,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("disk and memory storage with serialization and getLocalBytes") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -482,7 +499,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("LRU with mixed storage levels") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -507,7 +525,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("in-memory LRU with streams") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -531,7 +550,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("LRU with mixed storage levels and streams") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
@@ -577,7 +597,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("overly large block") {
-    store = new BlockManager("", actorSystem, master, serializer, 500, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 500, conf,
+      securityMgr, mapOutputTracker)
     store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
     assert(store.getSingle("a1") === None, "a1 was in store")
     store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
@@ -588,7 +609,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   test("block compression") {
     try {
       conf.set("spark.shuffle.compress", "true")
-      store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, securityMgr)
+      store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
+        securityMgr, mapOutputTracker)
       store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
         "shuffle_0_0_0 was not compressed")
@@ -596,7 +618,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       store = null

       conf.set("spark.shuffle.compress", "false")
-      store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf, securityMgr)
+      store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf,
+        securityMgr, mapOutputTracker)
       store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
         "shuffle_0_0_0 was compressed")
@@ -604,7 +627,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       store = null

       conf.set("spark.broadcast.compress", "true")
-      store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf, securityMgr)
+      store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf,
+        securityMgr, mapOutputTracker)
       store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
         "broadcast_0 was not compressed")
@@ -612,28 +636,32 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       store = null

       conf.set("spark.broadcast.compress", "false")
-      store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf, securityMgr)
+      store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf,
+        securityMgr, mapOutputTracker)
       store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
       store.stop()
       store = null

       conf.set("spark.rdd.compress", "true")
-      store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf, securityMgr)
+      store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf,
+        securityMgr, mapOutputTracker)
       store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
       store.stop()
       store = null

       conf.set("spark.rdd.compress", "false")
-      store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf, securityMgr)
+      store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf,
+        securityMgr, mapOutputTracker)
       store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
       store.stop()
       store = null

       // Check that any other block types are also kept uncompressed
-      store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf, securityMgr)
+      store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf,
+        securityMgr, mapOutputTracker)
       store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
       assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
       store.stop()
@@ -648,7 +676,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     store = new BlockManager("", actorSystem, master, new JavaSerializer(conf), 1200, conf,
-      securityMgr)
+      securityMgr, mapOutputTracker)

     // The put should fail since a1 is not serializable.
     class UnserializableClass
@@ -664,7 +692,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
   }

   test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
-    store = new BlockManager("", actorSystem, master, serializer, 1200, conf, securityMgr)
+    store = new BlockManager("", actorSystem, master, serializer, 1200, conf,
+      securityMgr, mapOutputTracker)
     store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
     // Access rdd_1_0 to ensure it's not least recently used.
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index d594d2bc06760..0dd34223787cd 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -59,9 +59,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     val newFile = diskBlockManager.getFile(blockId)
     writeToFile(newFile, 10)
     assertSegmentEquals(blockId, blockId.name, 0, 10)
-    assert(diskBlockManager.contains(blockId))
+    assert(diskBlockManager.containsBlock(blockId))
     newFile.delete()
-    assert(!diskBlockManager.contains(blockId))
+    assert(!diskBlockManager.containsBlock(blockId))
   }

   test("block appending") {
diff --git a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
index f0a84064ab9fb..37c1f748a6f3d 100644
--- a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala
@@ -107,6 +107,8 @@ class WrappedJavaHashMapSuite extends FunSuite {
     }
     assert(map.internalJavaMap.get("k1").weakValue.get == null)
     assert(map.get("k1") === None)
+
+    // TODO (TD): Test clearing of null-value pairs
   }

   def testMap(hashMapConstructor: => Map[String, String]) {