diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 531581dbd5909..c6029675eab0e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1126,7 +1126,10 @@ class DAGScheduler( case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) => val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleToMapStage(shuffleId) - if (failedStage.attemptId - 1 > task.stageAttemptId) { + + // failedStage.attemptId is already on the next attempt, so we have to use + // failedStage.latestInfo.attemptId + if (failedStage.latestInfo.attemptId != task.stageAttemptId) { logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" + s" ${task.stageAttemptId}, which has already failed") } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index c59d6e4f5bc04..14ab2b86e1b77 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -77,7 +77,13 @@ private[spark] abstract class Stage( id } - def attemptId: Int = nextAttemptId + /** + * The id for the **next** stage attempt. + * + * The unusual meaning of this method means its unlikely to hold the value you are interested in + * -- you probably want to use [[latestInfo.attemptId]] + */ + private[spark] def attemptId: Int = nextAttemptId override final def hashCode(): Int = id override final def equals(other: Any): Boolean = other match {