Do not use a custom output committer when appending data.
yhuai committed Jun 24, 2015
1 parent 0f92be5 commit 43544c4
Showing 2 changed files with 136 additions and 36 deletions.
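
In short: InsertIntoHadoopFsRelation now computes an isAppend flag, threads it into the writer containers, and BaseWriterContainer ignores any user-configured committer when that flag is set. A condensed sketch of the new selection logic (simplified from the diff below; instantiate is a placeholder for the reflective constructor call shown there):

private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
  // The committer provided by the output format itself, e.g. a FileOutputCommitter.
  val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)

  if (isAppend) {
    // Appending to an existing dir: a custom committer (e.g. a direct S3 committer)
    // could leave partial data behind if the job fails, so keep the default one.
    defaultOutputCommitter
  } else {
    // Otherwise honor SQLConf.OUTPUT_COMMITTER_CLASS when it is set.
    val committerClass = context.getConfiguration.getClass(
      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
    Option(committerClass)
      .map(clazz => instantiate(clazz, context)) // stands in for the reflective construction below
      .getOrElse(defaultOutputCommitter)
  }
}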
89 changes: 54 additions & 35 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
val pathExists = fs.exists(qualifiedOutputPath)
val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
case (SaveMode.Ignore, exists) =>
!exists
}
// Whether we are appending data to an existing dir.
val isAppend = (pathExists) && (mode == SaveMode.Append)

if (doInsertion) {
val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(

val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
insert(new DefaultWriterContainer(relation, job), df)
insert(new DefaultWriterContainer(relation, job, isAppend), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(

private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
@transient job: Job)
@transient job: Job,
isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
}

private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
val committerClass = context.getConfiguration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])

Option(committerClass).map { clazz =>
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
// If a data source needs to override the output committer, it needs to set the
// output committer in prepareForWrite method.
if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
ctor.newInstance(new Path(outputPath), context)
} else {
// The specified output committer is just a OutputCommitter.
// So, we will use the no-argument constructor.
val ctor = clazz.getDeclaredConstructor()
ctor.newInstance()
val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)

if (isAppend) {
// If we are appending data to an existing dir, we will only use the output committer
// associated with the file output format since it is not safe to use a custom
committer for appending. For example, in S3, a direct Parquet output committer may
leave partial data in the destination dir when the appending job fails.
logInfo(
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
} else {
val committerClass = context.getConfiguration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])

Option(committerClass).map { clazz =>
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

// Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
// has an associated output committer. To override this output committer,
// we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
// If a data source needs to override the output committer, it needs to set the
// output committer in prepareForWrite method.
if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
// The specified output committer is a FileOutputCommitter.
// So, we will use the FileOutputCommitter-specified constructor.
val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
ctor.newInstance(new Path(outputPath), context)
} else {
// The specified output committer is just a OutputCommitter.
// So, we will use the no-argument constructor.
val ctor = clazz.getDeclaredConstructor()
ctor.newInstance()
}
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
logInfo(
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
defaultOutputCommitter
}
}.getOrElse {
// If output committer class is not set, we will use the one associated with the
// file output format.
val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
outputCommitter
}
}

@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(

private[sql] class DefaultWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job)
extends BaseWriterContainer(relation, job) {
@transient job: Job,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {

@transient private var writer: OutputWriter = _

@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
defaultPartitionName: String)
extends BaseWriterContainer(relation, job) {
defaultPartitionName: String,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {

// All output writers are created on executor side.
@transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
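The user-visible effect, sketched below. This is an illustrative snippet, not part of the commit: MyCustomCommitter and the output path are hypothetical, and it assumes code that (like the test in the next file) can reference SQLConf directly.

import org.apache.spark.sql.{SQLConf, SaveMode}

val df = sqlContext.range(1, 10).toDF("i")

// Hypothetical custom committer, registered the same way the test below registers
// its always-failing committer.
sqlContext.sparkContext.hadoopConfiguration.set(
  SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[MyCustomCommitter].getName)

df.write.mode(SaveMode.Overwrite).parquet("/tmp/t") // fresh write or overwrite: MyCustomCommitter runs
df.write.mode(SaveMode.Append).parquet("/tmp/t")    // dir now exists: the format's own committer runs

As the comment in newOutputCommitter explains, committers that write directly to the destination (common for S3) can leave partial files behind if an appending job fails, so the fallback for appends is unconditional.
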
@@ -17,10 +17,16 @@

package org.apache.spark.sql.sources

import scala.collection.JavaConversions._

import java.io.File

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.parquet.hadoop.ParquetOutputCommitter

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
// more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
// requirement. We probably want to move this test case to spark-integration-tests or spark-perf
// later.
test("SPARK-8406: Avoids name collision while writing Parquet files") {
test("SPARK-8406: Avoids name collision while writing files") {
withTempPath { dir =>
val path = dir.getCanonicalPath
sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-8578 specified custom output committer will not be used to append data") {
val clonedConf = new Configuration(configuration)
try {
val df = sqlContext.range(1, 10).toDF("i")
withTempPath { dir =>
df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
configuration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)
// Since Parquet has its own output committer setting, also set it
// to AlwaysFailParquetOutputCommitter here.
configuration.set("spark.sql.parquet.output.committer.class",
classOf[AlwaysFailParquetOutputCommitter].getName)
// Because the data already exists,
// this append should succeed: we will use the output committer associated
// with the file format, and AlwaysFailOutputCommitter will not be used.
df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
checkAnswer(
sqlContext.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.load(dir.getCanonicalPath),
df.unionAll(df))

// This overwrite will fail because AlwaysFailOutputCommitter is used when we are not appending.
intercept[Exception] {
df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
}
}
withTempPath { dir =>
configuration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)
// Since Parquet has its own output committer setting, also set it
// to AlwaysFailParquetOutputCommitter here.
configuration.set("spark.sql.parquet.output.committer.class",
classOf[AlwaysFailParquetOutputCommitter].getName)
// Because there is no existing data, this append will fail:
// AlwaysFailOutputCommitter is used since the destination dir does not exist yet.
intercept[Exception] {
df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
}
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
configuration.clear()
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
}
}

// This class is used to test SPARK-8578. We should not use any custom output committer when
// we actually append data to an existing dir.
class AlwaysFailOutputCommitter(
outputPath: Path,
context: TaskAttemptContext)
extends FileOutputCommitter(outputPath, context) {

override def commitJob(context: JobContext): Unit = {
sys.error("Intentional job commit failure for testing purposes.")
}
}

// This class is used to test SPARK-8578. We should not use any custom output committer when
// we actually append data to an existing dir.
class AlwaysFailParquetOutputCommitter(
outputPath: Path,
context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {

override def commitJob(context: JobContext): Unit = {
sys.error("Intentional job commit failure for testing purposes.")
}
}
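
By contrast with the always-failing committers above, a committer intended for real use with SQLConf.OUTPUT_COMMITTER_CLASS would follow the shape of the hypothetical sketch below (not part of this commit). The (Path, TaskAttemptContext) constructor is what BaseWriterContainer invokes reflectively for FileOutputCommitter subclasses, and after this change it is only consulted for non-append writes.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical committer that wraps the standard commit with extra bookkeeping.
class LoggingOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(context: JobContext): Unit = {
    // Any pre-commit bookkeeping would go here.
    super.commitJob(context)
    // Any post-commit bookkeeping (e.g. writing a success marker) would go here.
  }
}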

class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
