From 1a02a5550fd579c704eb1ea282344f96479c3613 Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Tue, 23 Jun 2015 16:29:53 +0900
Subject: [PATCH] Rolled-back test-conf cleanup & fixed possible CCE & added
 more tests

---
 .../sql/catalyst/expressions/Projection.scala | 11 +++++++----
 .../scala/org/apache/spark/sql/SQLConf.scala  |  4 ---
 .../sql/execution/GeneratedAggregate.scala    |  2 +-
 .../spark/sql/execution/SparkPlan.scala       |  6 ++++--
 .../sql/parquet/ParquetTableOperations.scala  |  2 +-
 .../spark/sql/test/TestSQLContext.scala       | 15 --------
 .../spark/sql/execution/AggregateSuite.scala  | 34 ++++++++++++-------
 .../spark/sql/execution/SparkPlanTest.scala   | 10 ------
 8 files changed, 35 insertions(+), 49 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index fcfe83ceb863a..6514694087ec1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -22,9 +22,12 @@ package org.apache.spark.sql.catalyst.expressions
  * @param expressions a sequence of expressions that determine the value of each column of the
  *                    output row.
  */
-class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
-  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
-    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+class InterpretedProjection(expressions: Seq[Expression], mutableRow: Boolean = false)
+  extends Projection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute], mutableRow: Boolean) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)), mutableRow)
 
   // null check is required for when Kryo invokes the no-arg constructor.
   protected val exprArray = if (expressions != null) expressions.toArray else null
@@ -36,7 +39,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
       outputArray(i) = exprArray(i).eval(input)
       i += 1
     }
-    new GenericInternalRow(outputArray)
+    if (mutableRow) new GenericMutableRow(outputArray) else new GenericInternalRow(outputArray)
   }
 
   override def toString: String = s"Row => [${exprArray.mkString(",")}]"
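Note on the InterpretedProjection change above: the aggregation path builds its
buffer row with this projection and then updates that row in place, which
involves a cast to a mutable row type (the GeneratedAggregate hunk below now
requests mutableRow = true for exactly this reason). A projection that always
returns an immutable GenericInternalRow can therefore fail with the
ClassCastException ("possible CCE") named in the subject. Scala also permits
default arguments on only one constructor overload, hence the two explicit
auxiliary constructors. A minimal, self-contained sketch of the failure mode,
using simplified stand-ins rather than the real Catalyst row types:

    // Simplified stand-ins for Catalyst's row hierarchy; illustrative only.
    trait InternalRow { def values: Array[Any] }
    class GenericInternalRow(val values: Array[Any]) extends InternalRow
    class GenericMutableRow(val values: Array[Any]) extends InternalRow {
      def update(i: Int, value: Any): Unit = values(i) = value
    }

    object CceSketch extends App {
      // The buffer is projected as an immutable row...
      val buffer: InternalRow = new GenericInternalRow(Array[Any](0L))
      try {
        // ...but the consumer casts it to the mutable variant to update in place.
        buffer.asInstanceOf[GenericMutableRow].update(0, 1L)
      } catch {
        case e: ClassCastException => println(s"fails as described: $e")
      }
    }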
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b3fb097fb02cf..9a10a23937fbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -550,10 +550,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
     props.foreach { case (k, v) => setConfString(k, v) }
   }
 
-  def setConf(props: Map[String, String]): Unit = settings.synchronized {
-    props.foreach { case (k, v) => setConfString(k, v) }
-  }
-
   /** Set the given Spark SQL configuration property using a `string` value. */
   def setConfString(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 9b4ddc3ac2d94..10cd29f6f7bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -246,7 +246,7 @@ case class GeneratedAggregate(
     child.execute().mapPartitions { iter =>
       // Builds a new custom class for holding the results of aggregation for a group.
       val initialValues = computeFunctions.flatMap(_.initialValues)
-      val newAggregationBuffer = newProjection(initialValues, child.output)
+      val newAggregationBuffer = newProjection(initialValues, child.output, mutableRow = true)
       log.info(s"Initial values: ${initialValues.mkString(",")}")
 
       // A projection that computes the group given an input tuple.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 47f56b2b7ebe6..99f8e9433c919 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -153,13 +153,15 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   protected def newProjection(
-      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute],
+      mutableRow: Boolean = false): Projection = {
     log.debug(
       s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if (codegenEnabled && expressions.forall(_.isThreadSafe)) {
       GenerateProjection.generate(expressions, inputSchema)
     } else {
-      new InterpretedProjection(expressions, inputSchema)
+      new InterpretedProjection(expressions, inputSchema, mutableRow)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index b30fc171c0af1..3031886700225 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.{Logging, SparkContext, TaskContext}
 import org.apache.spark.util.SerializableConfiguration
 
 /**
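Note on the TestSQLContext change below: it rolls back the session-conf
backup/reset machinery, so a test that mutates a setting is now responsible
for restoring it, as AggregateSuite does with try/finally further down. A
hedged sketch of that idiom; Conf is a minimal stand-in for SQLConf's string
API, and withConfString is a hypothetical helper, not part of this patch:

    trait Conf {
      def getConfString(key: String): String
      def setConfString(key: String, value: String): Unit
    }

    object ConfFixture {
      // Runs `body` with `key` temporarily set to `value`, restoring the
      // previous value even if `body` throws. Assumes `key` currently
      // resolves to a value, as "spark.sql.codegen" does via its default.
      def withConfString(conf: Conf, key: String, value: String)(body: => Unit): Unit = {
        val previous = conf.getConfString(key)
        conf.setConfString(key, value)
        try body
        finally conf.setConfString(key, previous)
      }
    }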
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 1e8ec9ab81b60..9fa394525d65c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.test
 
-import scala.collection.immutable
 import scala.language.implicitConversions
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -37,18 +36,9 @@ class LocalSQLContext
   }
 
   protected[sql] class SQLSession extends super.SQLSession {
-    var backup: immutable.Map[String, String] = null
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       /** Fewer partitions to speed up testing. */
       override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
-      backup = getAllConfs
-    }
-
-    protected[sql] def reset() = {
-      if (backup != null) {
-        conf.clear()
-        conf.setConf(backup)
-      }
     }
   }
 
@@ -60,11 +50,6 @@ class LocalSQLContext
     DataFrame(this, plan)
   }
 
-  /**
-   * Reset session conf to initial state
-   */
-  protected[sql] def resetConf(): Unit = currentSession().asInstanceOf[SQLSession].reset
-
 }
 
 object TestSQLContext extends LocalSQLContext
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
index fb088028f9a3b..4b74b636982e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/AggregateSuite.scala
@@ -25,18 +25,28 @@ class AggregateSuite extends SparkPlanTest {
 
   test("SPARK-8357 Memory leakage on unsafe aggregation path with empty input") {
 
-    val df = Seq.empty[(String, Int, Double)].toDF("a", "b", "c")
-
-    val groupExpr = df.col("b").expr
-    val aggrExpr = Alias(Count(Cast(groupExpr, LongType)), "Count")()
-
-    for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
-         partial <- Seq(false, true)) {
-      TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
-      checkAnswer(
-        df,
-        GeneratedAggregate(partial, groupExpr :: Nil, aggrExpr :: Nil, unsafe, _: SparkPlan),
-        Seq.empty[(String, Int, Double)])
+    val input = Seq.empty[(String, Int, Double)]
+    val df = input.toDF("a", "b", "c")
+
+    val colB = df.col("b").expr
+    val colC = df.col("c").expr
+    val aggrExpr = Alias(Count(Cast(colC, LongType)), "Count")()
+
+    // Hack: the current default parallelism of the test local backend is two.
+    val two = Seq(Tuple1(0L), Tuple1(0L))
+    val empty = Seq.empty[Tuple1[Long]]
+
+    val codegenDefault = TestSQLContext.conf.getConfString("spark.sql.codegen")
+    try {
+      for ((codegen, unsafe) <- Seq((false, false), (true, false), (true, true));
+           partial <- Seq(false, true); groupExpr <- Seq(colB :: Nil, Seq.empty)) {
+        TestSQLContext.conf.setConfString("spark.sql.codegen", String.valueOf(codegen))
+        checkAnswer(df,
+          GeneratedAggregate(partial, groupExpr, aggrExpr :: Nil, unsafe, _: SparkPlan),
+          if (groupExpr.isEmpty && !partial) two else empty)
+      }
+    } finally {
+      TestSQLContext.conf.setConfString("spark.sql.codegen", codegenDefault)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index d454ca24dcd0a..13f3be8ca28d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import org.scalatest.Tag
-
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
@@ -45,14 +43,6 @@ class SparkPlanTest extends SparkFunSuite {
     TestSQLContext.implicits.localSeqToDataFrameHolder(data)
   }
 
-  protected override def test(testName: String, testTags: Tag*)(testFun: => Unit): Unit = {
-    try {
-      super.test(testName, testTags: _*)(testFun)
-    } finally {
-      TestSQLContext.resetConf()
-    }
-  }
-
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param input the input data to be used.
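Note on the expected answers in AggregateSuite: with grouping expressions, an
empty input yields no groups, so the expected result is empty. With no
grouping and partial = false, the operator is exercised directly (without an
exchange), so it emits one zero-count row per input partition; with the test
backend's default parallelism of two, that is the two rows of 0L bound to
`two`. In the partial case an empty partition presumably emits nothing, since
the final aggregation would reconstruct the zero count from its own initial
buffer. A plain-Scala sketch of that per-partition arithmetic; the
two-empty-partition layout is an assumption mirroring the test's own comment:

    object EmptyCountSketch extends App {
      // Two empty partitions, as the test's parallelism hack assumes.
      val partitions: Seq[Seq[Int]] = Seq(Seq.empty, Seq.empty)
      // A global (non-partial) count evaluated independently per partition.
      val rows = partitions.map(p => Tuple1(p.size.toLong))
      println(rows) // List((0,), (0,)), matching `two`
    }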