Skip to content

Commit

Permalink
metrics: Add kernel for memoizing the metric builders
Browse files Browse the repository at this point in the history
Problem

Metric metadata including fully scoped names is filled through underlying
StatsReceiverProxies. This means the same MetricSchema we passed in Expressions
doesn't have any metadata.

Solution

For metrics we used to build expressions, we hash the MetricBuilder and memorizing
in the Metric storage using the object reference hashCode as hashMap key. For
registered expressions, replace all MetricBuilders with the fully hydrated ones.

Result

This is a workaround solution to buy us sometime re-design the StatsReceiver to carry
MetricBuilder information so we don't need to back-filling.

JIRA Issues: CSL-10845

Differential Revision: https://phabricator.twitter.biz/D645590
  • Loading branch information
yufangong authored and jenkins committed Apr 7, 2021
1 parent d1812c3 commit d81a57c
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 31 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@ New Features
This response classifier is useful when a client has set a super low `RequestTimeout` and
receiving a response is seen as 'best-effort'. ``PHAB_ID=D645818``

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

* finagle-core: The "failures" counter is changed to be created eagerly, when no failure
happens, the counter value is 0. ``PHAB_ID=D645590``

21.3.0
------

New Features
~~~~~~~~~~~~

* finagle-core: Added value `ForceWithDtab` to flag
`-com.twitter.finagle.loadbalancer.exp.apertureEagerConnections` that forces the
aperture load balancer to eagerly connect, even in staging environments where
Expand All @@ -40,6 +47,7 @@ New Features

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle: Builds are now only supported for Scala 2.12+ ``PHAB_ID=D631091``

