Skip to content

Commit

Permalink
[SPARK-6941] [SQL] Provide a better error message to when inserting i…
Browse files Browse the repository at this point in the history
…nto RDD based table

JIRA: https://issues.apache.org/jira/browse/SPARK-6941

Author: Yijie Shen <henry.yijieshen@gmail.com>

Closes apache#7342 from yijieshen/SPARK-6941 and squashes the following commits:

f82cbe7 [Yijie Shen] reorder import
dd67e40 [Yijie Shen] resolve comments
09518af [Yijie Shen] fix import order in DataframeSuite
0c635d4 [Yijie Shen] make match more specific
9df388d [Yijie Shen] move check into PreWriteCheck
847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy
  • Loading branch information
yjshen authored and yhuai committed Jul 16, 2015
1 parent b536d5d commit 43dac2c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DataType

Expand Down Expand Up @@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")

case logical.InsertIntoTable(t, _, _, _, _) =>
if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
failAnalysis(s"Inserting into an RDD-based table is not allowed.")
} else {
// OK
}

case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
Expand Down
55 changes: 52 additions & 3 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,23 @@

package org.apache.spark.sql

import java.io.File

import scala.language.postfixOps

import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint}

import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}

class DataFrameSuite extends QueryTest {
class DataFrameSuite extends QueryTest with SQLTestUtils {
import org.apache.spark.sql.TestData._

lazy val ctx = org.apache.spark.sql.test.TestSQLContext
import ctx.implicits._

def sqlContext: SQLContext = ctx

test("analysis error should be eagerly reported") {
val oldSetting = ctx.conf.dataFrameEagerAnalysis
// Eager analysis.
Expand Down Expand Up @@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest {
assert(f.getMessage.contains("column3"))
assert(!f.getMessage.contains("column2"))
}

test("SPARK-6941: Better error message for inserting into RDD-based Table") {
withTempDir { dir =>

val tempParquetFile = new File(dir, "tmp_parquet")
val tempJsonFile = new File(dir, "tmp_json")

val df = Seq(Tuple1(1)).toDF()
val insertion = Seq(Tuple1(2)).toDF("col")

// pass case: parquet table (HadoopFsRelation)
df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath)
val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath)
pdf.registerTempTable("parquet_base")
insertion.write.insertInto("parquet_base")

// pass case: json table (InsertableRelation)
df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath)
val jdf = ctx.read.json(tempJsonFile.getCanonicalPath)
jdf.registerTempTable("json_base")
insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")

// error cases: insert into an RDD
df.registerTempTable("rdd_base")
val e1 = intercept[AnalysisException] {
insertion.write.insertInto("rdd_base")
}
assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed."))

// error case: insert into a logical plan that is not a LeafNode
val indirectDS = pdf.select("_1").filter($"_1" > 5)
indirectDS.registerTempTable("indirect_ds")
val e2 = intercept[AnalysisException] {
insertion.write.insertInto("indirect_ds")
}
assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))

// error case: insert into an OneRowRelation
new DataFrame(ctx, OneRowRelation).registerTempTable("one_row")
val e3 = intercept[AnalysisException] {
insertion.write.insertInto("one_row")
}
assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed."))
}
}
}

0 comments on commit 43dac2c

Please sign in to comment.