Skip to content

Commit

Permalink
[SPARK-7088] Fix analysis for 3rd party logical plan.
Browse files Browse the repository at this point in the history
ResolveReferences analysis rule now does not throw when
it cannot resolve references in a self-join.
  • Loading branch information
smola committed Jun 24, 2015
1 parent bba6699 commit af71ac7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class Analyzer(
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")

val (oldRelation, newRelation) = right.collect {
right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Expand All @@ -308,25 +308,27 @@ class Analyzer(
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
}.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
sys.error(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)
}

val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
// Only handle first case, others will be fixed on the next pass.
.headOption match {
case None =>
/*
* No result implies that there is a logical plan node that produces new references
* that this rule cannot handle. When that is the case, there must be another rule
* that resolves these conflicts. Otherwise, the analysis will fail.
*/
j
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
val newRight = right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute => attributeRewrites.get(a).getOrElse(a)
}
}
j.copy(right = newRight)
}
j.copy(right = newRight)

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ trait CheckAnalysis {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {

case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
Expand Down Expand Up @@ -121,6 +122,17 @@ trait CheckAnalysis {

case _ => // Analysis successful!
}

// Special handling for cases when self-join introduce duplicate expression ids.
case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
val conflictingAttributes = left.outputSet.intersect(right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references in Join:
|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)

}
extendedCheckRules.foreach(_(plan))
}
Expand Down

0 comments on commit af71ac7

Please sign in to comment.