Skip to content

Commit

Permalink
finagle-core: Move finagle-mux requeues into the default stack
Browse files Browse the repository at this point in the history
Problem

The requeue module in finagle-core lacked support for service
acquisition failures under certain conditions. The requeue
module in mux supports this plus requeues are credited at a ratio
of requests.

Solution

Move the mux requeue module into the default client stack.

RB_ID=627412
  • Loading branch information
Ruben Oanta authored and jenkins committed Apr 22, 2015
1 parent 3d6f236 commit 38dff0f
Show file tree
Hide file tree
Showing 23 changed files with 241 additions and 187 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Expand Up @@ -98,6 +98,10 @@ Runtime Behavior Changes

* finagle-core: added transit_latency_ms and deadline_budget_ms stats.

* finagle-core: Automatic retries (requeues) are now credited as a ratio of
requests over a window of time, instead of a fixed limit. The stats scope
has also changed from "automatic" to "requeues".

Deprecations
~~~~~~~~~~~~

Expand Down
20 changes: 12 additions & 8 deletions doc/src/sphinx/metrics/RateLimiting.rst
Expand Up @@ -4,13 +4,17 @@ RateLimitingFilter
**refused**
a counter of the number of refused connections by the rate limiting filter

RetryingFilter
<<<<<<<<<<<<<<
Requeue
<<<<<<<

There is a "retries" stat that's scoped to "automatic", which reenqueues
automatically when finagle detects that it's safe to reenqueue a request,
namely when finagle sees that it was never written to the wire. These
reenqueues are handled by finagle, and so don't sap your retry budget.
There is a "requeues" stat that's scoped to "requeue", which tracks
requests that fail (or failures to establish a session) but have been
automatically re-enqueued when finagle detects that it's safe.
These re-enqueues are credited by finagle and they don't sap your
retry budget.

**retries**
a histogram of the number of times the service had to retry
**requeues**
a counter of the number of times the service had to retry

