Skip to content

Commit

Permalink
[SPARK-42236][SQL] Refine NULLABLE_ARRAY_OR_MAP_ELEMENT
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR proposes to refine `NULLABLE_ARRAY_OR_MAP_ELEMENT` into main-sub classes structure.

`NOT_NULL_CONSTRAINT_VIOLATION`
- `ARRAY_ELEMENT`
- `MAP_VALUE`

### Why are the changes needed?

The name of the error class is misleading, and we can make it more generic so that it can be reused in various situations.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated & added UTs.

Closes apache#39804 from itholic/NULLABLE_ARRAY_OR_MAP_ELEMENT.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 1cba3b9)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
  • Loading branch information
itholic authored and dongjoon-hyun committed Feb 1, 2023
1 parent 53ac90f commit a72a2ad
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 13 deletions.
Expand Up @@ -91,7 +91,8 @@ private[sql] class ProtobufDeserializer(
val element = iterator.next()
if (element == null) {
if (!containsNull) {
throw QueryCompilationErrors.nullableArrayOrMapElementError(protoElementPath)
throw QueryCompilationErrors.notNullConstraintViolationArrayElementError(
protoElementPath)
} else {
elementUpdater.setNullAt(i)
}
Expand Down Expand Up @@ -129,7 +130,7 @@ private[sql] class ProtobufDeserializer(
keyWriter(keyUpdater, i, field.getField(keyField))
if (field.getField(valueField) == null) {
if (!valueContainsNull) {
throw QueryCompilationErrors.nullableArrayOrMapElementError(protoPath)
throw QueryCompilationErrors.notNullConstraintViolationMapValueError(protoPath)
} else {
valueUpdater.setNullAt(i)
}
Expand Down
24 changes: 18 additions & 6 deletions core/src/main/resources/error/error-classes.json
Expand Up @@ -1004,6 +1004,24 @@
"Operation <operation> is not allowed for <tableIdentWithDB> because it is not a partitioned table."
]
},
"NOT_NULL_CONSTRAINT_VIOLATION" : {
"message" : [
"Assigning a NULL is not allowed here."
],
"subClass" : {
"ARRAY_ELEMENT" : {
"message" : [
"The array <columnPath> is defined to contain only elements that are NOT NULL."
]
},
"MAP_VALUE" : {
"message" : [
"The map <columnPath> is defined to contain only values that are NOT NULL."
]
}
},
"sqlState" : "42000"
},
"NO_HANDLER_FOR_UDAF" : {
"message" : [
"No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead."
Expand All @@ -1019,12 +1037,6 @@
"UDF class <className> doesn't implement any UDF interface."
]
},
"NULLABLE_ARRAY_OR_MAP_ELEMENT" : {
"message" : [
"Array or map at <columnPath> contains nullable element while it's required to be non-nullable."
],
"sqlState" : "42000"
},
"NULLABLE_COLUMN_OR_FIELD" : {
"message" : [
"Column or field <name> is nullable while it's required to be non-nullable."
Expand Down
Expand Up @@ -131,7 +131,7 @@ object Project {

case (ArrayType(et, containsNull), expected: ArrayType) =>
if (containsNull & !expected.containsNull) {
throw QueryCompilationErrors.nullableArrayOrMapElementError(columnPath)
throw QueryCompilationErrors.notNullConstraintViolationArrayElementError(columnPath)
}
val param = NamedLambdaVariable("x", et, containsNull)
val reconciledElement = reconcileColumnType(
Expand All @@ -141,7 +141,7 @@ object Project {

case (MapType(kt, vt, valueContainsNull), expected: MapType) =>
if (valueContainsNull & !expected.valueContainsNull) {
throw QueryCompilationErrors.nullableArrayOrMapElementError(columnPath)
throw QueryCompilationErrors.notNullConstraintViolationMapValueError(columnPath)
}
val keyParam = NamedLambdaVariable("key", kt, nullable = false)
val valueParam = NamedLambdaVariable("value", vt, valueContainsNull)
Expand Down
Expand Up @@ -3180,9 +3180,21 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
messageParameters = Map("name" -> toSQLId(name)))
}

def nullableArrayOrMapElementError(path: Seq[String]): Throwable = {
def notNullConstraintViolationArrayElementError(path: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT",
errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT",
messageParameters = Map("columnPath" -> toSQLId(path)))
}

def notNullConstraintViolationMapValueError(path: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE",
messageParameters = Map("columnPath" -> toSQLId(path)))
}

def notNullConstraintViolationStructFieldError(path: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.STRUCT_FIELD",
messageParameters = Map("columnPath" -> toSQLId(path)))
}

Expand Down
Expand Up @@ -262,7 +262,7 @@ class DataFrameToSchemaSuite extends QueryTest with SharedSparkSession {
val e = intercept[SparkThrowable](data.to(schema))
checkError(
exception = e,
errorClass = "NULLABLE_ARRAY_OR_MAP_ELEMENT",
errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.ARRAY_ELEMENT",
parameters = Map("columnPath" -> "`arr`"))
}

Expand Down Expand Up @@ -320,4 +320,16 @@ class DataFrameToSchemaSuite extends QueryTest with SharedSparkSession {
assert(df.schema == schema)
checkAnswer(df, Row(Map("a" -> Row("b", "a"))))
}

test("map value: incompatible map nullability") {
val m = MapType(StringType, StringType, valueContainsNull = false)
val schema = new StructType().add("map", m, nullable = false)
val data = Seq("a" -> null).toDF("i", "j").select(map($"i", $"j").as("map"))
assert(data.schema.fields(0).dataType.asInstanceOf[MapType].valueContainsNull)
val e = intercept[SparkThrowable](data.to(schema))
checkError(
exception = e,
errorClass = "NOT_NULL_CONSTRAINT_VIOLATION.MAP_VALUE",
parameters = Map("columnPath" -> "`map`"))
}
}

0 comments on commit a72a2ad

Please sign in to comment.