Skip to content
This repository was archived by the owner on Jan 20, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

package com.twitter.summingbird.store

import com.twitter.chill.MeatLocker
import com.twitter.chill.Externalizer
import com.twitter.storehaus.ReadableStore
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird.batch.BatchID
Expand All @@ -33,8 +33,7 @@ class CompoundStore[K, V] private (
@transient offline: Option[BatchedScaldingStore[K, V]],
online: Option[() => MergeableStore[(K, BatchID), V]])
extends Serializable {
// Externalizer these to protect them from serialization errors.
private val offlineBox = MeatLocker(offline)
private val offlineBox = Externalizer(offline)
def offlineStore: Option[BatchedScaldingStore[K, V]] = offlineBox.get
def onlineSupplier: Option[() => MergeableStore[(K, BatchID), V]] = online
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.planner

import com.twitter.summingbird._

/**
 * Plans a logical Producer graph (ending at `tail`) into physical Nodes for an
 * online platform: each Node groups Producers that can execute together in one
 * physical stage, splitting the graph wherever fan-out or summer/source
 * constraints require a new stage.
 *
 * @tparam P the target online platform
 * @tparam V the value type emitted by the tail of the graph
 * @param tail the final Producer of the graph being planned
 */
class OnlinePlan[P <: Platform[P], V](tail: Producer[P, V]) {
  private type Prod[T] = Producer[P, T]
  // Producers already assigned to some Node during the traversal.
  private type VisitedStore = Set[Prod[_]]
  private type CNode = Node[P]
  private type CFlatMapNode = FlatMapNode[P]

  private val depData = Dependants(tail)
  // Producers consumed by more than one downstream producer (fan-out > 1);
  // reaching one of these forces a split into a new physical Node.
  private val forkedNodes = Producer.transitiveDependenciesOf(tail)
    .filter(depData.fanOut(_).exists(_ > 1)).toSet

  // Prepend n to l only if it is not already present (keeps the list distinct).
  private def distinctAddToList[T](l : List[T], n : T): List[T] = if(l.contains(n)) l else (n :: l)

  // True if this producer type is allowed to live in the same physical Node as a Source.
  private def mergableWithSource(dep: Producer[P, _]): Boolean = {
    dep match {
      case NamedProducer(producer, _) => true
      case IdentityKeyedProducer(producer) => true
      case OptionMappedProducer(producer, _) => true
      case Source(_) => true
      case AlsoProducer(_, _) => true
      case _ => false
    }
  }

  // True if any direct dependant of p is a Summer.
  private def hasSummerAsNextProducer(p: Prod[_]): Boolean =
    depData.dependantsOf(p).get.collect { case s: Summer[_, _, _] => s }.headOption.isDefined

  // True if p and every transitive dependency of p could be merged into a source Node.
  private def allDepsMergeableWithSource(p: Prod[_]): Boolean = mergableWithSource(p) && Producer.dependenciesOf(p).forall(allDepsMergeableWithSource)

  // Add the dependantProducer to a Node along with each of its dependencies in turn.
  // Returns the updated registry of planned Nodes and the updated visited set.
  private def addWithDependencies[T](dependantProducer: Prod[T], previousBolt: CNode,
    akkaRegistry: List[CNode], visited: VisitedStore) : (List[CNode], VisitedStore) = {

    if (visited.contains(dependantProducer)) {
      // Already planned elsewhere: just make sure the current Node is registered.
      (distinctAddToList(akkaRegistry, previousBolt), visited)
    } else {
      val currentBolt = previousBolt.add(dependantProducer)
      val visitedWithN = visited + dependantProducer

      // Continue the depth-first traversal. Defaults keep accumulating into the
      // current Node with the current registry/visited state.
      def recurse[U](
        producer: Prod[U],
        updatedBolt: CNode = currentBolt,
        updatedRegistry: List[CNode] = akkaRegistry,
        visited: VisitedStore = visitedWithN)
        : (List[CNode], VisitedStore) = {
        addWithDependencies(producer, updatedBolt, updatedRegistry, visited)
      }

      /*
       * The purpose of this method is to see if we need to add a new physical node to the graph,
       * or if we can continue by adding this producer to the current physical node.
       *
       * This function acts as a look ahead: rather than depending on the state of the current node, it depends
       * on the nodes further along in the dag. That is, conditions for splitting into multiple Nodes are based on as yet
       * unvisited Producers.
       */
      def maybeSplitThenRecurse[U, A](currentProducer: Prod[U], dep: Prod[A]): (List[CNode], VisitedStore) = {
        val doSplit = dep match {
          // A forked (fan-out) producer always starts its own Node.
          case _ if (forkedNodes.contains(dep)) => true
          // If we are a flatmap feeding directly into a summer and everything upstream could merge with
          // the source, we must split here to avoid a 2-level hierarchy (source+flatmap in one Node).
          case _ if (currentBolt.isInstanceOf[FlatMapNode[_]] && hasSummerAsNextProducer(currentProducer) && allDepsMergeableWithSource(dep)) => true
          // Current producer cannot merge with a source but everything upstream can: split so the
          // upstream chain can be planned into a source Node.
          case _ if ((!mergableWithSource(currentProducer)) && allDepsMergeableWithSource(dep)) => true
          case _ => false
        }
        if (doSplit) {
          // Close out the current Node and start a fresh FlatMapNode for the dependency.
          recurse(dep, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(akkaRegistry, currentBolt))
        } else {
          recurse(dep)
        }
      }

      /*
       * This is a peek ahead when we meet a MergedProducer. We pull the directly depended on MergedProducer's into the same Node,
       * only if that MergedProducer is not a fan out node.
       * This has the effect of pulling all of the merged streams in as siblings rather than just the two.
       * From this we return a list of the MergedProducers which should be combined into the current Node, and the list of nodes
       * on which these nodes depends (the producers passing data into these MergedProducer).
       */
      def mergeCollapse[A](p: Prod[A]): (List[Prod[A]], List[Prod[A]]) = {
        p match {
          case MergedProducer(subL, subR) if !forkedNodes.contains(p) =>
            // TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
            if(subL == subR) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.")
            val (lMergeNodes, lSiblings) = mergeCollapse(subL)
            val (rMergeNodes, rSiblings) = mergeCollapse(subR)
            (distinctAddToList((lMergeNodes ::: rMergeNodes).distinct, p), (lSiblings ::: rSiblings).distinct)
          case _ => (List(), List(p))
        }
      }

      dependantProducer match {
        // A Summer terminates the current Node (converted to a summer Node); its input starts a new FlatMapNode.
        case Summer(producer, _, _) => recurse(producer, updatedBolt = FlatMapNode(), updatedRegistry = distinctAddToList(akkaRegistry, currentBolt.toSummer))
        case IdentityKeyedProducer(producer) => maybeSplitThenRecurse(dependantProducer, producer)
        case NamedProducer(producer, newId) => maybeSplitThenRecurse(dependantProducer, producer)
        case AlsoProducer(lProducer, rProducer) =>
          // Plan the result branch first, then the side-effect branch in a fresh Node.
          val (updatedReg, updatedVisited) = maybeSplitThenRecurse(dependantProducer, rProducer)
          recurse(lProducer, FlatMapNode(), updatedReg, updatedVisited)
        // A Source has no dependencies: close out the current Node as a source Node.
        case Source(spout) => (distinctAddToList(akkaRegistry, currentBolt.toSource), visitedWithN)
        case OptionMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer)
        case FlatMappedProducer(producer, op) => maybeSplitThenRecurse(dependantProducer, producer)
        case WrittenProducer(producer, sinkSupplier) => maybeSplitThenRecurse(dependantProducer, producer)
        case LeftJoinedProducer(producer, _) => maybeSplitThenRecurse(dependantProducer, producer)
        case MergedProducer(l, r) =>
          // TODO support de-duping self merges https://github.com/twitter/summingbird/issues/237
          if(l == r) throw new Exception("Online Planner doesn't support both the left and right sides of a join being the same node.")
          // Collapse the whole chain of directly-merged streams into this Node as siblings.
          val (otherMergeNodes, dependencies) = mergeCollapse(dependantProducer)
          val newCurrentBolt = otherMergeNodes.foldLeft(currentBolt)(_.add(_))
          val visitedWithOther = otherMergeNodes.foldLeft(visitedWithN){ (visited, n) => visited + n }

          // Recurse down all the newly generated dependencies
          dependencies.foldLeft((distinctAddToList(akkaRegistry, newCurrentBolt), visitedWithOther)) { case ((newAkkaReg, newVisited), n) =>
            recurse(n, FlatMapNode(), newAkkaReg, newVisited)
          }
      }
    }
  }

  // This takes an initial pass through all of the Producers, assigning them to Nodes.
  // The traversal starts at the tail with an empty FlatMapNode and empty registry/visited state.
  private def buildNodesSet: List[CNode] = {
    val (akkaRegistry, _) = addWithDependencies(tail, FlatMapNode(), List[CNode](), Set())
    akkaRegistry
  }
}

