Skip to content

Commit

Permalink
Detect insertion error in DataSourceStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
yjshen committed Jul 14, 2015
1 parent 408b384 commit 847ab20
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil

case logical.InsertIntoTable(t, _, _, _, _) =>
throw new QueryPlanningException(s"""
| Attempt to insert into a RDD-based table: ${t.simpleString}, which is immutable.
| Save the RDD to a data source and register the data source as a table before insertion.
""".stripMargin)

case _ => Nil
}

Expand Down Expand Up @@ -393,3 +399,5 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters.flatMap(translate)
}
}

class QueryPlanningException(message: String) extends Exception(message)
23 changes: 23 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.sources.QueryPlanningException

import scala.language.postfixOps

import org.apache.spark.sql.functions._
Expand Down Expand Up @@ -761,4 +763,25 @@ class DataFrameSuite extends QueryTest {
assert(f.getMessage.contains("column3"))
assert(!f.getMessage.contains("column2"))
}

test("SPARK-6941: Better Error Message for inserting into RDD-based Table") {
val df = Seq(Tuple1(1)).toDF("col")
df.registerTempTable("rdd_base")

df.write.parquet("tmp_parquet")
val pdf = ctx.read.parquet("tmp_parquet")
pdf.registerTempTable("parquet_base")

df.write.json("tmp_json")
val jdf = ctx.read.json("tmp_json")
jdf.registerTempTable("json_base")

val insertion = Seq(Tuple1(2)).toDF("col")
val e = intercept[QueryPlanningException] {
insertion.write.insertInto("rdd_base")
}
assert(e.getMessage.contains("Attempt to insert into a RDD-based table"))
insertion.write.insertInto("parquet_base")
insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
}
}

0 comments on commit 847ab20

Please sign in to comment.