Commit c24bc5b

zsxwing committed Jul 17, 2015
1 parent bd7dcf1 commit c24bc5b
Showing 4 changed files with 26 additions and 3 deletions.
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -40,6 +40,9 @@ import org.apache.spark.util.Utils
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param internal whether this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
* to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
* thread safe so that they can be reported correctly.
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
@@ -70,8 +73,9 @@ class Accumulable[R, T] private[spark] (
Accumulators.register(this)

/**
* Internal accumulators will be reported via heartbeats. For internal accumulators, `R` must be
* thread safe so that they can be reported correctly.
* Whether this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
* via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
* reported correctly.
*/
private[spark] def isInternal: Boolean = internal

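To make the thread-safety requirement on `R` concrete, here is a minimal sketch, not part of this commit, of an `AccumulableParam` whose result type can safely be read by the heartbeat thread while the task thread is still adding to it (`ThreadSafeLongParam` is a hypothetical name):

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.AccumulableParam

// Hypothetical param whose result type is an AtomicLong, so the heartbeat thread
// can read the running total while the task thread keeps adding to it.
class ThreadSafeLongParam extends AccumulableParam[AtomicLong, Long] {
  override def addAccumulator(r: AtomicLong, t: Long): AtomicLong = { r.addAndGet(t); r }
  override def addInPlace(r1: AtomicLong, r2: AtomicLong): AtomicLong = { r1.addAndGet(r2.get()); r1 }
  override def zero(initialValue: AtomicLong): AtomicLong = new AtomicLong(0L)
}
```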
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -153,9 +153,21 @@ abstract class TaskContext extends Serializable {
*/
private[spark] def taskMemoryManager(): TaskMemoryManager

/**
* Register an accumulator that belongs to this task. Accumulators must call this method when
* they are deserialized on executors.
*/
private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit

/**
* Return the local values of internal accumulators that belong to this task. The key of the Map
* is the accumulator id and the value of the Map is the latest accumulator local value.
*/
private[spark] def collectInternalAccumulators(): Map[Long, Any]

/**
* Return the local values of accumulators that belong to this task. The key of the Map is the
* accumulator id and the value of the Map is the latest accumulator local value.
*/
private[spark] def collectAccumulators(): Map[Long, Any]
}
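As a rough illustration of the registration/collection pattern these methods describe (toy types, not the Spark implementation): a deserialized accumulator registers itself under its id, and the heartbeat path later snapshots the latest local values.

```scala
import scala.collection.mutable

// Toy stand-in for an accumulator: an id, an internal flag, and its current local value.
final case class ToyAccum(id: Long, internal: Boolean, var value: Any)

// Toy context mirroring the three methods above (names reused for readability).
class ToyTaskContext {
  private val accums = mutable.Map.empty[Long, ToyAccum]

  // Called once per accumulator after deserialization on the executor.
  def registerAccumulator(a: ToyAccum): Unit = accums(a.id) = a

  // Snapshot only the internal accumulators, keyed by accumulator id.
  def collectInternalAccumulators(): Map[Long, Any] =
    accums.values.filter(_.internal).map(a => a.id -> a.value).toMap

  // Snapshot all accumulators, keyed by accumulator id.
  def collectAccumulators(): Map[Long, Any] =
    accums.values.map(a => a.id -> a.value).toMap
}
```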
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -231,6 +231,9 @@ class TaskMetrics extends Serializable {
_accumulatorUpdates = _accumulatorsUpdater()
}

/**
* Return the latest updates of accumulators in this task.
*/
def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates

private[spark] def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = {
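A compact sketch of how the updater hook fits together (a simplified stand-in, not the Spark `TaskMetrics` class): the task installs a closure that snapshots accumulator values, and `updateAccumulators()` refreshes the map returned by `accumulatorUpdates()`.

```scala
// Simplified stand-in: the heartbeat path calls updateAccumulators() and then
// reads accumulatorUpdates() to report the latest values to the driver.
class ToyTaskMetrics {
  private var _accumulatorsUpdater: () => Map[Long, Any] = () => Map.empty
  private var _accumulatorUpdates: Map[Long, Any] = Map.empty

  def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = {
    _accumulatorsUpdater = accumulatorsUpdater
  }

  def updateAccumulators(): Unit = {
    _accumulatorUpdates = _accumulatorsUpdater()
  }

  def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates
}
```

Presumably the updater is wired to the task context, along the lines of `metrics.setAccumulatorsUpdater(() => ctx.collectInternalAccumulators())`, though that wiring is outside the lines shown here.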
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,14 +45,18 @@ import org.apache.spark.util.Utils
*/
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {

/**
* The key of the Map is the accumulator id and the value of the Map is the latest accumulator
* local value.
*/
type AccumulatorUpdates = Map[Long, Any]

/**
* Called by [[Executor]] to run this task.
*
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @return the result of the task
* @return the result of the task along with updates of Accumulators.
*/
final def run(taskAttemptId: Long, attemptNumber: Int): (T, AccumulatorUpdates) = {
context = new TaskContextImpl(
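A toy mirror of the new return shape (hypothetical `TaskRunSketch` object, not the Spark `Task` class), showing how a caller would consume the `(result, AccumulatorUpdates)` pair:

```scala
object TaskRunSketch {
  // Same shape as the type alias above: accumulator id -> latest local value.
  type AccumulatorUpdates = Map[Long, Any]

  // Stand-in for Task.run: returns the task result together with the updates.
  def runToyTask(): (String, AccumulatorUpdates) = {
    val updates: AccumulatorUpdates = Map(1L -> 42L, 2L -> 7L) // hypothetical final values
    ("done", updates)
  }

  def main(args: Array[String]): Unit = {
    val (result, accumUpdates) = runToyTask()
    println(s"result = $result")
    accumUpdates.foreach { case (id, value) => println(s"accumulator $id -> $value") }
  }
}
```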
