Skip to content

Commit

Permalink
finagle-core: make ClientBuilder RetryingFilter a Stack.Module and in…
Browse files Browse the repository at this point in the history
…sert it down the stack

Problem:

In cases where Union nodes appear in a NameTree, we'd like
ClientBuilder-configured retries to hit the same destination cluster
rather than being distributed across the Union. We are introducing Union
nodes in place of multiple client stacks (with external distribution
across stacks), so making retries hit the same cluster retains the
existing behavior, stats, etc.

Solution:

Make the RetryingFilter in ClientBuilder a Stack.Module and insert it
just above RequeueingFilter (a separate change puts RequeueingFilter
below BindingFactory).

Along the way, add Stack.insertBefore, convert the other filters in
ClientBuilder to Stack.Modules, and add the "retries" stats scope
whether or not retries are configured (this is the only place
non-retry/requeue requests are counted).

RB_ID=589694
  • Loading branch information
Jake Donham authored and jenkins committed Mar 9, 2015
1 parent 11f6e11 commit 09b7987
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 28 deletions.
23 changes: 23 additions & 0 deletions finagle-core/src/main/scala/com/twitter/finagle/Stack.scala
Expand Up @@ -49,6 +49,29 @@ sealed trait Stack[T] {
case leaf@Leaf(_, _) => leaf
}

/**
* Insert the given [[Stackable]] before the stack elements matching
* the argument role. If no elements match the role, then an
* unmodified stack is returned.
*/
def insertBefore(target: Role, insertion: Stackable[T]): Stack[T] =
this match {
case Node(head, mk, next) if head.role == target =>
insertion +: Node(head, mk, next.insertBefore(target, insertion))
case Node(head, mk, next) =>
Node(head, mk, next.insertBefore(target, insertion))
case leaf@Leaf(_, _) => leaf
}

/**
* Insert the given [[Stackable]] before the stack elements matching
* the argument role. If no elements match the role, then an
* unmodified stack is returned. `insertion` must conform to
* typeclass [[CanStackFrom]].
*/
def insertBefore[U](target: Role, insertion: U)(implicit csf: CanStackFrom[U, T]): Stack[T] =
insertBefore(target, csf.toStackable(target, insertion))

