Merge pull request #10 from yhuai/exchangeOperator
Exchange operator
marmbrus committed Feb 2, 2014
2 parents 6015f93 + 45b334b commit 5b7afd8
Showing 19 changed files with 722 additions and 134 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/catalyst/analysis/unresolved.scala
@@ -10,7 +10,7 @@ import trees.TreeNode
* resolved.
*/
class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) extends
- errors.OptimizationException(tree, s"Invalid call to $function on unresolved object")
+ errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object")

/**
* Holds the name of a relation that has yet to be looked up in a [[Catalog]].
11 changes: 6 additions & 5 deletions src/main/scala/catalyst/errors/package.scala
@@ -7,7 +7,7 @@ import trees._
*/
package object errors {

- class OptimizationException[TreeType <: TreeNode[_]]
+ class TreeNodeException[TreeType <: TreeNode[_]]
(tree: TreeType, msg: String, cause: Throwable = null) extends Exception(msg, cause) {

override def getMessage: String = {
@@ -17,17 +17,18 @@ package object errors {
}

/**
- * Wraps any exceptions that are thrown while executing `f` in an [[OptimizationException]],
- * attaching the provided `tree`.
+ * Wraps any exceptions that are thrown while executing `f` in a
+ * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
*/
def attachTree[TreeType <: TreeNode[_], A](tree: TreeType, msg: String = "")(f: => A): A = {
try f catch {
-     case e: Exception => throw new OptimizationException(tree, msg, e)
+     case e: Exception => throw new TreeNodeException(tree, msg, e)
}
}

/**
- * Executes `f` which is expected to throw an OptimizationException. The first tree encountered in
+ * Executes `f` which is expected to throw a
+ * [[catalyst.errors.TreeNodeException TreeNodeException]]. The first tree encountered in
* the stack of exceptions of type `TreeType` is returned.
*/
def getTree[TreeType <: TreeNode[_]](f: => Unit): TreeType = ??? // TODO: Implement
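The wrapping pattern above can be seen in isolation with a minimal, self-contained sketch. NodeException and attachNode are hypothetical stand-ins for TreeNodeException and attachTree, and a plain string stands in for a real TreeNode:

object AttachTreeDemo extends App {
  // Hypothetical stand-ins for TreeNodeException / attachTree.
  class NodeException(tree: Any, msg: String, cause: Throwable)
      extends Exception(msg, cause) {
    // Append the offending tree to the message, as TreeNodeException does.
    override def getMessage: String = s"${super.getMessage}, tree:\n$tree"
  }

  def attachNode[A](tree: Any, msg: String = "")(f: => A): A =
    try f catch {
      case e: Exception => throw new NodeException(tree, msg, e)
    }

  // The ArithmeticException thrown inside the block is rewrapped, so the
  // error message carries the plan fragment that was being executed.
  attachNode("Project(a / b)", "execute") {
    val denominator = 0
    println(1 / denominator)
  }
}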
115 changes: 115 additions & 0 deletions src/main/scala/catalyst/execution/Exchange.scala
@@ -0,0 +1,115 @@
package catalyst
package execution

import catalyst.rules.Rule
import catalyst.errors._
import catalyst.expressions._
import catalyst.plans.physical._

import org.apache.spark.{RangePartitioner, HashPartitioner}
import org.apache.spark.rdd.ShuffledRDD

case class Exchange(newPartitioning: Partitioning, child: SharkPlan)
extends UnaryNode {

override def outputPartitioning = newPartitioning
def output = child.output

def execute() = attachTree(this, "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) => {
// TODO: Eliminate redundant expressions in grouping key and value.
val rdd = child.execute().map { row =>
(buildRow(expressions.toSeq.map(Evaluate(_, Vector(row)))), row)
}
val part = new HashPartitioner(numPartitions)
val shuffled = new ShuffledRDD[Row, Row, (Row, Row)](rdd, part)

shuffled.map(_._2)
}
case RangePartitioning(sortingExpressions, numPartitions) => {
// TODO: ShuffledRDD should take an Ordering.
implicit val ordering = new RowOrdering(sortingExpressions)

val rdd = child.execute().map(row => (row, null))
val part = new RangePartitioner(numPartitions, rdd, ascending = true)
val shuffled = new ShuffledRDD[Row, Null, (Row, Null)](rdd, part)

shuffled.map(_._1)
}
case SinglePartition => {
val rdd = child.execute().coalesce(1, true)

rdd
}
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
// TODO: Handle BroadcastPartitioning.
}
}
}

/**
* Ensures that the [[catalyst.plans.physical.Partitioning Partitioning]] of input data meets the
* [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting
* [[Exchange]] operators where required.
*/
object AddExchange extends Rule[SharkPlan] {
// TODO: Determine the number of partitions.
val numPartitions = 8

def apply(plan: SharkPlan): SharkPlan = plan.transformUp {
case operator: SharkPlan =>
// Check if every child's outputPartitioning satisfies the corresponding
// required data distribution.
def meetsRequirements =
!operator.requiredChildDistribution.zip(operator.children).map {
case (required, child) =>
val valid = child.outputPartitioning.satisfies(required)
logger.debug(
s"${if (valid) "Valid" else "Invalid"} distribution," +
s"required: $required current: ${child.outputPartitioning}")
valid
}.exists(_ == false)

// Check if outputPartitionings of children are compatible with each other.
// It is possible that every child satisfies its required data distribution
// but two children have incompatible outputPartitionings. For example,
// a dataset is range partitioned by "a.asc" (RangePartitioning) and another
// dataset is hash partitioned by "a" (HashPartitioning). Tuples in these two
// datasets are both clustered by "a", but these two outputPartitionings are not
// compatible.
// TODO: ASSUMES TRANSITIVITY?
def compatible =
!operator.children
.map(_.outputPartitioning)
.sliding(2)
.map {
case Seq(_) => true
case Seq(a, b) => a compatibleWith b
}.exists(_ == false)

// Check if the partitioning we want to ensure is the same as the child's output
// partitioning. If so, we do not need to add the Exchange operator.
def addExchangeIfNecessary(partitioning: Partitioning, child: SharkPlan) =
if (child.outputPartitioning != partitioning) Exchange(partitioning, child) else child

if (meetsRequirements && compatible) {
operator
} else {
// At least one child does not satisfy its required data distribution, or
// at least one child's outputPartitioning is not compatible with another child's
// outputPartitioning. In this case, we need to add Exchange operators.
val repartitionedChildren = operator.requiredChildDistribution.zip(operator.children).map {
case (AllTuples, child) =>
addExchangeIfNecessary(SinglePartition, child)
case (ClusteredDistribution(clustering), child) =>
addExchangeIfNecessary(HashPartitioning(clustering, numPartitions), child)
case (OrderedDistribution(ordering), child) =>
addExchangeIfNecessary(RangePartitioning(ordering, numPartitions), child)
case (UnspecifiedDistribution, child) => child
case (dist, _) => sys.error(s"Don't know how to ensure $dist")
}
operator.withNewChildren(repartitionedChildren)
}
}
}
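For readers unfamiliar with the shuffle machinery, the three partitioning cases Exchange handles can be sketched with plain Spark pair-RDD operations. This is a standalone sketch, not the Exchange implementation: it assumes a reasonably recent Spark running on a local master, approximates Row with Seq[Any], uses partitionBy in place of constructing ShuffledRDD directly, and keys every row by column 0:

import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf, SparkContext}

object ExchangeSketch extends App {
  type Row = Seq[Any] // stand-in for catalyst's Row

  val sc = new SparkContext(
    new SparkConf().setAppName("exchange-sketch").setMaster("local[2]"))
  val rows = sc.parallelize(Seq[Row](Seq(1, "a"), Seq(2, "b"), Seq(1, "c")))

  // HashPartitioning: key each row by its grouping expression (here column 0),
  // shuffle so that equal keys land in the same partition, then drop the key.
  val keyed = rows.map(row => (row.head.asInstanceOf[Int], row))
  val hashed = keyed.partitionBy(new HashPartitioner(4)).map(_._2)

  // RangePartitioning: same pattern, but the partitioner samples the keys and
  // assigns contiguous key ranges to partitions, as a global sort requires.
  val ranged = keyed.partitionBy(new RangePartitioner(4, keyed)).map(_._2)

  // SinglePartition: no keying needed, just collapse to one partition.
  val single = rows.coalesce(1, shuffle = true)

  println((hashed.partitions.length, ranged.partitions.length, single.partitions.length))
  sc.stop()
}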
9 changes: 6 additions & 3 deletions src/main/scala/catalyst/execution/PlanningStrategies.scala
@@ -5,6 +5,7 @@ import expressions._
import planning._
import plans._
import plans.logical.LogicalPlan
+ import plans.physical._

trait PlanningStrategies {
self: QueryPlanner[SharkPlan] =>
@@ -142,10 +143,12 @@ trait PlanningStrategies {
object BasicOperators extends Strategy {
def apply(plan: LogicalPlan): Seq[SharkPlan] = plan match {
case logical.Sort(sortExprs, child) =>
-   execution.Sort(sortExprs, planLater(child)) :: Nil
+   // TODO: It is correct, but overkill to do a global sort here.
+   // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
+   execution.Sort(sortExprs, true, planLater(child)) :: Nil
case logical.SortPartitions(sortExprs, child) =>
-   execution.Sort(sortExprs, planLater(child)) :: Nil
+   // This sort only sorts tuples within a partition. Its requiredDistribution will be
+   // an UnspecifiedDistribution.
+   execution.Sort(sortExprs, false, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
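The distinction the two new Sort comments draw can be illustrated with plain RDD operations. A hypothetical sketch (Spark 1.x+ API, not the execution.Sort implementation): a global sort needs a range shuffle first, while a partition-local sort only reorders rows inside each existing partition:

import org.apache.spark.{SparkConf, SparkContext}

object SortSketch extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("sort-sketch").setMaster("local[2]"))
  val data = sc.parallelize(Seq(3, 1, 4, 1, 5, 9, 2, 6), numSlices = 2)

  // logical.Sort: a global sort. sortBy range-partitions the data first (an
  // OrderedDistribution), so partition i holds only keys <= those of i + 1.
  val globallySorted = data.sortBy(identity)

  // logical.SortPartitions: sort within each partition only; no shuffle is
  // needed, which is why UnspecifiedDistribution suffices.
  val locallySorted = data.mapPartitions(iter => iter.toArray.sorted.iterator)

  println(globallySorted.collect().toSeq) // fully ordered
  println(locallySorted.collect().toSeq)  // ordered runs, one per partition
  sc.stop()
}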
1 change: 1 addition & 0 deletions src/main/scala/catalyst/execution/SharkInstance.scala
@@ -83,6 +83,7 @@ abstract class SharkInstance extends Logging {

object PrepareForExecution extends RuleExecutor[SharkPlan] {
val batches =
Batch("Add exchange", Once, AddExchange) ::
Batch("Prepare Expressions", Once, new expressions.BindReferences[SharkPlan]) :: Nil
}

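Note the batch ordering: "Add exchange" runs before "Prepare Expressions", presumably so that references in any newly inserted Exchange operators are bound as well. A toy model of how such batches are applied (hypothetical; catalyst's actual RuleExecutor is more general):

object RuleExecutorSketch extends App {
  type Plan = String
  type Rule = Plan => Plan

  sealed trait Strategy
  case object Once extends Strategy
  case object ToFixedPoint extends Strategy

  case class Batch(name: String, strategy: Strategy, rules: Rule*)

  // Apply each batch in order; a batch runs its rules once, or repeatedly
  // until the plan stops changing.
  def execute(batches: Seq[Batch], plan: Plan): Plan =
    batches.foldLeft(plan) { (current, batch) =>
      def runOnce(p: Plan): Plan = batch.rules.foldLeft(p)((q, rule) => rule(q))
      batch.strategy match {
        case Once => runOnce(current)
        case ToFixedPoint =>
          var p = current
          var next = runOnce(p)
          while (next != p) { p = next; next = runOnce(p) }
          p
      }
    }

  val addExchange: Rule = p => if (p.contains("Exchange")) p else s"Exchange($p)"
  println(execute(Seq(Batch("Add exchange", Once, addExchange)), "Aggregate(Scan)"))
}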
8 changes: 8 additions & 0 deletions src/main/scala/catalyst/execution/SharkPlan.scala
@@ -4,10 +4,17 @@ package execution
import org.apache.spark.rdd.RDD

import catalyst.plans.QueryPlan
+ import catalyst.plans.physical._

abstract class SharkPlan extends QueryPlan[SharkPlan] with Logging {
self: Product =>

+ // TODO: Move to `DistributedPlan`
+ /** Specifies how data is partitioned across different nodes in the cluster. */
+ def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!
+ /** Specifies any partition requirements on the input data for this operator. */
+ def requiredChildDistribution: Seq[Distribution] = Seq.fill(children.size)(UnspecifiedDistribution)

/**
* Runs this query returning the result as an RDD.
*/
@@ -27,6 +34,7 @@ trait LeafNode extends SharkPlan with trees.LeafNode[SharkPlan] {

trait UnaryNode extends SharkPlan with trees.UnaryNode[SharkPlan] {
self: Product =>
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}

trait BinaryNode extends SharkPlan with trees.BinaryNode[SharkPlan] {
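The two methods added to SharkPlan define the contract that AddExchange checks. The following self-contained sketch models it in miniature; every type here is a simplified stand-in for its catalyst.plans.physical namesake, and the satisfies logic illustrates the intent rather than the actual implementation:

object PartitioningContractSketch extends App {
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  case class ClusteredDistribution(keys: Seq[String]) extends Distribution

  sealed trait Partitioning { def satisfies(d: Distribution): Boolean }

  case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
    // Guarantees nothing about data placement, so only the trivial
    // requirement is satisfied.
    def satisfies(d: Distribution): Boolean = d == UnspecifiedDistribution
  }

  case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning {
    // Hashing on a subset of the required clustering keys still co-locates
    // all rows that agree on the full clustering.
    def satisfies(d: Distribution): Boolean = d match {
      case UnspecifiedDistribution       => true
      case ClusteredDistribution(needed) => keys.toSet.subsetOf(needed.toSet)
    }
  }

  // A leaf scan defaults to UnknownPartitioning(0); an aggregate would require
  // its input clustered by the grouping key, so AddExchange must shuffle.
  val scanOutput: Partitioning = UnknownPartitioning(0)
  val aggregateNeeds: Distribution = ClusteredDistribution(Seq("a"))
  assert(!scanOutput.satisfies(aggregateNeeds))
  assert(HashPartitioning(Seq("a"), 8).satisfies(aggregateNeeds))
  println("contract holds")
}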