object OnlinePlan {
  /**
   * Plans the producer graph ending at `tail` into a Dag of physical Nodes.
   *
   * @param tail the final Producer of the graph to plan
   * @return the planned Dag for platform P
   */
  def apply[P <: Platform[P], T](tail: Producer[P, T]): Dag[P] = {
    val plannedNodes: List[Node[P]] = new OnlinePlan(tail).buildNodesSet

    // Nodes were accumulated source -> summer (list prepends during planning),
    // but the planner laws are easier to state summer -> source, so we flip
    // both the node list and each node's member order. Empty Nodes (which can
    // occur when an already-visited producer is the first entry of a Node)
    // are dropped.
    val orderedNodes: List[Node[P]] =
      plannedNodes
        .filter(_.members.nonEmpty)
        .map(_.reverse)
        .reverse
    Dag(tail, orderedNodes)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ object TestGraphGenerators {
def genProd2[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]],
genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[KeyedProducer[P, Int, Int]] =
frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), (2, also2),
(0, also2), (3, genFlatMap22), (3, genFlatMap12))
frequency((25, genSource2.arbitrary), (3, genOptMap12), (3, genOptMap22), (4, genWrite22), (1, genMerged2), (1, also2),
(3, genFlatMap22), (3, genFlatMap12))


