From b66b6b0ab3077ae2e1fcce331b8efe42fb28c8c5 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 31 Jul 2014 19:21:58 -0700 Subject: [PATCH] Use more efficient underlying data structures for blocks Previously we were still linearly traversing all the blocks held by each storage status. Now we index by the RDD ID and return only the blocks of interest to us. --- .../spark/storage/StorageStatusListener.scala | 2 +- .../apache/spark/storage/StorageUtils.scala | 144 +++++++++++------- .../apache/spark/ui/exec/ExecutorsPage.scala | 2 +- .../org/apache/spark/ui/storage/RDDPage.scala | 13 +- .../storage/StorageStatusListenerSuite.scala | 72 ++++----- .../apache/spark/storage/StorageSuite.scala | 134 ++++++++-------- 6 files changed, 192 insertions(+), 175 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index 7b75afddef68d..3966c33e3fb92 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -50,7 +50,7 @@ class StorageStatusListener extends SparkListener { /** Update storage status list to reflect the removal of an RDD from the cache */ private def updateStorageStatus(unpersistedRDDId: Int) { storageStatusList.foreach { storageStatus => - storageStatus.rddBlocks(unpersistedRDDId).foreach { case (blockId, _) => + storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) => storageStatus.removeBlock(blockId) } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 537779b7f22f6..e1f2ae2f3457c 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -20,7 +20,6 @@ package org.apache.spark.storage import scala.collection.Map import scala.collection.mutable -import org.apache.spark.SparkContext import org.apache.spark.annotation.DeveloperApi /** @@ -31,9 +30,24 @@ import org.apache.spark.annotation.DeveloperApi @DeveloperApi class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { - // This should not be mutated directly, but through the add/update/removeBlock methods - private val _blocks = new mutable.HashMap[BlockId, BlockStatus] - private val _rddIds = new mutable.HashSet[Int] + /** + * Internal representation of the blocks stored in this block manager. + * + * Common consumption patterns of these blocks include + * (1) selecting all blocks, + * (2) selecting only RDD blocks or, + * (3) selecting only the blocks that belong to a specific RDD + * + * If we are only interested in a fraction of the blocks, as in (2) and (3), we should avoid + * linearly scanning through all the blocks, which could be expensive if there are thousands + * of blocks on each block manager. We achieve this by storing RDD blocks and non-RDD blocks + * separately. In particular, RDD blocks are stored in a map indexed by RDD IDs, so we can + * filter out the blocks of interest quickly. + * + * These collections should only be mutated through the add/update/removeBlock methods. + */ + private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]] + private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus] /** * Instantiate a StorageStatus with the given initial blocks. This essentially makes a copy of @@ -44,67 +58,94 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) { initialBlocks.foreach { case (blockId, blockStatus) => addBlock(blockId, blockStatus) } } - /** Return the blocks stored in this block manager as a mapping from ID to status. */ - def blocks: Map[BlockId, BlockStatus] = _blocks + /** Return the blocks stored in this block manager. */ + def blocks: Seq[(BlockId, BlockStatus)] = { + _nonRddBlocks.toSeq ++ rddBlocks.toSeq + } + + /** Return the RDD blocks stored in this block manager. */ + def rddBlocks: Seq[(BlockId, BlockStatus)] = { + _rddBlocks.flatMap { case (_, blocks) => blocks }.toSeq + } + + /** Return the blocks that belong to the given RDD stored in this block manager. */ + def rddBlocksById(rddId: Int): Seq[(BlockId, BlockStatus)] = { + _rddBlocks.get(rddId).map(_.toSeq).getOrElse(Seq.empty) + } - /** Add the given block, keeping track of the RDD ID if this is an RDD block. */ + /** Add the given block to this storage status. */ def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = { blockId match { - case RDDBlockId(rddId, _) => _rddIds.add(rddId) + case RDDBlockId(rddId, _) => + _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus case _ => + _nonRddBlocks(blockId) = blockStatus } - _blocks(blockId) = blockStatus } - /** Update the given block, keeping track of the RDD ID if this is an RDD block. */ + /** Update the given block in this storage status. If it doesn't already exist, add it. */ def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = addBlock(blockId, blockStatus) - /** Remove the given block, keeping track of the RDD ID if this is an RDD block. */ + /** Remove the given block from this storage status. */ def removeBlock(blockId: BlockId): Option[BlockStatus] = { - val removed = _blocks.remove(blockId) blockId match { case RDDBlockId(rddId, _) => - if (rddBlocks(rddId).isEmpty) { - _rddIds.remove(rddId) + if (_rddBlocks.contains(rddId)) { + val removed = _rddBlocks(rddId).remove(blockId) + // If the given RDD has no more blocks left, remove the RDD + if (_rddBlocks(rddId).isEmpty) { + _rddBlocks.remove(rddId) + } + removed + } else { + None } case _ => + _nonRddBlocks.remove(blockId) } - removed } - /** Return the IDs of the RDDs which have blocks stored in this block manager. */ - def rddIds: Seq[Int] = _rddIds.toSeq - - /** Return the RDD blocks stored in this block manager as a mapping from ID to status. */ - def rddBlocks: Map[RDDBlockId, BlockStatus] = - blocks.filterKeys(_.isInstanceOf[RDDBlockId]).asInstanceOf[Map[RDDBlockId, BlockStatus]] + /** + * Return whether the given block is stored in this block manager in O(1) time. + * Note that the alternative of doing this through `blocks` is O(blocks), which is much slower. + */ + def containsBlock(blockId: BlockId): Boolean = { + blockId match { + case RDDBlockId(rddId, _) => + _rddBlocks.get(rddId).filter(_.contains(blockId)).isDefined + case _ => + _nonRddBlocks.contains(blockId) + } + } /** - * Return the RDD blocks with the given RDD ID stored in this block manager as a mapping - * from ID to status. + * Return the number of blocks in O(R) time, where R is the number of distinct RDD IDs. + * Note that the alternative of doing this through `blocks` is O(blocks), which is much slower. */ - def rddBlocks(rddId: Int): Map[RDDBlockId, BlockStatus] = rddBlocks.filterKeys(_.rddId == rddId) + def numBlocks: Int = { + _nonRddBlocks.size + _rddBlocks.values.map(_.size).reduceOption(_ + _).getOrElse(0) + } /** Return the memory used by this block manager. */ - def memUsed: Long = memUsed(blocks.values) + def memUsed: Long = memUsed(blocks) /** Return the memory used by the given RDD in this block manager. */ - def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocks(rddId).values) + def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocksById(rddId)) /** Return the memory remaining in this block manager. */ def memRemaining: Long = maxMem - memUsed /** Return the disk space used by this block manager. */ - def diskUsed: Long = diskUsed(blocks.values) + def diskUsed: Long = diskUsed(blocks) /** Return the disk space used by the given RDD in this block manager. */ - def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocks(rddId).values) + def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocksById(rddId)) // Helper methods for computing memory and disk usages - private def memUsed(statuses: Iterable[BlockStatus]): Long = - statuses.map(_.memSize).reduceOption(_ + _).getOrElse(0L) - private def diskUsed(statuses: Iterable[BlockStatus]): Long = - statuses.map(_.diskSize).reduceOption(_ + _).getOrElse(0L) + private def memUsed(_blocks: Seq[(BlockId, BlockStatus)]): Long = + _blocks.map { case (_, s) => s.memSize }.reduceOption(_ + _).getOrElse(0L) + private def diskUsed(_blocks: Seq[(BlockId, BlockStatus)]): Long = + _blocks.map { case (_, s) => s.diskSize }.reduceOption(_ + _).getOrElse(0L) } /** Helper methods for storage-related objects. */ @@ -123,18 +164,13 @@ private[spark] object StorageUtils { val rddId = rddInfo.id // Collect all block statuses that belong to the given RDD - val newBlocks = updatedBlocks.filter { case (b, _) => - b.asRDDId.filter(_.rddId == rddId).isDefined + val newBlocks = updatedBlocks.filter { case (bid, _) => + bid.asRDDId.filter(_.rddId == rddId).isDefined } val newBlockIds = newBlocks.map { case (bid, _) => bid }.toSet - val oldBlocks = storageStatuses.flatMap { s => - if (s.rddIds.contains(rddId)) { - // If the block is being updated, leave it out here in favor of the new status - s.rddBlocks(rddId).filterKeys { bid => !newBlockIds.contains(bid) } - } else { - Seq.empty - } - } + val oldBlocks = storageStatuses + .flatMap(_.rddBlocksById(rddId)) + .filter { case (bid, _) => !newBlockIds.contains(bid) } // avoid double counting val blocks = (oldBlocks ++ newBlocks).map { case (_, bstatus) => bstatus } val persistedBlocks = blocks.filter(_.isCached) @@ -151,11 +187,15 @@ private[spark] object StorageUtils { } } - /** Return a mapping from block ID to the locations of the associated block. */ - def getBlockLocations(storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = { + /** + * Return mapping from block ID to its locations for each block that belongs to the given RDD. + */ + def getRDDBlockLocations( + storageStatuses: Seq[StorageStatus], + rddId: Int): Map[BlockId, Seq[String]] = { val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]] storageStatuses.foreach { status => - status.blocks.foreach { case (bid, _) => + status.rddBlocksById(rddId).foreach { case (bid, _) => val location = status.blockManagerId.hostPort blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location } @@ -163,18 +203,4 @@ private[spark] object StorageUtils { blockLocations } - /** - * Return a filtered list of storage statuses in which the only blocks remaining are the ones - * that belong to given RDD. - */ - def filterByRDD(storageStatuses: Seq[StorageStatus], rddId: Int): Seq[StorageStatus] = { - storageStatuses - .filter(_.rddIds.contains(rddId)) - .map { status => - new StorageStatus( - status.blockManagerId, - status.maxMem, - status.rddBlocks(rddId).asInstanceOf[Map[BlockId, BlockStatus]]) - } - } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 46d4d68efbf23..cc8ef9f1b6126 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -145,7 +145,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val status = listener.storageStatusList(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort - val rddBlocks = status.blocks.size + val rddBlocks = status.numBlocks val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index c3d0aa3d364ad..2ae835e663635 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -45,12 +45,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers) // Block table - val filteredStorageStatusList = StorageUtils.filterByRDD(storageStatusList, rddId) - val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name) - val blockLocations = StorageUtils.getBlockLocations(filteredStorageStatusList) - val blocks = blockStatuses.map { case (blockId, status) => - (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) - } + val blockLocations = StorageUtils.getRDDBlockLocations(storageStatusList, rddId) + val blocks = storageStatusList + .flatMap(_.rddBlocksById(rddId)) + .sortWith(_._1.name < _._1.name) + .map { case (blockId, status) => + (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown"))) + } val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks) val content = diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 2179c6dd3302e..51fb646a3cb61 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -41,13 +41,13 @@ class StorageStatusListenerSuite extends FunSuite { assert(listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1) assert(listener.executorIdToStorageStatus("big").maxMem === 1000L) - assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 0) listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) assert(listener.executorIdToStorageStatus.size === 2) assert(listener.executorIdToStorageStatus.get("fat").isDefined) assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2) assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L) - assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) // Block manager remove listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1)) @@ -67,14 +67,14 @@ class StorageStatusListenerSuite extends FunSuite { val taskMetrics = new TaskMetrics // Task end with no updated blocks - assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) - assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 0) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics)) - assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) - assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 0) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics)) - assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) - assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 0) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) } test("task end with updated blocks") { @@ -90,20 +90,20 @@ class StorageStatusListenerSuite extends FunSuite { taskMetrics2.updatedBlocks = Some(Seq(block3)) // Task end with new blocks - assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) - assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 0) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) - assert(listener.executorIdToStorageStatus("big").blocks.size === 2) - assert(listener.executorIdToStorageStatus("fat").blocks.size === 0) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) - assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 2) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) - assert(listener.executorIdToStorageStatus("big").blocks.size === 2) - assert(listener.executorIdToStorageStatus("fat").blocks.size === 1) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) - assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0))) + assert(listener.executorIdToStorageStatus("big").numBlocks === 2) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 1) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0))) // Task end with dropped blocks val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) @@ -112,17 +112,17 @@ class StorageStatusListenerSuite extends FunSuite { taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3)) taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3)) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) - assert(listener.executorIdToStorageStatus("big").blocks.size === 1) - assert(listener.executorIdToStorageStatus("fat").blocks.size === 1) - assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) - assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0))) + assert(listener.executorIdToStorageStatus("big").numBlocks === 1) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 1) + assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0))) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) - assert(listener.executorIdToStorageStatus("big").blocks.size === 1) - assert(listener.executorIdToStorageStatus("fat").blocks.size === 0) - assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) - assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 1) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) + assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) } test("unpersist RDD") { @@ -137,16 +137,16 @@ class StorageStatusListenerSuite extends FunSuite { taskMetrics2.updatedBlocks = Some(Seq(block3)) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2)) - assert(listener.executorIdToStorageStatus("big").blocks.size === 3) + assert(listener.executorIdToStorageStatus("big").numBlocks === 3) // Unpersist RDD listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090)) - assert(listener.executorIdToStorageStatus("big").blocks.size === 3) + assert(listener.executorIdToStorageStatus("big").numBlocks === 3) listener.onUnpersistRDD(SparkListenerUnpersistRDD(4)) - assert(listener.executorIdToStorageStatus("big").blocks.size === 2) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) - assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("big").numBlocks === 2) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2))) listener.onUnpersistRDD(SparkListenerUnpersistRDD(1)) - assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("big").numBlocks === 0) } } diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala index fce45462655f4..50924e4ce9ee8 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala @@ -48,11 +48,10 @@ class StorageSuite extends FunSuite { Seq(info0, info1) } - test("storage status add/remove blocks") { + test("storage status add/remove non-RDD blocks") { val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L) assert(status.blocks.isEmpty) assert(status.rddBlocks.isEmpty) - assert(status.rddIds.isEmpty) assert(status.memUsed === 0) assert(status.memRemaining === 1000L) assert(status.diskUsed === 0) @@ -63,7 +62,6 @@ class StorageSuite extends FunSuite { status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 0L)) assert(status.blocks.size === 3) assert(status.rddBlocks.isEmpty) - assert(status.rddIds.isEmpty) assert(status.memUsed === 30L) assert(status.memRemaining === 970L) assert(status.diskUsed === 60L) @@ -73,7 +71,6 @@ class StorageSuite extends FunSuite { status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L, 0L)) assert(status.blocks.size === 3) assert(status.rddBlocks.isEmpty) - assert(status.rddIds.isEmpty) assert(status.memUsed === 160L) assert(status.memRemaining === 840L) assert(status.diskUsed === 140L) @@ -83,7 +80,6 @@ class StorageSuite extends FunSuite { status.removeBlock(TestBlockId("faa")) assert(status.blocks.size === 1) assert(status.rddBlocks.isEmpty) - assert(status.rddIds.isEmpty) assert(status.memUsed === 100L) assert(status.memRemaining === 900L) assert(status.diskUsed === 20L) @@ -93,7 +89,6 @@ class StorageSuite extends FunSuite { val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L) assert(status.blocks.isEmpty) assert(status.rddBlocks.isEmpty) - assert(status.rddIds.isEmpty) // Add a few blocks status.addBlock(TestBlockId("DANGER"), BlockStatus(memAndDisk, 10L, 20L, 0L)) @@ -105,16 +100,15 @@ class StorageSuite extends FunSuite { status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L, 0L)) assert(status.blocks.size === 7) assert(status.rddBlocks.size === 5) - assert(status.rddIds.toSet === Seq(0, 1, 2).toSet) - assert(status.rddBlocks(0).size === 1) - assert(status.rddBlocks(0).head._1 === RDDBlockId(0, 0)) - assert(status.rddBlocks(0).head._2 === BlockStatus(memAndDisk, 10L, 20L, 0L)) - assert(status.rddBlocks(1).size === 1) - assert(status.rddBlocks(1).head._1 === RDDBlockId(1, 1)) - assert(status.rddBlocks(1).head._2 === BlockStatus(memAndDisk, 100L, 200L, 0L)) - assert(status.rddBlocks(2).size === 3) - assert(status.rddBlocks(2).head._1 === RDDBlockId(2, 2)) - assert(status.rddBlocks(2).head._2 === BlockStatus(memAndDisk, 10L, 20L, 0L)) + assert(status.rddBlocksById(0).size === 1) + assert(status.rddBlocksById(0).head._1 === RDDBlockId(0, 0)) + assert(status.rddBlocksById(0).head._2 === BlockStatus(memAndDisk, 10L, 20L, 0L)) + assert(status.rddBlocksById(1).size === 1) + assert(status.rddBlocksById(1).head._1 === RDDBlockId(1, 1)) + assert(status.rddBlocksById(1).head._2 === BlockStatus(memAndDisk, 100L, 200L, 0L)) + assert(status.rddBlocksById(2).size === 3) + assert(status.rddBlocksById(2).head._1 === RDDBlockId(2, 2)) + assert(status.rddBlocksById(2).head._2 === BlockStatus(memAndDisk, 10L, 20L, 0L)) assert(status.memUsedByRDD(0) === 10L) assert(status.memUsedByRDD(1) === 100L) assert(status.memUsedByRDD(2) === 30L) @@ -126,12 +120,14 @@ class StorageSuite extends FunSuite { status.addBlock(TestBlockId("DANGER"), BlockStatus(memAndDisk, 5000L, 0L, 0L)) status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)) status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L, 0L)) - assert(status.rddBlocks(0).size === 1) - assert(status.rddBlocks(0).head._1 === RDDBlockId(0, 0)) - assert(status.rddBlocks(0).head._2 === BlockStatus(memAndDisk, 0L, 0L, 0L)) - assert(status.rddBlocks(2).size === 3) - assert(status.rddBlocks(2).head._1 === RDDBlockId(2, 2)) - assert(status.rddBlocks(2).head._2 === BlockStatus(memAndDisk, 0L, 1000L, 0L)) + assert(status.blocks.size === 7) + assert(status.rddBlocks.size === 5) + assert(status.rddBlocksById(0).size === 1) + assert(status.rddBlocksById(0).head._1 === RDDBlockId(0, 0)) + assert(status.rddBlocksById(0).head._2 === BlockStatus(memAndDisk, 0L, 0L, 0L)) + assert(status.rddBlocksById(2).size === 3) + assert(status.rddBlocksById(2).head._1 === RDDBlockId(2, 2)) + assert(status.rddBlocksById(2).head._2 === BlockStatus(memAndDisk, 0L, 1000L, 0L)) assert(status.memUsedByRDD(0) === 0L) assert(status.memUsedByRDD(1) === 100L) assert(status.memUsedByRDD(2) === 20L) @@ -146,10 +142,9 @@ class StorageSuite extends FunSuite { status.removeBlock(RDDBlockId(2, 4)) assert(status.blocks.size === 3) assert(status.rddBlocks.size === 2) - assert(status.rddIds.toSet === Seq(0, 2).toSet) - assert(status.rddBlocks(0).size === 1) - assert(status.rddBlocks(1).size === 0) - assert(status.rddBlocks(2).size === 1) + assert(status.rddBlocksById(0).size === 1) + assert(status.rddBlocksById(1).size === 0) + assert(status.rddBlocksById(2).size === 1) assert(status.memUsedByRDD(0) === 0L) assert(status.memUsedByRDD(1) === 0L) assert(status.memUsedByRDD(2) === 10L) @@ -162,13 +157,12 @@ class StorageSuite extends FunSuite { status.addBlock(RDDBlockId(3, 5), BlockStatus(memAndDisk, 10L, 200L, 0L)) assert(status.blocks.size === 5) assert(status.rddBlocks.size === 4) - assert(status.rddIds.toSet === Seq(0, 2, 3).toSet) - assert(status.rddBlocks(0).size === 1) - assert(status.rddBlocks(1).size === 0) - assert(status.rddBlocks(2).size === 2) - assert(status.rddBlocks(3).size === 1) - assert(status.rddBlocks(3).head._1 === RDDBlockId(3, 5)) - assert(status.rddBlocks(3).head._2 === BlockStatus(memAndDisk, 10L, 200L, 0L)) + assert(status.rddBlocksById(0).size === 1) + assert(status.rddBlocksById(1).size === 0) + assert(status.rddBlocksById(2).size === 2) + assert(status.rddBlocksById(3).size === 1) + assert(status.rddBlocksById(3).head._1 === RDDBlockId(3, 5)) + assert(status.rddBlocksById(3).head._2 === BlockStatus(memAndDisk, 10L, 200L, 0L)) assert(status.memUsedByRDD(0) === 0L) assert(status.memUsedByRDD(1) === 0L) assert(status.memUsedByRDD(2) === 20L) @@ -213,7 +207,7 @@ class StorageSuite extends FunSuite { // Actually update storage statuses so we can chain the calls to rddInfoFromStorageStatus updatedBlocks1.foreach { case (bid, bstatus) => - val statusWithBlock = storageStatuses.find(_.blocks.contains(bid)) + val statusWithBlock = storageStatuses.find(_.containsBlock(bid)) statusWithBlock match { case Some(s) => s.updateBlock(bid, bstatus) case None => storageStatuses(0).addBlock(bid, bstatus) // arbitrarily pick the first @@ -238,55 +232,51 @@ class StorageSuite extends FunSuite { test("StorageUtils.getBlockLocations") { val storageStatuses = stockStorageStatuses - val blockLocations1 = StorageUtils.getBlockLocations(storageStatuses) - assert(blockLocations1.contains(RDDBlockId(0, 0))) - assert(blockLocations1.contains(RDDBlockId(0, 1))) - assert(blockLocations1.contains(RDDBlockId(0, 2))) - assert(blockLocations1.contains(RDDBlockId(0, 3))) - assert(blockLocations1.contains(RDDBlockId(0, 4))) + var blockLocations0 = StorageUtils.getRDDBlockLocations(storageStatuses, 0) + var blockLocations1 = StorageUtils.getRDDBlockLocations(storageStatuses, 1) + assert(blockLocations0.size === 5) + assert(blockLocations1.size === 3) + assert(blockLocations0.contains(RDDBlockId(0, 0))) + assert(blockLocations0.contains(RDDBlockId(0, 1))) + assert(blockLocations0.contains(RDDBlockId(0, 2))) + assert(blockLocations0.contains(RDDBlockId(0, 3))) + assert(blockLocations0.contains(RDDBlockId(0, 4))) assert(blockLocations1.contains(RDDBlockId(1, 0))) assert(blockLocations1.contains(RDDBlockId(1, 1))) assert(blockLocations1.contains(RDDBlockId(1, 2))) - assert(blockLocations1.size === 8) - assert(blockLocations1(RDDBlockId(0, 0)) === Seq("dog:1")) - assert(blockLocations1(RDDBlockId(0, 1)) === Seq("dog:1")) - assert(blockLocations1(RDDBlockId(0, 2)) === Seq("duck:2")) - assert(blockLocations1(RDDBlockId(0, 3)) === Seq("duck:2")) + assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1")) + assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1")) + assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2")) + assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2")) + assert(blockLocations0(RDDBlockId(0, 4)) === Seq("cat:3")) assert(blockLocations1(RDDBlockId(1, 0)) === Seq("duck:2")) assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2")) - assert(blockLocations1(RDDBlockId(0, 4)) === Seq("cat:3")) assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3")) // Multiple locations storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L)) storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L)) storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L)) - val blockLocations2 = StorageUtils.getBlockLocations(storageStatuses) - assert(blockLocations2.contains(RDDBlockId(0, 0))) - assert(blockLocations2.contains(RDDBlockId(0, 1))) - assert(blockLocations2.contains(RDDBlockId(0, 2))) - assert(blockLocations2.contains(RDDBlockId(0, 3))) - assert(blockLocations2.contains(RDDBlockId(0, 4))) - assert(blockLocations2.contains(RDDBlockId(1, 0))) - assert(blockLocations2.contains(RDDBlockId(1, 1))) - assert(blockLocations2.contains(RDDBlockId(1, 2))) - assert(blockLocations2.size === 8) - assert(blockLocations2(RDDBlockId(0, 0)) === Seq("dog:1", "cat:3")) - assert(blockLocations2(RDDBlockId(0, 1)) === Seq("dog:1")) - assert(blockLocations2(RDDBlockId(0, 2)) === Seq("duck:2")) - assert(blockLocations2(RDDBlockId(0, 3)) === Seq("duck:2")) - assert(blockLocations2(RDDBlockId(1, 0)) === Seq("dog:1", "duck:2")) - assert(blockLocations2(RDDBlockId(1, 1)) === Seq("duck:2")) - assert(blockLocations2(RDDBlockId(0, 4)) === Seq("dog:1", "cat:3")) - assert(blockLocations2(RDDBlockId(1, 2)) === Seq("cat:3")) - } - - test("StorageUtils.filterByRDD") { - val storageStatuses = stockStorageStatuses - val filteredStorageStatuses0 = StorageUtils.filterByRDD(storageStatuses, 0) - val filteredStorageStatuses1 = StorageUtils.filterByRDD(storageStatuses, 1) - assert(filteredStorageStatuses0.size === 3) - assert(filteredStorageStatuses1.size === 2) + blockLocations0 = StorageUtils.getRDDBlockLocations(storageStatuses, 0) + blockLocations1 = StorageUtils.getRDDBlockLocations(storageStatuses, 1) + assert(blockLocations0.size === 5) + assert(blockLocations1.size === 3) + assert(blockLocations0.contains(RDDBlockId(0, 0))) + assert(blockLocations0.contains(RDDBlockId(0, 1))) + assert(blockLocations0.contains(RDDBlockId(0, 2))) + assert(blockLocations0.contains(RDDBlockId(0, 3))) + assert(blockLocations0.contains(RDDBlockId(0, 4))) + assert(blockLocations1.contains(RDDBlockId(1, 0))) + assert(blockLocations1.contains(RDDBlockId(1, 1))) + assert(blockLocations1.contains(RDDBlockId(1, 2))) + assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1", "cat:3")) + assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1")) + assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2")) + assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2")) + assert(blockLocations0(RDDBlockId(0, 4)) === Seq("dog:1", "cat:3")) + assert(blockLocations1(RDDBlockId(1, 0)) === Seq("dog:1", "duck:2")) + assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2")) + assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3")) } }