Skip to content

Commit

Permalink
Improve rules, rule testing, and bump dagon (#1803)
Browse files Browse the repository at this point in the history
* Improve rules, rule testsing, and bump dagon

* remove unused value
  • Loading branch information
johnynek committed Feb 16, 2018
1 parent 5a50c16 commit 2846e9a
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 66 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ val avroVersion = "1.7.4"
val bijectionVersion = "0.9.5"
val cascadingAvroVersion = "2.1.2"
val chillVersion = "0.8.4"
val dagonVersion = "0.2.6"
val dagonVersion = "0.3.1"
val elephantbirdVersion = "4.15"
val hadoopLzoVersion = "0.4.19"
val hadoopVersion = "2.5.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ object CoGrouped {
}

final case class FilterKeys[K, V](on: CoGrouped[K, V], fn: K => Boolean) extends CoGrouped[K, V] {
val inputs = on.inputs.map(_.filterKeys(fn))
val inputs = on.inputs.map(TypedPipe.FilterKeys(_, fn))
def reducers = on.reducers
def keyOrdering = on.keyOrdering
def joinFunction = on.joinFunction
Expand Down Expand Up @@ -288,6 +288,25 @@ sealed trait HashJoinable[K, +V] extends CoGroupable[K, V] with KeyedPipe[K] {
/** A HashJoinable has a single input into to the cogroup */
override def inputs = List(mapped)
}

object HashJoinable {
def toReduceStep[A, B](hj: HashJoinable[A, B]): ReduceStep[A, _, _ <: B] =
hj match {
case step@IdentityReduce(_, _, _, _, _) => step
case step@UnsortedIdentityReduce(_, _, _, _, _) => step
case step@IteratorMappedReduce(_, _, _, _, _) => step
}

def filterKeys[A, B](hj: HashJoinable[A, B], fn: A => Boolean): HashJoinable[A, B] =
hj match {
case step@IdentityReduce(_, _, _, _, _) =>
step.copy(mapped = TypedPipe.FilterKeys(step.mapped, fn))
case step@UnsortedIdentityReduce(_, _, _, _, _) =>
step.copy(mapped = TypedPipe.FilterKeys(step.mapped, fn))
case step@IteratorMappedReduce(_, _, _, _, _) =>
step.copy(mapped = TypedPipe.FilterKeys(step.mapped, fn))
}
}
/**
* This encodes the rules that
* 1) sorting is only possible before doing any reduce,
Expand Down Expand Up @@ -445,6 +464,20 @@ object ReduceStep {
step.mapGroup(fn)
}

def toHashJoinable[A, B, C](rs: ReduceStep[A, B, C]): Option[HashJoinable[A, C]] =
rs match {
case step @ IdentityReduce(_, _, _, _, _) =>
Some(step)
case step @ UnsortedIdentityReduce(_, _, _, _, _) =>
Some(step)
case step @ IteratorMappedReduce(_, _, _, _, _) =>
Some(step)
case step @ IdentityValueSortedReduce(_, _, _, _, _, _) =>
None
case step @ ValueSortedReduce(_, _, _, _, _, _) =>
None
}

def withReducers[A, B, C](rs: ReduceStep[A, B, C], reds: Int): ReduceStep[A, B, C] =
rs match {
case step @ IdentityReduce(_, _, _, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ object OptimizationRules {
case CoGroupedPipe(cg) =>
CoGroupedPipe(FilterKeys(cg, fn))
case kvPipe =>
kvPipe.filterKeys(fn)
TypedPipe.FilterKeys(kvPipe, fn)
}
})
}
Expand Down Expand Up @@ -206,32 +206,14 @@ object OptimizationRules {
}

private def handleHashCoGroup[K, V, V2, R](hj: HashCoGroup[K, V, V2, R], recurse: FunctionK[TypedPipe, LiteralPipe]): LiteralPipe[(K, R)] = {
val rightLit: LiteralPipe[(K, V2)] = hj.right match {
case step@IdentityReduce(_, _, _, _, _) =>
type TK[V] = TypedPipe[(K, V)]
val mappedV2 = step.evidence.subst[TK](step.mapped)
Unary(widen[(K, V2)](recurse(mappedV2)), { (tp: TypedPipe[(K, V2)]) =>
ReduceStepPipe(IdentityReduce[K, V2, V2](step.keyOrdering, tp, step.reducers, step.descriptions, implicitly))
})
case step@UnsortedIdentityReduce(_, _, _, _, _) =>
type TK[V] = TypedPipe[(K, V)]
val mappedV2 = step.evidence.subst[TK](step.mapped)
Unary(widen[(K, V2)](recurse(mappedV2)), { (tp: TypedPipe[(K, V2)]) =>
ReduceStepPipe(UnsortedIdentityReduce[K, V2, V2](step.keyOrdering, tp, step.reducers, step.descriptions, implicitly))
})
case step@IteratorMappedReduce(_, _, _, _, _) =>
def go[A, B, C](imr: IteratorMappedReduce[A, B, C]): LiteralPipe[(A, C)] =
Unary(recurse(imr.mapped), { (tp: TypedPipe[(A, B)]) => ReduceStepPipe[A, B, C](imr.copy(mapped = tp)) })

widen(go(step))
}

val ordK: Ordering[K] = hj.right match {
case step@IdentityReduce(_, _, _, _, _) => step.keyOrdering
case step@UnsortedIdentityReduce(_, _, _, _, _) => step.keyOrdering
case step@IteratorMappedReduce(_, _, _, _, _) => step.keyOrdering
val rightLit: LiteralPipe[(K, V2)] = {
val rs = HashJoinable.toReduceStep(hj.right)
def go[A, B, C](rs: ReduceStep[A, B, C]): LiteralPipe[(A, C)] =
Unary(recurse(rs.mapped), { tp: TypedPipe[(A, B)] => ReduceStepPipe(ReduceStep.setInput(rs, tp)) })
widen(go(rs))
}

val ordK: Ordering[K] = hj.right.keyOrdering
val joiner = hj.joiner

Binary(recurse(hj.left), rightLit,
Expand Down Expand Up @@ -545,12 +527,27 @@ object OptimizationRules {
}
}

/**
* This rule is important in that it allows us to reduce
* the number of nodes in the graph, which is helpful to speed up rule application
*/
object ComposeDescriptions extends PartialRule[TypedPipe] {
def combine(descs1: List[(String, Boolean)], descs2: List[(String, Boolean)]): List[(String, Boolean)] = {
val combined = descs1 ::: descs2

combined.foldLeft((Set.empty[String], List.empty[(String, Boolean)])) {
case (state@(s, acc), item@(m, true)) =>
if (s(m)) state
else (s + m, item :: acc)
case ((s, acc), item) =>
(s, item :: acc)
}._2.reverse
}


def applyWhere[T](on: Dag[TypedPipe]) = {
case WithDescriptionTypedPipe(WithDescriptionTypedPipe(input, descs1), descs2) =>
// This rule is important in that it allows us to reduce
// the number of nodes in the graph, which is helpful to speed up rule application
WithDescriptionTypedPipe(input, descs1 ::: descs2)
WithDescriptionTypedPipe(input, combine(descs1, descs2))
}
}

Expand Down Expand Up @@ -585,10 +582,7 @@ object OptimizationRules {
case WithDescriptionTypedPipe(WithDescriptionTypedPipe(input, descs1), descs2) =>
// This rule is important in that it allows us to reduce
// the number of nodes in the graph, which is helpful to speed up rule application
WithDescriptionTypedPipe(input, descs1 ::: descs2)
case Fork(WithDescriptionTypedPipe(input, descs)) =>
// descriptions are cheap, we can apply them on both sides of a fork
WithDescriptionTypedPipe(Fork(input), descs)
WithDescriptionTypedPipe(input, ComposeDescriptions.combine(descs1, descs2))
}
}

