Skip to content

Commit

Permalink
[SPARK-38347][SQL] Fix nullability propagation in transformUpWithNewO…
Browse files Browse the repository at this point in the history
…utput

### What changes were proposed in this pull request?

In `updateAttr`, let the new Attribute have the same nullability as the Attribute to be replaced.

### Why are the changes needed?

`attrMap` can possibly be populated below an outer join and the outer join changes nullability.

### How was this patch tested?

New unit test - verified that it fails without the fix.

Closes apache#35685 from sigmod/nullability2.

Authored-by: Yingyi Bu <yingyi.bu@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit dba6edb)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
sigmod authored and dongjoon-hyun committed Mar 7, 2022
1 parent 3754344 commit 471e32b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
private def updateAttr(a: Attribute, attrMap: AttributeMap[Attribute]): Attribute = {
attrMap.get(a) match {
case Some(b) =>
AttributeReference(a.name, b.dataType, b.nullable, a.metadata)(b.exprId, a.qualifier)
// The new Attribute has to
// - use a.nullable, because nullability cannot be propagated bottom-up without considering
// enclosed operators, e.g., operators such as Filters and Outer Joins can change
// nullability;
// - use b.dataType because transformUpWithNewOutput is used in the Analyzer for resolution,
// e.g., WidenSetOperationTypes uses it to propagate types bottom-up.
AttributeReference(a.name, b.dataType, a.nullable, a.metadata)(b.exprId, a.qualifier)
case None => a
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,35 @@ class QueryPlanSuite extends SparkFunSuite {
val plan = t.select($"a", $"b").select($"a", $"b").select($"a", $"b").analyze
assert(testRule(plan).resolved)
}

test("SPARK-38347: Nullability propagation in transformUpWithNewOutput") {
// A test rule that replaces Attributes in Project's project list.
val testRule = new Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithNewOutput {
case p @ Project(projectList, _) =>
val newProjectList = projectList.map {
case a: AttributeReference => a.newInstance()
case ne => ne
}
val newProject = p.copy(projectList = newProjectList)
newProject -> p.output.zip(newProject.output)
}
}

// Test a Left Outer Join plan in which right-hand-side input attributes are not nullable.
// Those attributes should be nullable after join even with a `transformUpWithNewOutput`
// started below the Left Outer join.
val t1 = LocalRelation('a.int.withNullability(false),
'b.int.withNullability(false), 'c.int.withNullability(false))
val t2 = LocalRelation('c.int.withNullability(false),
'd.int.withNullability(false), 'e.int.withNullability(false))
val plan = t1.select($"a", $"b")
.join(t2.select($"c", $"d"), LeftOuter, Some($"a" === $"c"))
.select($"a" + $"d").analyze
// The output Attribute of `plan` is nullable even though `d` is not nullable before the join.
assert(plan.output(0).nullable)
// The test rule with `transformUpWithNewOutput` should not change the nullability.
val planAfterTestRule = testRule(plan)
assert(planAfterTestRule.output(0).nullable)
}
}

0 comments on commit 471e32b

Please sign in to comment.