
[SPARK-34269][SQL] Simplify SQL view resolution
### What changes were proposed in this pull request?

Currently, SQL (temp or permanent) view resolution is done in 2 steps:
1. In `SessionCatalog`, we get the view metadata, parse the view SQL string, and wrap it with `View`.
2. At the beginning of the optimizer, we run `EliminateView`, which drops the wrapper `View` and applies some special logic to match the view schema.

Step 2 is tricky: we need to retain the output attributes' expression IDs, while also adding an extra `Project` to perform the casts and aliasing. This PR simplifies view resolution by building the complete plan (with casts and aliases already added) in `SessionCatalog`, so that we only have 1 step.
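
For illustration, here is a minimal sketch of the new one-step resolution, lifted from the `SessionCatalog.fromCatalogTable` change in the diff below (`resolveView` is a hypothetical wrapper name; the body is what the PR actually builds):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, UpCast}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}

// Build the complete view plan in one step: wrap each recorded view column in
// UpCast (so an unsafe type change fails analysis) and Alias (to restore the
// user-specified column name and metadata), then add a single Project between
// the View operator and the parsed view query.
def resolveView(
    metadata: CatalogTable,
    viewColumnNames: Seq[String],
    parsedPlan: LogicalPlan,
    isTempView: Boolean): View = {
  val projectList = viewColumnNames.zip(metadata.schema).map { case (name, field) =>
    Alias(UpCast(UnresolvedAttribute.quoted(name), field.dataType), field.name)(
      explicitMetadata = Some(field.metadata))
  }
  View(desc = metadata, isTempView = isTempView, child = Project(projectList, parsedPlan))
}
```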

### Why are the changes needed?

Code simplification. It also fixes issues like apache#31352.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes apache#31368 from cloud-fan/try.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan authored and skestle committed Feb 3, 2021
1 parent 96ef65b commit 2071709
Showing 13 changed files with 119 additions and 196 deletions.
@@ -1074,7 +1074,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
-case view @ View(desc, isTempView, _, child) if !child.resolved =>
+case view @ View(desc, isTempView, child) if !child.resolved =>
// Resolve all the UnresolvedRelations and Views in the child.
val newChild = AnalysisContext.withAnalysisContext(desc) {
val nestedViewDepth = AnalysisContext.get.nestedViewDepth
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
-import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -422,41 +422,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case write: V2WriteCommand if write.resolved =>
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

-// If the view output doesn't have the same number of columns neither with the child
-// output, nor with the query column names, throw an AnalysisException.
-// If the view's child output can't up cast to the view output,
-// throw an AnalysisException, too.
-case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) =>
-val queryColumnNames = desc.viewQueryColumnNames
-val queryOutput = if (queryColumnNames.nonEmpty) {
-if (output.length != queryColumnNames.length) {
-// If the view output doesn't have the same number of columns with the query column
-// names, throw an AnalysisException.
-throw QueryCompilationErrors.viewOutputNumberMismatchQueryColumnNamesError(
-output, queryColumnNames)
-}
-val resolver = SQLConf.get.resolver
-queryColumnNames.map { colName =>
-child.output.find { attr =>
-resolver(attr.name, colName)
-}.getOrElse(throw QueryCompilationErrors.attributeNotFoundError(colName, child))
-}
-} else {
-child.output
-}

-output.zip(queryOutput).foreach {
-case (attr, originAttr) if !attr.dataType.sameType(originAttr.dataType) =>
-// The dataType of the output attributes may be not the same with that of the view
-// output, so we should cast the attribute to the dataType of the view output
-// attribute. Will throw an AnalysisException if the cast is not a up-cast.
-if (!Cast.canUpCast(originAttr.dataType, attr.dataType)) {
-throw QueryCompilationErrors.cannotUpCastAsAttributeError(
-originAttr, attr)
-}
-case _ =>
-}

case alter: AlterTable if alter.table.resolved =>
val table = alter.table
def findField(operation: String, fieldName: Array[String]): StructField = {
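For context, the deleted check above is subsumed by the `UpCast` expressions that `SessionCatalog` now inserts during view resolution (see the `fromCatalogTable` hunk below): if a view column can no longer be safely up-cast, analysis fails when the `UpCast` is resolved, so no dedicated `CheckAnalysis` branch is needed. A minimal sketch of one such per-column guard (the column name and type are illustrative):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Alias, UpCast}
import org.apache.spark.sql.types.IntegerType

// One per-column guard as built by SessionCatalog.fromCatalogTable. If the
// underlying column `i` later becomes, say, a string, resolving this UpCast
// raises the same "cannot up-cast" error the removed branch used to throw.
val guarded = Alias(UpCast(UnresolvedAttribute.quoted("i"), IntegerType), "i")()
```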
@@ -17,74 +17,21 @@

package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.spark.sql.catalyst.rules.Rule

/**
* This file defines view types and analysis rules related to views.
*/

/**
-* This rule has two goals:
-*
-* 1. Removes [[View]] operators from the plan. The operator is respected till the end of analysis
-* stage because we want to see which part of an analyzed logical plan is generated from a view.
-*
-* 2. Make sure that a view's child plan produces the view's output attributes. We try to wrap the
-* child by:
-* 1. Generate the `queryOutput` by:
-* 1.1. If the query column names are defined, map the column names to attributes in the child
-* output by name(This is mostly for handling view queries like SELECT * FROM ..., the
-* schema of the referenced table/view may change after the view has been created, so we
-* have to save the output of the query to `viewQueryColumnNames`, and restore them during
-* view resolution, in this way, we are able to get the correct view column ordering and
-* omit the extra columns that we don't require);
-* 1.2. Else set the child output attributes to `queryOutput`.
-* 2. Map the `queryOutput` to view output by index, if the corresponding attributes don't match,
-* try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
-* 3. Add a Project over the child, with the new output generated by the previous steps.
-*
-* Once reaches this rule, it means `CheckAnalysis` did necessary checks on number of columns
-* between the view output and the child output or the query column names. `CheckAnalysis` also
-* checked the cast from the view's child to the Project is up-cast.
-*
-* This should be only done after the batch of Resolution, because the view attributes are not
-* completely resolved during the batch of Resolution.
+* This rule removes [[View]] operators from the plan. The operator is respected till the end of
+* analysis stage because we want to see which part of an analyzed logical plan is generated from a
+* view.
*/
-object EliminateView extends Rule[LogicalPlan] with CastSupport {
+object EliminateView extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// The child has the different output attributes with the View operator. Adds a Project over
// the child of the view.
case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
// Find the attribute that has the expected attribute name from an attribute list, the names
// are compared using conf.resolver.
// `CheckAnalysis` already guarantees the expected attribute can be found for sure.
desc.viewQueryColumnNames.map { colName =>
child.output.find(attr => resolver(attr.name, colName)).get
}
} else {
// For view created before Spark 2.2.0, the view text is already fully qualified, the plan
// output is the same with the view output.
child.output
}
// Map the attributes in the query output to the attributes in the view output by index.
val newOutput = output.zip(queryOutput).map {
case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
// `CheckAnalysis` already guarantees that the cast is a up-cast for sure.
Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
case (_, originAttr) => originAttr
}
Project(newOutput, child)

// The child should have the same output attributes with the View operator, so we simply
// remove the View operator.
case View(_, _, _, child) =>
child
case View(_, _, child) => child
}
}

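For illustration, a self-contained sketch of what the slimmed-down rule now does (hypothetical example; the `CatalogTable` construction mirrors the test suites below):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateView
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, View}
import org.apache.spark.sql.types.{IntegerType, StructType}

val desc = CatalogTable(
  identifier = TableIdentifier("v1"),
  tableType = CatalogTableType.VIEW,
  storage = CatalogStorageFormat.empty,
  schema = new StructType().add("a", IntegerType))
val child = LocalRelation(Symbol("a").int)

// EliminateView is now a trivial unwrap: all cast/alias work already happened
// when SessionCatalog built the View's child plan.
assert(EliminateView(View(desc, isTempView = false, child = child)) == child)
```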
@@ -35,9 +35,9 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, ImplicitCastInputTypes}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionInfo, ImplicitCastInputTypes, UpCast}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -844,18 +844,41 @@ class SessionCatalog(
}
}

+def getTempViewSchema(plan: LogicalPlan): StructType = {
+plan match {
+case viewInfo: TemporaryViewRelation => viewInfo.tableMeta.schema
+case v => v.schema
+}
+}

private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
-val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
+val viewText = metadata.viewText.getOrElse {
+throw new IllegalStateException("Invalid view without text.")
+}
val viewConfigs = metadata.viewSQLConfigs
-val viewPlan =
-SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView = isTempView)) {
-parser.parsePlan(viewText)
-}
-View(
-desc = metadata,
-isTempView = isTempView,
-output = metadata.schema.toAttributes,
-child = viewPlan)
+val parsedPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView)) {
+parser.parsePlan(viewText)
+}
+val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+// For view created before Spark 2.2.0, the view text is already fully qualified, the plan
+// output is the same with the view output.
+metadata.schema.fieldNames.toSeq
+} else {
+assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
+metadata.viewQueryColumnNames
+}

+// For view queries like `SELECT * FROM t`, the schema of the referenced table/view may
+// change after the view has been created. We need to add an extra SELECT to pick the columns
+// according to the recorded column names (to get the correct view column ordering and omit
+// the extra columns that we don't require), with UpCast (to make sure the type change is
+// safe) and Alias (to respect user-specified view column names) according to the view schema
+// in the catalog.
+val projectList = viewColumnNames.zip(metadata.schema).map { case (name, field) =>
+Alias(UpCast(UnresolvedAttribute.quoted(name), field.dataType), field.name)(
+explicitMetadata = Some(field.metadata))
+}
+View(desc = metadata, isTempView = isTempView, child = Project(projectList, parsedPlan))
}

def lookupTempView(table: String): Option[SubqueryAlias] = {
@@ -230,7 +230,7 @@ object LogicalPlanIntegrity {
// NOTE: we still need to filter resolved expressions here because the output of
// some resolved logical plans can have unresolved references,
// e.g., outer references in `ExistenceJoin`.
-p.output.filter(_.resolved).map { a => (a.exprId, a.dataType) }
+p.output.filter(_.resolved).map { a => (a.exprId, a.dataType.asNullable) }
}.flatten

val ignoredExprIds = plan.collect {
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.AliasIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateView, MultiInstanceRelation}
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -450,45 +450,40 @@ case class InsertIntoDir(
*
* @param desc A view description(CatalogTable) that provides necessary information to resolve the
* view.
-* @param output The output of a view operator, this is generated during planning the view, so that
-* we are able to decouple the output from the underlying structure.
* @param child The logical plan of a view operator, it should be a logical plan parsed from the
* `CatalogTable.viewText`, should throw an error if the `viewText` is not defined.
*/
case class View(
desc: CatalogTable,
isTempView: Boolean,
-output: Seq[Attribute],
-child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {

-override def producedAttributes: AttributeSet = outputSet

-override lazy val resolved: Boolean = child.resolved

-override def children: Seq[LogicalPlan] = child :: Nil
+child: LogicalPlan) extends UnaryNode {

-override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
+override def output: Seq[Attribute] = child.output

override def simpleString(maxFields: Int): String = {
s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
}

-override def doCanonicalize(): LogicalPlan = {
-def sameOutput(
-outerProject: Seq[NamedExpression], innerProject: Seq[NamedExpression]): Boolean = {
-outerProject.length == innerProject.length &&
-outerProject.zip(innerProject).forall {
-case(outer, inner) => outer.name == inner.name && outer.dataType == inner.dataType
-}
-}
+override def doCanonicalize(): LogicalPlan = child match {
+case p: Project if p.resolved && canRemoveProject(p) => p.child.canonicalized
+case _ => child.canonicalized
+}

-val eliminated = EliminateView(this) match {
-case Project(viewProjectList, child @ Project(queryProjectList, _))
-if sameOutput(viewProjectList, queryProjectList) =>
-child
-case other => other
+// When resolving a SQL view, we use an extra Project to add cast and alias to make sure the view
+// output schema doesn't change even if the table referenced by the view is changed after view
+// creation. We should remove this extra Project during canonicalize if it does nothing.
+// See more details in `SessionCatalog.fromCatalogTable`.
+private def canRemoveProject(p: Project): Boolean = {
+p.output.length == p.child.output.length && p.projectList.zip(p.child.output).forall {
+case (Alias(cast: CastBase, name), childAttr) =>
+cast.child match {
+case a: AttributeReference =>
+a.dataType == cast.dataType && a.name == name && childAttr.semanticEquals(a)
+case _ => false
+}
+case _ => false
+}
-eliminated.canonicalized
}
}

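To illustrate the new canonicalization (a hedged sketch using the Catalyst DSL; names are illustrative): a view whose underlying table still matches its recorded schema carries only no-op casts, so `canRemoveProject` fires and the view canonicalizes to the same plan as its query:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project, View}
import org.apache.spark.sql.types.{IntegerType, StructType}

val relation = LocalRelation(Symbol("a").int)
// Resolved shape of a view over `relation` whose schema never changed:
// a Project of no-op casts aliased back to the original column names.
val noOpProject = Project(
  Seq(Alias(Cast(relation.output.head, IntegerType), "a")()),
  relation)
val view = View(
  desc = CatalogTable(
    identifier = TableIdentifier("v"),
    tableType = CatalogTableType.VIEW,
    storage = CatalogStorageFormat.empty,
    schema = new StructType().add("a", IntegerType)),
  isTempView = false,
  child = noOpProject)

// canRemoveProject detects the no-op Project, so doCanonicalize strips it.
assert(view.canonicalized == relation.canonicalized)
```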
@@ -29,7 +29,7 @@ import org.scalatest.matchers.must.Matchers
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -40,7 +40,6 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.InMemoryTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -655,28 +654,6 @@ class AnalysisSuite extends AnalysisTest with Matchers {
}
}

test("SPARK-25691: AliasViewChild with different nullabilities") {
object ViewAnalyzer extends RuleExecutor[LogicalPlan] {
val batches = Batch("View", Once, EliminateView) :: Nil
}
val relation = LocalRelation(Symbol("a").int.notNull, Symbol("b").string)
val view = View(CatalogTable(
identifier = TableIdentifier("v1"),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))),
isTempView = false,
output = Seq(Symbol("a").int, Symbol("b").string),
child = relation)
val tz = Option(conf.sessionLocalTimeZone)
val expected = Project(Seq(
Alias(Cast(Symbol("a").int.notNull, IntegerType, tz), "a")(),
Alias(Cast(Symbol("b").string, StringType, tz), "b")()),
relation)
val res = ViewAnalyzer.execute(view)
comparePlans(res, expected)
}

test("CTE with non-existing column alias") {
assertAnalysisError(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE y = 1"),
Seq("cannot resolve '`y`' given input columns: [x]"))
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, Ta
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{Command, Range, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Project, Range, SubqueryAlias, View}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -634,6 +634,14 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
}
}

+private def getViewPlan(metadata: CatalogTable): LogicalPlan = {
+import org.apache.spark.sql.catalyst.dsl.expressions._
+val projectList = metadata.schema.map { field =>
+UpCast(field.name.attr, field.dataType).as(field.name)
+}
+Project(projectList, CatalystSqlParser.parsePlan(metadata.viewText.get))
+}

test("look up view relation") {
withBasicCatalog { catalog =>
val props = CatalogTable.catalogAndNamespaceToProps("cat1", Seq("ns1"))
@@ -646,8 +654,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

// Look up a view.
catalog.setCurrentDatabase("default")
-val view = View(desc = metadata, isTempView = false, output = metadata.schema.toAttributes,
-child = CatalystSqlParser.parsePlan(metadata.viewText.get))
+val view = View(desc = metadata, isTempView = false, child = getViewPlan(metadata))
comparePlans(catalog.lookupRelation(TableIdentifier("view1", Some("db3"))),
SubqueryAlias(Seq(CatalogManager.SESSION_CATALOG_NAME, "db3", "view1"), view))
// Look up a view using current database of the session catalog.
@@ -666,8 +673,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
assert(metadata.viewText.isDefined)
assert(metadata.viewCatalogAndNamespace == Seq(CatalogManager.SESSION_CATALOG_NAME, "db2"))

-val view = View(desc = metadata, isTempView = false, output = metadata.schema.toAttributes,
-child = CatalystSqlParser.parsePlan(metadata.viewText.get))
+val view = View(desc = metadata, isTempView = false, child = getViewPlan(metadata))
comparePlans(catalog.lookupRelation(TableIdentifier("view2", Some("db3"))),
SubqueryAlias(Seq(CatalogManager.SESSION_CATALOG_NAME, "db3", "view2"), view))
}
