Skip to content

Commit

Permalink
Merge pull request #1557 from twitter/rubanm/cascading3/hashjoin_merge
Browse files Browse the repository at this point in the history
[cascading3] Adds a checkpoint when hashjoin is merged with one of its sides
  • Loading branch information
rubanm committed May 9, 2016
2 parents a764128 + fb0e4de commit cb04163
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 47 deletions.
88 changes: 79 additions & 9 deletions scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object RichPipe extends java.io.Serializable {

def getNextName: String = "_pipe_" + nextPipe.incrementAndGet.toString

def assignName(p: Pipe) = new Pipe(getNextName, p)
def assignName(p: Pipe): Pipe = new Pipe(getNextName, p)

private val REDUCER_KEY = "mapred.reduce.tasks"
/**
Expand Down Expand Up @@ -95,6 +95,68 @@ object RichPipe extends java.io.Serializable {
p
}

/**
* Return true if a pipe is a source Pipe (has no parents / previous) and isn't a
* Splice.
*/
def isSourcePipe(pipe: Pipe): Boolean = {
pipe.getParent == null &&
(pipe.getPrevious == null || pipe.getPrevious.isEmpty) &&
(!pipe.isInstanceOf[Splice])
}

def getPreviousPipe(p: Pipe): Option[Pipe] = {
if (p.getPrevious != null && p.getPrevious.length == 1) p.getPrevious.headOption
else None
}

/*
* If hashJoinPipe represents a hashjoin, this method checks if
* hashJoinOperandPipe is one of the two sides in that hashjoin.
*/
def isHashJoinedWithPipe(hashJoinPipe: Pipe, hashJoinOperandPipe: Pipe): Boolean = {
// collects all Eachs ending with a non-Each
@annotation.tailrec
def getChainOfEachs(p: Pipe, collect: List[Pipe] = Nil): List[Pipe] =
p match {
case p if isSourcePipe(p) =>
collect :+ p
case each: Each =>
getChainOfEachs(each.getPrevious.head, collect :+ each)
// we don't use a special Pipe subtype for the assignName method
// and we can't. all Pipe types need to be defined in cascading
// because cascading assumes it knows all the Pipe subtypes
// and fails to match any others (think of it as a sealed trait)
// So we handle all special types before checking for the assignName case
case other @ (_: Checkpoint | _: Operator | _: Splice | _: SubAssembly) =>
collect :+ other
case renamedPipe: Pipe =>
// this is the assignName case
getChainOfEachs(renamedPipe.getPrevious.head, collect :+ renamedPipe)
}

def getJoinedPipeSet(p: HashJoin): Set[Pipe] =
p.getPrevious match {
case a @ Array(_, _) =>
// collect nodes up the left and right sides
a.flatMap { p => getChainOfEachs(p) }.toSet
case other =>
throw new IllegalStateException(s"More than two sides found in cascading's HashJoin pipe: $other")
}

hashJoinPipe match {
case hj: HashJoin =>
getJoinedPipeSet(hj).intersect(getChainOfEachs(hashJoinOperandPipe).toSet).nonEmpty
case m: Merge =>
m.getPrevious // gets all merged pipes
.exists { p => isHashJoinedWithPipe(p, hashJoinOperandPipe) }
case e: Each =>
getPreviousPipe(e)
.exists { p => isHashJoinedWithPipe(p, hashJoinOperandPipe) }
case other =>
false
}
}
}

