Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
+htc akka#1312 new host connection pool implementation
A new implementation of the HostConnectionPool. The basic idea is to replace
the former complicated streaming pipeline of `PoolFlow`, `PoolConductor`,
and `PoolSlot` into a single stage that handles all aspects to get rid
of all the small race condition issues that exist in the current ("legacy")
pool implementation.

The new pool implementation is split into two basic classes

 * akka.http.impl.engine.client.pool.NewHostConnectionPool that provides all
   the infrastructure and event handling to drive the pool
 * akka.http.impl.engine.client.pool.SlotState that contains only the logic
   to handle state changes of a single pool slot
  • Loading branch information
jrudolph committed Nov 30, 2017
1 parent 402b71f commit ecb76a8
Show file tree
Hide file tree
Showing 12 changed files with 1,376 additions and 33 deletions.
Expand Up @@ -10,3 +10,8 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings
# New settings in `@DoNotInherit` classes
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.settings.ParserSettings.getModeledHeaderParsing")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ParserSettings.modeledHeaderParsing")

# New poolImplementation setting on @DoNotInherit class
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ConnectionPoolSettings.poolImplementation")
# New responseEntitySubscriptionTimeout setting
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.ConnectionPoolSettings.responseEntitySubscriptionTimeout")
10 changes: 10 additions & 0 deletions akka-http-core/src/main/resources/reference.conf
Expand Up @@ -308,6 +308,16 @@ akka.http {
# will automatically terminate itself. Set to `infinite` to completely disable idle timeouts.
idle-timeout = 30 s

# The pool implementation to use. Currently supported are:
# - legacy: the original, still default, pool implementation
# - new: the new still-evolving pool implementation, that will receive fixes and new features
pool-implementation = legacy

# The "new" pool implementation will fail a connection early and clear the slot if a response entity was not
# subscribed during the given time period after the response was dispatched. In busy systems the timeout might be
# too tight if a response is not picked up quick enough after it was dispatched by the pool.
response-entity-subscription-timeout = 1.second

# Modify this section to tweak client settings only for host connection pools APIs like `Http().superPool` or
# `Http().singleRequest`.
client = {
Expand Down
Expand Up @@ -5,6 +5,7 @@
package akka.http.impl.engine.client

import akka.NotUsed
import akka.annotation.InternalApi
import akka.http.impl.engine.client.PoolConductor.PoolSlotsSetting
import akka.http.scaladsl.settings.ConnectionPoolSettings

Expand All @@ -16,7 +17,9 @@ import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http

private object PoolFlow {
/** Internal API */
@InternalApi
private[client] object PoolFlow {

case class RequestContext(request: HttpRequest, responsePromise: Promise[HttpResponse], retriesLeft: Int) {
require(retriesLeft >= 0)
Expand Down
Expand Up @@ -7,9 +7,11 @@ package akka.http.impl.engine.client
import akka.actor._
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.http.impl.engine.client.PoolFlow._
import akka.http.impl.engine.client.pool.NewHostConnectionPool
import akka.http.impl.util.RichHttpRequest
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.http.scaladsl.settings.PoolImplementation
import akka.macros.LogHelper
import akka.stream.actor.ActorPublisherMessage._
import akka.stream.actor.ActorSubscriberMessage._
Expand Down Expand Up @@ -98,7 +100,12 @@ private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer
val connectionFlow =
Http().outgoingConnectionUsingTransport(host, port, settings.transport, connectionContext, settings.connectionSettings, setup.log)

val poolFlow = PoolFlow(connectionFlow, settings, log).named("PoolFlow")
val poolFlow =
settings.poolImplementation match {
case PoolImplementation.Legacy PoolFlow(connectionFlow, settings, log).named("PoolFlow")
case PoolImplementation.New NewHostConnectionPool(connectionFlow, settings, log).named("PoolFlow")
}

Source.fromPublisher(ActorPublisher(self)).via(poolFlow).runWith(Sink.fromSubscriber(ActorSubscriber[ResponseContext](self)))
}

Expand Down

0 comments on commit ecb76a8

Please sign in to comment.