Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cascading3] Adds a checkpoint when hashjoin is merged with one of its sides #1557

Merged
merged 4 commits into from
May 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 79 additions & 9 deletions scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object RichPipe extends java.io.Serializable {

/**
 * Builds a pipe name of the form "_pipe_N", where N is the value returned by
 * calling incrementAndGet on nextPipe.
 */
def getNextName: String = "_pipe_" + nextPipe.incrementAndGet.toString

def assignName(p: Pipe) = new Pipe(getNextName, p)
/** Wraps `p` in a new Pipe whose name is taken from getNextName. */
def assignName(p: Pipe): Pipe = new Pipe(getNextName, p)

private val REDUCER_KEY = "mapred.reduce.tasks"
/**
Expand Down Expand Up @@ -95,6 +95,68 @@ object RichPipe extends java.io.Serializable {
p
}

/**
 * A pipe counts as a source pipe when all three hold: it has no parent,
 * it has no previous pipes (null or empty array), and it is not a Splice.
 */
def isSourcePipe(pipe: Pipe): Boolean = {
  // defs (not vals) keep the original left-to-right short-circuit evaluation
  def hasNoParent: Boolean = pipe.getParent == null
  def hasNoPrevious: Boolean = Option(pipe.getPrevious).forall(_.isEmpty)
  def isSplice: Boolean = pipe match {
    case _: Splice => true
    case _ => false
  }

  hasNoParent && hasNoPrevious && !isSplice
}

/**
 * Returns the single previous pipe of `p`, or None when `p` has no previous
 * array or the array does not contain exactly one element.
 */
def getPreviousPipe(p: Pipe): Option[Pipe] =
  Option(p.getPrevious).collect { case Array(only) => only }

/**
 * If hashJoinPipe represents a hashjoin, this method checks if
 * hashJoinOperandPipe is one of the two sides in that hashjoin.
 *
 * How hashJoinPipe is inspected:
 *  - HashJoin: true when the chains walked upstream from its two sides share
 *    at least one pipe with the chain walked upstream from hashJoinOperandPipe
 *  - Merge: true when any of the merged pipes satisfies this check
 *  - Each: true when its single previous pipe satisfies this check
 *  - anything else: false
 *
 * @param hashJoinPipe the pipe that may be (or wrap) a HashJoin
 * @param hashJoinOperandPipe the pipe that may be one side of that HashJoin
 * @throws IllegalStateException if a HashJoin does not have exactly two previous pipes
 */
def isHashJoinedWithPipe(hashJoinPipe: Pipe, hashJoinOperandPipe: Pipe): Boolean = {
  // First previous pipe, if any; tolerates a null or empty previous array
  // instead of failing with NoSuchElementException on `.head`.
  def firstPrevious(p: Pipe): Option[Pipe] =
    Option(p.getPrevious).flatMap(_.headOption)

  // Collects all Eachs (and renamed pipes) ending with a non-Each.
  // Pipes are prepended (O(1)); ordering is irrelevant because every caller
  // turns the result into a Set.
  @annotation.tailrec
  def getChainOfEachs(p: Pipe, acc: List[Pipe] = Nil): List[Pipe] =
    p match {
      case source if isSourcePipe(source) =>
        source :: acc
      case each: Each =>
        firstPrevious(each) match {
          case Some(prev) => getChainOfEachs(prev, each :: acc)
          case None => each :: acc
        }
      // we don't use a special Pipe subtype for the assignName method
      // and we can't. all Pipe types need to be defined in cascading
      // because cascading assumes it knows all the Pipe subtypes
      // and fails to match any others (think of it as a sealed trait)
      // So we handle all special types before checking for the assignName case
      case other @ (_: Checkpoint | _: Operator | _: Splice | _: SubAssembly) =>
        other :: acc
      case renamedPipe: Pipe =>
        // this is the assignName case
        firstPrevious(renamedPipe) match {
          case Some(prev) => getChainOfEachs(prev, renamedPipe :: acc)
          case None => renamedPipe :: acc
        }
    }

  def getJoinedPipeSet(p: HashJoin): Set[Pipe] =
    p.getPrevious match {
      case sides @ Array(_, _) =>
        // collect nodes up the left and right sides
        sides.flatMap { side => getChainOfEachs(side) }.toSet
      case other =>
        // render the array contents; interpolating the array itself would
        // only print its JVM identity string
        val found = Option(other).fold("null")(_.mkString("[", ", ", "]"))
        throw new IllegalStateException(
          s"Expected exactly two sides in cascading's HashJoin pipe, found: $found")
    }

  hashJoinPipe match {
    case hj: HashJoin =>
      getJoinedPipeSet(hj).intersect(getChainOfEachs(hashJoinOperandPipe).toSet).nonEmpty
    case m: Merge =>
      m.getPrevious // gets all merged pipes
        .exists { p => isHashJoinedWithPipe(p, hashJoinOperandPipe) }
    case e: Each =>
      getPreviousPipe(e)
        .exists { p => isHashJoinedWithPipe(p, hashJoinOperandPipe) }
    case _ =>
      false
  }
}
}

