diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 810657e2a9..729aa94ed8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,16 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com Unreleased ---------- +New Features +~~~~~~~~~~~~ + +* finagle-core: Added tracing annotations to backup requests. ``PHAB_ID=D280998`` + + - Timestamped annotation "Client Backup Request Issued" + - Timestamped annotation "Client Backup Request Won" or "Client Backup Request Lost" + - Binary annotation "clnt/backup_request_threshold_ms", with the current value of the latency threshold, in milliseconds + - Binary annotation "clnt/backup_request_span_id", with the span id of the backup request + Breaking API Changes ~~~~~~~~~~~~~~~~~~~~ diff --git a/finagle-base-http/src/main/scala/com/twitter/finagle/http/HttpClientTraceInitializer.scala b/finagle-base-http/src/main/scala/com/twitter/finagle/http/HttpClientTraceInitializer.scala index 916d309457..e2d50b59f8 100644 --- a/finagle-base-http/src/main/scala/com/twitter/finagle/http/HttpClientTraceInitializer.scala +++ b/finagle-base-http/src/main/scala/com/twitter/finagle/http/HttpClientTraceInitializer.scala @@ -1,9 +1,25 @@ package com.twitter.finagle.http import com.twitter.finagle -import com.twitter.finagle.tracing.{Trace, TraceInitializerFilter} +import com.twitter.finagle.tracing.{Trace, TraceInitializerFilter, Tracer} import com.twitter.finagle.{Filter, ServiceFactory, Stack} +private[finagle] object HttpClientTraceInitializer { + + def apply[Req, Rep](tracer: Tracer): Filter[Req, Rep, Req, Rep] = + Filter.mk[Req, Rep, Req, Rep] { (req, svc) => + Trace.letTracerAndNextId(tracer) { + TraceInfo.setClientRequestHeaders(req.asInstanceOf[Request]) + svc(req) + } + } + + def typeAgnostic(tracer: Tracer): Filter.TypeAgnostic = new Filter.TypeAgnostic { + def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] = apply(tracer) + } + +} + private[finagle] class HttpClientTraceInitializer[Req <: Request, Rep] extends Stack.Module1[finagle.param.Tracer, ServiceFactory[Req, Rep]] { val role: Stack.Role = TraceInitializerFilter.role @@ -13,13 +29,7 @@ private[finagle] class HttpClientTraceInitializer[Req <: Request, Rep] _tracer: finagle.param.Tracer, next: ServiceFactory[Req, Rep] ): ServiceFactory[Req, Rep] = { - val finagle.param.Tracer(tracer) = _tracer - val traceInitializer = Filter.mk[Req, Rep, Req, Rep] { (req, svc) => - Trace.letTracerAndNextId(tracer) { - TraceInfo.setClientRequestHeaders(req) - svc(req) - } - } + val traceInitializer = HttpClientTraceInitializer[Req, Rep](_tracer.tracer) traceInitializer.andThen(next) } } diff --git a/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala index 699f762a3b..c1accab327 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala @@ -6,6 +6,7 @@ import com.twitter.finagle._ import com.twitter.finagle.context.BackupRequest import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier, Retries, RetryBudget} import com.twitter.finagle.stats.StatsReceiver +import com.twitter.finagle.tracing.{Annotation, Trace, TraceId, Tracing} import com.twitter.finagle.util.WindowedPercentileHistogram import com.twitter.logging.Logger import com.twitter.util._ @@ -20,6 +21,15 @@ object BackupRequestFilter { */ private val OrigRequestTimeout = Failure("Original request did not complete in time") + private val IssuedAnnotation = + Annotation.Message("Client Backup Request Issued") + + private val WonAnnotation = + Annotation.Message("Client Backup Request Won") + + private val LostAnnotation = + Annotation.Message("Client Backup Request Lost") + /** * Use a minimum non-zero delay to prevent sending unnecessary backup requests * immediately for services where the latency at the percentile where a backup will be sent is @@ -42,7 +52,7 @@ object BackupRequestFilter { private def getAndValidateMaxExtraLoad(maxExtraLoad: Tunable[Double]): Double = maxExtraLoad() match { - case Some(maxExtraLoad) if (maxExtraLoad >= 0.0 && maxExtraLoad < 1.0) => maxExtraLoad + case Some(mel) if mel >= 0.0 && mel < 1.0 => mel case Some(invalidMaxExtraLoad) => log.error(s"maxExtraLoad must be between 0.0 and 1.0, was $invalidMaxExtraLoad. Using 0.0") 0.0 @@ -156,8 +166,8 @@ object BackupRequestFilter { def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] = new Stack.ModuleParams[ServiceFactory[Req, Rep]] { - val role = BackupRequestFilter.role - val description = BackupRequestFilter.description + val role: Stack.Role = BackupRequestFilter.role + val description: String = BackupRequestFilter.description override def parameters: Seq[Stack.Param[_]] = Seq( implicitly[Stack.Param[param.Stats]], @@ -297,11 +307,19 @@ private[finagle] class BackupRequestFilter[Req, Rep]( windowedPercentileHistogramFac() // Prevent sending a backup on the first request - @volatile private[this] var sendBackupAfter: Int = Int.MaxValue + @volatile private[this] var sendBackupAfterMillis: Int = Int.MaxValue // For testing private[client] def sendBackupAfterDuration: Duration = - Duration.fromMilliseconds(sendBackupAfter) + Duration.fromMilliseconds(sendBackupAfterMillis) + + private[this] val sendAfterStat = statsReceiver.stat("send_backup_after_ms") + private[this] val backupsSent = statsReceiver.counter("backups_sent") + + // Indicates that the backup request returned first, regardless of whether it succeeded. + private[this] val backupsWon = statsReceiver.counter("backups_won") + + private[this] val budgetExhausted = statsReceiver.counter("budget_exhausted") // schedule timer to refresh `sendBackupAfter`, and refresh `backupRequestRetryBudget` in response // to changes to the value of `maxExtraLoadTunable`, @@ -315,19 +333,12 @@ private[finagle] class BackupRequestFilter[Req, Rep]( percentile = percentileFromMaxExtraLoad(curMaxExtraLoad) backupRequestRetryBudget = newRetryBudget(curMaxExtraLoad, nowMs) } - sendBackupAfter = Math.max(MinSendBackupAfterMs, windowedPercentile.percentile(percentile)) - sendAfterStat.add(sendBackupAfter) + sendBackupAfterMillis = + Math.max(MinSendBackupAfterMs, windowedPercentile.percentile(percentile)) + sendAfterStat.add(sendBackupAfterMillis) } } - private[this] val sendAfterStat = statsReceiver.stat("send_backup_after_ms") - private[this] val backupsSent = statsReceiver.counter("backups_sent") - - // Indicates that the backup request returned first, regardless of whether it succeeded. - private[this] val backupsWon = statsReceiver.counter("backups_won") - - private[this] val budgetExhausted = statsReceiver.counter("budget_exhausted") - private[this] def isSuccess(reqRep: ReqRep): Boolean = responseClassifier.applyOrElse(reqRep, ResponseClassifier.Default) match { case ResponseClass.Successful(_) => true @@ -344,9 +355,9 @@ private[finagle] class BackupRequestFilter[Req, Rep]( case _ => false } - private[this] def record(req: Req, f: Future[Rep]): Future[Rep] = { + private[this] def record(req: Req, rep: Future[Rep]): Future[Rep] = { val start = nowMs() - f.respond { response => + rep.respond { response => if (shouldRecord(response)) { val latency = (nowMs() - start).toInt windowedPercentile.add(latency) @@ -361,10 +372,57 @@ private[finagle] class BackupRequestFilter[Req, Rep]( private[this] def canIssueBackup(): Boolean = backupRequestRetryBudget.tryWithdraw() && clientRetryBudget.tryWithdraw() + private[this] def issueBackup( + req: Req, + service: Service[Req, Rep], + tracing: Tracing, + backupTraceId: TraceId + ): Future[Rep] = { + if (tracing.isActivelyTracing) { + tracing.record(IssuedAnnotation) + tracing.recordBinary("clnt/backup_request_threshold_ms", sendBackupAfterMillis) + tracing.recordBinary("clnt/backup_request_span_id", backupTraceId.spanId) + } + val rep = + Trace.letId(backupTraceId) { + BackupRequest.let { + service(req) + } + } + record(req, rep) + } + + private[this] def pickWinner( + req: Req, + orig: Future[Rep], + backup: Future[Rep], + trace: Tracing + ): Future[Rep] = { + val (winner, loser) = if (orig.isDefined) (orig, backup) else (backup, orig) + winner.transform { response => + val wasSuccess = isSuccess(ReqRep(req, response)) + val backupWon = wasSuccess && (backup eq winner) + if (backupWon) backupsWon.incr() + + if (trace.isActivelyTracing) { + val annotation = if (backupWon) WonAnnotation else LostAnnotation + trace.record(annotation) + } + if (wasSuccess) { + if (sendInterrupts) { + loser.raise(SupersededRequestFailure) + } + winner + } else { + loser + } + } + } + def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = { backupRequestRetryBudget.deposit() val orig = record(req, service(req)) - val howLong = sendBackupAfter + val howLong = sendBackupAfterMillis // once our percentile exceeds how high we can track, we should stop sending backups. if (howLong >= windowedPercentile.highestTrackableValue) { @@ -381,21 +439,11 @@ private[finagle] class BackupRequestFilter[Req, Rep]( // pass on the first successful result we get back. if (canIssueBackup()) { backupsSent.incr() - val backup = record(req, BackupRequest.let(service(req))) + val tracing = Trace() + val backupTraceId = tracing.nextId + val backup = issueBackup(req, service, tracing, backupTraceId) orig.select(backup).transform { _ => - val winner = if (orig.isDefined) orig else backup - val loser = if (winner eq orig) backup else orig - winner.transform { response => - if (backup eq winner) backupsWon.incr() - if (isSuccess(ReqRep(req, response))) { - if (sendInterrupts) { - loser.raise(SupersededRequestFailure) - } - Future.const(response) - } else { - loser - } - } + pickWinner(req, orig, backup, tracing) } } else { budgetExhausted.incr() diff --git a/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala index c18b793e30..b90af21f37 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala @@ -2,9 +2,9 @@ package com.twitter.finagle.client import com.twitter.finagle.Filter.TypeAgnostic import com.twitter.finagle.client.MethodBuilderTimeout.TunableDuration -import com.twitter.finagle.param -import com.twitter.finagle.service.{Retries, TimeoutFilter, ResponseClassifier, ResponseClass} +import com.twitter.finagle.service.{ResponseClass, ResponseClassifier, Retries, TimeoutFilter} import com.twitter.finagle.stats.StatsReceiver +import com.twitter.finagle.tracing.TraceInitializerFilter import com.twitter.finagle.util.{Showable, StackRegistry} import com.twitter.finagle.{Filter, Name, Service, ServiceFactory, Stack, param, _} import com.twitter.util.tunable.Tunable @@ -62,7 +62,10 @@ private[finagle] object MethodBuilder { stack: Stack[ServiceFactory[Req, Rep]] ): Stack[ServiceFactory[Req, Rep]] = { stack - // total timeouts are managed directly by MethodBuilder + // backup requests happen before the stack's filters so MethodBuilder + // has to place this before (outside of the stack). + .remove(TraceInitializerFilter.role) + // total timeouts are managed directly by MethodBuilder .remove(TimeoutFilter.totalTimeoutRole) // allow for dynamic per-request timeouts .replace(TimeoutFilter.role, DynamicTimeout.perRequestModule[Req, Rep]) @@ -75,6 +78,7 @@ private[finagle] object MethodBuilder { */ def create(originalStack: Stack[_], params: Stack.Params): Config = { Config( + TraceInitializerFilter.typeAgnostic(params[param.Tracer].tracer, true), MethodBuilderRetry.Config( if (params.contains[param.ResponseClassifier]) { Some(params[param.ResponseClassifier].responseClassifier) @@ -95,7 +99,10 @@ private[finagle] object MethodBuilder { * @see [[MethodBuilder.Config.create]] to construct an initial instance. * Using its `copy` method is appropriate after that. */ - case class Config private (retry: MethodBuilderRetry.Config, timeout: MethodBuilderTimeout.Config) + case class Config private ( + traceInitializer: Filter.TypeAgnostic, + retry: MethodBuilderRetry.Config, + timeout: MethodBuilderTimeout.Config) /** Used by the `ClientRegistry` */ private[client] val RegistryKey = "methods" @@ -276,7 +283,7 @@ private[finagle] final class MethodBuilder[Req, Rep]( val configClassifier = config.retry.underlyingClassifier val idempotentedConfigClassifier = idempotentify(configClassifier, classifier) - (new MethodBuilder[Req, Rep]( + new MethodBuilder[Req, Rep]( refCounted, dest, stack, @@ -288,7 +295,7 @@ private[finagle] final class MethodBuilder[Req, Rep]( + param.ResponseClassifier(idempotentedStackClassifier) + stackParams[Retries.Budget], config - )).withRetry.forClassifier(idempotentedConfigClassifier) + ).withRetry.forClassifier(idempotentedConfigClassifier) } private[this] def nonidempotentify( @@ -329,6 +336,12 @@ private[finagle] final class MethodBuilder[Req, Rep]( ).withRetry.forClassifier(nonidempotentedConfigClassifier) } + /** + * Allow customizations for protocol-specific trace initialization. + */ + def withTraceInitializer(initializer: Filter.TypeAgnostic): MethodBuilder[Req, Rep] = + withConfig(config.copy(traceInitializer = initializer)) + // // Build // @@ -373,7 +386,7 @@ private[finagle] final class MethodBuilder[Req, Rep]( private[this] def statsReceiver(methodName: Option[String]): StatsReceiver = { val clientScoped = stackParams[param.Stats].statsReceiver.scope(clientName) methodName match { - case Some(methodName) => clientScoped.scope(methodName) + case Some(name) => clientScoped.scope(name) case None => clientScoped } } @@ -387,6 +400,7 @@ private[finagle] final class MethodBuilder[Req, Rep]( // total timeouts and before per-request timeouts so that each backup uses the per-request // timeout. // + // - Trace Initialization // - Logical Stats // - Failure logging // - Annotate method name for a `Failure` @@ -400,12 +414,12 @@ private[finagle] final class MethodBuilder[Req, Rep]( val timeouts = withTimeout val failureSource = methodName match { - case Some(methodName) => addFailureSource(methodName) + case Some(name) => addFailureSource(name) case None => TypeAgnostic.Identity } - retries - .logicalStatsFilter(stats) + config.traceInitializer + .andThen(retries.logicalStatsFilter(stats)) .andThen(retries.logFailuresFilter(clientName, methodName)) .andThen(failureSource) .andThen(timeouts.totalFilter) diff --git a/finagle-core/src/main/scala/com/twitter/finagle/tracing/Annotation.scala b/finagle-core/src/main/scala/com/twitter/finagle/tracing/Annotation.scala index f5fe2938bf..8bf8749f5f 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/tracing/Annotation.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/tracing/Annotation.scala @@ -10,6 +10,7 @@ import java.nio.ByteBuffer sealed abstract class Annotation object Annotation { + case object WireSend extends Annotation case object WireRecv extends Annotation final case class WireRecvError(error: String) extends Annotation diff --git a/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceInitializerFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceInitializerFilter.scala index 2456180cea..710f728552 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceInitializerFilter.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/tracing/TraceInitializerFilter.scala @@ -6,14 +6,24 @@ import com.twitter.util.{Future, Throw} object TraceInitializerFilter { val role: Stack.Role = Stack.Role("TraceInitializerFilter") + /** + * @param newId Set the next TraceId when the tracer is pushed, `true` for clients. + */ + private[finagle] def apply[Req, Rep](tracer: Tracer, newId: Boolean): Filter[Req, Rep, Req, Rep] = + new TraceInitializerFilter[Req, Rep](tracer, newId) + + private[finagle] def typeAgnostic(tracer: Tracer, newId: Boolean): Filter.TypeAgnostic = + new Filter.TypeAgnostic { + def toFilter[Req, Rep]: Filter[Req, Rep, Req, Rep] = apply(tracer, newId) + } + private[finagle] class Module[Req, Rep](newId: Boolean) extends Stack.Module1[param.Tracer, ServiceFactory[Req, Rep]] { def this() = this(true) val role: Stack.Role = TraceInitializerFilter.role val description = "Initialize the tracing system" def make(_tracer: param.Tracer, next: ServiceFactory[Req, Rep]): ServiceFactory[Req, Rep] = { - val param.Tracer(tracer) = _tracer - new TraceInitializerFilter[Req, Rep](tracer, newId).andThen(next) + apply(_tracer.tracer, newId).andThen(next) } } diff --git a/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala index 024d65749e..c47592823f 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala @@ -501,7 +501,7 @@ class BackupRequestFilterTest } } - test(s"Backup request completes unsuccessfully first") { + test("Backup request completes unsuccessfully first") { Time.withCurrentTimeFrozen { tc => val origPromise, backupPromise = new Promise[String] val f = sendBackup(origPromise, backupPromise, tc, sendInterrupts = false) @@ -528,7 +528,7 @@ class BackupRequestFilterTest assert(wp.percentile(50.percent) == (WarmupRequestLatency + 1.second).inMillis) assert(statsReceiver.counters(Seq("backups_sent")) == 1) - assert(statsReceiver.counters(Seq("backups_won")) == 1) + assert(statsReceiver.counters(Seq("backups_won")) == 0) } } diff --git a/finagle-http/src/main/scala/com/twitter/finagle/http/MethodBuilder.scala b/finagle-http/src/main/scala/com/twitter/finagle/http/MethodBuilder.scala index 45c679279f..b38f4a27ed 100644 --- a/finagle-http/src/main/scala/com/twitter/finagle/http/MethodBuilder.scala +++ b/finagle-http/src/main/scala/com/twitter/finagle/http/MethodBuilder.scala @@ -3,6 +3,7 @@ package com.twitter.finagle.http import com.twitter.finagle.builder.{ClientBuilder, ClientConfig} import com.twitter.finagle.client.StackClient import com.twitter.finagle.http.service.HttpResponseClassifier +import com.twitter.finagle.param.{Tracer => TracerParam} import com.twitter.finagle.service.ResponseClassifier import com.twitter.finagle.{Name, Resolver, Service, client} import com.twitter.util.Duration @@ -47,7 +48,12 @@ object MethodBuilder { * @see [[com.twitter.finagle.Http.Client.methodBuilder(Name)]] */ def from(dest: Name, stackClient: StackClient[Request, Response]): MethodBuilder = { - val mb = client.MethodBuilder.from(dest, stackClient) + val initializer = HttpClientTraceInitializer.typeAgnostic( + stackClient.params[TracerParam].tracer + ) + val mb = client.MethodBuilder + .from(dest, stackClient) + .withTraceInitializer(initializer) new MethodBuilder(mb) } diff --git a/finagle-integration/src/test/scala/com/twitter/finagle/integration/BackupRequestFilterTest.scala b/finagle-integration/src/test/scala/com/twitter/finagle/integration/BackupRequestFilterTest.scala index e40a4bed41..161ba3b6d5 100644 --- a/finagle-integration/src/test/scala/com/twitter/finagle/integration/BackupRequestFilterTest.scala +++ b/finagle-integration/src/test/scala/com/twitter/finagle/integration/BackupRequestFilterTest.scala @@ -6,10 +6,12 @@ import com.twitter.finagle.context.BackupRequest import com.twitter.finagle.integration.thriftscala.Echo import com.twitter.finagle.service.RetryBudget import com.twitter.finagle.stats.InMemoryStatsReceiver +import com.twitter.finagle.tracing.{Annotation, Record, TraceId, Tracer} import com.twitter.finagle.util.DefaultTimer import com.twitter.finagle.{Http, Service, ThriftMux, http} import com.twitter.util.{Await, Future} import java.net.InetSocketAddress +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import org.scalatest.FunSuite import org.scalatest.concurrent.{Eventually, IntegrationPatience} @@ -18,6 +20,28 @@ class BackupRequestFilterTest extends FunSuite with Eventually with IntegrationP private def await[T](f: Future[T]): T = Await.result(f, 15.seconds) + private def newTracer(messages: LinkedBlockingQueue[String]): Tracer = new Tracer { + def record(record: Record): Unit = { + record match { + case Record(_, _, Annotation.Message(value), _) => + messages.put(value) + case Record(_, _, Annotation.BinaryAnnotation(key, _), _) => + messages.put(key) + case _ => + } + } + def sampleTrace(traceId: TraceId): Option[Boolean] = Tracer.SomeTrue + } + + private def assertTracerMessages(messages: LinkedBlockingQueue[String]): Unit = { + assert(messages.contains("Client Backup Request Issued")) + assert( + messages.contains("Client Backup Request Won") + || messages.contains("Client Backup Request Lost")) + assert(messages.contains("clnt/backup_request_threshold_ms")) + assert(messages.contains("clnt/backup_request_span_id")) + } + test("Http client propagates BackupRequest context") { val goSlow = new AtomicBoolean(false) val backupsSeen = new AtomicInteger(0) @@ -38,8 +62,10 @@ class BackupRequestFilterTest extends FunSuite with Eventually with IntegrationP val server = Http.server.serve("localhost:*", service) val addr = server.boundAddress.asInstanceOf[InetSocketAddress] + val messages = new LinkedBlockingQueue[String]() val statsRecv = new InMemoryStatsReceiver() val client = Http.client + .withTracer(newTracer(messages)) .withStatsReceiver(statsRecv) .withRetryBudget(RetryBudget.Infinite) .withLabel("backend") @@ -55,6 +81,7 @@ class BackupRequestFilterTest extends FunSuite with Eventually with IntegrationP } // capture state and tee it up. + messages.clear() goSlow.set(true) val counter = statsRecv.counter("backend", "backups", "backups_sent") val backupsBefore = counter() @@ -63,6 +90,7 @@ class BackupRequestFilterTest extends FunSuite with Eventually with IntegrationP assert(backupsSeen.get == backupsSeenBefore + 1) eventually { assert(counter() == backupsBefore + 1) + assertTracerMessages(messages) } } @@ -88,8 +116,10 @@ class BackupRequestFilterTest extends FunSuite with Eventually with IntegrationP val server = ThriftMux.server.serveIface("localhost:*", service) val addr = server.boundAddress.asInstanceOf[InetSocketAddress] + val messages = new LinkedBlockingQueue[String]() val statsRecv = new InMemoryStatsReceiver() val client = ThriftMux.client + .withTracer(newTracer(messages)) .withStatsReceiver(statsRecv) .withRetryBudget(RetryBudget.Infinite) .withLabel("backend") @@ -105,6 +135,7 @@ class BackupRequestFilterTest extends FunSuite with Eventually with IntegrationP } // capture state and tee it up. + messages.clear() goSlow.set(true) val counter = statsRecv.counter("backend", "backups", "backups_sent") val backupsBefore = counter() @@ -113,6 +144,7 @@ class BackupRequestFilterTest extends FunSuite with Eventually with IntegrationP assert(backupsSeen.get == backupsSeenBefore + 1) eventually { assert(counter() == backupsBefore + 1) + assertTracerMessages(messages) } }