From e01919e834d301e13adc8919932796ebae900576 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Fri, 19 Jan 2018 07:36:06 +0900
Subject: [PATCH] [SPARK-23094] Fix invalid character handling in JsonDataSource

## What changes were proposed in this pull request?

There were two related fixes regarding `from_json`, `get_json_object` and `json_tuple` ([Fix #1](https://github.com/apache/spark/commit/c8803c06854683c8761fdb3c0e4c55d5a9e22a95), [Fix #2](https://github.com/apache/spark/commit/86174ea89b39a300caaba6baffac70f3dc702788)), but they do not appear to have been comprehensive. This patch extends those fixes to all the JSON parsers and adds tests for each case.

## How was this patch tested?

Regression tests

Author: Burak Yavuz

Closes #20302 from brkyvz/json-invfix.
---
 .../catalyst/json/CreateJacksonParser.scala |  5 +--
 .../sources/JsonHadoopFsRelationSuite.scala | 34 +++++++++++++++++++
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index 025a388aacaa5..b1672e7e2fca2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -40,10 +40,11 @@ private[sql] object CreateJacksonParser extends Serializable {
   }
 
   def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
-    jsonFactory.createParser(record.getBytes, 0, record.getLength)
+    val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
+    jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
   }
 
   def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-    jsonFactory.createParser(record)
+    jsonFactory.createParser(new InputStreamReader(record, "UTF-8"))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index 49be30435ad2f..27f398ebf301a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.types._
 class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
   override val dataSourceName: String = "json"
 
+  private val badJson = "\u0000\u0000\u0000A\u0001AAA"
+
   // JSON does not write data of NullType and does not play well with BinaryType.
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
     case _: NullType => false
@@ -105,4 +107,36 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
       )
     }
   }
+
+  test("invalid json with leading nulls - from file (multiLine=true)") {
+    import testImplicits._
+    withTempDir { tempDir =>
+      val path = tempDir.getAbsolutePath
+      Seq(badJson, """{"a":1}""").toDS().write.mode("overwrite").text(path)
+      val expected = s"""$badJson\n{"a":1}\n"""
+      val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
+      val df =
+        spark.read.format(dataSourceName).option("multiLine", true).schema(schema).load(path)
+      checkAnswer(df, Row(null, expected))
+    }
+  }
+
+  test("invalid json with leading nulls - from file (multiLine=false)") {
+    import testImplicits._
+    withTempDir { tempDir =>
+      val path = tempDir.getAbsolutePath
+      Seq(badJson, """{"a":1}""").toDS().write.mode("overwrite").text(path)
+      val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
+      val df =
+        spark.read.format(dataSourceName).option("multiLine", false).schema(schema).load(path)
+      checkAnswer(df, Seq(Row(1, null), Row(null, badJson)))
+    }
+  }
+
+  test("invalid json with leading nulls - from dataset") {
+    import testImplicits._
+    checkAnswer(
+      spark.read.json(Seq(badJson).toDS()),
+      Row(badJson))
+  }
 }
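
Why the explicit UTF-8 reader in `CreateJacksonParser` matters: Jackson's byte-based `createParser` overloads auto-detect each record's encoding from its leading bytes, so a record that begins with NUL bytes can be misdetected as UTF-32 and fail with a `java.io.CharConversionException`. That exception is an `IOException` rather than a `JsonProcessingException`, so it escaped the per-record corrupt-record handling; pinning the charset via an `InputStreamReader` turns the same input into an ordinary `JsonParseException` that PERMISSIVE mode can route to `_corrupt_record`, as the tests above expect. Below is a minimal sketch of the two code paths using Jackson alone; the object name `EncodingSketch` is illustrative and not part of the patch.

```scala
import java.io.{ByteArrayInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.core.JsonFactory

// Minimal sketch (assumes only jackson-core on the classpath) contrasting the
// old byte-based code path with the new reader-based one for a record that
// starts with NUL bytes. Not part of the Spark code base.
object EncodingSketch {
  def main(args: Array[String]): Unit = {
    val factory = new JsonFactory
    val badJson = "\u0000\u0000\u0000A\u0001AAA".getBytes(StandardCharsets.UTF_8)

    // Old path: Jackson sniffs the encoding from the raw bytes, reads the
    // leading zeros as UTF-32, and typically fails with CharConversionException.
    try {
      factory.createParser(badJson, 0, badJson.length).nextToken()
    } catch {
      case e: Exception => println(s"bytes:  ${e.getClass.getName}")
    }

    // New path: the charset is pinned to UTF-8, so the same record fails with
    // a plain JsonParseException that callers can treat as a corrupt record.
    val reader = new InputStreamReader(new ByteArrayInputStream(badJson), "UTF-8")
    try {
      factory.createParser(reader).nextToken()
    } catch {
      case e: Exception => println(s"reader: ${e.getClass.getName}")
    }
  }
}
```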