* finagle-core: Changed flag `-com.twitter.finagle.loadbalancer.exp.apertureEagerConnections"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,21 @@ class StatsFilter[Req, Rep] private[service] (
}

private[this] val successSchema =
CounterSchema(new MetricBuilder(name = Seq("success"), statsReceiver = statsReceiver))
CounterSchema(MetricBuilder(name = Seq("success"), statsReceiver = statsReceiver).withKernel)
private[this] val failureSchema =
CounterSchema(
new MetricBuilder(name = Seq(ExceptionStatsHandler.Failures), statsReceiver = statsReceiver))
MetricBuilder(
name = Seq(ExceptionStatsHandler.Failures),
statsReceiver = statsReceiver).withKernel)
private[this] val requestSchema =
CounterSchema(new MetricBuilder(name = Seq("requests"), statsReceiver = statsReceiver))
CounterSchema(MetricBuilder(name = Seq("requests"), statsReceiver = statsReceiver).withKernel)

private[this] val outstandingRequestCount = new LongAdder()
private[this] val dispatchCount = statsReceiver.counter(requestSchema)
private[this] val successCount = statsReceiver.counter(successSchema)
// ExceptionStatsHandler creates the failure counter lazily.
// We need to eagerly register this counter in metrics for success rate expression.
private[this] val failureCount = statsReceiver.counter(failureSchema)
private[this] val latencyStat = statsReceiver.stat(s"request_latency_$latencyStatSuffix")
private[this] val outstandingRequestCountGauge =
statsReceiver.addGauge("pending") { outstandingRequestCount.sum() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ class StatsFilterTest extends FunSuite {
val (promise, receiver, statsService) = getService()

assert(receiver.counters(Seq("requests")) == 0)
assert(!receiver.counters.contains(Seq("failures")))
assert(receiver.counters(Seq("failures")) == 0)

val f = statsService("foo")

assert(receiver.counters(Seq("requests")) == 0)
assert(!receiver.counters.contains(Seq("failures")))
assert(receiver.counters(Seq("failures")) == 0)

promise.setException(new Exception)

Expand All @@ -237,12 +237,12 @@ class StatsFilterTest extends FunSuite {
val (promise, receiver, statsService) = getService()

assert(receiver.counters(Seq("requests")) == 0)
assert(!receiver.counters.contains(Seq("failures")))
assert(receiver.counters(Seq("failures")) == 0)

val f = statsService("foo")

assert(receiver.counters(Seq("requests")) == 0)
assert(!receiver.counters.contains(Seq("failures")))
assert(receiver.counters(Seq("failures")) == 0)

promise.setValue("whatever")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class StatsFilterTest extends FunSuite {

// Verify that the counters and stats were only created once
verify(receiver).counter(
CounterSchema(new MetricBuilder(name = Seq("status", "404"), statsReceiver = receiver)))
CounterSchema(MetricBuilder(name = Seq("status", "404"), statsReceiver = receiver)))
verify(receiver).counter(
CounterSchema(new MetricBuilder(name = Seq("status", "4XX"), statsReceiver = receiver)))
CounterSchema(MetricBuilder(name = Seq("status", "4XX"), statsReceiver = receiver)))
verify(receiver).stat(
HistogramSchema(new MetricBuilder(name = Seq("time", "404"), statsReceiver = receiver)))
HistogramSchema(MetricBuilder(name = Seq("time", "404"), statsReceiver = receiver)))
verify(receiver).stat(
HistogramSchema(new MetricBuilder(name = Seq("time", "4XX"), statsReceiver = receiver)))
HistogramSchema(MetricBuilder(name = Seq("time", "4XX"), statsReceiver = receiver)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ abstract class AbstractSmuxTest extends FunSuite {
}
assert(string.isEmpty)

assert(stats.counters.get(Seq("client", "failures")) == None)
assert(stats.counters.get(Seq("client", "failures")) == Some(0))
assert(stats.counters.get(Seq("client", "service_creation", "failures")) == Some(1))
assert(
stats.counters.get(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.finagle.stats

import com.twitter.finagle.stats.exp.ExpressionSchema
import com.twitter.finagle.stats.exp.Expression.replaceExpression
import com.twitter.finagle.stats.exp._
import com.twitter.logging.Logger
import com.twitter.util.lint.{Category, Issue, Rule}
import java.util
Expand Down Expand Up @@ -44,7 +45,13 @@ object Metrics {
gaugesMap = new ConcurrentHashMap[Seq[String], MetricsStore.StoreGauge](),
/** Store MetricSchemas for each metric in order to surface metric metadata to users. */
metricSchemas = new ConcurrentHashMap[String, MetricSchema](),
expressionSchemas = new ConcurrentHashMap[String, ExpressionSchema]()
expressionSchemas = new ConcurrentHashMap[String, ExpressionSchema](),
// Memoizing metrics used for building expressions.
// the key is the object reference hashcode shared between Expression and StatsReceiver,
// and the value is the fully hydrated metric builder in StatsReceiver.
// Use the value to replace metric builders carried in Expression which do not have full
// information.
metricBuilders = new ConcurrentHashMap[Int, MetricBuilder]()
)

private class StoreCounterImpl(override val name: String) extends MetricsStore.StoreCounter {
Expand Down Expand Up @@ -85,7 +92,8 @@ object Metrics {
statsMap: ConcurrentHashMap[Seq[String], MetricsStore.StoreStat],
gaugesMap: ConcurrentHashMap[Seq[String], MetricsStore.StoreGauge],
metricSchemas: ConcurrentHashMap[String, MetricSchema],
expressionSchemas: ConcurrentHashMap[String, ExpressionSchema])
expressionSchemas: ConcurrentHashMap[String, ExpressionSchema],
metricBuilders: ConcurrentHashMap[Int, MetricBuilder])
}

/**
Expand Down Expand Up @@ -138,6 +146,8 @@ private[finagle] class Metrics private (

private[this] val expressionSchemas = metricsMaps.expressionSchemas

private[this] val metricBuilders = metricsMaps.metricBuilders

private[this] val verbosityMap =
new ConcurrentHashMap[String, Verbosity]()

Expand Down Expand Up @@ -167,6 +177,7 @@ private[finagle] class Metrics private (
prev
} else {
metricSchemas.put(formatted, schema)
storeMetricBuilder(schema)
next
}
} else {
Expand Down Expand Up @@ -214,10 +225,28 @@ private[finagle] class Metrics private (
prev
} else {
metricSchemas.put(formatted, schema)
storeMetricBuilder(schema)
next
}
}

private[this] def storeMetricBuilder(schema: MetricSchema): Unit = {
schema.metricBuilder.kernel match {
case Some(kernel) => metricBuilders.put(kernel, schema.metricBuilder)
case None =>
}
}

private[this] def removeMetricBuilder(formattedName: String): Unit = {
val metricSchema = metricSchemas.get(formattedName)
if (metricSchema != null) {
metricSchema.metricBuilder.kernel match {
case Some(kernel) => metricBuilders.remove(kernel)
case None =>
}
}
}

private[stats] def registerExpression(exprSchema: ExpressionSchema): Unit = {
val expressionId = exprSchema.labels.serviceName match {
case Some(serviceName) => exprSchema.name + "_" + serviceName
Expand All @@ -240,6 +269,7 @@ private[finagle] class Metrics private (
val next = new Metrics.StoreGaugeImpl(formatted, f)
gaugesMap.putIfAbsent(schema.metricBuilder.name, next)
metricSchemas.putIfAbsent(formatted, schema)
storeMetricBuilder(schema)

if (schema.metricBuilder.verbosity != Verbosity.Default) {
verbosityMap.put(formatted, schema.metricBuilder.verbosity)
Expand All @@ -251,6 +281,7 @@ private[finagle] class Metrics private (
val next = new Metrics.StoreGaugeImpl(formatted, f)
gaugesMap.put(schema.metricBuilder.name, next)
metricSchemas.put(formatted, schema)
storeMetricBuilder(schema)
} else {
throw new MetricCollisionException(
s"A Counter with the name $formatted had already" +
Expand All @@ -262,6 +293,8 @@ private[finagle] class Metrics private (
def unregisterGauge(names: Seq[String]): Unit = {
gaugesMap.remove(names)
val formatted = format(names)
// remove from metricBuilders must prior to remove from metricSchemas
removeMetricBuilder(formatted)
metricSchemas.remove(formatted)
reservedNames.remove(formatted)
verbosityMap.remove(formatted)
Expand Down Expand Up @@ -303,8 +336,20 @@ private[finagle] class Metrics private (
def schemas: util.Map[String, MetricSchema] =
util.Collections.unmodifiableMap(metricSchemas)

def expressions: util.Map[String, ExpressionSchema] =
util.Collections.unmodifiableMap(expressionSchemas)
private[this] val filledExpression: ConcurrentHashMap[String, ExpressionSchema] =
new ConcurrentHashMap[String, ExpressionSchema]()

def expressions: util.Map[String, ExpressionSchema] = {
val added = expressionSchemas.keySet().asScala &~ filledExpression.keySet().asScala

added.foreach { name =>
val exprSchema = expressionSchemas.get(name)
val newExpression = exprSchema.copy(expr = replaceExpression(exprSchema.expr, metricBuilders))
filledExpression.putIfAbsent(name, newExpression)
}

util.Collections.unmodifiableMap(filledExpression)
}

def metricsCollisionsLinterRule: Rule =
Rule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@ private[stats] class MetricsViaDeprecatedInterface(metrics: Metrics) {

def getOrCreateCounter(verbosity: Verbosity, names: Seq[String]): MetricsStore.StoreCounter =
underlying.getOrCreateCounter(
CounterSchema(new MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)))
CounterSchema(MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)))

def registerGauge(verbosity: Verbosity, names: Seq[String], f: => Float): Unit =
underlying.registerGauge(
GaugeSchema(new MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)),
GaugeSchema(MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)),
f)

def registerLongGauge(verbosity: Verbosity, names: Seq[String], f: => Long): Unit =
underlying.registerLongGauge(
GaugeSchema(new MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)),
GaugeSchema(MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)),
f)

def getOrCreateStat(verbosity: Verbosity, names: Seq[String]): MetricsStore.StoreStat =
underlying.getOrCreateStat(
HistogramSchema(new MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)))
HistogramSchema(MetricBuilder(name = names, verbosity = verbosity, statsReceiver = null)))

def getOrCreateStat(
verbosity: Verbosity,
Expand All @@ -38,7 +38,7 @@ private[stats] class MetricsViaDeprecatedInterface(metrics: Metrics) {
): MetricsStore.StoreStat =
underlying.getOrCreateStat(
HistogramSchema(
new MetricBuilder(
MetricBuilder(
name = names,
verbosity = verbosity,
percentiles = percentiles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ class JsonExporterTest extends FunSuite with Eventually with IntegrationPatience
val viewsCounter = registry
.getOrCreateCounter(
CounterSchema(
new MetricBuilder(
MetricBuilder(
verbosity = Verbosity.Default,
name = Seq("views"),
statsReceiver = null))).counter
val gcCounter = registry
.getOrCreateCounter(
CounterSchema(
new MetricBuilder(
MetricBuilder(
verbosity = Verbosity.Default,
name = Seq("jvm_gcs"),
statsReceiver = null))).counter
Expand Down Expand Up @@ -204,7 +204,7 @@ class JsonExporterTest extends FunSuite with Eventually with IntegrationPatience
registry
.getOrCreateCounter(
CounterSchema(
new MetricBuilder(
MetricBuilder(
verbosity = Verbosity.Default,
name = Seq(name),
statsReceiver = null))).counter
Expand Down Expand Up @@ -265,7 +265,7 @@ class JsonExporterTest extends FunSuite with Eventually with IntegrationPatience
val counter = registry
.getOrCreateCounter(
CounterSchema(
new MetricBuilder(
MetricBuilder(
verbosity = Verbosity.Default,
name = Seq("anCounter"),
statsReceiver = null))).counter
Expand Down Expand Up @@ -321,10 +321,7 @@ class JsonExporterTest extends FunSuite with Eventually with IntegrationPatience
val sr =
registry.registerGauge(
GaugeSchema(
new MetricBuilder(
verbosity = Verbosity.Default,
name = Seq("boom"),
statsReceiver = null)),
MetricBuilder(verbosity = Verbosity.Default, name = Seq("boom"), statsReceiver = null)),
throw new RuntimeException("loolool"))

val exporter = new JsonExporter(registry)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.twitter.finagle.stats

import com.twitter.finagle.stats.exp.{Expression, ExpressionSchema}
import org.scalatest.FunSuite

object MetricsStatsReceiverTest {
Expand Down Expand Up @@ -250,4 +251,38 @@ class MetricsStatsReceiverTest extends FunSuite {

testMetricsStatsReceiver(new PreSchemaCtx())
testMetricsStatsReceiver(new SchemaCtx())

test("expressions are reloaded with fully scoped names") {
val metrics = Metrics.createDetached()
val sr = new MetricsStatsReceiver(metrics)

val aSchema = CounterSchema(MetricBuilder(name = Seq("a"), statsReceiver = sr).withKernel)
val bSchema = HistogramSchema(MetricBuilder(name = Seq("b"), statsReceiver = sr).withKernel)
val cSchema = GaugeSchema(MetricBuilder(name = Seq("c"), statsReceiver = sr).withKernel)

val expression = ExpressionSchema(
"test_expression",
Expression(aSchema).plus(Expression(bSchema, Left(Expression.Min)).plus(Expression(cSchema))))
.register()

val aCounter = sr.scope("test").counter(aSchema)
val bHisto = sr.scope("test").stat(bSchema)
val cGauge = sr.scope(("test")).addGauge(cSchema) { 1 }

// what we expected as hydrated metric builders
val aaSchema = CounterSchema(MetricBuilder(name = Seq("test", "a"), statsReceiver = sr))
val bbSchema = HistogramSchema(
MetricBuilder(
name = Seq("test", "b"),
percentiles = BucketedHistogram.DefaultQuantiles,
statsReceiver = sr))
val ccSchema = GaugeSchema(MetricBuilder(name = Seq("test", "c"), statsReceiver = sr))

val expected_expression = ExpressionSchema(
"test_expression",
Expression(aaSchema).plus(
Expression(bbSchema, Left(Expression.Min)).plus(Expression(ccSchema))))

assert(metrics.expressions.get("test_expression").expr == expected_expression.expr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class ThriftSmuxTest extends FunSuite {
results.get
}
assert(string.isEmpty)
assert(stats.counters.get(Seq("client", "failures")) == None)
assert(stats.counters.get(Seq("client", "failures")) == Some(0))
assert(stats.counters.get(Seq("client", "service_creation", "failures")) == Some(1))
assert(
stats.counters.get(
Expand Down

0 comments on commit d81a57c

Please sign in to comment.