
Commit
address comments
cloud-fan committed Jun 24, 2015
1 parent b558549 commit 3676a82
Showing 8 changed files with 54 additions and 100 deletions.
@@ -41,7 +41,6 @@ object DefaultOptimizer extends Optimizer {
     Batch("Operator Optimizations", FixedPoint(100),
       // Operator push down
       UnionPushDown,
-      LimitPushDown,
       PushPredicateThroughJoin,
       PushPredicateThroughProject,
       PushPredicateThroughGenerate,
@@ -112,20 +111,6 @@ object UnionPushDown extends Rule[LogicalPlan] {
   }
 }

-object LimitPushDown extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // Push down limit when the child is project on limit.
-    case Limit(expr, Project(projectList, l: Limit)) =>
-      Project(projectList, Limit(expr, l))
-
-    // Push down limit when the child is project on sort,
-    // and we cannot push down this project through sort.
-    case Limit(expr, p @ Project(projectList, s: Sort))
-        if !s.references.subsetOf(p.outputSet) =>
-      Project(projectList, Limit(expr, s))
-  }
-}
-
 /**
  * Attempts to eliminate the reading of unneeded columns from the query plan using the following
  * transformations:
@@ -175,7 +160,11 @@ object ColumnPruning extends Rule[LogicalPlan] {

       Join(left, prunedChild(right, allReferences), LeftSemi, condition)

-    // push down project if possible when the child is sort
+    // Push down project through limit, so that we may have a chance to push it further.
+    case Project(projectList, Limit(exp, child)) =>
+      Limit(exp, Project(projectList, child))
+
+    // Push down project if possible when the child is sort
     case p @ Project(projectList, s @ Sort(_, _, grandChild))
         if s.references.subsetOf(p.outputSet) =>
       s.copy(child = Project(projectList, grandChild))
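
For intuition: the deleted LimitPushDown rule moved a Limit below a Project, while the new ColumnPruning case does the inverse, sinking the Project below the Limit so it can keep sinking and merge with other Projects. Here is a minimal, self-contained sketch of that reasoning using a toy plan ADT (the names mirror Catalyst, but this is not Spark code):

// Toy logical-plan ADT; not Spark's Catalyst classes.
object ProjectPushdownSketch extends App {
  sealed trait Plan
  case class Relation(columns: Set[String]) extends Plan
  case class Project(columns: Set[String], child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan

  def pushDown(plan: Plan): Plan = plan match {
    // Push down project through limit, mirroring the new ColumnPruning case above.
    case Project(cols, Limit(n, child)) =>
      pushDown(Limit(n, Project(cols, child)))
    // Collapse adjacent projects when the outer one keeps a subset of the columns.
    case Project(outer, Project(inner, child)) if outer.subsetOf(inner) =>
      pushDown(Project(outer, child))
    case Limit(n, child) => Limit(n, pushDown(child))
    case other => other
  }

  // Same shape as the "column pruning for Project(ne, Limit)" test below:
  // Project({a}, Limit(2, Project({a,b}, rel))) becomes Limit(2, Project({a}, rel)).
  val rel = Relation(Set("a", "b", "c"))
  println(pushDown(Project(Set("a"), Limit(2, Project(Set("a", "b"), rel)))))
}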
@@ -95,6 +95,22 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }

+  test("column pruning for Project(ne, Limit)") {
+    val originalQuery =
+      testRelation
+        .select('a, 'b)
+        .limit(2)
+        .select('a)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      testRelation
+        .select('a)
+        .limit(2).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   // After this line is unimplemented.
   test("simple push down") {
     val originalQuery =

This file was deleted.

@@ -858,7 +858,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       experimental.extraStrategies ++ (
         DataSourceStrategy ::
         DDLStrategy ::
-        TakeOrdered ::
+        TakeOrderedAndProject ::
         HashAggregation ::
         LeftSemiJoin ::
         HashJoin ::
@@ -213,10 +213,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   protected lazy val singleRowRdd =
     sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): InternalRow), 1)

-  object TakeOrdered extends Strategy {
+  object TakeOrderedAndProject extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
-        execution.TakeOrdered(limit, order, planLater(child)) :: Nil
+        execution.TakeOrderedAndProject(limit, order, None, planLater(child)) :: Nil
+      case logical.Limit(
+          IntegerLiteral(limit),
+          logical.Project(projectList, logical.Sort(order, true, child))) =>
+        execution.TakeOrderedAndProject(limit, order, Some(projectList), planLater(child)) :: Nil
       case _ => Nil
     }
   }
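
At the API level, this means a sort-project-limit query can now be answered by a single top-K operator. A small usage sketch, assuming Spark 1.4-style APIs in local mode (the DataFrame and its column names are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TopKUsageSketch extends App {
  val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // Hypothetical data; any DataFrame with a sortable column works.
  val df = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"))).toDF("key", "value")

  // Limit(Project(Sort)) in the logical plan: the second case of the strategy above.
  val topTwo = df.sort("key").select("value").limit(2)
  topTwo.explain() // the physical plan should show TakeOrderedAndProject
  println(topTwo.collect().mkString(", "))
  sc.stop()
}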
@@ -147,30 +147,41 @@ case class Limit(limit: Int, child: SparkPlan)

 /**
  * :: DeveloperApi ::
- * Take the first limit elements as defined by the sortOrder. This is logically equivalent to
- * having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but
- * Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
+ * Take the first limit elements as defined by the sortOrder, and do projection if needed.
+ * This is logically equivalent to having a [[Limit]] operator after a [[Sort]] operator,
+ * or having a [[Project]] operator between them.
+ * This could have been named TopK, but Spark's top operator does the opposite in ordering
+ * so we name it TakeOrdered to avoid confusion.
  */
 @DeveloperApi
-case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {
+case class TakeOrderedAndProject(
+    limit: Int,
+    sortOrder: Seq[SortOrder],
+    projectList: Option[Seq[NamedExpression]],
+    child: SparkPlan) extends UnaryNode {

   override def output: Seq[Attribute] = child.output

   override def outputPartitioning: Partitioning = SinglePartition

   private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)

-  private def collectData(): Array[InternalRow] =
-    child.execute().map(_.copy()).takeOrdered(limit)(ord)
+  private val projection = projectList.map(newProjection(_, child.output))
+
+  private def collectData(): Iterator[InternalRow] = {
+    val data = child.execute().map(_.copy()).takeOrdered(limit)(ord).toIterator
+    projection.map(data.map(_)).getOrElse(data)
+  }

   override def executeCollect(): Array[Row] = {
     val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    collectData().map(converter(_).asInstanceOf[Row])
+    collectData().map(converter(_).asInstanceOf[Row]).toArray
   }

   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
-  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
+  protected override def doExecute(): RDD[InternalRow] =
+    sparkContext.makeRDD(collectData().toArray[InternalRow], 1)

   override def outputOrdering: Seq[SortOrder] = sortOrder
 }
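
The runtime contract is a top-K under the ordering followed by an optional row projection. A minimal plain-Scala sketch of that behavior (no Spark; the row type and projection function are stand-ins for InternalRow and newProjection):

object TakeOrderedAndProjectSketch extends App {
  type Row = Seq[Any]

  val rows: Seq[Row] = Seq(Seq(3, "c"), Seq(1, "a"), Seq(2, "b"))
  val limit = 2
  val ordering: Ordering[Row] = Ordering.by(r => r.head.asInstanceOf[Int])
  // Hypothetical projection keeping only the second column, standing in for
  // projectList: Option[Seq[NamedExpression]].
  val projection: Option[Row => Row] = Some(r => Seq(r(1)))

  // Mirrors child.execute().map(_.copy()).takeOrdered(limit)(ord).toIterator,
  // then projection.map(data.map(_)).getOrElse(data) from the operator above.
  val data = rows.sorted(ordering).take(limit).iterator
  val result = projection.map(data.map(_)).getOrElse(data)
  println(result.toList) // List(List(a), List(b))
}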
@@ -141,4 +141,10 @@ class PlannerSuite extends SparkFunSuite {

     setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
   }
+
+  test("efficient limit -> project -> sort") {
+    val query = testData.sort('key).select('value).limit(2).logicalPlan
+    val planned = planner.TakeOrderedAndProject(query)
+    assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+  }
 }
@@ -442,7 +442,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       HiveCommandStrategy(self),
       HiveDDLStrategy,
       DDLStrategy,
-      TakeOrdered,
+      TakeOrderedAndProject,
       ParquetOperations,
       InMemoryScans,
       ParquetConversion, // Must be before HiveTableScans
