Skip to content

Commit

Permalink
[Minor] Fixes on top of apache#1679
Browse files Browse the repository at this point in the history
Minor fixes on top of apache#1679.

Author: Andrew Or <andrewor14@gmail.com>

Closes apache#1736 from andrewor14/amend-#1679 and squashes the following commits:

3b46f5e [Andrew Or] Minor fixes
  • Loading branch information
andrewor14 authored and pwendell committed Aug 3, 2014
1 parent 9cf429a commit 3dc55fd
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).sum
val remainingMem = storageStatusList.map(_.memRemaining).sum
(maxMem - remainingMem) / 1024 / 1024
val memUsed = storageStatusList.map(_.memUsed).sum
memUsed / 1024 / 1024
}
})

Expand Down
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,13 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
def memRemaining: Long = maxMem - memUsed

/** Return the memory used by this block manager. */
def memUsed: Long =
_nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum

/** Return the disk space used by this block manager. */
def diskUsed: Long =
_nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum

/** Return the off-heap space used by this block manager. */
def offHeapUsed: Long =
_nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
def offHeapUsed: Long = _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum

/** Return the memory used by the given RDD in this block manager in O(1) time. */
def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
Expand Down Expand Up @@ -246,7 +243,7 @@ private[spark] object StorageUtils {
val rddId = rddInfo.id
// Assume all blocks belonging to the same RDD have the same storage level
val storageLevel = statuses
.map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
.flatMap(_.rddStorageLevel(rddId)).headOption.getOrElse(StorageLevel.NONE)
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
Expand Down

0 comments on commit 3dc55fd

Please sign in to comment.