From a72a2adb6f23a92fb686e720ef3d26bd060ab9be Mon Sep 17 00:00:00 2001
From: itholic
Date: Tue, 31 Jan 2023 19:35:57 +0300
Subject: [PATCH] [SPARK-42236][SQL] Refine `NULLABLE_ARRAY_OR_MAP_ELEMENT`

### What changes were proposed in this pull request?

This PR proposes to refine `NULLABLE_ARRAY_OR_MAP_ELEMENT` into a main error class with subclasses:

`NOT_NULL_CONSTRAINT_VIOLATION`
- `ARRAY_ELEMENT`
- `MAP_VALUE`

### Why are the changes needed?

The name of the error class is misleading, and we can make it more generic so that it can be reused in various situations.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated & added UTs.

Closes #39804 from itholic/NULLABLE_ARRAY_OR_MAP_ELEMENT.

Authored-by: itholic
Signed-off-by: Max Gekk
(cherry picked from commit 1cba3b98160ad9d7cdf29e84ff0191598177835c)
Signed-off-by: Max Gekk
---
 .../sql/protobuf/ProtobufDeserializer.scala        |  5 ++--
 .../main/resources/error/error-classes.json        | 24 ++++++++++++++-----
 .../plans/logical/basicLogicalOperators.scala      |  4 ++--
 .../sql/errors/QueryCompilationErrors.scala        | 16 +++++++++++--
 .../spark/sql/DataFrameToSchemaSuite.scala         | 14 ++++++++++-
 5 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
index 224e22c0f52e8..37278fab8a347 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
@@ -91,7 +91,8 @@ private[sql] class ProtobufDeserializer(
       val element = iterator.next()
       if (element == null) {
         if (!containsNull) {
-          throw QueryCompilationErrors.nullableArrayOrMapElementError(protoElementPath)
+          throw QueryCompilationErrors.notNullConstraintViolationArrayElementError(
+            protoElementPath)
         } else {
           elementUpdater.setNullAt(i)
         }
@@ -129,7 +130,7 @@ private[sql] class ProtobufDeserializer(
       keyWriter(keyUpdater, i, field.getField(keyField))
       if (field.getField(valueField) == null) {
         if (!valueContainsNull) {
-          throw QueryCompilationErrors.nullableArrayOrMapElementError(protoPath)
+          throw QueryCompilationErrors.notNullConstraintViolationMapValueError(protoPath)
         } else {
           valueUpdater.setNullAt(i)
         }
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index d881e48d604d3..b70f03b06a623 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1004,6 +1004,24 @@
       "Operation <operation> is not allowed for <tableIdentWithDB> because it is not a partitioned table."
     ]
   },
+  "NOT_NULL_CONSTRAINT_VIOLATION" : {
+    "message" : [
+      "Assigning a NULL is not allowed here."
+    ],
+    "subClass" : {
+      "ARRAY_ELEMENT" : {
+        "message" : [
+          "The array <columnPath> is defined to contain only elements that are NOT NULL."
+        ]
+      },
+      "MAP_VALUE" : {
+        "message" : [
+          "The map <columnPath> is defined to contain only values that are NOT NULL."
+        ]
+      }
+    },
+    "sqlState" : "42000"
+  },
   "NO_HANDLER_FOR_UDAF" : {
     "message" : [
       "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
@@ -1019,12 +1037,6 @@
       "UDF class <className> doesn't implement any UDF interface."
     ]
   },
-  "NULLABLE_ARRAY_OR_MAP_ELEMENT" : {
-    "message" : [
-      "Array or map at <columnPath> contains nullable element while it's required to be non-nullable."
-    ],
-    "sqlState" : "42000"
-  },
   "NULLABLE_COLUMN_OR_FIELD" : {
     "message" : [
       "Column or field <name> is nullable while it's required to be non-nullable."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a8dfb8fbd8474..74929bf5d7924 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -131,7 +131,7 @@ object Project {
 
     case (ArrayType(et, containsNull), expected: ArrayType) =>
       if (containsNull & !expected.containsNull) {
-        throw QueryCompilationErrors.nullableArrayOrMapElementError(columnPath)
+        throw QueryCompilationErrors.notNullConstraintViolationArrayElementError(columnPath)
       }
       val param = NamedLambdaVariable("x", et, containsNull)
       val reconciledElement = reconcileColumnType(
@@ -141,7 +141,7 @@ object Project {
 
     case (MapType(kt, vt, valueContainsNull), expected: MapType) =>
       if (valueContainsNull & !expected.valueContainsNull) {
-        throw QueryCompilationErrors.nullableArrayOrMapElementError(columnPath)
+        throw QueryCompilationErrors.notNullConstraintViolationMapValueError(columnPath)
       }
       val keyParam = NamedLambdaVariable("key", kt, nullable = false)
       val valueParam = NamedLambdaVariable("value", vt, valueContainsNull)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c43806449a885..ff53588a21571 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3180,9 +3180,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("name" -> toSQLId(name)))
   }
 
-  def nullableArrayOrMapElementError(path: Seq[String]): Throwable = {
+  def notNullConstraintViolationArrayElementError(path: Seq[String]): Throwable = {
     new AnalysisException(
-      errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT",
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT",
+      messageParameters = Map("columnPath" -> toSQLId(path)))
+  }
+
+  def notNullConstraintViolationMapValueError(path: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE",
+      messageParameters = Map("columnPath" -> toSQLId(path)))
+  }
+
+  def notNullConstraintViolationStructFieldError(path: Seq[String]): Throwable = {
+    new AnalysisException(
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.STRUCT_FIELD",
       messageParameters = Map("columnPath" -> toSQLId(path)))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
index 6a3401073a017..5bbaebbd9ce2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
@@ -262,7 +262,7 @@ class DataFrameToSchemaSuite extends QueryTest with SharedSparkSession {
     val e = intercept[SparkThrowable](data.to(schema))
     checkError(
       exception = e,
-      errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT",
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT",
       parameters = Map("columnPath" -> "`arr`"))
   }
 
@@ -320,4 +320,16 @@ class
DataFrameToSchemaSuite extends QueryTest with SharedSparkSession {
     assert(df.schema == schema)
     checkAnswer(df, Row(Map("a" -> Row("b", "a"))))
   }
+
+  test("map value: incompatible map nullability") {
+    val m = MapType(StringType, StringType, valueContainsNull = false)
+    val schema = new StructType().add("map", m, nullable = false)
+    val data = Seq("a" -> null).toDF("i", "j").select(map($"i", $"j").as("map"))
+    assert(data.schema.fields(0).dataType.asInstanceOf[MapType].valueContainsNull)
+    val e = intercept[SparkThrowable](data.to(schema))
+    checkError(
+      exception = e,
+      errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE",
+      parameters = Map("columnPath" -> "`map`"))
+  }
 }
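
For context, a minimal sketch of how the refined error class surfaces to users through `Dataset.to(schema)`. This is illustrative only and not part of the patch; the SparkSession value `spark` and the variable names are assumptions:

```scala
import org.apache.spark.sql.functions.map
import org.apache.spark.sql.types.{MapType, StringType, StructType}
import spark.implicits._  // assumes an existing SparkSession named `spark`

// A map column whose values may be NULL (valueContainsNull = true by default).
val df = Seq(("a", null: String)).toDF("i", "j")
  .select(map($"i", $"j").as("map"))

// A target schema that forbids NULL map values.
val schema = new StructType()
  .add("map", MapType(StringType, StringType, valueContainsNull = false), nullable = false)

// After this patch, the cast fails with the new subclass instead of the old
// NULLABLE_ARRAY_OR_MAP_ELEMENT; the message reads roughly:
//   [NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE] Assigning a NULL is not allowed here.
//   The map `map` is defined to contain only values that are NOT NULL.
df.to(schema)
```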