diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d6db06f385..9912cd521a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,6 +12,11 @@ Runtime Behavior Changes * finagle: Update Caffeine cache library to version 2.9.1 ``PHAB_ID=D660908`` +Bug Fixes +~~~~~~~~~~ + +* finagle-core: Add `BackupRequestFilter` to client registry when configured. ``PHAB_ID=D686981`` + 21.6.0 ------ diff --git a/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala b/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala index fa0733e17a..5690d1b5a9 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/client/BackupRequestFilter.scala @@ -4,10 +4,12 @@ import com.twitter.conversions.DurationOps._ import com.twitter.finagle.Stack.Params import com.twitter.finagle._ import com.twitter.finagle.context.BackupRequest +import com.twitter.finagle.naming.BindingFactory.Dest +import com.twitter.finagle.param.{Label, ProtocolLibrary} import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier, Retries, RetryBudget} import com.twitter.finagle.stats.StatsReceiver import com.twitter.finagle.tracing.{Annotation, Trace, TraceId, Tracing} -import com.twitter.finagle.util.WindowedPercentileHistogram +import com.twitter.finagle.util.{Showable, WindowedPercentileHistogram} import com.twitter.logging.Logger import com.twitter.util._ import com.twitter.util.tunable.Tunable @@ -151,10 +153,40 @@ object BackupRequestFilter { * * Users should only use this method for filtering generic services; otherwise, * usage through the `idempotent` method on [[MethodBuilder]] implementations is preferred. + * + * @note The BackupRequestFilter will be added to the ClientRegistry if and only if + * [[ProtocolLibrary]], [[Label]], and [[Dest]] are present in {@code params}. + * The BackupRequestFilter will be registered under scope: + * "client"/"client_protocol_library"/"client_label"/"dest"/"BackupRequestFilter" + * If any of the 3 params is missing, BRF will not be registered. */ def filterService[Req, Rep](params: Stack.Params, service: Service[Req, Rep]): Service[Req, Rep] = + if (params.contains[ProtocolLibrary] && params.contains[Label] && params.contains[Dest]) { + filterServiceWithPrefix(params, service, Seq(Showable.show(params[Dest].dest))) + } else { + filterServiceWithPrefix(params, service, Seq.empty) + } + + // an internal api to register BRF to client registry under `keyPrefixes`. + // We need to do it explicitly since BackupRequestFilter is never placed on Client stack. + // When invoked via `MethodBuilder.idempotent` endpoint, BRF will be registered under: + // "client"/"client_protocol_library"/"client_name"/"dest"/"methods"/"service_name"/"BackupRequestFilter" + // When invoked without `MethodBuilder`, BRF will be registered under: + // "client"/"client_protocol_library"/"client_name"/"dest"/"BackupRequestFilter" + private[client] def filterServiceWithPrefix[Req, Rep]( + params: Stack.Params, + service: Service[Req, Rep], + keyPrefixes: Seq[String] + ): Service[Req, Rep] = params[BackupRequestFilter.Param] match { case BackupRequestFilter.Param.Configured(maxExtraLoad, sendInterrupts) => + // register BRF when registry prefixes are provided + if (keyPrefixes.nonEmpty) { + val value = + "maxExtraLoad: " + maxExtraLoad().toString + ", sendInterrupts: " + sendInterrupts + val prefixes = keyPrefixes ++ Seq(BackupRequestFilter.role.name, value) + ClientRegistry.export(params, prefixes: _*) + } val brf = mkFilterFromParams[Req, Rep](maxExtraLoad, sendInterrupts, params) new ServiceProxy[Req, Rep](brf.andThen(service)) { override def close(deadline: Time): Future[Unit] = diff --git a/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala b/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala index 511709f227..9cfef34b87 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/client/MethodBuilder.scala @@ -524,6 +524,7 @@ final class MethodBuilder[Req, Rep] private[finagle] ( val entry = registryEntry() val keyPrefix = registryKeyPrefix(name) ClientRegistry.register(entry, keyPrefix :+ "statsReceiver", statsReceiver(name).toString) + withTimeout.registryEntries.foreach { case (suffix, value) => ClientRegistry.register(entry, keyPrefix ++ suffix, value) @@ -542,7 +543,10 @@ final class MethodBuilder[Req, Rep] private[finagle] ( param.Stats(statsReceiver(name)) + param.ResponseClassifier(config.retry.responseClassifier) - val underlying = BackupRequestFilter.filterService(backupRequestParams, methodPool.get) + // register BackupRequestFilter under the same prefixes as other method entries + val prefixes = Seq(registryEntry().addr) ++ registryKeyPrefix(name) + val underlying = BackupRequestFilter + .filterServiceWithPrefix(backupRequestParams, methodPool.get, prefixes) new ServiceProxy[Req, Rep](underlying) with CloseOnce { override def apply(request: Req): Future[Rep] = diff --git a/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala index 2e79a57c8f..bac918368a 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/client/BackupRequestFilterTest.scala @@ -1,21 +1,23 @@ package com.twitter.finagle.client -import com.twitter.conversions.PercentOps._ import com.twitter.conversions.DurationOps._ +import com.twitter.conversions.PercentOps._ import com.twitter.finagle._ +import com.twitter.finagle.naming.BindingFactory import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier, RetryBudget} import com.twitter.finagle.stats.{InMemoryStatsReceiver, NullStatsReceiver} -import com.twitter.finagle.util.{WindowedPercentileHistogram, MockWindowedPercentileHistogram} +import com.twitter.finagle.util.{MockWindowedPercentileHistogram, WindowedPercentileHistogram} import com.twitter.util._ +import com.twitter.util.registry.{Entry, GlobalRegistry, SimpleRegistry} import com.twitter.util.tunable.Tunable import org.mockito.Matchers.any import org.mockito.Mockito._ import org.scalatest.OneInstancePerTest import org.scalatest.concurrent.{Eventually, IntegrationPatience} -import org.scalatestplus.mockito.MockitoSugar -import scala.util.Random import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar +import scala.util.Random class BackupRequestFilterTest extends AnyFunSuite @@ -826,4 +828,68 @@ class BackupRequestFilterTest assert(statsReceiver.counters(Seq("backups_sent")) == 0) } } + + test( + "BackupRequestFilter is added to the Registry when protocolLibrary, label, and dest are present in the stack params") { + val mockService = mock[Service[String, String]] + val registry = new SimpleRegistry() + GlobalRegistry.withRegistry(registry) { + val params = + Stack.Params.empty + + param.ProtocolLibrary("thrift") + + param.Label("test") + + BindingFactory.Dest(Name.Path(Path.read("/$/inet/localhost/0"))) + + BackupRequestFilter.Configured(50.percent, false) + + BackupRequestFilter.filterService(params, mockService) + + def key(name: String, suffix: String*): Seq[String] = + Seq("client", name) ++ suffix + + def filteredRegistry: Set[Entry] = + registry.filter { entry => entry.key.contains("BackupRequestFilter") }.toSet + + val registeredEntries = Set( + Entry( + key("thrift", "test", "/$/inet/localhost/0", "BackupRequestFilter"), + "maxExtraLoad: Some(0.5), sendInterrupts: false") + ) + + filteredRegistry should contain theSameElementsAs registeredEntries + } + } + + test( + "BackupRequestFilter is not added to the Registry when any of protocolLibrary, label, or dest is missing in the stack params") { + val mockService = mock[Service[String, String]] + val registry = new SimpleRegistry() + GlobalRegistry.withRegistry(registry) { + val paramsMissingProtocolLibrary = + Stack.Params.empty + + param.Label("test") + + BindingFactory.Dest(Name.Path(Path.read("/$/inet/localhost/0"))) + + BackupRequestFilter.Configured(50.percent, false) + + val paramsMissingLabel = + Stack.Params.empty + + param.ProtocolLibrary("thrift") + + BindingFactory.Dest(Name.Path(Path.read("/$/inet/localhost/0"))) + + BackupRequestFilter.Configured(50.percent, false) + + val paramsMissingDest = + Stack.Params.empty + + param.ProtocolLibrary("thrift") + + param.Label("test") + + BackupRequestFilter.Configured(50.percent, false) + + BackupRequestFilter.filterService(paramsMissingProtocolLibrary, mockService) + BackupRequestFilter.filterService(paramsMissingLabel, mockService) + BackupRequestFilter.filterService(paramsMissingDest, mockService) + + def filteredRegistry: Set[Entry] = + registry.filter { entry => entry.key.contains("BackupRequestFilter") }.toSet + + filteredRegistry should contain theSameElementsAs Set.empty + } + } } diff --git a/finagle-core/src/test/scala/com/twitter/finagle/client/MethodBuilderTest.scala b/finagle-core/src/test/scala/com/twitter/finagle/client/MethodBuilderTest.scala index a3f10af3f0..89a9eeac52 100644 --- a/finagle-core/src/test/scala/com/twitter/finagle/client/MethodBuilderTest.scala +++ b/finagle-core/src/test/scala/com/twitter/finagle/client/MethodBuilderTest.scala @@ -64,9 +64,12 @@ class MethodBuilderTest import MethodBuilderTest._ - def await[T](a: Awaitable[T], d: Duration = 5.seconds): T = + def awaitResult[T](a: Awaitable[T], d: Duration = 5.seconds): T = Await.result(a, d) + def awaitReady[T <: Awaitable[_]](a: T, d: Duration = 5.seconds): T = + Await.ready(a, d) + test("retries do not see the total timeout") { val stats = new InMemoryStatsReceiver() val params = @@ -86,7 +89,7 @@ class MethodBuilderTest .newService("a_client") intercept[GlobalRequestTimeoutException] { - await(client(1)) + awaitResult(client(1)) } // while we have a RetryFilter, the underlying service returns `Future.never` // and as such, the stats are never updated. @@ -149,7 +152,7 @@ class MethodBuilderTest assert(rep.isDefined) intercept[GlobalRequestTimeoutException] { - await(rep) + awaitResult(rep) } eventually { @@ -212,7 +215,7 @@ class MethodBuilderTest val delta = 5.milliseconds tc.advance(delta) timer.tick() - Await.ready(rep, 5.seconds) + awaitReady(rep) assert(2 == attempts.get) eventually { @@ -235,7 +238,7 @@ class MethodBuilderTest // issue the request val rep2 = client2(2) - Await.ready(rep2, 5.seconds) + awaitReady(rep2) assert(3 == attempts.get) eventually { @@ -267,7 +270,7 @@ class MethodBuilderTest // issue a failing request intercept[Failure] { - await(client(1)) + awaitResult(client(1)) } eventually { @@ -280,7 +283,7 @@ class MethodBuilderTest // issue a failing request intercept[Failure] { - await(aMethodClient(1)) + awaitResult(aMethodClient(1)) } eventually { @@ -316,7 +319,7 @@ class MethodBuilderTest ) assert(filteredRegistry == totalSvcEntries) - await(totalSvc.close()) + awaitResult(totalSvc.close()) assert(Set.empty == filteredRegistry) } } @@ -376,12 +379,57 @@ class MethodBuilderTest assert(!vanillaClose.isDefined) filteredRegistry should contain theSameElementsAs sundaeEntries - Await.ready(sundaeSvc.close(), 5.seconds) + awaitReady(sundaeSvc.close()) assert(vanillaClose.isDefined) assert(Set.empty == filteredRegistry) } } + test("BackupRequestFilter is added to the Registry") { + val registry = new SimpleRegistry() + GlobalRegistry.withRegistry(registry) { + val protocolLib = "test_lib" + val clientName = "some_svc" + val addr = "test_addr" + val stats = new InMemoryStatsReceiver() + val params = + Stack.Params.empty + + param.Stats(stats) + + param.Label(clientName) + + param.ProtocolLibrary(protocolLib) + val stackClient = TestStackClient(stack, params) + val methodBuilder = MethodBuilder.from(addr, stackClient) + + val classifier: ResponseClassifier = ResponseClassifier.named("foo") { + case ReqRep(_, Throw(_: IndividualRequestTimeoutException)) => + ResponseClass.RetryableFailure + } + + def key(name: String, suffix: String*): Seq[String] = + Seq("client", protocolLib, clientName, addr, "methods", name) ++ suffix + + def filteredRegistry: Set[Entry] = + registry.filter { entry => entry.key.head == "client" }.toSet + + val sundaeSvc = methodBuilder + .idempotent(1.percent, false, classifier) + .newService("sundae") + + val sundaeEntries = Set( + Entry(key("sundae", "statsReceiver"), s"InMemoryStatsReceiver/$clientName/sundae"), + Entry(key("sundae", "retry"), "Config(Some(Idempotent(foo)),2)"), + Entry( + key("sundae", "BackupRequestFilter"), + "maxExtraLoad: Some(0.01), sendInterrupts: false") + ) + + filteredRegistry should contain theSameElementsAs sundaeEntries + + awaitReady(sundaeSvc.close()) + assert(Set.empty == filteredRegistry) + } + } + test("stats are filtered with methodName if it exists") { val stats = new InMemoryStatsReceiver() val clientLabel = "the_client" @@ -404,7 +452,7 @@ class MethodBuilderTest // issue a failing request intercept[Failure] { - await(client(1)) + awaitResult(client(1)) } eventually { @@ -453,7 +501,7 @@ class MethodBuilderTest // issue a failing request intercept[Failure] { - await(client(1)) + awaitResult(client(1)) } eventually { @@ -536,7 +584,7 @@ class MethodBuilderTest val m1Close = m1.close() assert(!m1Close.isDefined) assert(!m1.isAvailable) - intercept[ServiceClosedException] { await(m1(1)) } + intercept[ServiceClosedException] { awaitResult(m1(1)) } assert(m2.isAvailable) assert(svc.isAvailable) @@ -547,11 +595,11 @@ class MethodBuilderTest assert(svc.isAvailable) // validate that closing the last method closes the underlying service. - Await.ready(m2.close(), 5.seconds) + awaitReady(m2.close()) assert(m1Close.isDefined) assert(!m1.isAvailable) assert(!m2.isAvailable) - intercept[ServiceClosedException] { await(m2(1)) } + intercept[ServiceClosedException] { awaitResult(m2(1)) } assert(!svc.isAvailable) } @@ -571,7 +619,7 @@ class MethodBuilderTest // issue a failing request val f = intercept[Failure] { - await(client(1)) + awaitResult(client(1)) } assert(f.getSource(Failure.Source.Method).contains(methodName)) @@ -592,7 +640,7 @@ class MethodBuilderTest // issue a failing request val f = intercept[Failure] { - await(client(1)) + awaitResult(client(1)) } assert(f.getSource(Failure.Source.Method).isEmpty) @@ -678,7 +726,7 @@ class MethodBuilderTest val rep1 = client(0) intercept[Exception] { - await(rep1, 1.second) + awaitResult(rep1, 1.second) } eventually { assert(stats.counters(Seq("mb", "a_client", "logical", "requests")) == 1) @@ -690,7 +738,7 @@ class MethodBuilderTest val rep2 = nonIdempotentClient(0) intercept[Exception] { - await(rep2, 1.second) + awaitResult(rep2, 1.second) } eventually { assert(stats.counters(Seq("mb", "a_client_nonidempotent", "logical", "requests")) == 1) @@ -701,7 +749,7 @@ class MethodBuilderTest // per ResponseClassifier.default, but not retried in MethodBuilder's retries. val writeExceptionReq = nonIdempotentClient(1) intercept[Exception] { - await(writeExceptionReq, 1.second) + awaitResult(writeExceptionReq, 1.second) } eventually { assert(stats.counters(Seq("mb", "a_client_nonidempotent", "logical", "requests")) == 2) @@ -775,7 +823,7 @@ class MethodBuilderTest assert(rep.isDefined) intercept[GlobalRequestTimeoutException] { - await(rep) + awaitResult(rep) } eventually { @@ -821,7 +869,7 @@ class MethodBuilderTest val client = mb.newService("a_method") Time.withCurrentTimeFrozen { tc => - await(client(1), 1.second) + awaitResult(client(1), 1.second) tc.advance(5.seconds) timer.tick() assert( @@ -870,7 +918,7 @@ class MethodBuilderTest val rep1 = client(0) val exc1 = intercept[Exception] { - await(rep1, 1.second) + awaitResult(rep1, 1.second) } assert(exc1 == myException1) @@ -883,7 +931,7 @@ class MethodBuilderTest val rep2 = client(1) val exc2 = intercept[Exception] { - await(rep2, 1.second) + awaitResult(rep2, 1.second) } assert(exc2 == myException2) @@ -914,7 +962,7 @@ class MethodBuilderTest val rep = client(0) val exc = intercept[Exception] { - await(rep, 1.second) + awaitResult(rep, 1.second) } assert(exc == myException) eventually { @@ -944,7 +992,7 @@ class MethodBuilderTest val rep = client(0) val exc = intercept[Exception] { - await(rep, 1.second) + awaitResult(rep, 1.second) } assert(exc == myException) eventually { @@ -980,7 +1028,7 @@ class MethodBuilderTest val rep1 = client(0) val exc = intercept[Exception] { - await(rep1, 1.second) + awaitResult(rep1, 1.second) } assert(exc == myException) eventually { @@ -990,7 +1038,7 @@ class MethodBuilderTest } val rep2 = client(1) - val result = await(rep2, 1.second) + val result = awaitResult(rep2, 1.second) assert(result == 1) eventually { assert(stats.counters(Seq("mb", "a_client", "logical", "requests")) == 2) @@ -1026,7 +1074,7 @@ class MethodBuilderTest val rep1 = client(0) val exc = intercept[Exception] { - await(rep1, 1.second) + awaitResult(rep1, 1.second) } assert(exc == myException) eventually { @@ -1036,7 +1084,7 @@ class MethodBuilderTest } val rep2 = client(1) - val result = await(rep2, 1.second) + val result = awaitResult(rep2, 1.second) assert(result == 1) eventually { assert(stats.counters(Seq("mb", "a_client", "logical", "requests")) == 2) @@ -1118,8 +1166,8 @@ class MethodBuilderTest val clientA = methodBuilder.filtered(filterBoom).newService("a_client") val clientB = methodBuilder.filtered(filterCalled).newService("b_client") - intercept[Exception](await(clientA(1))) - await(clientB(1)) + intercept[Exception](awaitResult(clientA(1))) + awaitResult(clientB(1)) assert(called.get() == 1) } }