[SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in the same field

## What changes were proposed in this pull request?

PR apache#2400 added support for parsing JSON rows wrapped with an array. However, that change throws an exception when the given data contains array data and struct data in the same field, as below:

```json
{"a": {"b": 1}}
{"a": []}
```

and the schema is given as follows:

```scala
val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)
```

- **Before**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 10, 192.168.1.170): java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow
	at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50)
	at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
...
```

- **After**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```
+----+
|   a|
+----+
| [1]|
|null|
+----+
```

For other data types, values that mismatch the schema in this way are converted to `null`; only this case throws an exception.

This PR makes the support for wrapped rows apply only at the top level.
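
To illustrate the idea behind the fix, here is a minimal, self-contained sketch. It is not Spark's actual token-based `JacksonParser`; the toy `JVal`/`SType` ADTs and the `convertRoot`/`convertField` names are hypothetical:

```scala
// Minimal sketch of the fix, assuming toy JSON and schema ADTs
// (hypothetical; Spark's real parser works on Jackson tokens instead).
sealed trait JVal
case class JObj(fields: Map[String, JVal]) extends JVal
case class JArr(items: List[JVal]) extends JVal
case class JNum(value: Long) extends JVal

sealed trait SType
case object NumT extends SType
case class StructT(fields: Map[String, SType]) extends SType
case class ArrayT(elem: SType) extends SType

// Field level: a mismatch now yields null instead of a ClassCastException.
def convertField(v: JVal, s: SType): Any = (v, s) match {
  case (JNum(n), NumT)          => n
  case (JArr(items), ArrayT(t)) => items.map(convertField(_, t))
  case (JObj(fs), StructT(sf))  =>
    sf.map { case (k, t) => k -> fs.get(k).map(convertField(_, t)).getOrElse(null) }
  case _                        => null // e.g. {"a": []} against a struct field
}

// Root level only: keep the SPARK-3308 array/struct wrapping cases.
def convertRoot(v: JVal, s: SType): Any = (v, s) match {
  case (JArr(items), st: StructT) => items.map(convertField(_, st)) // array of rows
  case (o: JObj, ArrayT(st))      => convertField(o, st) :: Nil     // wrap one row
  case _                          => convertField(v, s)
}

// {"a": []} read with schema struct<a: struct<b: num>> becomes a null field:
val schema = StructT(Map("a" -> StructT(Map("b" -> NumT))))
convertRoot(JObj(Map("a" -> JArr(Nil))), schema) // Map(a -> null)
```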

## How was this patch tested?

Unit tests were added, and `./dev/run_tests` was run for code style checks.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#11752 from HyukjinKwon/SPARK-3308-follow-up.
HyukjinKwon authored and roygao94 committed Mar 22, 2016
1 parent 9833e8f commit 927c234
Showing 4 changed files with 48 additions and 14 deletions.
```diff
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
```
```diff
@@ -49,8 +49,31 @@ object JacksonParser {
 
   /**
    * Parse the current token (and related children) according to a desired schema
+   * This is a wrapper for the method `convertField()` to handle a row wrapped
+   * with an array.
    */
-  def convertField(
+  def convertRootField(
+      factory: JsonFactory,
+      parser: JsonParser,
+      schema: DataType): Any = {
+    import com.fasterxml.jackson.core.JsonToken._
+    (parser.getCurrentToken, schema) match {
+      case (START_ARRAY, st: StructType) =>
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        convertArray(factory, parser, st)
+
+      case (START_OBJECT, ArrayType(st, _)) =>
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list
+        convertField(factory, parser, st) :: Nil
+
+      case _ =>
+        convertField(factory, parser, schema)
+    }
+  }
+
+  private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -157,19 +180,9 @@ object JacksonParser {
       case (START_OBJECT, st: StructType) =>
         convertObject(factory, parser, st)
 
-      case (START_ARRAY, st: StructType) =>
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        convertArray(factory, parser, st)
-
       case (START_ARRAY, ArrayType(st, _)) =>
         convertArray(factory, parser, st)
 
-      case (START_OBJECT, ArrayType(st, _)) =>
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
-
       case (START_OBJECT, MapType(StringType, kt, _)) =>
         convertMap(factory, parser, kt)
 
@@ -264,7 +277,7 @@ object JacksonParser {
     Utils.tryWithResource(factory.createParser(record)) { parser =>
       parser.nextToken()
 
-      convertField(factory, parser, schema) match {
+      convertRootField(factory, parser, schema) match {
         case null => failedRecord(record)
         case row: InternalRow => row :: Nil
         case array: ArrayData =>
```
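
For context on the dispatch in `convertRootField`: after the single `parser.nextToken()` call, Jackson reports what kind of token sits at the document root, which is all the method needs in order to choose a branch exactly once. A small standalone illustration using plain Jackson (no Spark; `rootToken` is a hypothetical helper):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// Hypothetical helper: advance to the first token and report its kind.
// This mirrors the parser.nextToken() call that precedes convertRootField.
def rootToken(json: String): JsonToken = {
  val parser = new JsonFactory().createParser(json)
  try parser.nextToken() finally parser.close()
}

rootToken("""[{"a": 1}, {"a": 2}]""") // START_ARRAY: treat each element as a row
rootToken("""{"a": {"b": 1}}""")      // START_OBJECT: a single row
```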
```diff
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
       parser.nextToken()
-      JacksonParser.convertField(factory, parser, dataType)
+      JacksonParser.convertRootField(factory, parser, dataType)
     }
   }
 
@@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Parse JSON rows having an array type and a struct type in the same field.") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val schema =
+        StructType(
+          StructField("a", StructType(
+            StructField("b", StringType) :: Nil
+          )) :: Nil)
+      val jsonDF = sqlContext.read.schema(schema).json(path)
+      assert(jsonDF.count() == 2)
+    }
+  }
+
   test("SPARK-12872 Support to specify the option for compression codec") {
     withTempDir { dir =>
       val dir = Utils.createTempDir()
```
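
As a side note, a slightly stronger assertion (hypothetical, not part of this commit) could also verify the parsed values, matching the Before/After output shown in the description:

```scala
// Hypothetical extension of the test body above: check contents, not just count.
// {"a": {"b": 1}} parses to a one-field struct ("b" is read as StringType),
// while {"a": []} yields a null struct instead of throwing.
checkAnswer(
  jsonDF,
  Row(Row("1")) :: Row(null) :: Nil)
```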
```diff
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
     sqlContext.sparkContext.parallelize(
       """{"ts":1451732645}""" :: Nil)
 
+  def arrayAndStructRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"a": {"b": 1}}""" ::
+      """{"a": []}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
```
