Skip to content

Commit

Permalink
finagle-http: Move the creation of the endpointer out of the Http object
Browse files Browse the repository at this point in the history
Problem

The code in the Http.scala file is getting pretty hairy, making it tough
to make changes that may benefit HTTP/2.

Solution

Change the http client implementation configuration to be based on
providing an endpointer instead of transporter oriented tooling and
put this all in a new object, ClientEndpointer.

Differential Revision: https://phabricator.twitter.biz/D337136
  • Loading branch information
Bryce Anderson authored and jenkins committed Jul 15, 2019
1 parent e331491 commit 1338e50
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 121 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ Breaking API Changes
* finagle-http: The `setOriginAndCredentials`, `setMaxAge`, `setMethod`, and `setHeaders` methods
of `c.t.f.http.Cors.HttpFilter` are no longer overridable. ``PHAB_ID=D332765``

* finagle-http: The details of the `c.t.f.Http.HttpImpl` class are meant to be implementation
details so the class constructor was made private along with the fields. Along these same lines
the `c.t.f.Http.H2ClientImpl.transporter` method has been moved to a private location.
``PHAB_ID=D337136``

Bug Fixes
~~~~~~~~~

Expand Down
124 changes: 32 additions & 92 deletions finagle-http/src/main/scala/com/twitter/finagle/Http.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
package com.twitter.finagle

import com.twitter.finagle.client._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.filter.NackAdmissionFilter
import com.twitter.finagle.http._
import com.twitter.finagle.http.codec.{HttpClientDispatcher, HttpServerDispatcher}
import com.twitter.finagle.http.codec.HttpServerDispatcher
import com.twitter.finagle.http.exp.StreamTransport
import com.twitter.finagle.http.filter._
import com.twitter.finagle.http.service.HttpResponseClassifier
import com.twitter.finagle.http2.exp.transport.{Http2Transport, StreamChannelTransport}
import com.twitter.finagle.http2.transport.MultiplexTransporter
import com.twitter.finagle.http2.{Http2Listener, Http2Transporter}
import com.twitter.finagle.http2.Http2Listener
import com.twitter.finagle.liveness.FailureDetector
import com.twitter.finagle.netty4.http.{Netty4HttpListener, Netty4HttpTransporter}
import com.twitter.finagle.netty4.http.{Netty4ClientStreamTransport, Netty4ServerStreamTransport}
import com.twitter.finagle.netty4.http.Netty4HttpListener
import com.twitter.finagle.netty4.http.Netty4ServerStreamTransport
import com.twitter.finagle.server._
import com.twitter.finagle.service.{ResponseClassifier, RetryBudget}
import com.twitter.finagle.ssl.ApplicationProtocols
import com.twitter.finagle.stats.{ExceptionStatsHandler, StatsReceiver}
import com.twitter.finagle.toggle.Toggle
import com.twitter.finagle.tracing._
import com.twitter.finagle.transport.{Transport, TransportContext}
import com.twitter.util.{Duration, Future, Monitor, StorageUnit, Time}
import com.twitter.util.{Duration, Future, Monitor, StorageUnit}
import java.net.SocketAddress

