From 9a4b82b4f27271d5ea569dda601cf4ec6c17440e Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 30 Mar 2015 23:02:35 +0800 Subject: [PATCH] Adds javadoc and addresses @aarondav's comments --- .../spark/mapred/SparkHadoopMapRedUtil.scala | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index 40b4f68dd4d49..ba52d841d01ca 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -73,18 +73,18 @@ trait SparkHadoopMapRedUtil { } object SparkHadoopMapRedUtil extends Logging { - def commitTask( - committer: MapReduceOutputCommitter, - mrTaskContext: MapReduceTaskAttemptContext, - sparkTaskContext: TaskContext): Unit = { - commitTask( - committer, - mrTaskContext, - sparkTaskContext.stageId(), - sparkTaskContext.partitionId(), - sparkTaskContext.attemptNumber()) - } - + /** + * Commits a task output. Before committing the task output, we need to know whether some other + * task attempt might be racing to commit the same output partition. Therefore, coordinate with + * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for + * details). + * + * Commit output coordinator is only contacted when the following two configurations are both set + * to `true`: + * + * - `spark.speculation` + * - `spark.hadoop.outputCommitCoordination.enabled` + */ def commitTask( committer: MapReduceOutputCommitter, mrTaskContext: MapReduceTaskAttemptContext, @@ -109,9 +109,6 @@ object SparkHadoopMapRedUtil extends Logging { // First, check whether the task's output has already been committed by some other attempt if (committer.needsTaskCommit(mrTaskContext)) { - // The task output needs to be committed, but we don't know whether some other task attempt - // might be racing to commit the same output partition. Therefore, coordinate with the driver - // in order to determine whether this attempt can commit (see SPARK-4879). val shouldCoordinateWithDriver: Boolean = { val sparkConf = SparkEnv.get.conf // We only need to coordinate with the driver if there are multiple concurrent task @@ -144,4 +141,16 @@ object SparkHadoopMapRedUtil extends Logging { logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID") } } + + def commitTask( + committer: MapReduceOutputCommitter, + mrTaskContext: MapReduceTaskAttemptContext, + sparkTaskContext: TaskContext): Unit = { + commitTask( + committer, + mrTaskContext, + sparkTaskContext.stageId(), + sparkTaskContext.partitionId(), + sparkTaskContext.attemptNumber()) + } }