diff --git a/build.gradle b/build.gradle
index 09b0654334..6508305ebe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -197,10 +197,8 @@ configure(allProjs) {
     ignoreFailures = true
     include '**/*.java', '**/*.scala'
     exclude '**/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala',
-            '**/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala',
             '**/com/salesforce/op/test/TestSparkContext.scala',
             '**/com/salesforce/op/test/TempDirectoryTest.scala',
-            '**/com/salesforce/op/utils/io/DirectOutputCommitter.scala',
             '**/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala',
             '**/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala',
             '**/com/salesforce/op/test/*.java',
diff --git a/gradle/spark.gradle b/gradle/spark.gradle
index 69f96cdb33..60599d53e9 100644
--- a/gradle/spark.gradle
+++ b/gradle/spark.gradle
@@ -70,8 +70,7 @@ task sparkSubmit(type: Exec, dependsOn: copyLog4jToSpark) {
         "spark.hadoop.avro.output.codec=deflate",
         "spark.hadoop.avro.mapred.deflate.level=6",
         "spark.hadoop.validateOutputSpecs=false",
-        "spark.hadoop.mapred.output.committer.class=com.salesforce.op.utils.io.DirectOutputCommitter",
-        "spark.hadoop.spark.sql.sources.outputCommitterClass=com.salesforce.op.utils.io.DirectMapreduceOutputCommitter"
+        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"
     ].collect { ["--conf", it] }.flatten()
 
     environment SPARK_HOME: sparkHome
diff --git a/helloworld/gradle/spark.gradle b/helloworld/gradle/spark.gradle
index 69f96cdb33..60599d53e9 100644
--- a/helloworld/gradle/spark.gradle
+++ b/helloworld/gradle/spark.gradle
@@ -70,8 +70,7 @@ task sparkSubmit(type: Exec, dependsOn: copyLog4jToSpark) {
         "spark.hadoop.avro.output.codec=deflate",
         "spark.hadoop.avro.mapred.deflate.level=6",
         "spark.hadoop.validateOutputSpecs=false",
-        "spark.hadoop.mapred.output.committer.class=com.salesforce.op.utils.io.DirectOutputCommitter",
-        "spark.hadoop.spark.sql.sources.outputCommitterClass=com.salesforce.op.utils.io.DirectMapreduceOutputCommitter"
+        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"
     ].collect { ["--conf", it] }.flatten()
 
     environment SPARK_HOME: sparkHome
diff --git a/templates/simple/spark.gradle b/templates/simple/spark.gradle
index 9784c283a9..3cfb281230 100644
--- a/templates/simple/spark.gradle
+++ b/templates/simple/spark.gradle
@@ -197,8 +197,7 @@ task sparkSubmit(dependsOn: copyLog4jToSparkNoInstall) {
         "spark.hadoop.avro.output.codec=deflate",
         "spark.hadoop.avro.mapred.deflate.level=6",
         "spark.hadoop.validateOutputSpecs=false",
-        "spark.hadoop.mapred.output.committer.class=com.salesforce.op.utils.io.DirectOutputCommitter",
-        "spark.hadoop.spark.sql.sources.outputCommitterClass=com.salesforce.op.utils.io.DirectMapreduceOutputCommitter"
+        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"
     ].collect { ["--conf", it] }.flatten()
 
     def hadoopConfDir = System.env.HOME + "/.fake_hadoop_conf"
diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala b/utils/src/main/scala/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala
deleted file mode 100644
index e9c16c5e09..0000000000
--- a/utils/src/main/scala/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-// scalastyle:off header.matches
-/*
- * Modifications: (c) 2017, Salesforce.com, Inc.
- * Copyright 2015 Databricks, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.salesforce.op.utils.io
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
-
-class DirectMapreduceOutputCommitter extends OutputCommitter {
-
-  override def setupJob(jobContext: JobContext): Unit = {}
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = {
-    // We return true here to guard against implementations that do not handle false correctly.
-    // The meaning of returning false is not entirely clear, so it's possible to be interpreted
-    // as an error. Returning true just means that commitTask() will be called, which is a no-op.
-    true
-  }
-
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-
-  /**
-   * Creates a _SUCCESS file to indicate the entire job was successful.
-   * This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option.
-   */
-  override def commitJob(context: JobContext): Unit = {
-    val conf = context.getConfiguration
-    if (shouldCreateSuccessFile(conf)) {
-      val outputPath = FileOutputFormat.getOutputPath(context)
-      if (outputPath != null) {
-        val fileSys = outputPath.getFileSystem(conf)
-        val filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-        fileSys.create(filePath).close()
-      }
-    }
-  }
-
-  /** By default, we do create the _SUCCESS file, but we allow it to be turned off. */
-  private def shouldCreateSuccessFile(conf: Configuration): Boolean = {
-    conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)
-  }
-}
diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/DirectOutputCommitter.scala b/utils/src/main/scala/com/salesforce/op/utils/io/DirectOutputCommitter.scala
deleted file mode 100644
index 83f89c14ae..0000000000
--- a/utils/src/main/scala/com/salesforce/op/utils/io/DirectOutputCommitter.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-// scalastyle:off header.matches
-/*
- * Modifications: (c) 2017, Salesforce.com, Inc.
- * Copyright 2015 Databricks, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.salesforce.op.utils.io
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred._
-
-/**
- * OutputCommitter suitable for S3 workloads. Unlike the usual FileOutputCommitter, which
- * writes files to a _temporary/ directory before renaming them to their final location, this
- * simply writes directly to the final location.
- *
- * The FileOutputCommitter is required for HDFS + speculation, which allows only one writer at
- * a time for a file (so two people racing to write the same file would not work). However, S3
- * supports multiple writers outputting to the same file, where visibility is guaranteed to be
- * atomic. This is a monotonic operation: all writers should be writing the same data, so which
- * one wins is immaterial.
- *
- * Code adapted from Ian Hummel's code from this PR:
- * https://github.com/themodernlife/spark/commit/4359664b1d557d55b0579023df809542386d5b8c
- */
-class DirectOutputCommitter extends OutputCommitter {
-
-  override def setupJob(jobContext: JobContext): Unit = {}
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = {
-    // We return true here to guard against implementations that do not handle false correctly.
-    // The meaning of returning false is not entirely clear, so it's possible to be interpreted
-    // as an error. Returning true just means that commitTask() will be called, which is a no-op.
-    true
-  }
-
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-
-  /**
-   * Creates a _SUCCESS file to indicate the entire job was successful.
-   * This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option.
-   */
-  override def commitJob(context: JobContext): Unit = {
-    val conf = context.getJobConf
-    if (shouldCreateSuccessFile(conf)) {
-      val outputPath = FileOutputFormat.getOutputPath(conf)
-      if (outputPath != null) {
-        val fileSys = outputPath.getFileSystem(conf)
-        val filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-        fileSys.create(filePath).close()
-      }
-    }
-  }
-
-  /** By default, we do create the _SUCCESS file, but we allow it to be turned off. */
-  private def shouldCreateSuccessFile(conf: JobConf): Boolean = {
-    conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)
-  }
-}
diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala b/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala
index 155e0e1d35..4623840875 100644
--- a/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala
+++ b/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala
@@ -32,7 +32,6 @@ package com.salesforce.op.utils.io.avro
 
 import java.net.URI
 
-import com.salesforce.op.utils.io.DirectOutputCommitter
 import com.salesforce.op.utils.spark.RichRDD._
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
@@ -156,7 +155,6 @@ object AvroInOut {
 
   private def createJobConfFromContext(schema: String)(implicit sc: SparkSession) = {
     val jobConf = new JobConf(sc.sparkContext.hadoopConfiguration)
-    jobConf.setOutputCommitter(classOf[DirectOutputCommitter])
     AvroJob.setOutputSchema(jobConf, new Schema.Parser().parse(schema))
     jobConf
   }
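Note on the migration (not part of the patch itself): the two deleted committers wrote task output straight to the final location to sidestep FileOutputCommitter's rename-heavy commit protocol, which is slow on S3 where rename is a copy-plus-delete. The single conf key added to the three spark.gradle files enables Hadoop's built-in FileOutputCommitter "algorithm version 2" instead, where commitTask() merges each task's files directly into the destination directory, so commitJob() no longer has to perform the per-file rename pass out of _temporary/. A minimal Scala sketch of the equivalent programmatic setup for jobs that build their own SparkSession rather than going through the sparkSubmit task (the app name is hypothetical; any key with the "spark.hadoop." prefix is forwarded by Spark into the Hadoop Configuration its committers read):

    import org.apache.spark.sql.SparkSession

    // Sketch only: enable FileOutputCommitter algorithm v2, so task commits
    // move output straight into the final directory and job commit skips the
    // _temporary/ rename pass that v1 performs (and that is slow on S3).
    val spark = SparkSession.builder()
      .appName("CommitterV2Example") // hypothetical app name
      .master("local[*]")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .getOrCreate()

Unlike the removed DirectOutputCommitter, v2 still creates the _SUCCESS marker (governed by the same "mapreduce.fileoutputcommitter.marksuccessfuljobs" flag), so downstream consumers of that marker are unaffected.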