def genProd1[P <: Platform[P]](implicit genSource1 : Arbitrary[Producer[P, Int]],
genSource2 : Arbitrary[KeyedProducer[P, Int, Int]],
testStore: P#Store[Int, Int], sink1: P#Sink[Int], sink2: P#Sink[(Int, Int)]): Gen[Producer[P, Int]] =
frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (2, also1), (3, genFlatMap11),
(0, also1), (3, genFlatMap21))
frequency((25, genSource1.arbitrary), (8, genNamedProducer11), (3, genOptMap11), (3, genOptMap21), (1, genMerged1), (1, also1), (3, genFlatMap11),
(3, genFlatMap21))

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,46 @@
limitations under the License.
*/

package com.twitter.summingbird.storm
package com.twitter.summingbird.online

import com.twitter.storehaus.JMapStore
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.summingbird._
import com.twitter.summingbird.planner._
import com.twitter.summingbird.storm.planner._
import com.twitter.summingbird.batch.{BatchID, Batcher}
import com.twitter.summingbird.storm.spout.TraversableSpout
import com.twitter.util.Future
import com.twitter.summingbird.memory.Memory
import scala.collection.mutable.{Map => MMap}
import org.scalacheck._
import Gen._
import Arbitrary._
import org.scalacheck.Prop._



object TopologyPlannerLaws extends Properties("StormDag") {
object TopologyPlannerLaws extends Properties("Online Dag") {

implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L)
implicit val batcher = Batcher.unit
private type StormDag = Dag[Storm]
private type MemoryDag = Dag[Memory]

import TestGraphGenerators._
implicit def sink1: Storm#Sink[Int] = (() => ((_) => Future.Unit))
implicit def sink2: Storm#Sink[(Int, Int)] = (() => ((_) => Future.Unit))

implicit def testStore: Storm#Store[Int, Int] = MergeableStoreSupplier.from {MergeableStore.fromStore[(Int, BatchID), Int](new JMapStore[(Int, BatchID), Int]())}
implicit def sink1: Memory#Sink[Int] = {x: Int => x}
implicit def sink2: Memory#Sink[(Int, Int)] = {x: (Int, Int) => x}

implicit def arbSource1: Arbitrary[Producer[Storm, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[Int]).map{x: List[Int] => Storm.source(TraversableSpout(x))})
implicit def arbSource2: Arbitrary[KeyedProducer[Storm, Int, Int]] = Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[(Int, Int)]).map{x: List[(Int, Int)] => IdentityKeyedProducer(Storm.source(TraversableSpout(x)))})
implicit def testStore: Memory#Store[Int, Int] = MMap[Int, Int]()

implicit def arbSource1: Arbitrary[Producer[Memory, Int]] =
Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[Int]).map{
x: List[Int] =>
Memory.toSource(x)})
implicit def arbSource2: Arbitrary[KeyedProducer[Memory, Int, Int]] =
Arbitrary(Gen.listOfN(5000, Arbitrary.arbitrary[(Int, Int)]).map{
x: List[(Int, Int)] =>
IdentityKeyedProducer(Memory.toSource(x))})


lazy val genDag : Gen[StormDag]= for {
lazy val genDag : Gen[MemoryDag]= for {
tail <- summed
} yield DagBuilder(tail)
} yield OnlinePlan(tail)

implicit def genProducer: Arbitrary[StormDag] = Arbitrary(genDag)
implicit def genProducer: Arbitrary[MemoryDag] = Arbitrary(genDag)



