Recreate the baseRDD for each scan operation
Nathan Howell committed May 6, 2015
1 parent a7ebeb2 commit 26fea31
Showing 2 changed files with 15 additions and 10 deletions.
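The core of the change: JSONRelation previously held a single baseRDD: RDD[String], but an RDD memoizes its partition list the first time it is computed, so a relation created over a path keeps scanning the original input splits even after INSERT OVERWRITE rewrites the files. Storing a baseRDD: () => RDD[String] thunk and invoking it per operation rebuilds the lineage against the directory's current contents. A minimal standalone sketch of the distinction (the ThunkVsValue object and the example path are illustrative assumptions, not part of the commit):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sketch: a cached RDD value versus a () => RDD thunk.
object ThunkVsValue {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val path = "/tmp/json-input" // assumed example directory

    // Value: after the first action, the RDD caches the partitions it
    // computed from the files that existed at that moment.
    val cached: RDD[String] = sc.textFile(path)

    // Thunk: every invocation plans a brand-new RDD, re-listing the
    // directory, which is what JSONRelation now does per scan.
    val fresh: () => RDD[String] = () => sc.textFile(path)

    cached.count()  // reuses stale splits if the directory was overwritten
    fresh().count() // sees the directory as it is right now
    sc.stop()
  }
}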
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -616,7 +616,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
     if (conf.useJacksonStreamingAPI) {
-      baseRelationToDataFrame(new JSONRelation(json, None, 1.0, Some(schema))(this))
+      baseRelationToDataFrame(new JSONRelation(() => json, None, 1.0, Some(schema))(this))
     } else {
       val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
       val appliedSchema =
@@ -650,7 +650,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
     if (conf.useJacksonStreamingAPI) {
-      baseRelationToDataFrame(new JSONRelation(json, None, samplingRatio, None)(this))
+      baseRelationToDataFrame(new JSONRelation(() => json, None, samplingRatio, None)(this))
     } else {
       val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
       val appliedSchema =
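For these jsonRDD entry points the new thunk simply closes over the caller's RDD, so each baseRDD() call returns the same immutable RDD; the per-operation recreation matters for the path-based constructor in the file below, where the thunk issues a fresh textFile read. A small sketch of the two thunk shapes (the ThunkShapes object and method names are hypothetical):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Hypothetical helpers contrasting the two thunk shapes in this commit.
object ThunkShapes {
  def fromUserRdd(json: RDD[String]): () => RDD[String] =
    () => json // closes over one RDD; every call returns that same object

  def fromPath(sqlContext: SQLContext, path: String): () => RDD[String] =
    () => sqlContext.sparkContext.textFile(path) // every call plans a new RDD
}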
21 changes: 13 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -104,7 +104,12 @@ private[sql] class DefaultSource
 }
 
 private[sql] class JSONRelation(
-    baseRDD: RDD[String],
+    // baseRDD is not immutable with respect to INSERT OVERWRITE
+    // and so it must be recreated at least as often as the
+    // underlying inputs are modified. To be safe, a function is
+    // used instead of a regular RDD value to ensure a fresh RDD is
+    // recreated for each and every operation.
+    baseRDD: () => RDD[String],
     val path: Option[String],
     val samplingRatio: Double,
     userSpecifiedSchema: Option[StructType])(
@@ -120,7 +125,7 @@ private[sql] class JSONRelation(
       userSpecifiedSchema: Option[StructType],
       sqlContext: SQLContext) =
     this(
-      sqlContext.sparkContext.textFile(path),
+      () => sqlContext.sparkContext.textFile(path),
       Some(path),
       samplingRatio,
       userSpecifiedSchema)(sqlContext)
@@ -132,13 +137,13 @@
   override lazy val schema = userSpecifiedSchema.getOrElse {
     if (useJacksonStreamingAPI) {
       InferSchema(
-        baseRDD,
+        baseRDD(),
         samplingRatio,
         sqlContext.conf.columnNameOfCorruptRecord)
     } else {
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(
-          baseRDD,
+          baseRDD(),
           samplingRatio,
           sqlContext.conf.columnNameOfCorruptRecord))
     }
@@ -147,12 +152,12 @@
   override def buildScan(): RDD[Row] = {
     if (useJacksonStreamingAPI) {
       JacksonParser(
-        baseRDD,
+        baseRDD(),
         schema,
         sqlContext.conf.columnNameOfCorruptRecord)
     } else {
       JsonRDD.jsonStringToRow(
-        baseRDD,
+        baseRDD(),
         schema,
         sqlContext.conf.columnNameOfCorruptRecord)
     }
@@ -161,12 +166,12 @@
   override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
     if (useJacksonStreamingAPI) {
       JacksonParser(
-        baseRDD,
+        baseRDD(),
         StructType.fromAttributes(requiredColumns),
         sqlContext.conf.columnNameOfCorruptRecord)
     } else {
       JsonRDD.jsonStringToRow(
-        baseRDD,
+        baseRDD(),
         StructType.fromAttributes(requiredColumns),
         sqlContext.conf.columnNameOfCorruptRecord)
     }
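One asymmetry worth noting in the diff above: schema is declared as a lazy val, so although it now calls baseRDD(), inference still runs at most once per relation instance and is cached thereafter, while every buildScan call re-evaluates the thunk. A minimal sketch of that Scala semantics (the Demo object and its counter are hypothetical):

// Hypothetical sketch: lazy-val memoization versus per-call thunk evaluation.
object Demo {
  private var builds = 0
  private val thunk: () => Int = () => { builds += 1; builds }

  lazy val once: Int = thunk() // evaluated on first access, then cached
  def perCall(): Int = thunk() // evaluated on every call

  def main(args: Array[String]): Unit = {
    println((once, once))           // (1,1): a single evaluation
    println((perCall(), perCall())) // (2,3): a fresh evaluation per call
  }
}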
