From f68b51133c01a7c2f668be5f77e73da12a56a7a1 Mon Sep 17 00:00:00 2001
From: stefankandic <154237371+stefankandic@users.noreply.github.com>
Date: Tue, 30 Jan 2024 17:25:45 +0100
Subject: [PATCH] Disable partitioning on collated columns (#3)

* disable partitioning on collated columns

* fix linter warning

* final formatting

* add docstring
---
 .../datasources/PartitioningUtils.scala       | 17 ++++++++++---
 .../sql/execution/datasources/rules.scala     | 13 ++++++----
 .../org/apache/spark/sql/CollationSuite.scala | 25 +++++++++++++++++++
 3 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 6c48aa8bc1f2a..4b9ac2eb998ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -562,10 +562,9 @@ object PartitioningUtils extends SQLConfHelper {
 
     SchemaUtils.checkColumnNameDuplication(partitionColumns, caseSensitive)
 
-    partitionColumnsSchema(schema, partitionColumns).foreach {
-      field => field.dataType match {
-        case _: AtomicType => // OK
-        case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
+    partitionColumnsSchema(schema, partitionColumns).foreach { field =>
+      if (!canPartitionOn(field.dataType)) {
+        throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
       }
     }
 
@@ -574,6 +573,16 @@ object PartitioningUtils extends SQLConfHelper {
     }
   }
 
+  /**
+   * Checks whether a given data type can be used as a partition column.
+   */
+  def canPartitionOn(dataType: DataType): Boolean = dataType match {
+    // VariantType values are not comparable, so can't be used as partition columns.
+    case VariantType => false
+    case st: StringType => st.isDefaultCollation
+    case _ => true
+  }
+
   def partitionColumnsSchema(
       schema: StructType,
       partitionColumns: Seq[String]): StructType = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c58815b6978e6..562fcf948f9f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
@@ -329,10 +329,13 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
         messageParameters = Map.empty)
     }
 
-    schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
-      case _: AtomicType => // OK
-      case other => failAnalysis(s"Cannot use ${other.catalogString} for partition column")
-    }
+    schema
+      .filter(f => normalizedPartitionCols.contains(f.name))
+      .foreach { field =>
+        if (!PartitioningUtils.canPartitionOn(field.dataType)) {
+          failAnalysis(s"Cannot use ${field.dataType.catalogString} for partition column")
+        }
+      }
 
     normalizedPartitionCols
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index edd36bd0b9df1..31c1dd8c1c7f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -308,4 +308,29 @@ class CollationSuite extends QueryTest
       checkAnswer(sql(s"SELECT COUNT(DISTINCT c1) FROM $tableName"), Seq(Row(1)))
     }
   }
+
+  test("disable partition on collated string column") {
+    def createTable(partitionColumns: String*): Unit = {
+      val tableName = "test_partition"
+      withTable(tableName) {
+        sql(
+          s"""
+             |CREATE TABLE IF NOT EXISTS $tableName
+             |(id INT, c1 STRING COLLATE 'SR_CI_AI', c2 STRING)
+             |USING PARQUET PARTITIONED BY (${partitionColumns.mkString(", ")})
+             |""".stripMargin)
+      }
+    }
+
+    // works fine on non collated columns
+    createTable("id")
+    createTable("c2")
+    createTable("id", "c2")
+
+    Seq(Seq("c1"), Seq("id", "c1"), Seq("c2", "c1")).foreach { partitionColumns =>
+      intercept[AnalysisException] {
+        createTable(partitionColumns: _*)
+      }
+    }
+  }
 }
\ No newline at end of file
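
Reviewer note (not part of the patch): a minimal spark-shell-style sketch of how the
new check is expected to behave once this change is applied. It only uses the
canPartitionOn method and the types referenced in the diff above; the collated-string
call is left commented out because constructing a non-default-collation StringType
instance is not shown in this diff (the value name collatedStringType is hypothetical).

    import org.apache.spark.sql.execution.datasources.PartitioningUtils
    import org.apache.spark.sql.types.{DataType, IntegerType, StringType, VariantType}

    // Print whether a given data type would be accepted as a partition column.
    def report(label: String, dt: DataType): Unit =
      println(s"$label partitionable: ${PartitioningUtils.canPartitionOn(dt)}")

    report("INT", IntegerType)      // true: atomic type, unaffected by this change
    report("STRING", StringType)    // true: default collation, isDefaultCollation holds
    report("VARIANT", VariantType)  // false: variant values are not comparable
    // report("STRING COLLATE 'SR_CI_AI'", collatedStringType)  // would print false

The CollationSuite test added above exercises the same behavior end to end through
CREATE TABLE ... PARTITIONED BY.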