Skip to content

Commit

Permalink
finagle-core: add BackupRequestFilter to client registry
Browse files Browse the repository at this point in the history
Problem/Solution
Add BackupRequestFilter to client registry

JIRA Issues: CSL-9833

Differential Revision: https://phabricator.twitter.biz/D686981
  • Loading branch information
jyanJing authored and jenkins committed Jun 23, 2021
1 parent d9e551a commit 56092e9
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ Runtime Behavior Changes

* finagle: Update Caffeine cache library to version 2.9.1 ``PHAB_ID=D660908``

Bug Fixes
~~~~~~~~~~

* finagle-core: Add `BackupRequestFilter` to client registry when configured. ``PHAB_ID=D686981``

21.6.0
------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import com.twitter.conversions.DurationOps._
import com.twitter.finagle.Stack.Params
import com.twitter.finagle._
import com.twitter.finagle.context.BackupRequest
import com.twitter.finagle.naming.BindingFactory.Dest
import com.twitter.finagle.param.{Label, ProtocolLibrary}
import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier, Retries, RetryBudget}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.{Annotation, Trace, TraceId, Tracing}
import com.twitter.finagle.util.WindowedPercentileHistogram
import com.twitter.finagle.util.{Showable, WindowedPercentileHistogram}
import com.twitter.logging.Logger
import com.twitter.util._
import com.twitter.util.tunable.Tunable
Expand Down Expand Up @@ -151,10 +153,40 @@ object BackupRequestFilter {
*
* Users should only use this method for filtering generic services; otherwise,
* usage through the `idempotent` method on [[MethodBuilder]] implementations is preferred.
*
* @note The BackupRequestFilter will be added to the ClientRegistry if and only if
* [[ProtocolLibrary]], [[Label]], and [[Dest]] are present in {@code params}.
* The BackupRequestFilter will be registered under scope:
* "client"/"client_protocol_library"/"client_label"/"dest"/"BackupRequestFilter"
* If any of the 3 params is missing, BRF will not be registered.
*/
def filterService[Req, Rep](params: Stack.Params, service: Service[Req, Rep]): Service[Req, Rep] =
if (params.contains[ProtocolLibrary] && params.contains[Label] && params.contains[Dest]) {
filterServiceWithPrefix(params, service, Seq(Showable.show(params[Dest].dest)))
} else {
filterServiceWithPrefix(params, service, Seq.empty)
}

// an internal api to register BRF to client registry under `keyPrefixes`.
// We need to do it explicitly since BackupRequestFilter is never placed on Client stack.
// When invoked via `MethodBuilder.idempotent` endpoint, BRF will be registered under:
// "client"/"client_protocol_library"/"client_name"/"dest"/"methods"/"service_name"/"BackupRequestFilter"
// When invoked without `MethodBuilder`, BRF will be registered under:
// "client"/"client_protocol_library"/"client_name"/"dest"/"BackupRequestFilter"
private[client] def filterServiceWithPrefix[Req, Rep](
params: Stack.Params,
service: Service[Req, Rep],
keyPrefixes: Seq[String]
): Service[Req, Rep] =
params[BackupRequestFilter.Param] match {
case BackupRequestFilter.Param.Configured(maxExtraLoad, sendInterrupts) =>
// register BRF when registry prefixes are provided
if (keyPrefixes.nonEmpty) {
val value =
"maxExtraLoad: " + maxExtraLoad().toString + ", sendInterrupts: " + sendInterrupts
val prefixes = keyPrefixes ++ Seq(BackupRequestFilter.role.name, value)
ClientRegistry.export(params, prefixes: _*)
}
val brf = mkFilterFromParams[Req, Rep](maxExtraLoad, sendInterrupts, params)
new ServiceProxy[Req, Rep](brf.andThen(service)) {
override def close(deadline: Time): Future[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ final class MethodBuilder[Req, Rep] private[finagle] (
val entry = registryEntry()
val keyPrefix = registryKeyPrefix(name)
ClientRegistry.register(entry, keyPrefix :+ "statsReceiver", statsReceiver(name).toString)

withTimeout.registryEntries.foreach {
case (suffix, value) =>
ClientRegistry.register(entry, keyPrefix ++ suffix, value)
Expand All @@ -542,7 +543,10 @@ final class MethodBuilder[Req, Rep] private[finagle] (
param.Stats(statsReceiver(name)) +
param.ResponseClassifier(config.retry.responseClassifier)

val underlying = BackupRequestFilter.filterService(backupRequestParams, methodPool.get)
// register BackupRequestFilter under the same prefixes as other method entries
val prefixes = Seq(registryEntry().addr) ++ registryKeyPrefix(name)
val underlying = BackupRequestFilter
.filterServiceWithPrefix(backupRequestParams, methodPool.get, prefixes)

new ServiceProxy[Req, Rep](underlying) with CloseOnce {
override def apply(request: Req): Future[Rep] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package com.twitter.finagle.client

import com.twitter.conversions.PercentOps._
import com.twitter.conversions.DurationOps._
import com.twitter.conversions.PercentOps._
import com.twitter.finagle._
import com.twitter.finagle.naming.BindingFactory
import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier, RetryBudget}
import com.twitter.finagle.stats.{InMemoryStatsReceiver, NullStatsReceiver}
import com.twitter.finagle.util.{WindowedPercentileHistogram, MockWindowedPercentileHistogram}
import com.twitter.finagle.util.{MockWindowedPercentileHistogram, WindowedPercentileHistogram}
import com.twitter.util._
import com.twitter.util.registry.{Entry, GlobalRegistry, SimpleRegistry}
import com.twitter.util.tunable.Tunable
import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.scalatest.OneInstancePerTest
import org.scalatest.concurrent.{Eventually, IntegrationPatience}
import org.scalatestplus.mockito.MockitoSugar
import scala.util.Random
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar
import scala.util.Random

class BackupRequestFilterTest
extends AnyFunSuite
Expand Down Expand Up @@ -826,4 +828,68 @@ class BackupRequestFilterTest
assert(statsReceiver.counters(Seq("backups_sent")) == 0)
}
}

test(
"BackupRequestFilter is added to the Registry when protocolLibrary, label, and dest are present in the stack params") {
val mockService = mock[Service[String, String]]
val registry = new SimpleRegistry()
GlobalRegistry.withRegistry(registry) {
val params =
Stack.Params.empty +
param.ProtocolLibrary("thrift") +
param.Label("test") +
BindingFactory.Dest(Name.Path(Path.read("/$/inet/localhost/0"))) +
BackupRequestFilter.Configured(50.percent, false)

BackupRequestFilter.filterService(params, mockService)

def key(name: String, suffix: String*): Seq[String] =
Seq("client", name) ++ suffix

def filteredRegistry: Set[Entry] =
registry.filter { entry => entry.key.contains("BackupRequestFilter") }.toSet

val registeredEntries = Set(
Entry(
key("thrift", "test", "/$/inet/localhost/0", "BackupRequestFilter"),
"maxExtraLoad: Some(0.5), sendInterrupts: false")
)

filteredRegistry should contain theSameElementsAs registeredEntries
}
}

test(
"BackupRequestFilter is not added to the Registry when any of protocolLibrary, label, or dest is missing in the stack params") {
val mockService = mock[Service[String, String]]
val registry = new SimpleRegistry()
GlobalRegistry.withRegistry(registry) {
val paramsMissingProtocolLibrary =
Stack.Params.empty +
param.Label("test") +
BindingFactory.Dest(Name.Path(Path.read("/$/inet/localhost/0"))) +
BackupRequestFilter.Configured(50.percent, false)

val paramsMissingLabel =
Stack.Params.empty +
param.ProtocolLibrary("thrift") +
BindingFactory.Dest(Name.Path(Path.read("/$/inet/localhost/0"))) +
BackupRequestFilter.Configured(50.percent, false)

val paramsMissingDest =
Stack.Params.empty +
param.ProtocolLibrary("thrift") +
param.Label("test") +
BackupRequestFilter.Configured(50.percent, false)

BackupRequestFilter.filterService(paramsMissingProtocolLibrary, mockService)
BackupRequestFilter.filterService(paramsMissingLabel, mockService)
BackupRequestFilter.filterService(paramsMissingDest, mockService)

def filteredRegistry: Set[Entry] =
registry.filter { entry => entry.key.contains("BackupRequestFilter") }.toSet

filteredRegistry should contain theSameElementsAs Set.empty
}
}
}

0 comments on commit 56092e9

Please sign in to comment.