Expand All @@ -60,7 +62,7 @@ object TopologyPlannerLaws extends Properties("StormDag") {
def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get

var dumpNumber = 1
def dumpGraph(dag: StormDag) = {
def dumpGraph(dag: MemoryDag) = {
import java.io._
import com.twitter.summingbird.viz.VizGraph
val writer2 = new PrintWriter(new File("/tmp/failingGraph" + dumpNumber + ".dot"))
Expand All @@ -69,17 +71,17 @@ object TopologyPlannerLaws extends Properties("StormDag") {
dumpNumber = dumpNumber + 1
}

property("Dag Nodes must be unique") = forAll { (dag: StormDag) =>
property("Dag Nodes must be unique") = forAll { (dag: MemoryDag) =>
dag.nodes.size == dag.nodes.toSet.size
}

property("Must have at least one producer in each StormNode") = forAll { (dag: StormDag) =>
property("Must have at least one producer in each MemoryNode") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
n.members.size > 0
}
}

property("If a StormNode contains a Summer, it must be the first Producer in that StormNode") = forAll { (dag: StormDag) =>
property("If a MemoryNode contains a Summer, it must be the first Producer in that MemoryNode") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val firstP = n.members.last
n.members.forall{p =>
Expand All @@ -90,15 +92,15 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}
}

property("The first producer in a storm node cannot be a NamedProducer") = forAll { (dag: StormDag) =>
property("The first producer in a online node cannot be a NamedProducer") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val inError = n.members.last.isInstanceOf[NamedProducer[_, _]]
if(inError) dumpGraph(dag)
!inError
}
}

property("0 or more merge producers at the start of every storm bolts, followed by 1+ non-merge producers and no other merge producers following those.") = forAll { (dag: StormDag) =>
property("0 or more merge producers at the start of every online bolts, followed by 1+ non-merge producers and no other merge producers following those.") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val (_, inError) = n.members.foldLeft((false, false)) { case ((seenMergeProducer, inError), producer) =>
producer match {
Expand All @@ -112,7 +114,7 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}
}

property("The last producer in any StormNode prior to a summer must be a KeyedProducer") = forAll { (dag: StormDag) =>
property("The last producer in any online prior to a summer must be a KeyedProducer") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val firstP = n.members.last
firstP match {
Expand All @@ -125,20 +127,20 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}
}

property("No producer is repeated") = forAll { (dag: StormDag) =>
property("No producer is repeated") = forAll { (dag: MemoryDag) =>
val numAllProducers = dag.nodes.foldLeft(0){(sum, n) => sum + n.members.size}
val allProducersSet = dag.nodes.foldLeft(Set[Producer[Storm, _]]()){(runningSet, n) => runningSet | n.members.toSet}
val allProducersSet = dag.nodes.foldLeft(Set[Producer[Memory, _]]()){(runningSet, n) => runningSet | n.members.toSet}
numAllProducers == allProducersSet.size
}


property("All producers are in a StormNode") = forAll { (dag: StormDag) =>
property("All producers are in a Node") = forAll { (dag: MemoryDag) =>
val allProducers = Producer.entireGraphOf(dag.tail).toSet + dag.tail
val numAllProducersInDag = dag.nodes.foldLeft(0){(sum, n) => sum + n.members.size}
allProducers.size == numAllProducersInDag
}

property("Only spouts can have no incoming dependencies") = forAll { (dag: StormDag) =>
property("Only sources can have no incoming dependencies") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val success = n match {
case _: SourceNode[_] => true
Expand All @@ -150,7 +152,7 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}


property("Spouts must have no incoming dependencies, and they must have dependants") = forAll { (dag: StormDag) =>
property("Sources must have no incoming dependencies, and they must have dependants") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val success = n match {
case _: SourceNode[_] =>
Expand All @@ -163,7 +165,7 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}


property("Prior to a summer the StormNode should be a FinalFlatMapStormBolt") = forAll { (dag: StormDag) =>
property("Prior to a summer the Nonde should be a FlatMap Node") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val firstP = n.members.last
val success = firstP match {
Expand All @@ -178,7 +180,7 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}
}

property("There should be no flatmap producers in the source node") = forAll { (dag: StormDag) =>
property("There should be no flatmap producers in the source node") = forAll { (dag: MemoryDag) =>
dag.nodes.forall{n =>
val success = n match {
case n: SourceNode[_] => n.members.forall{p => !p.isInstanceOf[FlatMappedProducer[_, _, _]]}
Expand All @@ -189,22 +191,9 @@ object TopologyPlannerLaws extends Properties("StormDag") {
}
}

property("Nodes in the storm DAG should have unique names") = forAll { (dag: StormDag) =>
property("Nodes in the DAG should have unique names") = forAll { (dag: MemoryDag) =>
val allNames = dag.nodes.toList.map{n => dag.getNodeName(n)}
allNames.size == allNames.distinct.size
}

property("Can plan to a Storm Topology") = forAll { (dag: StormDag) =>
try {
Storm.local().plan(dag.tail)
true
} catch {
case e: Throwable =>
dumpGraph(dag)
println(e)
e.printStackTrace
false
}

}
}
Loading