Skip to content

Commit

Permalink
add LimitPushDown
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Jun 24, 2015
1 parent 948f740 commit 2d8be83
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object DefaultOptimizer extends Optimizer {
PushPredicateThroughProject,
PushPredicateThroughGenerate,
ColumnPruning,
LimitPushDown,
ProjectCollapsing,
CombineLimits,
NullPropagation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,6 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("column pruning for Project(ne, Limit)") {
val originalQuery =
testRelation
.select('a, 'b)
.limit(2)
.select('a)

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.select('a)
.limit(2).analyze

comparePlans(optimized, correctAnswer)
}

// After this line is unimplemented.
test("simple push down") {
val originalQuery =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,56 @@

package org.apache.spark.sql.catalyst.optimizer

class LimitPushDownSuit {
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

class LimitPushDownSuit extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Limit PushDown", FixedPoint(10),
LimitPushDown,
CombineLimits,
ConstantFolding,
BooleanSimplification) :: Nil
}

val testRelation = LocalRelation('a.int)

test("push down limit when the child is project on limit") {
val originalQuery =
testRelation
.limit(10)
.select('a)
.limit(2)

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.limit(2)
.select('a).analyze

comparePlans(optimized, correctAnswer)
}

test("push down limit when the child is project on sort") {
val originalQuery =
testRelation
.sortBy(SortOrder('a, Ascending))
.select('a)
.limit(2)

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
.sortBy(SortOrder('a, Ascending))
.limit(2)
.select('a).analyze

comparePlans(optimized, correctAnswer)
}
}

0 comments on commit 2d8be83

Please sign in to comment.