diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index dcb9226511c62..7d19d199e960b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -616,7 +616,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
     if (conf.useJacksonStreamingAPI) {
-      baseRelationToDataFrame(new JSONRelation(json, None, 1.0, Some(schema))(this))
+      baseRelationToDataFrame(new JSONRelation(() => json, None, 1.0, Some(schema))(this))
     } else {
       val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
       val appliedSchema =
@@ -650,7 +650,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
     if (conf.useJacksonStreamingAPI) {
-      baseRelationToDataFrame(new JSONRelation(json, None, samplingRatio, None)(this))
+      baseRelationToDataFrame(new JSONRelation(() => json, None, samplingRatio, None)(this))
     } else {
       val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
       val appliedSchema =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 0beab0277c201..c772cd1f53e53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -104,7 +104,12 @@ private[sql] class DefaultSource
 }
 
 private[sql] class JSONRelation(
-    baseRDD: RDD[String],
+    // baseRDD is not immutable with respect to INSERT OVERWRITE
+    // and so it must be recreated at least as often as the
+    // underlying inputs are modified. To be safe, a function is
+    // used instead of a regular RDD value to ensure a fresh RDD is
+    // recreated for each and every operation.
+    baseRDD: () => RDD[String],
     val path: Option[String],
     val samplingRatio: Double,
     userSpecifiedSchema: Option[StructType])(
@@ -120,7 +125,7 @@ private[sql] class JSONRelation(
       userSpecifiedSchema: Option[StructType],
       sqlContext: SQLContext) =
     this(
-      sqlContext.sparkContext.textFile(path),
+      () => sqlContext.sparkContext.textFile(path),
       Some(path),
       samplingRatio,
       userSpecifiedSchema)(sqlContext)
@@ -132,13 +137,13 @@ private[sql] class JSONRelation(
   override lazy val schema = userSpecifiedSchema.getOrElse {
     if (useJacksonStreamingAPI) {
       InferSchema(
-        baseRDD,
+        baseRDD(),
         samplingRatio,
         sqlContext.conf.columnNameOfCorruptRecord)
     } else {
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(
-          baseRDD,
+          baseRDD(),
           samplingRatio,
           sqlContext.conf.columnNameOfCorruptRecord))
     }
   }
@@ -147,12 +152,12 @@ private[sql] class JSONRelation(
   override def buildScan(): RDD[Row] = {
     if (useJacksonStreamingAPI) {
       JacksonParser(
-        baseRDD,
+        baseRDD(),
         schema,
         sqlContext.conf.columnNameOfCorruptRecord)
     } else {
       JsonRDD.jsonStringToRow(
-        baseRDD,
+        baseRDD(),
         schema,
         sqlContext.conf.columnNameOfCorruptRecord)
     }
   }
@@ -161,12 +166,12 @@ private[sql] class JSONRelation(
   override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
     if (useJacksonStreamingAPI) {
       JacksonParser(
-        baseRDD,
+        baseRDD(),
         StructType.fromAttributes(requiredColumns),
         sqlContext.conf.columnNameOfCorruptRecord)
     } else {
       JsonRDD.jsonStringToRow(
-        baseRDD,
+        baseRDD(),
         StructType.fromAttributes(requiredColumns),
         sqlContext.conf.columnNameOfCorruptRecord)
     }