Commit 908c76b

[SPARK-26716][SQL] FileFormat: the supported types of read/write should be consistent

## What changes were proposed in this pull request?

1. Remove the parameter `isReadPath`. The supported types of the read and write paths should be the same.

2. Disallow reading `NullType` for the ORC data source. In apache#21667 and apache#21389, it was assumed that ORC supports reading `NullType` but cannot write it. That doesn't make sense. I read the docs and ran some tests: ORC doesn't support `NullType` at all. (The resulting per-format check is sketched right after this list.)
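To make the resulting API concrete, here is the shared recursive check as a standalone sketch. It restates the ORC/Parquet/JSON pattern from the diff below as it would read inside Spark's `org.apache.spark.sql` packages (where `AtomicType` is visible); the free-standing `supportDataType` helper is illustrative, not itself part of the patch:

```scala
import org.apache.spark.sql.types._

// One predicate now answers for both the read and the write path.
// NullType matches no case, so ORC rejects it when reading and writing alike.
def supportDataType(dataType: DataType): Boolean = dataType match {
  case _: AtomicType => true
  case st: StructType => st.forall(f => supportDataType(f.dataType))
  case ArrayType(elementType, _) => supportDataType(elementType)
  case MapType(keyType, valueType, _) =>
    supportDataType(keyType) && supportDataType(valueType)
  case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
  case _ => false
}
```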

## How was this patch tested?

Unit test.
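In brief, the updated suite pins down behavior like the following. This is a sketch, not suite code: it assumes an active `SparkSession` named `spark`, ScalaTest's `intercept`/`assert`, and an illustrative output path:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._
import org.scalatest.Assertions.{assert, intercept}

// Write path: a NullType column is rejected during analysis.
val writeErr = intercept[AnalysisException] {
  spark.sql("select null").write.format("orc").mode("overwrite").save("/tmp/orc_null")
}
assert(writeErr.getMessage.contains("does not support null data type"))

// Read path: previously allowed for ORC, now rejected the same way.
val schema = StructType(StructField("a", NullType, nullable = true) :: Nil)
val readErr = intercept[AnalysisException] {
  spark.read.schema(schema).format("orc").load("/tmp/orc_null").collect()
}
assert(readErr.getMessage.contains("does not support null data type"))
```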

Closes apache#23639 from gengliangwang/supportDataType.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
gengliangwang authored and jackylee-ch committed Feb 18, 2019
1 parent b2e814c commit 908c76b
Showing 9 changed files with 62 additions and 92 deletions.
```diff
@@ -45,7 +45,7 @@ object DataSourceUtils {
    */
-  private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
+  private def verifySchema(format: FileFormat, schema: StructType): Unit = {
     schema.foreach { field =>
-      if (!format.supportDataType(field.dataType, isReadPath)) {
+      if (!format.supportDataType(field.dataType)) {
         throw new AnalysisException(
           s"$format data source does not support ${field.dataType.catalogString} data type.")
       }
```
```diff
@@ -156,7 +156,7 @@ trait FileFormat {
    * Returns whether this format supports the given [[DataType]] in read/write path.
    * By default all data types are supported.
    */
-  def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
+  def supportDataType(dataType: DataType): Boolean = true
 }

 /**
```
```diff
@@ -153,10 +153,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {

   override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]

-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true

-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

     case _ => false
   }
```
```diff
@@ -140,17 +140,17 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {

   override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]

-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true

-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }

-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)

     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)

-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

     case _: NullType => true
```
```diff
@@ -235,19 +235,17 @@ class OrcFileFormat
     }
   }

-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true

-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }

-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)

     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)

-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
-
-    case _: NullType => isReadPath
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

     case _ => false
   }
```
```diff
@@ -453,17 +453,17 @@ class ParquetFileFormat
     }
   }

-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true

-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }

-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)

     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)

-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

     case _ => false
   }
```
```diff
@@ -139,7 +139,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }

-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
+  override def supportDataType(dataType: DataType): Boolean =
     dataType == StringType
 }
```
```diff
@@ -367,69 +367,43 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }

   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    withTempDir { dir =>
-      val tempDir = new File(dir, "files").getCanonicalPath
-
-      Seq("orc").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        // We expect the types below should be passed for backward-compatibility
-
-        // Null type
-        var schema = StructType(StructField("a", NullType, true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-
-        // UDT having null data
-        schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-      }
-
-      Seq("parquet", "csv").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", NullType, true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
+    // TODO(SPARK-26744): support data type validating in V2 data source, and test V2 as well.
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc") {
+      withTempDir { dir =>
+        val tempDir = new File(dir, "files").getCanonicalPath
+
+        Seq("parquet", "csv", "orc").foreach { format =>
+          // write path
+          var msg = intercept[AnalysisException] {
+            sql("select null").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            spark.udf.register("testType", () => new NullData())
+            sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          // read path
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", NullType, true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+        }
       }
     }
   }
```
```diff
@@ -181,19 +181,17 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     }
   }

-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true

-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }

-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)

     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)

-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
-
-    case _: NullType => isReadPath
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

     case _ => false
   }
```
