Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

内存管理 MemoryManager 解析 #17

Open
teeyog opened this issue Feb 2, 2018 · 0 comments
Open

内存管理 MemoryManager 解析 #17

teeyog opened this issue Feb 2, 2018 · 0 comments
Labels

Comments

@teeyog
Copy link
Owner

teeyog commented Feb 2, 2018

概述

spark的内存管理有两套方案,新旧方案分别对应的类是UnifiedMemoryManager和StaticMemoryManager。

旧方案是静态的,storageMemory(存储内存)和executionMemory(执行内存)拥有的内存是独享的不可相互借用,故在其中一方内存充足,另一方内存不足但又不能借用的情况下会造成资源的浪费。新方案是统一管理的,初始状态是内存各占一半,但其中一方内存不足时可以向对方借用,对内存资源进行合理有效的利用,提高了整体资源的利用率。

总的来说内存分为三大块,包括storageMemory、executionMemory、系统预留,其中storageMemory用来缓存rdd,unroll partition,存放direct task result、广播变量,在 Spark Streaming receiver 模式中存放每个 batch 的 blocks。executionMemory用于shuffle、join、sort、aggregation 中的缓存。除了这两者以外的内存都是预留给系统的。

旧方案 StaticMemoryManager

在SparkEnv中会创建memoryManager:

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

默认使用的是统一管理方案UnifiedMemoryManager,这里我们简要的看看旧方案StaticMemoryManager。

storageMemory能分到的内存是:

systemMaxMemory * memoryFraction * safetyFraction

其中:

  • systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能获得的最大内存空间。
  • memoryFraction:由参数spark.storage.memoryFraction控制,默认0.6。
  • safetyFraction:由参数spark.storage.safetyFraction控制,默认是0.9,因为cache block都是估算的,所以需要一个安全系数来保证安全。

executionMemory能分到的内存是:

systemMaxMemory * memoryFraction * safetyFraction

其中:

  • systemMaxMemory :Runtime.getRuntime.maxMemory,即JVM能获得的最大内存空间。
  • memoryFraction:由参数spark.shuffle.memoryFraction控制,默认0.2。
  • safetyFraction:由参数spark.shuffle.safetyFraction控制,默认是0.8。

memoryFraction系数之外和安全系数之外的内存就是给系统预留的了。

executionMemory能分到的内存直接影响了shuffle中spill的频率,增加executionMemory可减少spill的次数,但storageMemory能cache的容量也相应减少。

execution 和 storage 被分配到内存后大小就一直不变了,每次申请内存都只能申请自己独有的不能相互借用,会造成资源的浪费。另外,只有 execution 内存支持 off heap,storage 内存不支持 off heap。

新方案 UnifiedMemoryManager

由于新方案中storageMemory和executionMemory是统一管理的,我们看看两者一共能拿到多少内存。

private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val usableMemory = systemMemory - reservedMemory
    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    (usableMemory * memoryFraction).toLong
  }

首先给系统内存reservedMemory预留了300M,若jvm能拿到的最大内存和配置的executor内存分别不足以reservedMemory的1.5倍即450M都会抛出异常,最后storage和execution能拿到的内存为:

 (heap space - 300) * spark.memory.fraction (默认为0.6)

storage和execution各占所获内存的50%。

申请storage内存

为某个blockId申请numBytes大小的内存:

override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        maxOnHeapStorageMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        maxOffHeapMemory)
    }
    // 申请的内存大于storage和execution内存之和
    if (numBytes > maxMemory) {
      // Fail fast if the block simply won't fit
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxMemory bytes)")
      return false
    }
    // 大于storage空闲内存
    if (numBytes > storagePool.memoryFree) {
      // There is not enough free memory in the storage pool, so try to borrow free memory from
      // the execution pool.
      val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
      executionPool.decrementPoolSize(memoryBorrowedFromExecution)
      storagePool.incrementPoolSize(memoryBorrowedFromExecution)
    }
    storagePool.acquireMemory(blockId, numBytes)
  }
  • 若申请的numBytes比两者总共的内存还大,直接返回false,说明申请失败。
  • 若numBytes比storage空闲的内存大,则需要向executionPool借用
    • 借用的大小为此时execution的空闲内存和numBytes的较小值(个人观点应该是和<numBytes-storage空闲内存>的较小值)
    • 减小execution的poolSize
    • 增加storage的poolSize

