Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

compiles

  • Loading branch information...
commit 25b7ee82f353132d3df2da5d92aba9b5dbdc6fd6 1 parent 445ef3f
Nick Kallen authored
Showing with 365 additions and 442 deletions.
  1. +41 −58 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
  2. +24 −64 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
  3. +2 −2 finagle-core/src/main/scala/com/twitter/finagle/channel/Broker.scala
  4. +17 −17 finagle-core/src/main/scala/com/twitter/finagle/channel/LoadedBroker.scala
  5. +22 −6 finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala
  6. +43 −0 finagle-core/src/main/scala/com/twitter/finagle/stats/SimpleStatsRepository.scala
  7. +49 −2 finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReceiver.scala
  8. +0 −6 finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReporter4J.scala
  9. +82 −0 finagle-core/src/main/scala/com/twitter/finagle/stats/StatsRepository.scala
  10. +54 −0 finagle-core/src/main/scala/com/twitter/finagle/stats/TimeWindowedStatsRepository.scala
  11. +0 −187 finagle-core/src/main/scala/com/twitter/finagle/util/Sample.scala
  12. +0 −79 finagle-core/src/main/scala/com/twitter/finagle/util/TimeWindowedCollection.scala
  13. +0 −21 finagle-ostrich/src/main/scala/com/twitter/finagle/OstrichStatsReceiver.scala
  14. +31 −0 finagle-ostrich/src/main/scala/com/twitter/finagle/stats/OstrichStatsReceiver.scala
