diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala index 74fdcb8a05..58f1492771 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/OptimizationRulesTest.scala @@ -21,29 +21,36 @@ object TypedPipeGen { } def mapped(srcGen: Gen[TypedPipe[Int]]): Gen[TypedPipe[Int]] = { + val commonFreq = 10 val next1: Gen[TypedPipe[Int] => TypedPipe[Int]] = - Gen.oneOf( - tpGen(srcGen).map { p: TypedPipe[Int] => + Gen.frequency( + (1, tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x.cross(p).keys } - }, - tpGen(srcGen).map { p: TypedPipe[Int] => + }), + (2, tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x.cross(ValuePipe(2)).values } - }, + }), //Gen.const({ t: TypedPipe[Int] => t.debug }), debug spews a lot to the terminal - Arbitrary.arbitrary[Int => Boolean].map { fn => + (commonFreq, Arbitrary.arbitrary[Int => Boolean].map { fn => { t: TypedPipe[Int] => t.filter(fn) } - }, - Gen.const({ t: TypedPipe[Int] => t.forceToDisk }), - Gen.const({ t: TypedPipe[Int] => t.fork }), - tpGen(srcGen).map { p: TypedPipe[Int] => + }), + (commonFreq, Arbitrary.arbitrary[Int => Int].map { fn => + { t: TypedPipe[Int] => t.map(fn) } + }), + (commonFreq, Arbitrary.arbitrary[Int => List[Int]].map { fn => + { t: TypedPipe[Int] => t.flatMap(fn.andThen(_.take(4))) } // the take is to not get too big + }), + (2, Gen.const({ t: TypedPipe[Int] => t.forceToDisk })), + (2, Gen.const({ t: TypedPipe[Int] => t.fork })), + (5, tpGen(srcGen).map { p: TypedPipe[Int] => { x: TypedPipe[Int] => x ++ p } - }, - Gen.identifier.map { id => + }), + (1, Gen.identifier.map { id => { t: TypedPipe[Int] => t.addTrap(TypedText.tsv[Int](id)) } - }, - Gen.identifier.map { id => + }), + (1, Gen.identifier.map { id => { t: TypedPipe[Int] => t.withDescription(id) } - }) + })) val one = for { n <- next1 @@ -73,7 +80,8 @@ object TypedPipeGen { for { single <- tpGen(srcGen) fn <- Arbitrary.arbitrary[Int => List[(Int, Int)]] - } yield single.flatMap(fn)) + } yield single.flatMap(fn.andThen(_.take(4))) // take to not get too big + ) val two = Gen.oneOf( for { @@ -83,7 +91,7 @@ object TypedPipeGen { for { fn <- Arbitrary.arbitrary[Int => List[Int]] pair <- keyRec - } yield pair.flatMapValues(fn), + } yield pair.flatMapValues(fn.andThen(_.take(4))), // take to not get too big for { fn <- Arbitrary.arbitrary[Int => Int] pair <- keyRec @@ -106,7 +114,7 @@ object TypedPipeGen { for { p1 <- keyRec p2 <- keyRec - } yield p1.hashJoin(p2).values, + } yield p1.hashJoin(p2).mapValues { case (a, b) => 31 * a + b }, for { p1 <- keyRec p2 <- keyRec @@ -114,7 +122,7 @@ object TypedPipeGen { for { p1 <- keyRec p2 <- keyRec - } yield p1.join(p2).mapValues { case (a, b) => a * b }.toTypedPipe) + } yield p1.join(p2).mapValues { case (a, b) => a + 31 * b }.toTypedPipe) // bias to consuming Int, since the we can stack overflow with the (Int, Int) // cases