Skip to content

Commit

Permalink
[SPARK-13827][SQL] Can't add subquery to an operator with same-name o…
Browse files Browse the repository at this point in the history
…utputs while generate SQL string

## What changes were proposed in this pull request?

This PR tries to solve a fundamental issue in the `SQLBuilder`. When we want to turn a logical plan into SQL string and put it after FROM clause, we need to wrap it with a sub-query. However, a logical plan is allowed to have same-name outputs with different qualifiers(e.g. the `Join` operator), and this kind of plan can't be put under a subquery as we will erase and assign a new qualifier to all outputs and make it impossible to distinguish same-name outputs.

To solve this problem, this PR renames all attributes with globally unique names(using exprId), so that we don't need qualifiers to resolve ambiguity anymore.

For example, `SELECT x.key, MAX(y.key) OVER () FROM t x JOIN t y`, we will parse this SQL to a Window operator and a Project operator, and add a sub-query between them. The generated SQL looks like:
```
SELECT sq_1.key, sq_1.max
FROM (
    SELECT sq_0.key, sq_0.key, MAX(sq_0.key) OVER () AS max
    FROM (
        SELECT x.key, y.key FROM t1 AS x JOIN t2 AS y
    ) AS sq_0
) AS sq_1
```
You can see, the `key` columns become ambiguous after `sq_0`.

After this PR, it will generate something like:
```
SELECT attr_30 AS key, attr_37 AS max
FROM (
    SELECT attr_30, attr_37
    FROM (
        SELECT attr_30, attr_35, MAX(attr_35) AS attr_37
        FROM (
            SELECT attr_30, attr_35 FROM
                (SELECT key AS attr_30 FROM t1) AS sq_0
            INNER JOIN
                (SELECT key AS attr_35 FROM t1) AS sq_1
        ) AS sq_2
    ) AS sq_3
) AS sq_4
```
The outermost SELECT is used to turn the generated named to real names back, and the innermost SELECT is used to alias real columns to our generated names. Between them, there is no name ambiguity anymore.

## How was this patch tested?

existing tests and new tests in LogicalPlanToSQLSuite.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#11658 from cloud-fan/gensql.
  • Loading branch information
cloud-fan authored and roygao94 committed Mar 22, 2016
1 parent 4c44922 commit e761d9f
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ case class Alias(child: Expression, name: String)(
override def sql: String = {
val qualifiersString =
if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".")
val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
s"${child.sql} AS $qualifiersString${quoteIdentifier(aliasName)}"
s"${child.sql} AS $qualifiersString${quoteIdentifier(name)}"
}
}

Expand Down Expand Up @@ -302,8 +301,7 @@ case class AttributeReference(
override def sql: String = {
val qualifiersString =
if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".")
val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
s"$qualifiersString${quoteIdentifier(attrRefName)}"
s"$qualifiersString${quoteIdentifier(name)}"
}
}

Expand Down
226 changes: 127 additions & 99 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -54,8 +55,26 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi

def toSQL: String = {
val canonicalizedPlan = Canonicalizer.execute(logicalPlan)
val outputNames = logicalPlan.output.map(_.name)
val qualifiers = logicalPlan.output.flatMap(_.qualifiers).distinct

// Keep the qualifier information by using it as sub-query name, if there is only one qualifier
// present.
val finalName = if (qualifiers.length == 1) {
qualifiers.head
} else {
SQLBuilder.newSubqueryName
}

// Canonicalizer will remove all naming information, we should add it back by adding an extra
// Project and alias the outputs.
val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
case (attr, name) => Alias(attr.withQualifiers(Nil), name)()
}
val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan))