**budget**
a gauge representing the instantaneous requeue budget
1 change: 1 addition & 0 deletions finagle-core/src/main/scala/BUILD
Expand Up @@ -6,6 +6,7 @@ scala_library(name='scala',
),
dependencies=[
'3rdparty/jvm/com/google/guava',
'3rdparty/jvm/com/twitter:jsr166e',
'3rdparty/jvm/io/netty',
'finagle/finagle-core/src/main/java',
'util/util-app',
Expand Down
Expand Up @@ -1014,8 +1014,8 @@ private object ClientBuilderClient {
.transformed(new Stack.Transformer {
def apply[Req, Rep](stack: Stack[ServiceFactory[Req, Rep]]) =
stack
.insertBefore(RequeueingFilter.role, new StatsFilterModule[Req, Rep])
.insertBefore(RequeueingFilter.role, new RetryingFilterModule[Req, Rep])
.insertBefore(Requeues.role, new StatsFilterModule[Req, Rep])
.insertBefore(Requeues.role, new RetryingFilterModule[Req, Rep])
.prepend(new GlobalTimeoutModule[Req, Rep])
.prepend(new ExceptionSourceFilterModule[Req, Rep])
})
Expand Down
Expand Up @@ -81,7 +81,7 @@ object StackClient {
* @see [[com.twitter.finagle.factory.RefcountedFactory]]
* @see [[com.twitter.finagle.factory.TimeoutFactory]]
* @see [[com.twitter.finagle.FactoryToService]]
* @see [[com.twitter.finagle.service.RequeueingFilter]]
* @see [[com.twitter.finagle.service.Requeues]]
* @see [[com.twitter.finagle.tracing.ClientTracingFilter]]
* @see [[com.twitter.finagle.tracing.TraceInitializerFilter]]
*/
Expand Down Expand Up @@ -137,7 +137,7 @@ object StackClient {
* load balancer on each request (and closes it after the
* response completes).
*
* * `RequeuingFilter` retries `WriteException`s
* * `Requeues` retries `RetryPolicy.RetryableWriteException`s
* automatically. It must appear above `FactoryToService` so
* that service acquisition failures are retried.
*/
Expand All @@ -148,7 +148,7 @@ object StackClient {
stk.push(TimeoutFactory.module)
stk.push(Role.prepFactory, identity[ServiceFactory[Req, Rep]](_))
stk.push(FactoryToService.module)
stk.push(RequeueingFilter.module)
stk.push(Requeues.module)

/*
* These modules deal with name resolution and request
Expand Down Expand Up @@ -362,11 +362,7 @@ trait StdStackClient[Req, Rep, This <: StdStackClient[Req, Rep, This]]
Stats(stats.scope(clientLabel)) +
BindingFactory.Dest(dest))

val finalStack =
if (!clientStack.contains(FailFastFactory.role)) clientStack.remove(RequeueingFilter.role)
else clientStack

finalStack.make(clientParams)
clientStack.make(clientParams)
}

override def newService(dest: Name, label: String): Service[Req, Rep] = {
Expand Down

This file was deleted.

123 changes: 123 additions & 0 deletions finagle-core/src/main/scala/com/twitter/finagle/service/Requeues.scala
@@ -0,0 +1,123 @@
package com.twitter.finagle.service

import com.twitter.conversions.time._
import com.twitter.finagle._
import com.twitter.finagle.stats.Counter
import com.twitter.finagle.util.TokenBucket
import com.twitter.util.Future

private[finagle] object Requeues {
val role = Stack.Role("Requeues")

def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module1[param.Stats, ServiceFactory[Req, Rep]] {
val role = Requeues.role
val description = "Requeue requests that have been rejected at the service application level"

/** Cost determines the relative cost of a reissue vs. an initial issue. */
private[this] val Cost = 5

/** The upper bound on service acquisition attempts */
private[this] val Effort = 25

def make(stats: param.Stats, next: ServiceFactory[Req, Rep]) = {
val param.Stats(sr0) = stats
val sr = sr0.scope("requeue")
val requeues = sr.counter("requeues")

// Each window in the token bucket has a reserve of 500 tokens, which allows for
// 10 free reissues/s. This is to aid just-starting or low-velocity clients.
val bucket = TokenBucket.newLeakyBucket(ttl = 10.seconds, reserve = Cost*10*10)

// The filter manages the tokens in the bucket: it credits a token per request and
// debits `Cost` per reqeueue. Thus, with the current parameters, reissue can never
// exceed the greater of 10/s and ~15% of total issue.
val requeueFilter = new RequeueFilter[Req, Rep](bucket, Cost, requeues, () => next.status)

new ServiceFactoryProxy(next) {
// We define the gauge inside of the ServiceFactory so that their lifetimes
// are tied together.
private[this] val budgetGauge = sr.addGauge("budget") { bucket.count/Cost }

/**
* Failures to acquire a service can be thought of as local failures because
* we're certain that we haven't dispatched a request yet. Thus, this simply
* tries up to `n` attempts to acquire a service. However, we still only
* requeue a subset of exceptions (currently only `RetryableWriteExceptions`) as
* some exceptions to acquire a service are considered fatal.
*/
private[this] def applySelf(conn: ClientConnection, n: Int): Future[Service[Req, Rep]] =
self(conn).rescue {
case RetryPolicy.RetryableWriteException(_) if status == Status.Open && n > 0 =>
requeues.incr()
applySelf(conn, n-1)
}

/**
* Note: This may seem like we are always attempting service acquisition
* with a fixed budget (i.e. `Effort`). However, this is not always the case
* and is dependent on how the client is built (i.e. `newService`/`newClient`).
*
* Clients built with `newService` compose FactoryToService as part of their stack
* which effectively moves service acquisition as part of service application,
* so all requeues are gated by [[RequeueFilter]].
*
* Clients built with `newClient` separate requeues into two distinct phases for
* service acquisition and service application. First, we try up to `Effort` to acquire
* a new service. Then we requeue requests as per [[RequeueFilter]]. Note, because the
* `newClient` API gives the user control over which service (i.e. session) to issue a
* request, request level requeues using this API must be issued over the same service.
*
* See StackClient#newStack for more details.
*/
override def apply(conn: ClientConnection): Future[Service[Req, Rep]] =
applySelf(conn, Effort).map(service => requeueFilter andThen service)
}
}
}
}

/**
* Requeues service application failures that are encountered in modules below it.
* In addition to requeueing local failures, the filter re-issues remote requests
* that have been NACKd. The policy is inherited from `RetryPolicy.RetryableWriteException`.
* Requeues are also rate-limited according to our budget in `bucket`. They are credited by
* the token bucket at a 1:`requeueCost` ratio of requests. That is, requests are credited 1
* token and requeues are debited `requeueCost` tokens.
*
* @param bucket Maintains our requeue budget.
*
* @param requeueCost How much to debit from `bucket` per each requeue.
*
* @param counter The counter to increment per each requeue.
*
* @param stackStatus Represents the status of the stack which generated the
* underlying service. Requeues are only dispatched when this status reports
* Status.Open.
*/
private[finagle] class RequeueFilter[Req, Rep](
bucket: TokenBucket,
requeueCost: Int,
requeueCounter: Counter,
stackStatus: () => Status)
extends SimpleFilter[Req, Rep] {

private[this] def applyService(req: Req, service: Service[Req, Rep]): Future[Rep] = {
service(req).rescue {
case exc@RetryPolicy.RetryableWriteException(_) =>
// TODO: If we ensure that the stack doesn't return restartable
// failures when it isn't Open, we wouldn't need to gate on status.
if (stackStatus() == Status.Open && bucket.tryGet(requeueCost)) {
requeueCounter.incr()
applyService(req, service)
} else {
Future.exception(exc)
}
}
}

def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
bucket.put(1)
applyService(req, service)
}
}
@@ -1,4 +1,4 @@
package com.twitter.finagle.exp
package com.twitter.finagle.util

import com.twitter.util.Duration

Expand All @@ -11,30 +11,30 @@ private[finagle] trait TokenBucket {
* Put `n` tokens into the bucket.
*/
def put(n: Int): Unit
/**

/**
* Try to get `n` tokens out of the bucket.
*
* @return true if successful
*/
def tryGet(n: Int): Boolean

/**
* The number of tokens currently in the bucket.
*/
def count: Long
}

object TokenBucket {
/**
private[finagle] object TokenBucket {
/**
* A leaky bucket expires tokens after approximately `ttl` time.
* Thus, a bucket left alone will empty itself.
*
* @param ttl The (approximate) time after which a token will
* expire.
*
* @param reserve The number of reserve tokens over the TTL
* period. That is, every `ttl` has `reserve` tokens in addition to
* period. That is, every `ttl` has `reserve` tokens in addition to
* the ones added to the bucket.
*/
def newLeakyBucket(ttl: Duration, reserve: Int): TokenBucket = new TokenBucket {
Expand All @@ -55,7 +55,7 @@ object TokenBucket {
w.add(-n)
ok
}

def count: Long = w.sum() + reserve
}
}
@@ -1,4 +1,4 @@
package com.twitter.finagle.exp
package com.twitter.finagle.util

import com.twitter.jsr166e.LongAdder
import com.twitter.util.Time
Expand Down
Expand Up @@ -122,14 +122,15 @@ class EndToEndTest extends FunSuite {
val requestFailures = mem.counters(Seq("client", "failures"))
val serviceCreationFailures =
mem.counters(Seq("client", "service_creation", "failures"))
val automaticRetries =
mem.stats(Seq("client", "automatic", "retries"))

val requeues =
mem.counters.get(Seq("client", "requeue", "requeues"))

assert(requestFailures === 1)

// initial write exception and no requeues
assert(serviceCreationFailures === 1)
assert(automaticRetries === Seq(0.0f, 0.0f))
assert(requeues === None)
}

test("ClientBuilder should be properly instrumented on success") {
Expand Down

0 comments on commit 38dff0f

Please sign in to comment.