Use more efficient underlying data structures for blocks
Previously we were still linearly traversing all the blocks held
by each storage status. Now we index by the RDD ID and return only
the blocks of interest to us.
andrewor14 committed Aug 1, 2014
1 parent 6a7b7c0 commit b66b6b0
Showing 6 changed files with 192 additions and 175 deletions.
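The gist of the change, as a minimal standalone sketch: instead of keeping every block in one flat map and filtering it for each per-RDD query, RDD blocks are indexed by RDD ID so a query only touches that RDD's entries. The two classes below are simplified stand-ins written for illustration (they ignore non-RDD blocks and use stripped-down BlockId/BlockStatus types); the actual patch follows in the diff.

import scala.collection.mutable

// Simplified stand-ins for the real Spark types, for illustration only.
case class RDDBlockId(rddId: Int, splitIndex: Int)
case class BlockStatus(memSize: Long, diskSize: Long)

// Before: one flat map, so selecting the blocks of a single RDD means
// scanning every block held by the block manager.
class FlatIndex {
  val blocks = new mutable.HashMap[RDDBlockId, BlockStatus]
  def blocksOfRdd(rddId: Int): Seq[(RDDBlockId, BlockStatus)] =
    blocks.filterKeys(_.rddId == rddId).toSeq // O(total blocks)
}

// After: a two-level map keyed by RDD ID, so the same query touches only
// the blocks that actually belong to the requested RDD.
class RddIndex {
  val rddBlocks = new mutable.HashMap[Int, mutable.Map[RDDBlockId, BlockStatus]]
  def add(id: RDDBlockId, status: BlockStatus): Unit =
    rddBlocks.getOrElseUpdate(id.rddId, new mutable.HashMap)(id) = status
  def blocksOfRdd(rddId: Int): Seq[(RDDBlockId, BlockStatus)] =
    rddBlocks.get(rddId).map(_.toSeq).getOrElse(Seq.empty) // O(blocks of this RDD)
}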
@@ -50,7 +50,7 @@ class StorageStatusListener extends SparkListener {
/** Update storage status list to reflect the removal of an RDD from the cache */
private def updateStorageStatus(unpersistedRDDId: Int) {
storageStatusList.foreach { storageStatus =>
storageStatus.rddBlocks(unpersistedRDDId).foreach { case (blockId, _) =>
storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
storageStatus.removeBlock(blockId)
}
}
core/src/main/scala/org/apache/spark/storage/StorageUtils.scala (144 changes: 85 additions & 59 deletions)
@@ -20,7 +20,6 @@ package org.apache.spark.storage
import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi

/**
@@ -31,9 +30,24 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

// This should not be mutated directly, but through the add/update/removeBlock methods
private val _blocks = new mutable.HashMap[BlockId, BlockStatus]
private val _rddIds = new mutable.HashSet[Int]
/**
* Internal representation of the blocks stored in this block manager.
*
* Common consumption patterns of these blocks include
* (1) selecting all blocks,
* (2) selecting only RDD blocks, or
* (3) selecting only the blocks that belong to a specific RDD
*
* If we are only interested in a fraction of the blocks, as in (2) and (3), we should avoid
* linearly scanning through all the blocks, which could be expensive if there are thousands
* of blocks on each block manager. We achieve this by storing RDD blocks and non-RDD blocks
* separately. In particular, RDD blocks are stored in a map indexed by RDD IDs, so we can
* filter out the blocks of interest quickly.
*
* These collections should only be mutated through the add/update/removeBlock methods.
*/
private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]

/**
* Instantiate a StorageStatus with the given initial blocks. This essentially makes a copy of
@@ -44,67 +58,94 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
initialBlocks.foreach { case (blockId, blockStatus) => addBlock(blockId, blockStatus) }
}

/** Return the blocks stored in this block manager as a mapping from ID to status. */
def blocks: Map[BlockId, BlockStatus] = _blocks
/** Return the blocks stored in this block manager. */
def blocks: Seq[(BlockId, BlockStatus)] = {
_nonRddBlocks.toSeq ++ rddBlocks.toSeq
}

/** Return the RDD blocks stored in this block manager. */
def rddBlocks: Seq[(BlockId, BlockStatus)] = {
_rddBlocks.flatMap { case (_, blocks) => blocks }.toSeq
}

/** Return the blocks that belong to the given RDD stored in this block manager. */
def rddBlocksById(rddId: Int): Seq[(BlockId, BlockStatus)] = {
_rddBlocks.get(rddId).map(_.toSeq).getOrElse(Seq.empty)
}

/** Add the given block, keeping track of the RDD ID if this is an RDD block. */
/** Add the given block to this storage status. */
def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
blockId match {
case RDDBlockId(rddId, _) => _rddIds.add(rddId)
case RDDBlockId(rddId, _) =>
_rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
case _ =>
_nonRddBlocks(blockId) = blockStatus
}
_blocks(blockId) = blockStatus
}

/** Update the given block, keeping track of the RDD ID if this is an RDD block. */
/** Update the given block in this storage status. If it doesn't already exist, add it. */
def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = addBlock(blockId, blockStatus)

/** Remove the given block, keeping track of the RDD ID if this is an RDD block. */
/** Remove the given block from this storage status. */
def removeBlock(blockId: BlockId): Option[BlockStatus] = {
val removed = _blocks.remove(blockId)
blockId match {
case RDDBlockId(rddId, _) =>
if (rddBlocks(rddId).isEmpty) {
_rddIds.remove(rddId)
if (_rddBlocks.contains(rddId)) {
val removed = _rddBlocks(rddId).remove(blockId)
// If the given RDD has no more blocks left, remove the RDD
if (_rddBlocks(rddId).isEmpty) {
_rddBlocks.remove(rddId)
}
removed
} else {
None
}
case _ =>
_nonRddBlocks.remove(blockId)
}
removed
}

/** Return the IDs of the RDDs which have blocks stored in this block manager. */
def rddIds: Seq[Int] = _rddIds.toSeq

/** Return the RDD blocks stored in this block manager as a mapping from ID to status. */
def rddBlocks: Map[RDDBlockId, BlockStatus] =
blocks.filterKeys(_.isInstanceOf[RDDBlockId]).asInstanceOf[Map[RDDBlockId, BlockStatus]]
/**
* Return whether the given block is stored in this block manager in O(1) time.
* Note that the alternative of doing this through `blocks` is O(blocks), which is much slower.
*/
def containsBlock(blockId: BlockId): Boolean = {
blockId match {
case RDDBlockId(rddId, _) =>
_rddBlocks.get(rddId).filter(_.contains(blockId)).isDefined
case _ =>
_nonRddBlocks.contains(blockId)
}
}

/**
* Return the RDD blocks with the given RDD ID stored in this block manager as a mapping
* from ID to status.
* Return the number of blocks in O(R) time, where R is the number of distinct RDD IDs.
* Note that the alternative of doing this through `blocks` is O(blocks), which is much slower.
*/
def rddBlocks(rddId: Int): Map[RDDBlockId, BlockStatus] = rddBlocks.filterKeys(_.rddId == rddId)
def numBlocks: Int = {
_nonRddBlocks.size + _rddBlocks.values.map(_.size).reduceOption(_ + _).getOrElse(0)
}

/** Return the memory used by this block manager. */
def memUsed: Long = memUsed(blocks.values)
def memUsed: Long = memUsed(blocks)

/** Return the memory used by the given RDD in this block manager. */
def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocks(rddId).values)
def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocksById(rddId))

/** Return the memory remaining in this block manager. */
def memRemaining: Long = maxMem - memUsed

/** Return the disk space used by this block manager. */
def diskUsed: Long = diskUsed(blocks.values)
def diskUsed: Long = diskUsed(blocks)

/** Return the disk space used by the given RDD in this block manager. */
def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocks(rddId).values)
def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocksById(rddId))

// Helper methods for computing memory and disk usages
private def memUsed(statuses: Iterable[BlockStatus]): Long =
statuses.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
private def diskUsed(statuses: Iterable[BlockStatus]): Long =
statuses.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
private def memUsed(_blocks: Seq[(BlockId, BlockStatus)]): Long =
_blocks.map { case (_, s) => s.memSize }.reduceOption(_ + _).getOrElse(0L)
private def diskUsed(_blocks: Seq[(BlockId, BlockStatus)]): Long =
_blocks.map { case (_, s) => s.diskSize }.reduceOption(_ + _).getOrElse(0L)
}

/** Helper methods for storage-related objects. */
@@ -123,18 +164,13 @@ private[spark] object StorageUtils {
val rddId = rddInfo.id

// Collect all block statuses that belong to the given RDD
val newBlocks = updatedBlocks.filter { case (b, _) =>
b.asRDDId.filter(_.rddId == rddId).isDefined
val newBlocks = updatedBlocks.filter { case (bid, _) =>
bid.asRDDId.filter(_.rddId == rddId).isDefined
}
val newBlockIds = newBlocks.map { case (bid, _) => bid }.toSet
val oldBlocks = storageStatuses.flatMap { s =>
if (s.rddIds.contains(rddId)) {
// If the block is being updated, leave it out here in favor of the new status
s.rddBlocks(rddId).filterKeys { bid => !newBlockIds.contains(bid) }
} else {
Seq.empty
}
}
val oldBlocks = storageStatuses
.flatMap(_.rddBlocksById(rddId))
.filter { case (bid, _) => !newBlockIds.contains(bid) } // avoid double counting
val blocks = (oldBlocks ++ newBlocks).map { case (_, bstatus) => bstatus }
val persistedBlocks = blocks.filter(_.isCached)

Expand All @@ -151,30 +187,20 @@ private[spark] object StorageUtils {
}
}

/** Return a mapping from block ID to the locations of the associated block. */
def getBlockLocations(storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
/**
* Return a mapping from block ID to its locations for each block that belongs to the given RDD.
*/
def getRDDBlockLocations(
storageStatuses: Seq[StorageStatus],
rddId: Int): Map[BlockId, Seq[String]] = {
val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
storageStatuses.foreach { status =>
status.blocks.foreach { case (bid, _) =>
status.rddBlocksById(rddId).foreach { case (bid, _) =>
val location = status.blockManagerId.hostPort
blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
}
}
blockLocations
}

/**
* Return a filtered list of storage statuses in which the only blocks remaining are the ones
* that belong to given RDD.
*/
def filterByRDD(storageStatuses: Seq[StorageStatus], rddId: Int): Seq[StorageStatus] = {
storageStatuses
.filter(_.rddIds.contains(rddId))
.map { status =>
new StorageStatus(
status.blockManagerId,
status.maxMem,
status.rddBlocks(rddId).asInstanceOf[Map[BlockId, BlockStatus]])
}
}
}
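
For orientation before the UI changes below, a hedged usage sketch of the new StorageStatus accessors defined above. It assumes `status` is an existing StorageStatus obtained elsewhere (for example from a StorageStatusListener), and the block IDs and sizes are made up for illustration:

import org.apache.spark.storage._

// `status` is assumed to already exist; the sizes below are arbitrary.
status.addBlock(RDDBlockId(1, 0), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L, 0L))
status.addBlock(BroadcastBlockId(0), BlockStatus(StorageLevel.MEMORY_ONLY, 50L, 0L, 0L))

status.rddBlocksById(1)                 // only RDD 1's blocks; other blocks are never scanned
status.containsBlock(RDDBlockId(1, 0))  // true, answered in O(1)
status.numBlocks                        // 2, counted without materializing `blocks`
status.memUsedByRDD(1)                  // 100L, memory used by RDD 1's blocks alone
StorageUtils.getRDDBlockLocations(Seq(status), 1) // block ID -> Seq(host:port) for RDD 1's blocks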
@@ -145,7 +145,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.blocks.size
val rddBlocks = status.numBlocks
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala (13 changes: 7 additions & 6 deletions)
@@ -45,12 +45,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers)

// Block table
val filteredStorageStatusList = StorageUtils.filterByRDD(storageStatusList, rddId)
val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name)
val blockLocations = StorageUtils.getBlockLocations(filteredStorageStatusList)
val blocks = blockStatuses.map { case (blockId, status) =>
(blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
}
val blockLocations = StorageUtils.getRDDBlockLocations(storageStatusList, rddId)
val blocks = storageStatusList
.flatMap(_.rddBlocksById(rddId))
.sortWith(_._1.name < _._1.name)
.map { case (blockId, status) =>
(blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
}
val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks)

val content =
@@ -41,13 +41,13 @@ class StorageStatusListenerSuite extends FunSuite {
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)

// Block manager remove
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
@@ -67,14 +67,14 @@ class StorageStatusListenerSuite extends FunSuite {
val taskMetrics = new TaskMetrics

// Task end with no updated blocks
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}

test("task end with updated blocks") {
@@ -90,20 +90,20 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics2.updatedBlocks = Some(Seq(block3))

// Task end with new blocks
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))

// Task end with dropped blocks
val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
@@ -112,17 +112,17 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}

test("unpersist RDD") {
@@ -137,16 +137,16 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics2.updatedBlocks = Some(Seq(block3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
assert(listener.executorIdToStorageStatus("big").numBlocks === 3)

// Unpersist RDD
listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
}
}