Skip to content

Commit

Permalink
[SPARK-9415][SQL] Throw AnalysisException when using MapType on Join …
Browse files Browse the repository at this point in the history
…and Aggregate

JIRA: https://issues.apache.org/jira/browse/SPARK-9415

Following up apache#7787. We shouldn't use MapType as grouping keys and join keys too.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes apache#7819 from viirya/map_join_groupby and squashes the following commits:

005ee0c [Liang-Chi Hsieh] For comments.
7463398 [Liang-Chi Hsieh] MapType can't be used as join keys, grouping keys.
  • Loading branch information
viirya authored and rxin committed Aug 1, 2015
1 parent 14f2634 commit 3320b0b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ trait CheckAnalysis {
case p: Predicate =>
p.asInstanceOf[Expression].children.foreach(checkValidJoinConditionExprs)
case e if e.dataType.isInstanceOf[BinaryType] =>
failAnalysis(s"expression ${e.prettyString} in join condition " +
s"'${condition.prettyString}' can't be binary type.")
failAnalysis(s"binary type expression ${e.prettyString} cannot be used " +
"in join conditions")
case e if e.dataType.isInstanceOf[MapType] =>
failAnalysis(s"map type expression ${e.prettyString} cannot be used " +
"in join conditions")
case _ => // OK
}

Expand All @@ -114,13 +117,16 @@ trait CheckAnalysis {

def checkValidGroupingExprs(expr: Expression): Unit = expr.dataType match {
case BinaryType =>
failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
s"not be binary type.")
failAnalysis(s"binary type expression ${expr.prettyString} cannot be used " +
"in grouping expression")
case m: MapType =>
failAnalysis(s"map type expression ${expr.prettyString} cannot be used " +
"in grouping expression")
case _ => // OK
}

aggregateExprs.foreach(checkValidAggregateExpression)
aggregateExprs.foreach(checkValidGroupingExprs)
groupingExprs.foreach(checkValidGroupingExprs)

case Sort(orders, _, _) =>
orders.foreach { order =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,71 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter {
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(join)
}
error.message.contains("Failure when resolving conflicting references in Join")
error.message.contains("Conflicting attributes")
assert(error.message.contains("Failure when resolving conflicting references in Join"))
assert(error.message.contains("Conflicting attributes"))
}

test("aggregation can't work on binary and map types") {
val plan =
Aggregate(
AttributeReference("a", BinaryType)(exprId = ExprId(2)) :: Nil,
Alias(Sum(AttributeReference("b", IntegerType)(exprId = ExprId(1))), "c")() :: Nil,
LocalRelation(
AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))))

val error = intercept[AnalysisException] {
caseSensitiveAnalyze(plan)
}
assert(error.message.contains("binary type expression a cannot be used in grouping expression"))

val plan2 =
Aggregate(
AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)) :: Nil,
Alias(Sum(AttributeReference("b", IntegerType)(exprId = ExprId(1))), "c")() :: Nil,
LocalRelation(
AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))))

val error2 = intercept[AnalysisException] {
caseSensitiveAnalyze(plan2)
}
assert(error2.message.contains("map type expression a cannot be used in grouping expression"))
}

test("Join can't work on binary and map types") {
val plan =
Join(
LocalRelation(
AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))),
LocalRelation(
AttributeReference("c", BinaryType)(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
Inner,
Some(EqualTo(AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("c", BinaryType)(exprId = ExprId(4)))))

val error = intercept[AnalysisException] {
caseSensitiveAnalyze(plan)
}
assert(error.message.contains("binary type expression a cannot be used in join conditions"))

val plan2 =
Join(
LocalRelation(
AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("b", IntegerType)(exprId = ExprId(1))),
LocalRelation(
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
Inner,
Some(EqualTo(AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)))))

val error2 = intercept[AnalysisException] {
caseSensitiveAnalyze(plan2)
}
assert(error2.message.contains("map type expression a cannot be used in join conditions"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,4 @@ class DataFrameAggregateSuite extends QueryTest {
emptyTableData.agg(sumDistinct('a)),
Row(null))
}

test("aggregation can't work on binary type") {
val df = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c").select($"c" cast BinaryType)
intercept[AnalysisException] {
df.groupBy("c").agg(count("*"))
}
intercept[AnalysisException] {
df.distinct
}
}
}
8 changes: 0 additions & 8 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -490,12 +490,4 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
Row(3, 2) :: Nil)

}

test("Join can't work on binary type") {
val left = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c").select($"c" cast BinaryType)
val right = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("d").select($"d" cast BinaryType)
intercept[AnalysisException] {
left.join(right, ($"left.N" === $"right.N"), "full")
}
}
}

0 comments on commit 3320b0b

Please sign in to comment.