Skip to content

Commit

Permalink
fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jul 14, 2015
1 parent 109900e commit 906d626
Showing 1 changed file with 2 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ class DAGScheduler(
partitionsToCompute.map { id =>
val locs = getPreferredLocs(stage.rdd, id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.attemptId, taskBinary, part, locs)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs)
}

case stage: ResultStage =>
Expand All @@ -922,7 +922,7 @@ class DAGScheduler(
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = getPreferredLocs(stage.rdd, p)
new ResultTask(stage.id, stage.attemptId, taskBinary, part, locs, id)
new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id)
}
}
} catch {
Expand Down Expand Up @@ -1128,8 +1128,6 @@ class DAGScheduler(
val failedStage = stageIdToStage(task.stageId)
val mapStage = shuffleToMapStage(shuffleId)

// failedStage.attemptId is already on the next attempt, so we have to use
// failedStage.latestInfo.attemptId
if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
s" ${task.stageAttemptId}, which has already failed")
Expand Down

0 comments on commit 906d626

Please sign in to comment.