Permalink
Browse files

Track state in summingbird-online as an Iterator rather than a Seq. (#703)

Track state in summingbird-online as a Chain rather than a Seq. This avoids O(n^2) compute complexity when summing single-element lists of Storm tuples.
  • Loading branch information...
1 parent 288af29 commit 77b65d5fc9602af74de92596b9d49fc64b236620 @pankajroark pankajroark committed on GitHub Jan 4, 2017
View
@@ -32,6 +32,7 @@ val storehausVersion = "0.15.0-RC1"
val stormDep = "storm" % "storm" % "0.9.0-wip15" //This project also compiles with the latest storm, which is in fact required to run the example
val tormentaVersion = "0.11.1"
val utilVersion = "6.34.0"
+val chainVersion = "0.1.0"
val extraSettings = mimaDefaultSettings ++ scalariformSettings
@@ -235,7 +236,11 @@ val ignoredABIProblems = {
exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.storm.Storm.get"),
exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.storm.Storm.getOrElse"),
exclude[DirectMissingMethodProblem]("com.twitter.summingbird.storm.BaseBolt.apply"),
- exclude[IncompatibleResultTypeProblem]("com.twitter.summingbird.example.Memcache.client")
+ exclude[IncompatibleResultTypeProblem]("com.twitter.summingbird.example.Memcache.client"),
+ exclude[DirectMissingMethodProblem]("com.twitter.summingbird.online.executor.OperationContainer.notifyFailure"),
+ exclude[ReversedMissingMethodProblem]("com.twitter.summingbird.online.executor.OperationContainer.notifyFailure"),
+ exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.online.executor.AsyncBase.notifyFailure"),
+ exclude[IncompatibleMethTypeProblem]("com.twitter.summingbird.online.executor.Summer.notifyFailure")
)
}
@@ -293,7 +298,8 @@ lazy val summingbirdOnline = module("online").settings(
"com.twitter" %% "chill" % chillVersion,
"com.twitter" %% "storehaus-algebra" % storehausVersion,
"com.twitter" %% "util-core" % utilVersion,
- "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test"
+ "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test",
+ "org.spire-math" %% "chain" % chainVersion
)
).dependsOn(
summingbirdCore % "test->test;compile->compile",
@@ -315,6 +321,7 @@ lazy val summingbirdStorm = module("storm").settings(
"com.twitter" %% "scalding-args" % scaldingVersion,
"com.twitter" %% "tormenta-core" % tormentaVersion,
"com.twitter" %% "util-core" % utilVersion,
+ "org.spire-math" %% "chain" % chainVersion,
stormDep % "provided"
)
).dependsOn(
@@ -16,10 +16,11 @@ limitations under the License.
package com.twitter.summingbird.online.executor
+import com.twitter.algebird.Semigroup
import com.twitter.summingbird.online.FutureQueue
import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import com.twitter.util._
-
+import chain.Chain
import scala.util.Try
abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaitingTime: MaxFutureWaitTime, maxEmitPerExec: MaxEmitPerExecute) extends Serializable with OperationContainer[I, O, S] {
@@ -29,21 +30,30 @@ abstract class AsyncBase[I, O, S](maxWaitingFutures: MaxWaitingFutures, maxWaiti
* cases that need to complete operations after or before doing a FlatMapOperation or
* doing a store merge
*/
- def apply(state: S, in: I): Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]]
- def tick: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]] = Future.value(Nil)
+ def apply(state: S, in: I): Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]]
+ def tick: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]] = Future.value(Nil)
+
+ implicit def chainSemigroup[T]: Semigroup[Chain[T]] = new Semigroup[Chain[T]] {
+ override def plus(l: Chain[T], r: Chain[T]): Chain[T] = l ++ r
+ }
- private[executor] lazy val futureQueue = new FutureQueue[Seq[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime)
+ private[executor] lazy val futureQueue = new FutureQueue[Chain[S], TraversableOnce[O]](maxWaitingFutures, maxWaitingTime)
- override def executeTick: TraversableOnce[(Seq[S], Try[TraversableOnce[O]])] =
+ override def executeTick: TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
finishExecute(None, tick)
- override def execute(state: S, data: I): TraversableOnce[(Seq[S], Try[TraversableOnce[O]])] =
+ override def execute(state: S, data: I): TraversableOnce[(Chain[S], Try[TraversableOnce[O]])] =
finishExecute(Some(state), apply(state, data))
- private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[O]])]]) = {
+ private def finishExecute(failStateOpt: Option[S], fIn: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[O]])]]) = {
fIn.respond {
case Return(iter) => futureQueue.addAll(iter)
- case Throw(ex) => futureQueue.add(failStateOpt.toSeq, Future.exception(ex))
+ case Throw(ex) =>
+ val failState = failStateOpt match {
+ case Some(state) => Chain.single(state)
+ case None => Chain.Empty
+ }
+ futureQueue.add(failState, Future.exception(ex))
}
futureQueue.dequeue(maxEmitPerExec.get)
}
@@ -29,6 +29,7 @@ import com.twitter.summingbird.online.option.{
MaxFutureWaitTime,
MaxEmitPerExecute
}
+import chain.Chain
import scala.collection.mutable.{ Map => MMap, ListBuffer }
// These CMaps we generate in the FFM, we use it as an immutable wrapper around
// a mutable map.
@@ -67,15 +68,16 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]](
val lockedOp = Externalizer(flatMapOp)
type SummerK = Key
- type SummerV = (Seq[S], Value)
- lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Seq[S], Value)]])
+ type SummerV = (Chain[S], Value)
+
+ lazy val sCache = summerBuilder.getSummer[SummerK, SummerV](implicitly[Semigroup[(Chain[S], Value)]])
// Lazy transient as const futures are not serializable
@transient private[this] lazy val noData = List(
- (List(), Future.value(Nil))
+ (Chain.empty, Future.value(Nil))
)
- private def formatResult(outData: Map[Key, (Seq[S], Value)]): TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])] = {
+ private def formatResult(outData: Map[Key, (Chain[S], Value)]): TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])] = {
if (outData.isEmpty) {
noData
} else {
@@ -85,34 +87,34 @@ class FinalFlatMap[Event, Key, Value: Semigroup, S <: InputState[_]](
case (k, (listS, v)) =>
val newK = summerShards.summerIdFor(k)
val (buffer, mmap) = mmMap.getOrElseUpdate(newK, (ListBuffer[S](), MMap[Key, Value]()))
- buffer ++= listS
+ buffer ++= listS.iterator
mmap += k -> v
}
mmMap.toIterator.map {
case (outerKey, (listS, innerMap)) =>
- (listS, Future.value(List((outerKey, innerMap))))
+ (Chain(listS), Future.value(List((outerKey, innerMap))))
}
}
}
- override def tick: Future[TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])]] =
+ override def tick: Future[TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])]] =
sCache.tick.map(formatResult(_))
def cache(state: S,
- items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Seq[S], Future[TraversableOnce[OutputElement]])]] =
+ items: TraversableOnce[(Key, Value)]): Future[TraversableOnce[(Chain[S], Future[TraversableOnce[OutputElement]])]] =
try {
val itemL = items.toList
if (itemL.size > 0) {
state.fanOut(itemL.size)
sCache.addAll(itemL.map {
case (k, v) =>
- k -> (List(state), v)
+ k -> (Chain.single(state), v)
}).map(formatResult(_))
} else { // Here we handle mapping to nothing, option map et. al
Future.value(
List(
- (List(state), Future.value(Nil))
+ (Chain.single(state), Future.value(Nil))
)
)
}
@@ -25,6 +25,7 @@ import com.twitter.summingbird.online.option.{
MaxFutureWaitTime,
MaxEmitPerExecute
}
+import chain.Chain
class IntermediateFlatMap[T, U, S](
@transient flatMapOp: FlatMapOperation[T, U],
@@ -35,9 +36,9 @@ class IntermediateFlatMap[T, U, S](
val lockedOp = Externalizer(flatMapOp)
override def apply(state: S,
- tup: T): Future[Iterable[(List[S], Future[TraversableOnce[U]])]] =
+ tup: T): Future[Iterable[(Chain[S], Future[TraversableOnce[U]])]] =
lockedOp.get.apply(tup).map { res =>
- List((List(state), Future.value(res)))
+ List((Chain.single(state), Future.value(res)))
}
override def cleanup(): Unit = lockedOp.get.close
@@ -16,14 +16,15 @@ limitations under the License.
package com.twitter.summingbird.online.executor
+import chain.Chain
import scala.util.Try
trait OperationContainer[Input, Output, State] {
def init(): Unit = {}
def cleanup(): Unit = {}
- def executeTick: TraversableOnce[(Seq[State], Try[TraversableOnce[Output]])]
+ def executeTick: TraversableOnce[(Chain[State], Try[TraversableOnce[Output]])]
def execute(state: State,
- data: Input): TraversableOnce[(Seq[State], Try[TraversableOnce[Output]])]
- def notifyFailure(inputs: Seq[State], e: Throwable): Unit = {}
-}
+ data: Input): TraversableOnce[(Chain[State], Try[TraversableOnce[Output]])]
+ def notifyFailure(inputs: Chain[State], e: Throwable): Unit = {}
+}
@@ -24,6 +24,8 @@ import com.twitter.storehaus.algebra.Mergeable
import com.twitter.summingbird.online.{ FlatMapOperation, Externalizer }
import com.twitter.summingbird.online.option._
+import chain.Chain
+
// These CMaps we generate in the FFM, we use it as an immutable wrapper around
// a mutable map.
import scala.collection.{ Map => CMap }
@@ -72,7 +74,8 @@ class Summer[Key, Value: Semigroup, Event, S](
lazy val storePromise = Promise[Mergeable[Key, Value]]
lazy val store = Await.result(storePromise)
- lazy val sSummer: AsyncSummer[(Key, (Seq[InputState[S]], Value)), Map[Key, (Seq[InputState[S]], Value)]] = summerBuilder.getSummer[Key, (Seq[InputState[S]], Value)](implicitly[Semigroup[(Seq[InputState[S]], Value)]])
+ lazy val sSummer: AsyncSummer[(Key, (Chain[InputState[S]], Value)), Map[Key, (Chain[InputState[S]], Value)]] =
+ summerBuilder.getSummer[Key, (Chain[InputState[S]], Value)](implicitly[Semigroup[(Chain[InputState[S]], Value)]])
val exceptionHandlerBox = Externalizer(exceptionHandler.handlerFn.lift)
val successHandlerBox = Externalizer(successHandler)
@@ -86,12 +89,12 @@ class Summer[Key, Value: Semigroup, Event, S](
successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None
}
- override def notifyFailure(inputs: Seq[InputState[S]], error: Throwable): Unit = {
+ override def notifyFailure(inputs: Chain[InputState[S]], error: Throwable): Unit = {
super.notifyFailure(inputs, error)
exceptionHandlerBox.get.apply(error)
}
- private def handleResult(kvs: Map[Key, (Seq[InputState[S]], Value)]): TraversableOnce[(Seq[InputState[S]], Future[TraversableOnce[Event]])] =
+ private def handleResult(kvs: Map[Key, (Chain[InputState[S]], Value)]): TraversableOnce[(Chain[InputState[S]], Future[TraversableOnce[Event]])] =
store.multiMerge(kvs.mapValues(_._2)).iterator.map {
case (k, beforeF) =>
val (tups, delta) = kvs(k)
@@ -110,7 +113,7 @@ class Summer[Key, Value: Semigroup, Event, S](
state.fanOut(innerTuples.size)
val cacheEntries = innerTuples.map {
case (k, v) =>
- (k, (List(state), v))
+ (k, (Chain.single(state), v))
}
sSummer.addAll(cacheEntries).map(handleResult(_))
@@ -20,37 +20,41 @@ import com.twitter.conversions.time._
import com.twitter.summingbird.online.FutureQueue
import com.twitter.summingbird.online.option.{ MaxEmitPerExecute, MaxFutureWaitTime, MaxWaitingFutures }
import com.twitter.util.{ Await, Future, Promise }
+import chain.Chain
import org.scalatest.WordSpec
import scala.util.Try
class AsyncBaseSpec extends WordSpec {
- val data = Seq((Seq(100, 104, 99), Future(Seq(9, 10, 13))), (Seq(12, 19), Future(Seq(100, 200, 500))))
- val dequeueData = List((Seq(8, 9), Try(Seq(4, 5, 6))))
+ val data = Seq(
+ (Chain(Seq(100, 104, 99)), Future(Seq(9, 10, 13))),
+ (Chain(Seq(12, 19)), Future(Seq(100, 200, 500))))
- class TestFutureQueue extends FutureQueue[Seq[Int], TraversableOnce[Int]](
+ val dequeueData = List((Chain(Seq(8, 9)), Try(Seq(4, 5, 6))))
+
+ class TestFutureQueue extends FutureQueue[Chain[Int], TraversableOnce[Int]](
MaxWaitingFutures(100),
MaxFutureWaitTime(1.minute)
) {
var added = false
- var addedData: (Seq[Int], Future[TraversableOnce[Int]]) = _
- var addedAllData: TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])] = _
+ var addedData: (Chain[Int], Future[TraversableOnce[Int]]) = _
+ var addedAllData: TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])] = _
var dequeued = false
var dequeuedCount: Int = 0
- override def add(state: Seq[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized {
+ override def add(state: Chain[Int], fut: Future[TraversableOnce[Int]]): Unit = synchronized {
assert(!added)
added = true
addedData = (state, fut)
}
override def addAll(
- iter: TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]): Unit = synchronized {
+ iter: TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]): Unit = synchronized {
assert(!added)
added = true
addedAllData = iter
}
- override def dequeue(maxItems: Int): Seq[(Seq[Int], Try[TraversableOnce[Int]])] = synchronized {
+ override def dequeue(maxItems: Int): Seq[(Chain[Int], Try[TraversableOnce[Int]])] = synchronized {
assert(!dequeued)
dequeued = true
dequeuedCount = maxItems
@@ -60,8 +64,8 @@ class AsyncBaseSpec extends WordSpec {
class TestAsyncBase(
queue: TestFutureQueue,
- tickData: => Future[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"),
- applyData: => Future[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int](
+ tickData: => Future[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented"),
+ applyData: => Future[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]] = throw new RuntimeException("not implemented")) extends AsyncBase[Int, Int, Int](
MaxWaitingFutures(100),
MaxFutureWaitTime(1.minute),
MaxEmitPerExecute(57)
@@ -71,7 +75,7 @@ class AsyncBaseSpec extends WordSpec {
override def tick = tickData
}
- def promise = Promise[TraversableOnce[(Seq[Int], Future[TraversableOnce[Int]])]]
+ def promise = Promise[TraversableOnce[(Chain[Int], Future[TraversableOnce[Int]])]]
"Queues tick on executeTick" in {
val queue = new TestFutureQueue
@@ -112,7 +116,7 @@ class AsyncBaseSpec extends WordSpec {
p.setException(ex)
assert(queue.added)
- assert(queue.addedData._1 === Nil)
+ assert(queue.addedData._1.iterator.isEmpty)
assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) })
}
@@ -127,7 +131,7 @@ class AsyncBaseSpec extends WordSpec {
p.setException(ex)
assert(queue.added)
- assert(queue.addedData._1 === List(1089))
+ assert(queue.addedData._1 === Chain.single(1089))
assert(ex === intercept[RuntimeException] { Await.result(queue.addedData._2) })
}
}
@@ -29,6 +29,7 @@ import com.twitter.summingbird.online.executor.InputState
import com.twitter.summingbird.option.JobId
import com.twitter.summingbird.{ Group, JobCounters, Name, SummingbirdRuntimeStats }
import com.twitter.summingbird.online.Externalizer
+import chain.Chain
import scala.collection.JavaConverters._
import java.util.{ List => JList }
import org.slf4j.{ Logger, LoggerFactory }
@@ -118,10 +119,10 @@ case class BaseBolt[I, O](jobID: JobId,
logger.error(message, err)
}
- private def fail(inputs: Seq[InputState[Tuple]], error: Throwable): Unit = {
+ private def fail(inputs: Chain[InputState[Tuple]], error: Throwable): Unit = {
executor.notifyFailure(inputs, error)
if (!earlyAck) { inputs.foreach(_.fail(collector.fail(_))) }
- logError("Storm DAG of: %d tuples failed".format(inputs.size), error)
+ logError("Storm DAG of: %d tuples failed".format(inputs.iterator.size), error)
}
override def execute(tuple: Tuple) = {
@@ -149,12 +150,13 @@ case class BaseBolt[I, O](jobID: JobId,
}
}
- private def finish(inputs: Seq[InputState[Tuple]], results: TraversableOnce[O]) {
+ private def finish(inputs: Chain[InputState[Tuple]], results: TraversableOnce[O]) {
var emitCount = 0
if (hasDependants) {
if (anchorTuples.anchor) {
+ val states = inputs.iterator.map(_.state).toList.asJava
results.foreach { result =>
- collector.emit(inputs.map(_.state).asJava, encoder(result))
+ collector.emit(states, encoder(result))
emitCount += 1
}
} else { // don't anchor
@@ -167,7 +169,9 @@ case class BaseBolt[I, O](jobID: JobId,
// Always ack a tuple on completion:
if (!earlyAck) { inputs.foreach(_.ack(collector.ack(_))) }
- logger.debug("bolt finished processed {} linked tuples, emitted: {}", inputs.size, emitCount)
+ if (logger.isDebugEnabled()) {
+ logger.debug("bolt finished processed {} linked tuples, emitted: {}", inputs.iterator.size, emitCount)
+ }
}
override def prepare(conf: JMap[_, _], context: TopologyContext, oc: OutputCollector) {

0 comments on commit 77b65d5

Please sign in to comment.