finagle-core: Minor load balancing and retry changes
Motivation

We identified a few small gaps in metrics around load balancing and requeues.

Solution

1. Improve balancer logic for P2C to rebuild more aggressively when it finds a
   down node (a simplified sketch follows this list).

2. Add stats:

"loadbalancer/max_effort_exhausted"

    A counter of the number of times a balancer failed to find a node that was
    `Status.Open` within `com.twitter.finagle.loadbalancer.Balancer.maxEffort`
    attempts. When this occurs, a non-open node may be selected for that
    request.

"retries/request_limit"

    A counter of the number of times the limit of retry attempts for a logical
    request has been reached.
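
A minimal, self-contained sketch of the idea in (1). This is not Finagle's
actual implementation (the real change is in the P2C distributor in
Balancer.scala below); `Node`, `Status`, and the two-choice pick here are
simplified stand-ins:

    import scala.util.Random

    sealed trait Status
    object Status {
      case object Open extends Status
      case object Closed extends Status
    }

    // Status is a var to mimic Finagle's dynamic node health.
    final class Node(val id: Int, @volatile var status: Status, val load: Double)

    final class Distributor(vector: Vector[Node], rng: Random) {
      // Set when pick() sees a down node that we expected to be available.
      @volatile private[this] var sawDown = false

      private[this] val down = vector.filterNot(_.status == Status.Open)

      // Previously a rebuild was only triggered when a node in the `down`
      // partition recovered; now a down node observed during pick() also
      // forces a rebuild.
      def needsRebuild: Boolean =
        sawDown || (down.nonEmpty && down.exists(_.status == Status.Open))

      // Power-of-two-choices pick; assumes a non-empty vector.
      def pick(): Node = {
        val a = vector(rng.nextInt(vector.size))
        val b = vector(rng.nextInt(vector.size))
        if (a.status != Status.Open || b.status != Status.Open)
          sawDown = true
        if (a.load < b.load) a else b
      }
    }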

RB_ID=776235
kevinoliver authored and jenkins committed Dec 14, 2015
1 parent 65b232c commit 6e75832
Showing 10 changed files with 89 additions and 24 deletions.
6 changes: 6 additions & 0 deletions doc/src/sphinx/metrics/LoadBalancing.rst
@@ -30,6 +30,12 @@ All Balancers
**removes**
A counter of the number of hosts removed from the loadbalancer

**max_effort_exhausted**
A counter of the number of times a balancer failed to find a node that was
`Status.Open` within `com.twitter.finagle.loadbalancer.Balancer.maxEffort`
attempts. When this occurs, a non-open node may be selected for that
request.

ApertureLoadBandBalancer
<<<<<<<<<<<<<<<<<<<<<<<<

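To make the semantics of `max_effort_exhausted` concrete, here is an
illustrative sketch of a `maxEffort`-bounded pick loop. It is not the actual
`Balancer.apply` code (the real accounting is visible in Balancer.scala later
in this diff); the helper name and shape are assumptions:

    import com.twitter.finagle.stats.Counter

    object MaxEffortPick {
      // Try up to `maxEffort` picks looking for an Open node. If every
      // attempt comes back non-open, record the exhaustion and return the
      // last pick anyway; the balancer may then rebuild and pick again, but
      // the request can still land on a non-open node.
      def pickOpen[N](pick: () => N, isOpen: N => Boolean, maxEffort: Int, exhausted: Counter): N = {
        var n = pick()
        var tries = 1
        while (!isOpen(n) && tries < maxEffort) {
          n = pick()
          tries += 1
        }
        if (!isOpen(n))
          exhausted.incr()
        n
      }
    }
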
4 changes: 4 additions & 0 deletions doc/src/sphinx/metrics/Retries.rst
@@ -32,3 +32,7 @@ with a `StatsFilter` scoped to `tries`.

**retries/budget_exhausted**
a counter of the number of times when the budget is exhausted

**retries/request_limit**
a counter of the number of times the limit of retry attempts for a logical
request has been reached
12 changes: 10 additions & 2 deletions finagle-core/src/main/scala/com/twitter/finagle/Failure.scala
@@ -265,12 +265,20 @@ object Failure {
/**
* Create a new [[Restartable]] failure with the given message.
*/
def rejected(why: String): Failure = new Failure(why, None, Failure.Restartable, logLevel = Level.DEBUG)
def rejected(why: String): Failure =
new Failure(why, None, Failure.Restartable, logLevel = Level.DEBUG)

/**
* Create a new [[Restartable]] failure with the given cause.
*/
def rejected(cause: Throwable): Failure = Failure(cause, Failure.Restartable, logLevel = Level.DEBUG)
def rejected(cause: Throwable): Failure =
Failure(cause, Failure.Restartable, logLevel = Level.DEBUG)

/**
* Create a new [[Restartable]] failure with the given message and cause.
*/
def rejected(why: String, cause: Throwable): Failure =
new Failure(why, Option(cause), Failure.Restartable, logLevel = Level.DEBUG)

/**
* A default [[Restartable]] failure.
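
Hypothetical usage of the new overload (the message string mirrors how
StdStackServer.scala uses it later in this diff):

    import com.twitter.finagle.Failure
    import com.twitter.util.Future

    object RejectedExample {
      // The new overload carries both a message and the underlying cause;
      // like the other `rejected` constructors it is flagged Restartable
      // and logged at DEBUG level.
      val cause = new Exception("connection draining")
      val failure: Failure =
        Failure.rejected("Terminating session and ignoring request", cause)
      val rep: Future[Nothing] = Future.exception(failure)
    }
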
@@ -1,15 +1,13 @@
package com.twitter.finagle.loadbalancer

import com.twitter.conversions.time._
import com.twitter.finagle.service.FailingFactory
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.util.{Rng, Ring, Ema, DefaultTimer}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.util.{Rng, Ring, Ema}
import com.twitter.finagle.{
ClientConnection, NoBrokersAvailableException, ServiceFactory, ServiceFactoryProxy,
ServiceProxy, Status}
import com.twitter.util.{Activity, Return, Future, Throw, Time, Var, Duration, Timer}
import com.twitter.util.{Activity, Return, Future, Throw, Time, Duration, Timer}
import java.util.concurrent.atomic.AtomicInteger
import java.util.logging.Logger

/**
* The aperture load-band balancer balances load to the smallest
@@ -55,7 +53,11 @@ private class ApertureLoadBandBalancer[Req, Rep](
extends Balancer[Req, Rep]
with Aperture[Req, Rep]
with LoadBand[Req, Rep]
with Updating[Req, Rep]
with Updating[Req, Rep] {

protected[this] val maxEffortExhausted = statsReceiver.counter("max_effort_exhausted")

}

object Aperture {
// Note, we need to have a non-zero range for each node
@@ -103,6 +105,7 @@ private trait Aperture[Req, Rep] { self: Balancer[Req, Rep] =>
extends DistributorT {
type This = Distributor

// Indicates if we've seen any down nodes during pick which we expected to be available
@volatile private[this] var sawDown = false

private[this] val (up, down) = vector.partition(nodeUp) match {
@@ -116,7 +119,7 @@ private trait Aperture[Req, Rep] { self: Balancer[Req, Rep] =>
} else {
val numNodes = up.size
val ring = Ring(numNodes, RingWidth)
val unit = (RingWidth/numNodes).toInt
val unit = RingWidth/numNodes
val max = RingWidth/unit
(ring, unit, max)
}
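
Two notes on the hunk above: dropping `.toInt` is a cleanup, since `RingWidth`
and `numNodes` are both `Int` and the division was already integral. The
unit/max arithmetic, with an assumed `RingWidth` (its real value is not shown
in this diff), works out as:

    object RingMath {
      val RingWidth: Int = 1 << 20          // hypothetical value, for illustration
      val numNodes: Int = 3
      val unit: Int = RingWidth / numNodes  // 349525 (integer division)
      val max: Int = RingWidth / unit       // 3: whole units that fit in the ring
    }
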
@@ -2,7 +2,7 @@ package com.twitter.finagle.loadbalancer

import com.twitter.finagle._
import com.twitter.finagle.service.FailingFactory
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.stats.{Counter, StatsReceiver}
import com.twitter.finagle.util.{OnReady, Rng, Updater}
import com.twitter.util.{Activity, Future, Promise, Time, Closable, Return, Throw}
import java.util.concurrent.atomic.AtomicInteger
@@ -144,6 +144,11 @@ private trait Balancer[Req, Rep] extends ServiceFactory[Req, Rep] { self =>
updater(Rebuild(dist))
}

// A counter that should be named "max_effort_exhausted".
// Due to a scalac compile/runtime problem we were unable
// to store it as a member variable on this trait.
protected[this] def maxEffortExhausted: Counter

private[this] val gauges = Seq(
statsReceiver.addGauge("available") {
dist.vector.count(n => n.status == Status.Open)
@@ -248,6 +253,7 @@ private trait Balancer[Req, Rep] extends ServiceFactory[Req, Rep] { self =>

var n = pick(d, maxEffort)
if (n == null) {
maxEffortExhausted.incr()
rebuild()
n = dist.pick()
}
@@ -363,16 +369,22 @@ private trait P2C[Req, Rep] { self: Balancer[Req, Rep] =>
protected class Distributor(val vector: Vector[Node]) extends DistributorT {
type This = Distributor

// Indicates if we've seen any down nodes during pick which we expected to be available
@volatile private[this] var sawDown = false

private[this] val nodeUp: Node => Boolean = { node =>
node.status == Status.Open
}

private[this] val (up, down) = vector.partition(nodeUp)

def needsRebuild: Boolean = down.nonEmpty && down.exists(nodeUp)
def needsRebuild: Boolean =
sawDown || (down.nonEmpty && down.exists(nodeUp))

def rebuild(): This = new Distributor(vector)
def rebuild(vec: Vector[Node]): This = new Distributor(vec)

// TODO: consider consolidating some of this code with `Aperture.Distributor.pick`
def pick(): Node = {
if (vector.isEmpty)
return failingNode(emptyException)
@@ -395,6 +407,10 @@ private trait P2C[Req, Rep] { self: Balancer[Req, Rep] =>

val nodeA = vec(a)
val nodeB = vec(b)

if (nodeA.status != Status.Open || nodeB.status != Status.Open)
sawDown = true

if (nodeA.load < nodeB.load) nodeA else nodeB
}
}
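
Per the "scalac compile/runtime problem" comment in the hunk above, the counter
is declared as an abstract def on the trait, and each concrete balancer
supplies the val (see P2CBalancer.scala below). A minimal sketch of that
pattern, with hypothetical names:

    import com.twitter.finagle.stats.{Counter, NullStatsReceiver, StatsReceiver}

    trait BalancerLike {
      protected def statsReceiver: StatsReceiver

      // Abstract here; each concrete balancer defines the val.
      protected[this] def maxEffortExhausted: Counter
    }

    class MyBalancer extends BalancerLike {
      protected val statsReceiver: StatsReceiver = NullStatsReceiver
      protected[this] val maxEffortExhausted: Counter =
        statsReceiver.counter("max_effort_exhausted")
    }
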
@@ -37,7 +37,11 @@ private class P2CBalancer[Req, Rep](
extends Balancer[Req, Rep]
with LeastLoaded[Req, Rep]
with P2C[Req, Rep]
with Updating[Req, Rep]
with Updating[Req, Rep] {

protected[this] val maxEffortExhausted = statsReceiver.counter("max_effort_exhausted")

}

/**
* Like [[com.twitter.finagle.loadbalancer.P2CBalancer]] but
@@ -76,7 +80,11 @@ private class P2CBalancerPeakEwma[Req, Rep](
extends Balancer[Req, Rep]
with PeakEwma[Req, Rep]
with P2C[Req, Rep]
with Updating[Req, Rep]
with Updating[Req, Rep] {

protected[this] val maxEffortExhausted = statsReceiver.counter("max_effort_exhausted")

}

private trait PeakEwma[Req, Rep] { self: Balancer[Req, Rep] =>

@@ -244,7 +244,10 @@ trait StdStackServer[Req, Rep, This <: StdStackServer[Req, Rep, This]]
// away. This allows protocols that support graceful shutdown to
// also gracefully deny new sessions.
val d = server.newDispatcher(
transport, Service.const(Future.exception(Failure.rejected(exc))))
transport,
Service.const(Future.exception(
Failure.rejected("Terminating session and ignoring request", exc)))
)
connections.add(d)
transport.onClose ensure connections.remove(d)
// We give it a generous amount of time to shut down the session to
@@ -45,6 +45,7 @@ private[finagle] class RequeueFilter[Req, Rep](

private[this] val requeueCounter = statsReceiver.counter("requeues")
private[this] val budgetExhaustCounter = statsReceiver.counter("budget_exhausted")
private[this] val requestLimitCounter = statsReceiver.counter("request_limit")

private[this] def applyService(
req: Req,
@@ -76,6 +77,8 @@
} else {
if (retriesRemaining > 0)
budgetExhaustCounter.incr()
else
requestLimitCounter.incr()
Future.exception(exc)
}
}
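
The new branch distinguishes the two reasons requeueing can stop. A small
illustrative helper (hypothetical; the filter inlines this logic), with a
usage check against `InMemoryStatsReceiver`:

    import com.twitter.finagle.stats.{Counter, InMemoryStatsReceiver}

    object RequeueAccounting {
      // If attempts remained when the retry budget ran dry, count it as
      // "budget_exhausted"; if the per-request attempt limit itself was hit,
      // count it as "request_limit".
      def recordStop(retriesRemaining: Int, budgetExhausted: Counter, requestLimit: Counter): Unit =
        if (retriesRemaining > 0) budgetExhausted.incr()
        else requestLimit.incr()
    }

    object RequeueAccountingExample extends App {
      val stats = new InMemoryStatsReceiver
      RequeueAccounting.recordStop(
        retriesRemaining = 0,
        budgetExhausted = stats.counter("budget_exhausted"),
        requestLimit = stats.counter("request_limit"))
      println(stats.counters(Seq("request_limit"))) // prints 1
    }
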
@@ -1,13 +1,10 @@
package com.twitter.finagle.loadbalancer

import com.twitter.finagle._
import com.twitter.finagle.service.FailingFactory
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.util.Rng
import com.twitter.util._
import java.util.concurrent.atomic.AtomicInteger
import org.junit.runner.RunWith
import org.scalactic.Tolerance
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
import scala.collection.mutable
@@ -24,6 +21,8 @@ private trait ApertureTesting {
protected def statsReceiver = NullStatsReceiver
protected val minAperture = 1

protected[this] val maxEffortExhausted = statsReceiver.counter("max_effort_exhausted")

def applyn(n: Int): Unit = {
val factories = Await.result(Future.collect(Seq.fill(n)(apply())))
Await.result(Closable.all(factories:_*).close())
@@ -87,7 +86,6 @@

@RunWith(classOf[JUnitRunner])
private class ApertureTest extends FunSuite with ApertureTesting {
import Tolerance._

protected class Bal extends TestBal with LeastLoaded[Unit, Unit]

@@ -183,7 +181,6 @@ private class ApertureTest extends FunSuite with ApertureTesting {

@RunWith(classOf[JUnitRunner])
private class LoadBandTest extends FunSuite with ApertureTesting {
import Tolerance._

val rng = Rng()

@@ -20,10 +20,11 @@ private class BalancerTest extends FunSuite
private class TestBalancer(
protected val statsReceiver: InMemoryStatsReceiver = new InMemoryStatsReceiver)
extends Balancer[Unit, Unit] {
def maxEffort: Int = ???
def maxEffort: Int = 5
def emptyException: Throwable = ???

def stats: InMemoryStatsReceiver = statsReceiver
protected[this] val maxEffortExhausted = statsReceiver.counter("max_effort_exhausted")

def nodes: Vector[Node] = dist.vector
def factories: Set[ServiceFactory[Unit, Unit]] = nodes.map(_.factory).toSet
@@ -37,8 +38,8 @@ private class BalancerTest extends FunSuite

case class Distributor(vector: Vector[Node], gen: Int = 1) extends DistributorT {
type This = Distributor
def pick(): Node = ???
def needsRebuild = ???
def pick(): Node = vector.head
def needsRebuild = false
def rebuild(): This = {
rebuildDistributor()
copy(gen=gen+1)
@@ -58,7 +59,7 @@ private class BalancerTest extends FunSuite
factory.close()
Future.Done
}
def apply(conn: ClientConnection): Future[Service[Unit,Unit]] = ???
def apply(conn: ClientConnection): Future[Service[Unit,Unit]] = Future.never
}

protected def newNode(
@@ -122,6 +123,22 @@ private class BalancerTest extends FunSuite
assert(bal.status == Status.Busy)
}

test("max_effort_exhausted counter updated properly") {
val stats = new InMemoryStatsReceiver()
val bal = new TestBalancer(stats)
val closed = newFac(Status.Closed)
val open = newFac(Status.Open)

// start out all closed
bal.update(Seq(closed))
bal(ClientConnection.nil)
assert(1 == stats.counters(Seq("max_effort_exhausted")))

// now have it be open and a pick must succeed
bal.update(Seq(open))
bal(ClientConnection.nil)
assert(1 == stats.counters(Seq("max_effort_exhausted")))
}

test("updater: keeps nodes up to date") {
val bal = new TestBalancer