Support partitioned parquet tables that have the key in both the directory and the file
marmbrus committed Nov 14, 2014
1 parent e421072 commit 447f08c
Showing 2 changed files with 108 additions and 68 deletions.
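For context, a minimal sketch of the on-disk layout this commit adds support for, written in the style of the test suite below (the path is illustrative; sparkContext, makeRDD, and saveAsParquetFile are used exactly as the suite uses them): the partition key p is encoded in each directory name and is also stored as a column inside every Parquet file under that directory.

    // Sketch only: the partition key p appears both in the directory name
    // (p=1, p=2, ...) and as a column in each Parquet file.
    case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)

    val tableDir = "/tmp/partitioned_parquet_with_key"  // illustrative path
    (1 to 10).foreach { p =>
      sparkContext.makeRDD(1 to 10)
        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
        .saveAsParquetFile(s"$tableDir/p=$p")
    }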
@@ -58,12 +58,18 @@ private[hive] trait HiveStrategies {
     def lowerCase =
       new SchemaRDD(s.sqlContext, s.logicalPlan)

-    def addPartitioningAttributes(attrs: Seq[Attribute]) =
-      new SchemaRDD(
-        s.sqlContext,
-        s.logicalPlan transform {
-          case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
-        })
+    def addPartitioningAttributes(attrs: Seq[Attribute]) = {
+      // Don't add the partitioning key if it's already present in the data.
+      if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
+        s
+      } else {
+        new SchemaRDD(
+          s.sqlContext,
+          s.logicalPlan transform {
+            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+          })
+      }
+    }
   }

   implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
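The guard added above reduces to a name-set containment check: if every partitioning attribute name already appears among the plan's output names, the original SchemaRDD is returned untouched. A standalone sketch of the same test, with hypothetical column names:

    // Mirrors attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)
    val partitioningNames = Set("p")                       // hypothetical partition key
    val outputNames = Set("p", "intField", "stringField")  // columns already in the data
    val keyAlreadyInData = partitioningNames.subsetOf(outputNames)  // true: reuse plan as-is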
@@ -27,7 +27,11 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._

+// The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key.
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+

 /**
  * Tests for our SerDe -> Native parquet scan conversion.

@@ -45,6 +49,17 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }

+    val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
     sql(s"""
       create external table partitioned_parquet
       (
@@ -59,6 +74,20 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       location '${partitionedTableDir.getCanonicalPath}'
     """)

+    sql(s"""
+      create external table partitioned_parquet_with_key
+      (
+        intField INT,
+        stringField STRING
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+      STORED AS
+        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      location '${partitionedTableDirWithKey.getCanonicalPath}'
+    """)
+
     sql(s"""
       create external table normal_parquet
       (
@@ -76,82 +105,87 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }

+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }

   override def afterAll(): Unit = {
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }

-  test("project the partitioning column") {
-    checkAnswer(
-      sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
-      (1, 10) ::
-      (2, 10) ::
-      (3, 10) ::
-      (4, 10) ::
-      (5, 10) ::
-      (6, 10) ::
-      (7, 10) ::
-      (8, 10) ::
-      (9, 10) ::
-      (10, 10) :: Nil
-    )
-  }
+  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table group by p"),
+        (1, 10) ::
+        (2, 10) ::
+        (3, 10) ::
+        (4, 10) ::
+        (5, 10) ::
+        (6, 10) ::
+        (7, 10) ::
+        (8, 10) ::
+        (9, 10) ::
+        (10, 10) :: Nil
+      )
+    }

-  test("project partitioning and non-partitioning columns") {
-    checkAnswer(
-      sql("SELECT stringField, p, count(intField) " +
-        "FROM partitioned_parquet GROUP BY p, stringField"),
-      ("part-1", 1, 10) ::
-      ("part-2", 2, 10) ::
-      ("part-3", 3, 10) ::
-      ("part-4", 4, 10) ::
-      ("part-5", 5, 10) ::
-      ("part-6", 6, 10) ::
-      ("part-7", 7, 10) ::
-      ("part-8", 8, 10) ::
-      ("part-9", 9, 10) ::
-      ("part-10", 10, 10) :: Nil
-    )
-  }
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+        ("part-1", 1, 10) ::
+        ("part-2", 2, 10) ::
+        ("part-3", 3, 10) ::
+        ("part-4", 4, 10) ::
+        ("part-5", 5, 10) ::
+        ("part-6", 6, 10) ::
+        ("part-7", 7, 10) ::
+        ("part-8", 8, 10) ::
+        ("part-9", 9, 10) ::
+        ("part-10", 10, 10) :: Nil
+      )
+    }

-  test("simple count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet"),
-      100)
-  }
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        100)
+    }

-  test("pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
-      10)
-  }
+    test(s"pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        10)
+    }

-  test("multi-partition pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
-      30)
-  }
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        30)
+    }

-  test("non-partition predicates") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
-      30)
-  }
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        30)
+    }

-  test("sum") {
-    checkAnswer(
-      sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
-      1 + 2 + 3)
-  }
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+        1 + 2 + 3)
+    }

-  test("hive udfs") {
-    checkAnswer(
-      sql("SELECT concat(stringField, stringField) FROM partitioned_parquet"),
-      sql("SELECT stringField FROM partitioned_parquet").map {
-        case Row(s: String) => Row(s + s)
-      }.collect().toSeq)
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
   }

   test("non-part select(*)") {
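As a hedged usage sketch (my reading of the change, not text from the commit): once the file schema already contains p, the guard in HiveStrategies leaves the plan untouched, so queries against partitioned_parquet_with_key see a single p column rather than a duplicated one.

    // Assumes the TestHive session and the tables defined in the suite above.
    val rows = sql("SELECT p, intField FROM partitioned_parquet_with_key WHERE p = 1").collect()
    assert(rows.forall(_.getInt(0) == 1))  // partition pruning still applies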