/**
* Insert the given [[Stackable]] after the stack elements matching
* the argument role. If no elements match the role, then an
Expand Down
Expand Up @@ -117,9 +117,7 @@ object ClientConfig {
(this, Retries.param)
}
object Retries {
implicit val param = Stack.Param(Retries(new RetryPolicy[Try[Nothing]] {
def apply(t: Try[Nothing]) = None
}))
implicit val param = Stack.Param(Retries(RetryPolicy.Never))
}

case class Daemonize(onOrOff: Boolean) {
Expand Down Expand Up @@ -912,27 +910,84 @@ private case class ClientBuilderClient[Req, Rep](
sr.scope(label)
}

private def retryFilter(timer: util.Timer) =
params[Retries] match {
case Retries(policy) if params.contains[Retries] =>
val exceptions = params[ExceptionStatsHandler]
val stats = new StatsFilter[Req, Rep](statsReceiver.scope("tries"), exceptions.categorizer)
private object RetryingFilterModule
extends Stack.Module3[Stats, Retries, Timer, ServiceFactory[Req, Rep]] {
override val role = new Stack.Role("ClientBuilder RetryingFilter")
override val description = "Application-configured retries"

override def make(
statsP: Stats,
retriesP: Retries,
timerP: Timer,
next: ServiceFactory[Req, Rep]
) = {
val Stats(statsReceiver) = statsP
val Retries(policy) = retriesP
val Timer(timer) = timerP

if (policy eq RetryPolicy.Never) next
else {
val retries = new RetryingFilter[Req, Rep](policy, timer, statsReceiver)
stats andThen retries
case _ => identityFilter
retries andThen next
}
}
}

private def globalTimeoutFilter(timer: util.Timer) = {
val GlobalTimeout(timeout) = params[GlobalTimeout]
if (timeout < Duration.Top) {
val exception = new GlobalRequestTimeoutException(timeout)
new TimeoutFilter[Req, Rep](timeout, exception, timer)
} else {
identityFilter
private object StatsFilterModule
extends Stack.Module2[Stats, ExceptionStatsHandler, ServiceFactory[Req, Rep]] {
override val role = new Stack.Role("ClientBuilder StatsFilter")
override val description = "Record request stats scoped to 'tries'"

override def make(
statsP: Stats,
exceptionStatsHandlerP: ExceptionStatsHandler,
next: ServiceFactory[Req, Rep]
) = {
val Stats(statsReceiver) = statsP
val ExceptionStatsHandler(categorizer) = exceptionStatsHandlerP

val stats = new StatsFilter[Req, Rep](statsReceiver.scope("tries"), categorizer)
stats andThen next
}
}

private val identityFilter = Filter.identity[Req, Rep]
private object GlobalTimeoutModule
extends Stack.Module2[GlobalTimeout, Timer, ServiceFactory[Req, Rep]] {
override val role = new Stack.Role("ClientBuilder GlobalTimeoutFilter")
override val description = "Application-configured global timeout"

override def make(
globalTimeoutP: GlobalTimeout,
timerP: Timer,
next: ServiceFactory[Req, Rep]
) = {
val GlobalTimeout(timeout) = globalTimeoutP
val Timer(timer) = timerP

if (timeout == Duration.Top) next
else {
val exception = new GlobalRequestTimeoutException(timeout)
val globalTimeout = new TimeoutFilter[Req, Rep](timeout, exception, timer)
globalTimeout andThen next
}
}
}

private object ExceptionSourceFilterModule
extends Stack.Module1[Label, ServiceFactory[Req, Rep]] {
override val role = new Stack.Role("ClientBuilder ExceptionSourceFilter")
override val description = "Exception source filter"

override def make(
labelP: Label,
next: ServiceFactory[Req, Rep]
) = {
val Label(label) = labelP

val exceptionSource = new ExceptionSourceFilter[Req, Rep](label)
exceptionSource andThen next
}
}

def newClient(dest: Name, label: String): ServiceFactory[Req, Rep] = {
// need the label in statsReceiver
Expand Down Expand Up @@ -972,18 +1027,19 @@ private case class ClientBuilderClient[Req, Rep](
return configured(Label(label)).newService(dest, label);
}

val factory = configured(FactoryToService.Enabled(true)).newClient(dest, label)
val service: Service[Req, Rep] = new FactoryToService[Req, Rep](factory)

val Timer(timer) = params[Timer]
val stackClient =
withStack(
stack
.insertBefore(RequeueingFilter.role, StatsFilterModule)
.insertBefore(RequeueingFilter.role, RetryingFilterModule)
.prepend(GlobalTimeoutModule)
.prepend(ExceptionSourceFilterModule))
.configured(FactoryToService.Enabled(true))

val exceptionSourceFilter = new ExceptionSourceFilter[Req, Rep](label)
// We keep the retrying filter after the load balancer so we can
// retry across different hosts rather than the same one repeatedly.
val filter = exceptionSourceFilter andThen globalTimeoutFilter(timer) andThen retryFilter(timer)
val composed = filter andThen service
val factory = stackClient.newClient(dest, label)
val service: Service[Req, Rep] = new FactoryToService[Req, Rep](factory)

new ServiceProxy[Req, Rep](composed) {
new ServiceProxy[Req, Rep](service) {
private[this] val released = new AtomicBoolean(false)
override def close(deadline: Time): Future[Unit] = {
if (!released.compareAndSet(false, true)) {
Expand Down
Expand Up @@ -161,6 +161,10 @@ object RetryPolicy extends JavaSingleton {
case Throw(_: ChannelClosedException) => true
}

val Never: RetryPolicy[Try[Nothing]] = new RetryPolicy[Try[Nothing]] {
def apply(t: Try[Nothing]) = None
}

/**
* Lifts a function of type `A => Option[(Duration, RetryPolicy[A])]` in the `RetryPolicy` type.
*/
Expand Down
21 changes: 21 additions & 0 deletions finagle-core/src/test/scala/com/twitter/finagle/StackTest.scala
Expand Up @@ -43,6 +43,27 @@ class StackTest extends FunSuite {
assert(stack.make(empty) === Seq(30, 1, 2, 3, 4))
}

test("Stack.insertBefore") {
val stack = newStack()
val module = new Stack.Module0[List[Int]] {
val role = testRole4
val description = testRole4.toString
def make(next: List[Int]): List[Int] = 100 :: next
}

assert(
stack.insertBefore(testRole4, module).make(empty) ===
Seq(20, 10, 1, 2, 3, 4))

assert(
stack.insertBefore(testRole2, module).make(empty) ===
Seq(20, 100, 10, 1, 2, 3, 4))

assert(
(stack ++ stack).insertBefore(testRole2, module).make(empty) ===
Seq(20, 100, 10, 20, 100, 10, 1, 2, 3, 4))
}

test("Stack.insertAfter") {
val stack = newStack()
val module = new Stack.Module0[List[Int]] {
Expand Down
Expand Up @@ -86,6 +86,29 @@ class ClientBuilderTest extends FunSuite
}
}

test("ClientBuilder should collect stats on 'tries' with no retrypolicy") {
new ClientBuilderHelper {
val inMemory = new InMemoryStatsReceiver
val client = ClientBuilder()
.name("test")
.hostConnectionLimit(1)
.codec(m.codec)
.hosts(Seq(m.clientAddress))
.reportTo(inMemory)
.build()

val service = mock[Service[String, String]]
when(service("123")) thenReturn Future.exception(WriteException(new Exception()))
when(service.close(any[Time])) thenReturn Future.Done
preparedServicePromise() = Return(service)

val f = client("123")

assert(f.isDefined)
assert(inMemory.counters(Seq("test", "tries", "requests")) === 1)
assert(inMemory.counters(Seq("test", "requests")) === 1)
}
}

/* TODO: Stopwatches eliminated mocking.
"measure codec connection preparation latency" in {
Expand Down

0 comments on commit 09b7987

Please sign in to comment.