Skip to content

Commit

Permalink
[SPARK-6319][SQL] Throw AnalysisException when using BinaryType on Jo…
Browse files Browse the repository at this point in the history
…in and Aggregate

JIRA: https://issues.apache.org/jira/browse/SPARK-6319

Spark SQL uses plain byte arrays to represent binary values. However, these arrays are compared by reference rather than by value. Thus, BinaryType should not be used in join conditions or as an aggregate grouping key in the current implementation.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes apache#7787 from viirya/agg_no_binary_type and squashes the following commits:

4f76cac [Liang-Chi Hsieh] Throw AnalysisException when using BinaryType on Join and Aggregate.
  • Loading branch information
viirya authored and rxin committed Jul 31, 2015
1 parent 0b1a464 commit 351eda0
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ trait CheckAnalysis {
s"join condition '${condition.prettyString}' " +
s"of type ${condition.dataType.simpleString} is not a boolean.")

case Join(_, _, _, Some(condition)) =>
  // Validates the join condition: a Predicate is descended into child by child,
  // while any other expression is rejected when its data type is BinaryType.
  def checkValidJoinConditionExprs(expr: Expression): Unit = expr match {
    case _: Predicate =>
      // `expr` is already typed as Expression, so its children can be visited directly.
      expr.children.foreach(checkValidJoinConditionExprs)
    case other =>
      other.dataType match {
        case _: BinaryType =>
          failAnalysis(s"expression ${other.prettyString} in join condition " +
            s"'${condition.prettyString}' can't be binary type.")
        case _ => // OK
      }
  }

  checkValidJoinConditionExprs(condition)

case Aggregate(groupingExprs, aggregateExprs, child) =>
def checkValidAggregateExpression(expr: Expression): Unit = expr match {
case _: AggregateExpression => // OK
Expand All @@ -100,7 +112,15 @@ trait CheckAnalysis {
case e => e.children.foreach(checkValidAggregateExpression)
}

// Fails analysis when the given expression's data type is BinaryType;
// every other data type is accepted without further inspection.
def checkValidGroupingExprs(expr: Expression): Unit = {
  val isBinary = BinaryType == expr.dataType
  if (isBinary) {
    failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
      s"not be binary type.")
  }
}

// Every output expression of the aggregate must be a valid aggregate expression.
aggregateExprs.foreach(checkValidAggregateExpression)
// The binary-type restriction concerns the grouping keys, so it has to be applied to
// groupingExprs. Running it over aggregateExprs would miss binary grouping keys that
// are not projected, and would reject binary-typed aggregate outputs with a
// "grouping expression" message.
groupingExprs.foreach(checkValidGroupingExprs)

case Sort(orders, _, _) =>
orders.foreach { order =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.types.{BinaryType, DecimalType}


class DataFrameAggregateSuite extends QueryTest {
Expand Down Expand Up @@ -191,4 +191,13 @@ class DataFrameAggregateSuite extends QueryTest {
Row(null))
}

test("aggregation can't work on binary type") {
  // Alias the cast explicitly so the binary column is guaranteed to be named "c".
  // Without the alias the output name is auto-generated, and a failed lookup of "c"
  // could satisfy the intercept below without the binary-type check ever running.
  val df = Seq(1, 1, 2, 2)
    .map(i => Tuple1(i.toString))
    .toDF("c")
    .select($"c".cast(BinaryType).as("c"))

  // Grouping by a binary column must be rejected during analysis.
  intercept[AnalysisException] {
    df.groupBy("c").agg(count("*"))
  }

  // NOTE(review): distinct is expected to hit the same check, presumably because it is
  // planned as an aggregate over all columns -- confirm against DataFrame.distinct.
  intercept[AnalysisException] {
    df.distinct
  }
}
}
9 changes: 9 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.types.BinaryType


class JoinSuite extends QueryTest with BeforeAndAfterEach {
Expand Down Expand Up @@ -489,4 +490,12 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
Row(3, 2) :: Nil)

}

test("Join can't work on binary type") {
  // Alias each cast so the binary columns have known names ("c" and "d") that the
  // join condition can reference unambiguously.
  val left = Seq(1, 1, 2, 2)
    .map(i => Tuple1(i.toString))
    .toDF("c")
    .select($"c".cast(BinaryType).as("c"))
  val right = Seq(1, 1, 2, 2)
    .map(i => Tuple1(i.toString))
    .toDF("d")
    .select($"d".cast(BinaryType).as("d"))

  // The condition must reference columns that actually exist. Columns that match
  // nothing in either side would raise AnalysisException for being unresolved,
  // so the test would pass without exercising the binary-type check.
  intercept[AnalysisException] {
    left.join(right, $"c" === $"d", "full")
  }
}
}

0 comments on commit 351eda0

Please sign in to comment.