即使向executionPool借用了内存,但不一定就够numBytes,因为不可能把execution正在使用的内存都接过来,接着调用了storagePool的acquireMemory方法在不够numBytes的情况下去释放storage中共cache的rdd,以增加storagePool.memoryFree的值:

def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
    val numBytesToFree = math.max(0, numBytes - memoryFree)
    acquireMemory(blockId, numBytes, numBytesToFree)
  }

计算出向execution借了内存后还差多少内存才能满足numBytes,即需要释放的内存numBytesToFree 。接着调用了acquireMemory方法:

def acquireMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long): Boolean = lock.synchronized {
    assert(numBytesToAcquire >= 0)
    assert(numBytesToFree >= 0)
    assert(memoryUsed <= poolSize)
    if (numBytesToFree > 0) {
      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    }
    // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
    // back into this StorageMemoryPool in order to free memory. Therefore, these variables
    // should have been updated.
    val enoughMemory = numBytesToAcquire <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytesToAcquire
    }
    enoughMemory
  }

当numBytesToFree 大于0的情况下,就真的要去释放缓存在memory中的block,释放完后再看空闲内存是否能满足numBytes,若满足则将numBytes加到已使用的变量里。

看看当需要从storay中释放block的时候是怎么释放的:

private[spark] def evictBlocksToFreeSpace(
      blockId: Option[BlockId],
      space: Long,
      memoryMode: MemoryMode): Long = {
    assert(space > 0)
    memoryManager.synchronized {
      var freedMemory = 0L
      val rddToAdd = blockId.flatMap(getRddId)
      val selectedBlocks = new ArrayBuffer[BlockId]
      def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
        entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
      }
      // This is synchronized to ensure that the set of entries is not changed
      // (because of getValue or getBytes) while traversing the iterator, as that
      // can lead to exceptions.
      entries.synchronized {
        val iterator = entries.entrySet().iterator()
        while (freedMemory < space && iterator.hasNext) {
          val pair = iterator.next()
          val blockId = pair.getKey
          val entry = pair.getValue
          if (blockIsEvictable(blockId, entry)) {
            // We don't want to evict blocks which are currently being read, so we need to obtain
            // an exclusive write lock on blocks which are candidates for eviction. We perform a
            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
            if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
              selectedBlocks += blockId
              freedMemory += pair.getValue.size
            }
          }
        }
      }

      def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
        val data = entry match {
          case DeserializedMemoryEntry(values, _, _) => Left(values)
          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
        }
        val newEffectiveStorageLevel =
          blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
        if (newEffectiveStorageLevel.isValid) {
          // The block is still present in at least one store, so release the lock
          // but don't delete the block info
          blockInfoManager.unlock(blockId)
        } else {
          // The block isn't present in any store, so delete the block info so that the
          // block can be stored again
          blockInfoManager.removeBlock(blockId)
        }
      }

      if (freedMemory >= space) {
        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
          s"(${Utils.bytesToString(freedMemory)} bytes)")
        for (blockId <- selectedBlocks) {
          val entry = entries.synchronized { entries.get(blockId) }
          // This should never be null as only one task should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) {
            dropBlock(blockId, entry)
          }
        }
        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
        freedMemory
      } else {
        blockId.foreach { id =>
          logInfo(s"Will not store $id")
        }
        selectedBlocks.foreach { id =>
          blockInfoManager.unlock(id)
        }
        0L
      }
    }
  }

spark中内存中的block都是通过memoryStore来存储的,用

private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

来维护了blockId和MemoryEntry(对应value的包装)的关联,另外方法中还定义了两个方法,blockIsEvictable方法是判断遍历到的blockId和当前blockId是否属于同一个rdd,因为不能提出同一个rdd的另外一个block。dropBlock方法就是真正执行从内存中移除block的,若StorageLevel包括了使用disk,则会写到磁盘文件。

