diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index d7a90d0e350d3..2f4fcac890eef 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -40,6 +40,9 @@ import org.apache.spark.util.Utils
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
  * @param name human-readable name for use in Spark's web UI
+ * @param internal whether this [[Accumulable]] is internal. Internal [[Accumulable]]s will be
+ *                 reported to the driver via heartbeats. For internal [[Accumulable]]s, `R` must
+ *                 be thread safe so that they can be reported correctly.
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
@@ -70,8 +73,9 @@ class Accumulable[R, T] private[spark] (
   Accumulators.register(this)
 
   /**
-   * Internal accumulators will be reported via heartbeats. For internal accumulators, `R` must be
-   * thread safe so that they can be reported correctly.
+   * Whether this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the
+   * driver via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they
+   * can be reported correctly.
    */
   private[spark] def isInternal: Boolean = internal
 
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 11f19489fa725..345bb500a7dec 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -153,9 +153,21 @@ abstract class TaskContext extends Serializable {
    */
   private[spark] def taskMemoryManager(): TaskMemoryManager
 
+  /**
+   * Register an accumulator that belongs to this task. Accumulators must call this method when
+   * they are deserialized on executors.
+   */
   private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit
 
+  /**
+   * Return the local values of internal accumulators that belong to this task. The key of the Map
+   * is the accumulator id and the value of the Map is the latest accumulator local value.
+   */
   private[spark] def collectInternalAccumulators(): Map[Long, Any]
 
+  /**
+   * Return the local values of accumulators that belong to this task. The key of the Map is the
+   * accumulator id and the value of the Map is the latest accumulator local value.
+   */
   private[spark] def collectAccumulators(): Map[Long, Any]
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 93d44492c0e3f..42207a9553592 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -231,6 +231,9 @@ class TaskMetrics extends Serializable {
     _accumulatorUpdates = _accumulatorsUpdater()
   }
 
+  /**
+   * Return the latest updates of accumulators in this task.
+   */
   def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates
 
   private[spark] def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 12bc4ee1e17bc..6a86f9d4b8530 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,6 +45,10 @@ import org.apache.spark.util.Utils
  */
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 
+  /**
+   * The key of the Map is the accumulator id and the value of the Map is the latest accumulator
+   * local value.
+   */
   type AccumulatorUpdates = Map[Long, Any]
 
   /**
@@ -52,7 +56,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
    *
    * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
    * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
-   * @return the result of the task
+   * @return the result of the task along with its accumulator updates.
    */
   final def run(taskAttemptId: Long, attemptNumber: Int): (T, AccumulatorUpdates) = {
     context = new TaskContextImpl(
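The register/collect contract these comments document is small enough to demonstrate outside Spark. Below is a minimal standalone Scala sketch of the same pattern; `ToyAccumulable` and `ToyTaskContext` are hypothetical names for illustration, not Spark APIs. The `AtomicLong` stands in for the "`R` must be thread safe" requirement: a heartbeat thread may read local values while the task thread is still adding to them.

    import java.util.concurrent.atomic.AtomicLong
    import scala.collection.mutable

    // Hypothetical stand-in for an accumulator: an id, an internal flag,
    // and a thread-safe local value (written by the task thread, read by
    // the heartbeat thread).
    class ToyAccumulable(val id: Long, val isInternal: Boolean) {
      private val local = new AtomicLong(0L)
      def add(v: Long): Unit = { local.addAndGet(v) }
      def localValue: Any = local.get()
    }

    // Hypothetical stand-in for the TaskContext bookkeeping: accumulators
    // register themselves when deserialized, and the task snapshots their
    // local values keyed by accumulator id.
    class ToyTaskContext {
      private val accumulators = mutable.Map.empty[Long, ToyAccumulable]

      def registerAccumulator(a: ToyAccumulable): Unit =
        accumulators(a.id) = a

      // All accumulators: what Task.run would return alongside the result.
      def collectAccumulators(): Map[Long, Any] =
        accumulators.map { case (id, a) => (id, a.localValue) }.toMap

      // Internal accumulators only: what a heartbeat would report.
      def collectInternalAccumulators(): Map[Long, Any] =
        accumulators.collect { case (id, a) if a.isInternal => (id, a.localValue) }.toMap
    }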
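Exercising the sketch, assuming the toy classes above:

    val ctx = new ToyTaskContext
    val shuffleBytes = new ToyAccumulable(id = 1L, isInternal = true)
    val userCounter = new ToyAccumulable(id = 2L, isInternal = false)
    ctx.registerAccumulator(shuffleBytes)
    ctx.registerAccumulator(userCounter)
    shuffleBytes.add(4096L)
    userCounter.add(1L)
    ctx.collectInternalAccumulators()  // Map(1 -> 4096)
    ctx.collectAccumulators()          // Map(1 -> 4096, 2 -> 1)

Each snapshot is an immutable Map, so it can be serialized safely while the task keeps accumulating.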