/**
Expand Down Expand Up @@ -63,70 +59,52 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
private[this] val underlying: Toggle[Int] = Toggles("com.twitter.finagle.http.UseH2CServers")
def apply(): Boolean = underlying(ServerInfo().id.hashCode)
}
private[this] object useHttp2MultiplexCodecClient {
private[this] val underlying: Toggle[Int] = Toggles(
"com.twitter.finagle.http.UseHttp2MultiplexCodecClient"
)
def apply(): Boolean = underlying(ServerInfo().id.hashCode)
}

/**
* configure alternative http 1.1 implementations
*
* @param clientTransport client [[StreamTransport]] factory
* @param clientEndpointer client `Stackable[ServiceFactory]`
* @param serverTransport server [[StreamTransport]] factory
* @param transporter [[Transporter]] factory
* @param listener [[Listener]] factory
*/
case class HttpImpl(
clientTransport: Transport[Any, Any] => StreamTransport[Request, Response],
serverTransport: Transport[Any, Any] => StreamTransport[Response, Request],
transporter: Stack.Params => SocketAddress => Transporter[Any, Any, TransportContext],
listener: Stack.Params => Listener[Any, Any, TransportContext],
implName: String) {
final class HttpImpl private (
private[finagle] val clientEndpointer: Stackable[ServiceFactory[Request, Response]],
private[finagle] val serverTransport: Transport[Any, Any] => StreamTransport[Response, Request],
private[finagle] val listener: Stack.Params => Listener[Any, Any, TransportContext],
private[finagle] val implName: String) {

def mk(): (HttpImpl, Stack.Param[HttpImpl]) = (this, HttpImpl.httpImplParam)
}

object HttpImpl {
implicit val httpImplParam: Stack.Param[HttpImpl] = Stack.Param(Netty4Impl)

val Netty4Impl: Http.HttpImpl = new Http.HttpImpl(
ClientEndpointer.HttpEndpointer,
new Netty4ServerStreamTransport(_),
Netty4HttpListener,
"Netty4"
)

val Http2Impl: Http.HttpImpl = new Http.HttpImpl(
ClientEndpointer.Http2Endpointer,
new Netty4ServerStreamTransport(_),
Http2Listener.apply _,
"Netty4"
)
}

case class H2ClientImpl(useMultiplexClient: Option[Boolean])

object H2ClientImpl {
implicit val useMultiplexClientParam: Stack.Param[H2ClientImpl] =
Stack.Param(H2ClientImpl(None))

def transporter(
params: Stack.Params
): SocketAddress => Transporter[Any, Any, TransportContext] = {
params[H2ClientImpl].useMultiplexClient match {
case Some(true) => http2.exp.transport.Http2Transporter(params)
case Some(false) => Http2Transporter(params)
case None =>
if (useHttp2MultiplexCodecClient()) http2.exp.transport.Http2Transporter(params)
else Http2Transporter(params)
}
}
}

val Netty4Impl: Http.HttpImpl = Http.HttpImpl(
new Netty4ClientStreamTransport(_),
new Netty4ServerStreamTransport(_),
Netty4HttpTransporter,
Netty4HttpListener,
"Netty4"
)
val Netty4Impl: Http.HttpImpl = HttpImpl.Netty4Impl

val Http2: Stack.Params = Stack.Params.empty +
Http.HttpImpl(
new Netty4ClientStreamTransport(_),
new Netty4ServerStreamTransport(_),
H2ClientImpl.transporter,
Http2Listener.apply _,
"Netty4"
) +
HttpImpl.Http2Impl +
param.ProtocolLibrary("http/2") +
netty4.ssl.Alpn(ApplicationProtocols.Supported(Seq("h2", "http/1.1"))) +
// There is something funky about how ping-based failure detector is wired in H2 that
Expand Down Expand Up @@ -212,46 +190,8 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
protected type Out = Any
protected type Context = TransportContext

protected def endpointer: Stackable[ServiceFactory[Request, Response]] = {
new EndpointerModule[Request, Response](
Seq(implicitly[Stack.Param[HttpImpl]], implicitly[Stack.Param[param.Stats]]), {
(prms: Stack.Params, addr: SocketAddress) =>
val transporter = params[HttpImpl].transporter(prms)(addr)
val dispatcherStats =
prms[param.Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope)

new ServiceFactory[Request, Response] {
def apply(conn: ClientConnection): Future[Service[Request, Response]] =
// we do not want to capture and request specific Locals
// that would live for the life of the session.
Contexts.letClearAll {
transporter().map { trans =>
val streamTransport = prms[HttpImpl].clientTransport(trans)
val httpTransport = trans match {
case _: StreamChannelTransport => new Http2Transport(streamTransport)
case _ => new HttpTransport(streamTransport)
}

new HttpClientDispatcher(
httpTransport,
dispatcherStats
)
}
}

def close(deadline: Time): Future[Unit] = transporter match {
case multiplex: MultiplexTransporter => multiplex.close(deadline)
case _ => Future.Done
}

override def status: Status = transporter match {
case http2: MultiplexTransporter => http2.transporterStatus
case _ => super.status
}
}
}
)
}
protected def endpointer: Stackable[ServiceFactory[Request, Response]] =
params[HttpImpl].clientEndpointer

protected def copy1(
stack: Stack[ServiceFactory[Request, Response]] = this.stack,
Expand Down Expand Up @@ -345,7 +285,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
* @note this will override whatever has been set in the toggle.
*/
def withNoHttp2: Client =
configured(Netty4Impl)
configured(HttpImpl.Netty4Impl)

/**
* Create a [[http.MethodBuilder]] for a given destination.
Expand Down Expand Up @@ -586,7 +526,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
* @note this will override whatever has been set in the toggle.
*/
def withNoHttp2: Server =
configured(Netty4Impl)
configured(HttpImpl.Netty4Impl)

/**
* By default finagle-http automatically sends 100-CONTINUE responses to inbound
Expand All @@ -601,7 +541,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
* can be met.
*
* @note Disabling automatic continues is only supported in
* [[com.twitter.finagle.Http.Netty4Impl]] servers.
* [[com.twitter.finagle.Http.HttpImpl.Netty4Impl]] servers.
*/
def withNoAutomaticContinue: Server =
configured(http.param.AutomaticContinue(false))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.twitter.finagle.http

import com.twitter.finagle.{Status => FinagleStatus, _}
import com.twitter.finagle.Http.{H2ClientImpl, HttpImpl}
import com.twitter.finagle.client.{EndpointerModule, Transporter}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.http.codec.HttpClientDispatcher
import com.twitter.finagle.http2.transport.MultiplexTransporter
import com.twitter.finagle.http.exp.StreamTransport
import com.twitter.finagle.http2.Http2Transporter
import com.twitter.finagle.http2.exp.transport.{Http2Transport, StreamChannelTransport}
import com.twitter.finagle.netty4.http.{Netty4ClientStreamTransport, Netty4HttpTransporter}
import com.twitter.finagle.param.Stats
import com.twitter.finagle.server.ServerInfo
import com.twitter.finagle.toggle.Toggle
import com.twitter.finagle.transport.{Transport, TransportContext}
import com.twitter.util.{Future, Time}
import java.net.SocketAddress

/**
* `Endpointer` implementations for the HTTP client.
*/
private[finagle] object ClientEndpointer {

/**
* Tool for testing. We may want to simulate different transport related events
* and this gives us a handle into the transport abstractions.
*/
private[http] case class TransportModifier(modifier: Transport[Any, Any] => Transport[Any, Any])

private[http] object TransportModifier {
implicit val transportModifierParam: Stack.Param[TransportModifier] =
Stack.Param(TransportModifier(identity(_)))
}

private[this] object useHttp2MultiplexCodecClient {
private[this] val underlying: Toggle[Int] = Toggles(
"com.twitter.finagle.http.UseHttp2MultiplexCodecClient"
)
def apply(): Boolean = underlying(ServerInfo().id.hashCode)
}

val HttpEndpointer: Stackable[ServiceFactory[Request, Response]] =
transporterEndpointer(
new Netty4ClientStreamTransport(_),
Netty4HttpTransporter
)

val Http2Endpointer: Stackable[ServiceFactory[Request, Response]] =
transporterEndpointer(
new Netty4ClientStreamTransport(_),
selectH2Transporter
)

// Based on the provided params, build the correct Transporter,
// either the classic or the MultiplexCodec based Transporter.
private def selectH2Transporter(
params: Stack.Params
): SocketAddress => Transporter[Any, Any, TransportContext] = {
params[H2ClientImpl].useMultiplexClient match {
case Some(true) => http2.exp.transport.Http2Transporter(params)
case Some(false) => Http2Transporter(params)
case None =>
// If we haven't been explicitly configured fall back to the toggle
// to determine the default H2 transporter implementation.
if (useHttp2MultiplexCodecClient()) http2.exp.transport.Http2Transporter(params)
else Http2Transporter(params)
}
}

// Construct an `Endpointer` that is based on the `Transporter` model.
private def transporterEndpointer(
clientTransport: Transport[Any, Any] => StreamTransport[Request, Response],
mkTransporter: Stack.Params => SocketAddress => Transporter[Any, Any, TransportContext]
): Stackable[ServiceFactory[Request, Response]] = {
new EndpointerModule[Request, Response](
Seq(implicitly[Stack.Param[HttpImpl]], implicitly[Stack.Param[Stats]]), {
(prms: Stack.Params, addr: SocketAddress) =>
val modifier = prms[TransportModifier].modifier
val transporter = mkTransporter(prms)(addr)
val dispatcherStats =
prms[Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope)

new ServiceFactory[Request, Response] {
def apply(conn: ClientConnection): Future[Service[Request, Response]] =
// we do not want to capture and request specific Locals
// that would live for the life of the session.
Contexts.letClearAll {
transporter().map { trans =>
val modifiedTransport = modifier(trans)
val streamTransport = clientTransport(modifiedTransport)
val httpTransport = trans match {
case _: StreamChannelTransport => new Http2Transport(streamTransport)
case _ => new HttpTransport(streamTransport)
}

new HttpClientDispatcher(
httpTransport,
dispatcherStats
)
}
}

// `MultiplexTransporter`s are internally caching a H2 session so they
// must also have their resources released on close.
def close(deadline: Time): Future[Unit] = transporter match {
case multiplex: MultiplexTransporter => multiplex.close(deadline)
case _ => Future.Done
}

override def status: FinagleStatus = transporter match {
case http2: MultiplexTransporter => http2.transporterStatus
case _ => super.status
}
}
}
)
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.twitter.finagle

import com.twitter.finagle.filter.NackAdmissionFilter
import com.twitter.finagle.http.{Request, Response, serverErrorsAsFailures}
import com.twitter.finagle.http.{ClientEndpointer, Request, Response, serverErrorsAsFailures}
import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier}
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.{Await, Duration, Future, Return}
Expand Down Expand Up @@ -131,12 +131,10 @@ class HttpTest extends FunSuite with Eventually {
}

test("Netty 4 is a default implementation") {
val transporter = Http.client.params[Http.HttpImpl].transporter
val transporter = Http.client.params[Http.HttpImpl].clientEndpointer
val listener = Http.server.params[Http.HttpImpl].listener

val addr = InetSocketAddress.createUnresolved("supdog", 0)

assert(transporter(Stack.Params.empty)(addr).toString == "Netty4Transporter")
assert(transporter == ClientEndpointer.HttpEndpointer)
assert(listener(Stack.Params.empty).toString == "Netty4Listener")
}
}
Loading

0 comments on commit 1338e50

Please sign in to comment.