整段代码的逻辑简单概述就是:遍历当前memoryStore中存的每个block(不是和当前请求的block属于同于同一rdd),直到block对应的内存之和大于所需释放的内存才停止遍历,也有可能遍历完了都还不能满足所需的内存。若能释放的内存满足所需的内存,则真正执行移除,否则不移除,因为不可能一个block在内存中一部分,在磁盘一部分,最后返回真正剔除block释放的内存。

总结一下向StorageMemory申请内存的过程(在MemoryMode.ON_HEAP模式下):

  • 若numBytes大于storage和execution内存之和,抛异常。
  • 若numBytes大于storage空闲内存,向execution借用min(executionFree,numBytes)大的内存,并更新各自的poolSize。
  • 若申请完后还不够,则释放storage中的block来补足。
    • memoryStore缓存的block大小满足需要补足的大小,则真正执行剔除(遍历block直到内存满足需求对应的block),否则不剔除。
  • 最终若空闲内存满足numBytes则返回true,否则返回false。

申请execution内存

在execution内存不足向storage借用时,还是不满足所需内存的情况下能借多少借多少。看看在需要向execution申请内存时是怎么处理的(MemoryMode.ON_HEAP模式下):

override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assertInvariants()
    assert(numBytes >= 0)
    val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
      case MemoryMode.ON_HEAP => (
        onHeapExecutionMemoryPool,
        onHeapStorageMemoryPool,
        onHeapStorageRegionSize,
        maxHeapMemory)
      case MemoryMode.OFF_HEAP => (
        offHeapExecutionMemoryPool,
        offHeapStorageMemoryPool,
        offHeapStorageMemory,
        maxOffHeapMemory)
    }

    /**
     * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
     *
     * When acquiring memory for a task, the execution pool may need to make multiple
     * attempts. Each attempt must be able to evict storage in case another task jumps in
     * and caches a large block between the attempts. This is called once per attempt.
     */
    def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
      if (extraMemoryNeeded > 0) {
        // There is not enough free memory in the execution pool, so try to reclaim memory from
        // storage. We can reclaim any free memory from the storage pool. If the storage pool
        // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
        // the memory that storage has borrowed from execution.
        val memoryReclaimableFromStorage = math.max(
          storagePool.memoryFree,
          storagePool.poolSize - storageRegionSize)
        if (memoryReclaimableFromStorage > 0) {
          // Only reclaim as much space as is necessary and available:
          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
            math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
          storagePool.decrementPoolSize(spaceToReclaim)
          executionPool.incrementPoolSize(spaceToReclaim)
        }
      }
    }

    /**
     * The size the execution pool would have after evicting storage memory.
     *
     * The execution memory pool divides this quantity among the active tasks evenly to cap
     * the execution memory allocation for each task. It is important to keep this greater
     * than the execution pool size, which doesn't take into account potential memory that
     * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
     *
     * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
     * in execution memory allocation across tasks, Otherwise, a task may occupy more than
     * its fair share of execution memory, mistakenly thinking that other tasks can acquire
     * the portion of storage memory that cannot be evicted.
     */
    def computeMaxExecutionPoolSize(): Long = {
      maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
    }

    executionPool.acquireMemory(
      numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
  }

这里先讲解这里面的两个方法:

maybeGrowExecutionPool就是需要向storage借内存的方法,能借用的最大内存memoryReclaimableFromStorage 为storage的空闲内存和storage向execution借用的内存(即已经使用也要释放来归还)的较大值,若memoryReclaimableFromStorage为0,则说明storage之前没有向execution借用内存,并且此时storage没有空闲的内存可借。
最终申请借用的是所需内存和memoryReclaimableFromStorage的较小值(缺多少借多少),跟进storagePool.freeSpaceToShrinkPool方法看看其实现:

def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
      val spaceFreedByEviction =
        memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else {
      spaceFreedByReleasingUnusedMemory
    }
  }

若storage空闲内存不足以所申请的内存,则需要通过释放storage中缓存的block来补充。

方法computeMaxExecutionPoolSize即计算的是execution拥有的最大可用内存。

