[SPARK-43969][SQL] Refactor & Assign names to the error class _LEGACY_ERROR_TEMP_1170

### What changes were proposed in this pull request?
This PR aims to:
- Refactor `PreWriteCheck` to use the error framework.
- Make `INSERT_COLUMN_ARITY_MISMATCH` more generic and avoid embedding the error text in source code (a usage sketch follows this list).
- Assign a name to `_LEGACY_ERROR_TEMP_1170`.
- In the `INSERT_PARTITION_COLUMN_ARITY_MISMATCH` error message, replace single-quote wrapping with `toSQLId` for table column names.
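
For illustration, a minimal user-facing sketch of the new error (assuming a Spark build that includes this change; the table name and session setup are hypothetical):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object InsertArityDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("insert-arity-demo")
      .getOrCreate()

    spark.sql("CREATE TABLE two_cols (c1 INT, c2 INT) USING parquet")
    try {
      // Three data columns into a two-column table should now surface the
      // sub-classed error instead of the old free-text message.
      spark.sql("INSERT INTO two_cols VALUES (1, 2, 3)")
    } catch {
      case e: AnalysisException =>
        // Expected error class: INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS
        println(s"${e.getErrorClass}: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}
```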

### Why are the changes needed?
Replacing legacy identifiers and free-text messages with named error classes makes these errors consistent with the SQL error framework and easier to test and document.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Manual tests.
- Passed GA (GitHub Actions).

Closes apache#41458 from panbingkun/refactor_PreWriteCheck.

Lead-authored-by: panbingkun <pbk1982@gmail.com>
Co-authored-by: panbingkun <84731559@qq.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2 people authored and MaxGekk committed Jun 19, 2023
1 parent 74185cf commit f3db20c
Showing 18 changed files with 324 additions and 193 deletions.
62 changes: 49 additions & 13 deletions core/src/main/resources/error/error-classes.json
@@ -888,10 +888,24 @@
},
"INSERT_COLUMN_ARITY_MISMATCH" : {
"message" : [
"Cannot write to '<tableName>', <reason>:",
"Table columns: <tableColumns>.",
"Data columns: <dataColumns>."
"Cannot write to <tableName>, the reason is"
],
"subClass" : {
"NOT_ENOUGH_DATA_COLUMNS" : {
"message" : [
"not enough data columns:",
"Table columns: <tableColumns>.",
"Data columns: <dataColumns>."
]
},
"TOO_MANY_DATA_COLUMNS" : {
"message" : [
"too many data columns:",
"Table columns: <tableColumns>.",
"Data columns: <dataColumns>."
]
}
},
"sqlState" : "21S01"
},
"INSERT_PARTITION_COLUMN_ARITY_MISMATCH" : {
@@ -1715,6 +1729,11 @@
],
"sqlState" : "46110"
},
"NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT" : {
"message" : [
"<cmd> is not supported, if you want to enable it, please set \"spark.sql.catalogImplementation\" to \"hive\"."
]
},
"NOT_SUPPORTED_IN_JDBC_CATALOG" : {
"message" : [
"Not supported command in JDBC catalog:"
@@ -2464,6 +2483,33 @@
"grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup."
]
},
"UNSUPPORTED_INSERT" : {
"message" : [
"Can't insert into the target."
],
"subClass" : {
"NOT_ALLOWED" : {
"message" : [
"The target relation <relationId> does not allow insertion."
]
},
"NOT_PARTITIONED" : {
"message" : [
"The target relation <relationId> is not partitioned."
]
},
"RDD_BASED" : {
"message" : [
"An RDD-based table is not allowed."
]
},
"READ_FROM" : {
"message" : [
"The target relation <relationId> is also being read from."
]
}
}
},
"UNSUPPORTED_OVERWRITE" : {
"message" : [
"Can't overwrite the target that is also being read from."
@@ -3005,11 +3051,6 @@
"Window function <wf> requires window to be ordered, please add ORDER BY clause. For example SELECT <wf>(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table."
]
},
"_LEGACY_ERROR_TEMP_1038" : {
"message" : [
"Cannot write to table due to mismatched user specified column size(<columnSize>) and data column size(<outputSize>)."
]
},
"_LEGACY_ERROR_TEMP_1039" : {
"message" : [
"Multiple time/session window expressions would result in a cartesian product of rows, therefore they are currently not supported."
@@ -3506,11 +3547,6 @@
"Table partitions: <partColNames>."
]
},
"_LEGACY_ERROR_TEMP_1170" : {
"message" : [
"Hive support is required to <detail>."
]
},
"_LEGACY_ERROR_TEMP_1171" : {
"message" : [
"createTableColumnTypes option column <col> not found in schema <schema>."
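A sub-class above is addressed as `PARENT.SUB_CLASS` when the error is raised. A rough sketch of the raising pattern, mirroring `QueryCompilationErrors` (illustrative only: this `AnalysisException` constructor is internal to Spark's own `sql` modules, the helper name is made up, and every `<placeholder>` in the JSON template needs a matching key in `messageParameters`):

```scala
import org.apache.spark.sql.AnalysisException

// Hypothetical helper following the pattern used in QueryCompilationErrors.
def notEnoughDataColumnsError(
    tableName: String,
    tableColumns: Seq[String],
    dataColumns: Seq[String]): Throwable = {
  new AnalysisException(
    errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
    messageParameters = Map(
      "tableName" -> tableName,
      "tableColumns" -> tableColumns.mkString(", "),
      "dataColumns" -> dataColumns.mkString(", ")))
}
```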
4 changes: 3 additions & 1 deletion python/pyspark/sql/tests/test_readwriter.py
@@ -226,7 +226,9 @@ def test_create(self):

def test_create_without_provider(self):
df = self.df
with self.assertRaisesRegex(AnalysisException, "Hive support is required"):
with self.assertRaisesRegex(
AnalysisException, "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT"
):
df.writeTo("test_table").create()


@@ -1289,7 +1289,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

// Create a project if this is an INSERT INTO BY NAME query.
val projectByName = if (i.userSpecifiedCols.nonEmpty) {
Some(createProjectForByNameQuery(i))
Some(createProjectForByNameQuery(r.table.name, i))
} else {
None
}
@@ -28,12 +28,19 @@ abstract class ResolveInsertionBase extends Rule[LogicalPlan] {
def resolver: Resolver = conf.resolver

/** Add a project to use the table column names for INSERT INTO BY NAME */
protected def createProjectForByNameQuery(i: InsertIntoStatement): LogicalPlan = {
protected def createProjectForByNameQuery(
tblName: String,
i: InsertIntoStatement): LogicalPlan = {
SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver)

if (i.userSpecifiedCols.size != i.query.output.size) {
throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
i.userSpecifiedCols.size, i.query.output.size, i.query)
if (i.userSpecifiedCols.size > i.query.output.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tblName, i.userSpecifiedCols, i.query)
} else {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tblName, i.userSpecifiedCols, i.query)
}
}
val projectByName = i.userSpecifiedCols.zip(i.query.output)
.map { case (userSpecifiedCol, queryOutputCol) =>
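The arity check above reduces to a simple rule; a standalone sketch of that decision (the function name is illustrative only):

```scala
// More user-specified columns than query outputs means the data side is short
// (NOT_ENOUGH_DATA_COLUMNS); fewer means it has extras (TOO_MANY_DATA_COLUMNS).
def arityMismatchSubClass(userSpecifiedCols: Int, queryOutputCols: Int): Option[String] = {
  if (userSpecifiedCols == queryOutputCols) None
  else if (userSpecifiedCols > queryOutputCols) Some("NOT_ENOUGH_DATA_COLUMNS")
  else Some("TOO_MANY_DATA_COLUMNS")
}
```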
@@ -47,7 +47,7 @@ object TableOutputResolver {

if (actualExpectedCols.size < query.output.size) {
throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
tableName, actualExpectedCols, query)
tableName, actualExpectedCols.map(_.name), query)
}

val errors = new mutable.ArrayBuffer[String]()
@@ -74,7 +74,7 @@
}
if (actualExpectedCols.size > queryOutputCols.size) {
throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
tableName, actualExpectedCols, query)
tableName, actualExpectedCols.map(_.name), query)
}

resolveColumnsByPosition(queryOutputCols, actualExpectedCols, conf, errors += _)
@@ -613,16 +613,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
messageParameters = Map("wf" -> wf.toString))
}

def writeTableWithMismatchedColumnsError(
columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1038",
messageParameters = Map(
"columnSize" -> columnSize.toString,
"outputSize" -> outputSize.toString),
origin = t.origin)
}

def multiTimeWindowExpressionsNotSupportedError(t: TreeNode[_]): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1039",
@@ -1743,10 +1733,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
"partColNames" -> partColNames.map(_.name).mkString(",")))
}

def ddlWithoutHiveSupportEnabledError(detail: String): Throwable = {
def ddlWithoutHiveSupportEnabledError(cmd: String): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1170",
messageParameters = Map("detail" -> detail))
errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
messageParameters = Map("cmd" -> cmd))
}

def createTableColumnTypesOptionColumnNotFoundInSchemaError(
@@ -2056,26 +2046,26 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {

def cannotWriteTooManyColumnsToTableError(
tableName: String,
expected: Seq[Attribute],
expected: Seq[String],
query: LogicalPlan): Throwable = {
new AnalysisException(
errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
messageParameters = Map(
"tableName" -> tableName,
"reason" -> "too many data columns",
"tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "),
"dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", ")))
"tableName" -> toSQLId(tableName),
"tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
"dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
}

def cannotWriteNotEnoughColumnsToTableError(
tableName: String, expected: Seq[Attribute], query: LogicalPlan): Throwable = {
tableName: String,
expected: Seq[String],
query: LogicalPlan): Throwable = {
new AnalysisException(
errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
messageParameters = Map(
"tableName" -> tableName,
"reason" -> "not enough data columns",
"tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "),
"dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", ")))
"tableName" -> toSQLId(tableName),
"tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
"dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
}

def cannotWriteIncompatibleDataToTableError(tableName: String, errors: Seq[String]): Throwable = {
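The switch from hand-rolled `'${c.name}'` quoting to `toSQLId` is what turns the quoting in expected outputs from single quotes into backticks. A minimal sketch of the quoting behavior (an approximation; the real implementation lives in `QueryErrorsBase`):

```scala
// Approximate toSQLId behavior: split a multi-part name on dots and wrap each
// part in backticks, escaping embedded backticks by doubling them.
def quotePart(part: String): String = "`" + part.replace("`", "``") + "`"

def toSQLIdSketch(name: String): String =
  name.split("\\.").map(quotePart).mkString(".")

// toSQLIdSketch("spark_catalog.default.num_result")
//   == "`spark_catalog`.`default`.`num_result`"
```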
@@ -443,10 +443,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
val parsedPlan = byName(table, query)

assertNotResolved(parsedPlan)
assertAnalysisError(parsedPlan, Seq(
"Cannot write", "'table-name'", "too many data columns",
"Table columns: 'x', 'y'",
"Data columns: 'x', 'y', 'z'"))
assertAnalysisErrorClass(
inputPlan = parsedPlan,
expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
expectedMessageParameters = Map(
"tableName" -> "`table-name`",
"tableColumns" -> "`x`, `y`",
"dataColumns" -> "`x`, `y`, `z`")
)
}

test("byName: fail extra data fields in struct") {
@@ -523,10 +527,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
val parsedPlan = byPosition(requiredTable, query)

assertNotResolved(parsedPlan)
assertAnalysisError(parsedPlan, Seq(
"Cannot write", "'table-name'", "not enough data columns",
"Table columns: 'x', 'y'",
"Data columns: 'y'"))
assertAnalysisErrorClass(
inputPlan = parsedPlan,
expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
expectedMessageParameters = Map(
"tableName" -> "`table-name`",
"tableColumns" -> "`x`, `y`",
"dataColumns" -> "`y`")
)
}

test("byPosition: missing optional columns cause failure") {
@@ -537,10 +545,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
val parsedPlan = byPosition(table, query)

assertNotResolved(parsedPlan)
assertAnalysisError(parsedPlan, Seq(
"Cannot write", "'table-name'", "not enough data columns",
"Table columns: 'x', 'y'",
"Data columns: 'y'"))
assertAnalysisErrorClass(
inputPlan = parsedPlan,
expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
expectedMessageParameters = Map(
"tableName" -> "`table-name`",
"tableColumns" -> "`x`, `y`",
"dataColumns" -> "`y`")
)
}

test("byPosition: insert safe cast") {
@@ -572,10 +584,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
val parsedPlan = byName(table, query)

assertNotResolved(parsedPlan)
assertAnalysisError(parsedPlan, Seq(
"Cannot write", "'table-name'", "too many data columns",
"Table columns: 'x', 'y'",
"Data columns: 'a', 'b', 'c'"))
assertAnalysisErrorClass(
inputPlan = parsedPlan,
expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
expectedMessageParameters = Map(
"tableName" -> "`table-name`",
"tableColumns" -> "`x`, `y`",
"dataColumns" -> "`a`, `b`, `c`")
)
}

test("bypass output column resolution") {
@@ -391,7 +391,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
// Create a project if this INSERT has a user-specified column list.
val hasColumnList = insert.userSpecifiedCols.nonEmpty
val query = if (hasColumnList) {
createProjectForByNameQuery(insert)
createProjectForByNameQuery(tblName, insert)
} else {
insert.query
}
@@ -401,12 +401,13 @@
supportColDefaultValue = true)
} catch {
case e: AnalysisException if staticPartCols.nonEmpty &&
e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH" =>
(e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS") =>
val newException = e.copy(
errorClass = Some("INSERT_PARTITION_COLUMN_ARITY_MISMATCH"),
messageParameters = e.messageParameters ++ Map(
"tableColumns" -> insert.table.output.map(c => s"'${c.name}'").mkString(", "),
"staticPartCols" -> staticPartCols.toSeq.sorted.map(c => s"'$c'").mkString(", ")
"tableColumns" -> insert.table.output.map(c => toSQLId(c.name)).mkString(", "),
"staticPartCols" -> staticPartCols.toSeq.sorted.map(c => toSQLId(c)).mkString(", ")
))
newException.setStackTrace(e.getStackTrace)
throw newException
@@ -503,8 +504,6 @@ object PreReadCheck extends (LogicalPlan => Unit) {
*/
object PreWriteCheck extends (LogicalPlan => Unit) {

def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }

def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition,
@@ -514,7 +513,9 @@
case LogicalRelation(src, _, _, _) => src
}
if (srcRelations.contains(relation)) {
failAnalysis("Cannot insert into table that is also being read from.")
throw new AnalysisException(
errorClass = "UNSUPPORTED_INSERT.READ_FROM",
messageParameters = Map("relationId" -> toSQLId(relation.toString)))
} else {
// OK
}
@@ -524,18 +525,25 @@

// Right now, we do not support insert into a non-file-based data source table with
// partition specs.
case _: InsertableRelation if partition.nonEmpty =>
failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")

case _ => failAnalysis(s"$relation does not allow insertion.")
case i: InsertableRelation if partition.nonEmpty =>
throw new AnalysisException(
errorClass = "UNSUPPORTED_INSERT.NOT_PARTITIONED",
messageParameters = Map("relationId" -> toSQLId(i.toString)))

case _ =>
throw new AnalysisException(
errorClass = "UNSUPPORTED_INSERT.NOT_ALLOWED",
messageParameters = Map("relationId" -> toSQLId(relation.toString)))
}

case InsertIntoStatement(t, _, _, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
t.isInstanceOf[Range] ||
t.isInstanceOf[OneRowRelation] ||
t.isInstanceOf[LocalRelation] =>
failAnalysis(s"Inserting into an RDD-based table is not allowed.")
throw new AnalysisException(
errorClass = "UNSUPPORTED_INSERT.RDD_BASED",
messageParameters = Map.empty)

case _ => // OK
}
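With `failAnalysis(msg)` replaced by structured error classes, callers and tests can branch on `getErrorClass`/`getMessageParameters` instead of matching message substrings. A purely illustrative sketch of that pattern:

```scala
import org.apache.spark.sql.AnalysisException

// Illustrative only: inspect the machine-readable error class and parameters
// rather than regex-matching the rendered message text.
def describeInsertFailure(e: AnalysisException): String = e.getErrorClass match {
  case "UNSUPPORTED_INSERT.READ_FROM" =>
    s"target ${e.getMessageParameters.get("relationId")} is also being read from"
  case "UNSUPPORTED_INSERT.NOT_PARTITIONED" =>
    s"target ${e.getMessageParameters.get("relationId")} is not partitioned"
  case "UNSUPPORTED_INSERT.RDD_BASED" =>
    "target is an RDD-based relation"
  case other =>
    s"unexpected analysis error class: $other"
}
```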
@@ -3841,12 +3841,11 @@ INSERT INTO num_result SELECT t1.id, t2.id, t1.val, t2.val, t1.val * t2.val
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "INSERT_COLUMN_ARITY_MISMATCH",
"errorClass" : "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
"sqlState" : "21S01",
"messageParameters" : {
"dataColumns" : "'id', 'id', 'val', 'val', '(val * val)'",
"reason" : "too many data columns",
"tableColumns" : "'id1', 'id2', 'result'",
"dataColumns" : "`id`, `id`, `val`, `val`, `(val * val)`",
"tableColumns" : "`id1`, `id2`, `result`",
"tableName" : "`spark_catalog`.`default`.`num_result`"
}
}
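For reference, a rough sketch of how the rendered message in the golden file above is produced: the parent template and the selected sub-class template from error-classes.json are combined, and each `<placeholder>` is substituted from the message parameters (a naive approximation; the real rendering lives in Spark's error-class reader and may differ in whitespace handling):

```scala
// Naive template renderer: substitute each <key> with its parameter value.
def render(template: String, params: Map[String, String]): String =
  params.foldLeft(template) { case (msg, (k, v)) => msg.replace(s"<$k>", v) }

val combinedTemplate =
  "Cannot write to <tableName>, the reason is too many data columns:\n" +
    "Table columns: <tableColumns>.\n" +
    "Data columns: <dataColumns>."

val params = Map(
  "tableName" -> "`spark_catalog`.`default`.`num_result`",
  "tableColumns" -> "`id1`, `id2`, `result`",
  "dataColumns" -> "`id`, `id`, `val`, `val`, `(val * val)`")

// println(render(combinedTemplate, params))
```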