Skip to content

Commit

Permalink
{finagle, util, twitter-server}: Metric Expression prototype
Browse files Browse the repository at this point in the history
Problem

We want to ship our prototype of Metric Expression to start the development,
this prototype confirms that end-to-end the expression instrument and exports
work as we expected.

Solution

The current prototype code is sitting in experiment packages, the exported
expression json document is under-development.

JIRA Issues: CSL-10616

Differential Revision: https://phabricator.twitter.biz/D615453
  • Loading branch information
yufangong authored and jenkins committed Mar 8, 2021
1 parent 5e5a241 commit 5a0745d
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ trait ListeningStackServer[Req, Rep, This <: ListeningStackServer[Req, Rep, This
private[this] val statsReceiver =
if (serverLabel.isEmpty) new RoleConfiguredStatsReceiver(stats, Server)
else
new RoleConfiguredStatsReceiver(
RoleConfiguredStatsReceiver(
new RelativeNameMarkingStatsReceiver(stats.scope(serverLabel)),
Server)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package com.twitter.finagle.service

import com.twitter.finagle.Filter.TypeAgnostic
import com.twitter.finagle._
import com.twitter.finagle.stats.{
ExceptionStatsHandler,
MultiCategorizingExceptionStatsHandler,
StatsReceiver
}
import com.twitter.finagle.stats.exp.{Expression, ExpressionSchema, GreaterThan, MonotoneThreshold}
import com.twitter.finagle.stats._
import com.twitter.util._
import java.util.concurrent.atomic.LongAdder
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.LongAdder
import scala.util.control.NonFatal

object StatsFilter {
Expand Down Expand Up @@ -234,13 +231,35 @@ class StatsFilter[Req, Rep] private[service] (
}
}

private[this] val successSchema =
CounterSchema(new MetricBuilder(name = Seq("success"), statsReceiver = statsReceiver))
private[this] val failureSchema =
CounterSchema(
new MetricBuilder(name = Seq(ExceptionStatsHandler.Failures), statsReceiver = statsReceiver))
private[this] val requestSchema =
CounterSchema(new MetricBuilder(name = Seq("requests"), statsReceiver = statsReceiver))

private[this] val outstandingRequestCount = new LongAdder()
private[this] val dispatchCount = statsReceiver.counter("requests")
private[this] val successCount = statsReceiver.counter("success")
private[this] val dispatchCount = statsReceiver.counter(requestSchema)
private[this] val successCount = statsReceiver.counter(successSchema)
private[this] val latencyStat = statsReceiver.stat(s"request_latency_$latencyStatSuffix")
private[this] val outstandingRequestCountGauge =
statsReceiver.addGauge("pending") { outstandingRequestCount.sum() }

private[this] val successRate =
ExpressionSchema(
"success_rate",
Expression(successSchema).divide(Expression(successSchema).plus(Expression(failureSchema))))
.withBounds(MonotoneThreshold(GreaterThan, 99.5, 99.75))
.withUnit(Percentage)
.withDescription("The Success Rate")
.register()

private[this] val throughput = ExpressionSchema("throughput", Expression(requestSchema))
.withUnit(Requests)
.withDescription("The total requests")
.register()

private[this] def isIgnorableResponse(rep: Try[Rep]): Boolean = rep match {
case Throw(f: FailureFlags[_]) if f.isFlagged(FailureFlags.Ignorable) =>
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ object ClientStatsReceiver extends StatsReceiverProxy {
*/
object ServerStatsReceiver extends StatsReceiverProxy {
@volatile protected var self: StatsReceiver =
new RoleConfiguredStatsReceiver(LoadedStatsReceiver.scope("srv"), Server)
RoleConfiguredStatsReceiver(LoadedStatsReceiver.scope("srv"), Server)

def setRootScope(rootScope: String): Unit = {
self = new RoleConfiguredStatsReceiver(LoadedStatsReceiver.scope(rootScope), Server)
self = RoleConfiguredStatsReceiver(LoadedStatsReceiver.scope(rootScope), Server)
}

override def repr: ServerStatsReceiver.type = this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.twitter.finagle.stats.{
InMemoryStatsReceiver
}
import com.twitter.finagle._
import com.twitter.finagle.stats.exp.{FunctionExpression, MetricExpression}
import com.twitter.util._
import java.util.concurrent.TimeUnit
import org.scalatest.FunSuite
Expand Down Expand Up @@ -311,4 +312,11 @@ class StatsFilterTest extends FunSuite {
assert(2 == sr.counter("success")())
assert(1 == sr.counter("failures")())
}

test("success rate and throughput expressions are instrumented") {
val (_, receiver, _) = getService()

assert(receiver.expressions("success_rate").expr.isInstanceOf[FunctionExpression])
assert(receiver.expressions("throughput").expr.isInstanceOf[MetricExpression])
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.twitter.finagle.stats

import com.twitter.finagle.stats.exp.ExpressionSchema
import com.twitter.logging.Logger
import com.twitter.util.lint.{Category, Issue, Rule}
import java.util
Expand Down Expand Up @@ -42,7 +43,8 @@ object Metrics {
statsMap = new ConcurrentHashMap[Seq[String], MetricsStore.StoreStat](),
gaugesMap = new ConcurrentHashMap[Seq[String], MetricsStore.StoreGauge](),
/** Store MetricSchemas for each metric in order to surface metric metadata to users. */
metricSchemas = new ConcurrentHashMap[String, MetricSchema]()
metricSchemas = new ConcurrentHashMap[String, MetricSchema](),
expressionSchemas = new ConcurrentHashMap[String, ExpressionSchema]()
)

private class StoreCounterImpl(override val name: String) extends MetricsStore.StoreCounter {
Expand Down Expand Up @@ -82,7 +84,8 @@ object Metrics {
countersMap: ConcurrentHashMap[Seq[String], MetricsStore.StoreCounter],
statsMap: ConcurrentHashMap[Seq[String], MetricsStore.StoreStat],
gaugesMap: ConcurrentHashMap[Seq[String], MetricsStore.StoreGauge],
metricSchemas: ConcurrentHashMap[String, MetricSchema])
metricSchemas: ConcurrentHashMap[String, MetricSchema],
expressionSchemas: ConcurrentHashMap[String, ExpressionSchema])
}

/**
Expand Down Expand Up @@ -133,6 +136,8 @@ private[finagle] class Metrics private (

private[this] val metricSchemas = metricsMaps.metricSchemas

private[this] val expressionSchemas = metricsMaps.expressionSchemas

private[this] val verbosityMap =
new ConcurrentHashMap[String, Verbosity]()

Expand Down Expand Up @@ -213,6 +218,14 @@ private[finagle] class Metrics private (
}
}

private[stats] def registerExpression(exprSchema: ExpressionSchema): Unit = {
val expressionId = exprSchema.labels.serviceName match {
case Some(serviceName) => exprSchema.name + "_" + serviceName
case None => exprSchema.name
}
expressionSchemas.putIfAbsent(expressionId, exprSchema)
}

def registerGauge(schema: GaugeSchema, f: => Float): Unit =
registerNumberGauge(schema, f)

Expand Down Expand Up @@ -290,6 +303,9 @@ private[finagle] class Metrics private (
def schemas: util.Map[String, MetricSchema] =
util.Collections.unmodifiableMap(metricSchemas)

def expressions: util.Map[String, ExpressionSchema] =
util.Collections.unmodifiableMap(expressionSchemas)

def metricsCollisionsLinterRule: Rule =
Rule(
Category.Configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.twitter.finagle.stats

import com.twitter.app.GlobalFlag
import com.twitter.finagle.http.{HttpMuxHandler, Route, RouteIndex}
import com.twitter.finagle.stats.exp.ExpressionSchema
import com.twitter.finagle.util.DefaultTimer
import com.twitter.logging.{Level, Logger}
import com.twitter.util.{Future, FuturePool, Time}
Expand Down Expand Up @@ -150,6 +151,9 @@ class MetricsStatsReceiver(val registry: Metrics)
}

override def metricsCollisionsLinterRule: Rule = registry.metricsCollisionsLinterRule

override protected[finagle] def registerExpression(expressionSchema: ExpressionSchema): Unit =
registry.registerExpression(expressionSchema)
}

private object MetricsExporter {
Expand Down Expand Up @@ -179,6 +183,12 @@ class MetricsExporter(val registry: Metrics, val logger: Logger)
*/
override def schemas(): Map[String, MetricSchema] = registry.schemas.asScala.toMap

/**
* Exposes Metric Expressions for ExpressionRegistry.
* @return a map of expression names to their full ExpressionSchema.
*/
def expressions(): Map[String, ExpressionSchema] = registry.expressions.asScala.toMap

val pattern = "/admin/metrics.json"

def route: Route =
Expand Down

0 comments on commit 5a0745d

Please sign in to comment.