try {
val replaced = canonicalizedPlan.transformAllExpressions {
val replaced = finalPlan.transformAllExpressions {
case e: SubqueryExpression =>
SubqueryHolder(new SQLBuilder(e.query, sqlContext).toSQL)
case e: NonSQLExpression =>
Expand Down Expand Up @@ -109,23 +128,6 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case Limit(limitExpr, child) =>
s"${toSQL(child)} LIMIT ${limitExpr.sql}"

case p: Sample if p.isTableSample =>
val fraction = math.min(100, math.max(0, (p.upperBound - p.lowerBound) * 100))
p.child match {
case m: MetastoreRelation =>
val aliasName = m.alias.getOrElse("")
build(
s"`${m.databaseName}`.`${m.tableName}`",
"TABLESAMPLE(" + fraction + " PERCENT)",
aliasName)
case s: SubqueryAlias =>
val aliasName = if (s.child.isInstanceOf[SubqueryAlias]) s.alias else ""
val plan = if (s.child.isInstanceOf[SubqueryAlias]) s.child else s
build(toSQL(plan), "TABLESAMPLE(" + fraction + " PERCENT)", aliasName)
case _ =>
build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
}

case Filter(condition, child) =>
val whereOrHaving = child match {
case _: Aggregate => "HAVING"
Expand All @@ -147,18 +149,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case p: Except =>
build("(" + toSQL(p.left), ") EXCEPT (", toSQL(p.right) + ")")

case p: SubqueryAlias =>
p.child match {
// Persisted data source relation
case LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
// Parentheses is not used for persisted data source relations
// e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1
case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation) =>
build(toSQL(p.child), "AS", p.alias)
case _ =>
build("(" + toSQL(p.child) + ")", "AS", p.alias)
}
case p: SubqueryAlias => build("(" + toSQL(p.child) + ")", "AS", p.alias)

case p: Join =>
build(
Expand All @@ -168,11 +159,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
toSQL(p.right),
p.condition.map(" ON " + _.sql).getOrElse(""))

case p: MetastoreRelation =>
build(
s"${quoteIdentifier(p.databaseName)}.${quoteIdentifier(p.tableName)}",
p.alias.map(a => s" AS ${quoteIdentifier(a)}").getOrElse("")
)
case SQLTable(database, table, _, sample) =>
val qualifiedName = s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
sample.map { case (lowerBound, upperBound) =>
val fraction = math.min(100, math.max(0, (upperBound - lowerBound) * 100))
qualifiedName + " TABLESAMPLE(" + fraction + " PERCENT)"
}.getOrElse(qualifiedName)

case Sort(orders, _, RepartitionByExpression(partitionExprs, child, _))
if orders.map(_.child) == partitionExprs =>
Expand Down Expand Up @@ -274,8 +266,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
val groupingSetSQL = "GROUPING SETS(" +
groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"

val aggExprs = agg.aggregateExpressions.map { case expr =>
expr.transformDown {
val aggExprs = agg.aggregateExpressions.map { case aggExpr =>
val originalAggExpr = aggExpr.transformDown {
// grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert it back.
case ar: AttributeReference if ar == gid => GroupingID(Nil)
case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
Expand All @@ -286,6 +278,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
groupByExprs.lift(idx).map(Grouping).getOrElse(a)
}

originalAggExpr match {
// Ancestor operators may reference the output of this grouping set, and we use exprId to
// generate a unique name for each attribute, so we should make sure the transformed
// aggregate expression won't change the output, i.e. exprId and alias name should remain
// the same.
case ne: NamedExpression if ne.exprId == aggExpr.exprId => ne
case e => Alias(e, normalizedName(aggExpr))(exprId = aggExpr.exprId)
}
}

build(
Expand All @@ -308,6 +309,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

private def normalizedName(n: NamedExpression): String = "gen_attr_" + n.exprId.id

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Collapse Project", FixedPoint(100),
Expand All @@ -316,31 +319,55 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
// `Aggregate`s.
CollapseProject),
Batch("Recover Scoping Info", Once,
// Used to handle other auxiliary `Project`s added by analyzer (e.g.
// `ResolveAggregateFunctions` rule)
AddSubquery,
// Previous rule will add extra sub-queries, this rule is used to re-propagate and update
// the qualifiers bottom up, e.g.:
//
// Sort
// ordering = t1.a
// Project
// projectList = [t1.a, t1.b]
// Subquery gen_subquery
// child ...
//
// will be transformed to:
//
// Sort
// ordering = gen_subquery.a
// Project
// projectList = [gen_subquery.a, gen_subquery.b]
// Subquery gen_subquery
// child ...
UpdateQualifiers
// Remove all sub queries, as we will insert new ones when it's necessary.
EliminateSubqueryAliases,
// A logical plan is allowed to have same-name outputs with different qualifiers(e.g. the
// `Join` operator). However, this kind of plan can't be put under a sub query as we will
// erase and assign a new qualifier to all outputs and make it impossible to distinguish
// same-name outputs. This rule renames all attributes, to guarantee different
// attributes(with different exprId) always have different names. It also removes all
// qualifiers, as attributes have unique names now and we don't need qualifiers to resolve
// ambiguity.
NormalizedAttribute,
// Finds the table relations and wrap them with `SQLTable`s. If there are any `Sample`
// operators on top of a table relation, merge the sample information into `SQLTable` of
// that table relation, as we can only convert table sample to standard SQL string.
ResolveSQLTable,
// Insert sub queries on top of operators that need to appear after FROM clause.
AddSubquery
)
)

object NormalizedAttribute extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
case a: AttributeReference =>
AttributeReference(normalizedName(a), a.dataType)(exprId = a.exprId, qualifiers = Nil)
case a: Alias =>
Alias(a.child, normalizedName(a))(exprId = a.exprId, qualifiers = Nil)
}
}

object ResolveSQLTable extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
case Sample(lowerBound, upperBound, _, _, ExtractSQLTable(table)) =>
aliasColumns(table.withSample(lowerBound, upperBound))
case ExtractSQLTable(table) =>
aliasColumns(table)
}

/**
* Aliases the table columns to the generated attribute names, as we use exprId to generate
* unique name for each attribute when normalize attributes, and we can't reference table
* columns with their real names.
*/
private def aliasColumns(table: SQLTable): LogicalPlan = {
val aliasedOutput = table.output.map { attr =>
Alias(attr, normalizedName(attr))(exprId = attr.exprId)
}
addSubquery(Project(aliasedOutput, table))
}
}

