Skip to content

Commit

Permalink
Adds javadoc and addresses @aarondav's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Mar 30, 2015
1 parent dfdf3ef commit 9a4b82b
Showing 1 changed file with 24 additions and 15 deletions.
Expand Up @@ -73,18 +73,18 @@ trait SparkHadoopMapRedUtil {
}

object SparkHadoopMapRedUtil extends Logging {
/**
 * Convenience overload: pulls the stage ID, partition ID, and attempt number out of the
 * given Spark `TaskContext` and delegates to the primary `commitTask` implementation.
 */
def commitTask(
    committer: MapReduceOutputCommitter,
    mrTaskContext: MapReduceTaskAttemptContext,
    sparkTaskContext: TaskContext): Unit = {
  val stage = sparkTaskContext.stageId()
  val partition = sparkTaskContext.partitionId()
  val attempt = sparkTaskContext.attemptNumber()
  commitTask(committer, mrTaskContext, stage, partition, attempt)
}

/**
* Commits a task output. Before committing the task output, we need to know whether some other
* task attempt might be racing to commit the same output partition. Therefore, coordinate with
* the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
* details).
*
* Commit output coordinator is only contacted when the following two configurations are both set
* to `true`:
*
* - `spark.speculation`
* - `spark.hadoop.outputCommitCoordination.enabled`
*/
def commitTask(
committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
Expand All @@ -109,9 +109,6 @@ object SparkHadoopMapRedUtil extends Logging {

// First, check whether the task's output has already been committed by some other attempt
if (committer.needsTaskCommit(mrTaskContext)) {
// The task output needs to be committed, but we don't know whether some other task attempt
// might be racing to commit the same output partition. Therefore, coordinate with the driver
// in order to determine whether this attempt can commit (see SPARK-4879).
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
// We only need to coordinate with the driver if there are multiple concurrent task
Expand Down Expand Up @@ -144,4 +141,16 @@ object SparkHadoopMapRedUtil extends Logging {
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
}
}

/**
 * Commits a task output using the identifiers carried by the Spark `TaskContext`.
 * This is a thin forwarding overload of the primary `commitTask` method above.
 */
def commitTask(
    committer: MapReduceOutputCommitter,
    mrTaskContext: MapReduceTaskAttemptContext,
    sparkTaskContext: TaskContext): Unit =
  commitTask(
    committer,
    mrTaskContext,
    stageId = sparkTaskContext.stageId(),
    splitId = sparkTaskContext.partitionId(),
    attemptId = sparkTaskContext.attemptNumber())
}

0 comments on commit 9a4b82b

Please sign in to comment.