diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 989740c8d43b6..67695648320e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -58,12 +58,18 @@ private[hive] trait HiveStrategies {
     def lowerCase =
       new SchemaRDD(s.sqlContext, s.logicalPlan)
 
-    def addPartitioningAttributes(attrs: Seq[Attribute]) =
-      new SchemaRDD(
-        s.sqlContext,
-        s.logicalPlan transform {
-          case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
-        })
+    def addPartitioningAttributes(attrs: Seq[Attribute]) = {
+      // Don't add the partitioning key if it's already present in the data.
+      if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
+        s
+      } else {
+        new SchemaRDD(
+          s.sqlContext,
+          s.logicalPlan transform {
+            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+          })
+      }
+    }
   }
 
   implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
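Review note: the new guard in `addPartitioningAttributes` is a purely name-based subset test between the partitioning attributes and the plan's output; when every partition column already appears in the data, the original `SchemaRDD` is returned unchanged. Below is a minimal standalone sketch of the same check — the `Attribute` here is a simplified stand-in for Catalyst's `Attribute`, and the object and method names are invented for illustration:

```scala
// Sketch of the name-based subset check, assuming a simplified Attribute
// that carries only a name (Catalyst's Attribute also has a type, exprId, etc.).
object PartitionKeyCheckSketch extends App {
  case class Attribute(name: String)

  // True when every partitioning column already appears in the data's output
  // schema, in which case no partitioning attributes should be injected.
  def keyAlreadyInData(partitionAttrs: Seq[Attribute], output: Seq[Attribute]): Boolean =
    partitionAttrs.map(_.name).toSet.subsetOf(output.map(_.name).toSet)

  val output = Seq(Attribute("p"), Attribute("intField"), Attribute("stringField"))
  assert(keyAlreadyInData(Seq(Attribute("p")), output))   // "p" is already in the data
  assert(!keyAlreadyInData(Seq(Attribute("q")), output))  // "q" is not
}
```

As written the comparison is by name only and case-sensitive, presumably because the metastore's partition columns and the Parquet file's columns are distinct attributes that merely share a name, so identity-based matching would never fire.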
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index 86adbbf3ad2d8..cc65242c0da9b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -27,7 +27,11 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 
+// The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key.
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
 /**
  * Tests for our SerDe -> Native parquet scan conversion.
@@ -45,6 +49,17 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
     sql(s"""
       create external table partitioned_parquet
       (
@@ -59,6 +74,20 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       location '${partitionedTableDir.getCanonicalPath}'
     """)
 
+    sql(s"""
+      create external table partitioned_parquet_with_key
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKey.getCanonicalPath}'
+    """)
+
     sql(s"""
       create external table normal_parquet
       (
@@ -76,6 +105,10 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
 
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
@@ -83,75 +116,76 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
-  test("project the partitioning column") {
-    checkAnswer(
-      sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
-      (1, 10) ::
-      (2, 10) ::
-      (3, 10) ::
-      (4, 10) ::
-      (5, 10) ::
-      (6, 10) ::
-      (7, 10) ::
-      (8, 10) ::
-      (9, 10) ::
-      (10, 10) :: Nil
-    )
-  }
+  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table group by p"),
+        (1, 10) ::
+        (2, 10) ::
+        (3, 10) ::
+        (4, 10) ::
+        (5, 10) ::
+        (6, 10) ::
+        (7, 10) ::
+        (8, 10) ::
+        (9, 10) ::
+        (10, 10) :: Nil
+      )
+    }
 
-  test("project partitioning and non-partitioning columns") {
-    checkAnswer(
-      sql("SELECT stringField, p, count(intField) " +
-        "FROM partitioned_parquet GROUP BY p, stringField"),
-      ("part-1", 1, 10) ::
-      ("part-2", 2, 10) ::
-      ("part-3", 3, 10) ::
-      ("part-4", 4, 10) ::
-      ("part-5", 5, 10) ::
-      ("part-6", 6, 10) ::
-      ("part-7", 7, 10) ::
-      ("part-8", 8, 10) ::
-      ("part-9", 9, 10) ::
-      ("part-10", 10, 10) :: Nil
-    )
-  }
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+        ("part-1", 1, 10) ::
+        ("part-2", 2, 10) ::
+        ("part-3", 3, 10) ::
+        ("part-4", 4, 10) ::
+        ("part-5", 5, 10) ::
+        ("part-6", 6, 10) ::
+        ("part-7", 7, 10) ::
+        ("part-8", 8, 10) ::
+        ("part-9", 9, 10) ::
+        ("part-10", 10, 10) :: Nil
+      )
+    }
 
-  test("simple count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet"),
-      100)
-  }
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        100)
+    }
 
-  test("pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
-      10)
-  }
+    test(s"pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        10)
+    }
 
-  test("multi-partition pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
-      30)
-  }
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        30)
+    }
 
-  test("non-partition predicates") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
-      30)
-  }
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        30)
+    }
 
-  test("sum") {
-    checkAnswer(
-      sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
-      1 + 2 + 3)
-  }
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+        1 + 2 + 3)
+    }
 
-  test("hive udfs") {
-    checkAnswer(
-      sql("SELECT concat(stringField, stringField) FROM partitioned_parquet"),
-      sql("SELECT stringField FROM partitioned_parquet").map {
-        case Row(s: String) => Row(s + s)
-      }.collect().toSeq)
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
   }
 
   test("non-part select(*)") {