# [SPARK-11301][SQL] Fix case sensitivity for filter on partitioned col…
## What changes were proposed in this pull request?

`DataSourceStrategy` does not consult the `SQLConf` of the active context and always matches column names case-sensitively. For instance, `HiveContext` uses a case-insensitive configuration, but `DataSourceStrategy` ignores it. This issue was originally reported as SPARK-11301 against 1.6.0 and appeared to be fixed at the time, but Apache Spark 1.6.2 still always handles **partitioned column names** case-sensitively. The following example shows the incorrect behavior:

```scala
scala> sql("CREATE TABLE t(a int) PARTITIONED BY (b string) STORED AS PARQUET")
scala> sql("INSERT INTO TABLE t PARTITION(b='P') SELECT * FROM (SELECT 1) t")
scala> sql("INSERT INTO TABLE t PARTITION(b='Q') SELECT * FROM (SELECT 2) t")
scala> sql("SELECT * FROM T WHERE B='P'").show
+---+---+
|  a|  b|
+---+---+
|  1|  P|
|  2|  Q|
+---+---+
```

The result is the same even with `set spark.sql.caseSensitive=false`. Here is the same reproduction in [Databricks CE](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6660119172909095/3421754458488607/5162191866050912/latest.html).

This PR reads the configuration and handles the column-name comparison accordingly.
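
The comparison logic at the heart of the fix can be sketched in isolation as follows. This is a minimal standalone illustration under assumed names (`isPartitionPruningPredicate`, `referencedNames`, `partitionColumnNames` are illustrative, not Spark's API): when case sensitivity is disabled, both the partition column names and the names referenced by a predicate are lowercased before the subset check.

```scala
// Standalone sketch of the case-(in)sensitive partition-column matching;
// names are illustrative, not Spark's actual API.
object CaseSensitivitySketch {
  // Returns true when every column referenced by a predicate is a
  // partition column, honoring the case-sensitivity setting.
  def isPartitionPruningPredicate(
      referencedNames: Set[String],
      partitionColumnNames: Seq[String],
      caseSensitive: Boolean): Boolean = {
    val partitionSet =
      if (caseSensitive) partitionColumnNames.toSet
      else partitionColumnNames.map(_.toLowerCase).toSet
    val refs =
      if (caseSensitive) referencedNames
      else referencedNames.map(_.toLowerCase)
    refs.subsetOf(partitionSet)
  }
}
```

Under `caseSensitive = false`, a filter referencing `B` matches a partition column named `b`, so the predicate can participate in partition pruning; under `caseSensitive = true` it cannot.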

## How was this patch tested?

Pass the Jenkins tests with a modified existing test.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#14970 from dongjoon-hyun/SPARK-11301.

(cherry picked from commit 958039a)
dongjoon-hyun authored and zzcclp committed Sep 7, 2016
1 parent ef2c1da commit a728ba9
Showing 2 changed files with 18 additions and 4 deletions.
```diff
@@ -255,9 +255,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       predicates: Seq[Expression],
       partitionSpec: PartitionSpec): Seq[Partition] = {
     val PartitionSpec(partitionColumns, partitions) = partitionSpec
-    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val isCaseSensitive = SQLContext.getActive().get.conf.caseSensitiveAnalysis
+    val partitionColumnNames = if (isCaseSensitive) {
+      partitionColumns.map(_.name).toSet
+    } else {
+      partitionColumns.map(_.name.toLowerCase).toSet
+    }
     val partitionPruningPredicates = predicates.filter {
-      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+      if (isCaseSensitive) {
+        _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+      } else {
+        _.references.map(_.name.toLowerCase).toSet.subsetOf(partitionColumnNames)
+      }
     }
 
     if (partitionPruningPredicates.nonEmpty) {
```
```diff
@@ -268,7 +277,11 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
         val boundPredicate = InterpretedPredicate.create(predicate.transform {
           case a: AttributeReference =>
-            val index = partitionColumns.indexWhere(a.name == _.name)
+            val index = if (isCaseSensitive) {
+              partitionColumns.indexWhere(a.name == _.name)
+            } else {
+              partitionColumns.indexWhere(c => a.name.equalsIgnoreCase(c.name))
+            }
             BoundReference(index, partitionColumns(index).dataType, nullable = true)
         })
 
```
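
The same setting governs how a predicate's attribute is bound to its partition-column index. The lookup can be sketched in isolation like this (a standalone illustration with assumed names, not Spark's actual code):

```scala
// Standalone sketch of the index lookup; `partitionColumnNames` stands
// in for partitionColumns.map(_.name) in the real code.
object IndexLookupSketch {
  def partitionColumnIndex(
      attributeName: String,
      partitionColumnNames: Seq[String],
      caseSensitive: Boolean): Int = {
    if (caseSensitive) {
      partitionColumnNames.indexWhere(attributeName == _)
    } else {
      // equalsIgnoreCase mirrors the case-insensitive branch of the patch.
      partitionColumnNames.indexWhere(c => attributeName.equalsIgnoreCase(c))
    }
  }
}
```

With `caseSensitive = false`, looking up `"B"` in `Seq("a", "b")` yields index 1; with `caseSensitive = true` there is no match and `indexWhere` returns -1.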
```diff
@@ -1013,7 +1013,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("SPARK-11301: fix case sensitivity for filter on partitioned columns") {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       withTempPath { path =>
-        Seq(2012 -> "a").toDF("year", "val").write.partitionBy("year").parquet(path.getAbsolutePath)
+        Seq(2012 -> "a", 1999 -> "b").toDF("year", "val").write.partitionBy("year")
+          .parquet(path.getAbsolutePath)
         val df = sqlContext.read.parquet(path.getAbsolutePath)
         checkAnswer(df.filter($"yEAr" > 2000).select($"val"), Row("a"))
       }
```