Expand Down Expand Up @@ -682,21 +676,17 @@ object OptimizationRules {
* us
*/
object FilterKeysEarly extends Rule[TypedPipe] {
private def filterReduceStep[K, V1, V2](rs: ReduceStep[K, V1, V2], fn: K => Boolean): ReduceStep[K, _, _ <: V2] =
rs match {
case step@IdentityReduce(_, _, _, _, _) => step.filterKeys(fn)
case step@UnsortedIdentityReduce(_, _, _, _, _) => step.filterKeys(fn)
case step@IdentityValueSortedReduce(_, _, _, _, _, _) => step.filterKeys(fn)
case step@ValueSortedReduce(_, _, _, _, _, _) => step.filterKeys(fn)
case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn)
}
private def filterReduceStep[K, V1, V2](rs: ReduceStep[K, V1, V2], fn: K => Boolean): ReduceStep[K, V1, V2] =
ReduceStep.setInput(rs, FilterKeys(rs.mapped, fn))

private def filterCoGroupable[K, V](rs: CoGroupable[K, V], fn: K => Boolean): CoGroupable[K, V] =
rs match {
case step@IdentityReduce(_, _, _, _, _) => step.filterKeys(fn)
case step@UnsortedIdentityReduce(_, _, _, _, _) => step.filterKeys(fn)
case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn)
case cg: CoGrouped[K, V] => filterCoGroup(cg, fn)
case rs: ReduceStep[K @unchecked, v1, V @unchecked] =>
ReduceStep.toHashJoinable(filterReduceStep(rs, fn))
.getOrElse {
sys.error("unreachable: filterReduceStep returns the same type, and this input type was CoGroupable")
}
case cg: CoGrouped[K @unchecked, V @unchecked] => filterCoGroup(cg, fn)
}

