From 927c234e46657e8f685efcc9582909e254a37c20 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 16 Mar 2016 18:20:30 -0700
Subject: [PATCH] [SPARK-13719][SQL] Parse JSON rows having an array type and
 a struct type in the same field

## What changes were proposed in this pull request?

The PR https://github.com/apache/spark/pull/2400 added support for parsing JSON rows wrapped in an array. However, it throws an exception when the given data contains array data and struct data in the same field, as below:

```json
{"a": {"b": 1}}
{"a": []}
```

and the schema is given as below:

```scala
val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)
```

- **Before**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```scala
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 10, 192.168.1.170): java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50)
	at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
...
```

- **After**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```bash
+----+
|   a|
+----+
| [1]|
|null|
+----+
```

For other data types, values that do not match the given schema are simply converted to `null`; only this case throws an exception. This PR applies the support for wrapped rows only at the top level.

## How was this patch tested?

Unit tests were added, and `./dev/run-tests` was run for code style checks.

Author: hyukjinkwon

Closes #11752 from HyukjinKwon/SPARK-3308-follow-up.
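For reference, the scenario above can be reproduced end to end with a small snippet along the lines of the new unit test; the output path and the ambient `sqlContext` are assumptions for illustration and are not part of this patch:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// One record stores a struct in field "a", the other an array, the mix that
// previously failed with a ClassCastException.
val records = Seq("""{"a": {"b": 1}}""", """{"a": []}""")
val path = "/tmp/array-and-struct-json"  // illustrative output location
sqlContext.sparkContext.parallelize(records).saveAsTextFile(path)

val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)

// After this patch, the mismatched row is read back as null instead of
// aborting the job.
val jsonDF = sqlContext.read.schema(schema).json(path)
jsonDF.show()
assert(jsonDF.count() == 2)
```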
---
 .../datasources/json/JSONRelation.scala           |  1 -
 .../datasources/json/JacksonParser.scala          | 37 +++++++++++++------
 .../datasources/json/JsonSuite.scala              | 19 +++++++++-
 .../datasources/json/TestJsonData.scala           |  5 +++
 4 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3fa5ebf1bb81e..751d78def4238 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b2f5c1e96421d..3252b6c77f888 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -49,8 +49,31 @@ object JacksonParser {
 
   /**
    * Parse the current token (and related children) according to a desired schema
+   * This is a wrapper for the method `convertField()` to handle a row wrapped
+   * with an array.
    */
-  def convertField(
+  def convertRootField(
+      factory: JsonFactory,
+      parser: JsonParser,
+      schema: DataType): Any = {
+    import com.fasterxml.jackson.core.JsonToken._
+    (parser.getCurrentToken, schema) match {
+      case (START_ARRAY, st: StructType) =>
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        convertArray(factory, parser, st)
+
+      case (START_OBJECT, ArrayType(st, _)) =>
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list
+        convertField(factory, parser, st) :: Nil
+
+      case _ =>
+        convertField(factory, parser, schema)
+    }
+  }
+
+  private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -157,19 +180,9 @@ object JacksonParser {
       case (START_OBJECT, st: StructType) =>
         convertObject(factory, parser, st)
 
-      case (START_ARRAY, st: StructType) =>
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        convertArray(factory, parser, st)
-
       case (START_ARRAY, ArrayType(st, _)) =>
         convertArray(factory, parser, st)
 
-      case (START_OBJECT, ArrayType(st, _)) =>
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
-
       case (START_OBJECT, MapType(StringType, kt, _)) =>
         convertMap(factory, parser, kt)
 
@@ -264,7 +277,7 @@ object JacksonParser {
 
       Utils.tryWithResource(factory.createParser(record)) { parser =>
         parser.nextToken()
-        convertField(factory, parser, schema) match {
+        convertRootField(factory, parser, schema) match {
          case null => failedRecord(record)
          case row: InternalRow => row :: Nil
          case array: ArrayData =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4a8c128fa9605..6d942c4c90289 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
       parser.nextToken()
-      JacksonParser.convertField(factory, parser, dataType)
+      JacksonParser.convertRootField(factory, parser, dataType)
     }
   }
 
@@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Parse JSON rows having an array type and a struct type in the same field.") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val schema =
+        StructType(
+          StructField("a", StructType(
+            StructField("b", StringType) :: Nil
+          )) :: Nil)
+      val jsonDF = sqlContext.read.schema(schema).json(path)
+      assert(jsonDF.count() == 2)
+    }
+  }
+
   test("SPARK-12872 Support to specify the option for compression codec") {
     withTempDir { dir =>
       val dir = Utils.createTempDir()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a0836058d3c74..b2eff816ee65c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
     sqlContext.sparkContext.parallelize(
       """{"ts":1451732645}""" :: Nil)
 
+  def arrayAndStructRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"a": {"b": 1}}""" ::
+      """{"a": []}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
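As a closing note on the `convertRootField` change: handling for rows wrapped in a top-level JSON array (SPARK-3308) is kept; it is simply no longer applied to nested fields. A rough sketch of that retained behaviour, with an illustrative record and schema and an assumed `sqlContext`:

```scala
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A single input line holding a top-level JSON array still produces one row
// per element, because convertRootField matches (START_ARRAY, StructType)
// only at the root of each record.
val topLevelArray = sqlContext.sparkContext.parallelize(
  """[{"a": 1}, {"a": 2}]""" :: Nil)
val df = sqlContext.read
  .schema(StructType(StructField("a", LongType) :: Nil))
  .json(topLevelArray)
assert(df.count() == 2)
```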