Permalink
Browse files

big shuffle for gauges, to make their use easier, more predictable,

and garbage-collectable, and also make "AdditiveGauges" generic.

create a new api "addGauge". this returns a "Gauge" object -- this is
a handler provided by the underlying implementation, and allows for
the gauge to be removed.

furthermore, objects returned by ``addGauge'' are only weakly
referenced by the implementation, allowing its user to control the
gauge lifecycle.
  • Loading branch information...
1 parent 425f53d commit 2c01daf5424ddf39052c7d630e95e75b8e952873 @mariusae mariusae committed Feb 24, 2011
@@ -170,16 +170,24 @@ case class ServerBuilder[Req, Rep](
_recvBufferSize foreach { s => bs.setOption("receiveBufferSize", s) }
// TODO: we need something akin to a max queue depth.
- val queueingChannelHandler = _maxConcurrentRequests map { maxConcurrentRequests =>
- val semaphore = new AsyncSemaphore(maxConcurrentRequests)
- scopedStatsReceiver foreach { sr =>
- sr.provideGauge("request_concurrency") {
- maxConcurrentRequests - semaphore.numPermitsAvailable
+ val queueingChannelHandlerAndGauges =
+ _maxConcurrentRequests map { maxConcurrentRequests =>
+ val semaphore = new AsyncSemaphore(maxConcurrentRequests)
+ val gauges = scopedStatsReceiver.toList flatMap { sr =>
+ sr.addGauge("request_concurrency") {
+ maxConcurrentRequests - semaphore.numPermitsAvailable
+ } :: sr.addGauge("request_queue_size") {
+ semaphore.numWaiters
+ } :: Nil
}
- sr.provideGauge("request_queue_size") { semaphore.numWaiters }
+
+ (new ChannelSemaphoreHandler(semaphore), gauges)
}
- new ChannelSemaphoreHandler(semaphore)
- }
+
+ val queueingChannelHandler =
+ queueingChannelHandlerAndGauges map { case (q, _) => q }
+ val gauges =
+ queueingChannelHandlerAndGauges.toList flatMap { case (_, g) => g }
trait ChannelHandle {
def drain(): Future[Unit]
@@ -314,6 +322,9 @@ case class ServerBuilder[Req, Rep](
// deadlocking.
channels.synchronized { channels toArray } foreach { _.close() }
+ // Release any gauges we've created.
+ gauges foreach { _.remove() }
+
bs.releaseExternalResources()
Timer.default.stop()
}
@@ -98,8 +98,8 @@ class ChannelServiceFactory[Req, Rep](
{
private[this] val channelLatch = new AsyncLatch
private[this] val connectLatencyStat = statsReceiver.stat("connect_latency_ms")
-
- statsReceiver.provideGauge("connections") { channelLatch.getCount }
+ private[this] val gauge =
+ statsReceiver.addGauge("connections") { channelLatch.getCount }
protected[channel] def channelReleased(channel: ChannelService[Req, Rep]) {
channelLatch.decr()
@@ -13,8 +13,8 @@ class StatsFilter[Req, Rep](statsReceiver: StatsReceiver)
private[this] val dispatchCount = statsReceiver.counter("requests")
private[this] val successCount = statsReceiver.counter("success")
private[this] val latencyStat = statsReceiver.stat("request_latency_ms")
-
- statsReceiver.provideGauge("pending") { outstandingRequestCount.get }
+ private[this] val outstandingRequestCountgauge =
+ statsReceiver.addGauge("pending") { outstandingRequestCount.get }
def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
val requestedAt = Time.now
@@ -0,0 +1,74 @@
+package com.twitter.finagle.stats
+
+/**
+ * CumulativeGauge provides a gauge that is composed of the (addition)
+ * of several underlying gauges. It follows the weak reference
+ * semantics of Gauges as outlined in StatsReceiver.
+ */
+
+import ref.WeakReference
+import collection.mutable.WeakHashMap
+
+trait CumulativeGauge {
+ private[this] case class UnderlyingGauge(f: () => Float) extends Gauge {
+ def remove() { removeGauge(this) }
+ }
+
+ private[this] var underlying: List[WeakReference[UnderlyingGauge]] = Nil
+
+ private[this] def get() = synchronized {
+ removeGauge(null) // GC.
+ underlying map { _.get } flatten
+ }
+
+ private[this] def removeGauge(underlyingGauge: UnderlyingGauge) = synchronized {
+ // This does a GC also.
+ underlying = underlying filter { _.get map { _ ne underlyingGauge } getOrElse false }
+ if (underlying.isEmpty)
+ deregister()
+ }
+
+ def addGauge(f: => Float): Gauge = synchronized {
+ val shouldRegister = underlying.isEmpty
+ val underlyingGauge = UnderlyingGauge(() => f)
+ underlying ::= new WeakReference(underlyingGauge)
+
+ if (shouldRegister)
+ register()
+
+ underlyingGauge
+ }
+
+ def getValue = synchronized {
+ get() map { _.f() } sum
+ }
+
+ /**
+ * These need to be implemented by the gauge provider. They indicate
+ * when the gauge needs to be registered & deregistered.
+ */
+ def register(): Unit
+ def deregister(): Unit
+}
+
+trait StatsReceiverWithCumulativeGauges extends StatsReceiver {
+ private[this] val gaugeMap = new WeakHashMap[Seq[String], CumulativeGauge]
+
+ /**
+ * The StatsReceiver implements these. They provide the cumulated
+ * gauges.
+ */
+ protected[this] def registerGauge(name: Seq[String], f: => Float)
+ protected[this] def deregisterGauge(name: Seq[String])
+
+ def addGauge(name: String*)(f: => Float) = synchronized {
+ val cumulativeGauge = gaugeMap getOrElseUpdate(name, {
+ new CumulativeGauge {
+ def register() = StatsReceiverWithCumulativeGauges.this.registerGauge(name, getValue)
+ def deregister() = StatsReceiverWithCumulativeGauges.this.deregisterGauge(name)
+ }
+ })
+
+ cumulativeGauge.addGauge(f)
+ }
+}
@@ -1,12 +1,18 @@
package com.twitter.finagle.stats
+import collection.mutable.HashMap
+
import java.util.logging.Logger
import com.twitter.conversions.time._
import com.twitter.util
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util.Timer
-class JavaLoggerStatsReceiver(logger: Logger, timer: util.Timer) extends StatsReceiver {
+class JavaLoggerStatsReceiver(logger: Logger, timer: util.Timer)
+ extends StatsReceiverWithCumulativeGauges
+{
+ var timerTasks = new HashMap[Seq[String], util.TimerTask]
+
def this(logger: Logger) = this(logger, Timer.default)
def stat(name: String*) = new Stat {
@@ -21,12 +27,18 @@ class JavaLoggerStatsReceiver(logger: Logger, timer: util.Timer) extends StatsRe
}
}
- def provideGauge(name: String*)(f: => Float) {
- timer.schedule(10.seconds) {
+ protected[this] def registerGauge(name: Seq[String], f: => Float) = synchronized {
+ deregisterGauge(name)
+
+ timerTasks(name) = timer.schedule(10.seconds) {
logger.info("%s %2f".format(formatName(name), f))
}
}
+ protected[this] def deregisterGauge(name: Seq[String]) {
+ timerTasks.remove(name) foreach { _.cancel() }
+ }
+
private[this] def formatName(description: Seq[String]) = {
description mkString "/"
}
@@ -16,6 +16,14 @@ trait Stat {
def add(value: Float)
}
+trait Gauge {
+ def remove()
+}
+
+object StatsReceiver {
+ private[StatsReceiver] var immortalGauges: List[Gauge] = Nil
+}
+
trait StatsReceiver {
/**
* Get a Counter with the description
@@ -28,9 +36,27 @@ trait StatsReceiver {
def stat(name: String*): Stat
/**
- * Register a function to be periodically measured.
+ * Register a function to be periodically measured. This measurement
+ * exists in perpetuity. Measurements under the same name are added
+ * together.
*/
- def provideGauge(name: String*)(f: => Float)
+ def provideGauge(name: String*)(f: => Float) {
+ val gauge = addGauge(name: _*)(f)
+ StatsReceiver.synchronized {
+ StatsReceiver.immortalGauges ::= gauge
+ }
+ }
+
+ /**
+ * Add the function ``f'' as a gauge with the given name. The
+ * returned gauge value is only weakly referenced by the
+ * StatsReceiver, and if garbage collected will cease to be a part
+ * of this measurement: thus, it needs to be retained by the
+ * caller. Immortal measurements are made with ``provideGauge''. As
+ * with ``provideGauge'', gauges with equal names are added
+ * together.
+ */
+ def addGauge(name: String*)(f: => Float): Gauge
/**
* Prepend ``namespace'' to the names of this receiver.
@@ -42,6 +68,9 @@ trait StatsReceiver {
}
}
+ /**
+ * Append ``namespace'' to the names of this receiver.
+ */
def withSuffix(namespace: String) = {
val seqSuffix = Seq(namespace)
new NameTranslatingStatsReceiver(this) {
@@ -73,8 +102,10 @@ class RollupStatsReceiver(val self: StatsReceiver)
def add(value: Float) = allStats foreach (_.add(value))
}
- def provideGauge(name: String*)(f: => Float) =
- tails(name) foreach { self.provideGauge(_: _*)(f) }
+ def addGauge(name: String*)(f: => Float) = new Gauge {
+ private[this] val underlying = tails(name) map { self.addGauge(_: _*)(f) }
+ def remove() = underlying foreach { _.remove() }
+ }
}
abstract class NameTranslatingStatsReceiver(val self: StatsReceiver)
@@ -84,7 +115,8 @@ abstract class NameTranslatingStatsReceiver(val self: StatsReceiver)
def counter(name: String*) = self.counter(translate(name): _*)
def stat(name: String*) = self.stat(translate(name): _*)
- def provideGauge(name: String*)(f: => Float) = self.provideGauge(translate(name): _*)(f)
+
+ def addGauge(name: String*)(f: => Float) = self.addGauge(translate(name): _*)(f)
}
object NullStatsReceiver extends StatsReceiver {
@@ -96,5 +128,5 @@ object NullStatsReceiver extends StatsReceiver {
def add(value: Float) {}
}
- def provideGauge(name: String*)(f: => Float) {}
+ def addGauge(name: String*)(f: => Float) = new Gauge { def remove() {} }
}
@@ -0,0 +1,67 @@
+package com.twitter.finagle.stats
+
+import org.specs.Specification
+import org.specs.mock.Mockito
+
+object CumulativeGaugeSpec extends Specification with Mockito {
+ class TestGauge extends CumulativeGauge {
+ def register() {}
+ def deregister() {}
+ }
+
+ "an empty CumulativeGauge" should {
+ val gauge = spy(new TestGauge)
+ there was no(gauge).register()
+
+ "register on the first gauge added" in {
+ gauge.addGauge { 0.0f }
+ there was one(gauge).register()
+ }
+ }
+
+ "a CumulativeGauge with size = 1" should {
+ val gauge = spy(new TestGauge)
+ var added = gauge.addGauge { 1.0f }
+ there was no(gauge).deregister()
+
+ "deregister when all gauges are removed" in {
+ added.remove()
+ there was one(gauge).deregister()
+ }
+
+ "not deregister after a System.gc when there are still valid references to the gauge" in {
+ System.gc()
+
+ // We have to incite some action for the weakref GC to take place.
+ gauge.getValue must be_==(1.0f)
+ there was no(gauge).deregister()
+ }
+
+ "deregister after a System.gc when no references are held onto" in {
+ added = null
+ System.gc()
+
+ // We have to incite some action for the weakref GC to take place.
+ gauge.getValue must be_==(0.0f)
+ there was one(gauge).deregister()
+ }
+ }
+
+ "a CumulativeGauge" should {
+ val gauge = spy(new TestGauge)
+
+ "sum values across all registered gauges" in {
+ 0 until 100 foreach { _ => gauge.addGauge { 10.0f } }
+ gauge.getValue must be_==(10.0f * 100)
+ }
+
+ "discount gauges once removed" in {
+ val underlying = 0 until 100 map { _ => gauge.addGauge { 10.0f } }
+ gauge.getValue must be_==(10.0f * 100)
+ underlying(0).remove()
+ gauge.getValue must be_==(10.0f * 99)
+ underlying(1).remove()
+ gauge.getValue must be_==(10.0f * 98)
+ }
+ }
+}
@@ -1,26 +0,0 @@
-package com.twitter.finagle.stats
-
-/**
- * AdditiveGauges provide composite gauges on top of Ostrich. This
- * allows us to roll up gauge values.
- */
-
-import collection.mutable.HashMap
-import com.twitter.ostrich.Stats
-
-object AdditiveGauges {
- private[this] val gauges = new HashMap[String, Seq[() => Float]]
-
- def apply(name: String)(f: => Float) = synchronized {
- val fs = Seq({ () => f })
-
- if (gauges contains name) {
- gauges(name) = gauges(name) ++ fs
- } else {
- gauges(name) = fs
- Stats.makeGauge(name) {
- AdditiveGauges.this.synchronized { gauges(name) map (_()) sum }
- }
- }
- }
-}
@@ -2,7 +2,15 @@ package com.twitter.finagle.stats
import com.twitter.ostrich.Stats
-class OstrichStatsReceiver extends StatsReceiver {
+class OstrichStatsReceiver extends StatsReceiverWithCumulativeGauges {
+ protected[this] def registerGauge(name: Seq[String], f: => Float) {
+ Stats.makeGauge(variableName(name)) { f }
+ }
+
+ protected[this] def deregisterGauge(name: Seq[String]) {
+ Stats.clearGauge(variableName(name))
+ }
+
def counter(name: String*) = new Counter {
private[this] val name_ = variableName(name)
@@ -17,9 +25,5 @@ class OstrichStatsReceiver extends StatsReceiver {
}
}
- def provideGauge(name: String*)(f: => Float) = {
- AdditiveGauges(variableName(name))(f)
- }
-
private[this] def variableName(name: Seq[String]) = name mkString "/"
}
Oops, something went wrong.

0 comments on commit 2c01daf

Please sign in to comment.