From 4435f20e3f5b95cc3023e73b86c10f1d5bc878aa Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 14 Jul 2015 12:24:46 -0700 Subject: [PATCH] Add ConvertAggregateFunction to HiveContext's analyzer. --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 6 ++++++ .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 3 +++ 2 files changed, 9 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 226eead5c67e8..f22507b8c21e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -205,6 +205,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.Aggregate(groupingExpressions, resultExpressions, child) if sqlContext.conf.useSqlAggregate2 => + // 0. Make sure we can convert. + resultExpressions.foreach { + case agg1: AggregateExpression => + sys.error(s"$agg1 is not supported. Please set spark.sql.useAggregate2 to false.") + case _ => // ok + } // 1. Extracts all distinct aggregate expressions from the resultExpressions. val aggregateExpressions = resultExpressions.flatMap { expr => expr.collect { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4146038a44383..0b0fbeb7340b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -21,6 +21,8 @@ import java.io.File import java.net.{URL, URLClassLoader} import java.sql.Timestamp +import org.apache.spark.sql.execution.aggregate2.ConvertAggregateFunction + import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.language.implicitConversions @@ -385,6 +387,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { ExtractPythonUDFs :: ResolveHiveWindowFunction :: sources.PreInsertCastAndRename :: + ConvertAggregateFunction(self) :: Nil override val extendedCheckRules = Seq(