Skip to content

Commit

Permalink
MapType can't be used as join keys or grouping keys.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jul 31, 2015
1 parent a3a85d7 commit 7463398
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ trait CheckAnalysis {
case e if e.dataType.isInstanceOf[BinaryType] =>
failAnalysis(s"expression ${e.prettyString} in join condition " +
s"'${condition.prettyString}' can't be binary type.")
case e if e.dataType.isInstanceOf[MapType] =>
failAnalysis(s"expression ${e.prettyString} in join condition " +
s"'${condition.prettyString}' can't be map type.")
case _ => // OK
}

Expand All @@ -116,11 +119,14 @@ trait CheckAnalysis {
case BinaryType =>
failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
s"not be binary type.")
case m: MapType =>
failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
s"not be map type.")
case _ => // OK
}

aggregateExprs.foreach(checkValidAggregateExpression)
aggregateExprs.foreach(checkValidGroupingExprs)
groupingExprs.foreach(checkValidGroupingExprs)

case Sort(orders, _, _) =>
orders.foreach { order =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,71 @@ class AnalysisErrorSuite extends SparkFunSuite with BeforeAndAfter {
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(join)
}
error.message.contains("Failure when resolving conflicting references in Join")
error.message.contains("Conflicting attributes")
assert(error.message.contains("Failure when resolving conflicting references in Join"))
assert(error.message.contains("Conflicting attributes"))
}

test("aggregation can't work on binary and map types") {
  // Case 1: the grouping key `a` is a BinaryType column; analysis is expected to fail.
  val binaryKey = AttributeReference("a", BinaryType)(exprId = ExprId(2))
  val binaryValue = AttributeReference("b", IntegerType)(exprId = ExprId(1))
  val binaryAggregate = Aggregate(
    binaryKey :: Nil,
    Alias(Sum(binaryValue), "c")() :: Nil,
    LocalRelation(binaryKey, binaryValue))

  val binaryError = intercept[AnalysisException] {
    caseSensitiveAnalyze(binaryAggregate)
  }
  assert(binaryError.message.contains(
    "grouping expression 'a' in aggregate can not be binary type"))

  // Case 2: the same plan shape, but the grouping key `a` is a MapType column.
  val mapKey = AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2))
  val mapValue = AttributeReference("b", IntegerType)(exprId = ExprId(1))
  val mapAggregate = Aggregate(
    mapKey :: Nil,
    Alias(Sum(mapValue), "c")() :: Nil,
    LocalRelation(mapKey, mapValue))

  val mapError = intercept[AnalysisException] {
    caseSensitiveAnalyze(mapAggregate)
  }
  assert(mapError.message.contains(
    "grouping expression 'a' in aggregate can not be map type"))
}

test("Join can't work on binary and map types") {
  // Case 1: inner join whose condition compares two BinaryType columns (a = c).
  val binaryLeftKey = AttributeReference("a", BinaryType)(exprId = ExprId(2))
  val binaryRightKey = AttributeReference("c", BinaryType)(exprId = ExprId(4))
  val binaryJoin = Join(
    LocalRelation(binaryLeftKey, AttributeReference("b", IntegerType)(exprId = ExprId(1))),
    LocalRelation(binaryRightKey, AttributeReference("d", IntegerType)(exprId = ExprId(3))),
    Inner,
    Some(EqualTo(binaryLeftKey, binaryRightKey)))

  val binaryError = intercept[AnalysisException] {
    caseSensitiveAnalyze(binaryJoin)
  }
  assert(binaryError.message.contains(
    "expression a in join condition '(a = c)' can't be binary type"))

  // Case 2: the same join shape, but the compared columns are MapType.
  val mapLeftKey = AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2))
  val mapRightKey = AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4))
  val mapJoin = Join(
    LocalRelation(mapLeftKey, AttributeReference("b", IntegerType)(exprId = ExprId(1))),
    LocalRelation(mapRightKey, AttributeReference("d", IntegerType)(exprId = ExprId(3))),
    Inner,
    Some(EqualTo(mapLeftKey, mapRightKey)))

  val mapError = intercept[AnalysisException] {
    caseSensitiveAnalyze(mapJoin)
  }
  assert(mapError.message.contains(
    "expression a in join condition '(a = c)' can't be map type"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,4 @@ class DataFrameAggregateSuite extends QueryTest {
emptyTableData.agg(sumDistinct('a)),
Row(null))
}

test("aggregation can't work on binary type") {
  // Single-column DataFrame whose string values are cast to BinaryType.
  val binaryDf = Seq(1, 1, 2, 2)
    .map(i => Tuple1(i.toString))
    .toDF("c")
    .select($"c".cast(BinaryType))

  // Grouping by the column is expected to raise an AnalysisException.
  intercept[AnalysisException] {
    binaryDf.groupBy("c").agg(count("*"))
  }

  // `distinct` on the same DataFrame is expected to raise an AnalysisException too.
  intercept[AnalysisException] {
    binaryDf.distinct
  }
}
}
8 changes: 0 additions & 8 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -490,12 +490,4 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
Row(3, 2) :: Nil)

}

test("Join can't work on binary type") {
  // Two single-column DataFrames whose string values are cast to BinaryType. The cast
  // columns are aliased explicitly so the join condition below can refer to them by name
  // regardless of how a cast expression is auto-named.
  val left = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c")
    .select(($"c" cast BinaryType).as("c"))
  val right = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("d")
    .select(($"d" cast BinaryType).as("d"))

  // Join on the columns that actually exist. The previous condition compared
  // "left.N" with "right.N", which neither DataFrame defines, so the expected
  // AnalysisException could be raised for unresolved columns without the
  // binary-type join-key check ever being exercised.
  intercept[AnalysisException] {
    left.join(right, $"c" === $"d", "full")
  }
}
}

0 comments on commit 7463398

Please sign in to comment.