diff --git a/project/Build.scala b/project/Build.scala index 08d74c043..a31f9051d 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -159,7 +159,7 @@ object SummingbirdBuild extends Build { val dfsDatastoresVersion = "1.3.4" val bijectionVersion = "0.6.3" - val algebirdVersion = "0.7.0" + val algebirdVersion = "0.9.0" val scaldingVersion = "0.11.3rc1" val storehausVersion = "0.9.1" val utilVersion = "6.3.8" diff --git a/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala b/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala index 8e6646c1f..eabe485a7 100644 --- a/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala +++ b/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala @@ -56,11 +56,13 @@ object Timestamp { implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] { def next(old: Timestamp) = if (old.milliSinceEpoch != Long.MaxValue) Some(old.next) else None def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp + def partialOrdering = Timestamp.orderingOnTimestamp } implicit val timestampPredecessible: Predecessible[Timestamp] = new Predecessible[Timestamp] { def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp + def partialOrdering = Timestamp.orderingOnTimestamp } // This is a right semigroup, that given any two Timestamps just take the one on the right. diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala index c16f73719..7b4b9a863 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala @@ -18,9 +18,9 @@ package com.twitter.summingbird import com.twitter.summingbird.option.JobId -case class Group(getString: String) +case class Group(getString: String) extends AnyVal -case class Name(getString: String) +case class Name(getString: String) extends AnyVal /* User-defined Counter diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index c9292321d..41388723e 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -17,9 +17,10 @@ package com.twitter.summingbird.storm import com.twitter.algebird.Semigroup -import com.twitter.algebird.util.summer.{ SyncSummingQueue, AsyncListSum, BufferSize, FlushFrequency, MemoryFlushPercent } +import com.twitter.algebird.util.summer._ +import com.twitter.summingbird.{ Name, Group, Counter } import com.twitter.summingbird.online.option.{ SummerBuilder, SummerConstructor } -import com.twitter.summingbird.option.CacheSize +import com.twitter.summingbird.option.{ JobId, CacheSize } import com.twitter.summingbird.planner.Dag import com.twitter.summingbird.storm.planner.StormNode import com.twitter.util.{ Future, FuturePool } @@ -38,7 +39,7 @@ import Constants._ object BuildSummer { @transient private val logger = LoggerFactory.getLogger(BuildSummer.getClass) - def apply(storm: Storm, dag: Dag[Storm], node: StormNode) = { + def apply(storm: Storm, dag: Dag[Storm], node: StormNode, jobID: JobId) = { val opSummerConstructor = storm.get[SummerConstructor](dag, node).map(_._2) logger.debug("Node ({}): Queried for SummerConstructor, got {}", dag.getNodeName(node), opSummerConstructor) @@ -46,19 +47,27 @@ object BuildSummer { case Some(cons) => logger.debug("Node ({}): Using user supplied SummerConstructor: {}", dag.getNodeName(node), cons) cons.get - case None => legacyBuilder(storm, dag, node) + case None => legacyBuilder(storm, dag, node, jobID) } } - private[this] final def legacyBuilder(storm: Storm, dag: Dag[Storm], node: StormNode) = { + private[this] final def legacyBuilder(storm: Storm, dag: Dag[Storm], node: StormNode, jobID: JobId) = { val nodeName = dag.getNodeName(node) val cacheSize = storm.getOrElse(dag, node, DEFAULT_FM_CACHE) logger.info("[{}] cacheSize lowerbound: {}", nodeName, cacheSize.lowerBound) + val memoryCounter = counter(jobID, Name(nodeName), Name("memory")) + val timeoutCounter = counter(jobID, Name(nodeName), Name("timeout")) + val sizeCounter = counter(jobID, Name(nodeName), Name("size")) + val tupleInCounter = counter(jobID, Name(nodeName), Name("tuplesIn")) + val tupleOutCounter = counter(jobID, Name(nodeName), Name("tuplesOut")) + val insertCounter = counter(jobID, Name(nodeName), Name("inserts")) + val insertFailCounter = counter(jobID, Name(nodeName), Name("insertFail")) + if (cacheSize.lowerBound == 0) { new SummerBuilder { def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { - new com.twitter.algebird.util.summer.NullSummer[K, V] + new com.twitter.algebird.util.summer.NullSummer[K, V](tupleInCounter, tupleOutCounter) } } } else { @@ -77,8 +86,13 @@ object BuildSummer { new SyncSummingQueue[K, V]( BufferSize(cacheSize.lowerBound), FlushFrequency(flushFrequency.get), - MemoryFlushPercent(softMemoryFlush.get) - ) + MemoryFlushPercent(softMemoryFlush.get), + memoryCounter, + timeoutCounter, + sizeCounter, + insertCounter, + tupleInCounter, + tupleOutCounter) } } } else { @@ -95,8 +109,16 @@ object BuildSummer { val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound), FlushFrequency(flushFrequency.get), MemoryFlushPercent(softMemoryFlush.get), - futurePool - ) + memoryCounter, + timeoutCounter, + insertCounter, + insertFailCounter, + sizeCounter, + tupleInCounter, + tupleOutCounter, + futurePool, + Compact(false), + CompactionSize(0)) summer.withCleanup(() => { Future { executor.shutdown @@ -108,4 +130,7 @@ object BuildSummer { } } } + + def counter(jobID: JobId, nodeName: Name, counterName: Name) = new Counter(Group("summingbird." + nodeName.getString), counterName)(jobID) with Incrementor + } \ No newline at end of file diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala index 84278ec90..469e43396 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala @@ -113,7 +113,7 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm], val operation = foldOperations[T, (K, V)](node.members.reverse) val wrappedOperation = wrapTimeBatchIDKV(operation)(batcher) - val builder = BuildSummer(storm, stormDag, node) + val builder = BuildSummer(storm, stormDag, node, jobID) BaseBolt( jobID, diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 2e79cc1cd..7b05e2bce 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -217,7 +217,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val metrics = getOrElse(stormDag, node, DEFAULT_SUMMER_STORM_METRICS) val shouldEmit = stormDag.dependantsOf(node).size > 0 - val builder = BuildSummer(this, stormDag, node) + val builder = BuildSummer(this, stormDag, node, jobID) val ackOnEntry = getOrElse(stormDag, node, DEFAULT_ACK_ON_ENTRY) logger.info("[{}] ackOnEntry : {}", nodeName, ackOnEntry.get)