/**
Expand All @@ -106,6 +168,7 @@ class RichPipe(val pipe: Pipe) extends java.io.Serializable with JoinAlgorithms
// We need this for the implicits
import Dsl._
import RichPipe.assignName
import RichPipe.isHashJoinedWithPipe

/**
* Rename the current pipe
Expand Down Expand Up @@ -210,15 +273,22 @@ class RichPipe(val pipe: Pipe) extends java.io.Serializable with JoinAlgorithms
/**
* Merge or Concatenate several pipes together with this one:
*/
def ++(that: Pipe): Pipe = {
if (this.pipe == that) {
// Cascading fails on self merge:
// solution by Jack Guo
new Merge(assignName(this.pipe), assignName(new Each(that, new Identity)))
} else {
new Merge(assignName(this.pipe), assignName(that))
def ++(that: Pipe): Pipe =
(this.pipe, that) match {
case (a, b) if a == b =>
// Cascading fails on self merge:
// solution by Jack Guo
new Merge(assignName(a), assignName(new Each(b, new Identity)))
// special handling for cases where one side of the hashjoin is merged
// with the hashjoin result. Cascading no longer allows it,
// so we add a checkpoint to the join result as a workaround
case (a, b) if isHashJoinedWithPipe(a, b) =>
new Merge(assignName(new Checkpoint(a)), assignName(b))
case (a, b) if isHashJoinedWithPipe(b, a) =>
new Merge(assignName(a), assignName(new Checkpoint(b)))
case (a, b) =>
new Merge(assignName(a), assignName(b))
}
}

/**
* Group all tuples down to one reducer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ trait HashJoinable[K, +V] extends CoGroupable[K, V] with KeyedPipe[K] {
case eachPipe: Each =>
if (canSkipEachOperation(eachPipe.getOperation, mode)) {
//need to recurse down to see if parent pipe is ok
getPreviousPipe(eachPipe).exists(prevPipe => isSafeToSkipForceToDisk(prevPipe, mode))
RichPipe.getPreviousPipe(eachPipe).exists(prevPipe => isSafeToSkipForceToDisk(prevPipe, mode))
} else false
case _: Checkpoint => true
case _: GroupBy => true
case _: CoGroup => true
case _: Every => true
case p if isSourcePipe(p) => true
case p if RichPipe.isSourcePipe(p) => true
case _ => false
}
}
Expand Down Expand Up @@ -129,19 +129,4 @@ trait HashJoinable[K, +V] extends CoGroupable[K, V] with KeyedPipe[K] {
case _ => false //default to false
}
}

private def getPreviousPipe(p: Pipe): Option[Pipe] = {
if (p.getPrevious != null && p.getPrevious.length == 1) p.getPrevious.headOption
else None
}

/**
* Return true if a pipe is a source Pipe (has no parents / previous) and isn't a
* Splice.
*/
private def isSourcePipe(pipe: Pipe): Boolean = {
pipe.getParent == null &&
(pipe.getPrevious == null || pipe.getPrevious.isEmpty) &&
(!pipe.isInstanceOf[Splice])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.io.{ OutputStream, InputStream, Serializable }
import java.util.Random

import cascading.flow.FlowDef
import cascading.pipe.{ Each, Pipe }
import cascading.pipe.{ Checkpoint, Each, Pipe, Merge }
import cascading.tap.Tap
import cascading.tuple.{ Fields, TupleEntry }
import com.twitter.algebird.{ Aggregator, Monoid, Semigroup }
Expand Down Expand Up @@ -1088,7 +1088,18 @@ final case class MergedTypedPipe[T](left: TypedPipe[T], right: TypedPipe[T]) ext
// there is no actual merging here, no need to rename:
merged.head
} else {
new cascading.pipe.Merge(merged.map(RichPipe.assignName): _*)
merged.reduce[Pipe] {
case (left, right) =>
// special handling for cases where one side of the hashjoin is merged
// with the hashjoin result. Cascading no longer allows it,
// so we add a checkpoint to the join result as a workaround
if (RichPipe.isHashJoinedWithPipe(left, right))
new Merge(RichPipe.assignName(new Checkpoint(left)), RichPipe.assignName(right))
else if (RichPipe.isHashJoinedWithPipe(right, left))
new Merge(RichPipe.assignName(left), RichPipe.assignName(new Checkpoint(right)))
else
new Merge(RichPipe.assignName(left), RichPipe.assignName(right))
}
}
}

Expand Down
Loading

0 comments on commit cb04163

Please sign in to comment.