Skip to content

Commit

Permalink
[SPARK-30782][SQL] Column resolution doesn't respect current catalog/namespace for v2 tables

Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This PR proposes to fix an issue where qualified columns are not matched for v2 tables if current catalog/namespace are used.

For v1 tables, you can currently perform the following:
```SQL
SELECT default.t.id FROM t;
```

For v2 tables, the following fails:
```SQL
USE testcat.ns1.ns2;
SELECT testcat.ns1.ns2.t.id FROM t;

org.apache.spark.sql.AnalysisException: cannot resolve '`testcat.ns1.ns2.t.id`' given input columns: [t.id, t.point]; line 1 pos 7;
```

### Why are the changes needed?

It is a bug because qualified column names cannot be resolved for v2 tables when the current catalog/namespace is used to reference them.

### Does this PR introduce any user-facing change?

Yes, now the following works:
```SQL
USE testcat.ns1.ns2;
SELECT testcat.ns1.ns2.t.id FROM t;
```

### How was this patch tested?

Added new unit tests covering qualified column resolution with the current catalog/namespace set.

Closes apache#27532 from imback82/qualifed_col_respect_current.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
imback82 authored and Seongjin Cho committed Apr 14, 2020
1 parent e5ebd49 commit 300c867
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,10 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
case u: UnresolvedRelation =>
lookupV2Relation(u.multipartIdentifier)
.map(SubqueryAlias(u.multipartIdentifier, _))
.getOrElse(u)
.map { rel =>
val ident = rel.identifier.get
SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
}.getOrElse(u)

case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
CatalogV2Util.loadTable(catalog, ident)
Expand Down Expand Up @@ -933,7 +935,7 @@ class Analyzer(
v1SessionCatalog.getRelation(v1Table.v1Table)
case table =>
SubqueryAlias(
identifier,
ident.asMultipartIdentifier,
DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
}
val key = catalog.name +: ident.namespace :+ ident.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,12 +685,21 @@ class DataSourceV2SQLSuite
sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
sql(s"INSERT INTO $t VALUES (1, (10, 20))")

checkAnswer(
sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $t"),
Row(1, 10))
checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $t"), Row(1, 10))
checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $t"), Row(1, 10))
checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $t"), Row(1, 10))
def check(tbl: String): Unit = {
checkAnswer(
sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $tbl"),
Row(1, 10))
checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $tbl"), Row(1, 10))
checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $tbl"), Row(1, 10))
checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $tbl"), Row(1, 10))
}

// Test with qualified table name "testcat.ns1.ns2.tbl".
check(t)

// Test if current catalog and namespace is respected in column resolution.
sql("USE testcat.ns1.ns2")
check("tbl")

val ex = intercept[AnalysisException] {
sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
Expand All @@ -700,19 +709,30 @@ class DataSourceV2SQLSuite
}

test("qualified column names for v1 tables") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)

withTable("t") {
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
Seq(true, false).foreach { useV1Table =>
val format = if (useV1Table) "json" else v2Format
if (useV1Table) {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
} else {
spark.conf.set(
V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)
}

// catalog name cannot be used for v1 tables.
val ex = intercept[AnalysisException] {
sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
withTable("t") {
sql(s"CREATE TABLE t USING $format AS SELECT 1 AS i")
checkAnswer(sql("select i from t"), Row(1))
checkAnswer(sql("select t.i from t"), Row(1))
checkAnswer(sql("select default.t.i from t"), Row(1))
checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))

// catalog name cannot be used for tables in the session catalog.
val ex = intercept[AnalysisException] {
sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
}
assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
}
assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
}
}

Expand Down

0 comments on commit 300c867

Please sign in to comment.