Fix the WritePartitioner to exactly match cascading (#1805)
* Fix the WritePartitioner to exactly match cascading

We were previously failing the laws, sometimes taking
more than n + 1 steps where cascading took n. By
improving the logic, we fix those bugs and now
exactly match cascading.

This should allow us to use batching to bypass any
case where cascading takes too long to plan. (A
sketch of the pipe shape involved appears below.)

* re-enable tests

* improve comments to address review feedback
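
For context, the shape of plan this fix targets, as a hedged sketch (the pipeline and sink below are hypothetical, written against the public TypedPipe API; they are not part of this commit): a reduce whose output flows only through maps into a write is a single cascading step, so the partitioner must not force the reduce output to disk.

import com.twitter.scalding.typed.TypedPipe

// Hypothetical pipeline: a reduce followed only by mapping and a write.
// Cascading plans this as one reduce step; with the BelowState tracking
// added below, the partitioner no longer materializes the sum's output.
val wordCounts =
  TypedPipe.from(List("a", "b", "a"))
    .map((_, 1L))
    .sumByKey // ends in a reduce: isLogicalReduce is true above this point
    .map { case (word, n) => s"$word\t$n" }
// wordCounts.write(sink) // `sink` is a stand-in; at the root the state is BelowState.Write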
johnynek committed Feb 17, 2018
1 parent 2846e9a commit 4bbeada
Showing 3 changed files with 257 additions and 67 deletions.
@@ -615,13 +615,31 @@ object OptimizationRules {
   object RemoveDuplicateForceFork extends PartialRule[TypedPipe] {
     def applyWhere[T](on: Dag[TypedPipe]) = {
       case ForceToDisk(ForceToDisk(t)) => ForceToDisk(t)
+      case ForceToDisk(WithDescriptionTypedPipe(ForceToDisk(t), desc)) =>
+        // we might as well only do one force to disk in this case
+        WithDescriptionTypedPipe(ForceToDisk(t), desc)
       case ForceToDisk(Fork(t)) => ForceToDisk(t)
       case Fork(Fork(t)) => Fork(t)
       case Fork(ForceToDisk(t)) => ForceToDisk(t)
       case Fork(t) if on.contains(ForceToDisk(t)) => ForceToDisk(t)
     }
   }

+  /**
+   * If a fork has no fan-out when planned, it serves no purpose
+   * and is safe to remove. Likewise, there is no reason
+   * to put a forceToDisk immediately after a source
+   */
+  object RemoveUselessFork extends PartialRule[TypedPipe] {
+    def applyWhere[T](on: Dag[TypedPipe]) = {
+      case fork@Fork(t) if on.hasSingleDependent(fork) => t
+      case Fork(src@SourcePipe(_)) => src
+      case Fork(iter@IterablePipe(_)) => iter
+      case ForceToDisk(src@SourcePipe(_)) => src
+      case ForceToDisk(iter@IterablePipe(_)) => iter
+    }
+  }
+
   /**
    * We ignore .group if there is no setting of reducers
    *
@@ -915,6 +933,7 @@ object OptimizationRules {
     List(
       // phase 0, add explicit forks to not duplicate pipes on fanout below
       AddExplicitForks,
+      RemoveUselessFork,
       // phase 1, compose flatMap/map, move descriptions down, defer merge, filter pushup etc...
       IgnoreNoOpGroup.orElse(composeSame).orElse(DescribeLater).orElse(FilterKeysEarly).orElse(DeferMerge),
       // phase 2, combine different kinds of mapping operations into flatMaps, including redundant merges
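
The rewrite cases of RemoveUselessFork above can be exercised in isolation. A minimal self-contained sketch, assuming a toy Pipe ADT that stands in for the scalding nodes (none of these names are the real types):

// Toy model of the RemoveUselessFork cases (illustrative stand-ins only).
sealed trait Pipe
case class Source(name: String) extends Pipe
case class Fork(of: Pipe) extends Pipe
case class ForceToDisk(of: Pipe) extends Pipe

// In the real rule, fan-out is a query on the surrounding Dag;
// here that fact is passed in explicitly.
def removeUselessFork(p: Pipe, hasSingleDependent: Pipe => Boolean): Pipe =
  p match {
    case fork @ Fork(t) if hasSingleDependent(fork) => t // no fan-out: the fork is useless
    case Fork(src: Source) => src                        // forking a source duplicates nothing
    case ForceToDisk(src: Source) => src                 // a source is already persisted
    case other => other
  }

assert(removeUselessFork(ForceToDisk(Source("in")), _ => false) == Source("in"))
assert(removeUselessFork(Fork(Source("x")), _ => false) == Source("x"))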
@@ -74,7 +74,7 @@ object WritePartitioner {
   }
   // Now apply the rules:
   logger.info(s"applying rules to graph of size: ${finalDag.allNodes.size}")
-  val optDag = finalDag.applySeq(phases)
+  val optDag = finalDag.applySeq(phases :+ OptimizationRules.RemoveUselessFork)
   logger.info(s"optimized graph hash size: ${optDag.allNodes.size}")

   import TypedPipe.{ReduceStepPipe, HashCoGroup}
@@ -215,80 +215,171 @@ object WritePartitioner {
       }
     }

-    // Now we convert
-    val fn = Memoize.functionK[TypedPipe, mat.TP](
-      new Memoize.RecursiveK[TypedPipe, mat.TP] {
+    /**
+     * If cascading would consider the current pipe as a logical reduce,
+     * we can avoid some forces below. This method returns true
+     * if the pipe is ending on a reduce (not potentially a map-only job)
+     */
+    @annotation.tailrec
+    def isLogicalReduce(tp: TypedPipe[Any]): Boolean = {
+      import TypedPipe._
+      tp match {
+        case EmptyTypedPipe | IterablePipe(_) | SourcePipe(_) => false
+        case CounterPipe(a) => isLogicalReduce(a)
+        case cp@CrossPipe(_, _) => isLogicalReduce(cp.viaHashJoin)
+        case cp@CrossValue(_, _) => isLogicalReduce(cp.viaHashJoin)
+        case DebugPipe(p) => isLogicalReduce(p)
+        case FilterKeys(p, _) => isLogicalReduce(p)
+        case Filter(p, _) => isLogicalReduce(p)
+        case FlatMapValues(p, _) => isLogicalReduce(p)
+        case FlatMapped(p, _) => isLogicalReduce(p)
+        case ForceToDisk(_) => false // no reducers for sure, could be a map-only job
+        case Fork(_) => false // TODO, not super clear
+        case HashCoGroup(left, _, _) => isLogicalReduce(left)
+        case MapValues(p, _) => isLogicalReduce(p)
+        case Mapped(p, _) => isLogicalReduce(p)
+        case MergedTypedPipe(_, _) => false
+        case ReduceStepPipe(_) => true
+        case SumByLocalKeys(p, _) => isLogicalReduce(p)
+        case TrappedPipe(p, _, _) => isLogicalReduce(p)
+        case CoGroupedPipe(_) => true
+        case WithOnComplete(p, _) => isLogicalReduce(p)
+        case WithDescriptionTypedPipe(p, _) => isLogicalReduce(p)
+      }
+    }
+
+    /**
+     * We use this state to track where we are as we recurse up the graph.
+     * Since we know at the very end we will write, we can avoid, for instance,
+     * forcing a reduce operation that is followed only by a map and a write.
+     *
+     * Coupled with the isLogicalReduce above, we can emulate the behavior
+     * of the cascading planner as we recurse up.
+     */
+    sealed abstract class BelowState {
+      def |(that: BelowState): BelowState =
+        (this, that) match {
+          case (BelowState.Write, later) => later
+          case (BelowState.OnlyMapping, BelowState.Write) => BelowState.OnlyMapping
+          case (BelowState.OnlyMapping, mapOrMater) => mapOrMater
+          case (BelowState.Materialized, _) => BelowState.Materialized
+        }
+    }
+    object BelowState {
+      case object Write extends BelowState
+      case object OnlyMapping extends BelowState
+      case object Materialized extends BelowState
+    }
+    type P[a] = (TypedPipe[a], BelowState)
+
+    /**
+     * Given a pipe, and the state below it, return the materialized
+     * version of that pipe. This should cause no more materializations
+     * than cascading would do, and indeed we test for this property.
+     */
+    val fn = Memoize.functionK[P, mat.TP](
+      new Memoize.RecursiveK[P, mat.TP] {
+        import TypedPipe._
+        import BelowState._

         def toFunction[A] = {
-          case (cp: CounterPipe[a], rec) =>
-            mat.map(rec(cp.pipe))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
-          case (c: CrossPipe[a, b], rec) =>
-            rec(c.viaHashJoin)
-          case (cv@CrossValue(_, _), rec) =>
-            rec(cv.viaHashJoin)
-          case (p: DebugPipe[a], rec) =>
-            mat.map(rec(p.input))(DebugPipe(_: TypedPipe[a]))
-          case (p: FilterKeys[a, b], rec) =>
-            mat.map(rec(p.input))(FilterKeys(_: TypedPipe[(a, b)], p.fn))
-          case (p: Filter[a], rec) =>
-            mat.map(rec(p.input))(Filter(_: TypedPipe[a], p.fn))
-          case (Fork(src@IterablePipe(_)), rec) =>
-            // no need to put a checkpoint here:
-            rec(src)
-          case (Fork(src@SourcePipe(_)), rec) =>
-            // no need to put a checkpoint here:
-            rec(src)
-          case (p: Fork[a], rec) =>
-            mat.materialize(rec(p.input))
-          case (p: FlatMapValues[a, b, c], rec) =>
-            mat.map(rec(p.input))(FlatMapValues(_: TypedPipe[(a, b)], p.fn))
-          case (p: FlatMapped[a, b], rec) =>
-            mat.map(rec(p.input))(FlatMapped(_: TypedPipe[a], p.fn))
-          case (ForceToDisk(src@IterablePipe(_)), rec) =>
-            // no need to put a checkpoint here:
-            rec(src)
-          case (ForceToDisk(src@SourcePipe(_)), rec) =>
-            // no need to put a checkpoint here:
-            rec(src)
-          case (p: ForceToDisk[a], rec) =>
-            mat.materialize(rec(p.input))
-          case (it@IterablePipe(_), _) =>
+          case ((cp: CounterPipe[a], bs), rec) =>
+            mat.map(rec((cp.pipe, bs)))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
+          case ((c: CrossPipe[a, b], bs), rec) =>
+            rec((c.viaHashJoin, bs))
+          case ((cv@CrossValue(_, _), bs), rec) =>
+            rec((cv.viaHashJoin, bs))
+          case ((p: DebugPipe[a], bs), rec) =>
+            mat.map(rec((p.input, bs)))(DebugPipe(_: TypedPipe[a]))
+          case ((p: FilterKeys[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(FilterKeys(_: TypedPipe[(a, b)], p.fn))
+          case ((p: Filter[a], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(Filter(_: TypedPipe[a], p.fn))
+          case ((Fork(of), bs), rec) =>
+            // Treat forks as forceToDisk after
+            // optimizations (which should have removed unneeded forks)
+            rec((ForceToDisk(of), bs))
+          case ((p: FlatMapValues[a, b, c], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(FlatMapValues(_: TypedPipe[(a, b)], p.fn))
+          case ((p: FlatMapped[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(FlatMapped(_: TypedPipe[a], p.fn))
+          case ((ForceToDisk(src@IterablePipe(_)), bs), rec) =>
+            // no need to put a checkpoint here:
+            rec((src, bs))
+          case ((ForceToDisk(src@SourcePipe(_)), bs), rec) =>
+            // no need to put a checkpoint here:
+            rec((src, bs))
+          case ((p: ForceToDisk[a], bs), rec) =>
+            val newBs =
+              if (isLogicalReduce(p.input)) OnlyMapping
+              else Materialized
+            val matP = rec((p.input, newBs))
+            bs match {
+              case Write =>
+                // there is no need to force to disk immediately before a write
+                matP
+              case _ => mat.materialize(matP)
+            }
+          case ((it@IterablePipe(_), _), _) =>
             mat.pure(it)
-          case (p: MapValues[a, b, c], rec) =>
-            mat.map(rec(p.input))(MapValues(_: TypedPipe[(a, b)], p.fn))
-          case (p: Mapped[a, b], rec) =>
-            mat.map(rec(p.input))(Mapped(_: TypedPipe[a], p.fn))
-          case (p: MergedTypedPipe[a], rec) =>
-            val mleft = rec(p.left)
-            val mright = rec(p.right)
+          case ((p: MapValues[a, b, c], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(MapValues(_: TypedPipe[(a, b)], p.fn))
+          case ((p: Mapped[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(Mapped(_: TypedPipe[a], p.fn))
+          case ((p: MergedTypedPipe[a], bs), rec) =>
+            val mleft = rec((p.left, bs))
+            val mright = rec((p.right, bs))
             val both = mat.zip(mleft, mright)
             mat.map(both) { case (l, r) => MergedTypedPipe(l, r) }
-          case (src@SourcePipe(_), _) =>
+          case ((src@SourcePipe(_), _), _) =>
             mat.pure(src)
-          case (p: SumByLocalKeys[a, b], rec) =>
-            mat.map(rec(p.input))(SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))
-          case (p: TrappedPipe[a], rec) =>
-            mat.map(rec(p.input))(TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv))
-          case (p: WithDescriptionTypedPipe[a], rec) =>
-            mat.map(rec(p.input))(WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
-          case (p: WithOnComplete[a], rec) =>
-            mat.map(rec(p.input))(WithOnComplete(_: TypedPipe[a], p.fn))
-          case (EmptyTypedPipe, _) =>
+          case ((p: SumByLocalKeys[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))
+          case ((p: TrappedPipe[a], bs), rec) =>
+            // TODO: it is a bit unclear if a trap is allowed on the back of a reduce?
+            mat.map(rec((p.input, bs)))(TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv))
+          case ((p: WithDescriptionTypedPipe[a], bs), rec) =>
+            mat.map(rec((p.input, bs)))(WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
+          case ((p: WithOnComplete[a], bs), rec) =>
+            mat.map(rec((p.input, bs)))(WithOnComplete(_: TypedPipe[a], p.fn))
+          case ((EmptyTypedPipe, _), _) =>
             mat.pure(EmptyTypedPipe)
-          case (hg: HashCoGroup[a, b, c, d], rec) =>
-            handleHashCoGroup(hg, rec)
-          case (CoGroupedPipe(cg), f) =>
-            // simple version puts a checkpoint here
-            mat.materialize(handleCoGrouped(cg, f))
-          case (ReduceStepPipe(rs), f) =>
-            // simple version puts a checkpoint here
-            mat.materialize(handleReduceStep(rs, f))
+          case ((hg: HashCoGroup[a, b, c, d], bs), rec) =>
+            val withBs = new FunctionK[TypedPipe, P] {
+              def toFunction[A] = { tp => (tp, bs | OnlyMapping) }
+            }
+            // TODO: hashJoins may not be allowed in a reduce step in cascading,
+            // not clear
+            val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
+            handleHashCoGroup(hg, recHG)
+          case ((CoGroupedPipe(cg), bs), rec) =>
+            val withBs = new FunctionK[TypedPipe, P] {
+              def toFunction[A] = { tp => (tp, bs | Materialized) }
+            }
+            // TODO: hashJoins may not be allowed in a reduce step in cascading,
+            // not clear
+            val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
+            val hcg = handleCoGrouped(cg, recHG)
+            bs match {
+              case BelowState.Materialized => mat.materialize(hcg)
+              case _ => hcg
+            }
+          case ((ReduceStepPipe(rs), bs), rec) =>
+            val withBs = new FunctionK[TypedPipe, P] {
+              def toFunction[A] = { tp => (tp, bs | BelowState.Materialized) }
+            }
+            // TODO: hashJoins may not be allowed in a reduce step in cascading,
+            // not clear
+            val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
+            val hrs = handleReduceStep(rs, recHG)
+            bs match {
+              case BelowState.Materialized => mat.materialize(hrs)
+              case _ => hrs
+            }
         }
       })

     def write[A](p: PairK[Id, S, A]): (M[TypedPipe[A]], S[A]) = {
-      val materialized: M[TypedPipe[A]] = fn(optDag.evaluate(p._1))
+      val materialized: M[TypedPipe[A]] = fn((optDag.evaluate(p._1), BelowState.Write))
       (materialized, p._2)
     }

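A closing note on BelowState: its | is a join in a three-point lattice, keeping the most demanding state seen while walking up from the write. A standalone sketch of just that behavior (the states are copied here for illustration, not imported from scalding):

// BelowState's | is "max" under the order Write < OnlyMapping < Materialized.
sealed trait BelowState {
  def |(that: BelowState): BelowState = (this, that) match {
    case (Write, later)       => later
    case (OnlyMapping, Write) => OnlyMapping
    case (OnlyMapping, other) => other
    case (Materialized, _)    => Materialized
  }
}
case object Write extends BelowState
case object OnlyMapping extends BelowState
case object Materialized extends BelowState

assert((Write | OnlyMapping) == OnlyMapping)         // a map below makes this node map-only
assert((OnlyMapping | Materialized) == Materialized) // any materialization below wins
assert((Materialized | Write) == Materialized)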
