diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/TimeExtractor.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/TimeExtractor.scala index a416abbaa..44b8fdd87 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/TimeExtractor.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/TimeExtractor.scala @@ -28,4 +28,10 @@ object TimeExtractor { } } -trait TimeExtractor[T] extends (T => Long) with java.io.Serializable +/** This cannot be a subclass of function and use the pattern + * of implicit dependencies, since then you get an implicit function. + * Not good + */ +trait TimeExtractor[T] extends java.io.Serializable { + def apply(t: T): Long +} diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/FutureQueue.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/FutureQueue.scala deleted file mode 100644 index 84755ae24..000000000 --- a/summingbird-online/src/main/scala/com/twitter/summingbird/online/FutureQueue.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* -Copyright 2013 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.twitter.summingbird.online - -import com.twitter.util.{ Await, Future } -import scala.collection.mutable.{ Queue => MutableQueue } - -/** - * Maintains a rolling window of futures. Future # n is - * forced after Future (n + maxLength) is added to the - * queue. - * - * TODO (https://github.com/twitter/summingbird/issues/83): remove - * this in favor of BufferingStore in storehaus after further - * performance investigation. - * - * @author Oscar Boykin - * @author Sam Ritchie - * @author Ashu Singhal - */ - -case class FutureQueue[T](init: Future[T], maxLength: Int) { - require(maxLength >= 1, "maxLength cannot be negative.") - private val queue = MutableQueue[Future[T]](init) - - def +=(future: Future[T]): this.type = { - queue += future - // Force extra futures. - while (queue.length > maxLength) { Await.result(queue.dequeue) } - - // Drop all realized futures but the head off the tail - while(queue.size > 1 && queue.head.isDefined) { queue.dequeue } - this - } -} diff --git a/summingbird-online/src/main/scala/com/twitter/summingbird/online/Queue.scala b/summingbird-online/src/main/scala/com/twitter/summingbird/online/Queue.scala new file mode 100644 index 000000000..b5aacc83a --- /dev/null +++ b/summingbird-online/src/main/scala/com/twitter/summingbird/online/Queue.scala @@ -0,0 +1,146 @@ +/* +Copyright 2013 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.summingbird.online + +import com.twitter.util.{ Await, Duration, Future, Try } + +import java.util.{Queue => JQueue} +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit} +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +/** + * + * @author Oscar Boykin + */ + +object Queue { + /** + * By default, don't block on put + */ + def apply[T]() = linkedNonBlocking[T] + + /** Use this for blocking puts when the size is reached + */ + def arrayBlocking[T](size: Int): Queue[T] = + fromBlocking(new ArrayBlockingQueue(size)) + + def linkedBlocking[T]: Queue[T] = + fromBlocking(new LinkedBlockingQueue()) + + def linkedNonBlocking[T]: Queue[T] = + fromQueue(new ConcurrentLinkedQueue()) + + def fromBlocking[T](bq: BlockingQueue[T]): Queue[T] = { + new Queue[T] { + override def add(t: T) = bq.put(t) + override def pollNonBlocking = Option(bq.poll()) + } + } + + // Uses Queue.add to put. This will fail for full blocking queues + def fromQueue[T](q: JQueue[T]): Queue[T] = { + new Queue[T] { + override def add(t: T) = q.add(t) + override def pollNonBlocking = Option(q.poll()) + } + } +} + +/** + * Use this class with a thread-safe queue to receive + * results from futures in one thread. + * Storm needs us to touch it's code in one event path (via + * the execute method in bolts) + */ +abstract class Queue[T] { + + /** These are the only two methods to implement. + * these must be thread-safe. + */ + protected def add(t: T): Unit + protected def pollNonBlocking: Option[T] + + private val count = new AtomicInteger(0) + + def put(item: T): Int = { + add(item) + count.incrementAndGet + } + + /** Returns the size immediately after the put */ + def putAll(items: TraversableOnce[T]): Int = { + val added = items.foldLeft(0) { (cnt, item) => + add(item) + cnt + 1 + } + count.addAndGet(added) + } + + /** + * check if something is ready now + */ + def poll: Option[T] = { + val res = pollNonBlocking + // This is for performance sensitive code. Prefering if to match defensively + if(res.isDefined) count.decrementAndGet + res + } + + /** + * Obviously, this might not be the same by the time you + * call trimTo or poll + */ + def size: Int = count.get + + // Do something on all the elements ready: + @annotation.tailrec + final def foreach(fn: T => Unit): Unit = + poll match { + case None => () + case Some(it) => + fn(it) + foreach(fn) + } + + // fold on all the elements ready: + @annotation.tailrec + final def foldLeft[V](init: V)(fn: (V, T) => V): V = + poll match { + case None => init + case Some(it) => foldLeft(fn(init, it))(fn) + } + + /** + * Take enough elements to get .size == maxLength + */ + def trimTo(maxLength: Int): Seq[T] = { + require(maxLength >= 0, "maxLength must be >= 0.") + + @annotation.tailrec + def loop(size: Int, acc: List[T] = Nil): List[T] = { + if(size > maxLength) { + pollNonBlocking match { + case None => acc.reverse // someone else cleared us out + case Some(item) => + loop(count.decrementAndGet, item::acc) + } + } + else acc.reverse + } + loop(count.get) + } +} diff --git a/summingbird-online/src/test/scala/com/twitter/summingbird/online/QueueLaws.scala b/summingbird-online/src/test/scala/com/twitter/summingbird/online/QueueLaws.scala new file mode 100644 index 000000000..4071e427a --- /dev/null +++ b/summingbird-online/src/test/scala/com/twitter/summingbird/online/QueueLaws.scala @@ -0,0 +1,111 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package com.twitter.summingbird.online + +import org.scalacheck._ +import Gen._ +import Arbitrary._ +import org.scalacheck.Prop._ + +import com.twitter.util.{Return, Throw, Future, Try} + +object QueueLaws extends Properties("Queue") { + + property("Putting into a BoundedQueue gets size right") = forAll { (items: List[String]) => + val q = Queue[String]() + q.putAll(items) + q.size == items.size + } + property("not spill if capacity is enough") = forAll { (items: List[Int]) => + val q = Queue[Int]() + q.putAll(items) + q.trimTo(items.size).size == 0 + } + property("Work with indepent additions") = forAll { (items: List[Int]) => + val q = Queue[Int]() + items.map(q.put(_)) == (1 to items.size).toList + } + property("spill all with zero capacity") = forAll { (items: List[Int]) => + val q = Queue[Int]() + q.putAll(items) + q.trimTo(0) == items + } + property("Queue works with finished futures") = forAll { (items: List[Int]) => + val q = Queue.linkedBlocking[(Int,Try[Int])] + items.foreach { i => q.put((i, Try(i*i))) } + q.foldLeft((0, true)) { case ((cnt, good), (i, ti)) => + ti match { + case Return(ii) => (cnt + 1, good) + case Throw(e) => (cnt + 1, false) + } + } == (items.size, true) + } + property("Queue.linkedNonBlocking works") = forAll { (items: List[Int]) => + val q = Queue.linkedNonBlocking[(Int,Try[Int])] + items.foreach { i => q.put((i, Try(i*i))) } + q.foldLeft((0, true)) { case ((cnt, good), (i, ti)) => + ti match { + case Return(ii) => (cnt + 1, good) + case Throw(e) => (cnt + 1, false) + } + } == (items.size, true) + } + property("Queue foreach works") = forAll { (items: List[Int]) => + // Make sure we can fit everything + val q = Queue.arrayBlocking[(Int,Try[Int])](items.size + 1) + items.foreach { i => q.put((i,Try(i*i))) } + var works = true + q.foreach { case (i, Return(ii)) => + works = works && (ii == i*i) + } + works && (q.size == 0) + } + property("Queue foldLeft works") = forAll { (items: List[Int]) => + // Make sure we can fit everything + val q = Queue.arrayBlocking[(Int,Try[Int])](items.size + 1) + items.foreach { i => q.put((i,Try(i*i))) } + q.foldLeft(true) { case (works, (i, Return(ii))) => + (ii == i*i) + } && (q.size == 0) + } + + property("Queue poll + size is correct") = forAll { (items: List[Int]) => + // Make sure we can fit everything + val q = Queue[Int]() + items.map { i => + q.put(i) + val size = q.size + if(i % 2 == 0) { + // do a poll test + q.poll match { + case None => q.size == 0 + case Some(_) => q.size == (size - 1) + } + } + else true + }.forall(identity) + } + property("Queue is fifo") = forAll { (items: List[Int]) => + val q = Queue[Int]() + q.putAll(items) + (q.trimTo(0).toList == items) && { + val q2 = Queue[Int]() + q2.putAll(items) + q2.foldLeft(List[Int]()) { (l, it) => it :: l }.reverse == items + } + } +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/AsyncBaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/AsyncBaseBolt.scala new file mode 100644 index 000000000..b76e0eac7 --- /dev/null +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/AsyncBaseBolt.scala @@ -0,0 +1,98 @@ +/* +Copyright 2013 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.summingbird.storm + +import backtype.storm.tuple.Tuple +import com.twitter.summingbird.batch.Timestamp +import com.twitter.summingbird.online.Queue +import com.twitter.summingbird.storm.option.{AnchorTuples, MaxWaitingFutures} +import com.twitter.util.{Await, Duration, Future, Return, Throw, Try} +import java.util.{ Arrays => JArrays, List => JList } + +abstract class AsyncBaseBolt[I, O](metrics: () => TraversableOnce[StormMetric[_]], + anchorTuples: AnchorTuples, + maxWaitingFutures: MaxWaitingFutures, + hasDependants: Boolean) extends BaseBolt[I, O](metrics, anchorTuples, hasDependants) { + + /** If you can use Future.value below, do so. The double Future is here to deal with + * cases that need to complete operations after or before doing a FlatMapOperation or + * doing a store merge + */ + def apply(tup: Tuple, in: (Timestamp, I)): Future[Iterable[(JList[Tuple], Future[TraversableOnce[(Timestamp, O)]])]] + + private lazy val outstandingFutures = Queue[Future[Unit]]() + private lazy val responses = Queue[(JList[Tuple], Try[TraversableOnce[(Timestamp, O)]])]() + + override def execute(tuple: Tuple) { + /** + * System ticks come with a fixed stream id + */ + if(!tuple.getSourceStreamId.equals("__tick")) { + // This not a tick tuple so we need to start an async operation + val tsIn = decoder.invert(tuple.getValues).get // Failing to decode here is an ERROR + + val fut = apply(tuple, tsIn) + .onSuccess { iter => + // Collect the result onto our responses + val (putCount, maxSize) = iter.foldLeft((0, 0)) { case ((p, ms), (tups, res)) => + res.respond { t => responses.put((tups, t)) } + // Make sure there are not too many outstanding: + val count = outstandingFutures.put(res.unit) + (p + 1, ms max count) + } + + if(maxSize > maxWaitingFutures.get) { + /* + * This can happen on large key expansion. + * May indicate maxWaitingFutures is too low. + */ + logger.debug( + "Exceeded maxWaitingFutures(%d): waiting = %d, put = %d" + .format(maxWaitingFutures.get, maxSize, putCount) + ) + } + } + .onFailure { thr => responses.put((JArrays.asList(tuple), Throw(thr))) } + + outstandingFutures.put(fut.unit) + } + // always empty the responses, even on tick + emptyQueue + } + + protected def forceExtraFutures { + val toForce = outstandingFutures.trimTo(maxWaitingFutures.get) + if(!toForce.isEmpty) Await.all(toForce, Duration.Top) + } + + /** + * NOTE: this is the only place where we call finish/fail in this method + * is only called from execute. This is what makes this code thread-safe with + * respect to storm. + */ + protected def emptyQueue = { + // don't let too many futures build up + forceExtraFutures + // Handle all ready results now: + responses.foreach { case (tups, res) => + res match { + case Return(outs) => finish(tups, outs) + case Throw(t) => fail(tups, t) + } + } + } +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala index c1050ac53..7deae9839 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BaseBolt.scala @@ -19,8 +19,15 @@ package com.twitter.summingbird.storm import backtype.storm.task.{ OutputCollector, TopologyContext } import backtype.storm.topology.IRichBolt import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.tuple.{ Fields, Tuple } -import java.util.{ Map => JMap } +import backtype.storm.tuple.Tuple +import java.util.{ Map => JMap, Arrays => JArrays, List => JList } + +import com.twitter.summingbird.batch.Timestamp +import com.twitter.summingbird.storm.option.{AnchorTuples, MaxWaitingFutures} + +import scala.collection.JavaConverters._ + +import org.slf4j.{LoggerFactory, Logger} /** * @@ -28,35 +35,64 @@ import java.util.{ Map => JMap } * @author Sam Ritchie * @author Ashu Singhal */ +abstract class BaseBolt[I,O](metrics: () => TraversableOnce[StormMetric[_]], + anchorTuples: AnchorTuples, + hasDependants: Boolean + ) extends IRichBolt { -abstract class BaseBolt(metrics: () => TraversableOnce[StormMetric[_]]) extends IRichBolt { - class Mutex extends java.io.Serializable + def decoder: StormTupleInjection[I] + def encoder: StormTupleInjection[O] - /** - * The fields this bolt plans on returning. - */ - def fields: Option[Fields] + @transient protected lazy val logger: Logger = + LoggerFactory.getLogger(getClass) private var collector: OutputCollector = null - val mutex = new Mutex - def onCollector[U](fn: OutputCollector => U): U = - mutex.synchronized { fn(collector) } + /** + * IMPORTANT: only call this inside of an execute method. + * storm is not safe to call methods on the emitter from other + * threads. + */ + protected def fail(inputs: JList[Tuple], error: Throwable): Unit = { + inputs.iterator.asScala.foreach(collector.fail(_)) + collector.reportError(error) + logger.error("Storm DAG of: %d tuples failed".format(inputs.size), error) + } - def ack(tuple: Tuple) { onCollector { _.ack(tuple) } } + /** + * IMPORTANT: only call this inside of an execute method. + * storm is not safe to call methods on the emitter from other + * threads. + */ + protected def finish(inputs: JList[Tuple], results: TraversableOnce[(Timestamp, O)]) { + var emitCount = 0 + if(hasDependants) { + if(anchorTuples.anchor) { + results.foreach { result => + collector.emit(inputs, encoder(result)) + emitCount += 1 + } + } + else { // don't anchor + results.foreach { result => + collector.emit(encoder(result)) + emitCount += 1 + } + } + } + // Always ack a tuple on completion: + inputs.iterator.asScala.foreach(collector.ack(_)) + logger.debug("bolt finished processed %d linked tuples, emitted: %d" + .format(inputs.size, emitCount)) + } - override def prepare( - conf: JMap[_,_], context: TopologyContext, oc: OutputCollector) { - /** - * No need for a mutex here because this called once on - * start - */ + override def prepare(conf: JMap[_,_], context: TopologyContext, oc: OutputCollector) { collector = oc metrics().foreach { _.register(context) } } override def declareOutputFields(declarer: OutputFieldsDeclarer) { - fields.foreach(declarer.declare(_)) + if(hasDependants) { declarer.declare(encoder.fields) } } override val getComponentConfiguration = null diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/CollectorMergeableStore.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/CollectorMergeableStore.scala deleted file mode 100644 index 1fd8be23e..000000000 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/CollectorMergeableStore.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* -Copyright 2013 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.twitter.summingbird.storm - -import backtype.storm.tuple.Tuple -import backtype.storm.task.OutputCollector -import backtype.storm.tuple.Values -import com.twitter.algebird.Monoid -import com.twitter.storehaus.algebra.MergeableStore -import com.twitter.summingbird.batch.BatchID -import com.twitter.summingbird.storm.option.AnchorTuples -import com.twitter.util.Future - -/** - * CollectorMergeableStore merges (K, BatchID) -> V into the - * underlying store by way of a storm topology. - * - * The summingbird storm topology is really just a Storehaus - * MergeableStore that shards the various "merge" calls by key. This - * brings us a little closer to expressing that idea, and lets us use - * the "BufferingStore" combinator on the FlatMapBolt collector - * itself without maintaining a separate SummingQueue in the - * FlatMapBolt and recreating the logic. - * - * @author Sam Ritchie - */ - -class CollectorMergeableStore[K, V]( - collector: OutputCollector, - anchorTuples: AnchorTuples) - (implicit monoid: Monoid[V]) - extends MergeableStore[(K, Tuple, BatchID), V] { - override val semigroup = monoid - override def get(k: (K, Tuple, BatchID)) = - sys.error("Gets out of a CollectorMergeableStore are not supported.") - override def put(pair: ((K, Tuple, BatchID), Option[V])) = - sys.error("Puts into a CollectorMergeableStore are not supported.") - - override def merge(pair: ((K, Tuple, BatchID), V)) = { - val ((k, tuple, id), v) = pair - val values = new Values( - id.asInstanceOf[AnyRef], - k.asInstanceOf[AnyRef], - v.asInstanceOf[AnyRef] - ) - if (anchorTuples.anchor) - collector.emit(tuple, values) - else collector.emit(values) - Future.value(Some(v)) - } -} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala index c62bc0163..237f86188 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FinalFlatMapBolt.scala @@ -20,78 +20,73 @@ import backtype.storm.task.{ OutputCollector, TopologyContext } import backtype.storm.topology.OutputFieldsDeclarer import backtype.storm.tuple.{ Fields, Tuple, Values } -import com.twitter.algebird.{ Monoid, SummingQueue, Semigroup } +import com.twitter.algebird.{ SummingQueue, Semigroup, MapAlgebra } import com.twitter.summingbird.online.Externalizer import com.twitter.summingbird.batch.{ Batcher, BatchID, Timestamp} import com.twitter.summingbird.online.FlatMapOperation import com.twitter.summingbird.storm.option.{ - AnchorTuples, FlatMapStormMetrics + AnchorTuples, FlatMapStormMetrics, MaxWaitingFutures } + +import com.twitter.util.{Return, Throw} import com.twitter.storehaus.algebra.SummerConstructor import com.twitter.summingbird.option.CacheSize import com.twitter.storehaus.algebra.MergeableStore +import com.twitter.util.{Future} + import MergeableStore.enrich -import java.util.{ Date, Map => JMap } +import java.util.{ Date, Arrays => JArrays, List => JList, Map => JMap } + +import scala.collection.JavaConverters._ +import scala.collection.breakOut /** * @author Oscar Boykin * @author Sam Ritchie * @author Ashu Singhal */ - class FinalFlatMapBolt[Event, Key, Value]( @transient flatMapOp: FlatMapOperation[Event, (Key, Value)], cacheSize: CacheSize, metrics: FlatMapStormMetrics, - anchor: AnchorTuples) - (implicit monoid: Monoid[Value], batcher: Batcher) - extends BaseBolt(metrics.metrics) { + anchor: AnchorTuples, + maxWaitingFutures: MaxWaitingFutures) + (implicit monoid: Semigroup[Value], batcher: Batcher) + extends AsyncBaseBolt[Event, ((Key, BatchID), Value)](metrics.metrics, anchor, maxWaitingFutures, true) { + + import JListSemigroup._ + import Constants._ + + type KBV = ((Key, BatchID), Value) val lockedOp = Externalizer(flatMapOp) - var collectorMergeable: MergeableStore[(Key, Tuple, BatchID), (Timestamp, Value)] = null + val squeue: SummingQueue[Map[(Key, BatchID), (JList[Tuple], Timestamp, Value)]] = + SummingQueue(cacheSize.size.getOrElse(0)) - override val fields = { - import Constants._ - Some(new Fields(AGG_BATCH, AGG_KEY, AGG_VALUE)) - } + override val decoder = new SingleItemInjection[Event](VALUE_FIELD) + override val encoder = new KeyValueInjection[(Key, BatchID), Value](AGG_KEY, AGG_VALUE) - override def prepare( - conf: JMap[_,_], context: TopologyContext, oc: OutputCollector) { - super.prepare(conf, context, oc) - onCollector { collector => - collectorMergeable = - new CollectorMergeableStore[Key, (Timestamp, Value)](collector, anchor) - .withSummer(new SummerConstructor[(Key, Tuple, BatchID)] { - def apply[V](sg: Semigroup[V]) = { - implicit val semi = sg - SummingQueue(cacheSize.size.getOrElse(0)) - } - } - ) - } - } + def cache(tuple: Tuple, + time: Timestamp, + items: TraversableOnce[(Key, Value)]): Iterable[(JList[Tuple], Future[TraversableOnce[(Timestamp, KBV)]])] = { - override def execute(tuple: Tuple) { - val (timeMs, event) = tuple.getValue(0).asInstanceOf[(Long, Event)] - val time = Timestamp(timeMs) val batchID = batcher.batchOf(time) - - /** - * the flatMap function returns a future. - * - * each resulting key value pair is merged into the output once - * the future completes the input tuple is acked once the future - * completes. - */ - lockedOp.get.apply(event).foreach { pairs => - pairs.foreach { case (k, v) => - onCollector { _ => collectorMergeable.merge((k, tuple, batchID) -> (time, v)) } + squeue(MapAlgebra.sumByKey(items.map { case (k, v) => (k, batchID) -> (lift(tuple), time, v) })) + .map { kvs => + kvs.iterator + .map { case ((k, b), (tups, ts, v)) => + (tups, Future.value(List((ts, ((k,b), v))))) + } + .toList // don't need the ordering, so we could save by pushing a stack or something } - ack(tuple) - } + .getOrElse(Nil) } + override def apply(tup: Tuple, + timeIn: (Timestamp, Event)) = + lockedOp.get.apply(timeIn._2).map { cache(tup, timeIn._1, _) } + override def cleanup { lockedOp.get.close } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala index b48ebafac..e3ee03e7d 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/IntermediateFlatMapBolt.scala @@ -20,9 +20,13 @@ import backtype.storm.tuple.{Fields, Tuple, Values} import com.twitter.summingbird.online.Externalizer import com.twitter.storehaus.algebra.MergeableStore.enrich import com.twitter.summingbird.batch.Timestamp -import com.twitter.summingbird.storm.option.{ AnchorTuples, FlatMapStormMetrics } +import com.twitter.summingbird.storm.option.{ AnchorTuples, FlatMapStormMetrics, MaxWaitingFutures } import com.twitter.summingbird.online.FlatMapOperation +import com.twitter.util.{Future} + +import java.util.{ Date, Arrays => JArrays, List => JList, Map => JMap, ArrayList => JAList } + import Constants._ /** @@ -32,36 +36,23 @@ import Constants._ * from which U was derived. Each U is one of the output items of the * flatMapOp. */ -class IntermediateFlatMapBolt[T]( - @transient flatMapOp: FlatMapOperation[T, _], +class IntermediateFlatMapBolt[T,U]( + @transient flatMapOp: FlatMapOperation[T, U], metrics: FlatMapStormMetrics, anchor: AnchorTuples, - shouldEmit: Boolean) extends BaseBolt(metrics.metrics) { + maxWaitingFutures: MaxWaitingFutures, + shouldEmit: Boolean) extends + AsyncBaseBolt[T,U](metrics.metrics, anchor, maxWaitingFutures, shouldEmit) { val lockedOp = Externalizer(flatMapOp) - def toValues(time: Timestamp, item: Any): Values = new Values((time.milliSinceEpoch, item)) - - override val fields = Some(new Fields("pair")) - - override def execute(tuple: Tuple) { - val (timeMs, t) = tuple.getValue(0).asInstanceOf[(Long, T)] - val time = Timestamp(timeMs) + override val decoder = new SingleItemInjection[T](VALUE_FIELD) + override val encoder = new SingleItemInjection[U](VALUE_FIELD) - lockedOp.get(t).foreach { items => - if(shouldEmit) { - items.foreach { u => - onCollector { col => - val values = toValues(time, u) - if (anchor.anchor) - col.emit(tuple, values) - else col.emit(values) - } - } - } - ack(tuple) - } - } + override def apply(tup: Tuple, + timeT: (Timestamp, T)) = + Future.value(List((JArrays.asList(tup), + lockedOp.get.apply(timeT._2).map { res => res.map((timeT._1, _)) }))) override def cleanup { lockedOp.get.close } } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 62d7c0187..94a49a15b 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -32,7 +32,7 @@ import com.twitter.storehaus.algebra.MergeableStore.enrich import com.twitter.summingbird._ import com.twitter.summingbird.viz.VizGraph import com.twitter.summingbird.chill._ -import com.twitter.summingbird.batch.{BatchID, Batcher} +import com.twitter.summingbird.batch.{BatchID, Batcher, Timestamp} import com.twitter.summingbird.storm.option.{AnchorTuples, IncludeSuccessHandler} import com.twitter.summingbird.util.CacheSize import com.twitter.tormenta.spout.Spout @@ -70,7 +70,7 @@ sealed trait StormService[-K, +V] case class StoreWrapper[K, V](store: StoreFactory[K, V]) extends StormService[K, V] sealed trait StormSource[+T] -case class SpoutSource[+T](spout: Spout[(Long, T)], parallelism: Option[option.SpoutParallelism]) extends StormSource[T] +case class SpoutSource[+T](spout: Spout[(Timestamp, T)], parallelism: Option[option.SpoutParallelism]) extends StormSource[T] object Storm { def local(options: Map[String, Options] = Map.empty): LocalStorm = @@ -96,7 +96,7 @@ object Storm { def toStormSource[T](spout: Spout[T], defaultSourcePar: Option[Int] = None)(implicit timeOf: TimeExtractor[T]): StormSource[T] = - SpoutSource(spout.map(t => (timeOf(t), t)), defaultSourcePar.map(option.SpoutParallelism(_))) + SpoutSource(spout.map(t => (Timestamp(timeOf(t)), t)), defaultSourcePar.map(option.SpoutParallelism(_))) implicit def spoutAsStormSource[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]): StormSource[T] = toStormSource(spout, None)(timeOf) @@ -172,6 +172,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val operation = foldOperations(node.members.reverse) val metrics = getOrElse(stormDag, node, DEFAULT_FM_STORM_METRICS) val anchorTuples = getOrElse(stormDag, node, AnchorTuples.default) + val maxWaiting = getOrElse(stormDag, node, DEFAULT_MAX_WAITING_FUTURES) val summerOpt:Option[SummerNode[Storm]] = stormDag.dependantsOf(node).collect{case s: SummerNode[Storm] => s}.headOption @@ -181,10 +182,16 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird new FinalFlatMapBolt( operation.asInstanceOf[FlatMapOperation[Any, (Any, Any)]], getOrElse(stormDag, node, DEFAULT_FM_CACHE), - getOrElse(stormDag, node, DEFAULT_FM_STORM_METRICS), - anchorTuples)(summerProducer.monoid.asInstanceOf[Monoid[Any]], summerProducer.store.batcher) + metrics, + anchorTuples, + maxWaiting)(summerProducer.monoid.asInstanceOf[Monoid[Any]], summerProducer.store.batcher) case None => - new IntermediateFlatMapBolt(operation, metrics, anchorTuples, stormDag.dependenciesOf(node).size > 0) + new IntermediateFlatMapBolt( + operation, + metrics, + anchorTuples, + maxWaiting, + stormDag.dependantsOf(node).size > 0) } val parallelism = getOrElse(stormDag, node, DEFAULT_FM_PARALLELISM).parHint @@ -192,6 +199,8 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } + // TODO: https://github.com/twitter/summingbird/issues/366 + // test localOrShuffleGrouping here. may give big wins for serialization heavy jobs. dependenciesNames.foreach { declarer.shuffleGrouping(_) } } @@ -199,7 +208,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val (spout, parOpt) = node.members.collect { case Source(SpoutSource(s, parOpt)) => (s, parOpt) }.head val nodeName = stormDag.getNodeName(node) - val stormSpout = node.members.reverse.foldLeft(spout.asInstanceOf[Spout[(Long, Any)]]) { (spout, p) => + val stormSpout = node.members.reverse.foldLeft(spout.asInstanceOf[Spout[(Timestamp, Any)]]) { (spout, p) => p match { case Source(_) => spout // The source is still in the members list so drop it case OptionMappedProducer(_, op) => spout.flatMap {case (time, t) => op.apply(t).map { x => (time, x) }} @@ -264,6 +273,11 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird config.setMaxSpoutPending(1000) config.setNumAckers(12) config.setNumWorkers(12) + /** + * Set storm to tick our nodes every second to clean up finished futures + */ + config.put(BacktypeStormConfig.TOPOLOGY_TICK_TUPLE_FREQ_SECS, + java.lang.Integer.valueOf(1)) val initialStormConfig = StormConfig(config) val stormConfig = SBChillRegistrar(initialStormConfig, passedRegistrars) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SummerBolt.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SummerBolt.scala index 65be91ced..b2796aab5 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SummerBolt.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/SummerBolt.scala @@ -18,16 +18,15 @@ package com.twitter.summingbird.storm import backtype.storm.task.{OutputCollector, TopologyContext} import backtype.storm.tuple.{Tuple, Values, Fields} -import com.twitter.algebird.{Monoid, SummingQueue} +import com.twitter.algebird.{Semigroup, SummingQueue} import com.twitter.summingbird.online.Externalizer import com.twitter.storehaus.algebra.MergeableStore import com.twitter.summingbird.batch.{BatchID, Timestamp} import com.twitter.summingbird.storm.option._ import com.twitter.summingbird.option.CacheSize -import com.twitter.summingbird.online.FutureQueue import com.twitter.util.{Await, Future} -import java.util.{ Map => JMap } +import java.util.{ Arrays => JArrays, List => JList, Map => JMap, ArrayList => JAList } /** * The SummerBolt takes two related options: CacheSize and MaxWaitingFutures. @@ -51,7 +50,20 @@ import java.util.{ Map => JMap } * @author Ashu Singhal */ -class SummerBolt[Key, Value: Monoid]( +object JListSemigroup { + def lift[T](t: T): JAList[T] = { val l = new JAList[T](); l.add(t); l } + + implicit def jlistConcat[T]: Semigroup[JAList[T]] = new JListSemigroup[T] +} +/** Mutably concat java lists */ +class JListSemigroup[T] extends Semigroup[JAList[T]] { + def plus(old: JAList[T], next: JAList[T]): JAList[T] = { + old.addAll(next) + old + } +} + +class SummerBolt[Key, Value: Semigroup]( @transient storeSupplier: () => MergeableStore[(Key,BatchID), Value], @transient successHandler: OnlineSuccessHandler, @transient exceptionHandler: OnlineExceptionHandler, @@ -60,72 +72,63 @@ class SummerBolt[Key, Value: Monoid]( maxWaitingFutures: MaxWaitingFutures, includeSuccessHandler: IncludeSuccessHandler, anchor: AnchorTuples, - shouldEmit: Boolean) extends BaseBolt(metrics.metrics) { + shouldEmit: Boolean) extends + AsyncBaseBolt[((Key, BatchID), Value), (Key, (Option[Value], Value))]( + metrics.metrics, + anchor, + maxWaitingFutures, + shouldEmit) { + import Constants._ + import JListSemigroup._ val storeBox = Externalizer(storeSupplier) lazy val store = storeBox.get.apply // See MaxWaitingFutures for a todo around removing this. lazy val cacheCount = cacheSize.size - lazy val buffer = SummingQueue[Map[(Key, BatchID), (Timestamp, Value)]](cacheCount.getOrElse(0)) - lazy val futureQueue = FutureQueue(Future.Unit, maxWaitingFutures.get) + lazy val buffer = SummingQueue[Map[(Key, BatchID), (JList[Tuple], Timestamp, Value)]](cacheCount.getOrElse(0)) - val exceptionHandlerBox = Externalizer(exceptionHandler) + val exceptionHandlerBox = Externalizer(exceptionHandler.handlerFn.lift) val successHandlerBox = Externalizer(successHandler) var successHandlerOpt: Option[OnlineSuccessHandler] = null - override val fields = Some(new Fields("pair")) + override val decoder = new KeyValueInjection[(Key,BatchID), Value](AGG_KEY, AGG_VALUE) + override val encoder = new SingleItemInjection[(Key, (Option[Value], Value))](VALUE_FIELD) - override def prepare( - conf: JMap[_,_], context: TopologyContext, oc: OutputCollector) { + override def prepare(conf: JMap[_,_], context: TopologyContext, oc: OutputCollector) { super.prepare(conf, context, oc) - // see IncludeSuccessHandler for why this is needed - successHandlerOpt = if (includeSuccessHandler.get) - Some(successHandlerBox.get) - else - None + successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None } - // TODO (https://github.com/twitter/tormenta/issues/1): Think about - // how this can help with Tormenta's open issue for a tuple - // conversion library. Storm emits Values and receives Tuples. - def unpack(tuple: Tuple) = { - val id = tuple.getValue(0).asInstanceOf[BatchID] - val key = tuple.getValue(1).asInstanceOf[Key] - val (ts, value) = tuple.getValue(2).asInstanceOf[(Timestamp, Value)] - ((key, id), (ts, value)) + protected override def fail(inputs: JList[Tuple], error: Throwable): Unit = { + super.fail(inputs, error) + exceptionHandlerBox.get.apply(error) + () } - def toValues(time: Timestamp, item: Any): Values = new Values((time.milliSinceEpoch, item)) - - override def execute(tuple: Tuple) { - // See MaxWaitingFutures for a todo around simplifying this. - buffer(Map(unpack(tuple))).foreach { pairs => - val futures = pairs.map { case ((k, batchID), (ts, delta)) => - val pair = ((k, batchID), delta) - - val mergeFuture = store.merge(pair) - .handle(exceptionHandlerBox.get.handlerFn) - - for (handler <- successHandlerOpt) - mergeFuture.onSuccess { _ => handler.handlerFn() } - - if(shouldEmit) { - mergeFuture.map { res => - onCollector { col => - val values = toValues(ts, (k, (res, delta))) - if (anchor.anchor) - col.emit(tuple, values) - else col.emit(values) - } + override def apply(tuple: Tuple, + tsIn: (Timestamp, ((Key, BatchID), Value))): + Future[Iterable[(JList[Tuple], Future[TraversableOnce[(Timestamp, (Key, (Option[Value], Value)))]])]] = { + + val (ts, (kb, v)) = tsIn + Future.value { + // See MaxWaitingFutures for a todo around simplifying this. + buffer(Map(kb -> ((lift(tuple), ts, v)))) + .map { kvs => + kvs.iterator.map { case ((k, batchID), (tups, stamp, delta)) => + (tups, + store.merge(((k, batchID), delta)).map { before => + List((stamp, (k, (before, delta)))) + } + .onSuccess { _ => successHandlerOpt.get.handlerFn.apply() } + ) } - } else mergeFuture - }.toList - futureQueue += Future.collect(futures).unit + .toList // force, but order does not matter, so we could optimize this + } + .getOrElse(Nil) } - onCollector { _.ack(tuple) } } override def cleanup { Await.result(store.close) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/TupleInjections.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/TupleInjections.scala new file mode 100644 index 000000000..eb9bd5929 --- /dev/null +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/TupleInjections.scala @@ -0,0 +1,62 @@ +/* +Copyright 2013 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.summingbird.storm + +import backtype.storm.tuple.Fields +import com.twitter.bijection.{Injection, Inversion, AbstractInjection} +import com.twitter.summingbird.batch.Timestamp +import java.util.{List => JList, ArrayList => JAList} +import scala.util.Try + +trait StormTupleInjection[T] extends Injection[(Timestamp, T), JList[AnyRef]] { + def fields: Fields +} + +class SingleItemInjection[T](fieldName: String) extends StormTupleInjection[T] { + + val fields = new Fields(fieldName) + + override def apply(t: (Timestamp, T)) = { + val list = new JAList[AnyRef](1) + list.add(t) + list + } + + override def invert(vin: JList[AnyRef]) = Inversion.attempt(vin) { v => + v.get(0).asInstanceOf[(Timestamp, T)] + } +} + +class KeyValueInjection[K, V](keyField: String, timeValField: String) + extends StormTupleInjection[(K, V)] { + + val fields = new Fields(keyField, timeValField) + + override def apply(item: (Timestamp, (K, V))) = { + val (ts, (key, v)) = item + val list = new JAList[AnyRef](2) + list.add(key.asInstanceOf[AnyRef]) + list.add((ts, v)) + list + } + + override def invert(vin: JList[AnyRef]) = Inversion.attempt(vin) { v => + val key = v.get(0).asInstanceOf[K] + val (ts, value) = v.get(1).asInstanceOf[(Timestamp, V)] + (ts, (key, value)) + } +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/SinkOptions.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/SinkOptions.scala index 4a0c0faee..f163db50d 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/SinkOptions.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/SinkOptions.scala @@ -51,7 +51,6 @@ object IncludeSuccessHandler { case class OnlineExceptionHandler(handlerFn: PartialFunction[Throwable, Unit]) - /** * See FlatMapOptions.scala for an explanation. */ diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala new file mode 100644 index 000000000..6189d4f6a --- /dev/null +++ b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala @@ -0,0 +1,38 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package com.twitter.summingbird.storm + +import com.twitter.summingbird.batch.Timestamp + +import org.scalacheck._ +import Gen._ +import Arbitrary._ +import org.scalacheck.Prop._ + +object InjectionLaws extends Properties("InjectionTests") { + implicit def ts: Arbitrary[Timestamp] = + Arbitrary(Arbitrary.arbitrary[Long].map(Timestamp(_))) + + property("Single injection works") = forAll { in: (Timestamp, String) => + val inj = new SingleItemInjection[String]("item") + inj.invert(inj(in)).get == in + } + property("KV injection works") = forAll { in: (Timestamp, (String, List[Int])) => + val inj = new KeyValueInjection[String, List[Int]]("k", "v") + inj.invert(inj(in)).get == in + } +}