
[SPARK-8578] [SQL] Should ignore user defined output committer when appending data

https://issues.apache.org/jira/browse/SPARK-8578

It is not safe to use a custom output committer when appending data to an existing directory. This change adds logic to check whether we are appending; if so, we use the output committer associated with the file output format instead.
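As a quick illustration of the scenario this change guards against, here is a minimal sketch assuming the 1.4-era DataFrame writer API and the SQLConf.OUTPUT_COMMITTER_CLASS key ("spark.sql.sources.outputCommitterClass"); the custom committer class name below is hypothetical.

import org.apache.spark.sql.{SQLContext, SaveMode}

// Sketch only; not part of the patch.
def appendExample(sqlContext: SQLContext, path: String): Unit = {
  // Configure a custom output committer (class name is hypothetical).
  sqlContext.sparkContext.hadoopConfiguration.set(
    "spark.sql.sources.outputCommitterClass",
    "com.example.DirectOutputCommitter")

  val df = sqlContext.range(0, 10).toDF("i")

  // The first write creates the directory; the custom committer may be used here.
  df.write.mode(SaveMode.ErrorIfExists).parquet(path)

  // This second write appends to an existing directory, so with this change Spark
  // ignores the custom committer and uses the committer associated with the file
  // output format instead.
  df.write.mode(SaveMode.Append).parquet(path)
}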

Author: Yin Huai <yhuai@databricks.com>

Closes apache#6964 from yhuai/SPARK-8578 and squashes the following commits:

43544c4 [Yin Huai] Do not use a custom output committer when appending data.
yhuai committed Jun 24, 2015
1 parent 9d36ec2 commit bba6699
Showing 2 changed files with 136 additions and 36 deletions.
89 changes: 54 additions & 35 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
-    val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+    val pathExists = fs.exists(qualifiedOutputPath)
+    val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
       case (SaveMode.Ignore, exists) =>
         !exists
     }
+    // If we are appending data to an existing dir.
+    val isAppend = (pathExists) && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
       val partitionColumns = relation.partitionColumns.fieldNames
       if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job), df)
+        insert(new DefaultWriterContainer(relation, job, isAppend), df)
       } else {
         val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
         insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
       }
     }
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
 private[sql] abstract class BaseWriterContainer(
     @transient val relation: HadoopFsRelation,
-    @transient job: Job)
+    @transient job: Job,
+    isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-    Option(committerClass).map { clazz =>
-      logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-      // has an associated output committer. To override this output committer,
-      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-      // If a data source needs to override the output committer, it needs to set the
-      // output committer in prepareForWrite method.
-      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-        // The specified output committer is a FileOutputCommitter.
-        // So, we will use the FileOutputCommitter-specified constructor.
-        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-        ctor.newInstance(new Path(outputPath), context)
-      } else {
-        // The specified output committer is just a OutputCommitter.
-        // So, we will use the no-argument constructor.
-        val ctor = clazz.getDeclaredConstructor()
-        ctor.newInstance()
-      }
-    }.getOrElse {
-      // If output committer class is not set, we will use the one associated with the
-      // file output format.
-      val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-      logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
-      outputCommitter
-    }
+    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      logInfo(
+        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+          "for appending.")
+      defaultOutputCommitter
+    } else {
+      val committerClass = context.getConfiguration.getClass(
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      Option(committerClass).map { clazz =>
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          ctor.newInstance(new Path(outputPath), context)
+        } else {
+          // The specified output committer is just a OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          ctor.newInstance()
+        }
+      }.getOrElse {
+        // If output committer class is not set, we will use the one associated with the
+        // file output format.
+        logInfo(
+          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+        defaultOutputCommitter
+      }
+    }
   }
 
@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(
 
 private[sql] class DefaultWriterContainer(
     @transient relation: HadoopFsRelation,
-    @transient job: Job)
-  extends BaseWriterContainer(relation, job) {
+    @transient job: Job,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   @transient private var writer: OutputWriter = _
 
@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
     @transient relation: HadoopFsRelation,
     @transient job: Job,
     partitionColumns: Array[String],
-    defaultPartitionName: String)
-  extends BaseWriterContainer(relation, job) {
+    defaultPartitionName: String,
+    isAppend: Boolean)
+  extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
   @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
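The hunks above are all in the first changed file (commands.scala); the hunks that follow belong to the second changed file, the HadoopFsRelationTest suite. The committer-selection rule that BaseWriterContainer.newOutputCommitter now follows can be condensed into the sketch below; it is a simplification with assumed parameter names, not the committed code.

import org.apache.hadoop.mapreduce.OutputCommitter

// Condensed sketch of the selection rule (parameter names are assumed).
def chooseCommitter(
    isAppend: Boolean,
    userDefinedCommitter: Option[OutputCommitter],
    defaultCommitter: OutputCommitter): OutputCommitter = {
  if (isAppend) {
    // Appending to an existing dir: always use the output format's own committer,
    // because a custom (e.g. direct output) committer may leave partial files
    // behind if the appending job fails.
    defaultCommitter
  } else {
    // Otherwise prefer the user-configured committer and fall back to the default.
    userDefinedCommitter.getOrElse(defaultCommitter)
  }
}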
@@ -17,10 +17,16 @@
 
 package org.apache.spark.sql.sources
 
+import scala.collection.JavaConversions._
+
 import java.io.File
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
   // requirement. We probably want to move this test case to spark-integration-tests or spark-perf
   // later.
-  test("SPARK-8406: Avoids name collision while writing Parquet files") {
+  test("SPARK-8406: Avoids name collision while writing files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-8578 specified custom output committer will not be used to append data") {
+    val clonedConf = new Configuration(configuration)
+    try {
+      val df = sqlContext.range(1, 10).toDF("i")
+      withTempPath { dir =>
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because the data already exists,
+        // this append should succeed because we will use the output committer associated
+        // with file format and AlwaysFailOutputCommitter will not be used.
+        df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        checkAnswer(
+          sqlContext.read
+            .format(dataSourceName)
+            .option("dataSchema", df.schema.json)
+            .load(dir.getCanonicalPath),
+          df.unionAll(df))
+
+        // This will fail because AlwaysFailOutputCommitter is used when we do append.
+        intercept[Exception] {
+          df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+      withTempPath { dir =>
+        configuration.set(
+          SQLConf.OUTPUT_COMMITTER_CLASS.key,
+          classOf[AlwaysFailOutputCommitter].getName)
+        // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+        configuration.set("spark.sql.parquet.output.committer.class",
+          classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because there is no existing data,
+        // this append will fail because AlwaysFailOutputCommitter is used when we do append
+        // and there is no existing data.
+        intercept[Exception] {
+          df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+        }
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
 }
 
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends FileOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purpose.")
+  }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailParquetOutputCommitter(
+    outputPath: Path,
+    context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commitment failure for testing purpose.")
+  }
+}
+
 class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
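The finally block of the new test clones the Hadoop Configuration up front and later clears and replays it, because Hadoop 1 has no Configuration.unset. A standalone sketch of that save/restore idiom follows; the helper name is made up for illustration.

import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration

// Hypothetical helper showing the clone/clear/restore pattern used in the test.
def withHadoopConfOverrides[T](conf: Configuration)(overrides: (String, String)*)(body: => T): T = {
  val snapshot = new Configuration(conf) // copy the current entries
  try {
    overrides.foreach { case (k, v) => conf.set(k, v) }
    body
  } finally {
    conf.clear() // Hadoop 1 has no Configuration.unset, so drop everything...
    snapshot.foreach(entry => conf.set(entry.getKey, entry.getValue)) // ...and restore
  }
}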
