Permalink
Browse files

Issue #675.Configurable elimination of FlatMapNode by enhancing Sourc… (

#676)

* Issue #675.Configurable elimination of FlatMapNode by enhancing SourceNode

* Added Tests for FlatMap fanOut case. Corrected the case types in scheduleSpout method. Removed the repeating tests. Changed the names to be descriptive. Many other suggested changes have been done.

* test case refined. indentation corrected.

* Added the property Test. Added fanOut test and validated graph. Comments addressed.

* Refactored the case logic in OnlinePlan, corrected some of the stale/incorrect comments

* Added some more tests. Changes on assert style, map.get. Comments have been added to code.

* variable labelling standards
1 parent 0879ab9 commit 22db7b0a65f472c42c8f283a7b7773eb68f3d973 @NPraneeth NPraneeth committed with johnynek Aug 3, 2016
View
@@ -39,7 +39,7 @@ val extraSettings = Project.defaultSettings ++ mimaDefaultSettings ++ scalarifor
val sharedSettings = extraSettings ++ Seq(
organization := "com.twitter",
- scalaVersion := "2.10.5",
+ scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.10.5", "2.11.7"),
// To support hadoop 1.x
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
@@ -368,3 +368,4 @@ lazy val summingbirdCoreTest = module("core-test").settings(
).dependsOn(
summingbirdCore % "test->test;compile->compile"
)
+
@@ -44,6 +44,8 @@ private[summingbird] trait OnlineDefaultConstants {
val DEFAULT_VALUE_COMBINER_CACHE_SIZE = ValueCombinerCacheSize(100)
val DEFAULT_MAX_EMIT_PER_EXECUTE = MaxEmitPerExecute(Int.MaxValue)
val DEFAULT_SUMMER_BATCH_MULTIPLIER = SummerBatchMultiplier(1)
+ val DEFAULT_FM_MERGEABLE_WITH_SOURCE = FMMergeableWithSource.default
+
}
private[summingbird] object OnlineDefaultConstants extends OnlineDefaultConstants
@@ -20,11 +20,11 @@ import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import com.twitter.summingbird.online.Queue
-import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures}
+import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import com.twitter.util._
-import org.slf4j.{Logger, LoggerFactory}
+import org.slf4j.{ Logger, LoggerFactory }
-import scala.util.{Failure, Success, Try}
+import scala.util.{ Failure, Success, Try }
object AsyncBase {
/**
@@ -113,3 +113,13 @@ case class SummerParallelism(parHint: Int)
* which are set by SummerParallelism.
*/
case class SummerBatchMultiplier(get: Int)
+
+/**
+ * This option configures the flatMap to be merged into spout when set to true.
+ */
+case class FMMergeableWithSource(get: Boolean)
+
+object FMMergeableWithSource {
+ val default: FMMergeableWithSource = FMMergeableWithSource(false)
+}
+
@@ -17,8 +17,10 @@
package com.twitter.summingbird.planner
import com.twitter.summingbird._
+import scala.reflect.ClassTag
+import com.twitter.summingbird.online.OnlineDefaultConstants
-class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
+class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V], nameMap: Map[Producer[P, _], List[String]], options: Map[String, Options]) {
private type Prod[T] = Producer[P, T]
private type VisitedStore = Set[Prod[_]]
private type CNode = Node[P]
@@ -27,6 +29,13 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
private val depData = Dependants(tail)
private val forkedNodes = depData.nodes
.filter(depData.fanOut(_).exists(_ > 1)).toSet
+
+ private def get[T <: AnyRef: ClassTag](dep: Producer[P, _]): Option[(String, T)] =
+ Options.getFirst[T](options, nameMap(dep))
+
+ private def getOrElse[T <: AnyRef: ClassTag](dep: Producer[P, _], default: T): T =
+ get[T](dep).map { case (_, t) => t }.getOrElse(default)
+
private def distinctAddToList[T](l: List[T], n: T): List[T] = if (l.contains(n)) l else (n :: l)
// We don't merge flatMaps or joins with source.
@@ -38,8 +47,7 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
case OptionMappedProducer(producer, _) => true
case Source(_) => true
case AlsoProducer(_, _) => true
- // The rest are flatMaps, joins, merges or tails
- case FlatMappedProducer(_, _) => false
+ case FlatMappedProducer(_, _) => getOrElse(dep, OnlineDefaultConstants.DEFAULT_FM_MERGEABLE_WITH_SOURCE).get
case ValueFlatMappedProducer(_, _) => false
case KeyFlatMappedProducer(_, _) => false
case LeftJoinedProducer(_, _) => false
@@ -81,6 +89,12 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
private def allTransDepsMergeableWithSource(p: Prod[_]): Boolean =
mergableWithSource(p) && Producer.dependenciesOf(p).forall(allTransDepsMergeableWithSource)
+ /*
+ * This method is used when we need to check if the map-side aggregation node has to be added.
+ * If the source is followed by summer, we create a FlatMapNode() for map-side aggregation.
+ */
+ private def isSourceFollowedBySummer(curr: Prod[_], dep: Prod[_]): Boolean =
+ hasSummerAsDependantProducer(curr) && allTransDepsMergeableWithSource(dep)
/**
* This is the main planning loop that goes bottom up planning into CNodes.
* The default empty node is a FlatMapNode. When a node is fully planned, we put it
@@ -135,6 +149,7 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
* we want to push things higher up in the Dag, not further down.
*/
case SummerNode(_) if !noOpProducer(dep) => true
+
/*
* Currently, SummerNodes cannot have any other logic than sum. So, we check to see
* if this node has something that is not no-op, and if the next node will be a summer, we split
@@ -143,10 +158,12 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
case _ if (!noOpNode(activeBolt) && dependsOnSummerProducer(currentProducer)) => true
/*
* This should possibly be improved, but currently, we force a FlatMapNode just before a
- * summer (to handle map-side aggregation). This check is here to prevent us from merging
- * this current node all the way up to the source.
+ * summer (to handle map-side aggregation) unless the currentProducer is configured to be merged into Source.
+ * This check is here to prevent us from merging this current node all the way up to the source.
*/
- case FlatMapNode(_) if hasSummerAsDependantProducer(currentProducer) && allTransDepsMergeableWithSource(dep) => true
+
+ case FlatMapNode(_) if isSourceFollowedBySummer(currentProducer, dep) => !(getOrElse(currentProducer, OnlineDefaultConstants.DEFAULT_FM_MERGEABLE_WITH_SOURCE).get)
+
/*
* if the current node can't be merged with a source, but the transitive deps can
* then split now.
@@ -219,9 +236,12 @@ class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
}
object OnlinePlan {
- def apply[P <: Platform[P], T](tail: TailProducer[P, T]): Dag[P] = {
+
+ def apply[P <: Platform[P], T](tail: TailProducer[P, T]): Dag[P] = apply(tail, Map.empty)
+
+ def apply[P <: Platform[P], T](tail: TailProducer[P, T], options: Map[String, Options]): Dag[P] = {
val (nameMap, strippedTail) = StripNamedNode(tail)
- val planner = new OnlinePlan(strippedTail)
+ val planner = new OnlinePlan(strippedTail, nameMap, options)
val nodesSet = planner.nodeSet
// The nodes are added in a source -> summer way with how we do list prepends
@@ -17,11 +17,12 @@
package com.twitter.summingbird.online
import com.twitter.algebird.{ MapAlgebra, Semigroup }
-import com.twitter.storehaus.{ ReadableStore, JMapStore }
+import com.twitter.storehaus.{ JMapStore, ReadableStore }
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird._
import com.twitter.summingbird.memory._
import com.twitter.summingbird.planner._
+import com.twitter.summingbird.online.option._
import com.twitter.util.Future
import org.scalatest.WordSpec
import scala.collection.JavaConverters._
@@ -30,7 +31,7 @@ import org.scalacheck._
import Gen._
import Arbitrary._
import org.scalacheck.Prop._
-import scala.util.{ Try, Success, Failure }
+import scala.util.{ Failure, Success, Try }
/**
* Tests for Summingbird's Storm planner.
@@ -62,6 +63,95 @@ class PlannerSpec extends WordSpec {
def arbSource1 = sample[Producer[Memory, Int]]
def arbSource2 = sample[KeyedProducer[Memory, Int, Int]]
+ /*
+ * Test Case : A simple src.map.summer topology with FMMergeableWithSource opt-in functionality.
+ * Asserts : The plan considers the options to convert into a two node toplogy removing FlatMapNode.
+ */
+
+ "The Online Plan with a flat Map which has Summer as dependant and opted-in for FMMergeableWithSource" in {
+ val store1 = testStore
+ val fmname = "flatmapped"
+ val h1 = arbSource1
+ .flatMap { i: Int => List((i + 1, 1), (i + 2, 1), (i + 3, i)) }.name(fmname)
+ .sumByKey(store1)
+
+ val opts = Map(
+ fmname -> Options().set(FMMergeableWithSource(true))
+ )
+ val planned = Try(OnlinePlan(h1, opts))
+
+ assert(planned.isSuccess, "FAILED : The Online Plan with a flat Map which has no Summer as dependant - writing to: " + TopologyPlannerLaws.dumpGraph(h1))
+ assert(planned.get.nodes.size == 2)
+ }
+
+ /*
+ * Tests the fanOut case on the FlatMappedProducer when opt-in to FlatMapNode mergeable with SourceNode
+ * Asserts : SourceNode has two FlatMapNodes
+ * Each FlatMapNode has a SummerNode as dependant.
+ */
+ "FMMergeableWithSource with a fanOut case after flatMap" in {
+
+ def testStore2: Memory#Store[Int, Int] = MMap[Int, Int]()
+ val sumName = "summer"
+
+ val p1 = arbSource1.flatMap { i: Int => List((i -> i)) }
+ val p2 = p1.sumByKey(testStore).name("sum1")
+ val p3 = p1.map { x => x }.sumByKey(testStore2).name("sum2")
+ val p = p2.also(p3)
+
+ val opts = Map("sum1" -> Options().set(FMMergeableWithSource(true)).set(FlatMapParallelism(15)),
+ "sum2" -> Options().set(SourceParallelism(50)).set(FMMergeableWithSource(true)))
+
+ val storm = Try(OnlinePlan(p, opts))
+
+ // Source Node should exist and have two FlatMapNodes as dependants
+ val sourceNodes = storm.get.dependantsOfM.keys.find(_.toString contains "SourceNode")
+ sourceNodes match {
+ case Some(s) => storm.get.dependantsOfM.get(s) match {
+ case Some(listOfFlatMapNodes) => assert(listOfFlatMapNodes.size == 2)
+ case None => assert(false, "No FlatMapNodes are found in dependant list of Source where as two FlatMapNodes are expected.")
+ }
+ case None => assert(false, "Could not find a SourceNode.")
+ }
+
+ // Each FlatMapNode should have a SummerNode as dependant.
+ val flatMapnodes = storm.get.dependantsOfM.filterKeys { _.toString contains "FlatMapNode" }
+ val fmnValues = flatMapnodes.values
+ fmnValues.foreach {
+ x: List[Node[Memory]] =>
+ {
+ assert(x.size == 1)
+ assert(x(0).toString contains "SummerNode")
+ }
+ }
+ }
+
+ /*
+ * Tests the alsoProducer case for two seperate sub-topologies.
+ * Asserts : There are two sourceNodes
+ * Each SourceNode has a single dependant.
+ * Each SourceNode has a summer as dependant.
+ */
+ "Also producer with two topos" in {
+ def testStore2 : Memory#Store[Int, Int] = MMap[Int, Int]()
+
+ val p1 = arbSource1.flatMap { i: Int => List((i -> i))}.sumByKey(testStore).name("topo1")
+ val p2 = arbSource2.flatMap { tup: (Int, Int) => List((tup._1, tup._2)) }.sumByKey(testStore2).name("topo2")
+ val p = p1.also(p2)
+
+ val opts = Map("topo1" -> Options().set(FMMergeableWithSource(true)),
+ "topo2" -> Options().set(FMMergeableWithSource(true)))
+ val storm = Try(OnlinePlan(p,opts))
+ val srcNodes = storm.get.dependantsOfM.filterKeys { _.toString contains "SourceNode"}
+ assert(srcNodes.keySet.size == 2)
+ srcNodes.keySet.foreach {
+ x => {
+ assert(storm.get.dependantsOfM.asJava.get(x).size == 1)
+ assert(storm.get.dependantsOfM.asJava.get(x).head.toString contains "SummerNode")
+ }
+ }
+ }
+
"Must be able to plan user supplied Job A" in {
val store1 = testStore
val store2 = testStore
@@ -94,15 +184,14 @@ class PlannerSpec extends WordSpec {
.sumByKey(store2)
val planned = Try(OnlinePlan(tail))
- val path = TopologyPlannerLaws.dumpGraph(tail)
planned match {
- case Success(graph) => assert(true == true)
+ case Success(graph) => {
+ assert(true)
+ }
case Failure(error) =>
- val path = TopologyPlannerLaws.dumpGraph(tail)
error.printStackTrace
- println("Dumped failing graph to: " + path)
- assert(false)
+ fail("Dumped failing graph for ' Must be able to plan user supplied Job A ' to: " + TopologyPlannerLaws.dumpGraph(tail))
}
}
@@ -150,9 +239,7 @@ class PlannerSpec extends WordSpec {
case Success(graph) => assert(true == true)
case Failure(error) =>
val path = TopologyPlannerLaws.dumpGraph(tail)
- error.printStackTrace
- println("Dumped failing graph to: " + path)
- assert(false)
+ fail("Dumped failing graph to: " + path)
}
}
@@ -175,9 +262,7 @@ class PlannerSpec extends WordSpec {
case Success(graph) => assert(true == true)
case Failure(error) =>
val path = TopologyPlannerLaws.dumpGraph(tail)
- error.printStackTrace
- println("Dumped failing graph to: " + path)
- assert(false)
+ fail("Dumped failing graph to: " + path)
}
}
"Chained SumByKey with extra Also is okay" in {
@@ -194,9 +279,7 @@ class PlannerSpec extends WordSpec {
assert(TopologyPlannerLaws.summersOnlyShareNoOps(graph) == true)
case Failure(error) =>
val path = TopologyPlannerLaws.dumpGraph(part2)
- error.printStackTrace
- println("Dumped failing graph to: " + path)
- assert(false)
+ fail("Dumped failing graph to: " + path)
}
}
}
@@ -20,11 +20,11 @@ import java.util.concurrent.CyclicBarrier
import com.twitter.bijection.Injection
import com.twitter.conversions.time._
-import com.twitter.summingbird.online.option.{MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures}
+import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import com.twitter.util._
import org.scalatest.WordSpec
import org.scalatest.concurrent.Eventually
-import org.scalatest.time.{Seconds, Span}
+import org.scalatest.time.{ Seconds, Span }
class AsyncBaseSpec extends WordSpec with Eventually {
Oops, something went wrong.

0 comments on commit 22db7b0

Please sign in to comment.