接着通过这两个函数作为参数调用了方法executionPool.acquireMemory:

private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // TODO: clean up this clunky method signature

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }

里面定义了一个Task能使用的execution内存:

val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

其中maxPoolSize 为从 storage 借用了内存后,executionMemoryPool 的最大可用内存,保证一个Task可用的内存在 1/2*numActiveTasks ~ 1/numActiveTasks 范围内,整体保证各个Task资源占用平衡。

向execution申请内存代码流程:

  1. 先获取Task目前已经分配到的内存。
  2. 当numBytes大于execution空闲内存,则会通过maybeGrowPool方法向storage借内存。
  3. 能获取的最大内存maxToGrant为numBytes和(maxMemoryPerTask - curMem)的较小值。
  4. 本次循环能获取真正的内存toGrant为maxToGrant和(execution向memory借用后可用的内存)的较小值。
  5. 若最终能申请的内存小于numBytes且申请的内存加上原来有的内存还不足以一个Task最小的使用内存minMemoryPerTask,则会阻塞,直到有足够的内存或者有新的Task进来减小了minMemoryPerTask的值。
    否则直接返回本次分配到的内存。

对于向storage和execution申请内存以及相互借用内存的方式至此讲解完成。用到storage和execution内存的地方很多(看概述),其中缓存rdd会向storage申请内存,运行Task会向execution申请内存,接下来分别看看是在什么时候申请的。

缓存 RDD

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

每个rdd分区的数据都是通过对应的迭代器得到,其中若存储级别不为NONE,则会先尝试从储存介质中(内存、磁盘文件等)获取,第一次获取当然都没有,只有先计算完缓存起来以供后续的计算直接获取。缓存序列化和非序列化的数据的缓存方式不一样,非序列化的缓存的代码是:

memoryStore.putIteratorAsValues(blockId, iterator(), classTag)
 private[storage] def putIteratorAsValues[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = 16
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    val memoryGrowthFactor = 1.5
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L
    // Underlying vector for unrolling the block
    var vector = new SizeTrackingVector[T]()(classTag)

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
      vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        // If our vector's size has exceeded the threshold, request more memory
        val currentSize = vector.estimateSize()
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling =
            reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          }
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    if (keepUnrolling) {
      // We successfully unrolled the entirety of this block
      val arrayValues = vector.toArray
      vector = null
      val entry =
        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
      val size = entry.size
      def transferUnrollToStorage(amount: Long): Unit = {
        // Synchronize so that transfer is atomic
        memoryManager.synchronized {
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
          val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
          assert(success, "transferring unroll memory to storage memory failed")
        }
      }
      // Acquire storage memory if necessary to store this block in memory.
      val enoughStorageMemory = {
        if (unrollMemoryUsedByThisBlock <= size) {
          val acquiredExtra =
            memoryManager.acquireStorageMemory(
              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
          if (acquiredExtra) {
            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
          }
          acquiredExtra
        } else { // unrollMemoryUsedByThisBlock > size
          // If this task attempt already owns more unroll memory than is necessary to store the
          // block, then release the extra memory that will not be used.
          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
          transferUnrollToStorage(size)
          true
        }
      }
      if (enoughStorageMemory) {
        entries.synchronized {
          entries.put(blockId, entry)
        }
        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        Right(size)
      } else {
        assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
          "released too much unroll memory")
        Left(new PartiallyUnrolledIterator(
          this,
          MemoryMode.ON_HEAP,
          unrollMemoryUsedByThisBlock,
          unrolled = arrayValues.toIterator,
          rest = Iterator.empty))
      }
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Left(new PartiallyUnrolledIterator(
        this,
        MemoryMode.ON_HEAP,
        unrollMemoryUsedByThisBlock,
        unrolled = vector.iterator,
        rest = values))
    }
  }

代码太长了,我自己看到都头大了,没事,咱一点一点的慢慢来~

