Reduce run time of StorageUtils.updateRddInfo to near constant
With the existing changes in the PR, the StorageListener still needs
to iterate through all the blocks within a single RDD. Although this
is already much better than before, it is still slow if a single RDD
has many partitions.

This commit further reduces the run time of StorageUtils.updateRddInfo
to near constant. It achieves this by updating the storage information
of each RDD (memory, disk, and tachyon sizes) incrementally as blocks
are added, updated, or removed, rather than computing it all at once
when the caller demands it. A preliminary benchmark shows that the
event queue length never exceeds 600, even when caching 10000
partitions within a single RDD.

An important TODO is to add tests for the new code, as well as for
the StorageListener, the source of the storage information on the UI.
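
To make the approach concrete, here is a minimal, self-contained sketch of the bookkeeping described above. The names are hypothetical stand-ins, not the patch's actual classes: per-RDD totals are adjusted by the delta of each block event, so a read costs O(1) no matter how many blocks the RDD has.

import scala.collection.mutable

// Simplified stand-in for the patch's per-RDD accounting: running
// (memSize, diskSize, tachyonSize) totals keyed by RDD ID.
object IncrementalRddInfoSketch {
  private val totals = new mutable.HashMap[Int, (Long, Long, Long)]

  // Apply the (possibly negative) deltas produced by a block add, update, or removal.
  def applyDelta(rddId: Int, dMem: Long, dDisk: Long, dTachyon: Long): Unit = {
    val (m, d, t) = totals.getOrElse(rddId, (0L, 0L, 0L))
    val newTotals =
      (math.max(m + dMem, 0L), math.max(d + dDisk, 0L), math.max(t + dTachyon, 0L))
    if (newTotals._1 + newTotals._2 + newTotals._3 == 0L) {
      totals.remove(rddId) // mirrors the patch: drop RDDs that are no longer persisted
    } else {
      totals(rddId) = newTotals
    }
  }

  // O(1) read, regardless of how many blocks the RDD has.
  def memUsedByRdd(rddId: Int): Long = totals.get(rddId).map(_._1).getOrElse(0L)
}

The real patch additionally tracks each RDD's storage level and keeps this map inside each StorageStatus, as the diff below shows.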
andrewor14 committed Aug 1, 2014
1 parent 2c3ef6a commit e080b9e
Showing 4 changed files with 114 additions and 94 deletions.
@@ -424,6 +424,11 @@ case class BlockStatus(
def isCached: Boolean = memSize + diskSize + tachyonSize > 0
}

@DeveloperApi
object BlockStatus {
def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
}

private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
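The BlockStatus.empty sentinel added above lets adding a new block and updating an existing one share a single delta computation. A minimal illustration, using a hypothetical trimmed-down stand-in for the real case class:

// Hypothetical, trimmed-down version of BlockStatus, for illustration only.
case class BlockStatusLite(memSize: Long, diskSize: Long, tachyonSize: Long)

object BlockStatusLite {
  val empty = BlockStatusLite(0L, 0L, 0L)
}

object DeltaSketch {
  // With an empty sentinel, a brand-new block and an updated block are handled
  // identically: the deltas are simply the new sizes minus the old sizes.
  def sizeDeltas(
      oldStatus: Option[BlockStatusLite],
      newStatus: BlockStatusLite): (Long, Long, Long) = {
    val old = oldStatus.getOrElse(BlockStatusLite.empty)
    (newStatus.memSize - old.memSize,
      newStatus.diskSize - old.diskSize,
      newStatus.tachyonSize - old.tachyonSize)
  }
}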
153 changes: 104 additions & 49 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -20,35 +20,45 @@ package org.apache.spark.storage
import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Storage information for each BlockManager. This class assumes BlockId and BlockStatus are
* immutable, such that the consumers of this class will not mutate the source of the information.
* Storage information for each BlockManager.
*
* This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
* class cannot mutate the source of the information. Accesses are not thread-safe.
*/
@DeveloperApi
class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {

/**
* Internal representation of the blocks stored in this block manager.
*
* Common consumption patterns of these blocks include
* (1) selecting all blocks,
* (2) selecting only RDD blocks or,
* (3) selecting only the blocks that belong to a specific RDD
*
* If we are only interested in a fraction of the blocks, as in (2) and (3), we should avoid
* linearly scanning through all the blocks, which could be expensive if there are thousands
* of blocks on each block manager. We achieve this by storing RDD blocks and non-RDD blocks
* separately. In particular, RDD blocks are stored in a map indexed by RDD IDs, so we can
* filter out the blocks of interest quickly.
*
* A common consumption pattern is to access only the blocks that belong to a specific RDD.
* For this use case, we should avoid linearly scanning through all the blocks, which could
* be expensive if there are thousands of blocks on each block manager. Thus, we need to store
* RDD blocks and non-RDD blocks separately. In particular, we store RDD blocks in a map
* indexed by RDD IDs, so we can filter out the blocks of interest quickly.
* These collections should only be mutated through the add/update/removeBlock methods.
*/
private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]

/**
* A map of storage information associated with each RDD.
*
* The key is the ID of the RDD, and the value is a 4-tuple of the following:
* (size in memory, size on disk, size in tachyon, storage level)
*
* This is updated incrementally on each block added, updated or removed, so as to avoid
* linearly scanning through all the blocks within an RDD if we're only interested in a
* given RDD's storage information.
*/
private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]

/**
* Instantiate a StorageStatus with the given initial blocks. This essentially makes a copy of
* the original blocks map such that the fate of this storage status is not tied to the source.
@@ -79,6 +89,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
blockId match {
case RDDBlockId(rddId, _) =>
// Update the storage info of the RDD, keeping track of any existing status for this block
val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
val changeInMem = blockStatus.memSize - oldBlockStatus.memSize
val changeInDisk = blockStatus.diskSize - oldBlockStatus.diskSize
val changeInTachyon = blockStatus.tachyonSize - oldBlockStatus.tachyonSize
val level = blockStatus.storageLevel
updateRddStorageInfo(rddId, changeInMem, changeInDisk, changeInTachyon, level)
// Actually add the block itself
_rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
case _ =>
_nonRddBlocks(blockId) = blockStatus
Expand All @@ -94,6 +112,11 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def removeBlock(blockId: BlockId): Option[BlockStatus] = {
blockId match {
case RDDBlockId(rddId, _) =>
// Update the storage info of the RDD if the block to remove exists
getBlock(blockId).foreach { s =>
updateRddStorageInfo(rddId, -s.memSize, -s.diskSize, -s.tachyonSize, StorageLevel.NONE)
}
// Actually remove the block, if it exists
if (_rddBlocks.contains(rddId)) {
val removed = _rddBlocks(rddId).remove(blockId)
// If the given RDD has no more blocks left, remove the RDD
@@ -136,33 +159,79 @@ }
}

/**
* Return the number of blocks stored in this block manager in O(rdds) time.
* Return the number of blocks stored in this block manager in O(RDDs) time.
* Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
*/
def numBlocks: Int = {
_nonRddBlocks.size + _rddBlocks.values.map(_.size).reduceOption(_ + _).getOrElse(0)
}

/**
* Return the number of RDD blocks stored in this block manager in O(RDDs) time.
* Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
*/
def numRddBlocks: Int = _rddBlocks.keys.map(numRddBlocksById).reduceOption(_ + _).getOrElse(0)

/**
* Return the number of blocks that belong to the given RDD in O(1) time.
* Note that this is much faster than `this.rddBlocksById(rddId).size`, which is
* O(blocks in this RDD) time.
*/
def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)

/** Return the memory used by this block manager. */
def memUsed: Long = memUsed(blocks)
def memUsed: Long = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)

/** Return the memory used by the given RDD in this block manager. */
def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocksById(rddId))
def memUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)

/** Return the memory remaining in this block manager. */
def memRemaining: Long = maxMem - memUsed

/** Return the disk space used by this block manager. */
def diskUsed: Long = diskUsed(blocks)
def diskUsed: Long = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)

/** Return the disk space used by the given RDD in this block manager. */
def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocksById(rddId))
def diskUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)

/** Return the off-heap space used by this block manager. */
def offHeapUsed: Long = blocks.values.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)

/** Return the off-heap space used by the given RDD in this block manager. */
def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)

/** Return the storage level, if any, used by the given RDD in this block manager. */
def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)

// Helper methods for computing memory and disk usages
private def memUsed(_blocks: Map[BlockId, BlockStatus]): Long =
_blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
private def diskUsed(_blocks: Map[BlockId, BlockStatus]): Long =
_blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
/**
* Helper function to update the given RDD's storage information based on the
* (possibly negative) changes in memory, disk, and off-heap memory usages.
*/
private def updateRddStorageInfo(
rddId: Int,
changeInMem: Long,
changeInDisk: Long,
changeInTachyon: Long,
storageLevel: StorageLevel): Unit = {
val emptyRddInfo = (0L, 0L, 0L, StorageLevel.NONE)
val oldRddInfo = _rddStorageInfo.getOrElse(rddId, emptyRddInfo)
val newRddInfo = oldRddInfo match {
case (oldRddMem, oldRddDisk, oldRddTachyon, _) =>
val newRddMem = math.max(oldRddMem + changeInMem, 0L)
val newRddDisk = math.max(oldRddDisk + changeInDisk, 0L)
val newRddTachyon = math.max(oldRddTachyon + changeInTachyon, 0L)
(newRddMem, newRddDisk, newRddTachyon, storageLevel)
case _ =>
// Should never happen
throw new SparkException(s"Existing information for $rddId is not of expected type")
}
// If this RDD is no longer persisted, remove it
if (newRddInfo._1 + newRddInfo._2 + newRddInfo._3 == 0) {
_rddStorageInfo.remove(rddId)
} else {
_rddStorageInfo(rddId) = newRddInfo
}
}
}

/** Helper methods for storage-related objects. */
@@ -172,32 +241,20 @@ private[spark] object StorageUtils {
* Update the given list of RDDInfo with the given list of storage statuses.
* This method overwrites the old values stored in the RDDInfo's.
*/
def updateRddInfo(
rddInfos: Seq[RDDInfo],
storageStatuses: Seq[StorageStatus],
updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Unit = {

def updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[StorageStatus]): Unit = {
rddInfos.foreach { rddInfo =>
val rddId = rddInfo.id

// Collect all block statuses that belong to the given RDD
val newBlocks = updatedBlocks.filter { case (bid, _) =>
bid.asRDDId.filter(_.rddId == rddId).isDefined
}
val newBlockIds = newBlocks.map { case (bid, _) => bid }.toSet
val oldBlocks = storageStatuses
.flatMap(_.rddBlocksById(rddId))
.filter { case (bid, _) => !newBlockIds.contains(bid) } // avoid double counting
val blocks = (oldBlocks ++ newBlocks).map { case (_, bstatus) => bstatus }
val persistedBlocks = blocks.filter(_.isCached)

// Assume all blocks belonging to the same RDD have the same storage level
val storageLevel = blocks.headOption.map(_.storageLevel).getOrElse(StorageLevel.NONE)
val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
val storageLevel = statuses
.map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
val numCachedPartitions = statuses
.map(_.numRddBlocksById(rddId)).reduceOption(_ + _).getOrElse(0)
val memSize = statuses.map(_.memUsedByRDD(rddId)).reduceOption(_ + _).getOrElse(0L)
val diskSize = statuses.map(_.diskUsedByRDD(rddId)).reduceOption(_ + _).getOrElse(0L)
val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).reduceOption(_ + _).getOrElse(0L)

rddInfo.storageLevel = storageLevel
rddInfo.numCachedPartitions = persistedBlocks.length
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.diskSize = diskSize
rddInfo.tachyonSize = tachyonSize
@@ -207,11 +264,9 @@ private[spark] object StorageUtils {
/**
* Return mapping from block ID to its locations for each block that belongs to the given RDD.
*/
def getRddBlockLocations(
storageStatuses: Seq[StorageStatus],
rddId: Int): Map[BlockId, Seq[String]] = {
def getRddBlockLocations(statuses: Seq[StorageStatus], rddId: Int): Map[BlockId, Seq[String]] = {
val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
storageStatuses.foreach { status =>
statuses.foreach { status =>
status.rddBlocksById(rddId).foreach { case (bid, _) =>
val location = status.blockManagerId.hostPort
blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
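With these per-RDD accessors in place, aggregating an RDD's usage across the cluster becomes a sum over block managers of constant-time lookups, which is what makes updateRddInfo near constant in the number of blocks. A sketch of the pattern, with a hypothetical trait standing in for the relevant slice of StorageStatus:

// Hypothetical stand-in for the O(1) per-RDD accessors added in this patch.
trait RddUsage {
  def memUsedByRDD(rddId: Int): Long
  def diskUsedByRDD(rddId: Int): Long
}

object AggregationSketch {
  // O(number of block managers) per RDD, with no scan over individual blocks.
  def totalMemUsed(statuses: Seq[RddUsage], rddId: Int): Long =
    statuses.map(_.memUsedByRDD(rddId)).reduceOption(_ + _).getOrElse(0L)
}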
@@ -52,7 +52,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList, updatedBlocks)
StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
}

/**
48 changes: 4 additions & 44 deletions core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -228,56 +228,16 @@ class StorageSuite extends FunSuite {
val storageStatuses = stockStorageStatuses
val rddInfos = stockRDDInfos
StorageUtils.updateRddInfo(rddInfos, storageStatuses)
assert(rddInfos(0).storageLevel === memAndDisk)
assert(rddInfos(0).numCachedPartitions === 5)
assert(rddInfos(0).memSize === 5L)
assert(rddInfos(0).diskSize === 10L)
assert(rddInfos(0).tachyonSize === 0L)
assert(rddInfos(1).storageLevel === memAndDisk)
assert(rddInfos(1).numCachedPartitions === 3)
assert(rddInfos(1).memSize === 3L)
assert(rddInfos(1).diskSize === 6L)
}

test("StorageUtils.updateRddInfo with updated blocks") {
val storageStatuses = stockStorageStatuses
val rddInfos = stockRDDInfos

// Drop 3 blocks from RDD 0, and cache more of RDD 1
val updatedBlocks1 = Seq(
(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(0, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(0, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 0), BlockStatus(memAndDisk, 100L, 100L, 0L)),
(RDDBlockId(1, 100), BlockStatus(memAndDisk, 100L, 100L, 0L))
)
StorageUtils.updateRddInfo(rddInfos, storageStatuses, updatedBlocks1)
assert(rddInfos(0).numCachedPartitions === 2)
assert(rddInfos(0).memSize === 2L)
assert(rddInfos(0).diskSize === 4L)
assert(rddInfos(1).numCachedPartitions === 4)
assert(rddInfos(1).memSize === 202L)
assert(rddInfos(1).diskSize === 204L)

// Actually update storage statuses so we can chain the calls to StorageUtils.updateRddInfo
updatedBlocks1.foreach { case (bid, bstatus) =>
storageStatuses.find(_.containsBlock(bid)) match {
case Some(s) => s.updateBlock(bid, bstatus)
case None => storageStatuses(0).addBlock(bid, bstatus) // arbitrarily pick the first
}
}

// Drop all of RDD 1, following previous updates
val updatedBlocks2 = Seq(
(RDDBlockId(1, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
(RDDBlockId(1, 100), BlockStatus(memAndDisk, 0L, 0L, 0L))
)
StorageUtils.updateRddInfo(rddInfos, storageStatuses, updatedBlocks2)
assert(rddInfos(0).numCachedPartitions === 2)
assert(rddInfos(0).memSize === 2L)
assert(rddInfos(0).diskSize === 4L)
assert(rddInfos(1).numCachedPartitions === 0)
assert(rddInfos(1).memSize === 0L)
assert(rddInfos(1).diskSize === 0L)
assert(rddInfos(1).tachyonSize === 0L)
}

test("StorageUtils.getRddBlockLocations") {
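The commit message leaves tests for the new code as a TODO. A hedged sketch of what such a test might look like inside this suite, assuming the suite's existing memAndDisk helper and the four-argument BlockManagerId constructor used elsewhere in the file:

test("StorageStatus updates RDD info incrementally (sketch)") {
  val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
  status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 0L))
  status.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 10L, 20L, 0L))
  assert(status.memUsedByRDD(0) === 20L)
  assert(status.diskUsedByRDD(0) === 40L)
  assert(status.rddStorageLevel(0) === Some(memAndDisk))

  // Re-adding an existing block applies only the delta, with no rescan
  status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L))
  assert(status.memUsedByRDD(0) === 10L)
  assert(status.diskUsedByRDD(0) === 20L)

  // Removing the last persisted block drops the RDD's storage info entirely
  status.removeBlock(RDDBlockId(0, 1))
  assert(status.memUsedByRDD(0) === 0L)
  assert(status.rddStorageLevel(0) === None)
}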
