From 2d8be83cf2aeb948b4f0fd15f2d978ec9b02c997 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 12 Jun 2015 14:06:13 +0800
Subject: [PATCH] add LimitPushDown

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  1 +
 .../optimizer/FilterPushdownSuite.scala       | 16 ------
 .../optimizer/LimitPushDownSuit.scala         | 52 ++++++++++++++++++-
 3 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 92f3402b9258c..1b9aad80d4e74 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -44,6 +44,7 @@ object DefaultOptimizer extends Optimizer {
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
       ColumnPruning,
+      LimitPushDown,
       ProjectCollapsing,
       CombineLimits,
       NullPropagation,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ffdc673cdc455..fc78366e0c33f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -95,22 +95,6 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("column pruning for Project(ne, Limit)") {
-    val originalQuery =
-      testRelation
-        .select('a, 'b)
-        .limit(2)
-        .select('a)
-
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer =
-      testRelation
-        .select('a)
-        .limit(2).analyze
-
-    comparePlans(optimized, correctAnswer)
-  }
-
test("simple push down") { val originalQuery = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownSuit.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownSuit.scala index 299feed1ba4a2..5d774efc63c62 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownSuit.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushDownSuit.scala @@ -17,6 +17,56 @@ package org.apache.spark.sql.catalyst.optimizer -class LimitPushDownSuit { +import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +class LimitPushDownSuit extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Limit PushDown", FixedPoint(10), + LimitPushDown, + CombineLimits, + ConstantFolding, + BooleanSimplification) :: Nil + } + + val testRelation = LocalRelation('a.int) + + test("push down limit when the child is project on limit") { + val originalQuery = + testRelation + .limit(10) + .select('a) + .limit(2) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .limit(2) + .select('a).analyze + + comparePlans(optimized, correctAnswer) + } + + test("push down limit when the child is project on sort") { + val originalQuery = + testRelation + .sortBy(SortOrder('a, Ascending)) + .select('a) + .limit(2) + + val optimized = Optimize.execute(originalQuery.analyze) + val correctAnswer = + testRelation + .sortBy(SortOrder('a, Ascending)) + .limit(2) + .select('a).analyze + + comparePlans(optimized, correctAnswer) + } }