object AddSubquery extends Rule[LogicalPlan] {
override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
// This branch handles aggregate functions within HAVING clauses. For example:
Expand All @@ -354,55 +381,56 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
// +- Filter ...
// +- Aggregate ...
// +- MetastoreRelation default, src, None
case plan @ Project(_, Filter(_, _: Aggregate)) => wrapChildWithSubquery(plan)
case p @ Project(_, f @ Filter(_, _: Aggregate)) => p.copy(child = addSubquery(f))

case w @ Window(_, _, _, Filter(_, _: Aggregate)) => wrapChildWithSubquery(w)
case w @ Window(_, _, _, f @ Filter(_, _: Aggregate)) => w.copy(child = addSubquery(f))

case plan @ Project(_,
_: SubqueryAlias
| _: Filter
| _: Join
| _: MetastoreRelation
| OneRowRelation
| _: LocalLimit
| _: GlobalLimit
| _: Sample
) => plan

case plan: Project => wrapChildWithSubquery(plan)
case p: Project => p.copy(child = addSubqueryIfNeeded(p.child))

// We will generate "SELECT ... FROM ..." for Window operator, so its child operator should
// be able to put in the FROM clause, or we wrap it with a subquery.
case w @ Window(_, _, _,
_: SubqueryAlias
| _: Filter
| _: Join
| _: MetastoreRelation
| OneRowRelation
| _: LocalLimit
| _: GlobalLimit
| _: Sample
) => w

case w: Window => wrapChildWithSubquery(w)
}
case w: Window => w.copy(child = addSubqueryIfNeeded(w.child))

private def wrapChildWithSubquery(plan: UnaryNode): LogicalPlan = {
val newChild = SubqueryAlias(SQLBuilder.newSubqueryName, plan.child)
plan.withNewChildren(Seq(newChild))
case j: Join => j.copy(
left = addSubqueryIfNeeded(j.left),
right = addSubqueryIfNeeded(j.right))
}
}

object UpdateQualifiers extends Rule[LogicalPlan] {
override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
case plan =>
val inputAttributes = plan.children.flatMap(_.output)
plan transformExpressions {
case a: AttributeReference if !plan.producedAttributes.contains(a) =>
val qualifier = inputAttributes.find(_ semanticEquals a).map(_.qualifiers)
a.withQualifiers(qualifier.getOrElse(Nil))
}
}
private def addSubquery(plan: LogicalPlan): SubqueryAlias = {
SubqueryAlias(SQLBuilder.newSubqueryName, plan)
}

private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
case _: SubqueryAlias => plan
case _: Filter => plan
case _: Join => plan
case _: LocalLimit => plan
case _: GlobalLimit => plan
case _: SQLTable => plan
case OneRowRelation => plan
case _ => addSubquery(plan)
}
}

case class SQLTable(
database: String,
table: String,
output: Seq[Attribute],
sample: Option[(Double, Double)] = None) extends LeafNode {
def withSample(lowerBound: Double, upperBound: Double): SQLTable =
this.copy(sample = Some(lowerBound -> upperBound))
}

object ExtractSQLTable {
def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
Some(SQLTable(database, table, l.output.map(_.withQualifiers(Nil))))

case m: MetastoreRelation =>
Some(SQLTable(m.databaseName, m.tableName, m.output.map(_.withQualifiers(Nil))))

case _ => None
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,22 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
""".stripMargin)
}

test("window with join") {
checkHiveQl(
"""
|SELECT x.key, MAX(y.key) OVER (PARTITION BY x.key % 5 ORDER BY x.key)
|FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key
""".stripMargin)
}

test("join 2 tables and aggregate function in having clause") {
checkHiveQl(
"""
|SELECT COUNT(a.value), b.KEY, a.KEY
|FROM parquet_t1 a, parquet_t1 b
|GROUP BY a.KEY, b.KEY
|HAVING MAX(a.KEY) > 0
""".stripMargin)
}
}

0 comments on commit e761d9f

Please sign in to comment.