private def filterCoGroup[K, V](cg: CoGrouped[K, V], fn: K => Boolean): CoGrouped[K, V] =
Expand All @@ -719,11 +709,7 @@ object OptimizationRules {
case FilterKeys(CoGroupedPipe(cg), fn) =>
Some(CoGroupedPipe(filterCoGroup(cg, fn)))
case FilterKeys(HashCoGroup(left, right, joiner), fn) =>
val newRight = right match {
case step@IdentityReduce(_, _, _, _, _) => step.filterKeys(fn)
case step@UnsortedIdentityReduce(_, _, _, _, _) => step.filterKeys(fn)
case step@IteratorMappedReduce(_, _, _, _, _) => step.filterKeys(fn)
}
val newRight = HashJoinable.filterKeys(right, fn)
Some(HashCoGroup(FilterKeys(left, fn), newRight, joiner))
case FilterKeys(MapValues(pipe, mapFn), filterFn) =>
Some(MapValues(FilterKeys(pipe, filterFn), mapFn))
Expand Down Expand Up @@ -760,11 +746,7 @@ object OptimizationRules {
}

private def emptyHashJoinable[K, V](hj: HashJoinable[K, V]): Boolean =
hj match {
case step@IdentityReduce(_, _, _, _, _) => step.mapped == EmptyTypedPipe
case step@UnsortedIdentityReduce(_, _, _, _, _) => step.mapped == EmptyTypedPipe
case step@IteratorMappedReduce(_, _, _, _, _) => step.mapped == EmptyTypedPipe
}
HashJoinable.toReduceStep(hj).mapped == EmptyTypedPipe

def applyWhere[T](on: Dag[TypedPipe]) = {
case CrossPipe(EmptyTypedPipe, _) => EmptyTypedPipe
Expand All @@ -778,7 +760,6 @@ object OptimizationRules {
case FlatMapValues(EmptyTypedPipe, _) => EmptyTypedPipe
case FlatMapped(EmptyTypedPipe, _) => EmptyTypedPipe
case ForceToDisk(EmptyTypedPipe) => EmptyTypedPipe
case Fork(EmptyTypedPipe) => EmptyTypedPipe
case HashCoGroup(EmptyTypedPipe, _, _) => EmptyTypedPipe
case HashCoGroup(_, right, hjf) if emptyHashJoinable(right) && Joiner.isInnerHashJoinLike(hjf) == Some(true) => EmptyTypedPipe
case MapValues(EmptyTypedPipe, _) => EmptyTypedPipe
Expand All @@ -791,6 +772,10 @@ object OptimizationRules {
case CoGroupedPipe(cgp) if emptyCogroup(cgp) => EmptyTypedPipe
case WithOnComplete(EmptyTypedPipe, _) => EmptyTypedPipe // there is nothing to do, so we never have workers complete
case WithDescriptionTypedPipe(EmptyTypedPipe, _) => EmptyTypedPipe // descriptions apply to tasks, but empty has no tasks

// This rule is tempting, but dangerous since if used in combination
// with AddExplicitForks it would create an infinite loop
// case Fork(EmptyTypedPipe) => EmptyTypedPipe
}
}

Expand Down Expand Up @@ -931,7 +916,7 @@ object OptimizationRules {
// phase 0, add explicit forks to not duplicate pipes on fanout below
AddExplicitForks,
// phase 1, compose flatMap/map, move descriptions down, defer merge, filter pushup etc...
composeSame.orElse(DescribeLater).orElse(FilterKeysEarly).orElse(DeferMerge),
IgnoreNoOpGroup.orElse(composeSame).orElse(DescribeLater).orElse(FilterKeysEarly).orElse(DeferMerge),
// phase 2, combine different kinds of mapping operations into flatMaps, including redundant merges
composeIntoFlatMap.orElse(simplifyEmpty).orElse(DiamondToFlatMap).orElse(ComposeDescriptions),
// phase 3, remove duplicates forces/forks (e.g. .fork.fork or .forceToDisk.fork, ....)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ object CascadingBackend {
def defaultOptimizationRules(config: Config): Seq[Rule[TypedPipe]] = {

def std(forceHash: Rule[TypedPipe]) =
OptimizationRules.IgnoreNoOpGroup ::
(OptimizationRules.standardMapReduceRules :::
List(
OptimizationRules.FilterLocally, // after filtering, we may have filtered to nothing, lets see
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ object TypedPipeGen {
* Iterable sources
*/
val genWithIterableSources: Gen[TypedPipe[Int]] =
Gen.choose(0, 20) // don't make giant lists which take too long to evaluate
Gen.choose(0, 16) // don't make giant lists which take too long to evaluate
.flatMap { sz =>
tpGen(Gen.listOfN(sz, Arbitrary.arbitrary[Int]).map(TypedPipe.from(_)))
}
Expand All @@ -147,6 +147,7 @@ object TypedPipeGen {

val allRules = List(
AddExplicitForks,
ComposeDescriptions,
ComposeFlatMap,
ComposeMap,
ComposeFilter,
Expand All @@ -161,7 +162,8 @@ object TypedPipeGen {
DeferMerge,
FilterKeysEarly,
FilterLocally,
EmptyIsOftenNoOp,
//EmptyIsOftenNoOp, this causes confluence problems when combined with other rules randomly.
//Have to be careful about the order it is applied
EmptyIterableIsEmpty,
HashToShuffleCoGroup,
ForceToDiskBeforeHashJoin)
Expand Down Expand Up @@ -219,12 +221,35 @@ class OptimizationRulesTest extends FunSuite with PropertyChecks {
}
}

test("optimization rules are reproducible") {
import TypedPipeGen.{ genWithFakeSources, genRule }

implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 5000)
forAll(genWithFakeSources, genRule) { (t, rule) =>
val optimized = Dag.applyRule(t, toLiteral, rule)
val optimized2 = Dag.applyRule(t, toLiteral, rule)
assert(optimized == optimized2)
}
}

test("standard rules are reproducible") {
import TypedPipeGen.{ genWithFakeSources, genRule }

implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 5000)
forAll(genWithFakeSources) { t =>
val (dag1, id1) = Dag(t, toLiteral)
val opt1 = dag1.applySeq(OptimizationRules.standardMapReduceRules)
val t1 = opt1.evaluate(id1)

val (dag2, id2) = Dag(t, toLiteral)
val opt2 = dag2.applySeq(OptimizationRules.standardMapReduceRules)
val t2 = opt2.evaluate(id2)
assert(t1 == t2)
}
}

def optimizationLaw[T: Ordering](t: TypedPipe[T], rule: Rule[TypedPipe]) = {
val optimized = Dag.applyRule(t, toLiteral, rule)
val optimized2 = Dag.applyRule(t, toLiteral, rule)

// Optimization pure is function (wrt to universal equality)
assert(optimized == optimized2)

// We don't want any further optimization on this job
val conf = Config.empty.setOptimizationPhases(classOf[EmptyOptimizationPhases])
Expand All @@ -241,7 +266,7 @@ class OptimizationRulesTest extends FunSuite with PropertyChecks {

test("all optimization rules don't change results") {
import TypedPipeGen.{ genWithIterableSources, genRule }
implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 200)
implicit val generatorDrivenConfig: PropertyCheckConfiguration = PropertyCheckConfiguration(minSuccessful = 50)
forAll(genWithIterableSources, genRule)(optimizationLaw[Int] _)
}

Expand Down

0 comments on commit 2846e9a

Please sign in to comment.