diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala
index d799987a78..5f467d71e2 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/OptimizationRules.scala
@@ -615,6 +615,9 @@ object OptimizationRules {
   object RemoveDuplicateForceFork extends PartialRule[TypedPipe] {
     def applyWhere[T](on: Dag[TypedPipe]) = {
       case ForceToDisk(ForceToDisk(t)) => ForceToDisk(t)
+      case ForceToDisk(WithDescriptionTypedPipe(ForceToDisk(t), desc)) =>
+        // we might as well only do one force to disk in this case
+        WithDescriptionTypedPipe(ForceToDisk(t), desc)
       case ForceToDisk(Fork(t)) => ForceToDisk(t)
       case Fork(Fork(t)) => Fork(t)
       case Fork(ForceToDisk(t)) => ForceToDisk(t)
@@ -622,6 +625,21 @@ object OptimizationRules {
     }
   }
 
+  /**
+   * If a fork has no fan-out when planned, it serves no purpose
+   * and is safe to remove. Likewise, there is no reason
+   * to put a forceToDisk immediately after a source.
+   */
+  object RemoveUselessFork extends PartialRule[TypedPipe] {
+    def applyWhere[T](on: Dag[TypedPipe]) = {
+      case fork@Fork(t) if on.hasSingleDependent(fork) => t
+      case Fork(src@SourcePipe(_)) => src
+      case Fork(iter@IterablePipe(_)) => iter
+      case ForceToDisk(src@SourcePipe(_)) => src
+      case ForceToDisk(iter@IterablePipe(_)) => iter
+    }
+  }
+
   /**
    * We ignore .group if there is no setting of reducers
    *
@@ -915,6 +933,7 @@ object OptimizationRules {
     List(
       // phase 0, add explicit forks to not duplicate pipes on fanout below
       AddExplicitForks,
+      RemoveUselessFork,
       // phase 1, compose flatMap/map, move descriptions down, defer merge, filter pushup etc...
      IgnoreNoOpGroup.orElse(composeSame).orElse(DescribeLater).orElse(FilterKeysEarly).orElse(DeferMerge),
       // phase 2, combine different kinds of mapping operations into flatMaps, including redundant merges
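To see the new rule in isolation: a minimal sketch, assuming dagon's `Dag` as used by the tests further below, of running only `RemoveUselessFork`. The source path `"input"` is invented for illustration.

```scala
import com.stripe.dagon.Dag // assumption: Dag comes from dagon in this scalding version
import com.twitter.scalding.typed.{ OptimizationRules, TypedPipe }
import com.twitter.scalding.source.TypedText

// A fork whose only consumer is the map below has no fan-out, so the rule
// should erase it (and a fork of a bare source is useless regardless).
val pipe: TypedPipe[Int] =
  TypedPipe.from(TypedText.tsv[Int]("input")).fork.map { i => i * 2 }

val (dag, id) = Dag(pipe, OptimizationRules.toLiteral)
val optimized = dag.applySeq(List(OptimizationRules.RemoveUselessFork)).evaluate(id)
// expected shape: SourcePipe -> Mapped, with no Fork node left
```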
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/WritePartitioner.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/WritePartitioner.scala
index 2c451ed9e4..14b202b1fd 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/WritePartitioner.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/WritePartitioner.scala
@@ -74,7 +74,7 @@ object WritePartitioner {
     }
     // Now apply the rules:
     logger.info(s"applying rules to graph of size: ${finalDag.allNodes.size}")
-    val optDag = finalDag.applySeq(phases)
+    val optDag = finalDag.applySeq(phases :+ OptimizationRules.RemoveUselessFork)
     logger.info(s"optimized graph hash size: ${optDag.allNodes.size}")
 
     import TypedPipe.{ReduceStepPipe, HashCoGroup}
@@ -215,80 +215,171 @@ object WritePartitioner {
       }
     }
 
-    // Now we convert
-    val fn = Memoize.functionK[TypedPipe, mat.TP](
-      new Memoize.RecursiveK[TypedPipe, mat.TP] {
+    /**
+     * If cascading would consider the current pipe as a logical reduce
+     * we can avoid some forces below. This method returns true
+     * if the pipe ends on a reduce (not potentially a map-only job).
+     */
+    @annotation.tailrec
+    def isLogicalReduce(tp: TypedPipe[Any]): Boolean = {
+      import TypedPipe._
+      tp match {
+        case EmptyTypedPipe | IterablePipe(_) | SourcePipe(_) => false
+        case CounterPipe(a) => isLogicalReduce(a)
+        case cp@CrossPipe(_, _) => isLogicalReduce(cp.viaHashJoin)
+        case cp@CrossValue(_, _) => isLogicalReduce(cp.viaHashJoin)
+        case DebugPipe(p) => isLogicalReduce(p)
+        case FilterKeys(p, _) => isLogicalReduce(p)
+        case Filter(p, _) => isLogicalReduce(p)
+        case FlatMapValues(p, _) => isLogicalReduce(p)
+        case FlatMapped(p, _) => isLogicalReduce(p)
+        case ForceToDisk(_) => false // no reducers for sure, could be a map-only job
+        case Fork(_) => false // TODO, not super clear
+        case HashCoGroup(left, _, _) => isLogicalReduce(left)
+        case MapValues(p, _) => isLogicalReduce(p)
+        case Mapped(p, _) => isLogicalReduce(p)
+        case MergedTypedPipe(_, _) => false
+        case ReduceStepPipe(_) => true
+        case SumByLocalKeys(p, _) => isLogicalReduce(p)
+        case TrappedPipe(p, _, _) => isLogicalReduce(p)
+        case CoGroupedPipe(_) => true
+        case WithOnComplete(p, _) => isLogicalReduce(p)
+        case WithDescriptionTypedPipe(p, _) => isLogicalReduce(p)
+      }
+    }
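For intuition, a hedged sketch (pipes invented here, not part of the patch) of what `isLogicalReduce` is meant to distinguish: a reduce followed only by mapping is still a logical reduce, while an explicit checkpoint is not.

```scala
import com.twitter.scalding.typed.TypedPipe

// sumByKey plans a ReduceStepPipe; a following map keeps us on the reduce
// side, so isLogicalReduce(stillReduce) should be true.
val stillReduce: TypedPipe[(Int, Long)] =
  TypedPipe.from(List((1, 1L), (1, 2L), (2, 3L))).sumByKey.toTypedPipe
    .map { case (k, v) => (k, v + 1L) }

// forceToDisk checkpoints the reduce output; whatever follows could be
// planned as a map-only job, so isLogicalReduce(checkpointed) should be false.
val checkpointed: TypedPipe[(Int, Long)] = stillReduce.forceToDisk
```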
+
+    /**
+     * We use this state to track where we are as we recurse up the graph.
+     * Since we know at the very end we will write, we can avoid, for instance,
+     * forcing a reduce operation that is followed only by a map and a write.
+     *
+     * Coupled with isLogicalReduce above, we can emulate the behavior
+     * of the cascading planner as we recurse up.
+     */
+    sealed abstract class BelowState {
+      def |(that: BelowState): BelowState =
+        (this, that) match {
+          case (BelowState.Write, later) => later
+          case (BelowState.OnlyMapping, BelowState.Write) => BelowState.OnlyMapping
+          case (BelowState.OnlyMapping, mapOrMater) => mapOrMater
+          case (BelowState.Materialized, _) => BelowState.Materialized
+        }
+    }
+    object BelowState {
+      case object Write extends BelowState
+      case object OnlyMapping extends BelowState
+      case object Materialized extends BelowState
+    }
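The `|` combinator is a join in the order `Write < OnlyMapping < Materialized`: combining keeps the most demanding state seen below. Spelled out, reading directly off the match above (assuming `BelowState` is in scope; it is local to this conversion):

```scala
import BelowState._

assert((Write | OnlyMapping) == OnlyMapping)         // a write below adds no constraint
assert((Write | Materialized) == Materialized)
assert((OnlyMapping | Write) == OnlyMapping)         // mapping absorbs a later write
assert((OnlyMapping | Materialized) == Materialized)
assert((Materialized | Write) == Materialized)       // materialization dominates everything
assert((Materialized | OnlyMapping) == Materialized)
```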
+    type P[a] = (TypedPipe[a], BelowState)
+    /**
+     * Given a pipe, and the state below it, return the materialized
+     * version of that pipe. This should cause no more materializations
+     * than cascading would do, and indeed we test for this property.
+     */
+    val fn = Memoize.functionK[P, mat.TP](
+      new Memoize.RecursiveK[P, mat.TP] {
         import TypedPipe._
+        import BelowState._

         def toFunction[A] = {
-          case (cp: CounterPipe[a], rec) =>
-            mat.map(rec(cp.pipe))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
-          case (c: CrossPipe[a, b], rec) =>
-            rec(c.viaHashJoin)
-          case (cv@CrossValue(_, _), rec) =>
-            rec(cv.viaHashJoin)
-          case (p: DebugPipe[a], rec) =>
-            mat.map(rec(p.input))(DebugPipe(_: TypedPipe[a]))
-          case (p: FilterKeys[a, b], rec) =>
-            mat.map(rec(p.input))(FilterKeys(_: TypedPipe[(a, b)], p.fn))
-          case (p: Filter[a], rec) =>
-            mat.map(rec(p.input))(Filter(_: TypedPipe[a], p.fn))
-          case (Fork(src@IterablePipe(_)), rec) =>
-            // no need to put a checkpoint here:
-            rec(src)
-          case (Fork(src@SourcePipe(_)), rec) =>
+          case ((cp: CounterPipe[a], bs), rec) =>
+            mat.map(rec((cp.pipe, bs)))(CounterPipe(_: TypedPipe[(a, Iterable[((String, String), Long)])]))
+          case ((c: CrossPipe[a, b], bs), rec) =>
+            rec((c.viaHashJoin, bs))
+          case ((cv@CrossValue(_, _), bs), rec) =>
+            rec((cv.viaHashJoin, bs))
+          case ((p: DebugPipe[a], bs), rec) =>
+            mat.map(rec((p.input, bs)))(DebugPipe(_: TypedPipe[a]))
+          case ((p: FilterKeys[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(FilterKeys(_: TypedPipe[(a, b)], p.fn))
+          case ((p: Filter[a], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(Filter(_: TypedPipe[a], p.fn))
+          case ((Fork(of), bs), rec) =>
+            // Treat forks as forceToDisk after
+            // optimizations (which should have removed unneeded forks)
+            rec((ForceToDisk(of), bs))
+          case ((p: FlatMapValues[a, b, c], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(FlatMapValues(_: TypedPipe[(a, b)], p.fn))
+          case ((p: FlatMapped[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(FlatMapped(_: TypedPipe[a], p.fn))
+          case ((ForceToDisk(src@IterablePipe(_)), bs), rec) =>
             // no need to put a checkpoint here:
-            rec(src)
-          case (p: Fork[a], rec) =>
-            mat.materialize(rec(p.input))
-          case (p: FlatMapValues[a, b, c], rec) =>
-            mat.map(rec(p.input))(FlatMapValues(_: TypedPipe[(a, b)], p.fn))
-          case (p: FlatMapped[a, b], rec) =>
-            mat.map(rec(p.input))(FlatMapped(_: TypedPipe[a], p.fn))
-          case (ForceToDisk(src@IterablePipe(_)), rec) =>
+            rec((src, bs))
+          case ((ForceToDisk(src@SourcePipe(_)), bs), rec) =>
             // no need to put a checkpoint here:
-            rec(src)
-          case (ForceToDisk(src@SourcePipe(_)), rec) =>
-            // no need to put a checkpoint here:
-            rec(src)
-          case (p: ForceToDisk[a], rec) =>
-            mat.materialize(rec(p.input))
-          case (it@IterablePipe(_), _) =>
+            rec((src, bs))
+          case ((p: ForceToDisk[a], bs), rec) =>
+            val newBs =
+              if (isLogicalReduce(p.input)) OnlyMapping
+              else Materialized
+            val matP = rec((p.input, newBs))
+            bs match {
+              case Write =>
+                // there is no need to force to disk immediately before a write
+                matP
+              case _ => mat.materialize(matP)
+            }
+          case ((it@IterablePipe(_), _), _) =>
             mat.pure(it)
-          case (p: MapValues[a, b, c], rec) =>
-            mat.map(rec(p.input))(MapValues(_: TypedPipe[(a, b)], p.fn))
-          case (p: Mapped[a, b], rec) =>
-            mat.map(rec(p.input))(Mapped(_: TypedPipe[a], p.fn))
-          case (p: MergedTypedPipe[a], rec) =>
-            val mleft = rec(p.left)
-            val mright = rec(p.right)
+          case ((p: MapValues[a, b, c], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(MapValues(_: TypedPipe[(a, b)], p.fn))
+          case ((p: Mapped[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(Mapped(_: TypedPipe[a], p.fn))
+          case ((p: MergedTypedPipe[a], bs), rec) =>
+            val mleft = rec((p.left, bs))
+            val mright = rec((p.right, bs))
             val both = mat.zip(mleft, mright)
             mat.map(both) { case (l, r) => MergedTypedPipe(l, r) }
-          case (src@SourcePipe(_), _) =>
+          case ((src@SourcePipe(_), _), _) =>
             mat.pure(src)
-          case (p: SumByLocalKeys[a, b], rec) =>
-            mat.map(rec(p.input))(SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))
-          case (p: TrappedPipe[a], rec) =>
-            mat.map(rec(p.input))(TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv))
-          case (p: WithDescriptionTypedPipe[a], rec) =>
-            mat.map(rec(p.input))(WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
-          case (p: WithOnComplete[a], rec) =>
-            mat.map(rec(p.input))(WithOnComplete(_: TypedPipe[a], p.fn))
-          case (EmptyTypedPipe, _) =>
+          case ((p: SumByLocalKeys[a, b], bs), rec) =>
+            mat.map(rec((p.input, bs | OnlyMapping)))(SumByLocalKeys(_: TypedPipe[(a, b)], p.semigroup))
+          case ((p: TrappedPipe[a], bs), rec) =>
+            // TODO: it is a bit unclear if a trap is allowed on the back of a reduce?
+            mat.map(rec((p.input, bs)))(TrappedPipe[a](_: TypedPipe[a], p.sink, p.conv))
+          case ((p: WithDescriptionTypedPipe[a], bs), rec) =>
+            mat.map(rec((p.input, bs)))(WithDescriptionTypedPipe(_: TypedPipe[a], p.descriptions))
+          case ((p: WithOnComplete[a], bs), rec) =>
+            mat.map(rec((p.input, bs)))(WithOnComplete(_: TypedPipe[a], p.fn))
+          case ((EmptyTypedPipe, _), _) =>
             mat.pure(EmptyTypedPipe)
-          case (hg: HashCoGroup[a, b, c, d], rec) =>
-            handleHashCoGroup(hg, rec)
-          case (CoGroupedPipe(cg), f) =>
-            // simple version puts a checkpoint here
-            mat.materialize(handleCoGrouped(cg, f))
-          case (ReduceStepPipe(rs), f) =>
-            // simple version puts a checkpoint here
-            mat.materialize(handleReduceStep(rs, f))
+          case ((hg: HashCoGroup[a, b, c, d], bs), rec) =>
+            val withBs = new FunctionK[TypedPipe, P] {
+              def toFunction[A] = { tp => (tp, bs | OnlyMapping) }
+            }
+            // TODO: hashJoins may not be allowed in a reduce step in cascading,
+            // not clear
+            val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
+            handleHashCoGroup(hg, recHG)
+          case ((CoGroupedPipe(cg), bs), rec) =>
+            val withBs = new FunctionK[TypedPipe, P] {
+              def toFunction[A] = { tp => (tp, bs | Materialized) }
+            }
+            // TODO: hashJoins may not be allowed in a reduce step in cascading,
+            // not clear
+            val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
+            val hcg = handleCoGrouped(cg, recHG)
+            bs match {
+              case BelowState.Materialized => mat.materialize(hcg)
+              case _ => hcg
+            }
+          case ((ReduceStepPipe(rs), bs), rec) =>
+            val withBs = new FunctionK[TypedPipe, P] {
+              def toFunction[A] = { tp => (tp, bs | BelowState.Materialized) }
+            }
+            // TODO: hashJoins may not be allowed in a reduce step in cascading,
+            // not clear
+            val recHG = FunctionK.andThen[TypedPipe, P, mat.TP](withBs, rec)
+            val hrs = handleReduceStep(rs, recHG)
+            bs match {
+              case BelowState.Materialized => mat.materialize(hrs)
+              case _ => hrs
+            }
         }
       })

     def write[A](p: PairK[Id, S, A]): (M[TypedPipe[A]], S[A]) = {
-      val materialized: M[TypedPipe[A]] = fn(optDag.evaluate(p._1))
+      val materialized: M[TypedPipe[A]] = fn((optDag.evaluate(p._1), BelowState.Write))
       (materialized, p._2)
     }
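The payoff of seeding each sink with `BelowState.Write`: a `forceToDisk` immediately followed by a write no longer costs an extra materialization. A hedged sketch of the kind of pipeline this helps, with invented source and sink paths, and assuming the write partitioner runs over this graph:

```scala
import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding.source.TypedText

val counts: TypedPipe[(String, Long)] =
  TypedPipe.from(TypedText.tsv[String]("words"))
    .map { w => (w, 1L) }
    .sumByKey
    .toTypedPipe

// Previously the ForceToDisk below was always converted via mat.materialize,
// an extra checkpoint right before the sink. With BelowState.Write flowing
// up from the write, the ForceToDisk case now passes the pipe through.
val ex = counts.forceToDisk.writeExecution(TypedText.tsv[(String, Long)]("counts"))
```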
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/WritePartitionerTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/WritePartitionerTest.scala
index bc2b4b2274..f07300024b 100644
--- a/scalding-core/src/test/scala/com/twitter/scalding/typed/WritePartitionerTest.scala
+++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/WritePartitionerTest.scala
@@ -1,6 +1,7 @@
 package com.twitter.scalding.typed

-import com.twitter.scalding.{ Config, Execution, Local }
+import com.twitter.algebird.Monoid
+import com.twitter.scalding.{ Config, Execution, Local, TupleConverter, TupleGetter }
 import com.twitter.scalding.source.{ TypedText, NullSink }
 import com.twitter.scalding.typed.cascading_backend.CascadingBackend
 import com.twitter.scalding.typed.functions.EqTypes
@@ -56,7 +57,7 @@ class WritePartitionerTest extends FunSuite with PropertyChecks {
     }
   }

-  test("When we break at forks we have at most 1 + hashJoin steps") {
+  test("When we break at forks we have at most 2 + hashJoin steps") {
     implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 100)

     def afterPartitioningEachStepIsSize1[T](init: TypedPipe[T]) = {
@@ -68,15 +69,15 @@ class WritePartitionerTest extends FunSuite with PropertyChecks {
         case (tp, _) =>
           val (dag, _) = Dag(tp, OptimizationRules.toLiteral)
           val hcg = dag.allNodes.collect { case h: TypedPipe.HashCoGroup[_, _, _, _] => 1 }.sum
-          // we can have at most 1 + hcg jobs
-          assert(TypedPipeGen.steps(tp) <= 1 + hcg, s"optimized: ${tp.toString}")
+          // we can have at most 2 + hcg jobs
+          assert(TypedPipeGen.steps(tp) <= 2 + hcg, s"optimized: ${tp.toString}")
       }
       writes.materializations.foreach {
         case (tp, _) =>
           val (dag, _) = Dag(tp, OptimizationRules.toLiteral)
           val hcg = dag.allNodes.collect { case h: TypedPipe.HashCoGroup[_, _, _, _] => 1 }.sum
-          // we can have at most 1 + hcg jobs
-          assert(TypedPipeGen.steps(tp) <= 1 + hcg, s"optimized: ${tp.toString}")
+          // we can have at most 2 + hcg jobs
+          assert(TypedPipeGen.steps(tp) <= 2 + hcg, s"optimized: ${tp.toString}")
       }
@@ -98,9 +99,88 @@ class WritePartitionerTest extends FunSuite with PropertyChecks {
       val (dag, id) = Dag(t, OptimizationRules.toLiteral)
       val optDag = dag.applySeq(phases)
       val optT = optDag.evaluate(id)
-      assert(writeSteps + matSteps <= TypedPipeGen.steps(optT) + 1)
+      assert(writeSteps + matSteps <= TypedPipeGen.steps(optT))
+    }
+
+    {
+      import TypedPipe._
+
+      val pipe = WithDescriptionTypedPipe(Mapped(ReduceStepPipe(ValueSortedReduce[Int, Int, Int](implicitly[Ordering[Int]],
+        WithDescriptionTypedPipe(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(MergedTypedPipe(
+          WithDescriptionTypedPipe(Fork(WithDescriptionTypedPipe(TrappedPipe(SourcePipe(TypedText.tsv[Int]("oyg")),
+            TypedText.tsv[Int]("a3QasphTfqhd1namjb"),
+            TupleConverter.Single(implicitly[TupleGetter[Int]])), List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))),
+            List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+          IterablePipe(List(-930762680, -1495455462, -1, -903011942, -2147483648, 1539778843, -2147483648))),
+          List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))), null /**/ ),
+          List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+          List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+        implicitly[Ordering[Int]], null /**/ , Some(2), List())),
+        null /**/ ), List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))
+
+      notMoreSteps(pipe)
     }
+    {
+      import TypedPipe._
+
+      val pipe = WithDescriptionTypedPipe(ForceToDisk(WithDescriptionTypedPipe(Mapped(
+        ReduceStepPipe(ValueSortedReduce[Int, Int, Int](implicitly[Ordering[Int]],
+          WithDescriptionTypedPipe(WithDescriptionTypedPipe(
+            Mapped(WithDescriptionTypedPipe(MergedTypedPipe(WithDescriptionTypedPipe(
+              Mapped(WithDescriptionTypedPipe(CrossValue(
+                SourcePipe(TypedText.tsv[Int]("yumwd")), LiteralValue(2)),
+                List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))), null /**/ ),
+              List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+              WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(FilterKeys(
+                WithDescriptionTypedPipe(SumByLocalKeys(
+                  WithDescriptionTypedPipe(FlatMapped(
+                    IterablePipe(List(943704575)), null /**/ ),
+                    List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+                  implicitly[Monoid[Int]]),
+                  List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))), null /**/ ),
+                List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))), null /**/ ),
+                List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))),
+              List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))), null /**/ ),
+            List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+            List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+          implicitly[Ordering[Int]], null /**/ , None, List())),
+        null /**/ ),
+        List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))),
+        List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))
+
+      notMoreSteps(pipe)
+    }
+
+    {
+      import TypedPipe._
+
+      val pipe = WithDescriptionTypedPipe(
+        Fork(WithDescriptionTypedPipe(Mapped(WithDescriptionTypedPipe(CrossValue(
+          WithDescriptionTypedPipe(TrappedPipe(WithDescriptionTypedPipe(ForceToDisk(WithDescriptionTypedPipe(
+            Mapped(ReduceStepPipe(ValueSortedReduce[Int, Int, Int](implicitly[Ordering[Int]],
+              WithDescriptionTypedPipe(WithDescriptionTypedPipe(FilterKeys(WithDescriptionTypedPipe(FlatMapValues(
+                WithDescriptionTypedPipe(Mapped(IterablePipe(List(1533743286, 0, -1, 0, 1637692751)),
+                  null /**/ ),
+                  List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))), null /**/ ),
+                List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))), null /**/ ),
+                List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+                List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+              implicitly[Ordering[Int]], null /**/ , Some(2), List())),
+              null /**/ ),
+            List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))),
+            List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+            TypedText.tsv[Int]("mndlSTwuEmwqhJk7ac"),
+            TupleConverter.Single(implicitly[TupleGetter[Int]])),
+            List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+          LiteralValue(2)),
+          List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true))),
+          null /**/ ),
+          List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))),
+        List(("org.scalacheck.Gen$R$class.map(Gen.scala:237)", true)))
+
+      notMoreSteps(pipe)
+    }

     implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 100)
     forAll(TypedPipeGen.genWithFakeSources)(notMoreSteps(_))
   }
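For reference, the bound the updated assertions encode, pulled out into a hedged standalone helper; `maxSteps` is a name invented here, and `TypedPipeGen.steps` is the existing test utility that counts planned steps.

```scala
// Mirrors the counting done inline in the test, assuming the test file's
// imports (Dag, OptimizationRules, TypedPipe): an optimized write may plan
// at most two map/reduce steps, plus one for each hash join it contains.
def maxSteps(tp: TypedPipe[Any]): Int = {
  val (dag, _) = Dag(tp, OptimizationRules.toLiteral)
  val hashJoins = dag.allNodes.collect { case h: TypedPipe.HashCoGroup[_, _, _, _] => 1 }.sum
  2 + hashJoins
}

// usage, following the test: assert(TypedPipeGen.steps(tp) <= maxSteps(tp))
```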