View
99 finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -10,13 +10,13 @@ import java.util.concurrent.Executors
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio._
-import com.twitter.util.Duration
+import com.twitter.util.{Duration, JavaTimer}
import com.twitter.util.TimeConversions._
import com.twitter.finagle.channel._
import com.twitter.finagle.util._
import com.twitter.finagle.service
-import com.twitter.finagle.stats.StatsReceiver
+import com.twitter.finagle.stats.{StatsRepository, TimeWindowedStatsRepository, StatsReceiver}
object ClientBuilder {
def apply() = new ClientBuilder
@@ -47,13 +47,12 @@ case class ClientBuilder(
_connectionTimeout: Duration,
_requestTimeout: Duration,
_statsReceiver: Option[StatsReceiver],
- _sampleWindow: Duration,
- _sampleGranularity: Duration,
+ _loadStatistics: (Int, Duration),
+ _failureAccrualStatistics: (Int, Duration),
_name: Option[String],
_hostConnectionLimit: Option[Int],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
- _failureAccrualWindow: Duration,
_retries: Option[Int],
_initialBackoff: Option[Duration],
_backoffMultiplier: Option[Int],
@@ -68,13 +67,12 @@ case class ClientBuilder(
10.milliseconds, // connectionTimeout
Duration.MaxValue, // requestTimeout
None, // statsReceiver
- 10.minutes, // sampleWindow
- 10.seconds, // sampleGranularity
+ (60, 10.seconds), // loadStatistics
+ (10, 1.second), // failureAccrualStatistics
None, // name
None, // hostConnectionLimit
None, // sendBufferSize
None, // recvBufferSize
- 10.seconds, // failureAccrualWindow
None, // retries
None, // initialBackoff
None, // backoffMultiplier
@@ -104,11 +102,23 @@ case class ClientBuilder(
def reportTo(receiver: StatsReceiver): ClientBuilder =
copy(_statsReceiver = Some(receiver))
- def sampleWindow(window: Duration): ClientBuilder =
- copy(_sampleWindow = window)
+ /**
+ * The interval over which to aggregate load statistics.
+ */
+ def loadStatistics(numIntervals: Int, interval: Duration): ClientBuilder = {
+ require(numIntervals >= 1, "Must have at least 1 window to sample statistics over")
- def sampleGranularity(granularity: Duration): ClientBuilder =
- copy(_sampleGranularity = granularity)
+ copy(_loadStatistics = (numIntervals, interval))
+ }
+
+ /**
+ * The interval over which to aggregate failure accrual statistics.
+ */
+ def failureAccrualStatistics(numIntervals: Int, interval: Duration): ClientBuilder = {
+ require(numIntervals >= 1, "Must have at least 1 window to sample statistics over")
+
+ copy(_failureAccrualStatistics = (numIntervals, interval))
+ }
def name(value: String): ClientBuilder = copy(_name = Some(value))
@@ -127,9 +137,6 @@ case class ClientBuilder(
def sendBufferSize(value: Int): ClientBuilder = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int): ClientBuilder = copy(_recvBufferSize = Some(value))
- def failureAccrualWindow(window: Duration): ClientBuilder =
- copy(_failureAccrualWindow = window)
-
def channelFactory(cf: ChannelFactory): ClientBuilder =
copy(_channelFactory = Some(cf))
@@ -186,48 +193,16 @@ case class ClientBuilder(
broker
}
- private def statsRepositoryForLoadedBroker(
- sockAddr: SocketAddress,
- name: Option[String],
- receiver: Option[StatsReceiver],
- window: Duration,
- granularity: Duration) =
- {
- if (window < granularity) {
- throw new IncompleteSpecification(
- "window smaller than granularity!")
- }
-
- val prefix = name map ("%s_".format(_)) getOrElse ""
- val sampleRepository =
- new ObservableSampleRepository[TimeWindowedSample[ScalarSample]] {
- override def makeStat = TimeWindowedSample[ScalarSample](window, granularity)
- }
-
- for (receiver <- receiver)
- sampleRepository observeTailsWith receiver.observer(prefix, sockAddr toString)
-
- sampleRepository
- }
-
- private def failureAccrualBroker(window: Duration)(broker: StatsLoadedBroker) = {
- val granularity = Seq((window.inMilliseconds / 10).milliseconds, 1.second).max
- def mk = new LazilyCreatingSampleRepository[TimeWindowedSample[ScalarSample]] {
- override def makeStat = TimeWindowedSample[ScalarSample](window, granularity)
- }
-
- new FailureAccruingLoadedBroker(broker, mk)
- }
-
def makeBroker(
codec: Codec,
- statsRepo: SampleRepository[T forSome { type T <: AddableSample[T] }]) =
+ loadStatsRepository: StatsRepository,
+ failureAccruingStatsRepo: StatsRepository) =
bootstrap(codec) _ andThen
pool(_hostConnectionLimit, _proactivelyConnect) _ andThen
(new PoolingBroker(_)) andThen
timeout(_requestTimeout) _ andThen
- (new StatsLoadedBroker(_, statsRepo)) andThen
- failureAccrualBroker(_failureAccrualWindow) _
+ (new StatsLoadedBroker(_, loadStatsRepository)) andThen
+ (new FailureAccruingLoadedBroker(_, failureAccruingStatsRepo))
def build(): Broker = {
val (hosts, codec) = (_hosts, _codec) match {
@@ -241,18 +216,26 @@ case class ClientBuilder(
(hosts, codec)
}
+ val timer = new JavaTimer
val brokers = hosts map { host =>
- val statsRepo = statsRepositoryForLoadedBroker(
- host, _name, _statsReceiver,
- _sampleWindow, _sampleGranularity)
+ val statsRepository = {
+ val statsRepository = new TimeWindowedStatsRepository(
+ _loadStatistics._1, _loadStatistics._2, timer)
+ statsRepository.scope(
+ "service" -> _name.getOrElse(""),
+ "host" -> host.toString)
+ }
- val broker = makeBroker(codec, statsRepo)(host)
+ val failureAccruingStatsRepo = {
+ new TimeWindowedStatsRepository(_failureAccrualStatistics._1, _failureAccrualStatistics._2)
+ }
+ val broker = makeBroker(codec, statsRepository, failureAccruingStatsRepo)(host)
_statsReceiver.foreach { statsReceiver =>
val hostString = host.toString
- statsReceiver.makeGauge(hostString + "_load", broker.load)
- statsReceiver.makeGauge(hostString + "_weight", broker.weight)
- statsReceiver.makeGauge(hostString + "_available", if (broker.isAvailable) 1 else 0)
+ statsReceiver.mkGauge("host" -> hostString, "load" -> "broker", broker.load)
+ statsReceiver.mkGauge("host" -> hostString, "weight" -> "broker", broker.weight)
+ statsReceiver.mkGauge("host" -> hostString, "available" -> "broker", if (broker.isAvailable) 1 else 0)
}
broker
View
88 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
@@ -13,13 +13,13 @@ import org.jboss.netty.handler.ssl._
import org.jboss.netty.channel.socket.nio._
import com.twitter.util.TimeConversions._
-import com.twitter.util.{Duration, Time}
+import com.twitter.util.{Time, JavaTimer}
import com.twitter.finagle._
import channel.{Job, QueueingChannelHandler}
import com.twitter.finagle.util._
import com.twitter.finagle.service.{Service, ServicePipelineFactory}
-import stats.StatsReceiver
+import stats.{StatsRepository, StatsReceiver}
object ServerBuilder {
def apply() = new ServerBuilder()
@@ -31,18 +31,17 @@ object ServerBuilder {
Executors.newCachedThreadPool())
}
-class SampleHandler(samples: SampleRepository[AddableSample[_]])
- extends SimpleChannelHandler{
- val dispatchSample: AddableSample[_] = samples("dispatch")
- val latencySample: AddableSample[_] = samples("latency")
+class SampleHandler(statsReceiver: StatsReceiver)
+ extends SimpleChannelHandler {
+ private[this] val dispatchSample = statsReceiver.counter("dispatches" -> "service")
+ private[this] val latencySample = statsReceiver.gauge("latency" -> "service")
case class Timing(requestedAt: Time = Time.now)
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
ctx.getAttachment match {
case Timing(requestedAt: Time) =>
- samples("exception", e.getCause.getClass.getName).add(
- requestedAt.untilNow.inMilliseconds.toInt)
+ statsReceiver.counter("exception" -> e.getCause.getClass.getName).incr()
case _ => ()
}
super.exceptionCaught(ctx, e)
@@ -62,7 +61,7 @@ class SampleHandler(samples: SampleRepository[AddableSample[_]])
val e = c.asInstanceOf[MessageEvent]
ctx.getAttachment match {
case Timing(requestedAt) =>
- latencySample.add(requestedAt.untilNow.inMilliseconds.toInt)
+ latencySample.measure(requestedAt.untilNow.inMilliseconds.toInt)
ctx.setAttachment(null)
case _ =>
// Can this happen?
@@ -80,8 +79,6 @@ class SampleHandler(samples: SampleRepository[AddableSample[_]])
case class ServerBuilder(
_codec: Option[Codec],
_statsReceiver: Option[StatsReceiver],
- _sampleWindow: Duration,
- _sampleGranularity: Duration,
_name: Option[String],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int],
@@ -97,21 +94,19 @@ case class ServerBuilder(
import ServerBuilder._
def this() = this(
- None, // codec
- None, // statsReceiver
- 10.minutes, // sampleWindow
- 10.seconds, // sampleGranularity
- None, // name
- None, // sendBufferSize
- None, // recvBufferSize
- None, // pipelineFactory
- None, // bindTo
- None, // logger
- None, // tls
- false, // startTls
- None, // channelFactory
- None, // maxConcurrentRequests
- None // maxQueueDepth
+ None, // codec
+ None, // statsReceiver
+ None, // name
+ None, // sendBufferSize
+ None, // recvBufferSize
+ None, // pipelineFactory
+ None, // bindTo
+ None, // logger
+ None, // tls
+ false, // startTls
+ None, // channelFactory
+ None, // maxConcurrentRequests
+ None // maxQueueDepth
)
def codec(codec: Codec): ServerBuilder =
@@ -120,12 +115,6 @@ case class ServerBuilder(
def reportTo(receiver: StatsReceiver): ServerBuilder =
copy(_statsReceiver = Some(receiver))
- def sampleWindow(window: Duration): ServerBuilder =
- copy(_sampleWindow = window)
-
- def sampleGranularity(window: Duration): ServerBuilder =
- copy(_sampleGranularity = window)
-
def name(value: String): ServerBuilder = copy(_name = Some(value))
def sendBufferSize(value: Int): ServerBuilder = copy(_sendBufferSize = Some(value))
@@ -157,32 +146,6 @@ case class ServerBuilder(
def maxQueueDepth(max: Int): ServerBuilder =
copy(_maxQueueDepth = Some(max))
- private def statsRepository(
- name: Option[String],
- receiver: Option[StatsReceiver],
- window: Duration,
- granularity: Duration,
- sockAddr: SocketAddress) =
- {
- if (window < granularity) {
- throw new IncompleteSpecification(
- "window smaller than granularity!")
- }
-
- // .
-
- val prefix = name map ("%s_".format(_)) getOrElse ""
- val sampleRepository =
- new ObservableSampleRepository[TimeWindowedSample[ScalarSample]] {
- override def makeStat = TimeWindowedSample[ScalarSample](window, granularity)
- }
-
- for (receiver <- receiver)
- sampleRepository observeTailsWith receiver.observer(prefix, sockAddr toString)
-
- sampleRepository
- }
-
def build(): Channel = {
val (codec, pipelineFactory) = (_codec, _pipelineFactory) match {
case (None, _) =>
@@ -201,11 +164,6 @@ case class ServerBuilder(
_sendBufferSize foreach { s => bs.setOption("sendBufferSize", s) }
_recvBufferSize foreach { s => bs.setOption("receiveBufferSize", s) }
- val statsRepo = statsRepository(
- _name, _statsReceiver,
- _sampleWindow, _sampleGranularity,
- _bindTo.get)
-
bs.setPipelineFactory(new ChannelPipelineFactory {
def getPipeline = {
val pipeline = codec.serverPipelineFactory.getPipeline
@@ -229,7 +187,9 @@ case class ServerBuilder(
pipeline.addFirst("ssl", new SslHandler(sslEngine, _startTls))
}
- pipeline.addLast("stats", new SampleHandler(statsRepo))
+ _statsReceiver foreach { statsReceiver =>
+ pipeline.addLast("stats", new SampleHandler(statsReceiver))
+ }
for ((name, handler) <- pipelineFactory.getPipeline.toMap)
pipeline.addLast(name, handler)
View
4 finagle-core/src/main/scala/com/twitter/finagle/channel/Broker.scala
@@ -8,11 +8,11 @@ import com.twitter.finagle.service.Service
// TODO: variables.
trait Broker extends Service[AnyRef, AnyRef] {
- def isAvailable: Boolean = true
+ def isAvailable: Boolean = true
}
trait WrappingBroker extends Broker {
- val underlying: Broker
+ protected val underlying: Broker
def apply(request: AnyRef) = underlying(request)
override def isAvailable = underlying.isAvailable
View
34 finagle-core/src/main/scala/com/twitter/finagle/channel/LoadedBroker.scala
@@ -7,6 +7,7 @@ import com.twitter.util.{Time, Return, Throw, Future}
import com.twitter.util.TimeConversions._
import com.twitter.finagle.util._
import com.twitter.finagle.util.Conversions._
+import com.twitter.finagle.stats.StatsRepository
/**
* This is F-bounded to ensure that we have a homogenous set of
@@ -23,54 +24,53 @@ trait LoadedBroker[+A <: LoadedBroker[A]] extends Broker {
* Keeps track of request latencies & counts.
*/
class StatsLoadedBroker(
- val underlying: Broker,
- samples: SampleRepository[T forSome { type T <: AddableSample[T] }],
+ protected val underlying: Broker,
+ statsRepository: StatsRepository,
bias: Float = 1.0f)
extends WrappingBroker
with LoadedBroker[StatsLoadedBroker]
{
- val dispatchSample = samples("dispatch")
- val latencySample = samples("latency")
+ private[this] val dispatchStat = statsRepository.counter("dispatches" -> "broker")
+ private[this] val latencyStat = statsRepository.gauge("latency" -> "broker")
override def apply(request: AnyRef) = {
val begin = Time.now
- dispatchSample.incr()
+ dispatchStat.incr()
val f = underlying(request)
f respond {
case Return(_) =>
- latencySample.add(begin.untilNow.inMilliseconds.toInt)
+ latencyStat.measure(begin.untilNow.inMilliseconds.toInt)
case Throw(e) =>
// TODO: exception hierarchy here to differentiate between
// application, connection & other (internal?) exceptions.
- samples("exception", e.getClass.getName)
- .add(begin.untilNow.inMilliseconds.toInt)
+ statsRepository.counter("exception" -> e.getClass.getName).incr()
}
f
}
override def weight = super.weight * bias
- def load = dispatchSample.count
+ def load = dispatchStat.sum
// Fancy pants:
// latencyStats.sum + 2 * latencyStats.mean * failureStats.count
}
class FailureAccruingLoadedBroker(
- val underlying: LoadedBroker[_],
- samples: SampleRepository[TimeWindowedSample[_]])
+ protected val underlying: LoadedBroker[_],
+ statsRepository: StatsRepository)
extends WrappingBroker
with LoadedBroker[FailureAccruingLoadedBroker]
{
- val successSample = samples("success")
- val failureSample = samples("failure")
+ private[this] val successStat = statsRepository.counter("success" -> "broker")
+ private[this] val failureStat = statsRepository.counter("failure" -> "broker")
def load = underlying.load
override def weight = {
- val success = successSample.count
- val failure = failureSample.count
+ val success = successStat.sum
+ val failure = failureStat.sum
val sum = success + failure
// TODO: do we decay this decision beyond relying on the stats
@@ -86,8 +86,8 @@ class FailureAccruingLoadedBroker(
// TODO: discriminate request errors vs. connection errors, etc.?
val f = underlying(request)
f respond {
- case Return(_) => successSample.incr()
- case Throw(_) => failureSample.incr()
+ case Return(_) => successStat.incr()
+ case Throw(_) => failureStat.incr()
}
f
View
28 finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala
@@ -8,20 +8,36 @@ import com.twitter.finagle.util.Conversions._
case class JavaLoggerStatsReceiver(logger: Logger) extends StatsReceiver {
val timer = new HashedWheelTimer()
- def observer(prefix: String, label: String) = {
- val suffix = "_%s".format(label)
+ private[this] class Counter(description: Seq[(String, String)]) extends OCounter {
+ def incr(delta: Int) {
+ logger.info("%s incr %d".format(formatDescription(description), delta))
+ }
+
+ val sum = 0
+ }
- (path: Seq[String], value: Int, count: Int) => {
- val pathString = path mkString "__"
- logger.info(List(prefix, pathString, suffix, count) mkString " ")
+ private[this] class Gauge(description: Seq[(String, String)]) extends OGauge {
+ def measure(value: Float) {
+ logger.info("%s measure %f".format(formatDescription(description), value))
}
+
+ val summary = Summary(0.0f, 0)
}
- def makeGauge(name: String, f: => Float) {
+ def gauge(description: (String, String)*): OGauge = new Gauge(description)
+ def counter(description: (String, String)*): OCounter = new Counter(description)
+
+ def mkGauge(name: Seq[(String, String)], f: => Float) {
timer(10.seconds) {
logger.info("%s %2f".format(name, f))
}
}
+
+ private[this] def formatDescription(description: Seq[(String, String)]) = {
+ description.map { case (key, value) =>
+ "%s_%s".format(key, value)
+ }.mkString("__")
+ }
}
object JavaLoggerStatsReceiver {
View
43 finagle-core/src/main/scala/com/twitter/finagle/stats/SimpleStatsRepository.scala
@@ -0,0 +1,43 @@
+package com.twitter.finagle.stats
+
+import com.twitter.util.MapMaker
+import java.util.concurrent.atomic.AtomicInteger
+import com.twitter.concurrent.Serialized
+
+class SimpleStatsRepository extends StatsRepository {
+ class Counter extends OCounter {
+ private[this] val _sum = new AtomicInteger(0)
+
+ def incr(delta: Int) { _sum.addAndGet(delta) }
+ def sum = _sum.get
+ }
+
+ class Gauge extends OGauge with Serialized {
+ @volatile private[this] var _summary = Summary(0.0f, 0)
+
+ def measure(value: Float) {
+ serialized {
+ val snapshot = summary
+ _summary = Summary(snapshot.total + value, snapshot.count + 1)
+ }
+ }
+
+ def summary = _summary
+ }
+
+ private[this] val counters = MapMaker[Seq[(String, String)], Counter] { config =>
+ config.compute { _ => new Counter }
+ }
+
+ private[this] val gauges = MapMaker[Seq[(String, String)], Gauge] { config =>
+ config.compute { _ => new Gauge }
+ }
+
+ def counter(path: (String, String)*): OCounter = counters(path)
+ def gauge(path: (String, String)*): OGauge = gauges(path)
+
+ /**
+ * Unsupported for now.
+ */
+ def mkGauge(path: Seq[(String, String)], f: => Float) {}
+}
View
51 finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReceiver.scala
@@ -1,6 +1,53 @@
package com.twitter.finagle.stats
+/**
+ * A readable and writeable Counter. Only sums are kept of Counters.
+ * An example Counter is "number of requests served".
+ */
+trait Counter extends {
+ def incr(delta: Int)
+ def incr() { incr(1) }
+}
+
+/**
+ * A readable and writeable Gauge. Gauages are usually continuous
+ * values that are measured at moments in time (e.g., the value
+ * of a share of Twitter's stock).
+ */
+trait Gauge {
+ /**
+ * Record a measurement
+ */
+ def measure(value: Float)
+}
+
trait StatsReceiver {
- def observer(prefix: String, label: String): (Seq[String], Int, Int) => Unit
- def makeGauge(name: String, f: => Float)
+ /**
+ * Get a Counter with the description
+ */
+ def counter(description: (String, String)*): Counter
+
+ /**
+ * Get a Gauge with the description
+ */
+ def gauge(description: (String, String)*): Gauge
+
+ /**
+ * Register a function to be periodically measured.
+ */
+ def mkGauge(description: Seq[(String, String)], f: => Float)
+
+ /**
+ * Convenvenience function to deal with 1-arity descriptions
+ */
+ def mkGauge(description: (String, String), f: => Float) {
+ mkGauge(Seq(description), f)
+ }
+
+ /**
+ * Convenvenience function to deal with 2-arity descriptions
+ */
+ def mkGauge(description1: (String, String), description2: (String, String), f: => Float) {
+ mkGauge(Seq(description1, description2), f)
+ }
}
View
6 finagle-core/src/main/scala/com/twitter/finagle/stats/StatsReporter4J.scala
@@ -1,6 +0,0 @@
-package com.twitter.finagle.stats
-
-
-
-
-
View
82 finagle-core/src/main/scala/com/twitter/finagle/stats/StatsRepository.scala
@@ -0,0 +1,82 @@
+package com.twitter.finagle.stats
+
+trait OCounter extends com.twitter.finagle.stats.Counter {
+ def sum: Int
+}
+
+trait OGauge extends com.twitter.finagle.stats.Gauge {
+ /**
+ * An atomic snapshot of summary statistics.
+ */
+ case class Summary(total: Float, count: Int)
+
+ /**
+ * Arithmetic mean
+ */
+ def mean = {
+ val snapshot = summary
+ snapshot.total / snapshot.count
+ }
+
+ /**
+ * Get an atomic snapshot of summary statistics
+ */
+ def summary: Summary
+}
+
+
+/**
+ * A service for storing and collecting statistics. The kinds of data
+ * that can be measured include Counters (which maintains only a sum)
+ * and Gauges (which maintains a set of summary statistics such as
+ * mean).
+ */
+trait StatsRepository extends StatsReceiver {
+
+ /**
+ * Get a Counter with the description
+ */
+ def counter(description: (String, String)*): OCounter
+
+ /**
+ * Get a Gauge with the given description
+ */
+ def gauge(description: (String, String)*): OGauge
+
+ /**
+ * Prepends a prefix description to all descriptions on this StatsRepository
+ */
+ def scope(prefix: (String, String)*) = {
+ val self = this
+ new StatsRepository {
+ def counter(description: (String, String)*): OCounter =
+ self.counter(prefix ++ description: _*)
+
+ def gauge(description: (String, String)*): OGauge =
+ self.gauge(prefix ++ description: _*)
+
+ def mkGauge(description: Seq[(String, String)], f: => Float) {
+ self.mkGauge(prefix ++ description, f)
+ }
+ }
+ }
+}
+
+/**
+ * A StatsRepository that discards all data
+ */
+class NullStatsRepository extends StatsRepository {
+ private[this] class Gauge extends OGauge {
+ val summary = Summary(0.0f, 0)
+ def measure(value: Float) {}
+ }
+
+ private[this] class Counter extends OCounter {
+ def incr(delta: Int) {}
+ val sum = 0
+ }
+
+ def gauge(description: (String, String)*): OGauge = new Gauge
+ def counter(description: (String, String)*): OCounter = new Counter
+ def mkGauge(description: Seq[(String, String)], f: => Float) {}
+}
View
54 finagle-core/src/main/scala/com/twitter/finagle/stats/TimeWindowedStatsRepository.scala
@@ -0,0 +1,54 @@
+package com.twitter.finagle.stats
+
+import com.twitter.util.{Timer, Duration, JavaTimer, Time}
+
+/**
+ * A StatsRepository that keeps a rolling set of windows of data. Stats are
+ * collected over a time window, and a limited number of time windows are
+ * stored. Older windows drop off the back, newer windows are appended to
+ * the front.
+ *
+ * @param interval the duration of an individual time window
+ * @param windows the number of time windows to keep around
+ * @param timer a timer to schedule creating and dropping time windows
+ */
+class TimeWindowedStatsRepository(numIntervals: Int, interval: Duration, timer: Timer = new JavaTimer)
+ extends StatsRepository
+{
+ @volatile private[this] var position = 0
+ private[this] def repositories = new Array[StatsRepository](numIntervals)
+ private[this] def currentRepository = repositories(position % numIntervals)
+
+ repositories(0) = new SimpleStatsRepository
+ timer.schedule(interval.fromNow, interval) {
+ repositories((position + 1) % numIntervals) = new SimpleStatsRepository
+ position += 1
+ }
+
+ private[this] class Counter(path: (String, String)*) extends OCounter {
+ private[this] def current = currentRepository.counter(path: _*)
+
+ def sum = repositories.foldLeft(0) { (total, repository) =>
+ total + repository.counter(path: _*).sum
+ }
+
+ def incr(delta: Int) = current.incr(delta)
+ }
+
+ private[this] class Gauge(path: (String, String)*) extends OGauge {
+ private[this] def current = currentRepository.gauge(path: _*)
+
+ def summary = repositories.foldLeft(Summary(0.0f, 0)) { (acc, repository) =>
+ val summary = repository.gauge(path: _*).summary
+ Summary(acc.total + summary.total, acc.count + summary.count)
+ }
+
+ def measure(value: Float) = current.measure(value)
+ }
+
+ def counter(path: (String, String)*): OCounter = new Counter(path: _*)
+ def gauge(path: (String, String)*): OGauge = new Gauge(path: _*)
+ def mkGauge(description: Seq[(String, String)], f: => Float) {
+ timer.schedule(Time.now, interval)(f)
+ }
+}
View
187 finagle-core/src/main/scala/com/twitter/finagle/util/Sample.scala
@@ -1,187 +0,0 @@
-package com.twitter.finagle.util
-
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions._
-
-import java.util.concurrent.ConcurrentHashMap
-
-import com.twitter.util.Duration
-import com.twitter.util.TimeConversions._
-
-// TODO: do we want a decaying stat?
-
-trait Sample {
- // TODO: sumOfSquares
- def sum: Int
- def count: Int
- def mean: Int = if (count != 0) sum / count else 0
-
- override def toString = "[count=%d, sum=%d, mean=%d]".format(count, sum, mean)
-}
-
-trait AddableSample[+S <: AddableSample[S]] extends Sample {
- def add(value: Int): Unit = add(value, 1)
- def add(value: Int, count: Int)
- def incr(): Unit = add(0, 1)
-}
-
-trait SampleRepository[+S <: Sample] {
- def apply(path: String*): S
-}
-
-trait LazilyCreatingSampleRepository[S <: Sample] extends SampleRepository[S] {
- private val map = new ConcurrentHashMap[Seq[String], S]()
-
- def makeStat: S
- def apply(path: String*): S = map getOrElseUpdate(path, makeStat)
-}
-
-trait ObservableSampleRepository[S <: AddableSample[S]]
- extends LazilyCreatingSampleRepository[AddableSample[S]]
-{
- private def tails[A](s: Seq[A]): Seq[Seq[A]] = {
- s match {
- case s@Seq(_) =>
- Seq(s)
-
- case Seq(hd, tl@_*) =>
- Seq(Seq(hd)) ++ (tails(tl) map { t => Seq(hd) ++ t })
- }
- }
-
- private type Observer = (Seq[String], Int, Int) => Unit
- private val observers = new Queue[Observer]
- private val tailObservers = new Queue[Observer]
-
- def observeTailsWith(o: (Seq[String], Int, Int) => Unit) = synchronized {
- tailObservers += o
- }
-
- def observeWith(o: (Seq[String], Int, Int) => Unit) = synchronized {
- observers += o
- }
-
- override def apply(path: String*): AddableSample[S] =
- new AddableSampleProxy[S](super.apply(path:_*)) {
- override def add(value: Int, count: Int) {
- super.add(value, count)
-
- ObservableSampleRepository.this.synchronized {
- for (o <- observers)
- o(path, value, count)
-
- for (tail <- tails(path); o <- tailObservers)
- o(path, value, count)
- }
- }
- }
-}
-
-class AddableSampleProxy[+S <: AddableSample[S]](
- val self: AddableSample[S])
- extends AddableSample[S] with Proxy
-{
- def add(value: Int, count: Int) = self.add(value, count)
- def sum = self.sum
- def count = self.count
-}
-
-class ScalarSample extends AddableSample[ScalarSample] {
- @volatile private var counter = 0
- @volatile private var accumulator = 0
-
- def sum = accumulator
- def count = counter
-
- def add(value: Int, count: Int) = synchronized {
- counter += count
- accumulator += value
- }
-}
-
-trait AggregateSample extends Sample {
- protected val underlying: Iterable[Sample]
-
- def sum = underlying.map(_.sum).sum
- def count = underlying.map(_.count).sum
-}
-
-class TimeWindowedSample[S <: AddableSample[S]](bucketCount: Int, bucketDuration: Duration)
- (implicit val _s: Manifest[S])
- extends AggregateSample with AddableSample[TimeWindowedSample[S]]
-{
- protected val underlying = new TimeWindowedCollection[S](bucketCount, bucketDuration)
-
- def add(value: Int, count: Int) = underlying().add(value, count)
-
- def rateInHz = {
- val (begin, end) = underlying.timeSpan
- val timeDiff = end - begin
- count / timeDiff.inSeconds
- }
-
- // TODO: export for duration (to get different granularities but snaps to next bucket)
- // def apply(duration: Duration): AggregateSample
-
- override def toString = underlying.toString
-}
-
-object TimeWindowedSample {
- def apply[S <: AddableSample[S]]
- (window: Duration, granularity: Duration)
- (implicit _s: Manifest[S]): TimeWindowedSample[S] =
- {
- if (window < granularity)
- throw new IllegalArgumentException("window smaller than granularity!")
-
- val numBuckets = math.max(1, window.inMilliseconds / granularity.inMilliseconds)
- new TimeWindowedSample[S](numBuckets.toInt, granularity)
- }
-
-}
-
-sealed abstract class SampleTree extends AggregateSample {
- val name: String
- def merge(other: SampleTree): SampleTree
-}
-
-case class SampleNode(name: String, underlying: Seq[SampleTree])
- extends SampleTree
-{
- // In order to merge succesfully, trees must have the same shape.
- def merge(other: SampleTree) =
- other match {
- case SampleNode(otherName, otherUnderlying) if name == otherName =>
- val sampless =
- (underlying ++ otherUnderlying) groupBy (_.name) map { case (_, samples) => samples }
- val merged = sampless map { _.reduceLeft (_.merge(_)) }
- SampleNode(name, merged toSeq)
-
- case _ =>
- throw new IllegalArgumentException("trees are shape divergent")
- }
-
- override def toString = {
- val lines = underlying flatMap (_.toString.split("\n")) map ("_" + _) mkString "\n"
- "%s %s".format(name, super.toString) + "\n" + lines
- }
-}
-
-case class SampleLeaf(name: String, sample: Sample) extends SampleTree
-{
- val underlying = Seq(sample)
- override def toString = "%s %s".format(name, super.toString)
-
- def merge(other: SampleTree) = {
- other match {
- case SampleLeaf(otherName, otherSample) if name == otherName =>
- SampleLeaf(name, new AggregateSample { val underlying = Seq(sample, otherSample) })
-
- // Shape divergence!
- case _ =>
- throw new IllegalArgumentException("trees are shape divergent")
- }
- }
-}
-
-
View
79 finagle-core/src/main/scala/com/twitter/finagle/util/TimeWindowedCollection.scala
@@ -1,79 +0,0 @@
-package com.twitter.finagle.util
-
-import com.twitter.util.{Duration, Time}
-import com.twitter.util.TimeConversions._
-
-class TimeWindowedCollection[A](bucketCount: Int, bucketDuration: Duration)
- (implicit val _a: Manifest[A])
- extends Iterable[A]
-{
- // PREMATURE_OPTIMIZATION_TODO:
- //
- // - we can keep the head reference in an atomic ref, and only
- // ever synchronize on gc()
- // - we can keep a (circular) array of counters instead of
- // creating new objects every expiration.
- // - use serialized!
-
- protected def newInstance: A =
- _a.erasure.newInstance.asInstanceOf[A]
-
- @volatile private var buckets = List[Tuple2[Time, A]]()
-
- private def gc(now: Time) = synchronized {
- val limit = now - bucketDuration * bucketCount
-
- val index = buckets.findIndexOf {
- case (timestamp, _) if timestamp < limit => true
- case _ => false
- }
-
- if (index != -1)
- buckets = buckets.slice(0, index)
- }
-
- private[util] def prepend(now: Time) = synchronized {
- gc(now)
-
- val instance = newInstance
- buckets = (now, instance) :: buckets
- instance
- }
-
- def apply(): A = synchronized {
- val now = Time.now
-
- buckets.headOption match {
- case Some((timestamp, _)) if now - timestamp >= bucketDuration =>
- prepend(now)
- case Some((_, instance)) =>
- instance
- case None =>
- prepend(now)
- }
- }
-
- override def toString = synchronized {
- val details = buckets.headOption map { case (firstTs, _) =>
- val bucketDetails = buckets map {
- case (ts, instance) => "%s = %s".format(ts - firstTs, instance)
- }
- bucketDetails.mkString("\n\t", ",\n\t", "")
- }
-
- "TimeWindowedCollection[%s](%d buckets of %s sec, spanning %s) = [%s]".format(
- _a.erasure.getName, bucketCount, bucketDuration, details getOrElse "", timeSpan)
- }
-
- def timeSpan = {
- val now = Time.now
- (buckets.lastOption map { case (ts, _) => ts } getOrElse(now), now + bucketDuration)
- }
-
- // TODO: check that this is threadsafe. i believe it is (because we
- // get the head).
- def iterator = {
- gc(Time.now)
- buckets.iterator map { case (_, x) => x }
- }
-}
View
21 finagle-ostrich/src/main/scala/com/twitter/finagle/OstrichStatsReceiver.scala
@@ -1,21 +0,0 @@
-package com.twitter.finagle
-
-import com.twitter.ostrich.Stats
-import com.twitter.finagle.stats.StatsReceiver
-
-class OstrichStatsReceiver extends StatsReceiver {
- def observer(prefix: String, label: String) = {
- val suffix = "_%s".format(label)
-
- (path: Seq[String], value: Int, count: Int) => {
- // Enforce count == 1?
- val pathString = path mkString "__"
- Stats.addTiming(prefix + pathString, value)
- Stats.addTiming(prefix + pathString + suffix, value)
- }
- }
-
- def makeGauge(name: String, f: => Float) {
- Stats.makeGauge(name)(f)
- }
-}
View
31 finagle-ostrich/src/main/scala/com/twitter/finagle/stats/OstrichStatsReceiver.scala
@@ -0,0 +1,31 @@
+package com.twitter.finagle.stats
+
+import com.twitter.ostrich.Stats
+
+class OstrichStatsReceiver extends StatsReceiver {
+ def gauge(description: (String, String)*) = new Gauge {
+ private[this] val name = descriptionToName(description)
+
+ def measure(value: Float) {
+ Stats.setGauge(name, value)
+ }
+ }
+
+ def counter(description: (String, String)*) = new Counter {
+ private[this] val name = descriptionToName(description)
+
+ def incr(delta: Int) {
+ Stats.incr(name, delta)
+ }
+ }
+
+ def mkGauge(description: Seq[(String, String)], f: => Float) {
+ Stats.makeGauge(descriptionToName(description))(f)
+ }
+
+ private[this] def descriptionToName(description: Seq[(String, String)]) = {
+ description.map { case (key, value) =>
+ "%s_%s".format(key, value)
+ }.mkString("__")
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.