Skip to content

Commit

Permalink
[SQL]Extract the joinkeys from join condition
Browse files Browse the repository at this point in the history
Extract the join keys from the equality predicates of a join condition, so that the join can be evaluated as an equi-join.

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#1190 from chenghao-intel/extract_join_keys and squashes the following commits:

4a1060a [Cheng Hao] Fix some of the small issues
ceb4924 [Cheng Hao] Remove the redundant pattern of join keys extraction
cec34e8 [Cheng Hao] Update the code style issues
dcc4584 [Cheng Hao] Extract the joinkeys from join condition
  • Loading branch information
chenghao-intel authored and conviva-zz committed Sep 4, 2014
1 parent 664a4f7 commit ddf5098
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,11 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] {
* Check https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior for more details
*/
object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
// split the condition expression into 3 parts,
// (canEvaluateInLeftSide, canEvaluateInRightSide, haveToEvaluateWithBothSide)
/**
* Splits join condition expressions into three categories based on the attributes required
* to evaluate them.
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
val (leftEvaluateCondition, rest) =
condition.partition(_.references subsetOf left.outputSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,57 +105,39 @@ object PhysicalOperation extends PredicateHelper {
}

/**
* A pattern that finds joins with equality conditions that can be evaluated using hashing
* techniques. For inner joins, any filters on top of the join operator are also matched.
* A pattern that finds joins with equality conditions that can be evaluated using equi-join.
*/
object HashFilteredJoin extends Logging with PredicateHelper {
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
// All predicates can be evaluated for inner join (i.e., those that are in the ON
// clause and WHERE clause.)
case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
// All predicates can be evaluated for left semi join (those that are in the WHERE
// clause can only come from the left table, so they can all be pushed down.)
case FilteredOperation(predicates, join @ Join(left, right, LeftSemi, condition)) =>
logger.debug(s"Considering hash left semi join on: ${predicates ++ condition}")
splitPredicates(predicates ++ condition, join)
case join @ Join(left, right, joinType, condition) =>
logger.debug(s"Considering hash join on: $condition")
splitPredicates(condition.toSeq, join)
case _ => None
}

// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
val Join(left, right, joinType, _) = join
val (joinPredicates, otherPredicates) =
allPredicates.flatMap(splitConjunctivePredicates).partition {
case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
logger.debug(s"Considering join on: $condition")
// Find equi-join predicates that can be evaluated before the join, and thus can be used
// as join keys.
val (joinPredicates, otherPredicates) =
condition.map(splitConjunctivePredicates).getOrElse(Nil).partition {
case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
(canEvaluate(l, right) && canEvaluate(r, left)) => true
case _ => false
}

val joinKeys = joinPredicates.map {
case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}

val joinKeys = joinPredicates.map {
case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}

// Do not consider this strategy if there are no join keys.
if (joinKeys.nonEmpty) {
val leftKeys = joinKeys.map(_._1)
val rightKeys = joinKeys.map(_._2)

Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
} else {
logger.debug(s"Avoiding hash join with no join keys.")
None
}
if (joinKeys.nonEmpty) {
logger.debug(s"leftKeys:${leftKeys} | rightKeys:${rightKeys}")
Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
} else {
None
}
case _ => None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Find left semi joins where at least some predicates can be evaluated by matching hash
// keys using the HashFilteredJoin pattern.
case HashFilteredJoin(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
// Find left semi joins where at least some predicates can be evaluated by matching join keys
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>
val semiJoin = execution.LeftSemiJoinHash(
leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
Expand All @@ -46,7 +45,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

/**
* Uses the HashFilteredJoin pattern to find joins where at least some of the predicates can be
* Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
* evaluated by matching hash keys.
*/
object HashJoin extends Strategy with PredicateHelper {
Expand All @@ -65,7 +64,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case HashFilteredJoin(
case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
Expand All @@ -75,7 +74,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

case HashFilteredJoin(
case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
Expand All @@ -85,7 +84,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if broadcastTables.contains(b.tableName) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.ShuffledHashJoin(
leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
Expand Down

0 comments on commit ddf5098

Please sign in to comment.