参数中的blockId是一个block的唯一标示,格式是"rdd_" + rddId + "_" + splitIndex,value就是该partition对应数据的迭代器,

  1. 通过reserveUnrollMemoryForThisTask方法向Storage申请initialMemoryThreshold(初始值可通过spark.storage.unrollMemoryThreshold配置,默认1M)的内存来unroll 迭代器:
    def reserveUnrollMemoryForThisTask(
      blockId: BlockId,
      memory: Long,
      memoryMode: MemoryMode): Boolean = {
    memoryManager.synchronized {
      val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
      if (success) {
        val taskAttemptId = currentTaskAttemptId()
        val unrollMemoryMap = memoryMode match {
          case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
          case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
        }
        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
      }
      success
    }
    }
    
    跟进acquireUnrollMemory可看见底层调用的就是前面所讲的向storage申请内存的方法acquireStorageMemory,若申请成功则将对应的onHeapUnrollMemoryMap加上申请到的内存,即unroll使用的内存。
  2. 若申请成功则跟新unrollMemoryUsedByThisBlock的值,即在该block上unroll使用的内存。
  3. 接着进行遍历,停止遍历的条件有两个,一是迭代器全部遍历完,二是没有申请到内存。
    • 每迭代一条数据都会加到SizeTrackingVector类型的vector中(底层由数组实现),每迭代16次都会估算vector的大小是否超过了memoryThreshold(申请的内存)。
    • 若超过了memoryThreshold,则会计算再次申请内存的大小,1.5倍当前vector大小-已经申请到的内存大小。
    • 再次向Storage申请内存,若申请成功,则跟新unrollMemoryUsedByThisBlock,继续遍历进入下次循环,否则停止遍历。
  4. 循环结束后,若keepUnrolling 为 true,则说明values 一定被全部展开了;若为false,则没有全部被展开,说明没有申请到足够的内存来展开这个values,意味着该partition缓存到内存失败。
  5. 在values全部成功展开的前提下,会将vector构造成一个DeserializedMemoryEntry对象,其中包括数据的大小,接着会将展开后的数据大小和申请的内存大小作比较:
    • 若申请的内存比数据小,则再次向storage申请对应的大小,申请成功则将unroll使用的内存转化到storage中去,转化对应的逻辑是:释放掉该Task占用的所有unroll内存,又向storage申请对应的内存,其实unroll内存就是storage内存,即操作的都是storage的内存,减去某值又加上某值,结果没有变,但流程还得这么走,因为为了将 MemoryStore 和 MemoryManager 的解耦。
    • 若申请的内存比数据大,则释放掉对应的unroll内存,接着将unroll使用的内存转化到storage中去。
    • 最后将blockId和对应的entry加入到memorySore所管理的entries中去。

缓存序列化rdd支持 ON_HEAP 和 OFF_HEAP,和缓存非序列化rdd的方式类似,只是以流的形式写到bytebuffer中,其中
MemoryMode 如果是 ON_HEAP,这里的 ByteBuffer 是 HeapByteBuffer(堆上内存);而如果是 OFF_HEAP,这里的 ByteBuffer 则是 DirectByteBuffer(指向的是堆外内存)。最后根据数据构建成SerializedMemoryEntry来保存在memoryStore的entries中。

shuffle中execution内存的使用

在shuffle write的时候,并不会直接将数据写到磁盘(详情请看Shuffle Write解析),而是先写到一个集合中,此集合占用的内存就是execution内存,初始给的大小是5M,可通过spark.shuffle.spill.initialMemoryThreshold进行设置,每写一次数据就判断是否需要溢写到磁盘,溢写之前还尝试会向execution申请来避免溢写,代码如下:

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }

当insert&update的次数是32的倍数且当前集合的大小已经大于等于了已经申请到的内存,此时会尝试向execution申请更多的内存来避免spill,申请的大小为2倍当前集合大小减去已经申请到的内存大小,跟进acquireMemory方法:

 public long acquireMemory(long size) {
    long granted = taskMemoryManager.acquireExecutionMemory(size, this);
    used += granted;
    return granted;
  }

这不就是我们前面讲的向execution申请内存的方法吗,这里就不再叙述。

参考

http://www.jianshu.com/p/999ef21dffe8

@teeyog teeyog added spark and removed spark labels Apr 13, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant