Skip to content

Commit

Permalink
Disable partitioning on collated columns (apache#3)
Browse files Browse the repository at this point in the history
* disable partitioning on collated columns

* fix linter warning

* final formatting

* add docstring
  • Loading branch information
stefankandic committed Jan 30, 2024
1 parent 61aecfc commit f68b511
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,9 @@ object PartitioningUtils extends SQLConfHelper {

SchemaUtils.checkColumnNameDuplication(partitionColumns, caseSensitive)

partitionColumnsSchema(schema, partitionColumns).foreach {
field => field.dataType match {
case _: AtomicType => // OK
case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
partitionColumnsSchema(schema, partitionColumns).foreach { field =>
if (!canPartitionOn(field.dataType)) {
throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
}
}

Expand All @@ -574,6 +573,16 @@ object PartitioningUtils extends SQLConfHelper {
}
}

/**
 * Checks whether a given data type can be used as a partition column.
 *
 * @param dataType the type of the candidate partition column
 * @return true when partitioning on a column of this type is allowed
 */
def canPartitionOn(dataType: DataType): Boolean = {
  dataType match {
    // VariantType values are not comparable, so can't be used as partition columns.
    case VariantType =>
      false
    // Strings are only allowed when they carry the default collation; non-default
    // collations change comparison semantics and so cannot drive partitioning.
    case stringType: StringType =>
      stringType.isDefaultCollation
    // Every other data type is acceptable here.
    case _ =>
      true
  }
}

def partitionColumnsSchema(
schema: StructType,
partitionColumns: Seq[String]): StructType = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{AtomicType, StructType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.ArrayImplicits._
Expand Down Expand Up @@ -329,10 +329,13 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
messageParameters = Map.empty)
}

schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
case _: AtomicType => // OK
case other => failAnalysis(s"Cannot use ${other.catalogString} for partition column")
}
schema
.filter(f => normalizedPartitionCols.contains(f.name))
.foreach { field =>
if (!PartitioningUtils.canPartitionOn(field.dataType)) {
failAnalysis(s"Cannot use ${field.dataType.catalogString} for partition column")
}
}

normalizedPartitionCols
}
Expand Down
25 changes: 25 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,29 @@ class CollationSuite extends QueryTest
checkAnswer(sql(s"SELECT COUNT(DISTINCT c1) FROM $tableName"), Seq(Row(1)))
}
}

test("disable partition on collated string column") {
  // Attempts to create a parquet table partitioned by the given columns.
  // The table is dropped again by withTable once the statement finishes.
  def attemptCreate(partitionColumns: String*): Unit = {
    val tableName = "test_partition"
    withTable(tableName) {
      sql(
        s"""
           |CREATE TABLE IF NOT EXISTS $tableName
           |(id INT, c1 STRING COLLATE 'SR_CI_AI', c2 STRING)
           |USING PARQUET PARTITIONED BY (${partitionColumns.mkString(", ")})
           |""".stripMargin)
    }
  }

  // Partitioning on non-collated columns succeeds.
  Seq(Seq("id"), Seq("c2"), Seq("id", "c2")).foreach { cols =>
    attemptCreate(cols: _*)
  }

  // Any partition spec that includes the collated column c1 must be rejected.
  Seq(Seq("c1"), Seq("id", "c1"), Seq("c2", "c1")).foreach { cols =>
    intercept[AnalysisException] {
      attemptCreate(cols: _*)
    }
  }
}
}

0 comments on commit f68b511

Please sign in to comment.