From 5d35e9852b643a4c3be6f41c5ddf2c732399c33d Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Fri, 12 Dec 2014 16:35:04 -0800 Subject: [PATCH 1/8] added counters --- .../summingbird/storm/BuildSummer.scala | 36 ++++++++++++++----- .../storm/FlatMapBoltProvider.scala | 2 +- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index c9292321d..e19b96526 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -17,9 +17,10 @@ package com.twitter.summingbird.storm import com.twitter.algebird.Semigroup -import com.twitter.algebird.util.summer.{ SyncSummingQueue, AsyncListSum, BufferSize, FlushFrequency, MemoryFlushPercent } +import com.twitter.algebird.util.summer._ +import com.twitter.summingbird.{ Name, Group, Counter } import com.twitter.summingbird.online.option.{ SummerBuilder, SummerConstructor } -import com.twitter.summingbird.option.CacheSize +import com.twitter.summingbird.option.{ JobId, CacheSize } import com.twitter.summingbird.planner.Dag import com.twitter.summingbird.storm.planner.StormNode import com.twitter.util.{ Future, FuturePool } @@ -38,7 +39,7 @@ import Constants._ object BuildSummer { @transient private val logger = LoggerFactory.getLogger(BuildSummer.getClass) - def apply(storm: Storm, dag: Dag[Storm], node: StormNode) = { + def apply(storm: Storm, dag: Dag[Storm], node: StormNode, jobID: JobId) = { val opSummerConstructor = storm.get[SummerConstructor](dag, node).map(_._2) logger.debug("Node ({}): Queried for SummerConstructor, got {}", dag.getNodeName(node), opSummerConstructor) @@ -46,11 +47,11 @@ object BuildSummer { case Some(cons) => logger.debug("Node ({}): Using user supplied SummerConstructor: {}", dag.getNodeName(node), cons) cons.get - 
case None => legacyBuilder(storm, dag, node) + case None => legacyBuilder(storm, dag, node, jobID) } } - private[this] final def legacyBuilder(storm: Storm, dag: Dag[Storm], node: StormNode) = { + private[this] final def legacyBuilder(storm: Storm, dag: Dag[Storm], node: StormNode, jobID: JobId) = { val nodeName = dag.getNodeName(node) val cacheSize = storm.getOrElse(dag, node, DEFAULT_FM_CACHE) logger.info("[{}] cacheSize lowerbound: {}", nodeName, cacheSize.lowerBound) @@ -58,7 +59,7 @@ object BuildSummer { if (cacheSize.lowerBound == 0) { new SummerBuilder { def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { - new com.twitter.algebird.util.summer.NullSummer[K, V] + new com.twitter.algebird.util.summer.NullSummer[K, V](counter(jobID, Name(nodeName), Name("tuplesIn")), counter(jobID, Name(nodeName), Name("tuplesOut"))) } } } else { @@ -77,7 +78,13 @@ object BuildSummer { new SyncSummingQueue[K, V]( BufferSize(cacheSize.lowerBound), FlushFrequency(flushFrequency.get), - MemoryFlushPercent(softMemoryFlush.get) + MemoryFlushPercent(softMemoryFlush.get), + counter(jobID, Name(nodeName), Name("memory")), + counter(jobID, Name(nodeName), Name("timeout")), + counter(jobID, Name(nodeName), Name("size")), + counter(jobID, Name(nodeName), Name("puts")), + counter(jobID, Name(nodeName), Name("tuplesIn")), + counter(jobID, Name(nodeName), Name("tuplesOut")) ) } } @@ -95,8 +102,16 @@ object BuildSummer { val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound), FlushFrequency(flushFrequency.get), MemoryFlushPercent(softMemoryFlush.get), - futurePool - ) + counter(jobID, Name(nodeName), Name("memory")), + counter(jobID, Name(nodeName), Name("timeout")), + counter(jobID, Name(nodeName), Name("inserts")), + counter(jobID, Name(nodeName), Name("insertFail")), + counter(jobID, Name(nodeName), Name("size")), + counter(jobID, Name(nodeName), Name("tuplesIn")), + counter(jobID, Name(nodeName), Name("tuplesOut")), + 
futurePool, + Compact(false), + CompactionSize(0)) summer.withCleanup(() => { Future { executor.shutdown @@ -108,4 +123,7 @@ object BuildSummer { } } } + + def counter(jobID: JobId, nodeName: Name, counterName: Name) = new Counter(Group("summingbird-" + nodeName.getString), counterName)(jobID) with Incrementor + } \ No newline at end of file diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala index 3c38a59e2..20db2fead 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/FlatMapBoltProvider.scala @@ -113,7 +113,7 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm], val operation = foldOperations[T, (K, V)](node.members.reverse) val wrappedOperation = wrapTimeBatchIDKV(operation)(batcher) - val builder = BuildSummer(storm, stormDag, node) + val builder = BuildSummer(storm, stormDag, node, jobID) BaseBolt( jobID, From a29ed67502a215868369f9759db9fb73390782d3 Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Fri, 12 Dec 2014 17:00:02 -0800 Subject: [PATCH 2/8] added jobid --- .../scala/com/twitter/summingbird/storm/StormPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 173aee04f..18cccfbd2 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -201,7 +201,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val metrics = getOrElse(stormDag, node, DEFAULT_SUMMER_STORM_METRICS) val shouldEmit = 
stormDag.dependantsOf(node).size > 0 - val builder = BuildSummer(this, stormDag, node) + val builder = BuildSummer(this, stormDag, node, jobID) val ackOnEntry = getOrElse(stormDag, node, DEFAULT_ACK_ON_ENTRY) logger.info("[{}] ackOnEntry : {}", nodeName, ackOnEntry.get) From 5166063c0fda02bdca229cd9b7bf57fc2cc555db Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Tue, 16 Dec 2014 10:27:12 -0800 Subject: [PATCH 3/8] made counter creation eager --- .../summingbird/storm/BuildSummer.scala | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index e19b96526..4da7b90cd 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -56,6 +56,15 @@ object BuildSummer { val cacheSize = storm.getOrElse(dag, node, DEFAULT_FM_CACHE) logger.info("[{}] cacheSize lowerbound: {}", nodeName, cacheSize.lowerBound) + val memoryCounter = counter(jobID, Name(nodeName), Name("memory")) + val timeoutCounter = counter(jobID, Name(nodeName), Name("timeout")) + val sizeCounter = counter(jobID, Name(nodeName), Name("size")) + val putCounter = counter(jobID, Name(nodeName), Name("puts")) + val tupleInCounter = counter(jobID, Name(nodeName), Name("tuplesIn")) + val tupleOutCounter = counter(jobID, Name(nodeName), Name("tuplesOut")) + val insertCounter = counter(jobID, Name(nodeName), Name("inserts")) + val insertFailCounter = counter(jobID, Name(nodeName), Name("insertFail")) + if (cacheSize.lowerBound == 0) { new SummerBuilder { def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = { @@ -79,13 +88,12 @@ object BuildSummer { BufferSize(cacheSize.lowerBound), FlushFrequency(flushFrequency.get), MemoryFlushPercent(softMemoryFlush.get), 
- counter(jobID, Name(nodeName), Name("memory")), - counter(jobID, Name(nodeName), Name("timeout")), - counter(jobID, Name(nodeName), Name("size")), - counter(jobID, Name(nodeName), Name("puts")), - counter(jobID, Name(nodeName), Name("tuplesIn")), - counter(jobID, Name(nodeName), Name("tuplesOut")) - ) + memoryCounter, + timeoutCounter, + sizeCounter, + putCounter, + tupleInCounter, + tupleOutCounter) } } } else { @@ -102,13 +110,13 @@ object BuildSummer { val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound), FlushFrequency(flushFrequency.get), MemoryFlushPercent(softMemoryFlush.get), - counter(jobID, Name(nodeName), Name("memory")), - counter(jobID, Name(nodeName), Name("timeout")), - counter(jobID, Name(nodeName), Name("inserts")), - counter(jobID, Name(nodeName), Name("insertFail")), - counter(jobID, Name(nodeName), Name("size")), - counter(jobID, Name(nodeName), Name("tuplesIn")), - counter(jobID, Name(nodeName), Name("tuplesOut")), + memoryCounter, + timeoutCounter, + insertCounter, + insertFailCounter, + sizeCounter, + tupleInCounter, + tupleOutCounter, futurePool, Compact(false), CompactionSize(0)) From ce645b33b0a6ea2ac85a1cd0ca395b437d07948b Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Tue, 16 Dec 2014 10:31:16 -0800 Subject: [PATCH 4/8] refactored counters --- .../scala/com/twitter/summingbird/storm/BuildSummer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index 4da7b90cd..84fcb9a83 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -68,7 +68,7 @@ object BuildSummer { if (cacheSize.lowerBound == 0) { new SummerBuilder { def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, 
V]] = { - new com.twitter.algebird.util.summer.NullSummer[K, V](counter(jobID, Name(nodeName), Name("tuplesIn")), counter(jobID, Name(nodeName), Name("tuplesOut"))) + new com.twitter.algebird.util.summer.NullSummer[K, V](tupleInCounter, tupleOutCounter) } } } else { @@ -132,6 +132,6 @@ object BuildSummer { } } - def counter(jobID: JobId, nodeName: Name, counterName: Name) = new Counter(Group("summingbird-" + nodeName.getString), counterName)(jobID) with Incrementor + def counter(jobID: JobId, nodeName: Name, counterName: Name) = new Counter(Group("summingbird." + nodeName.getString), counterName)(jobID) with Incrementor } \ No newline at end of file From a2767c962edae43cee9a990081e5688abb0ca24f Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Thu, 18 Dec 2014 09:27:42 -0800 Subject: [PATCH 5/8] removed redundant put counter --- .../main/scala/com/twitter/summingbird/storm/BuildSummer.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala index 84fcb9a83..41388723e 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/BuildSummer.scala @@ -59,7 +59,6 @@ object BuildSummer { val memoryCounter = counter(jobID, Name(nodeName), Name("memory")) val timeoutCounter = counter(jobID, Name(nodeName), Name("timeout")) val sizeCounter = counter(jobID, Name(nodeName), Name("size")) - val putCounter = counter(jobID, Name(nodeName), Name("puts")) val tupleInCounter = counter(jobID, Name(nodeName), Name("tuplesIn")) val tupleOutCounter = counter(jobID, Name(nodeName), Name("tuplesOut")) val insertCounter = counter(jobID, Name(nodeName), Name("inserts")) @@ -91,7 +90,7 @@ object BuildSummer { memoryCounter, timeoutCounter, sizeCounter, - putCounter, + insertCounter, tupleInCounter, 
tupleOutCounter) } From ece1481a26ff6e48d29fbc6ede77a99b027fbc6d Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Fri, 23 Jan 2015 11:37:02 -0800 Subject: [PATCH 6/8] bumped algebird version --- project/Build.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Build.scala b/project/Build.scala index bb39d54bf..88c41fd8b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -159,7 +159,7 @@ object SummingbirdBuild extends Build { val dfsDatastoresVersion = "1.3.4" val bijectionVersion = "0.6.3" - val algebirdVersion = "0.7.0" + val algebirdVersion = "0.9.0" val scaldingVersion = "0.11.3rc1" val storehausVersion = "0.9.1" val utilVersion = "6.3.8" From 8494e96087a0a1d76208f00dc64f76fead2b2b06 Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Mon, 2 Feb 2015 13:07:23 -0800 Subject: [PATCH 7/8] fix build on 2.10 --- .../main/scala/com/twitter/summingbird/batch/TimeStamp.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala b/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala index 8e6646c1f..eabe485a7 100644 --- a/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala +++ b/summingbird-batch/src/main/scala/com/twitter/summingbird/batch/TimeStamp.scala @@ -56,11 +56,13 @@ object Timestamp { implicit val timestampSuccessible: Successible[Timestamp] = new Successible[Timestamp] { def next(old: Timestamp) = if (old.milliSinceEpoch != Long.MaxValue) Some(old.next) else None def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp + def partialOrdering = Timestamp.orderingOnTimestamp } implicit val timestampPredecessible: Predecessible[Timestamp] = new Predecessible[Timestamp] { def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp + def partialOrdering = Timestamp.orderingOnTimestamp } // 
This is a right semigroup, that given any two Timestamps just take the one on the right. From 96f540effa2ac9b8850f1701183c94c09c04784c Mon Sep 17 00:00:00 2001 From: Mansur Ashraf Date: Tue, 3 Feb 2015 10:23:43 -0800 Subject: [PATCH 8/8] changed a bunch of case classes to AnyVal --- .../src/main/scala/com/twitter/summingbird/Counter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala index c16f73719..7b4b9a863 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/Counter.scala @@ -18,9 +18,9 @@ package com.twitter.summingbird import com.twitter.summingbird.option.JobId -case class Group(getString: String) +case class Group(getString: String) extends AnyVal -case class Name(getString: String) +case class Name(getString: String) extends AnyVal /* User-defined Counter