diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/store/CompoundStore.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/store/CompoundStore.scala index 106696082..6e2dad3b7 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/store/CompoundStore.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/store/CompoundStore.scala @@ -16,7 +16,7 @@ limitations under the License. package com.twitter.summingbird.store -import com.twitter.chill.MeatLocker +import com.twitter.chill.Externalizer import com.twitter.storehaus.ReadableStore import com.twitter.storehaus.algebra.MergeableStore import com.twitter.summingbird.batch.BatchID @@ -33,8 +33,7 @@ class CompoundStore[K, V] private ( @transient offline: Option[BatchedScaldingStore[K, V]], online: Option[() => MergeableStore[(K, BatchID), V]]) extends Serializable { - // MeatLocker these to protect them from serialization errors. - private val offlineBox = MeatLocker(offline) + private val offlineBox = Externalizer(offline) def offlineStore: Option[BatchedScaldingStore[K, V]] = offlineBox.get def onlineSupplier: Option[() => MergeableStore[(K, BatchID), V]] = online } diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/planner/OnlinePlan.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/planner/OnlinePlan.scala new file mode 100644 index 000000000..c74bdbab1 --- /dev/null +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/planner/OnlinePlan.scala @@ -0,0 +1,156 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package com.twitter.summingbird.planner + +import com.twitter.summingbird._ + +class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) { + private type Prod[T] = Producer[P, T] + private type VisitedStore = Set[Prod[_]] + private type CNode = Node[P] + private type CFlatMapNode = FlatMapNode[P] + + private val depData = Dependants(tail) + private val forkedNodes = Producer.transitiveDependenciesOf(tail) + .filter(depData.fanOut(_).exists(_ > 1)).toSet + private def distinctAddToList[T](l : List[T], n : T): List[T] = if(l.contains(n)) l else (n :: l) + + private def mergableWithSource(dep: Producer[P, _]): Boolean = { + dep match { + case NamedProducer(producer, _) => true + case IdentityKeyedProducer(producer) => true + case OptionMappedProducer(producer, _) => true + case Source(_) => true + case AlsoProducer(_, _) => true + case _ => false + } + } + + private def hasSummerAsNextProducer(p: Prod[_]): Boolean = + depData.dependantsOf(p).get.collect { case s: Summer[_, _, _] => s }.headOption.isDefined + + private def allDepsMergeableWithSource(p: Prod[_]): Boolean = mergableWithSource(p) && Producer.dependenciesOf(p).forall(allDepsMergeableWithSource) + + // Add the dependantProducer to a Node along with each of its dependencies in turn. 
+ private def addWithDependencies[T](dependantProducer: Prod[T], previousBolt: CNode, + akkaRegistry: List[CNode], visited: VisitedStore) : (List[CNode], VisitedStore) = { + + if (visited.contains(dependantProducer)) { + (distinctAddToList(akkaRegistry, previousBolt), visited) + } else { + val currentBolt = previousBolt.add(dependantProducer) + val visitedWithN = visited + dependantProducer + + def recurse[U]( + producer: Prod[U], + updatedBolt: CNode = currentBolt, + updatedRegistry: List[CNode] = akkaRegistry, + visited: VisitedStore = visitedWithN) + : (List[CNode], VisitedStore) = { + addWithDependencies(producer, updatedBolt, updatedRegistry, visited) + } + + /* + * The purpose of this method is to see if we need to add a new physical node to the graph, + * or if we can continue by adding this producer to the current physical node. + * + * This function acts as a look ahead, rather than depending on the state of the current node it depends + * on the nodes further along in the dag. That is, conditions for splitting into multiple Nodes are based on as yet + * unvisited Producers. + */ + def maybeSplitThenRecurse[U, A](currentProducer: Prod[U], dep: Prod[A]): (List[CNode], VisitedStore) = { + val doSplit = dep match { + case _ if (forkedNodes.contains(dep)) => true + // If we are a flatmap, but there haven't been any other flatmaps yet (i.e. the registry is of size 1, the summer). + // Then we must split to avoid a 2 level hierarchy + case _ if (currentBolt.isInstanceOf[FlatMapNode[_]] && hasSummerAsNextProducer(currentProducer) && allDepsMergeableWithSource(dep)) => true + case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true + case _ => false + } + if (doSplit) { + recurse(dep, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(akkaRegistry, currentBolt)) + } else { + recurse(dep) + } + } + + /* + * This is a peek ahead when we meet a MergedProducer. 
We pull the directly depended on MergedProducer's into the same Node, + * only if that MergedProducer is not a fan out node. + * This has the effect of pulling all of the merged streams in as siblings rather than just the two. + * From this we return a list of the MergedProducers which should be combined into the current Node, and the list of nodes + * on which these nodes depends (the producers passing data into these MergedProducer). + */ + + def mergeCollapse[A](p: Prod[A]): (List[Prod[A]], List[Prod[A]]) = { + p match { + case MergedProducer(subL, subR) if !forkedNodes.contains(p) => + // TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237 + if(subL == subR) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.") + val (lMergeNodes, lSiblings) = mergeCollapse(subL) + val (rMergeNodes, rSiblings) = mergeCollapse(subR) + (distinctAddToList((lMergeNodes ::: rMergeNodes).distinct, p), (lSiblings ::: rSiblings).distinct) + case _ => (List(), List(p)) + } + } + + dependantProducer match { + case Summer(producer, _, _) => recurse(producer, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(akkaRegistry, currentBolt.toSummer)) + case IdentityKeyedProducer(producer) => maybeSplitThenRecurse(dependantProducer, producer) + case NamedProducer(producer, newId) => maybeSplitThenRecurse(dependantProducer, producer) + case AlsoProducer(lProducer, rProducer) => + val (updatedReg, updatedVisited) = maybeSplitThenRecurse(dependantProducer, rProducer) + recurse(lProducer, FlatMapNode(), updatedReg, updatedVisited) + case Source(spout) => (distinctAddToList(akkaRegistry, currentBolt.toSource), visitedWithN) + case OptionMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer) + case FlatMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer) + case WrittenProducer(producer, sinkSupplier) => 
maybeSplitThenRecurse(dependantProducer, producer) + case LeftJoinedProducer(producer, _) => maybeSplitThenRecurse(dependantProducer, producer) + case MergedProducer(l, r) => + // TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237 + if(l == r) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.") + val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer) + val newCurrentBolt = otherMergeNodes.foldLeft(currentBolt)(_.add(_)) + val visitedWithOther = otherMergeNodes.foldLeft(visitedWithN){ (visited, n) => visited + n } + + // Recurse down all the newly generated dependencies + dependencies.foldLeft((distinctAddToList(akkaRegistry, newCurrentBolt), visitedWithOther)) { case ((newAkkaReg, newVisited), n) => + recurse(n, FlatMapNode(), newAkkaReg, newVisited) + } + } + } + } + + // This takes an initial pass through all of the Producers, assigning them to Nodes + private def buildNodesSet: List[CNode] = { + val (akkaRegistry, _) = addWithDependencies(tail, FlatMapNode(), List[CNode](), Set()) + akkaRegistry + } +} + +object OnlinePlan { + def apply[P <: Platform[P], T](tail: Producer[P, T]): Dag[P] = { + val planner = new OnlinePlan(tail) + val akkaNodeSet: List[Node[P]] = planner.buildNodesSet + + // The nodes are added in a source -> summer way with how we do list prepends + // but its easier to look at laws in a summer -> source manner + // We also drop all Nodes with no members(may occur when we visit a node already seen and its the first in that Node) + val reversedNodeSet = akkaNodeSet.filter(_.members.size > 0).foldLeft(List[Node[P]]()){(nodes, n) => n.reverse :: nodes} + Dag(tail, reversedNodeSet) + } +} diff --git a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala index 43354d5bb..c357687f2 100644 --- 
a/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala +++ b/summingbird-core/src/test/scala/com/twitter/summingbird/TestGraphGenerators.scala @@ -138,14 +138,14 @@ object TestGraphGenerators { def genProd2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] = - frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), (2, also2), - (0, also2), (3, genFlatMap22), (3, genFlatMap12)) + frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), (1, also2), + (3, genFlatMap22), (3, genFlatMap12)) def genProd1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]], genSource2 : Arbitrary[KeyedProducer[P, Int, Int]], testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[Producer[P, Int]] = - frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (2, also1), (3, genFlatMap11), - (0, also1), (3, genFlatMap21)) + frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (1, also1), (3, genFlatMap11), + (3, genFlatMap21)) } diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala b/summingbird-core/src/test/scala/com/twitter/summingbird/online/TopologyPlannerLaws.scala similarity index 57% rename from summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala rename to summingbird-core/src/test/scala/com/twitter/summingbird/online/TopologyPlannerLaws.scala index 50aaea6ed..f6325bd5d 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyPlannerLaws.scala +++ 
b/summingbird-core/src/test/scala/com/twitter/summingbird/online/TopologyPlannerLaws.scala @@ -14,16 +14,12 @@ limitations under the License. */ -package com.twitter.summingbird.storm +package com.twitter.summingbird.online -import com.twitter.storehaus.JMapStore -import com.twitter.storehaus.algebra.MergeableStore import com.twitter.summingbird._ import com.twitter.summingbird.planner._ -import com.twitter.summingbird.storm.planner._ -import com.twitter.summingbird.batch.{BatchID, Batcher} -import com.twitter.summingbird.storm.spout.TraversableSpout -import com.twitter.util.Future +import com.twitter.summingbird.memory.Memory +import scala.collection.mutable.{Map => MMap} import org.scalacheck._ import Gen._ import Arbitrary._ @@ -31,27 +27,33 @@ import org.scalacheck.Prop._ -object TopologyPlannerLaws extends Properties("StormDag") { +object TopologyPlannerLaws extends Properties("Online Dag") { implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L) - implicit val batcher = Batcher.unit - private type StormDag = Dag[Storm] + private type MemoryDag = Dag[Memory] import TestGraphGenerators._ - implicit def sink1: Storm#Sink[Int] = (() => ((_) => Future.Unit)) - implicit def sink2: Storm#Sink[(Int, Int)] = (() => ((_) => Future.Unit)) - implicit def testStore: Storm#Store[Int, Int] = MergeableStoreSupplier.from {MergeableStore.fromStore[(Int, BatchID), Int](new JMapStore[(Int, BatchID), Int]())} + implicit def sink1: Memory#Sink[Int] = {x: Int => x} + implicit def sink2: Memory#Sink[(Int, Int)] = {x: (Int, Int) => x} - implicit def arbSource1: Arbitrary[Producer[Storm, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[Int]).map{x: List[Int] => Storm.source(TraversableSpout(x))}) - implicit def arbSource2: Arbitrary[KeyedProducer[Storm, Int, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[(Int, Int)]).map{x: List[(Int, Int)] => IdentityKeyedProducer(Storm.source(TraversableSpout(x)))}) + implicit def testStore: Memory#Store[Int, Int] = 
MMap[Int, Int]() + + implicit def arbSource1: Arbitrary[Producer[Memory, Int]] = + Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[Int]).map{ + x: List[Int] => + Memory.toSource(x)}) + implicit def arbSource2: Arbitrary[KeyedProducer[Memory, Int, Int]] = + Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[(Int, Int)]).map{ + x: List[(Int, Int)] => + IdentityKeyedProducer(Memory.toSource(x))}) - lazy val genDag : Gen[StormDag]= for { + lazy val genDag : Gen[MemoryDag]= for { tail <- summed - } yield DagBuilder(tail) + } yield OnlinePlan(tail) - implicit def genProducer: Arbitrary[StormDag] = Arbitrary(genDag) + implicit def genProducer: Arbitrary[MemoryDag] = Arbitrary(genDag) @@ -60,7 +62,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get var dumpNumber = 1 - def dumpGraph(dag: StormDag) = { + def dumpGraph(dag: MemoryDag) = { import java.io._ import com.twitter.summingbird.viz.VizGraph val writer2 = new PrintWriter(new File("/tmp/failingGraph" + dumpNumber + ".dot")) @@ -69,17 +71,17 @@ object TopologyPlannerLaws extends Properties("StormDag") { dumpNumber = dumpNumber + 1 } - property("Dag Nodes must be unique") = forAll { (dag: StormDag) => + property("Dag Nodes must be unique") = forAll { (dag: MemoryDag) => dag.nodes.size == dag.nodes.toSet.size } - property("Must have at least one producer in each StormNode") = forAll { (dag: StormDag) => + property("Must have at least one producer in each MemoryNode") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => n.members.size > 0 } } - property("If a StormNode contains a Summer, it must be the first Producer in that StormNode") = forAll { (dag: StormDag) => + property("If a MemoryNode contains a Summer, it must be the first Producer in that MemoryNode") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val firstP = n.members.last n.members.forall{p => @@ -90,7 +92,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { } } - 
property("The first producer in a storm node cannot be a NamedProducer") = forAll { (dag: StormDag) => + property("The first producer in a online node cannot be a NamedProducer") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val inError = n.members.last.isInstanceOf[NamedProducer[_, _]] if(inError) dumpGraph(dag) @@ -98,7 +100,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { } } - property("0 or more merge producers at the start of every storm bolts, followed by 1+ non-merge producers and no other merge producers following those.") = forAll { (dag: StormDag) => + property("0 or more merge producers at the start of every online bolts, followed by 1+ non-merge producers and no other merge producers following those.") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val (_, inError) = n.members.foldLeft((false, false)) { case ((seenMergeProducer, inError), producer) => producer match { @@ -112,7 +114,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { } } - property("The last producer in any StormNode prior to a summer must be a KeyedProducer") = forAll { (dag: StormDag) => + property("The last producer in any online prior to a summer must be a KeyedProducer") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val firstP = n.members.last firstP match { @@ -125,20 +127,20 @@ object TopologyPlannerLaws extends Properties("StormDag") { } } - property("No producer is repeated") = forAll { (dag: StormDag) => + property("No producer is repeated") = forAll { (dag: MemoryDag) => val numAllProducers = dag.nodes.foldLeft(0){(sum, n) => sum + n.members.size} - val allProducersSet = dag.nodes.foldLeft(Set[Producer[Storm, _]]()){(runningSet, n) => runningSet | n.members.toSet} + val allProducersSet = dag.nodes.foldLeft(Set[Producer[Memory, _]]()){(runningSet, n) => runningSet | n.members.toSet} numAllProducers == allProducersSet.size } - property("All producers are in a StormNode") = forAll { (dag: StormDag) => + property("All producers 
are in a Node") = forAll { (dag: MemoryDag) => val allProducers = Producer.entireGraphOf(dag.tail).toSet + dag.tail val numAllProducersInDag = dag.nodes.foldLeft(0){(sum, n) => sum + n.members.size} allProducers.size == numAllProducersInDag } - property("Only spouts can have no incoming dependencies") = forAll { (dag: StormDag) => + property("Only sources can have no incoming dependencies") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val success = n match { case _: SourceNode[_] => true @@ -150,7 +152,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { } - property("Spouts must have no incoming dependencies, and they must have dependants") = forAll { (dag: StormDag) => + property("Sources must have no incoming dependencies, and they must have dependants") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val success = n match { case _: SourceNode[_] => @@ -163,7 +165,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { } - property("Prior to a summer the StormNode should be a FinalFlatMapStormBolt") = forAll { (dag: StormDag) => + property("Prior to a summer the Nonde should be a FlatMap Node") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val firstP = n.members.last val success = firstP match { @@ -178,7 +180,7 @@ object TopologyPlannerLaws extends Properties("StormDag") { } } - property("There should be no flatmap producers in the source node") = forAll { (dag: StormDag) => + property("There should be no flatmap producers in the source node") = forAll { (dag: MemoryDag) => dag.nodes.forall{n => val success = n match { case n: SourceNode[_] => n.members.forall{p => !p.isInstanceOf[FlatMappedProducer[_, _, _]]} @@ -189,22 +191,9 @@ object TopologyPlannerLaws extends Properties("StormDag") { } } - property("Nodes in the storm DAG should have unique names") = forAll { (dag: StormDag) => + property("Nodes in the DAG should have unique names") = forAll { (dag: MemoryDag) => val allNames = dag.nodes.toList.map{n => 
dag.getNodeName(n)} allNames.size == allNames.distinct.size } - property("Can plan to a Storm Topology") = forAll { (dag: StormDag) => - try { - Storm.local().plan(dag.tail) - true - } catch { - case e: Throwable => - dumpGraph(dag) - println(e) - e.printStackTrace - false - } - - } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala index d01068e65..529540106 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala @@ -21,7 +21,7 @@ import backtype.storm.topology.OutputFieldsDeclarer import backtype.storm.tuple.{ Fields, Tuple, Values } import com.twitter.algebird.{ Monoid, SummingQueue } -import com.twitter.chill.MeatLocker +import com.twitter.chill.Externalizer import com.twitter.summingbird.batch.{ Batcher, BatchID } import com.twitter.summingbird.storm.option.{ AnchorTuples, CacheSize, FlatMapStormMetrics @@ -46,7 +46,7 @@ class FinalFlatMapBolt[Event, Key, Value]( (implicit monoid: Monoid[Value], batcher: Batcher) extends BaseBolt(metrics.metrics) { - val lockedOp = MeatLocker(flatMapOp) + val lockedOp = Externalizer(flatMapOp) var collectorMergeable: MergeableStore[(Key, Tuple, BatchID), Value] = null override val fields = { diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala index b04fd2384..d143871ce 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapOperation.scala @@ -16,7 +16,7 @@ package com.twitter.summingbird.storm -import com.twitter.chill.MeatLocker +import com.twitter.chill.Externalizer import 
com.twitter.storehaus.ReadableStore import com.twitter.util.Future import java.io.{ Closeable, Serializable } @@ -66,7 +66,7 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable { class FunctionFlatMapOperation[T, U](@transient fm: T => TraversableOnce[U]) extends FlatMapOperation[T, U] { - val boxed = MeatLocker(fm) + val boxed = Externalizer(fm) def apply(t: T) = Future.value(boxed.get(t)) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala index cb6d0173a..500032391 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala @@ -17,7 +17,7 @@ package com.twitter.summingbird.storm import backtype.storm.tuple.{Fields, Tuple, Values} -import com.twitter.chill.MeatLocker +import com.twitter.chill.Externalizer import com.twitter.storehaus.algebra.MergeableStore.enrich import com.twitter.summingbird.storm.option.{ AnchorTuples, FlatMapStormMetrics } @@ -36,7 +36,7 @@ class IntermediateFlatMapBolt[T]( anchor: AnchorTuples, shouldEmit: Boolean) extends BaseBolt(metrics.metrics) { - val lockedOp = MeatLocker(flatMapOp) + val lockedOp = Externalizer(flatMapOp) def toValues(time: Long, item: Any): Values = new Values((time, item)) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SinkBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SinkBolt.scala index 035154f3e..8256b0dee 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SinkBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SinkBolt.scala @@ -19,7 +19,7 @@ package com.twitter.summingbird.storm import backtype.storm.task.{OutputCollector, TopologyContext} import backtype.storm.tuple.Tuple import 
com.twitter.algebird.{Monoid, SummingQueue} -import com.twitter.chill.MeatLocker +import com.twitter.chill.Externalizer import com.twitter.storehaus.algebra.MergeableStore import com.twitter.summingbird.batch.BatchID import com.twitter.summingbird.storm.option._ @@ -58,7 +58,7 @@ class SinkBolt[Key, Value: Monoid]( includeSuccessHandler: IncludeSuccessHandler) extends BaseBolt(metrics.metrics) { import Constants._ - val storeBox = MeatLocker(storeSupplier) + val storeBox = Externalizer(storeSupplier) lazy val store = storeBox.get.apply // See MaxWaitingFutures for a todo around removing this. @@ -66,8 +66,8 @@ class SinkBolt[Key, Value: Monoid]( lazy val buffer = SummingQueue[Map[(Key, BatchID), Value]](cacheCount.getOrElse(0)) lazy val futureQueue = FutureQueue(Future.Unit, maxWaitingFutures.get) - val exceptionHandlerBox = MeatLocker(exceptionHandler) - val successHandlerBox = MeatLocker(successHandler) + val exceptionHandlerBox = Externalizer(exceptionHandler) + val successHandlerBox = Externalizer(successHandler) var successHandlerOpt: Option[OnlineSuccessHandler] = null diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index cb024b8b7..75778935d 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -215,7 +215,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config implicit val topologyBuilder = new TopologyBuilder implicit val config = baseConfig - val stormDag = DagBuilder(tail) + val stormDag = OnlinePlan(tail) stormDag.nodes.foreach { node => node match { diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala deleted file mode 
100644 index 8daa5a24b..000000000 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/planner/StormPlanner.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - Copyright 2013 Twitter, Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package com.twitter.summingbird.storm.planner - -import com.twitter.summingbird._ -import com.twitter.summingbird.planner._ -import com.twitter.summingbird.storm._ - -object DagBuilder { - private type Prod[T] = Producer[Storm, T] - private type VisitedStore = Set[Prod[_]] - private type StormFlatMapNode = FlatMapNode[Storm] - - def apply[P](tail: Producer[Storm, P]): Dag[Storm] = { - val stormNodeSet = buildNodesSet(tail) - - // The nodes are added in a source -> summer way with how we do list prepends - // but its easier to look at laws in a summer -> source manner - // We also drop all Nodes with no members(may occur when we visit a node already seen and its the first in that Node) - val reversedNodeSet = stormNodeSet.filter(_.members.size > 0).foldLeft(List[StormNode]()){(nodes, n) => n.reverse :: nodes} - Dag(tail, reversedNodeSet) - } - - // This takes an initial pass through all of the Producers, assigning them to Nodes - private def buildNodesSet[P](tail: Producer[Storm, P]): List[StormNode] = { - val depData = Dependants(tail) - val forkedNodes = Producer.transitiveDependenciesOf(tail) - .filter(depData.fanOut(_).exists(_ > 1)).toSet - def distinctAddToList[T](l : List[T], n : T): List[T] = if(l.contains(n)) l else (n :: l) 
- - // Add the dependentProducer to a Node along with each of its dependencies in turn. - def addWithDependencies[T](dependantProducer: Prod[T], previousBolt: StormNode, - stormRegistry: List[StormNode], visited: VisitedStore) : (List[StormNode], VisitedStore) = { - if (visited.contains(dependantProducer)) { - (distinctAddToList(stormRegistry, previousBolt), visited) - } else { - val currentBolt = previousBolt.add(dependantProducer) - val visitedWithN = visited + dependantProducer - - def recurse[U]( - producer: Prod[U], - updatedBolt: StormNode = currentBolt, - updatedRegistry: List[StormNode] = stormRegistry, - visited: VisitedStore = visitedWithN) - : (List[StormNode], VisitedStore) = { - addWithDependencies(producer, updatedBolt, updatedRegistry, visited) - } - - def mergableWithSource(dep: Prod[_]): Boolean = { - dep match { - case NamedProducer(producer, _) => true - case IdentityKeyedProducer(producer) => true - case OptionMappedProducer(producer, _) => true - case Source(_) => true - case AlsoProducer(_, _) => true - case _ => false - } - } - - def hasSummerAsNextProducer(p: Prod[_]): Boolean = - depData.dependantsOf(p).get.collect { case s: Summer[_, _, _] => s }.headOption.isDefined - - - def allDepsMergeableWithSource(p: Prod[_]): Boolean = mergableWithSource(p) && Producer.dependenciesOf(p).forall(allDepsMergeableWithSource) - - /* - * The purpose of this method is to see if we need to add a new physical node to the graph, - * or if we can continue by adding this producer to the current physical node. - * - * This function acts as a look ahead, rather than depending on the state of the current node it depends - * on the nodes further along in the dag. That is conditions for spliting into multiple Nodes are based on as yet - * unvisisted Producers. 
- */ - def maybeSplitThenRecurse[U, A](currentProducer: Prod[U], dep: Prod[A]): (List[StormNode], VisitedStore) = { - val doSplit = dep match { - case _ if (forkedNodes.contains(dep)) => true - // If we are a flatmap, but there haven't been any other flatmaps yet(i.e. the registry is of size 1, the summer). - // Then we must split to avoid a 2 level higherarchy - case _ if (currentBolt.isInstanceOf[FlatMapNode[_]] && hasSummerAsNextProducer(currentProducer) && allDepsMergeableWithSource(dep)) => true - case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true - case _ => false - } - if (doSplit) { - recurse(dep, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt)) - } else { - recurse(dep) - } - } - - - - /* - * This is a peek ahead when we meet a MergedProducer. We pull the directly depended on MergedProducer's into the same Node, - * only if that MergedProducer is not a fan out node. - * This has the effect of pulling all of the merged streams in as siblings rather than just the two. - * From this we return a list of the MergedProducers which should be combined into the current Node, and the list of nodes - * on which these nodes depends (the producers passing data into these MergedProducer). 
- */ - - def mergeCollapse[A](p: Prod[A]): (List[Prod[A]], List[Prod[A]]) = { - p match { - case MergedProducer(subL, subR) if !forkedNodes.contains(p) => - // TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237 - if(subL == subR) throw new Exception("Storm doesn't support both the left and right sides of a join being the same node.") - val (lMergeNodes, lSiblings) = mergeCollapse(subL) - val (rMergeNodes, rSiblings) = mergeCollapse(subR) - (distinctAddToList((lMergeNodes ::: rMergeNodes).distinct, p), (lSiblings ::: rSiblings).distinct) - case _ => (List(), List(p)) - } - } - - dependantProducer match { - case Summer(producer, _, _) => recurse(producer, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(stormRegistry, currentBolt.toSummer)) - case IdentityKeyedProducer(producer) => maybeSplitThenRecurse(dependantProducer, producer) - case NamedProducer(producer, newId) => maybeSplitThenRecurse(dependantProducer, producer) - case AlsoProducer(lProducer, rProducer) => - val (updatedReg, updatedVisited) = maybeSplitThenRecurse(dependantProducer, rProducer) - recurse(lProducer, FlatMapNode(), updatedReg, updatedVisited) - case Source(spout) => (distinctAddToList(stormRegistry, currentBolt.toSource), visitedWithN) - case OptionMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer) - case FlatMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer) - case WrittenProducer(producer, sinkSupplier) => maybeSplitThenRecurse(dependantProducer, producer) - case LeftJoinedProducer(producer, StoreWrapper(newService)) => maybeSplitThenRecurse(dependantProducer, producer) - case MergedProducer(l, r) => - // TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237 - if(l == r) throw new Exception("Storm doesn't support both the left and right sides of a join being the same node.") - val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer) - 
val newCurrentBolt = otherMergeNodes.foldLeft(currentBolt)(_.add(_)) - val visitedWithOther = otherMergeNodes.foldLeft(visitedWithN){ (visited, n) => visited + n } - - // Recurse down all the newly generated dependencies - dependencies.foldLeft((distinctAddToList(stormRegistry, newCurrentBolt), visitedWithOther)) { case ((newStormReg, newVisited), n) => - recurse(n, FlatMapNode(), newStormReg, newVisited) - } - } - } - } - val (stormRegistry, _) = addWithDependencies(tail, FlatMapNode(), List[StormNode](), Set()) - stormRegistry - } -} diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormPlanTopology.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormPlanTopology.scala new file mode 100644 index 000000000..d17c0bef4 --- /dev/null +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormPlanTopology.scala @@ -0,0 +1,85 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +package com.twitter.summingbird.storm + +import com.twitter.storehaus.JMapStore +import com.twitter.storehaus.algebra.MergeableStore +import com.twitter.summingbird._ +import com.twitter.summingbird.planner._ +import com.twitter.summingbird.storm.planner._ +import com.twitter.summingbird.batch.{BatchID, Batcher} +import com.twitter.summingbird.storm.spout.TraversableSpout +import com.twitter.util.Future +import org.scalacheck._ +import Gen._ +import Arbitrary._ +import org.scalacheck.Prop._ + + + +object StormPlanTopology extends Properties("StormDag") { + + implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L) + implicit val batcher = Batcher.unit + private type StormDag = Dag[Storm] + + import TestGraphGenerators._ + implicit def sink1: Storm#Sink[Int] = (() => ((_) => Future.Unit)) + implicit def sink2: Storm#Sink[(Int, Int)] = (() => ((_) => Future.Unit)) + + implicit def testStore: Storm#Store[Int, Int] = MergeableStoreSupplier.from {MergeableStore.fromStore[(Int, BatchID), Int](new JMapStore[(Int, BatchID), Int]())} + + implicit def arbSource1: Arbitrary[Producer[Storm, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[Int]).map{x: List[Int] => Storm.source(TraversableSpout(x))}) + implicit def arbSource2: Arbitrary[KeyedProducer[Storm, Int, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[(Int, Int)]).map{x: List[(Int, Int)] => IdentityKeyedProducer(Storm.source(TraversableSpout(x)))}) + + + lazy val genDag : Gen[StormDag]= for { + tail <- summed + } yield OnlinePlan(tail) + + implicit def genProducer: Arbitrary[StormDag] = Arbitrary(genDag) + + + + val testFn = { i: Int => List((i -> i)) } + + def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get + + var dumpNumber = 1 + def dumpGraph(dag: StormDag) = { + import java.io._ + import com.twitter.summingbird.viz.VizGraph + val writer2 = new PrintWriter(new File("/tmp/failingGraph" + dumpNumber + ".dot")) + VizGraph(dag, writer2) + writer2.close() + dumpNumber = 
dumpNumber + 1 + } + + property("Can plan to a Storm Topology") = forAll { (dag: StormDag) => + try { + Storm.local().plan(dag.tail) + true + } catch { + case e: Throwable => + dumpGraph(dag) + println(e) + e.printStackTrace + false + } + + } +}