diff --git a/src/main/scala/catalyst/analysis/unresolved.scala b/src/main/scala/catalyst/analysis/unresolved.scala index 54ed4d9802392..d198c6dd8ce1c 100644 --- a/src/main/scala/catalyst/analysis/unresolved.scala +++ b/src/main/scala/catalyst/analysis/unresolved.scala @@ -10,7 +10,7 @@ import trees.TreeNode * resolved. */ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) extends - errors.OptimizationException(tree, s"Invalid call to $function on unresolved object") + errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object") /** * Holds the name of a relation that has yet to be looked up in a [[Catalog]]. diff --git a/src/main/scala/catalyst/errors/package.scala b/src/main/scala/catalyst/errors/package.scala index df829ac26fa93..953e37d7f295e 100644 --- a/src/main/scala/catalyst/errors/package.scala +++ b/src/main/scala/catalyst/errors/package.scala @@ -7,7 +7,7 @@ import trees._ */ package object errors { - class OptimizationException[TreeType <: TreeNode[_]] + class TreeNodeException[TreeType <: TreeNode[_]] (tree: TreeType, msg: String, cause: Throwable = null) extends Exception(msg, cause) { override def getMessage: String = { @@ -17,17 +17,18 @@ package object errors { } /** - * Wraps any exceptions that are thrown while executing `f` in an [[OptimizationException]], - * attaching the provided `tree`. + * Wraps any exceptions that are thrown while executing `f` in a + * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`. */ def attachTree[TreeType <: TreeNode[_], A](tree: TreeType, msg: String = "")(f: => A): A = { try f catch { - case e: Exception => throw new OptimizationException(tree, msg, e) + case e: Exception => throw new TreeNodeException(tree, msg, e) } } /** - * Executes `f` which is expected to throw an OptimizationException. The first tree encountered in + * Executes `f` which is expected to throw a + * [[catalyst.errors.TreeNodeException TreeNodeException]]. The first tree encountered in * the stack of exceptions of type `TreeType` is returned. */ def getTree[TreeType <: TreeNode[_]](f: => Unit): TreeType = ??? // TODO: Implement diff --git a/src/main/scala/catalyst/execution/Exchange.scala b/src/main/scala/catalyst/execution/Exchange.scala new file mode 100644 index 0000000000000..076d4a8e69a9f --- /dev/null +++ b/src/main/scala/catalyst/execution/Exchange.scala @@ -0,0 +1,115 @@ +package catalyst +package execution + +import catalyst.rules.Rule +import catalyst.errors._ +import catalyst.expressions._ +import catalyst.plans.physical._ + +import org.apache.spark.{RangePartitioner, HashPartitioner} +import org.apache.spark.rdd.ShuffledRDD + +case class Exchange(newPartitioning: Partitioning, child: SharkPlan) + extends UnaryNode { + + override def outputPartitioning = newPartitioning + def output = child.output + + def execute() = attachTree(this , "execute") { + newPartitioning match { + case HashPartitioning(expressions, numPartitions) => { + // TODO: Eliminate redundant expressions in grouping key and value. + val rdd = child.execute().map { row => + (buildRow(expressions.toSeq.map(Evaluate(_, Vector(row)))), row) + } + val part = new HashPartitioner(numPartitions) + val shuffled = new ShuffledRDD[Row, Row, (Row, Row)](rdd, part) + + shuffled.map(_._2) + } + case RangePartitioning(sortingExpressions, numPartitions) => { + // TODO: ShuffledRDD should take an Ordering. 
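+        // Note: the implicit RowOrdering below is what RangePartitioner picks up to compare
+        // Row keys, so the sampled range bounds follow the requested sortingExpressions.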
+ implicit val ordering = new RowOrdering(sortingExpressions) + + val rdd = child.execute().map(row => (row, null)) + val part = new RangePartitioner(numPartitions, rdd, ascending = true) + val shuffled = new ShuffledRDD[Row, Null, (Row, Null)](rdd, part) + + shuffled.map(_._1) + } + case SinglePartition => { + val rdd = child.execute().coalesce(1, true) + + rdd + } + case _ => sys.error(s"Exchange not implemented for $newPartitioning") + // TODO: Handle BroadcastPartitioning. + } + } +} + +/** + * Ensures that the [[catalyst.plans.physical.Partitioning Partitioning]] of input data meets the + * [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting + * [[Exchange]] Operators where required. + */ +object AddExchange extends Rule[SharkPlan] { + // TODO: Determine the number of partitions. + val numPartitions = 8 + + def apply(plan: SharkPlan): SharkPlan = plan.transformUp { + case operator: SharkPlan => + // Check if every child's outputPartitioning satisfies the corresponding + // required data distribution. + def meetsRequirements = + !operator.requiredChildDistribution.zip(operator.children).map { + case (required, child) => + val valid = child.outputPartitioning.satisfies(required) + logger.debug( + s"${if (valid) "Valid" else "Invalid"} distribution," + + s"required: $required current: ${child.outputPartitioning}") + valid + }.exists(_ == false) + + // Check if outputPartitionings of children are compatible with each other. + // It is possible that every child satisfies its required data distribution + // but two children have incompatible outputPartitionings. For example, + // A dataset is range partitioned by "a.asc" (RangePartitioning) and another + // dataset is hash partitioned by "a" (HashPartitioning). Tuples in these two + // datasets are both clustered by "a", but these two outputPartitionings are not + // compatible. + // TODO: ASSUMES TRANSITIVITY? + def compatible = + !operator.children + .map(_.outputPartitioning) + .sliding(2) + .map { + case Seq(a) => true + case Seq(a,b) => a compatibleWith b + }.exists(_ == false) + + // Check if the partitioning we want to ensure is the same as the child's output + // partitioning. If so, we do not need to add the Exchange operator. + def addExchangeIfNecessary(partitioning: Partitioning, child: SharkPlan) = + if (child.outputPartitioning != partitioning) Exchange(partitioning, child) else child + + if (meetsRequirements && compatible) { + operator + } else { + // At least one child does not satisfies its required data distribution or + // at least one child's outputPartitioning is not compatible with another child's + // outputPartitioning. In this case, we need to add Exchange operators. 
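+          // Each unmet requirement is translated into a concrete partitioning below, e.g. a
+          // ClusteredDistribution(Seq('a)) becomes HashPartitioning(Seq('a), numPartitions) and
+          // an OrderedDistribution becomes a RangePartitioning over the same sort order.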
+ val repartitionedChildren = operator.requiredChildDistribution.zip(operator.children).map { + case (AllTuples, child) => + addExchangeIfNecessary(SinglePartition, child) + case (ClusteredDistribution(clustering), child) => + addExchangeIfNecessary(HashPartitioning(clustering, numPartitions), child) + case (OrderedDistribution(ordering), child) => + addExchangeIfNecessary(RangePartitioning(ordering, numPartitions), child) + case (UnspecifiedDistribution, child) => child + case (dist, _) => sys.error(s"Don't know how to ensure $dist") + } + operator.withNewChildren(repartitionedChildren) + } + } +} \ No newline at end of file diff --git a/src/main/scala/catalyst/execution/PlanningStrategies.scala b/src/main/scala/catalyst/execution/PlanningStrategies.scala index d39d383e24629..10f0f051bddc8 100644 --- a/src/main/scala/catalyst/execution/PlanningStrategies.scala +++ b/src/main/scala/catalyst/execution/PlanningStrategies.scala @@ -5,6 +5,7 @@ import expressions._ import planning._ import plans._ import plans.logical.LogicalPlan +import plans.physical._ trait PlanningStrategies { self: QueryPlanner[SharkPlan] => @@ -142,10 +143,12 @@ trait PlanningStrategies { object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SharkPlan] = plan match { case logical.Sort(sortExprs, child) => - execution.Sort(sortExprs, planLater(child)) :: Nil - // TODO: It is correct, but overkill to do a global sorting here. + // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution. + execution.Sort(sortExprs, true, planLater(child)):: Nil case logical.SortPartitions(sortExprs, child) => - execution.Sort(sortExprs, planLater(child)) :: Nil + // This sort only sort tuples within a partition. Its requiredDistribution will be + // an UnspecifiedDistribution. + execution.Sort(sortExprs, false, planLater(child)) :: Nil case logical.Project(projectList, child) => execution.Project(projectList, planLater(child)) :: Nil case logical.Filter(condition, child) => diff --git a/src/main/scala/catalyst/execution/SharkInstance.scala b/src/main/scala/catalyst/execution/SharkInstance.scala index 6de1d1ded8168..044bb8eb2eb3c 100644 --- a/src/main/scala/catalyst/execution/SharkInstance.scala +++ b/src/main/scala/catalyst/execution/SharkInstance.scala @@ -83,6 +83,7 @@ abstract class SharkInstance extends Logging { object PrepareForExecution extends RuleExecutor[SharkPlan] { val batches = + Batch("Add exchange", Once, AddExchange) :: Batch("Prepare Expressions", Once, new expressions.BindReferences[SharkPlan]) :: Nil } diff --git a/src/main/scala/catalyst/execution/SharkPlan.scala b/src/main/scala/catalyst/execution/SharkPlan.scala index 6116d8397b090..c22ec2831fe1b 100644 --- a/src/main/scala/catalyst/execution/SharkPlan.scala +++ b/src/main/scala/catalyst/execution/SharkPlan.scala @@ -4,10 +4,17 @@ package execution import org.apache.spark.rdd.RDD import catalyst.plans.QueryPlan +import catalyst.plans.physical._ abstract class SharkPlan extends QueryPlan[SharkPlan] with Logging { self: Product => + // TODO: Move to `DistributedPlan` + /** Specifies how data is partitioned across different nodes in the cluster. */ + def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH! + /** Specifies any partition requirements on the input data for this operator. */ + def requiredChildDistribution: Seq[Distribution] = Seq.fill(children.size)(UnspecifiedDistribution) + /** * Runs this query returning the result as an RDD. 
*/ @@ -27,6 +34,7 @@ trait LeafNode extends SharkPlan with trees.LeafNode[SharkPlan] { trait UnaryNode extends SharkPlan with trees.UnaryNode[SharkPlan] { self: Product => + override def outputPartitioning: Partitioning = child.outputPartitioning } trait BinaryNode extends SharkPlan with trees.BinaryNode[SharkPlan] { diff --git a/src/main/scala/catalyst/execution/aggregates.scala b/src/main/scala/catalyst/execution/aggregates.scala index c15414cf002f9..4328517d7ec7a 100644 --- a/src/main/scala/catalyst/execution/aggregates.scala +++ b/src/main/scala/catalyst/execution/aggregates.scala @@ -3,9 +3,11 @@ package execution import catalyst.errors._ import catalyst.expressions._ +import catalyst.plans.physical.{ClusteredDistribution, AllTuples} +import org.apache.spark.rdd.SharkPairRDDFunctions /* Implicits */ -import org.apache.spark.SparkContext._ +import SharkPairRDDFunctions._ case class Aggregate( groupingExpressions: Seq[Expression], @@ -13,82 +15,17 @@ case class Aggregate( child: SharkPlan)(@transient sc: SharkContext) extends UnaryNode { - // TODO: Move these default functions back to expressions. Build framework for instantiating them. - case class AverageFunction(expr: Expression, base: AggregateExpression) - extends AggregateFunction { - - def this() = this(null, null) // Required for serialization. - - var count: Long = _ - var sum: Long = _ - - def result: Any = sum.toDouble / count.toDouble - - def apply(input: Seq[Row]): Unit = { - count += 1 - // TODO: Support all types here... - sum += Evaluate(expr, input).asInstanceOf[Int] - } - } - - case class CountFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. - - var count: Int = _ - - def apply(input: Seq[Row]): Unit = { - val evaluatedExpr = expr.map(Evaluate(_, input)) - if (evaluatedExpr.map(_ != null).reduceLeft(_ || _)) { - count += 1 - } - } - - def result: Any = count - } - - case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. - - var sum = Evaluate(Cast(Literal(0), expr.dataType), Nil) - - def apply(input: Seq[Row]): Unit = - sum = Evaluate(Add(Literal(sum), expr), input) - - def result: Any = sum - } - - case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpression) - extends AggregateFunction { - - def this() = this(null, null) // Required for serialization. - - val seen = new scala.collection.mutable.HashSet[Any]() - - def apply(input: Seq[Row]): Unit = { - val evaluatedExpr = expr.map(Evaluate(_, input)) - if (evaluatedExpr.map(_ != null).reduceLeft(_ && _)) { - seen += evaluatedExpr - } - } - - def result: Any = seen.size - } - - case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { - def this() = this(null, null) // Required for serialization. - - var result: Any = null - - def apply(input: Seq[Row]): Unit = { - if (result == null) { - result = Evaluate(expr, input) - } + override def requiredChildDistribution = + if (groupingExpressions == Nil) { + AllTuples :: Nil + } else { + ClusteredDistribution(groupingExpressions) :: Nil } - } override def otherCopyArgs = sc :: Nil def output = aggregateExpressions.map(_.toAttribute) + /* Replace all aggregate expressions with spark functions that will compute the result. 
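+     e.g. an Average aggregate over some attribute 'a is rewritten into an
+     AverageFunction('a, Average('a)) instance, which accumulates a running sum and count
+     as input rows are applied to it.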
*/ def createAggregateImplementations() = aggregateExpressions.map { agg => val impl = agg transform { case base @ Average(expr) => new AverageFunction(expr, base) @@ -119,12 +56,14 @@ case class Aggregate( } def execute() = attachTree(this, "execute") { + // TODO: If the child of it is an [[catalyst.execution.Exchange]], + // do not evaluate the groupingExpressions again since we have evaluated it + // in the [[catalyst.execution.Exchange]]. val grouped = child.execute().map { row => (buildRow(groupingExpressions.map(Evaluate(_, Vector(row)))), row) - }.groupByKey() + }.groupByKeyLocally() val result = grouped.map { case (group, rows) => - // Replace all aggregate expressions with spark functions that will compute the result. val aggImplementations = createAggregateImplementations() // Pull out all the functions so we can feed each row into them. @@ -148,3 +87,76 @@ case class Aggregate( } } } + +// TODO: Move these default functions back to expressions. Build framework for instantiating them. +case class AverageFunction(expr: Expression, base: AggregateExpression) + extends AggregateFunction { + + def this() = this(null, null) // Required for serialization. + + var count: Long = _ + var sum: Long = _ + + def result: Any = sum.toDouble / count.toDouble + + def apply(input: Seq[Row]): Unit = { + count += 1 + // TODO: Support all types here... + sum += Evaluate(expr, input).asInstanceOf[Int] + } +} + +case class CountFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { + def this() = this(null, null) // Required for serialization. + + var count: Int = _ + + def apply(input: Seq[Row]): Unit = { + val evaluatedExpr = expr.map(Evaluate(_, input)) + if (evaluatedExpr.map(_ != null).reduceLeft(_ || _)) { + count += 1 + } + } + + def result: Any = count +} + +case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { + def this() = this(null, null) // Required for serialization. + + var sum = Evaluate(Cast(Literal(0), expr.dataType), Nil) + + def apply(input: Seq[Row]): Unit = + sum = Evaluate(Add(Literal(sum), expr), input) + + def result: Any = sum +} + +case class CountDistinctFunction(expr: Seq[Expression], base: AggregateExpression) + extends AggregateFunction { + + def this() = this(null, null) // Required for serialization. + + val seen = new scala.collection.mutable.HashSet[Any]() + + def apply(input: Seq[Row]): Unit = { + val evaluatedExpr = expr.map(Evaluate(_, input)) + if (evaluatedExpr.map(_ != null).reduceLeft(_ && _)) { + seen += evaluatedExpr + } + } + + def result: Any = seen.size +} + +case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { + def this() = this(null, null) // Required for serialization. 
+ + var result: Any = null + + def apply(input: Seq[Row]): Unit = { + if (result == null) { + result = Evaluate(expr, input) + } + } +} \ No newline at end of file diff --git a/src/main/scala/catalyst/execution/basicOperators.scala b/src/main/scala/catalyst/execution/basicOperators.scala index d1187c1771e8d..b44c1964fb261 100644 --- a/src/main/scala/catalyst/execution/basicOperators.scala +++ b/src/main/scala/catalyst/execution/basicOperators.scala @@ -1,11 +1,9 @@ package catalyst package execution -import errors._ -import expressions._ -import types._ - -import org.apache.spark.SparkContext._ +import catalyst.errors._ +import catalyst.expressions._ +import catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution} case class Project(projectList: Seq[NamedExpression], child: SharkPlan) extends UnaryNode { def output = projectList.map(_.toAttribute) @@ -51,22 +49,23 @@ case class StopAfter(limit: Int, child: SharkPlan)(@transient sc: SharkContext) def execute() = sc.makeRDD(executeCollect(), 1) } -case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode { - val numPartitions = 8 // TODO: Set with input cardinality +case class Sort( + sortOrder: Seq[SortOrder], + global: Boolean, + child: SharkPlan) + extends UnaryNode { + override def requiredChildDistribution = + if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - private final val directions = sortExprs.map(_.direction).toIndexedSeq - private final val dataTypes = sortExprs.map(_.dataType).toIndexedSeq + @transient + lazy val ordering = new RowOrdering(sortOrder) - // TODO: Don't include redundant expressions in both sortKey and row. def execute() = attachTree(this, "sort") { - import scala.math.Ordering.Implicits._ - implicit val ordering = new RowOrdering(sortExprs) - - // TODO: Allow spark to take the ordering as an argument, also avoid needless pair creation. + // TODO: Optimize sorting operation? child.execute() - .mapPartitions(iter => iter.map(row => (row, null))) - .sortByKey(ascending = true, numPartitions) - .map(_._1) + .mapPartitions( + iterator => iterator.toArray.sorted(ordering).iterator, + preservesPartitioning = true) } def output = child.output @@ -75,6 +74,9 @@ case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode { // TODO: Rename: SchemaRDD case class LocalRelation(output: Seq[Attribute], data: Seq[IndexedSeq[Any]]) (@transient sc: SharkContext) extends LeafNode { - def execute() = sc.makeRDD(data.map(buildRow), 1) + + // Since LocalRelation is used for unit tests, set the defaultParallelism to 2 + // to make sure we can cover bugs appearing in a distributed environment. 
+ def execute() = sc.makeRDD(data.map(buildRow), 2) } diff --git a/src/main/scala/catalyst/execution/joins.scala b/src/main/scala/catalyst/execution/joins.scala index adfb06d90bbbc..72d24835c7ae2 100644 --- a/src/main/scala/catalyst/execution/joins.scala +++ b/src/main/scala/catalyst/execution/joins.scala @@ -2,40 +2,46 @@ package catalyst package execution import scala.collection.mutable - -import org.apache.spark.rdd.RDD +import scala.Some import errors._ import expressions._ import plans._ +import org.apache.spark.rdd.SharkPairRDDFunctions._ -/* Implicits */ -import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import catalyst.plans.physical.{ClusteredDistribution, Partitioning} case class SparkEquiInnerJoin( leftKeys: Seq[Expression], rightKeys: Seq[Expression], left: SharkPlan, - right: SharkPlan) - extends BinaryNode { + right: SharkPlan) extends BinaryNode { + + override def outputPartitioning: Partitioning = left.outputPartitioning + + override def requiredChildDistribution = + ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil def output = left.output ++ right.output def execute() = attachTree(this, "execute") { - val leftWithKeys = left.execute .map { row => + val leftWithKeys = left.execute().map { row => val joinKeys = leftKeys.map(Evaluate(_, Vector(row))) - logger.debug(s"Generated left join keys ($leftKeys) => ($joinKeys) given row $row") + logger.debug(s"Generated left join keys [${leftKeys.mkString(",")}] =>" + + s"[${joinKeys.mkString(",")}] given row $row") (joinKeys, row) } val rightWithKeys = right.execute().map { row => val joinKeys = rightKeys.map(Evaluate(_, Vector(EmptyRow, row))) - logger.debug(s"Generated right join keys ($rightKeys) => ($joinKeys) given row $row") + logger.debug(s"Generated right join keys [${rightKeys.mkString(",")}] =>" + + s"[${joinKeys.mkString(",")}] given row $row") (joinKeys, row) } // Do the join. - val joined = filterNulls(leftWithKeys).join(filterNulls(rightWithKeys)) + val joined = filterNulls(leftWithKeys).joinLocally(filterNulls(rightWithKeys)) // Drop join keys and merge input tuples. joined.map { case (_, (leftTuple, rightTuple)) => buildRow(leftTuple ++ rightTuple) } } @@ -46,7 +52,7 @@ case class SparkEquiInnerJoin( */ protected def filterNulls(rdd: RDD[(Seq[Any], Row)]) = rdd.filter { - case (key: Seq[_], _) => !key.map(_ == null).reduceLeft(_ || _) + case (key: Seq[_], _) => !key.exists(_ == null) } } @@ -62,8 +68,11 @@ case class BroadcastNestedLoopJoin( streamed: SharkPlan, broadcast: SharkPlan, joinType: JoinType, condition: Option[Expression]) (@transient sc: SharkContext) extends BinaryNode { + // TODO: Override requiredChildDistribution. 
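+  // Since the broadcast relation is expected to be available in full on every node, the
+  // join's output is declared to keep the partitioning of the streamed side.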
+ + override def outputPartitioning: Partitioning = streamed.outputPartitioning - override def otherCopyArgs = sc :: Nil + override def otherCopyArgs = sc :: Nil def output = left.output ++ right.output diff --git a/src/main/scala/catalyst/expressions/Evaluate.scala b/src/main/scala/catalyst/expressions/Evaluate.scala index 1d90ba2a77680..12ff84a3a28d7 100644 --- a/src/main/scala/catalyst/expressions/Evaluate.scala +++ b/src/main/scala/catalyst/expressions/Evaluate.scala @@ -36,7 +36,7 @@ object Evaluate extends Logging { @inline def n2(e1: Expression, e2: Expression, f: ((Numeric[Any], Any, Any) => Any)): Any = { if (e1.dataType != e2.dataType) { - throw new OptimizationException(e, s"Types do not match ${e1.dataType} != ${e2.dataType}") + throw new TreeNodeException(e, s"Types do not match ${e1.dataType} != ${e2.dataType}") } val evalE1 = eval(e1) @@ -56,7 +56,7 @@ object Evaluate extends Logging { @inline def f2(e1: Expression, e2: Expression, f: ((Fractional[Any], Any, Any) => Any)): Any = { if (e1.dataType != e2.dataType) { - throw new OptimizationException(e, s"Types do not match ${e1.dataType} != ${e2.dataType}") + throw new TreeNodeException(e, s"Types do not match ${e1.dataType} != ${e2.dataType}") } val evalE1 = eval(e1) @@ -76,7 +76,7 @@ object Evaluate extends Logging { @inline def i2(e1: Expression, e2: Expression, f: ((Integral[Any], Any, Any) => Any)): Any = { if (e1.dataType != e2.dataType) { - throw new OptimizationException(e, s"Types do not match ${e1.dataType} != ${e2.dataType}") + throw new TreeNodeException(e, s"Types do not match ${e1.dataType} != ${e2.dataType}") } val evalE1 = eval(e1) val evalE2 = eval(e2) @@ -230,7 +230,7 @@ object Evaluate extends Logging { /* References to input tuples */ case br @ BoundReference(inputTuple, ordinal, _) => try input(inputTuple)(ordinal) catch { case iob: IndexOutOfBoundsException => - throw new OptimizationException(br, s"Reference not in tuple: $input") + throw new TreeNodeException(br, s"Reference not in tuple: $input") } /* Functions */ @@ -241,9 +241,9 @@ object Evaluate extends Logging { implementedFunction.evaluate(implementedFunction.children.map(eval)) case a: Attribute => - throw new OptimizationException(a, + throw new TreeNodeException(a, "Unable to evaluate unbound reference without access to the input schema.") - case other => throw new OptimizationException(other, "evaluation not implemented") + case other => throw new TreeNodeException(other, "evaluation not implemented") } val resultType = if (result == null) "null" else result.getClass.getName diff --git a/src/main/scala/catalyst/expressions/predicates.scala b/src/main/scala/catalyst/expressions/predicates.scala index 0dfa1551a04f2..b8d578dc1c546 100644 --- a/src/main/scala/catalyst/expressions/predicates.scala +++ b/src/main/scala/catalyst/expressions/predicates.scala @@ -93,7 +93,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi def dataType = { if (!resolved) { throw new UnresolvedException( - this, s"Invalid types: ${trueValue.dataType}, ${falseValue.dataType}") + this, s"Can not resolve due to differing types ${trueValue.dataType}, ${falseValue.dataType}") } trueValue.dataType } diff --git a/src/main/scala/catalyst/plans/QueryPlan.scala b/src/main/scala/catalyst/plans/QueryPlan.scala index e06ecab6c2a21..9ada0be72b8ff 100644 --- a/src/main/scala/catalyst/plans/QueryPlan.scala +++ b/src/main/scala/catalyst/plans/QueryPlan.scala @@ -1,8 +1,8 @@ package catalyst package plans -import expressions.{Attribute, Expression} 
-import trees._ +import catalyst.expressions.{SortOrder, Attribute, Expression} +import catalyst.trees._ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] { self: PlanType with Product => @@ -44,7 +44,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy val newArgs = productIterator.map { case e: Expression => transformExpressionDown(e) case Some(e: Expression) => Some(transformExpressionDown(e)) - case seq: Seq[_] => seq.map { + case m: Map[_,_] => m + case seq: Traversable[_] => seq.map { case e: Expression => transformExpressionDown(e) case other => other } @@ -75,7 +76,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy val newArgs = productIterator.map { case e: Expression => transformExpressionUp(e) case Some(e: Expression) => Some(transformExpressionUp(e)) - case seq: Seq[_] => seq.map { + case m: Map[_,_] => m + case seq: Traversable[_] => seq.map { case e: Expression => transformExpressionUp(e) case other => other } @@ -98,7 +100,7 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy productIterator.flatMap { case e: Expression => e :: Nil case Some(e: Expression) => e :: Nil - case seq: Seq[_] => seq.flatMap { + case seq: Traversable[_] => seq.flatMap { case e: Expression => e :: Nil case other => Nil } diff --git a/src/main/scala/catalyst/plans/logical/LogicalPlan.scala b/src/main/scala/catalyst/plans/logical/LogicalPlan.scala index 7b493374fc779..b6e6d637e95e9 100644 --- a/src/main/scala/catalyst/plans/logical/LogicalPlan.scala +++ b/src/main/scala/catalyst/plans/logical/LogicalPlan.scala @@ -45,7 +45,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { case a :: Nil => Some(a) // One match, use it. case Nil => None // No matches. case ambiguousReferences => - throw new OptimizationException( + throw new TreeNodeException( this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}") } } diff --git a/src/main/scala/catalyst/plans/physical/partitioning.scala b/src/main/scala/catalyst/plans/physical/partitioning.scala new file mode 100644 index 0000000000000..dd91b5beb4f95 --- /dev/null +++ b/src/main/scala/catalyst/plans/physical/partitioning.scala @@ -0,0 +1,183 @@ +package catalyst +package plans +package physical + +import expressions._ +import types._ + +/** + * Specifies how tuples that share common expressions will be distributed when a query is executed + * in parallel on many machines. Distribution can be used to refer to two distinct physical + * properties: + * - Inter-node partitioning of data: In this case the distribution describes how tuples are + * partitioned across physical machines in a cluster. Knowing this property allows some + * operators (e.g., Aggregate) to perform partition local operations instead of global ones. + * - Intra-partition ordering of data: In this case the distribution describes guarantees made + * about how tuples are distributed within a single partition. + */ +sealed trait Distribution + +/** + * Represents a distribution where no promises are made about co-location of data. + */ +case object UnspecifiedDistribution extends Distribution + +/** + * Represents a distribution that only has a single partition and all tuples of the dataset + * are co-located. + */ +case object AllTuples extends Distribution + +/** + * Represents data where tuples that share the same values for the `clustering` + * [[catalyst.expressions.Expression Expressions]] will be co-located. 
Based on the context, this + * can mean such tuples are either co-located in the same partition or they will be contiguous + * within a single partition. + */ +case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution { + require( + clustering != Nil, + "The clustering expressions of a ClusteredDistribution should not be Nil. " + + "An AllTuples should be used to represent a distribution that only has " + + "a single partition.") +} + +/** + * Represents data where tuples have been ordered according to the `ordering` + * [[catalyst.expressions.Expression Expressions]]. This is a strictly stronger guarantee than + * [[ClusteredDistribution]] as an ordering will ensure that tuples that share the same value for + * the ordering expressions are contiguous and will never be split across partitions. + */ +case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { + require( + ordering != Nil, + "The ordering expressions of a OrderedDistribution should not be Nil. " + + "An AllTuples should be used to represent a distribution that only has " + + "a single partition.") + + def clustering = ordering.map(_.child).toSet +} + +sealed trait Partitioning { + /** Returns the number of partitions that the data is split across */ + val numPartitions: Int + + /** + * Returns true iff the guarantees made by this + * [[catalyst.plans.physical.Partitioning Partitioning]] are sufficient to satisfy + * the partitioning scheme mandated by the `required` + * [[catalyst.plans.physical.Distribution Distribution]], i.e. the current dataset does not + * need to be re-partitioned for the `required` Distribution (it is possible that tuples within + * a partition need to be reorganized). + */ + def satisfies(required: Distribution): Boolean + + /** + * Returns true iff all distribution guarantees made by this partitioning can also be made + * for the `other` specified partitioning. + * For example, two [[catalyst.plans.physical.HashPartitioning HashPartitioning]]s are + * only compatible if the `numPartitions` of them is the same. + */ + def compatibleWith(other: Partitioning): Boolean +} + +case class UnknownPartitioning(numPartitions: Int) extends Partitioning { + override def satisfies(required: Distribution): Boolean = required match { + case UnspecifiedDistribution => true + case _ => false + } + + override def compatibleWith(other: Partitioning): Boolean = other match { + case UnknownPartitioning(_) => true + case _ => false + } +} + +case object SinglePartition extends Partitioning { + val numPartitions = 1 + + override def satisfies(required: Distribution): Boolean = true + + override def compatibleWith(other: Partitioning) = other match { + case SinglePartition => true + case _ => false + } +} + +case object BroadcastPartitioning extends Partitioning { + val numPartitions = 1 + + override def satisfies(required: Distribution): Boolean = true + + override def compatibleWith(other: Partitioning) = other match { + case SinglePartition => true + case _ => false + } +} + +/** + * Represents a partitioning where rows are split up across partitions based on the hash + * of `expressions`. All rows where `expressions` evaluate to the same values are guaranteed to be + * in the same partition. 
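+ * For example, HashPartitioning(Seq('a), 10) satisfies ClusteredDistribution(Seq('a, 'b)):
+ * rows that agree on both 'a and 'b necessarily agree on 'a and therefore hash to the same
+ * partition.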
+ */ +case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) + extends Expression + with Partitioning { + + def children = expressions + def references = expressions.flatMap(_.references).toSet + def nullable = false + def dataType = IntegerType + + lazy val clusteringSet = expressions.toSet + + override def satisfies(required: Distribution): Boolean = required match { + case UnspecifiedDistribution => true + case ClusteredDistribution(requiredClustering) => + clusteringSet.subsetOf(requiredClustering.toSet) + case _ => false + } + + override def compatibleWith(other: Partitioning) = other match { + case BroadcastPartitioning => true + case h: HashPartitioning if h == this => true + case _ => false + } +} + +/** + * Represents a partitioning where rows are split across partitions based on some total ordering of + * the expressions specified in `ordering`. When data is partitioned in this manner the following + * two conditions are guaranteed to hold: + * - All row where the expressions in `ordering` evaluate to the same values will be in the same + * partition. + * - Each partition will have a `min` and `max` row, relative to the given ordering. All rows + * that are in between `min` and `max` in this `ordering` will reside in this partition. + */ +case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) + extends Expression + with Partitioning { + + def children = ordering + def references = ordering.flatMap(_.references).toSet + def nullable = false + def dataType = IntegerType + + lazy val clusteringSet = ordering.map(_.child).toSet + + override def satisfies(required: Distribution): Boolean = required match { + case UnspecifiedDistribution => true + case OrderedDistribution(requiredOrdering) => + val minSize = Seq(requiredOrdering.size, ordering.size).min + requiredOrdering.take(minSize) == ordering.take(minSize) + case ClusteredDistribution(requiredClustering) => + clusteringSet.subsetOf(requiredClustering.toSet) + case _ => false + } + + override def compatibleWith(other: Partitioning) = other match { + case BroadcastPartitioning => true + case r: RangePartitioning if r == this => true + case _ => false + } +} \ No newline at end of file diff --git a/src/main/scala/catalyst/trees/TreeNode.scala b/src/main/scala/catalyst/trees/TreeNode.scala index bb6dfdd3110d5..814686d8936eb 100644 --- a/src/main/scala/catalyst/trees/TreeNode.scala +++ b/src/main/scala/catalyst/trees/TreeNode.scala @@ -171,7 +171,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { } else { arg } - case args: Seq[_] => args.map { + case m: Map[_,_] => m + case args: Traversable[_] => args.map { case arg: TreeNode[_] if children contains arg => val newChild = arg.asInstanceOf[BaseType].transformDown(rule) if (!(newChild fastEquals arg)) { @@ -214,7 +215,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { } else { arg } - case args: Seq[_] => args.map { + case m: Map[_,_] => m + case args: Traversable[_] => args.map { case arg: TreeNode[_] if children contains arg => val newChild = arg.asInstanceOf[BaseType].transformUp(rule) if (!(newChild fastEquals arg)) { @@ -254,7 +256,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { } } catch { case e: java.lang.IllegalArgumentException => - throw new OptimizationException( + throw new TreeNodeException( this, s"Failed to copy node. 
Is otherCopyArgs specified correctly for $nodeName?") } } @@ -271,7 +273,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { def argString: String = productIterator.flatMap { case tn: TreeNode[_] if children contains tn => Nil case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil - case seq: Seq[_] => seq.mkString("{", ",", "}") :: Nil + case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil + case set: Set[_] => set.mkString("{", ",", "}") :: Nil case other => other :: Nil }.mkString(", ") diff --git a/src/main/scala/org/apache/spark/rdd/SharkPairRDDFunctions.scala b/src/main/scala/org/apache/spark/rdd/SharkPairRDDFunctions.scala new file mode 100644 index 0000000000000..1599bb51cda4e --- /dev/null +++ b/src/main/scala/org/apache/spark/rdd/SharkPairRDDFunctions.scala @@ -0,0 +1,84 @@ +package org.apache.spark.rdd + +import scala.language.implicitConversions + +import scala.reflect._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark._ +import org.apache.spark.Aggregator +import org.apache.spark.SparkContext._ +import org.apache.spark.util.collection.AppendOnlyMap + +/** + * Extra functions for Shark available on RDDs of (key, value) pairs through + * an implicit conversion. + * Import `org.apache.spark.SharkPairRDDFunctions._` at the top of your program to + * use these functions. + */ +class SharkPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) + extends Logging + with Serializable { + + /** + * Cogroup corresponding partitions of `this` and `other`. These two RDDs should have + * the same number of partitions. Partitions of these two RDDs are cogrouped + * according to the indexes of partitions. If we have two RDDs and + * each of them has n partitions, we will cogroup the partition i from `this` + * with the partition i from `other`. + * This function will not introduce a shuffling operation. + */ + def cogroupLocally[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + val cg = self.zipPartitions(other)((iter1:Iterator[(K, V)], iter2:Iterator[(K, W)]) => { + val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]] + + val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => { + if (hadVal) oldVal else Array.fill(2)(new ArrayBuffer[Any]) + } + + val getSeq = (k: K) => { + map.changeValue(k, update) + } + + iter1.foreach { kv => getSeq(kv._1)(0) += kv._2 } + iter2.foreach { kv => getSeq(kv._1)(1) += kv._2 } + + map.iterator + }).mapValues { case Seq(vs, ws) => (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])} + + cg + } + + /** + * Group the values for each key within a partition of the RDD into a single sequence. + * This function will not introduce a shuffling operation. + */ + def groupByKeyLocally(): RDD[(K, Seq[V])] = { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + val aggregator = new Aggregator[K, V, ArrayBuffer[V]](createCombiner _, mergeValue _, null) + val bufs = self.mapPartitionsWithContext((context, iter) => { + new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) + }, preservesPartitioning = true) + bufs.asInstanceOf[RDD[(K, Seq[V])]] + } + + /** + * Join corresponding partitions of `this` and `other`. + * If we have two RDDs and each of them has n partitions, + * we will join the partition i from `this` with the partition i from `other`. + * This function will not introduce a shuffling operation. 
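+   * Correct results therefore rely on the caller having partitioned both RDDs by the same
+   * keys into the same number of partitions; in this patch that is arranged by the
+   * AddExchange rule before SparkEquiInnerJoin invokes joinLocally.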
+ */ + def joinLocally[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + cogroupLocally(other).flatMapValues { + case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + } + } +} + +object SharkPairRDDFunctions { + implicit def rddToSharkPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = + new SharkPairRDDFunctions(rdd) +} + + diff --git a/src/test/scala/catalyst/execution/DistributionTests.scala b/src/test/scala/catalyst/execution/DistributionTests.scala new file mode 100644 index 0000000000000..2a3d85b5db40b --- /dev/null +++ b/src/test/scala/catalyst/execution/DistributionTests.scala @@ -0,0 +1,156 @@ +package catalyst +package plans +package physical + +import org.scalatest.FunSuite + +import catalyst.dsl._ + +class DistributionTests extends FunSuite { + + protected def checkSatisfied( + inputPartitioning: Partitioning, + requiredDistribution: Distribution, + satisfied: Boolean) { + if(inputPartitioning.satisfies(requiredDistribution) != satisfied) + fail( + s""" + |== Input Partitioning == + |$inputPartitioning + |== Required Distribution == + |$requiredDistribution + |== Does input partitioning satisfy required distribution? == + |Expected $satisfied got ${inputPartitioning.satisfies(requiredDistribution)} + """.stripMargin) + } + + test("HashPartitioning is the output partitioning") { + // Cases which do not need an exchange between two data properties. + checkSatisfied( + HashPartitioning(Seq('a, 'b, 'c), 10), + UnspecifiedDistribution, + true) + + checkSatisfied( + HashPartitioning(Seq('a, 'b, 'c), 10), + ClusteredDistribution(Seq('a, 'b, 'c)), + true) + + checkSatisfied( + HashPartitioning(Seq('b, 'c), 10), + ClusteredDistribution(Seq('a, 'b, 'c)), + true) + + checkSatisfied( + SinglePartition, + ClusteredDistribution(Seq('a, 'b, 'c)), + true) + + checkSatisfied( + SinglePartition, + OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)), + true) + + // Cases which need an exchange between two data properties. + checkSatisfied( + HashPartitioning(Seq('a, 'b, 'c), 10), + ClusteredDistribution(Seq('b, 'c)), + false) + + checkSatisfied( + HashPartitioning(Seq('a, 'b, 'c), 10), + ClusteredDistribution(Seq('d, 'e)), + false) + + checkSatisfied( + HashPartitioning(Seq('a, 'b, 'c), 10), + AllTuples, + false) + + checkSatisfied( + HashPartitioning(Seq('a, 'b, 'c), 10), + OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)), + false) + + checkSatisfied( + HashPartitioning(Seq('b, 'c), 10), + OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)), + false) + + // TODO: We should check functional dependencies + /* + checkSatisfied( + ClusteredDistribution(Seq('b)), + ClusteredDistribution(Seq('b + 1)), + true) + */ + } + + test("RangePartitioning is the output partitioning") { + // Cases which do not need an exchange between two data properties. 
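+    // As implemented, a RangePartitioning satisfies an OrderedDistribution when one ordering
+    // is a prefix of the other, and a ClusteredDistribution when its ordering expressions are
+    // a subset of the required clustering expressions.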
+ checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + UnspecifiedDistribution, + true) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc)), + true) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + OrderedDistribution(Seq('a.asc, 'b.asc)), + true) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + OrderedDistribution(Seq('a.asc, 'b.asc, 'c.asc, 'd.desc)), + true) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + ClusteredDistribution(Seq('a, 'b, 'c)), + true) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + ClusteredDistribution(Seq('c, 'b, 'a)), + true) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + ClusteredDistribution(Seq('b, 'c, 'a, 'd)), + true) + + // Cases which need an exchange between two data properties. + // TODO: We can have an optimization to first sort the dataset + // by a.asc and then sort b, and c in a partition. This optimization + // should tradeoff the benefit of a less number of Exchange operators + // and the parallelism. + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + OrderedDistribution(Seq('a.asc, 'b.desc, 'c.asc)), + false) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + OrderedDistribution(Seq('b.asc, 'a.asc)), + false) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + ClusteredDistribution(Seq('a, 'b)), + false) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + ClusteredDistribution(Seq('c, 'd)), + false) + + checkSatisfied( + RangePartitioning(Seq('a.asc, 'b.asc, 'c.asc), 10), + AllTuples, + false) + } +} \ No newline at end of file diff --git a/src/test/scala/catalyst/execution/HiveComparisonTest.scala b/src/test/scala/catalyst/execution/HiveComparisonTest.scala index e829de47ff31c..4a557c06c6daf 100644 --- a/src/test/scala/catalyst/execution/HiveComparisonTest.scala +++ b/src/test/scala/catalyst/execution/HiveComparisonTest.scala @@ -5,6 +5,7 @@ import java.io._ import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} import frontend.hive.{ExplainCommand, Command} +import plans.physical._ import util._ /** @@ -86,7 +87,11 @@ abstract class HiveComparisonTest extends FunSuite with BeforeAndAfterAll with G hiveFailedDirectory.mkdir() // Not atomic! /** All directories that contain per-query output files */ - val outputDirectories = Seq(passedDirectory, failedDirectory, wrongDirectory, hiveFailedDirectory) + val outputDirectories = Seq( + passedDirectory, + failedDirectory, + wrongDirectory, + hiveFailedDirectory) protected val cacheDigest = java.security.MessageDigest.getInstance("MD5") protected def getMd5(str: String): String = { @@ -95,12 +100,17 @@ abstract class HiveComparisonTest extends FunSuite with BeforeAndAfterAll with G new java.math.BigInteger(1, digest.digest).toString(16) } - protected def prepareAnswer(sharkQuery: TestShark.type#SharkSqlQuery, answer: Seq[String]): Seq[String] = { + protected def prepareAnswer( + sharkQuery: TestShark.type#SharkSqlQuery, + answer: Seq[String]): Seq[String] = { val orderedAnswer = sharkQuery.parsed match { // Clean out non-deterministic time schema info. 
case _: Command => answer.filterNot(nonDeterministicLine).filterNot(_ == "") case _ => - val isOrdered = sharkQuery.executedPlan.collect { case s: Sort => s}.nonEmpty + // TODO: Really we only care about the final total ordering here... + val isOrdered = sharkQuery.executedPlan.collect { + case s @ Sort(_, global, _) if global => s + }.nonEmpty // If the query results aren't sorted, then sort them to ensure deterministic answers. if (!isOrdered) answer.sorted else answer } diff --git a/src/test/scala/catalyst/execution/HiveCompatibility.scala b/src/test/scala/catalyst/execution/HiveCompatibility.scala index 4d4a77269813b..ce92e788db825 100644 --- a/src/test/scala/catalyst/execution/HiveCompatibility.scala +++ b/src/test/scala/catalyst/execution/HiveCompatibility.scala @@ -293,7 +293,6 @@ class HiveCompatibility extends HiveQueryFileTest { "input40", "input41", "input4_cb_delim", - "input4_limit", "input6", "input7", "input8",