Skip to content

Commit

Permalink
=htc akka#1391 client pool: exponential backoff after failed connecti…
Browse files Browse the repository at this point in the history
…on attempts
  • Loading branch information
jrudolph committed Dec 4, 2018
1 parent f9bad09 commit a5de98b
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 31 deletions.
27 changes: 27 additions & 0 deletions akka-http-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,33 @@ akka.http {
# Client-side pipelining is not currently supported. See https://github.com/akka/akka-http/issues/32
pipelining-limit = 1

# The minimum duration to backoff new connection attempts after the previous connection attempt failed.
#
# The pool uses an exponential randomized backoff scheme. After the first failure, the next attempt will only be
# tried after a random duration between the base connection backoff and twice the base connection backoff. If that
# attempt fails as well, the next attempt will be delayed by twice that amount. The total delay is capped using the
# `max-connection-backoff` setting.
#
# The backoff applies for the complete pool. I.e. after one failed connection attempt, further connection attempts
# to that host will backoff for all connections of the pool. After the service recovered, connections will come out
# of backoff one by one due to the random extra backoff time. This is to avoid overloading just recently recovered
# services with new connections ("thundering herd").
#
# Example: base-connection-backoff = 100ms, max-connection-backoff = 10 seconds
# - After 1st failure, backoff somewhere between 100ms and 200ms
# - After 2nd, between 200ms and 400ms
# - After 3rd, between 200ms and 400ms
# - After 4th, between 400ms and 800ms
# - After 5th, between 800ms and 1600ms
# - After 6th, between 1600ms and 3200ms
# - After 7th, between 3200ms and 6400ms
# - After 8th, between 5000ms and 10 seconds (max capped by max-connection-backoff, min by half of that)
# - After 9th, etc., stays between 5000ms and 10 seconds
base-connection-backoff = 100ms

# Maximum backoff duration between failed connection attempts.
max-connection-backoff = 2 min

# The time after which an idle connection pool (without pending requests)
# will automatically terminate itself. Set to `infinite` to completely disable idle timeouts.
idle-timeout = 30 s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.OptionVal

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

Expand Down Expand Up @@ -70,15 +70,22 @@ private[client] object NewHostConnectionPool {
val slots = Vector.tabulate(_settings.maxConnections)(new Slot(_))
val slotsWaitingForDispatch: util.Deque[Slot] = new util.ArrayDeque[Slot]
val retryBuffer: util.Deque[RequestContext] = new util.ArrayDeque[RequestContext]
var _connectionEmbargo: FiniteDuration = Duration.Zero
def baseEmbargo: FiniteDuration = _settings.baseConnectionBackoff
def maxBaseEmbargo: FiniteDuration = _settings.maxConnectionBackoff / 2 // because we'll add a random component of the same size to the base

override def preStart(): Unit = {
pull(requestsIn)
slots.foreach(_.initialize())
}

def onPush(): Unit = {
dispatchRequest(grab(requestsIn))
pullIfNeeded()
val nextRequest = grab(requestsIn)
if (hasIdleSlots) {
dispatchRequest(nextRequest)
pullIfNeeded()
} else // embargo might change state from unconnected -> embargoed losing an idle slot between the pull and the push here
retryBuffer.addLast(nextRequest)
}
def onPull(): Unit =
if (!slotsWaitingForDispatch.isEmpty)
Expand Down Expand Up @@ -115,13 +122,29 @@ private[client] object NewHostConnectionPool {

def numConnectedSlots: Int = slots.count(_.isConnected)

def onConnectionAttemptFailed(atPreviousEmbargoLevel: FiniteDuration): Unit = {
_connectionEmbargo match {
case Duration.Zero _connectionEmbargo = baseEmbargo
case `atPreviousEmbargoLevel` _connectionEmbargo = (_connectionEmbargo * 2) min maxBaseEmbargo
case _
// don't increase if the embargo level has already changed since the start of the connection attempt
}
slots.foreach(_.onNewConnectionEmbargo(_connectionEmbargo))
}
def onConnectionAttemptSucceeded(): Unit = _connectionEmbargo = Duration.Zero
def currentEmbargo: FiniteDuration = _connectionEmbargo

final case class Event[T](name: String, transition: (SlotState, Slot, T) SlotState) {
def preApply(t: T): Event[Unit] = Event(name, (state, slot, _) transition(state, slot, t))
override def toString: String = s"Event($name)"
}
object Event {
val onPreConnect = event0("onPreConnect", _.onPreConnect(_))
val onConnectionAttemptSucceeded = event[Http.OutgoingConnection]("onConnectionAttemptSucceeded", _.onConnectionAttemptSucceeded(_, _))
val onConnectionAttemptFailed = event[Throwable]("onConnectionAttemptFailed", _.onConnectionAttemptFailed(_, _))

val onNewConnectionEmbargo = event[FiniteDuration]("onNewConnectionEmbargo", _.onNewConnectionEmbargo(_, _))

val onNewRequest = event[RequestContext]("onNewRequest", _.onNewRequest(_, _))

val onRequestDispatched = event0("onRequestDispatched", _.onRequestDispatched(_))
Expand Down Expand Up @@ -180,6 +203,9 @@ private[client] object NewHostConnectionPool {
def onConnectionAttemptFailed(cause: Throwable): Unit =
updateState(Event.onConnectionAttemptFailed, cause)

def onNewConnectionEmbargo(embargo: FiniteDuration): Unit =
updateState(Event.onNewConnectionEmbargo, embargo)

def onNewRequest(req: RequestContext): Unit =
updateState(Event.onNewRequest, req)

Expand Down Expand Up @@ -235,12 +261,12 @@ private[client] object NewHostConnectionPool {
case _ // no timeout set, nothing to do
}

if (state == Unconnected && connection != null) {
if (!state.isConnected && connection != null) {
debug(s"State change from [${previousState.name}] to [Unconnected]. Closing the existing connection.")
closeConnection()
}

if (!previousState.isIdle && state.isIdle) {
if (!previousState.isIdle && state.isIdle && !(state == Unconnected && currentEmbargo != Duration.Zero)) {
debug("Slot became idle... Trying to pull")
pullIfNeeded()
}
Expand All @@ -266,7 +292,9 @@ private[client] object NewHostConnectionPool {
case WaitingForEndOfResponseEntity(_, HttpResponse(_, _, _: HttpEntity.Strict, _), _)
// the connection cannot drive these for a strict entity so we have to loop ourselves
OptionVal.Some(Event.onResponseEntityCompleted)
case Unconnected if numConnectedSlots < settings.minConnections
case Unconnected if currentEmbargo != Duration.Zero
OptionVal.Some(Event.onNewConnectionEmbargo.preApply(currentEmbargo))
case s if !s.isConnected && s.isIdle && numConnectedSlots < settings.minConnections
debug(s"Preconnecting because number of connected slots fell down to $numConnectedSlots")
OptionVal.Some(Event.onPreConnect)
case _ OptionVal.None
Expand Down Expand Up @@ -370,6 +398,7 @@ private[client] object NewHostConnectionPool {
responseIn: SubSinkInlet[HttpResponse]
) extends InHandler with OutHandler { connection
var ongoingResponseEntity: Option[HttpEntity] = None
var connectionEstablished: Boolean = false

/** Will only be executed if this connection is still the current connection for its slot */
def withSlot(f: Slot Unit): Unit =
Expand Down Expand Up @@ -435,10 +464,11 @@ private[client] object NewHostConnectionPool {
slot.onConnectionCompleted()
}
override def onUpstreamFailure(ex: Throwable): Unit =
withSlot { slot
slot.debug("Connection failed")
slot.onConnectionFailed(ex)
}
if (connectionEstablished)
withSlot { slot
slot.debug("Connection failed")
slot.onConnectionFailed(ex)
}

def onPull(): Unit = () // emitRequests makes sure not to push too early

Expand All @@ -461,6 +491,8 @@ private[client] object NewHostConnectionPool {
})
}
def openConnection(slot: Slot): SlotConnection = {
val currentEmbargoLevel = currentEmbargo

val requestOut = new SubSourceOutlet[HttpRequest](s"PoolSlot[${slot.slotId}].requestOut")
val responseIn = new SubSinkInlet[HttpResponse](s"PoolSlot[${slot.slotId}].responseIn")
responseIn.pull()
Expand All @@ -477,8 +509,19 @@ private[client] object NewHostConnectionPool {
responseIn.setHandler(slotCon)

connection.onComplete(safely {
case Success(outgoingConnection) slotCon.withSlot(_.onConnectionAttemptSucceeded(outgoingConnection))
case Failure(cause) slotCon.withSlot(_.onConnectionAttemptFailed(cause))
case Success(outgoingConnection)
slotCon.withSlot { sl
slotCon.connectionEstablished = true
slot.debug("Connection attempt succeeded")
onConnectionAttemptSucceeded()
sl.onConnectionAttemptSucceeded(outgoingConnection)
}
case Failure(cause)
slotCon.withSlot { sl
slot.debug("Connection attempt failed with {}", cause.getMessage)
onConnectionAttemptFailed(currentEmbargoLevel)
sl.onConnectionAttemptFailed(cause)
}
})(ExecutionContexts.sameThreadExecutionContext)

slotCon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.http.impl.engine.client.pool

import java.util.concurrent.ThreadLocalRandom

import akka.annotation.InternalApi
import akka.http.impl.engine.client.PoolFlow.RequestContext
import akka.http.impl.util._
Expand Down Expand Up @@ -50,6 +52,8 @@ private[pool] sealed abstract class SlotState extends Product {
def onConnectionAttemptSucceeded(ctx: SlotContext, outgoingConnection: Http.OutgoingConnection): SlotState = illegalState(ctx, "onConnectionAttemptSucceeded")
def onConnectionAttemptFailed(ctx: SlotContext, cause: Throwable): SlotState = illegalState(ctx, "onConnectionAttemptFailed")

def onNewConnectionEmbargo(ctx: SlotContext, embargoDuration: FiniteDuration): SlotState = illegalState(ctx, "onNewConnectionEmbargo")

def onNewRequest(ctx: SlotContext, requestContext: RequestContext): SlotState = illegalState(ctx, "onNewRequest")

def onRequestDispatched(ctx: SlotContext): SlotState = illegalState(ctx, "onRequestDispatched")
Expand Down Expand Up @@ -97,6 +101,9 @@ private[pool] sealed abstract class SlotState extends Product {
private[pool] object SlotState {
sealed abstract class ConnectedState extends SlotState {
def isConnected: Boolean = true

// ignore embargo while still connected
override def onNewConnectionEmbargo(ctx: SlotContext, embargoDuration: FiniteDuration): SlotState = this
}
sealed trait IdleState extends SlotState {
final override def isIdle = true
Expand All @@ -120,9 +127,7 @@ private[pool] object SlotState {
super.onShutdown(ctx)
}

override def onConnectionAttemptFailed(ctx: SlotContext, cause: Throwable): SlotState =
// TODO: register failed connection attempt to be able to backoff (see https://github.com/akka/akka-http/issues/1391)
failOngoingRequest(ctx, "connection attempt failed", cause)
override def onConnectionAttemptFailed(ctx: SlotContext, cause: Throwable): SlotState = failOngoingRequest(ctx, "connection attempt failed", cause)

override def onRequestEntityFailed(ctx: SlotContext, cause: Throwable): SlotState = failOngoingRequest(ctx, "request entity stream failed", cause)
override def onConnectionCompleted(ctx: SlotContext): SlotState =
Expand All @@ -145,7 +150,23 @@ private[pool] object SlotState {
}
}

case object Unconnected extends SlotState with IdleState {
case class Embargoed(embargoDuration: FiniteDuration) extends SlotState {
override def isConnected: Boolean = false
override def isIdle: Boolean = false

override val stateTimeout: Duration = newLevelTimeout()

private def newLevelTimeout(): FiniteDuration = {
val minMillis = embargoDuration.toMillis
val maxMillis = minMillis * 2
ThreadLocalRandom.current().nextLong(minMillis, maxMillis).millis
}
override def onTimeout(ctx: SlotContext): SlotState = OutOfEmbargo

override def onNewConnectionEmbargo(ctx: SlotContext, embargoDuration: FiniteDuration): SlotState =
Embargoed(embargoDuration)
}
trait UnconnectedState extends SlotState with IdleState {
def isConnected: Boolean = false

override def onPreConnect(ctx: SlotContext): SlotState = {
Expand All @@ -157,7 +178,15 @@ private[pool] object SlotState {
ctx.openConnection()
Connecting(requestContext)
}

override def onNewConnectionEmbargo(ctx: SlotContext, embargoDuration: FiniteDuration): SlotState =
Embargoed(embargoDuration)
}

// a special case of `Unconnected` that will not be instantly re-embargoed
case object OutOfEmbargo extends UnconnectedState
case object Unconnected extends UnconnectedState

case object Idle extends ConnectedState with IdleState {
override def onNewRequest(ctx: SlotContext, requestContext: RequestContext): SlotState =
PushingRequestToConnection(requestContext)
Expand All @@ -183,16 +212,22 @@ private[pool] object SlotState {
override def onNewRequest(ctx: SlotContext, requestContext: RequestContext): SlotState =
Connecting(requestContext)

override def onConnectionAttemptFailed(ctx: SlotContext, cause: Throwable): SlotState =
override def onConnectionAttemptFailed(ctx: SlotContext, cause: Throwable): SlotState = {
// TODO: register failed connection attempt to be able to backoff (see https://github.com/akka/akka-http/issues/1391)
closeAndGoToUnconnected(ctx, "connection attempt failed", cause)
onConnectionFailure(ctx, "connection attempt failed", cause)
}
override def onConnectionFailed(ctx: SlotContext, cause: Throwable): SlotState =
closeAndGoToUnconnected(ctx, "connection failed", cause)
override def onConnectionCompleted(ctx: SlotContext): SlotState =
closeAndGoToUnconnected(ctx, "connection completed", new IllegalStateException("Unexpected connection closure") with NoStackTrace)
onConnectionFailure(ctx, "connection failed", cause)

private def closeAndGoToUnconnected(ctx: SlotContext, signal: String, cause: Throwable): SlotState = {
ctx.debug("Connection was closed by [{}] while preconnecting because of [{}]", signal, cause.getMessage)
override def onConnectionCompleted(ctx: SlotContext): SlotState =
onConnectionFailure(
ctx,
"connection completed",
new IllegalStateException("Unexpected connection closure") with NoStackTrace
)

private def onConnectionFailure(ctx: SlotContext, signal: String, cause: Throwable): SlotState = {
ctx.debug("Connection was closed by [{}] while preconnecting because of [{}].", signal, cause.getMessage)
Unconnected
}
}
Expand Down Expand Up @@ -279,8 +314,6 @@ private[pool] object SlotState {
override def onResponseEntityCompleted(ctx: SlotContext): SlotState =
if (waitingForEndOfRequestEntity)
WaitingForEndOfRequestEntity
// TODO can we be *sure* that by skipping to Unconnected if ctx.willCloseAfter(ongoingResponse)
// we can't get a connection closed event from the 'previous' connection later?
else if (ctx.willCloseAfter(ongoingResponse) || ctx.isConnectionClosed)
Unconnected
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSet
import com.typesafe.config.Config

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

/** INTERNAL API */
@InternalApi
Expand All @@ -19,6 +20,8 @@ private[akka] final case class ConnectionPoolSettingsImpl(
maxRetries: Int,
maxOpenRequests: Int,
pipeliningLimit: Int,
baseConnectionBackoff: FiniteDuration,
maxConnectionBackoff: FiniteDuration,
idleTimeout: Duration,
connectionSettings: ClientConnectionSettings,
poolImplementation: PoolImplementation,
Expand Down Expand Up @@ -49,14 +52,18 @@ private[akka] final case class ConnectionPoolSettingsImpl(
}
}

object ConnectionPoolSettingsImpl extends SettingsCompanion[ConnectionPoolSettingsImpl]("akka.http.host-connection-pool") {
/** INTERNAL API */
@InternalApi
private[akka] object ConnectionPoolSettingsImpl extends SettingsCompanion[ConnectionPoolSettingsImpl]("akka.http.host-connection-pool") {
def fromSubConfig(root: Config, c: Config) = {
new ConnectionPoolSettingsImpl(
c getInt "max-connections",
c getInt "min-connections",
c getInt "max-retries",
c getInt "max-open-requests",
c getInt "pipelining-limit",
c getFiniteDuration "base-connection-backoff",
c getFiniteDuration "max-connection-backoff",
c getPotentiallyInfiniteDuration "idle-timeout",
ClientConnectionSettingsImpl.fromSubConfig(root, c.getConfig("client")),
c.getString("pool-implementation").toLowerCase match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import scala.concurrent.duration.Duration
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.javadsl.ClientTransport

import scala.concurrent.duration.FiniteDuration

@ApiMayChange
trait PoolImplementation
@ApiMayChange
Expand All @@ -31,6 +33,8 @@ abstract class ConnectionPoolSettings private[akka] () { self: ConnectionPoolSet
def getMaxRetries: Int = maxRetries
def getMaxOpenRequests: Int = maxOpenRequests
def getPipeliningLimit: Int = pipeliningLimit
def getBaseConnectionBackoff: FiniteDuration = baseConnectionBackoff
def getMaxConnectionBackoff: FiniteDuration = maxConnectionBackoff
def getIdleTimeout: Duration = idleTimeout
def getConnectionSettings: ClientConnectionSettings = connectionSettings

Expand All @@ -55,6 +59,8 @@ abstract class ConnectionPoolSettings private[akka] () { self: ConnectionPoolSet
def withMaxOpenRequests(newValue: Int): ConnectionPoolSettings = self.copy(maxOpenRequests = newValue)
/** Client-side pipelining is not currently supported, see https://github.com/akka/akka-http/issues/32 */
def withPipeliningLimit(newValue: Int): ConnectionPoolSettings = self.copy(pipeliningLimit = newValue)
def withBaseConnectionBackoff(newValue: FiniteDuration): ConnectionPoolSettings = self.copy(baseConnectionBackoff = newValue)
def withMaxConnectionBackoff(newValue: FiniteDuration): ConnectionPoolSettings = self.copy(maxConnectionBackoff = newValue)
def withIdleTimeout(newValue: Duration): ConnectionPoolSettings = self.copy(idleTimeout = newValue)
def withConnectionSettings(newValue: ClientConnectionSettings): ConnectionPoolSettings = self.copy(connectionSettings = newValue.asScala)

Expand Down
Loading

0 comments on commit a5de98b

Please sign in to comment.