diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala index 68cc0af1913dd..b924af4cc84d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/functions.scala @@ -250,18 +250,19 @@ case class Sum(child: Expression) extends AlgebraicAggregate { override def dataType: DataType = resultType // Expected input data type. - override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(NumericType, NullType)) + override def inputTypes: Seq[AbstractDataType] = + Seq(TypeCollection(LongType, DoubleType, DecimalType, NullType)) private val resultType = child.dataType match { case DecimalType.Fixed(precision, scale) => DecimalType(precision + 4, scale + 4) case DecimalType.Unlimited => DecimalType.Unlimited - case _ => DoubleType + case _ => child.dataType } private val sumDataType = child.dataType match { case _ @ DecimalType() => DecimalType.Unlimited - case _ => DoubleType + case _ => child.dataType } private val currentSum = AttributeReference("currentSum", sumDataType)() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index d08ef7e268894..577ede73cb01f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -71,8 +71,6 @@ private[sql] case object NoOp extends Expression with Unevaluable { override def children: Seq[Expression] = Nil } - - /** * A container for an [[AggregateFunction2]] with its [[AggregateMode]] and a field * (`isDistinct`) indicating if DISTINCT keyword is specified for this function. 
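The functions.scala hunk above narrows Sum's accepted input types and stops coercing every non-decimal input to DoubleType, so the sum buffer and result keep the child's own type. As a rough illustration (not part of the patch), here is a minimal, self-contained Scala sketch of the new resultType resolution; the DataType hierarchy below is a simplified stand-in for Catalyst's, and the DecimalType.Unlimited case is omitted:

sealed trait DataType
case object LongType extends DataType
case object DoubleType extends DataType
case class DecimalType(precision: Int, scale: Int) extends DataType

// Mirrors the revised Sum.resultType: fixed-precision decimals gain head room for the
// accumulated sum, while every other accepted input (LongType, DoubleType) keeps the
// child's own type instead of being widened to DoubleType as before.
def sumResultType(childType: DataType): DataType = childType match {
  case DecimalType(precision, scale) => DecimalType(precision + 4, scale + 4)
  case other => other
}

// sumResultType(LongType)           == LongType            (previously DoubleType)
// sumResultType(DoubleType)         == DoubleType
// sumResultType(DecimalType(10, 2)) == DecimalType(14, 6)

The sketch intentionally ignores DecimalType.Unlimited and input validation; in the real code those are handled by the cases shown in the diff above.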
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index ddf790d865448..c3582be685bcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -865,7 +865,7 @@ class SQLContext(@transient val sparkContext: SparkContext) DDLStrategy :: TakeOrderedAndProject :: HashAggregation :: - AggregateOperator2 :: + Aggregation :: LeftSemiJoin :: HashJoin :: InMemoryScans :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 509c546b5dcf6..89ba883a75aef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -184,8 +184,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => Nil } - def canBeConvertedToNewAggregation(plan: LogicalPlan): Boolean = - aggregate.Utils.tryConvert(plan, sqlContext.conf.useSqlAggregate2).isDefined + def canBeConvertedToNewAggregation(plan: LogicalPlan): Boolean = { + aggregate.Utils.tryConvert( + plan, + sqlContext.conf.useSqlAggregate2, + sqlContext.conf.codegenEnabled).isDefined + } def canBeCodeGened(aggs: Seq[AggregateExpression1]): Boolean = !aggs.exists { case _: CombineSum | _: Sum | _: Count | _: Max | _: Min | _: CombineSetsAndCount => false @@ -202,50 +206,62 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { /** * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface. */ - object AggregateOperator2 extends Strategy { + object Aggregation extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case aggregate.NewAggregation(groupingExpressions, resultExpressions, child) - if sqlContext.conf.useSqlAggregate2 => - // Extracts all distinct aggregate expressions from the resultExpressions. - val aggregateExpressions = resultExpressions.flatMap { expr => - expr.collect { - case agg: AggregateExpression2 => agg - } - }.toSet.toSeq - // For those distinct aggregate expressions, we create a map from the aggregate function - // to the corresponding attribute of the function. - val aggregateFunctionMap = aggregateExpressions.map { agg => - val aggregateFunction = agg.aggregateFunction - (aggregateFunction, agg.isDistinct) -> - Alias(aggregateFunction, aggregateFunction.toString)().toAttribute - }.toMap - - val (functionsWithDistinct, functionsWithoutDistinct) = - aggregateExpressions.partition(_.isDistinct) - if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) { - // This is a sanity check. We should not reach here since we check the same thing in - // CheckAggregateFunction. - sys.error("Having more than one distinct column sets is not allowed.") + case p: logical.Aggregate => + val converted = + aggregate.Utils.tryConvert( + p, + sqlContext.conf.useSqlAggregate2, + sqlContext.conf.codegenEnabled) + converted match { + case None => Nil // Cannot convert to new aggregation code path. + case Some(logical.Aggregate(groupingExpressions, resultExpressions, child)) => + // Extracts all distinct aggregate expressions from the resultExpressions. 
+ val aggregateExpressions = resultExpressions.flatMap { expr => + expr.collect { + case agg: AggregateExpression2 => agg + } + }.toSet.toSeq + // For those distinct aggregate expressions, we create a map from the + // aggregate function to the corresponding attribute of the function. + val aggregateFunctionMap = aggregateExpressions.map { agg => + val aggregateFunction = agg.aggregateFunction + (aggregateFunction, agg.isDistinct) -> + Alias(aggregateFunction, aggregateFunction.toString)().toAttribute + }.toMap + + val (functionsWithDistinct, functionsWithoutDistinct) = + aggregateExpressions.partition(_.isDistinct) + if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) { + // This is a sanity check. We should not reach here when we have multiple distinct + // column sets (Utils.tryConvert will not return such a plan). + sys.error( + "Multiple distinct column sets are not supported by the new aggregation " + + "code path.") + } + + val aggregateOperator = + if (functionsWithDistinct.isEmpty) { + aggregate.Utils.planAggregateWithoutDistinct( + groupingExpressions, + aggregateExpressions, + aggregateFunctionMap, + resultExpressions, + planLater(child)) + } else { + aggregate.Utils.planAggregateWithOneDistinct( + groupingExpressions, + functionsWithDistinct, + functionsWithoutDistinct, + aggregateFunctionMap, + resultExpressions, + planLater(child)) + } + + aggregateOperator } - val aggregateOperator = - if (functionsWithDistinct.isEmpty) { - aggregate.Utils.planAggregateWithoutDistinct( - groupingExpressions, - aggregateExpressions, - aggregateFunctionMap, - resultExpressions, - planLater(child)) - } else { - aggregate.Utils.planAggregateWithOneDistinct( - groupingExpressions, - functionsWithDistinct, - functionsWithoutDistinct, - aggregateFunctionMap, - resultExpressions, - planLater(child)) - } - aggregateOperator case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala index 4b3f25df11eea..96d67465b0d48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala @@ -149,7 +149,7 @@ private[sql] abstract class SortAggregationIterator( } } - private def initialize(): Unit = { + protected def initialize(): Unit = { if (inputIter.hasNext) { initializeBuffer() val currentRow = inputIter.next().copy() @@ -474,6 +474,31 @@ class FinalSortAggregationIterator( override protected def initialBufferOffset: Int = groupingExpressions.length + override def initialize(): Unit = { + if (inputIter.hasNext) { + initializeBuffer() + val currentRow = inputIter.next().copy() + // groupGenerator is a mutable projection. Since we need to track nextGroupingKey, + // we make a copy of the generated key here. + nextGroupingKey = groupGenerator(currentRow).copy() + firstRowInNextGroup = currentRow + } else { + if (groupingExpressions.isEmpty) { + // If there is no grouping expression, we need to generate a single row as the output. + initializeBuffer() + // Right now, the buffer only contains initial buffer values. Because + // merging two buffers with initial values will generate a row that + // still stores initial values, we set currentRow to a copy of the current buffer. 
+ val currentRow = buffer.copy() + nextGroupingKey = groupGenerator(currentRow).copy() + firstRowInNextGroup = currentRow + } else { + // This iterator is empty. + hasNewGroup = false + } + } + } + override protected def processRow(row: InternalRow): Unit = { // Process all algebraic aggregate functions. algebraicMergeProjection.target(buffer)(joinedRow(buffer, row)) @@ -659,6 +684,31 @@ class FinalAndCompleteSortAggregationIterator( newMutableProjection(evalExpressions, bufferSchemata)() } + override def initialize(): Unit = { + if (inputIter.hasNext) { + initializeBuffer() + val currentRow = inputIter.next().copy() + // groupGenerator is a mutable projection. Since we need to track nextGroupingKey, + // we make a copy of the generated key here. + nextGroupingKey = groupGenerator(currentRow).copy() + firstRowInNextGroup = currentRow + } else { + if (groupingExpressions.isEmpty) { + // If there is no grouping expression, we need to generate a single row as the output. + initializeBuffer() + // Right now, the buffer only contains initial buffer values. Because + // merging two buffers with initial values will generate a row that + // still stores initial values, we set currentRow to a copy of the current buffer. + val currentRow = buffer.copy() + nextGroupingKey = groupGenerator(currentRow).copy() + firstRowInNextGroup = currentRow + } else { + // This iterator is empty. + hasNewGroup = false + } + } + } + override protected def processRow(row: InternalRow): Unit = { val input = joinedRow(buffer, row) // For all aggregate functions with mode Complete, update buffers. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala index 50c63fe04110a..6491d169960e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.AnalysisException @@ -6,10 +23,23 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.{StructType, MapType, ArrayType} object Utils { + // Right now, we do not support complex types in the grouping key schema. 
+ private def groupingKeySchemaIsSupported(aggregate: Aggregate): Boolean = { + val hasComplexTypes = aggregate.groupingExpressions.map(_.dataType).exists { + case array: ArrayType => true + case map: MapType => true + case struct: StructType => true + case _ => false + } + + !hasComplexTypes + } + private def tryConvert(plan: LogicalPlan): Option[Aggregate] = plan match { - case p: Aggregate => + case p: Aggregate if groupingKeySchemaIsSupported(p) => val converted = p.transformExpressionsDown { case expressions.Average(child) => aggregate.AggregateExpression2( @@ -76,17 +106,32 @@ object Utils { }.isDefined } - if (!hasAggregateExpression1) Some(converted) else None + // Check if there are multiple distinct columns. + val aggregateExpressions = converted.aggregateExpressions.flatMap { expr => + expr.collect { + case agg: AggregateExpression2 => agg + } + }.toSet.toSeq + val functionsWithDistinct = aggregateExpressions.filter(_.isDistinct) + val hasMultipleDistinctColumnSets = + if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) { + true + } else { + false + } + + if (!hasAggregateExpression1 && !hasMultipleDistinctColumnSets) Some(converted) else None case other => None } def tryConvert( plan: LogicalPlan, - useNewAggregation: Boolean): Option[Aggregate] = plan match { - case p: Aggregate => + useNewAggregation: Boolean, + codeGenEnabled: Boolean): Option[Aggregate] = plan match { + case p: Aggregate if useNewAggregation && codeGenEnabled => val converted = tryConvert(p) - if (useNewAggregation && converted.isDefined) { + if (converted.isDefined) { converted } else { // If the plan cannot be converted, we will do a final round check to if the original diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/aggregate/udaf.scala index 5022c65601286..398b6cda11df8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/aggregate/udaf.scala @@ -67,7 +67,11 @@ abstract class UserDefinedAggregateFunction extends Serializable { /** Indicates if this function is deterministic. */ def deterministic: Boolean - /** Initializes the given aggregation buffer. */ + /** + * Initializes the given aggregation buffer. Initial values set by this method should satisfy + * the condition that when merging two buffers with initial values, the new buffer should + * still store initial values. + */ def initialize(buffer: MutableAggregationBuffer): Unit /** Updates the given aggregation buffer `buffer` with new input data from `input`. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 61d5f2061ae18..3b054c9532c9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -23,6 +23,7 @@ import java.sql.Timestamp import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.errors.DialectException +import org.apache.spark.sql.execution.aggregate.Aggregate2Sort import org.apache.spark.sql.execution.GeneratedAggregate import org.apache.spark.sql.functions._ import org.apache.spark.sql.TestData._ @@ -204,6 +205,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { var hasGeneratedAgg = false df.queryExecution.executedPlan.foreach { case generatedAgg: GeneratedAggregate => hasGeneratedAgg = true + case newAggregate: Aggregate2Sort => hasGeneratedAgg = true case _ => } if (!hasGeneratedAgg) { @@ -285,7 +287,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { // Aggregate with Code generation handling all null values testCodeGen( "SELECT sum('a'), avg('a'), count(null) FROM testData", - Row(0, null, 0) :: Nil) + Row(null, null, 0) :: Nil) } finally { sqlContext.dropTempTable("testData3x") sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 3dd24130af81a..3d71deb13e884 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkFunSuite import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.TestSQLContext._ @@ -30,6 +31,20 @@ import org.apache.spark.sql.{Row, SQLConf, execution} class PlannerSuite extends SparkFunSuite { + private def testPartialAggregationPlan(query: LogicalPlan): Unit = { + val plannedOption = HashAggregation(query).headOption.orElse(Aggregation(query).headOption) + val planned = + plannedOption.getOrElse( + fail(s"Could not plan aggregation for query $query. Is it an aggregation query?")) + val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } + + // For the new aggregation code path, there will be three aggregate operators for 
+ assert( + aggregations.size == 2 || aggregations.size == 3, + s"The plan of query $query does not have partial aggregations.") + } + test("unions are collapsed") { val query = testData.unionAll(testData).unionAll(testData).logicalPlan val planned = BasicOperators(query).head @@ -42,23 +57,18 @@ class PlannerSuite extends SparkFunSuite { test("count is partially aggregated") { val query = testData.groupBy('value).agg(count('key)).queryExecution.analyzed - val planned = HashAggregation(query).head - val aggregations = planned.collect { case n if n.nodeName contains "Aggregate" => n } - - assert(aggregations.size === 2) + testPartialAggregationPlan(query) } test("count distinct is partially aggregated") { val query = testData.groupBy('value).agg(countDistinct('key)).queryExecution.analyzed - val planned = HashAggregation(query) - assert(planned.nonEmpty) + testPartialAggregationPlan(query) } test("mixed aggregates are partially aggregated") { val query = testData.groupBy('value).agg(count('value), countDistinct('key)).queryExecution.analyzed - val planned = HashAggregation(query) - assert(planned.nonEmpty) + testPartialAggregationPlan(query) } test("sizeInBytes estimation of limit operator for broadcast hash join optimization") { diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala index 31a49a3683338..24a758f53170a 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala @@ -833,6 +833,7 @@ abstract class HiveWindowFunctionQueryFileBaseSuite "windowing_adjust_rowcontainer_sz" ) + // Only run those query tests in the realWhiteList (do not try other ignored query files). override def testCases: Seq[(String, File)] = super.testCases.filter { case (name, _) => realWhiteList.contains(name) } diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala index f458567e5d7ea..1fe4fe9629c02 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.execution +import java.io.File + import org.apache.spark.sql.SQLConf import org.apache.spark.sql.hive.test.TestHive @@ -159,4 +161,9 @@ class SortMergeCompatibilitySuite extends HiveCompatibilitySuite { "join_reorder4", "join_star" ) + + // Only run those query tests in the realWhiteList (do not try other ignored query files). 
+ override def testCases: Seq[(String, File)] = super.testCases.filter { + case (name, _) => realWhiteList.contains(name) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4146038a44383..932c15b9b4359 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -451,7 +451,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { DataSinks, Scripts, HashAggregation, - AggregateOperator2, + Aggregation, LeftSemiJoin, HashJoin, BasicOperators, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index 158fd71807a6b..93dabd3cb1251 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.execution.aggregate.Aggregate2Sort import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.scalatest.BeforeAndAfterAll import test.org.apache.spark.sql.hive.aggregate.{MyDoubleAvg, MyDoubleSum} @@ -66,6 +67,11 @@ class AggregationQuerySuite extends QueryTest with SQLTestUtils with BeforeAndAf (3, null, null)).toDF("key", "value1", "value2") data2.write.saveAsTable("agg2") + val emptyDF = sqlContext.createDataFrame( + sqlContext.sparkContext.emptyRDD[Row], + StructType(StructField("key", StringType) :: StructField("value", IntegerType) :: Nil)) + emptyDF.registerTempTable("emptyTable") + // Register UDAFs sqlContext.udaf.register("mydoublesum", new MyDoubleSum) sqlContext.udaf.register("mydoubleavg", new MyDoubleAvg) @@ -74,9 +80,67 @@ class AggregationQuerySuite extends QueryTest with SQLTestUtils with BeforeAndAf override def afterAll(): Unit = { sqlContext.sql("DROP TABLE IF EXISTS agg1") sqlContext.sql("DROP TABLE IF EXISTS agg2") + sqlContext.dropTempTable("emptyTable") sqlContext.sql(s"set spark.sql.useAggregate2=$originalUseAggregate2") } + test("empty table") { + // If there is no GROUP BY clause and the table is empty, we will generate a single row. + checkAnswer( + sqlContext.sql( + """ + |SELECT + | AVG(value), + | COUNT(*), + | COUNT(key), + | COUNT(value), + | FIRST(key), + | LAST(value), + | MAX(key), + | MIN(value), + | SUM(key) + |FROM emptyTable + """.stripMargin), + Row(null, 0, 0, 0, null, null, null, null, null) :: Nil) + + checkAnswer( + sqlContext.sql( + """ + |SELECT + | AVG(value), + | COUNT(*), + | COUNT(key), + | COUNT(value), + | FIRST(key), + | LAST(value), + | MAX(key), + | MIN(value), + | SUM(key), + | COUNT(DISTINCT value) + |FROM emptyTable + """.stripMargin), + Row(null, 0, 0, 0, null, null, null, null, null, 0) :: Nil) + + // If there is a GROUP BY clause and the table is empty, there is no output. 
+ checkAnswer( + sqlContext.sql( + """ + |SELECT + | AVG(value), + | COUNT(*), + | COUNT(value), + | FIRST(value), + | LAST(value), + | MAX(value), + | MIN(value), + | SUM(value), + | COUNT(DISTINCT value) + |FROM emptyTable + |GROUP BY key + """.stripMargin), + Nil) + } + test("only do grouping") { checkAnswer( sqlContext.sql(