diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 26c036f6648da..2786e3d2cd6bf 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -22,7 +22,7 @@ The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.
All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`, `pyspark` shell, or `sparkR` shell.
-## Starting Point: `SQLContext`
+## Starting Point: SQLContext
spark.sql.planner.externalSort |
false |
@@ -1889,7 +1999,7 @@ options.
#### DataFrame data reader/writer interface
Based on user feedback, we created a new, more fluid API for reading data in (`SQLContext.read`)
-and writing data out (`DataFrame.write`),
+and writing data out (`DataFrame.write`),
and deprecated the old APIs (e.g. `SQLContext.parquetFile`, `SQLContext.jsonFile`).
See the API docs for `SQLContext.read` (
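To make the migration note above concrete, here is a minimal sketch of the unified reader/writer API in Scala. It assumes a `spark-shell` session where `sqlContext` is predefined; the file paths are placeholders.

```scala
// Reading with the fluent interface instead of the deprecated
// SQLContext.jsonFile / SQLContext.parquetFile methods.
val people = sqlContext.read.format("json").load("examples/src/main/resources/people.json")

// Writing with DataFrame.write; shorthand methods such as .parquet() and .json() also exist.
people.write.mode("overwrite").parquet("/tmp/people.parquet")
val reloaded = sqlContext.read.parquet("/tmp/people.parquet")
```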
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 63e2c79669763..e4932cfa7a4fc 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -306,6 +306,13 @@ def parse_args():
"--private-ips", action="store_true", default=False,
help="Use private IPs for instances rather than public if VPC/subnet " +
"requires that.")
+ parser.add_option(
+ "--instance-initiated-shutdown-behavior", default="stop",
+ choices=["stop", "terminate"],
+        help="Whether instances should terminate or just stop when they are shut down")
+ parser.add_option(
+ "--instance-profile-name", default=None,
+ help="IAM profile name to launch instances under")
(opts, args) = parser.parse_args()
if len(args) != 2:
@@ -602,7 +609,8 @@ def launch_cluster(conn, opts, cluster_name):
block_device_map=block_map,
subnet_id=opts.subnet_id,
placement_group=opts.placement_group,
- user_data=user_data_content)
+ user_data=user_data_content,
+ instance_profile_name=opts.instance_profile_name)
my_req_ids += [req.id for req in slave_reqs]
i += 1
@@ -647,16 +655,19 @@ def launch_cluster(conn, opts, cluster_name):
for zone in zones:
num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
if num_slaves_this_zone > 0:
- slave_res = image.run(key_name=opts.key_pair,
- security_group_ids=[slave_group.id] + additional_group_ids,
- instance_type=opts.instance_type,
- placement=zone,
- min_count=num_slaves_this_zone,
- max_count=num_slaves_this_zone,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content)
+ slave_res = image.run(
+ key_name=opts.key_pair,
+ security_group_ids=[slave_group.id] + additional_group_ids,
+ instance_type=opts.instance_type,
+ placement=zone,
+ min_count=num_slaves_this_zone,
+ max_count=num_slaves_this_zone,
+ block_device_map=block_map,
+ subnet_id=opts.subnet_id,
+ placement_group=opts.placement_group,
+ user_data=user_data_content,
+ instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
+ instance_profile_name=opts.instance_profile_name)
slave_nodes += slave_res.instances
print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
s=num_slaves_this_zone,
@@ -678,16 +689,19 @@ def launch_cluster(conn, opts, cluster_name):
master_type = opts.instance_type
if opts.zone == 'all':
opts.zone = random.choice(conn.get_all_zones()).name
- master_res = image.run(key_name=opts.key_pair,
- security_group_ids=[master_group.id] + additional_group_ids,
- instance_type=master_type,
- placement=opts.zone,
- min_count=1,
- max_count=1,
- block_device_map=block_map,
- subnet_id=opts.subnet_id,
- placement_group=opts.placement_group,
- user_data=user_data_content)
+ master_res = image.run(
+ key_name=opts.key_pair,
+ security_group_ids=[master_group.id] + additional_group_ids,
+ instance_type=master_type,
+ placement=opts.zone,
+ min_count=1,
+ max_count=1,
+ block_device_map=block_map,
+ subnet_id=opts.subnet_id,
+ placement_group=opts.placement_group,
+ user_data=user_data_content,
+ instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
+ instance_profile_name=opts.instance_profile_name)
master_nodes = master_res.instances
print("Launched master in %s, regid = %s" % (zone, master_res.id))
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 5812b72f0aa78..f16bf989f200b 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -91,8 +91,7 @@ object MimaBuild {
def mimaSettings(sparkHome: File, projectRef: ProjectRef) = {
val organization = "org.apache.spark"
- // TODO: Change this once Spark 1.4.0 is released
- val previousSparkVersion = "1.4.0-rc4"
+ val previousSparkVersion = "1.4.0"
val fullId = "spark-" + projectRef.project + "_2.10"
mimaDefaultSettings ++
Seq(previousArtifact := Some(organization % fullId % previousSparkVersion),
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e01720296fed0..f5f1c9a1a247a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -166,9 +166,8 @@ object SparkBuild extends PomBuild {
/* Enable tests settings for all projects except examples, assembly and tools */
(allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
- // TODO: remove launcher from this list after 1.4.0
allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl,
- networkCommon, networkShuffle, networkYarn, launcher, unsafe).contains(x)).foreach {
+ networkCommon, networkShuffle, networkYarn, unsafe).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 1ecec5b126505..0a85da7443d3d 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -396,6 +396,11 @@ def over(self, window):
jc = self._jc.over(window._jspec)
return Column(jc)
+ def __nonzero__(self):
+ raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
+ "'~' for 'not' when building DataFrame boolean expressions.")
+ __bool__ = __nonzero__
+
def __repr__(self):
return 'Column<%s>' % self._jc.toString().encode('utf8')
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 13f4556943ac8..e6a434e4b2dff 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -164,6 +164,14 @@ def test_explode(self):
self.assertEqual(result[0][0], "a")
self.assertEqual(result[0][1], "b")
+ def test_and_in_expression(self):
+ self.assertEqual(4, self.df.filter((self.df.key <= 10) & (self.df.value <= "2")).count())
+ self.assertRaises(ValueError, lambda: (self.df.key <= 10) and (self.df.value <= "2"))
+ self.assertEqual(14, self.df.filter((self.df.key <= 3) | (self.df.value < "2")).count())
+ self.assertRaises(ValueError, lambda: self.df.key <= 3 or self.df.value < "2")
+ self.assertEqual(99, self.df.filter(~(self.df.key == 1)).count())
+ self.assertRaises(ValueError, lambda: not self.df.key == 1)
+
def test_udf_with_callable(self):
d = [Row(number=i, squared=i**2) for i in range(10)]
rdd = self.sc.parallelize(d)
@@ -408,7 +416,7 @@ def test_column_operators(self):
self.assertTrue(isinstance((- ci - 1 - 2) % 3 * 2.5 / 3.5, Column))
rcc = (1 + ci), (1 - ci), (1 * ci), (1 / ci), (1 % ci)
self.assertTrue(all(isinstance(c, Column) for c in rcc))
- cb = [ci == 5, ci != 0, ci > 3, ci < 4, ci >= 0, ci <= 7, ci and cs, ci or cs]
+ cb = [ci == 5, ci != 0, ci > 3, ci < 4, ci >= 0, ci <= 7]
self.assertTrue(all(isinstance(c, Column) for c in cb))
cbool = (ci & ci), (ci | ci), (~ci)
self.assertTrue(all(isinstance(c, Column) for c in cbool))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index bd5475d2066fc..47c5455435ec6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -175,8 +175,10 @@ class CodeGenContext {
* Generate code for compare expression in Java
*/
def genComp(dataType: DataType, c1: String, c2: String): String = dataType match {
+    // Java booleans do not support the > or < operators
+ case BooleanType => s"($c1 == $c2 ? 0 : ($c1 ? 1 : -1))"
// use c1 - c2 may overflow
- case dt: DataType if isPrimitiveType(dt) => s"(int)($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)"
+ case dt: DataType if isPrimitiveType(dt) => s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)"
case BinaryType => s"org.apache.spark.sql.catalyst.util.TypeUtils.compareBinary($c1, $c2)"
case other => s"$c1.compare($c2)"
}
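The new `BooleanType` branch exists because the generated Java source cannot apply `>` or `<` to `boolean` values, while the other primitive branch keeps explicit branching to avoid subtraction overflow. A hedged Scala sketch of the ordering both branches implement (helper names are illustrative, not part of the codegen):

```scala
// Booleans are ordered false < true; other primitives use branching instead of
// c1 - c2, which could overflow.
def compareBooleans(c1: Boolean, c2: Boolean): Int =
  if (c1 == c2) 0 else if (c1) 1 else -1

def comparePrimitives(c1: Long, c2: Long): Int =
  if (c1 > c2) 1 else if (c1 < c2) -1 else 0

assert(compareBooleans(false, true) == -1)
assert(comparePrimitives(Long.MinValue, 1L) == -1) // c1 - c2 would overflow here
```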
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala
index 4bbbbe6c7f091..6c93698f8017b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ArithmeticExpressionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.types.{Decimal, DoubleType, IntegerType}
+import org.apache.spark.sql.types.Decimal
class ArithmeticExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -123,23 +123,39 @@ class ArithmeticExpressionSuite extends SparkFunSuite with ExpressionEvalHelper
}
}
- test("MaxOf") {
- checkEvaluation(MaxOf(1, 2), 2)
- checkEvaluation(MaxOf(2, 1), 2)
- checkEvaluation(MaxOf(1L, 2L), 2L)
- checkEvaluation(MaxOf(2L, 1L), 2L)
+ test("MaxOf basic") {
+ testNumericDataTypes { convert =>
+ val small = Literal(convert(1))
+ val large = Literal(convert(2))
+ checkEvaluation(MaxOf(small, large), convert(2))
+ checkEvaluation(MaxOf(large, small), convert(2))
+ checkEvaluation(MaxOf(Literal.create(null, small.dataType), large), convert(2))
+ checkEvaluation(MaxOf(large, Literal.create(null, small.dataType)), convert(2))
+ }
+ }
- checkEvaluation(MaxOf(Literal.create(null, IntegerType), 2), 2)
- checkEvaluation(MaxOf(2, Literal.create(null, IntegerType)), 2)
+ test("MaxOf for atomic type") {
+ checkEvaluation(MaxOf(true, false), true)
+ checkEvaluation(MaxOf("abc", "bcd"), "bcd")
+ checkEvaluation(MaxOf(Array(1.toByte, 2.toByte), Array(1.toByte, 3.toByte)),
+ Array(1.toByte, 3.toByte))
}
- test("MinOf") {
- checkEvaluation(MinOf(1, 2), 1)
- checkEvaluation(MinOf(2, 1), 1)
- checkEvaluation(MinOf(1L, 2L), 1L)
- checkEvaluation(MinOf(2L, 1L), 1L)
+ test("MinOf basic") {
+ testNumericDataTypes { convert =>
+ val small = Literal(convert(1))
+ val large = Literal(convert(2))
+ checkEvaluation(MinOf(small, large), convert(1))
+ checkEvaluation(MinOf(large, small), convert(1))
+ checkEvaluation(MinOf(Literal.create(null, small.dataType), large), convert(2))
+ checkEvaluation(MinOf(small, Literal.create(null, small.dataType)), convert(1))
+ }
+ }
- checkEvaluation(MinOf(Literal.create(null, IntegerType), 1), 1)
- checkEvaluation(MinOf(1, Literal.create(null, IntegerType)), 1)
+ test("MinOf for atomic type") {
+ checkEvaluation(MinOf(true, false), false)
+ checkEvaluation(MinOf("abc", "bcd"), "abc")
+ checkEvaluation(MinOf(Array(1.toByte, 2.toByte), Array(1.toByte, 3.toByte)),
+ Array(1.toByte, 2.toByte))
}
}
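The rewritten tests also pin down the null handling of `MaxOf`/`MinOf`: a null operand is ignored rather than propagated. A standalone sketch of that rule, using a hypothetical `Option`-based helper rather than the Catalyst expressions themselves:

```scala
// When one side is missing (null), return the other side; otherwise compare.
def maxOf(a: Option[Int], b: Option[Int]): Option[Int] = (a, b) match {
  case (None, _) => b
  case (_, None) => a
  case (Some(x), Some(y)) => Some(math.max(x, y))
}

assert(maxOf(None, Some(2)) == Some(2)) // mirrors MaxOf(Literal.create(null, ...), 2) == 2
assert(maxOf(Some(2), None) == Some(2))
```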
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 16493c3d7c19c..265352647fa9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,6 +22,8 @@ import java.util.Properties
import scala.collection.immutable
import scala.collection.JavaConversions._
+import org.apache.parquet.hadoop.ParquetOutputCommitter
+
import org.apache.spark.sql.catalyst.CatalystConf
private[spark] object SQLConf {
@@ -252,9 +254,9 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
defaultValue = Some(false),
- doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" +
- " because of a known bug in Paruet 1.6.0rc3 " +
- "(PARQUET-136). However, " +
+ doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default " +
+ "because of a known bug in Parquet 1.6.0rc3 " +
+ "(PARQUET-136, https://issues.apache.org/jira/browse/PARQUET-136). However, " +
"if your table doesn't contain any nullable string or binary columns, it's still safe to " +
"turn this feature on.")
@@ -262,11 +264,21 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "")
+ val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
+ key = "spark.sql.parquet.output.committer.class",
+ defaultValue = Some(classOf[ParquetOutputCommitter].getName),
+ doc = "The output committer class used by Parquet. The specified class needs to be a " +
+ "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
+ "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
+ "option must be set in Hadoop Configuration. 2. This option overrides " +
+ "\"spark.sql.sources.outputCommitterClass\"."
+ )
+
val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
doc = "")
- val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath",
+ val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
defaultValue = Some(true),
doc = "")
@@ -325,9 +337,13 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "")
- // The output committer class used by FSBasedRelation. The specified class needs to be a
+ // The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
- // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
+ //
+ // NOTE:
+ //
+ // 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
+  // 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
val OUTPUT_COMMITTER_CLASS =
stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
@@ -415,7 +431,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
/** When true uses verifyPartitionPath to prune the path which is not exists. */
- private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH)
+ private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index 62c4e92ebec68..1551afd7b7bf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -17,19 +17,35 @@
package org.apache.spark.sql.parquet
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.Log
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
+/**
+ * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
+ * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
+ * destination folder. This can be useful for data stored in S3, where directory operations are
+ * relatively expensive.
+ *
+ * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
+ * property via Hadoop [[Configuration]]. Note that this property overrides
+ * "spark.sql.sources.outputCommitterClass".
+ *
+ * *NOTE*
+ *
+ * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there is
+ * no safe way to undo a failed appending job (which is why both `abortTask()` and `abortJob()`
+ * are left empty).
+ */
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
val LOG = Log.getLog(classOf[ParquetOutputCommitter])
- override def getWorkPath(): Path = outputPath
+ override def getWorkPath: Path = outputPath
override def abortTask(taskContext: TaskAttemptContext): Unit = {}
override def commitTask(taskContext: TaskAttemptContext): Unit = {}
override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
@@ -46,13 +62,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
try {
ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
- } catch {
- case e: Exception => {
- LOG.warn("could not write summary file for " + outputPath, e)
- val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
- if (fileSystem.exists(metadataPath)) {
- fileSystem.delete(metadataPath, true)
- }
+ } catch { case e: Exception =>
+ LOG.warn("could not write summary file for " + outputPath, e)
+ val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+ if (fileSystem.exists(metadataPath)) {
+ fileSystem.delete(metadataPath, true)
}
}
} catch {
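As the new scaladoc explains, this committer is selected through a Hadoop configuration property rather than `SQLConf`. A minimal sketch of enabling it, assuming a `SparkContext` named `sc` and a non-append workload:

```scala
// Sketch only: route Parquet writes through DirectParquetOutputCommitter.
// This must be set on the Hadoop Configuration; setting it via SQLConf has no effect.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
```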
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e049d54bf55dc..1d353bd8e1114 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -178,11 +178,11 @@ private[sql] class ParquetRelation2(
val committerClass =
conf.getClass(
- "spark.sql.parquet.output.committer.class",
+ SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])
- if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+ if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
logInfo("Using default output committer for Parquet: " +
classOf[ParquetOutputCommitter].getCanonicalName)
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c6f535dde7676..8b2a45d8e970a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
- val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
+ val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
// Creates the StructType which represents the partition columns.
val fields = {
@@ -181,19 +181,18 @@ private[sql] object PartitioningUtils {
* StringType
* }}}
*/
- private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
- // Column names of all partitions must match
- val distinctPartitionsColNames = values.map(_.columnNames).distinct
-
- if (distinctPartitionsColNames.isEmpty) {
+ private[sql] def resolvePartitions(
+ pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
+ if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
- assert(distinctPartitionsColNames.size == 1, {
- val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
- s"Conflicting partition column names detected:\n$list"
- })
+ val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct
+ assert(
+ distinctPartColNames.size == 1,
+ listConflictingPartitionColumns(pathsWithPartitionValues))
// Resolves possible type conflicts for each column
+ val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
@@ -206,6 +205,34 @@ private[sql] object PartitioningUtils {
}
}
+ private[sql] def listConflictingPartitionColumns(
+ pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+ val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
+
+ def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
+ seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })
+
+ val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
+ case (path, partValues) => partValues.columnNames -> path
+ })
+
+ val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
+ case (names, index) =>
+ s"Partition column name list #$index: $names"
+ }
+
+ // Lists out those non-leaf partition directories that also contain files
+ val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
+
+ s"Conflicting partition column names detected:\n" +
+ distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
+ "For partitioned table directories, data files should only live in leaf directories.\n" +
+ "And directories at the same level should have the same partition column name.\n" +
+ "Please check the following directories for unexpected files or " +
+ "inconsistent partition column names:\n" +
+ suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+ }
+
/**
* Converts a string to a [[Literal]] with automatic type inference. Currently only supports
* [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and
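For context, a hedged sketch of the situation the new `listConflictingPartitionColumns` message describes; the paths and table layout are purely illustrative.

```scala
// Illustrative layout that now produces the detailed error instead of a bare assertion:
//
//   /data/table/a=1/part-00000.parquet
//   /data/table/b=1/part-00000.parquet
//
// Sibling directories disagree on the partition column name ("a" vs. "b"), so partition
// discovery fails and the message lists both column name lists and the offending paths.
val df = sqlContext.read.parquet("/data/table") // throws, carrying the conflict report
```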
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 215e53c020849..fb6173f58ece6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+ val pathExists = fs.exists(qualifiedOutputPath)
+ val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
case (SaveMode.Ignore, exists) =>
!exists
}
+    // True if we are appending data to a directory that already exists.
+    val isAppend = pathExists && (mode == SaveMode.Append)
if (doInsertion) {
val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionColumns = relation.partitionColumns.fieldNames
if (partitionColumns.isEmpty) {
- insert(new DefaultWriterContainer(relation, job), df)
+ insert(new DefaultWriterContainer(relation, job, isAppend), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
+ relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
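A hedged usage sketch of how `isAppend` ends up true only on a genuine append; the DataFrame `df` and the path are placeholders.

```scala
import org.apache.spark.sql.SaveMode

// First write: the path does not exist yet, so isAppend is false and a custom
// output committer (if configured) may be used.
df.write.mode(SaveMode.Append).parquet("/tmp/events")

// Second write: data already exists and the mode is Append, so isAppend is true
// and the file format's own output committer is used instead.
df.write.mode(SaveMode.Append).parquet("/tmp/events")
```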
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
- @transient job: Job)
+ @transient job: Job,
+ isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- val committerClass = context.getConfiguration.getClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
- Option(committerClass).map { clazz =>
- logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
- // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
- // has an associated output committer. To override this output committer,
- // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
- // If a data source needs to override the output committer, it needs to set the
- // output committer in prepareForWrite method.
- if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
- // The specified output committer is a FileOutputCommitter.
- // So, we will use the FileOutputCommitter-specified constructor.
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- ctor.newInstance(new Path(outputPath), context)
- } else {
- // The specified output committer is just a OutputCommitter.
- // So, we will use the no-argument constructor.
- val ctor = clazz.getDeclaredConstructor()
- ctor.newInstance()
+ val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+ if (isAppend) {
+ // If we are appending data to an existing dir, we will only use the output committer
+ // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, on S3, the direct Parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+ "for appending.")
+ defaultOutputCommitter
+ } else {
+ val committerClass = context.getConfiguration.getClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+ Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+ // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+ // has an associated output committer. To override this output committer,
+ // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+ // If a data source needs to override the output committer, it needs to set the
+ // output committer in prepareForWrite method.
+ if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+ // The specified output committer is a FileOutputCommitter.
+ // So, we will use the FileOutputCommitter-specified constructor.
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ ctor.newInstance(new Path(outputPath), context)
+ } else {
+ // The specified output committer is just a OutputCommitter.
+ // So, we will use the no-argument constructor.
+ val ctor = clazz.getDeclaredConstructor()
+ ctor.newInstance()
+ }
+ }.getOrElse {
+ // If output committer class is not set, we will use the one associated with the
+ // file output format.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+ defaultOutputCommitter
}
- }.getOrElse {
- // If output committer class is not set, we will use the one associated with the
- // file output format.
- val outputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
- logInfo(s"Using output committer class ${outputCommitter.getClass.getCanonicalName}")
- outputCommitter
}
}
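The effect of the restructured method, in a hedged user-facing sketch (the committer class name is hypothetical, and `sc` is an existing `SparkContext`):

```scala
// A custom committer registered through the Hadoop configuration is honored for
// fresh writes and overwrites, but ignored when appending to an existing directory.
sc.hadoopConfiguration.set(
  "spark.sql.sources.outputCommitterClass",
  "com.example.MyDirectOutputCommitter") // hypothetical class
```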
@@ -433,8 +450,9 @@ private[sql] abstract class BaseWriterContainer(
private[sql] class DefaultWriterContainer(
@transient relation: HadoopFsRelation,
- @transient job: Job)
- extends BaseWriterContainer(relation, job) {
+ @transient job: Job,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
@transient private var writer: OutputWriter = _
@@ -473,8 +491,9 @@ private[sql] class DynamicPartitionWriterContainer(
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
- defaultPartitionName: String)
- extends BaseWriterContainer(relation, job) {
+ defaultPartitionName: String,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
// All output writers are created on executor side.
@transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 01df189d1f3be..d0ebb11b063f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df)
}
}
+
+ test("listConflictingPartitionColumns") {
+ def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
+ val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
+ s"\tPartition column name list #$index: $list"
+ }.mkString("\n", "\n", "\n")
+
+ // scalastyle:off
+ s"""Conflicting partition column names detected:
+ |$conflictingColNameLists
+ |For partitioned table directories, data files should only live in leaf directories.
+ |And directories at the same level should have the same partition column name.
+ |Please check the following directories for unexpected files or inconsistent partition column names:
+ |${paths.map("\t" + _).mkString("\n", "\n", "")}
+ """.stripMargin.trim
+ // scalastyle:on
+ }
+
+ assert(
+ listConflictingPartitionColumns(
+ Seq(
+ (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))),
+ (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim ===
+ makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")))
+
+ assert(
+ listConflictingPartitionColumns(
+ Seq(
+ (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))),
+ (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim ===
+ makeExpectedMessage(
+ Seq("a"),
+ Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+
+ assert(
+ listConflictingPartitionColumns(
+ Seq(
+ (new Path("file:/tmp/foo/a=1"),
+ PartitionValues(Seq("a"), Seq(Literal(1)))),
+ (new Path("file:/tmp/foo/a=1/b=foo"),
+ PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim ===
+ makeExpectedMessage(
+ Seq("a", "a, b"),
+ Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 42c2d4c98ffb2..2f771d76793e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
+import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
@@ -136,12 +137,62 @@ private[hive] class ClientWrapper(
// TODO: should be a def?s
// When we create this val client, the HiveConf of it (conf) is the one associated with state.
- private val client = Hive.get(conf)
+ @GuardedBy("this")
+ private var client = Hive.get(conf)
+
+ // We use hive's conf for compatibility.
+ private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
+ private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
+
+ /**
+ * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
+ */
+ private def retryLocked[A](f: => A): A = synchronized {
+ // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
+ val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
+ var numTries = 0
+ var caughtException: Exception = null
+ do {
+ numTries += 1
+ try {
+ return f
+ } catch {
+ case e: Exception if causedByThrift(e) =>
+ caughtException = e
+ logWarning(
+ "HiveClientWrapper got thrift exception, destroying client and retrying " +
+ s"(${retryLimit - numTries} tries remaining)", e)
+ Thread.sleep(retryDelayMillis)
+ try {
+ client = Hive.get(state.getConf, true)
+ } catch {
+ case e: Exception if causedByThrift(e) =>
+ logWarning("Failed to refresh hive client, will retry.", e)
+ }
+ }
+ } while (numTries <= retryLimit && System.nanoTime < deadline)
+ if (System.nanoTime > deadline) {
+ logWarning("Deadline exceeded")
+ }
+ throw caughtException
+ }
+
+ private def causedByThrift(e: Throwable): Boolean = {
+ var target = e
+ while (target != null) {
+ val msg = target.getMessage()
+ if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
+ return true
+ }
+ target = target.getCause()
+ }
+ false
+ }
/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/
- private def withHiveState[A](f: => A): A = synchronized {
+ private def withHiveState[A](f: => A): A = retryLocked {
val original = Thread.currentThread().getContextClassLoader
// Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
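A self-contained sketch of the retry-with-deadline pattern that `retryLocked` applies around every Hive call. The limits and names are illustrative, and the real method additionally recreates the Hive client between attempts and only retries Thrift-related exceptions.

```scala
def retryWithDeadline[A](maxTries: Int, delayMillis: Long)(f: => A): A = {
  // Cap total time as well as attempt count, since the callee may retry internally too.
  val deadline = System.nanoTime + maxTries * delayMillis * 1000000L
  var tries = 0
  var last: Exception = null
  do {
    tries += 1
    try {
      return f
    } catch {
      case e: Exception =>
        last = e
        Thread.sleep(delayMillis)
    }
  } while (tries <= maxTries && System.nanoTime < deadline)
  throw last
}
```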
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 5ae2dbb50d86b..e7c1779f80ce6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean, Integer => JInteger}
import java.lang.reflect.{Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
@@ -64,6 +65,8 @@ private[client] sealed abstract class Shim {
def getDriverResults(driver: Driver): Seq[String]
+ def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
+
def loadPartition(
hive: Hive,
loadPath: Path,
@@ -192,6 +195,10 @@ private[client] class Shim_v0_12 extends Shim {
res.toSeq
}
+ override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
+ conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
+ }
+
override def loadPartition(
hive: Hive,
loadPath: Path,
@@ -321,6 +328,12 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
+ private lazy val getTimeVarMethod =
+ findMethod(
+ classOf[HiveConf],
+ "getTimeVar",
+ classOf[HiveConf.ConfVars],
+ classOf[TimeUnit])
override def loadPartition(
hive: Hive,
@@ -359,4 +372,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
}
+ override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
+ getTimeVarMethod.invoke(
+ conf,
+ HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
+ TimeUnit.MILLISECONDS).asInstanceOf[Long]
+ }
}
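`HiveConf.getTimeVar(ConfVars, TimeUnit)` only exists in Hive 0.14 and later, which is why the shim resolves it reflectively instead of linking against it; the Hive 0.12 shim above instead reads the value in seconds with `getIntVar` and multiplies by 1000. A hedged sketch of the same reflective lookup outside the shim (it assumes Hive 0.14+ jars on the classpath):

```scala
import java.util.concurrent.TimeUnit
import org.apache.hadoop.hive.conf.HiveConf

// Resolve the method at runtime so the code still compiles against older Hive versions.
val getTimeVar = classOf[HiveConf].getMethod(
  "getTimeVar", classOf[HiveConf.ConfVars], classOf[TimeUnit])

def retryDelayMillis(conf: HiveConf): Long =
  getTimeVar.invoke(conf, HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
    TimeUnit.MILLISECONDS).asInstanceOf[Long]
```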
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index ab443032be20d..d85516ab0878e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive
import java.io.File
+import scala.sys.process.{ProcessLogger, Process}
+
import org.apache.spark._
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -82,12 +84,18 @@ class HiveSparkSubmitSuite
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- val process = Utils.executeCommand(
+ val process = Process(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
- Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+ "SPARK_TESTING" -> "1",
+ "SPARK_HOME" -> sparkHome
+ ).run(ProcessLogger(
+ (line: String) => { println(s"out> $line") },
+ (line: String) => { println(s"err> $line") }
+ ))
+
try {
- val exitCode = failAfter(120 seconds) { process.waitFor() }
+ val exitCode = failAfter(120 seconds) { process.exitValue() }
if (exitCode != 0) {
fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
}
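A minimal sketch of the `scala.sys.process` pattern adopted above: launch a command, stream its stdout and stderr through a `ProcessLogger`, then block on `exitValue()`. The command here is a harmless placeholder rather than the actual `spark-submit` invocation.

```scala
import scala.sys.process.{Process, ProcessLogger}

val proc = Process(Seq("echo", "hello")).run(ProcessLogger(
  out => println(s"out> $out"),
  err => println(s"err> $err")))
val exitCode = proc.exitValue() // blocks until the process terminates
```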
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index e0d8277a8ed3f..a16ab3a00ddb8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,10 +17,16 @@
package org.apache.spark.sql.sources
+import scala.collection.JavaConversions._
+
import java.io.File
import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -476,7 +482,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
// more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
// requirement. We probably want to move this test case to spark-integration-tests or spark-perf
// later.
- test("SPARK-8406: Avoids name collision while writing Parquet files") {
+ test("SPARK-8406: Avoids name collision while writing files") {
withTempPath { dir =>
val path = dir.getCanonicalPath
sqlContext
@@ -497,6 +503,81 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("SPARK-8578 specified custom output committer will not be used to append data") {
+ val clonedConf = new Configuration(configuration)
+ try {
+ val df = sqlContext.range(1, 10).toDF("i")
+ withTempPath { dir =>
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[AlwaysFailOutputCommitter].getName)
+ // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+ configuration.set("spark.sql.parquet.output.committer.class",
+ classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because data already exists at the destination, this append should succeed: the
+        // output committer associated with the file format is used, and
+        // AlwaysFailOutputCommitter is never invoked.
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ checkAnswer(
+ sqlContext.read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .load(dir.getCanonicalPath),
+ df.unionAll(df))
+
+        // This will fail because AlwaysFailOutputCommitter is used when we do not append.
+ intercept[Exception] {
+ df.write.mode("overwrite").format(dataSourceName).save(dir.getCanonicalPath)
+ }
+ }
+ withTempPath { dir =>
+ configuration.set(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
+ classOf[AlwaysFailOutputCommitter].getName)
+ // Since Parquet has its own output committer setting, also set it
+        // to AlwaysFailParquetOutputCommitter here.
+ configuration.set("spark.sql.parquet.output.committer.class",
+ classOf[AlwaysFailParquetOutputCommitter].getName)
+        // Because there is no existing data, isAppend is false even though we requested
+        // "append" mode, so AlwaysFailOutputCommitter is used and the write fails.
+ intercept[Exception] {
+ df.write.mode("append").format(dataSourceName).save(dir.getCanonicalPath)
+ }
+ }
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ }
+ }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailOutputCommitter(
+ outputPath: Path,
+ context: TaskAttemptContext)
+ extends FileOutputCommitter(outputPath, context) {
+
+ override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commit failure for testing purposes.")
+ }
+}
+
+// This class is used to test SPARK-8578. We should not use any custom output committer when
+// we actually append data to an existing dir.
+class AlwaysFailParquetOutputCommitter(
+ outputPath: Path,
+ context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {
+
+ override def commitJob(context: JobContext): Unit = {
+    sys.error("Intentional job commit failure for testing purposes.")
+ }
}
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
|