diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 226eead5c67e8..f22507b8c21e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -205,6 +205,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Aggregate(groupingExpressions, resultExpressions, child) if sqlContext.conf.useSqlAggregate2 => + // 0. Make sure we can convert. + resultExpressions.foreach { + case agg1: AggregateExpression => + sys.error(s"$agg1 is not supported. Please set spark.sql.useAggregate2 to false.") + case _ => // ok + } // 1. Extracts all distinct aggregate expressions from the resultExpressions. val aggregateExpressions = resultExpressions.flatMap { expr => expr.collect { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4146038a44383..0b0fbeb7340b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -21,6 +21,8 @@ import java.io.File import java.net.{URL, URLClassLoader} import java.sql.Timestamp +import org.apache.spark.sql.execution.aggregate2.ConvertAggregateFunction + import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.language.implicitConversions @@ -385,6 +387,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { ExtractPythonUDFs :: ResolveHiveWindowFunction :: sources.PreInsertCastAndRename :: + ConvertAggregateFunction(self) :: Nil override val extendedCheckRules = Seq(