diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index 215e53c020849..fb6173f58ece6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation( val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match { + val pathExists = fs.exists(qualifiedOutputPath) + val doInsertion = (mode, pathExists) match { case (SaveMode.ErrorIfExists, true) => sys.error(s"path $qualifiedOutputPath already exists.") case (SaveMode.Overwrite, true) => @@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation( case (SaveMode.Ignore, exists) => !exists } + // If we are appending data to an existing dir. + val isAppend = (pathExists) && (mode == SaveMode.Append) if (doInsertion) { val job = new Job(hadoopConf) @@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation( val partitionColumns = relation.partitionColumns.fieldNames if (partitionColumns.isEmpty) { - insert(new DefaultWriterContainer(relation, job), df) + insert(new DefaultWriterContainer(relation, job, isAppend), df) } else { val writerContainer = new DynamicPartitionWriterContainer( - relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME) + relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend) insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns) } } @@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation( private[sql] abstract class BaseWriterContainer( @transient val relation: HadoopFsRelation, - @transient job: Job) + @transient job: Job, + isAppend: Boolean) extends SparkHadoopMapReduceUtil with Logging with Serializable { @@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer( } private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { - val committerClass = context.getConfiguration.getClass( - SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) - - Option(committerClass).map { clazz => - logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") - - // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat - // has an associated output committer. To override this output committer, - // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. - // If a data source needs to override the output committer, it needs to set the - // output committer in prepareForWrite method. - if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { - // The specified output committer is a FileOutputCommitter. - // So, we will use the FileOutputCommitter-specified constructor. - val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) - ctor.newInstance(new Path(outputPath), context) - } else { - // The specified output committer is just a OutputCommitter. - // So, we will use the no-argument constructor. - val ctor = clazz.getDeclaredConstructor() - ctor.newInstance() + val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) + + if (isAppend) { + // If we are appending data to an existing dir, we will only use the output committer + // associated with the file output format since it is not safe to use a custom + // committer for appending. For example, in S3, direct parquet output committer may + // leave partial data in the destination dir when the the appending job fails. + logInfo( + s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " + + "for appending.") + defaultOutputCommitter + } else { + val committerClass = context.getConfiguration.getClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) + + Option(committerClass).map { clazz => + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat + // has an associated output committer. To override this output committer, + // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. + // If a data source needs to override the output committer, it needs to set the + // output committer in prepareForWrite method. + if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { + // The specified output committer is a FileOutputCommitter. + // So, we will use the FileOutputCommitter-specified constructor. + val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) + ctor.newInstance(new Path(outputPath), context) + } else { + // The specified output committer is just a OutputCommitter. + // So, we will use the no-argument constructor. + val ctor = clazz.getDeclaredConstructor() + ctor.newInstance() + } + }.getOrElse { + // If output committer class is not set, we will use the one associated with the + // file output format. + logInfo( + s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}") + defaultOutputCommitter } - }.getOrElse { - // If output committer class is not set, we will use the one associated with the - // file output format. - val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) - logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}") - outputCommitter } } @@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer( private[sql] class DefaultWriterContainer( @transient relation: HadoopFsRelation, - @transient job: Job) - extends BaseWriterContainer(relation, job) { + @transient job: Job, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { @transient private var writer: OutputWriter = _ @@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer( @transient relation: HadoopFsRelation, @transient job: Job, partitionColumns: Array[String], - defaultPartitionName: String) - extends BaseWriterContainer(relation, job) { + defaultPartitionName: String, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { // All output writers are created on executor side. @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index e0d8277a8ed3f..a16ab3a00ddb8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -17,10 +17,16 @@ package org.apache.spark.sql.sources +import scala.collection.JavaConversions._ + import java.io.File import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter +import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil @@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this // requirement. We probably want to move this test case to spark-integration-tests or spark-perf // later. - test("SPARK-8406: Avoids name collision while writing Parquet files") { + test("SPARK-8406: Avoids name collision while writing files") { withTempPath { dir => val path = dir.getCanonicalPath sqlContext @@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { } } } + + test("SPARK-8578 specified custom output committer will not be used to append data") { + val clonedConf = new Configuration(configuration) + try { + val df = sqlContext.range(1, 10).toDF("i") + withTempPath { dir => + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + configuration.set( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[AlwaysFailOutputCommitter].getName) + // Since Parquet has its own output committer setting, also set it + // to AlwaysFailParquetOutputCommitter at here. + configuration.set("spark.sql.parquet.output.committer.class", + classOf[AlwaysFailParquetOutputCommitter].getName) + // Because there data already exists, + // this append should succeed because we will use the output committer associated + // with file format and AlwaysFailOutputCommitter will not be used. + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + sqlContext.read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .load(dir.getCanonicalPath), + df.unionAll(df)) + + // This will fail because AlwaysFailOutputCommitter is used when we do append. + intercept[Exception] { + df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath) + } + } + withTempPath { dir => + configuration.set( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[AlwaysFailOutputCommitter].getName) + // Since Parquet has its own output committer setting, also set it + // to AlwaysFailParquetOutputCommitter at here. + configuration.set("spark.sql.parquet.output.committer.class", + classOf[AlwaysFailParquetOutputCommitter].getName) + // Because there is no existing data, + // this append will fail because AlwaysFailOutputCommitter is used when we do append + // and there is no existing data. + intercept[Exception] { + df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath) + } + } + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + } + } +} + +// This class is used to test SPARK-8578. We should not use any custom output committer when +// we actually append data to an existing dir. +class AlwaysFailOutputCommitter( + outputPath: Path, + context: TaskAttemptContext) + extends FileOutputCommitter(outputPath, context) { + + override def commitJob(context: JobContext): Unit = { + sys.error("Intentional job commitment failure for testing purpose.") + } +} + +// This class is used to test SPARK-8578. We should not use any custom output committer when +// we actually append data to an existing dir. +class AlwaysFailParquetOutputCommitter( + outputPath: Path, + context: TaskAttemptContext) + extends ParquetOutputCommitter(outputPath, context) { + + override def commitJob(context: JobContext): Unit = { + sys.error("Intentional job commitment failure for testing purpose.") + } } class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {