finagle-core: FailureAccrualPolicy is mutable and should never be shared
Problem

`FailureAccrualPolicy` maintains a mutable state that `FailureAccrualFactory` uses
to determine whether or not an endpoint should be marked unavailable. Given that
it's mutable, it should never be shared across different endpoint stacks (endpoints)
in a single Finagle client. This is why using `Param(p: FailureAccrualPolicy)` may
lead to surprising results when different endpoints interfere with each other.

Solution

`FailureAccrualPolicy` should be passed as a function (factory) instead.

RB_ID=802953
TBR=true
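
The interference described in the commit message can be illustrated with a minimal, dependency-free sketch. Everything here is a hypothetical stand-in: `ToyConsecutiveFailuresPolicy` and `ToyEndpoint` are invented for illustration and are not Finagle's `FailureAccrualPolicy` or its endpoint stack. The sketch only assumes that each endpoint obtains its policy by calling the supplied function once.

```scala
// Toy stand-in for a mutable policy (hypothetical; NOT Finagle's FailureAccrualPolicy).
final class ToyConsecutiveFailuresPolicy(numFailures: Int) {
  private var consecutive = 0

  /** A success resets the failure streak. */
  def recordSuccess(): Unit = { consecutive = 0 }

  /** Records a failure; returns true once the streak reaches the threshold. */
  def recordFailure(): Boolean = {
    consecutive += 1
    consecutive >= numFailures
  }
}

// Toy stand-in for one endpoint stack: it asks the factory for its policy exactly once.
final class ToyEndpoint(mkPolicy: () => ToyConsecutiveFailuresPolicy) {
  private val policy = mkPolicy()
  def fail(): Boolean = policy.recordFailure()
  def succeed(): Unit = policy.recordSuccess()
}

object SharedPolicyDemo {
  /** Fails endpoint `a` twice, then endpoint `b` once; reports whether `b` was marked dead. */
  def secondEndpointMarkedDead(mkPolicy: () => ToyConsecutiveFailuresPolicy): Boolean = {
    val a = new ToyEndpoint(mkPolicy)
    val b = new ToyEndpoint(mkPolicy)
    a.fail()
    a.fail()
    b.fail()
  }

  // One instance captured by the function: both endpoints mutate the same counter.
  val shared = new ToyConsecutiveFailuresPolicy(3)
  val sharedResult: Boolean = secondEndpointMarkedDead(() => shared)

  // A new instance per call: each endpoint keeps its own counter.
  val freshResult: Boolean =
    secondEndpointMarkedDead(() => new ToyConsecutiveFailuresPolicy(3))

  def main(args: Array[String]): Unit = {
    println(s"shared instance: second endpoint marked dead = $sharedResult")
    println(s"fresh per endpoint: second endpoint marked dead = $freshResult")
  }
}
```

With the shared instance, the two failures on the first endpoint count against the second one, so a single failure there reaches the threshold of 3. Note that wrapping an existing instance in a function, as `() => shared` does, keeps the sharing; in this sketch only a function that constructs a new policy on each call isolates the endpoints.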
vkostyukov authored and jenkins committed Mar 10, 2016
1 parent 60b11df commit 823c3ed
Showing 8 changed files with 60 additions and 132 deletions.
10 changes: 10 additions & 0 deletions CHANGES
@@ -85,6 +85,16 @@ Breaking API Changes
* finagle-core: `c.t.f.client.Transporter.EndpointAddr` takes a `c.t.f.Address` as its
parameter instead of `SocketAddress`. ``RB_ID=792209``

* finagle-core: `c.t.f.service.FailureAccrualFactory.Param(FailureAccrualPolicy)` is removed -
it's not safe to configure Failure Accrual with a shareable instance of the policy, use
`() => FailureAccrualPolicy` instead. ``RB_ID=802953``

* finagle-core: `$Client.withSessionQualifier.failureAccrualPolicy` has been removed from the API
since it enables an experimental feature (use Stack's `.configured` API instead). ``RB_ID=802953``

* finagle-core: `c.t.f.service.exp.FailureAccrualPolicies` (Java-friendly API) has been removed -
use `c.t.f.service.exp.FailureAccrualPolicy` instead.

* finagle-memcached: `c.t.f.memcached.protocol.text.Memcached` no longer takes a `StatsReceiver`,
pass it to a `(Client/Server)Builder` instead. ``RB_ID=797821``

56 changes: 25 additions & 31 deletions doc/src/sphinx/Clients.rst
@@ -561,69 +561,63 @@ The module is implemented by :src:`FailureAccrualFactory <com/twitter/finagle/se
See :ref:`Failure Accrual Stats <failure_accrual_stats>` for stats exported from the
``Failure Accrual`` module.

The ``FailureAccrualFactory`` uses configurable ``FailureAccrualPolicy`` [#experimental]_ to
determine whether to mark an endpoint dead upon a request failure. At this point, there are two
setups are available out of the box.
The ``FailureAccrualFactory`` is configurable in terms of used policy to determine whether to mark
an endpoint dead upon a request failure. At this point, there are two setups available out of
the box.

1. A policy based on the request success rate (i.e., an endpoint is marked dead if its success rate
goes below the given threshold)
2. A policy based on the number of consecutive failures observed on the endpoint (i.e., an endpoint is marked
dead after at least ``N`` consecutive failures on this endpoint)

The default setup for the `Failure Accrual` module is to use ``FailureAccrualPolicy`` based on the
The default setup for the `Failure Accrual` module is to use a policy based on the
number of consecutive failures (default is 5) accompanied by equal jittered backoff [#backoff]_ producing
durations for which an endpoint is marked dead.

Use the following code snippet to override the default configuration of the ``FailureAccrualFactory``.
Use ``FailureAccrualFactory.Param`` [#experimental]_ to configure `Failure Accrual` based on requests
success rate [#example]_.

.. code-block:: scala
import com.twitter.conversions.time._
import com.twitter.finagle.Http
import com.twitter.finagle.service.{Backoff, FailureAccrualFactory}
import com.twitter.finagle.service.exp.FailureAccrualPolicy
val policy: FailureAccrualPolicy = ???
val twitter = Http.client
.withSessionQualifier.failureAccrualPolicy(policy)
.configured(FailureAccrualFactory.Param(() => FailureAccrualPolicy.successRate(
requiredSuccessRate = 0.95,
window = 100,
markDeadFor = Backoff.const(10.seconds)
)))
.newService("twitter.com")
Use ``FailureAccrualPolicy.successRate`` to construct an instance of ``FailureAccrualPolicy`` based on
requests success rate [#example]_.

.. code-block:: scala
import com.twitter.conversions.time._
import com.twitter.finagle.service.Backoff
import com.twitter.finagle.service.exp.FailureAccrualPolicy
val policy: FailureAccrualPolicy = FailureAccrualPolicy.successRate(
requiredSuccessRate = 0.95,
window = 100,
markDeadFor = Backoff.const(30.seconds)
)
The ``successRate`` factory method takes three arguments:

1. `requiredSuccessRate` - the minimum required success rate below which an endpoint is marked dead
2. `window` - the size of the window to track success rate on
3. `markDeadFor` - the backoff policy (an instance of ``Stream[Duration]``) used to mark an endpoint
dead for

To construct an instance of ``FailureAccrualPolicy`` based on a number of consecutive failures, use the
``consecutiveFailures`` factory method [#example]_.
To configure `Failure Accrual` based on a number of consecutive failures [#experimental]_, use the
following snippet [#example]_.

.. code-block:: scala
import com.twitter.conversions.time._
import com.twitter.finagle.service.Backoff
import com.twitter.finagle.Http
import com.twitter.finagle.service.{Backoff, FailureAccrualFactory}
import com.twitter.finagle.service.exp.FailureAccrualPolicy
val policy: FailureAccrualPolicy =
FailureAccrualPolicy.consecutiveFailures(
consecutiveFailures = 10,
markDeadFor = Backoff.const(30.seconds)
)
The ``consecutiveFailures`` factory method takes two arguments:
val twitter = Http.client
.configured(FailureAccrualFactory.Param(() => FailureAccrualPolicy.consecutiveFailures(
numFailures = 10,
markDeadFor = Backoff.const(10.seconds)
)))
.newService("twitter.com")
The ``consecutiveFailures`` method takes two arguments:

1. `consecutiveFailures` - the number of failures after which an endpoint is marked dead
2. `markDeadFor` - the backoff policy (an instance of ``Stream[Duration]``) used to mark an endpoint

This file was deleted.

@@ -42,28 +42,13 @@ class SessionQualificationParams[A <: Stack.Parameterized[A]](self: Stack.Parame
* consecutive failures (default is 5) accompanied by equal jittered backoff
* producing durations for which a host is marked unavailable.
*
* @note Configuring Failure Accrual is experimental so `.configured` with
* `FailureAccrualFactory.Param` should be used to do that.
*
* @see [[https://twitter.github.io/finagle/guide/Clients.html#failure-accrual]]
* [[https://twitter.github.io/finagle/guide/Clients.html#circuit-breaking]]
* [[FailureAccrualFactory]]
*/
def noFailureAccrual: A =
self.configured(FailureAccrualFactory.Disabled)

/**
* Configures the Failure Accrual module of this client with given
* [[FailureAccrualPolicy policy]] (default: mark an endpoint dead after
* 5 consecutive failures).
*
* The Failure Accrual module is a Finagle per-request circuit breaker. It marks a
* host unavailable depending on the used [[FailureAccrualPolicy policy]]. The default
* setup for the Failure Accrual module is to use a policy based on the number of
* consecutive failures (default is 5) accompanied by equal jittered backoff
* producing durations for which a host is marked unavailable.
*
* @see [[https://twitter.github.io/finagle/guide/Clients.html#failure-accrual]]
* [[https://twitter.github.io/finagle/guide/Clients.html#circuit-breaking]]
* [[FailureAccrualFactory]]
*/
def failureAccrualPolicy(policy: FailureAccrualPolicy): A =
self.configured(FailureAccrualFactory.Param(policy))
}
@@ -5,7 +5,7 @@ import com.twitter.finagle.Stack.{Params, Role}
import com.twitter.finagle._
import com.twitter.finagle.client.Transporter
import com.twitter.finagle.service.exp.FailureAccrualPolicy
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.util.DefaultLogger
import com.twitter.logging.Level
import com.twitter.util._
@@ -49,7 +49,8 @@ object FailureAccrualFactory {
private[finagle] val jitteredBackoff: Stream[Duration] =
Backoff.equalJittered(5.seconds, 300.seconds)

private[finagle] val defaultPolicy = () => FailureAccrualPolicy.consecutiveFailures(defaultConsecutiveFailures, jitteredBackoff)
private[finagle] val defaultPolicy =
() => FailureAccrualPolicy.consecutiveFailures(defaultConsecutiveFailures, jitteredBackoff)


/**
@@ -81,7 +82,6 @@ object FailureAccrualFactory {

private[finagle] object Param {
case class Configured(failureAccrualPolicy: () => FailureAccrualPolicy) extends Param

case class Replaced(factory: Timer => ServiceFactoryWrapper) extends Param
case object Disabled extends Param

@@ -134,17 +134,7 @@ object FailureAccrualFactory {
* @see The [[https://twitter.github.io/finagle/guide/Clients.html#failure-accrual user guide]]
* for more details.
*/
def Param(failureAccrualPolicy: FailureAccrualPolicy): Param =
Param.Configured(() => failureAccrualPolicy)

/**
* Configures the [[FailureAccrualFactory]].
*
* Used by the memcache client's default params so clients don't share the policy.
*
* @param failureAccrualPolicy The policy to use to determine when to mark an endpoint as dead.
*/
private[finagle] def Param(failureAccrualPolicy: () => FailureAccrualPolicy): Param =
def Param(failureAccrualPolicy: () => FailureAccrualPolicy): Param =
Param.Configured(failureAccrualPolicy)

/**
@@ -15,8 +15,7 @@ import com.twitter.util.Duration
* @see The [[https://twitter.github.io/finagle/guide/Clients.html#failure-accrual user guide]]
* for more details.
*/
trait FailureAccrualPolicy {

abstract class FailureAccrualPolicy {
/** Invoked by FailureAccrualFactory when a request is successful. */
def recordSuccess(): Unit

@@ -30,14 +30,15 @@
import com.twitter.finagle.param.Timer;
import com.twitter.finagle.param.Tracer;
import com.twitter.finagle.server.Listener;
import com.twitter.finagle.service.Backoff;
import com.twitter.finagle.service.ExpiringService;
import com.twitter.finagle.service.FailFastFactory;
import com.twitter.finagle.service.FailureAccrualFactory;
import com.twitter.finagle.service.Retries;
import com.twitter.finagle.service.RetryBudgets;
import com.twitter.finagle.service.RetryPolicy;
import com.twitter.finagle.service.TimeoutFilter;
import com.twitter.finagle.service.exp.FailureAccrualPolicies;
import com.twitter.finagle.service.exp.FailureAccrualPolicy;
import com.twitter.finagle.socks.SocksProxyFlags;
import com.twitter.finagle.ssl.Engine;
import com.twitter.finagle.stats.NullStatsReceiver;
@@ -94,8 +95,20 @@ public void testParams() {
.configured(new ExpiringService.Param(Duration.Top(), Duration.Top()).mk())
.configured(new FailFastFactory.FailFast(true).mk())
.configured(FailureAccrualFactory.Param(10, Duration.Bottom()).mk())
.configured(FailureAccrualFactory.Param(FailureAccrualPolicies.newConsecutiveFailuresPolicy(
3, Duration.fromSeconds(0))).mk())
.configured(FailureAccrualFactory.Param(new Function0<FailureAccrualPolicy>() {
@Override
public FailureAccrualPolicy apply() {
return FailureAccrualPolicy.consecutiveFailures(
3, Backoff.constant(Duration.fromSeconds(1)));
}
}).mk())
.configured(FailureAccrualFactory.Param(new Function0<FailureAccrualPolicy>() {
@Override
public FailureAccrualPolicy apply() {
return FailureAccrualPolicy.successRate(
0.99, 100, Backoff.constant(Duration.fromSeconds(1)));
}
}).mk())
.configured(new TimeoutFilter.Param(Duration.Top()).mk())
.configured(new Transport.BufferSizes(Option.empty(), Option.empty()).mk())
.configured(new Transport.Liveness(Duration.Top(), Duration.Top(), Option.empty()).mk())
@@ -27,7 +27,7 @@ class MemcachedTest extends FunSuite
val markDeadFor = Backoff.const(1.second)
val failureAccrualPolicy = FailureAccrualPolicy.consecutiveFailures(20, markDeadFor)
val client = Memcached.client
.configured(FailureAccrualFactory.Param(failureAccrualPolicy))
.configured(FailureAccrualFactory.Param(() => failureAccrualPolicy))
.configured(Transporter.ConnectTimeout(100.milliseconds))
.configured(TimeoutFilter.Param(200.milliseconds))
.configured(TimeoutFactory.Param(200.milliseconds))
