[SPARK-48292][CORE] Revert [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

### What changes were proposed in this pull request?
Revert apache#36564, per the discussion in apache#36564 (comment).

When Spark commits a task, it commits to the committedTaskPath
`${outputpath}/_temporary//${appAttempId}/${taskId}`
So in apache#36564's case: before apache#38980, the job ID's date differed between attempts of the same task, so when a task wrote its data successfully but failed to send back the TaskSuccess RPC, the re-run attempt committed to a different committedTaskPath, causing duplicated data.

After apache#38980, different attempts of the same task share the same TaskId, so a re-run attempt commits to the same committedTaskPath, and the Hadoop commit protocol handles this case so the data is not duplicated.

Note: the taskAttemptPath still differs between attempts, since that path contains the taskAttemptId.
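
To make the path argument concrete, here is a minimal standalone sketch of the layout described above. The helper, output path, and IDs are hypothetical and only mirror the quoted template (with a single separator for readability); this is not the Spark/Hadoop API:

```scala
// Minimal sketch, for illustration only: attempts of the same task map to the
// same committedTaskPath once they share a TaskId, which is what lets the
// commit protocol reconcile a re-run instead of duplicating output.
object CommittedPathSketch {
  def committedTaskPath(outputPath: String, appAttemptId: Int, taskId: String): String =
    s"$outputPath/_temporary/$appAttemptId/$taskId"

  def main(args: Array[String]): Unit = {
    // After apache#38980, both attempts of the same task carry the same taskId.
    val firstAttempt = committedTaskPath("/data/out", appAttemptId = 0, taskId = "task_0001_m_000001")
    val reRunAttempt = committedTaskPath("/data/out", appAttemptId = 0, taskId = "task_0001_m_000001")
    assert(firstAttempt == reRunAttempt) // same committedTaskPath, no duplicate copy
    // The per-attempt taskAttemptPath still differs, since it embeds the attempt id.
  }
}
```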

### Why are the changes needed?
The stage-abort safeguard is no longer needed after apache#38980.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46696 from AngersZhuuuu/SPARK-48292.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
AngersZhuuuu authored and riyaverm-db committed Jun 7, 2024
1 parent 0eebb2c commit b7efcf3
Showing 6 changed files with 58 additions and 88 deletions.
7 changes: 1 addition & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -287,12 +287,7 @@ class SparkContext(config: SparkConf) extends Logging {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(
conf,
isLocal,
listenerBus,
SparkContext.numDriverCores(master, conf),
this)
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}

private[spark] def env: SparkEnv = _env
12 changes: 1 addition & 11 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -258,7 +258,6 @@ object SparkEnv extends Logging {
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
sparkContext: SparkContext,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -281,7 +280,6 @@
numCores,
ioEncryptionKey,
listenerBus = listenerBus,
Option(sparkContext),
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}
@@ -317,7 +315,6 @@
/**
* Helper method to create a SparkEnv for a driver or an executor.
*/
// scalastyle:off argcount
private def create(
conf: SparkConf,
executorId: String,
@@ -328,9 +325,7 @@
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
sc: Option[SparkContext] = None,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
// scalastyle:on argcount

val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

@@ -473,12 +468,7 @@
}

val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
if (isDriver) {
new OutputCommitCoordinator(conf, isDriver, sc)
} else {
new OutputCommitCoordinator(conf, isDriver)
}

new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
@@ -44,10 +44,7 @@ private case class AskPermissionToCommitOutput(
* This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
* for an extensive design discussion.
*/
private[spark] class OutputCommitCoordinator(
conf: SparkConf,
isDriver: Boolean,
sc: Option[SparkContext] = None) extends Logging {
private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {

// Initialized by SparkEnv
var coordinatorRef: Option[RpcEndpointRef] = None
@@ -160,10 +157,9 @@ private[spark] class OutputCommitCoordinator(
val taskId = TaskIdentifier(stageAttempt, attemptNumber)
stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
if (stageState.authorizedCommitters(partition) == taskId) {
sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
s"but task commit success, data duplication may happen. " +
s"reason=$reason"))
logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
s"partition=$partition) failed; clearing lock")
stageState.authorizedCommitters(partition) = null
}
}
}
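
Illustration, not part of the diff: a minimal standalone model, with simplified state and hypothetical names, of the coordinator behavior restored above. When the authorized committer's attempt fails, the lock is cleared so a later attempt can commit, rather than failing the stage.

```scala
// Simplified model of the restored semantics; names and state are hypothetical.
object CommitLockSketch {
  // partition -> currently authorized attempt number, if any
  private val authorized = scala.collection.mutable.Map.empty[Int, Option[Int]]

  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    authorized.getOrElse(partition, None) match {
      case None            => authorized(partition) = Some(attempt); true // first asker wins
      case Some(`attempt`) => true                                        // same attempt may retry
      case Some(_)         => false                                       // lock held by another attempt
    }
  }

  def taskFailed(partition: Int, attempt: Int): Unit = synchronized {
    // Restored behavior: release the lock; do not fail the stage.
    if (authorized.getOrElse(partition, None).contains(attempt)) {
      authorized(partition) = None
    }
  }

  def main(args: Array[String]): Unit = {
    assert(canCommit(partition = 0, attempt = 1))  // attempt 1 becomes the committer
    assert(!canCommit(partition = 0, attempt = 2)) // another attempt is denied
    taskFailed(partition = 0, attempt = 1)         // committer fails, lock cleared
    assert(canCommit(partition = 0, attempt = 3))  // a later attempt can now commit
  }
}
```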
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler

import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.time.{Seconds, Span}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}

/**
* Integration tests for the OutputCommitCoordinator.
@@ -44,15 +45,13 @@ class OutputCommitCoordinatorIntegrationSuite
sc = new SparkContext("local[2, 4]", "test", conf)
}

test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
test("exception thrown in OutputCommitter.commitTask()") {
// Regression test for SPARK-10381
val e = intercept[SparkException] {
failAfter(Span(60, Seconds)) {
withTempDir { tempDir =>
sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
}
}.getCause.getMessage
assert(e.contains("failed; but task commit success, data duplication may happen.") &&
e.contains("Intentional exception"))
}
}
}

@@ -87,12 +87,11 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
outputCommitCoordinator =
spy[OutputCommitCoordinator](
new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
spy[OutputCommitCoordinator](new OutputCommitCoordinator(conf, isDriver = true))
// Use Mockito.spy() to maintain the default infrastructure everywhere else.
// This mocking allows us to control the coordinator responses in test cases.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
}
}
// Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -190,9 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
// The authorized committer now fails, clearing the lock
outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
// A new task should not be allowed to become stage failed because of potential data duplication
assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
// A new task should now be allowed to become the authorized committer
assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
nonAuthorizedCommitter + 2))
// There can only be one authorized committer
assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
nonAuthorizedCommitter + 3))
}

test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,8 +228,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))

// Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
// then fail the 1st attempt and since stage failed because of potential data duplication,
// make sure fail the 4th attempt.
// then fail the 1st attempt and make sure the 4th one can commit again.
stage += 1
outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -236,9 +237,7 @@
assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
ExecutorLostFailure("0", exitCausedByApp = true, None))
// A new task should not be allowed to become the authorized committer since stage failed
// because of potential data duplication
assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
}

test("SPARK-24589: Make sure stage state is cleaned up") {
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}

import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, TestUtils}
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -1206,6 +1206,43 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
errorMessage.contains("is not a valid DFS filename"))
}

test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
// Using a output committer that always fail when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked.
val extraOptions = Map[String, String](
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
)

// Before fixing SPARK-7837, the following code results in an NPE because both
// `commitTask()` and `abortTask()` try to close output writers.

withTempPath { dir =>
val m1 = intercept[SparkException] {
spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
}
assert(m1.getErrorClass == "TASK_WRITE_FAILED")
assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
}

withTempPath { dir =>
val m2 = intercept[SparkException] {
val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
.coalesce(1)
df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
}
if (m2.getErrorClass != null) {
assert(m2.getErrorClass == "TASK_WRITE_FAILED")
assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
} else {
assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
}
}
}
}

test("SPARK-11044 Parquet writer version fixed as version1 ") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
@@ -1550,52 +1587,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}

// Parquet IO test suite with output commit coordination disabled.
// This test suite is separated ParquetIOSuite to avoid race condition of failure events
// from `OutputCommitCoordination` and `TaskSetManager`.
class ParquetIOWithoutOutputCommitCoordinationSuite
extends QueryTest with ParquetTest with SharedSparkSession {
import testImplicits._

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.hadoop.outputCommitCoordination.enabled", "false")
}

test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
// Using a output committer that always fail when committing a task, so that both
// `commitTask()` and `abortTask()` are invoked.
val extraOptions = Map[String, String](
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
)

// Before fixing SPARK-7837, the following code results in an NPE because both
// `commitTask()` and `abortTask()` try to close output writers.

withTempPath { dir =>
val m1 = intercept[SparkException] {
spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
}
assert(m1.getErrorClass == "TASK_WRITE_FAILED")
assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
}

withTempPath { dir =>
val m2 = intercept[SparkException] {
val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
.coalesce(1)
df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
}
assert(m2.getErrorClass == "TASK_WRITE_FAILED")
assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
}
}
}
}

class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