/**
Expand All @@ -106,6 +168,7 @@ class RichPipe(val pipe: Pipe) extends java.io.Serializable with JoinAlgorithms
// We need this for the implicits
import Dsl._
import RichPipe.assignName
import RichPipe.isHashJoinedWithPipe

/**
* Rename the current pipe
Expand Down Expand Up @@ -210,15 +273,22 @@ class RichPipe(val pipe: Pipe) extends java.io.Serializable with JoinAlgorithms
/**
* Merge or Concatenate several pipes together with this one:
*/
def ++(that: Pipe): Pipe = {
if (this.pipe == that) {
// Cascading fails on self merge:
// solution by Jack Guo
new Merge(assignName(this.pipe), assignName(new Each(that, new Identity)))
} else {
new Merge(assignName(this.pipe), assignName(that))
def ++(that: Pipe): Pipe = {
  val self = this.pipe
  if (self == that) {
    // Cascading fails on self merge:
    // solution by Jack Guo
    new Merge(assignName(self), assignName(new Each(that, new Identity)))
  } else if (isHashJoinedWithPipe(self, that)) {
    // special handling for cases where one side of the hashjoin is merged
    // with the hashjoin result. Cascading no longer allows it,
    // so we add a checkpoint to the join result as a workaround
    new Merge(assignName(new Checkpoint(self)), assignName(that))
  } else if (isHashJoinedWithPipe(that, self)) {
    // same workaround, with the join result on the right-hand side
    new Merge(assignName(self), assignName(new Checkpoint(that)))
  } else {
    // plain merge: no self merge and no hashjoin relationship detected
    new Merge(assignName(self), assignName(that))
  }
}
}

/**
* Group all tuples down to one reducer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ trait HashJoinable[K, +V] extends CoGroupable[K, V] with KeyedPipe[K] {
case eachPipe: Each =>
if (canSkipEachOperation(eachPipe.getOperation, mode)) {
//need to recurse down to see if parent pipe is ok
getPreviousPipe(eachPipe).exists(prevPipe => isSafeToSkipForceToDisk(prevPipe, mode))
RichPipe.getPreviousPipe(eachPipe).exists(prevPipe => isSafeToSkipForceToDisk(prevPipe, mode))
} else false
case _: Checkpoint => true
case _: GroupBy => true
case _: CoGroup => true
case _: Every => true
case p if isSourcePipe(p) => true
case p if RichPipe.isSourcePipe(p) => true
case _ => false
}
}
Expand Down Expand Up @@ -129,19 +129,4 @@ trait HashJoinable[K, +V] extends CoGroupable[K, V] with KeyedPipe[K] {
case _ => false //default to false
}
}

private def getPreviousPipe(p: Pipe): Option[Pipe] = {
if (p.getPrevious != null && p.getPrevious.length == 1) p.getPrevious.headOption
else None
}

/**
* Return true if a pipe is a source Pipe (has no parents / previous) and isn't a
* Splice.
*/
private def isSourcePipe(pipe: Pipe): Boolean = {
pipe.getParent == null &&
(pipe.getPrevious == null || pipe.getPrevious.isEmpty) &&
(!pipe.isInstanceOf[Splice])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.io.{ OutputStream, InputStream, Serializable }
import java.util.Random

import cascading.flow.FlowDef
import cascading.pipe.{ Each, Pipe }
import cascading.pipe.{ Checkpoint, Each, Pipe, Merge }
import cascading.tap.Tap
import cascading.tuple.{ Fields, TupleEntry }
import com.twitter.algebird.{ Aggregator, Monoid, Semigroup }
Expand Down Expand Up @@ -1088,7 +1088,18 @@ final case class MergedTypedPipe[T](left: TypedPipe[T], right: TypedPipe[T]) ext
// there is no actual merging here, no need to rename:
merged.head
} else {
new cascading.pipe.Merge(merged.map(RichPipe.assignName): _*)
merged.reduce[Pipe] {
case (left, right) =>
// special handling for cases where one side of the hashjoin is merged
// with the hashjoin result. Cascading no longer allows it,
// so we add a checkpoint to the join result as a workaround
if (RichPipe.isHashJoinedWithPipe(left, right))
new Merge(RichPipe.assignName(new Checkpoint(left)), RichPipe.assignName(right))
else if (RichPipe.isHashJoinedWithPipe(right, left))
new Merge(RichPipe.assignName(left), RichPipe.assignName(new Checkpoint(right)))
else
new Merge(RichPipe.assignName(left), RichPipe.assignName(right))
}
}
}

Expand Down
Loading