Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Add counters to default summers #571

Merged
merged 9 commits into from Feb 11, 2015
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,9 +17,10 @@
package com.twitter.summingbird.storm

import com.twitter.algebird.Semigroup
import com.twitter.algebird.util.summer.{ SyncSummingQueue, AsyncListSum, BufferSize, FlushFrequency, MemoryFlushPercent }
import com.twitter.algebird.util.summer._
import com.twitter.summingbird.{ Name, Group, Counter }
import com.twitter.summingbird.online.option.{ SummerBuilder, SummerConstructor }
import com.twitter.summingbird.option.CacheSize
import com.twitter.summingbird.option.{ JobId, CacheSize }
import com.twitter.summingbird.planner.Dag
import com.twitter.summingbird.storm.planner.StormNode
import com.twitter.util.{ Future, FuturePool }
Expand All @@ -38,27 +39,35 @@ import Constants._
object BuildSummer {
@transient private val logger = LoggerFactory.getLogger(BuildSummer.getClass)

/**
 * Picks the summer builder for `node`.
 *
 * Looks up a `SummerConstructor` option for the node through `storm.get`.
 * When one is present, the result of its `get` is returned; otherwise the
 * call falls through to `legacyBuilder`.
 *
 * NOTE(review): this span appears to be a rendered diff, so the signature and
 * the `case None` branch each show up twice (without and with `jobID`) --
 * confirm which line is the current one before editing.
 */
def apply(storm: Storm, dag: Dag[Storm], node: StormNode) = {
def apply(storm: Storm, dag: Dag[Storm], node: StormNode, jobID: JobId) = {
// storm.get yields an optional pair; only its second element is kept.
val opSummerConstructor = storm.get[SummerConstructor](dag, node).map(_._2)
logger.debug("Node ({}): Queried for SummerConstructor, got {}", dag.getNodeName(node), opSummerConstructor)

opSummerConstructor match {
case Some(cons) =>
// An explicitly configured constructor takes precedence over the default path.
logger.debug("Node ({}): Using user supplied SummerConstructor: {}", dag.getNodeName(node), cons)
cons.get
// No constructor configured for this node: use the default builder.
case None => legacyBuilder(storm, dag, node)
case None => legacyBuilder(storm, dag, node, jobID)
}
}

private[this] final def legacyBuilder(storm: Storm, dag: Dag[Storm], node: StormNode) = {
private[this] final def legacyBuilder(storm: Storm, dag: Dag[Storm], node: StormNode, jobID: JobId) = {
val nodeName = dag.getNodeName(node)
val cacheSize = storm.getOrElse(dag, node, DEFAULT_FM_CACHE)
logger.info("[{}] cacheSize lowerbound: {}", nodeName, cacheSize.lowerBound)

val memoryCounter = counter(jobID, Name(nodeName), Name("memory"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, though I think a comment describing each of these counters would be helpful. What does "memory" mean here?

val timeoutCounter = counter(jobID, Name(nodeName), Name("timeout"))
val sizeCounter = counter(jobID, Name(nodeName), Name("size"))
val tupleInCounter = counter(jobID, Name(nodeName), Name("tuplesIn"))
val tupleOutCounter = counter(jobID, Name(nodeName), Name("tuplesOut"))
val insertCounter = counter(jobID, Name(nodeName), Name("inserts"))
val insertFailCounter = counter(jobID, Name(nodeName), Name("insertFail"))

if (cacheSize.lowerBound == 0) {
new SummerBuilder {
def getSummer[K, V: Semigroup]: com.twitter.algebird.util.summer.AsyncSummer[(K, V), Map[K, V]] = {
new com.twitter.algebird.util.summer.NullSummer[K, V]
new com.twitter.algebird.util.summer.NullSummer[K, V](tupleInCounter, tupleOutCounter)
}
}
} else {
Expand All @@ -77,8 +86,13 @@ object BuildSummer {
new SyncSummingQueue[K, V](
BufferSize(cacheSize.lowerBound),
FlushFrequency(flushFrequency.get),
MemoryFlushPercent(softMemoryFlush.get)
)
MemoryFlushPercent(softMemoryFlush.get),
memoryCounter,
timeoutCounter,
sizeCounter,
insertCounter,
tupleInCounter,
tupleOutCounter)
}
}
} else {
Expand All @@ -95,8 +109,16 @@ object BuildSummer {
val summer = new AsyncListSum[K, V](BufferSize(cacheSize.lowerBound),
FlushFrequency(flushFrequency.get),
MemoryFlushPercent(softMemoryFlush.get),
futurePool
)
memoryCounter,
timeoutCounter,
insertCounter,
insertFailCounter,
sizeCounter,
tupleInCounter,
tupleOutCounter,
futurePool,
Compact(false),
CompactionSize(0))
summer.withCleanup(() => {
Future {
executor.shutdown
Expand All @@ -108,4 +130,7 @@ object BuildSummer {
}
}
}

/**
 * Creates a `Counter` mixed in with `Incrementor`.
 *
 * The counter's group is the string "summingbird." followed by
 * `nodeName.getString`; its name is `counterName`. `jobID` is passed
 * explicitly as the counter's second argument list.
 */
def counter(jobID: JobId, nodeName: Name, counterName: Name) = {
  val counterGroup = Group("summingbird." + nodeName.getString)
  new Counter(counterGroup, counterName)(jobID) with Incrementor
}

}
Expand Up @@ -113,7 +113,7 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
val operation = foldOperations[T, (K, V)](node.members.reverse)
val wrappedOperation = wrapTimeBatchIDKV(operation)(batcher)

val builder = BuildSummer(storm, stormDag, node)
val builder = BuildSummer(storm, stormDag, node, jobID)

BaseBolt(
jobID,
Expand Down
Expand Up @@ -201,7 +201,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
val metrics = getOrElse(stormDag, node, DEFAULT_SUMMER_STORM_METRICS)
val shouldEmit = stormDag.dependantsOf(node).size > 0

val builder = BuildSummer(this, stormDag, node)
val builder = BuildSummer(this, stormDag, node, jobID)

val ackOnEntry = getOrElse(stormDag, node, DEFAULT_ACK_ON_ENTRY)
logger.info("[{}] ackOnEntry : {}", nodeName, ackOnEntry.get)
Expand Down