From 1338e508dff6517a1801454cdcad95c1a4b94779 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 15 Jul 2019 15:55:12 +0000 Subject: [PATCH] finagle-http: Move the creation of the endpointer out of the Http object Problem The code in the Http.scala file is getting pretty hairy, making it tough to make changes that may benefit HTTP/2. Solution Change the http client implementation configuration to be based on providing an endpointer instead of transporter oriented tooling and put this all in a new object, ClientEndpointer. Differential Revision: https://phabricator.twitter.biz/D337136 --- CHANGELOG.rst | 5 + .../main/scala/com/twitter/finagle/Http.scala | 124 +++++------------- .../finagle/http/ClientEndpointer.scala | 120 +++++++++++++++++ .../scala/com/twitter/finagle/HttpTest.scala | 8 +- .../finagle/http/AbstractStreamingTest.scala | 26 +--- .../finagle/http/Netty4StreamingTest.scala | 2 +- 6 files changed, 164 insertions(+), 121 deletions(-) create mode 100644 finagle-http/src/main/scala/com/twitter/finagle/http/ClientEndpointer.scala diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f5d47ab6ae..65bd1dd8e0 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -55,6 +55,11 @@ Breaking API Changes * finagle-http: The `setOriginAndCredentials`, `setMaxAge`, `setMethod`, and `setHeaders` methods of `c.t.f.http.Cors.HttpFilter` are no longer overridable. ``PHAB_ID=D332765`` +* finagle-http: The details of the `c.t.f.Http.HttpImpl` class are meant to be implementation + details so the class constructor was made private along with the fields. Along these same lines + the `c.t.f.Http.H2ClientImpl.transporter` method has been moved to a private location. + ``PHAB_ID=D337136`` + Bug Fixes ~~~~~~~~~ diff --git a/finagle-http/src/main/scala/com/twitter/finagle/Http.scala b/finagle-http/src/main/scala/com/twitter/finagle/Http.scala index d932e78b02..f57b86beda 100644 --- a/finagle-http/src/main/scala/com/twitter/finagle/Http.scala +++ b/finagle-http/src/main/scala/com/twitter/finagle/Http.scala @@ -1,20 +1,16 @@ package com.twitter.finagle import com.twitter.finagle.client._ -import com.twitter.finagle.context.Contexts -import com.twitter.finagle.dispatch.GenSerialClientDispatcher import com.twitter.finagle.filter.NackAdmissionFilter import com.twitter.finagle.http._ -import com.twitter.finagle.http.codec.{HttpClientDispatcher, HttpServerDispatcher} +import com.twitter.finagle.http.codec.HttpServerDispatcher import com.twitter.finagle.http.exp.StreamTransport import com.twitter.finagle.http.filter._ import com.twitter.finagle.http.service.HttpResponseClassifier -import com.twitter.finagle.http2.exp.transport.{Http2Transport, StreamChannelTransport} -import com.twitter.finagle.http2.transport.MultiplexTransporter -import com.twitter.finagle.http2.{Http2Listener, Http2Transporter} +import com.twitter.finagle.http2.Http2Listener import com.twitter.finagle.liveness.FailureDetector -import com.twitter.finagle.netty4.http.{Netty4HttpListener, Netty4HttpTransporter} -import com.twitter.finagle.netty4.http.{Netty4ClientStreamTransport, Netty4ServerStreamTransport} +import com.twitter.finagle.netty4.http.Netty4HttpListener +import com.twitter.finagle.netty4.http.Netty4ServerStreamTransport import com.twitter.finagle.server._ import com.twitter.finagle.service.{ResponseClassifier, RetryBudget} import com.twitter.finagle.ssl.ApplicationProtocols @@ -22,7 +18,7 @@ import com.twitter.finagle.stats.{ExceptionStatsHandler, StatsReceiver} import com.twitter.finagle.toggle.Toggle import com.twitter.finagle.tracing._ import com.twitter.finagle.transport.{Transport, TransportContext} -import com.twitter.util.{Duration, Future, Monitor, StorageUnit, Time} +import com.twitter.util.{Duration, Future, Monitor, StorageUnit} import java.net.SocketAddress /** @@ -63,33 +59,39 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re private[this] val underlying: Toggle[Int] = Toggles("com.twitter.finagle.http.UseH2CServers") def apply(): Boolean = underlying(ServerInfo().id.hashCode) } - private[this] object useHttp2MultiplexCodecClient { - private[this] val underlying: Toggle[Int] = Toggles( - "com.twitter.finagle.http.UseHttp2MultiplexCodecClient" - ) - def apply(): Boolean = underlying(ServerInfo().id.hashCode) - } /** * configure alternative http 1.1 implementations * - * @param clientTransport client [[StreamTransport]] factory + * @param clientEndpointer client `Stackable[ServiceFactory]` * @param serverTransport server [[StreamTransport]] factory - * @param transporter [[Transporter]] factory * @param listener [[Listener]] factory */ - case class HttpImpl( - clientTransport: Transport[Any, Any] => StreamTransport[Request, Response], - serverTransport: Transport[Any, Any] => StreamTransport[Response, Request], - transporter: Stack.Params => SocketAddress => Transporter[Any, Any, TransportContext], - listener: Stack.Params => Listener[Any, Any, TransportContext], - implName: String) { + final class HttpImpl private ( + private[finagle] val clientEndpointer: Stackable[ServiceFactory[Request, Response]], + private[finagle] val serverTransport: Transport[Any, Any] => StreamTransport[Response, Request], + private[finagle] val listener: Stack.Params => Listener[Any, Any, TransportContext], + private[finagle] val implName: String) { def mk(): (HttpImpl, Stack.Param[HttpImpl]) = (this, HttpImpl.httpImplParam) } object HttpImpl { implicit val httpImplParam: Stack.Param[HttpImpl] = Stack.Param(Netty4Impl) + + val Netty4Impl: Http.HttpImpl = new Http.HttpImpl( + ClientEndpointer.HttpEndpointer, + new Netty4ServerStreamTransport(_), + Netty4HttpListener, + "Netty4" + ) + + val Http2Impl: Http.HttpImpl = new Http.HttpImpl( + ClientEndpointer.Http2Endpointer, + new Netty4ServerStreamTransport(_), + Http2Listener.apply _, + "Netty4" + ) } case class H2ClientImpl(useMultiplexClient: Option[Boolean]) @@ -97,36 +99,12 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re object H2ClientImpl { implicit val useMultiplexClientParam: Stack.Param[H2ClientImpl] = Stack.Param(H2ClientImpl(None)) - - def transporter( - params: Stack.Params - ): SocketAddress => Transporter[Any, Any, TransportContext] = { - params[H2ClientImpl].useMultiplexClient match { - case Some(true) => http2.exp.transport.Http2Transporter(params) - case Some(false) => Http2Transporter(params) - case None => - if (useHttp2MultiplexCodecClient()) http2.exp.transport.Http2Transporter(params) - else Http2Transporter(params) - } - } } - val Netty4Impl: Http.HttpImpl = Http.HttpImpl( - new Netty4ClientStreamTransport(_), - new Netty4ServerStreamTransport(_), - Netty4HttpTransporter, - Netty4HttpListener, - "Netty4" - ) + val Netty4Impl: Http.HttpImpl = HttpImpl.Netty4Impl val Http2: Stack.Params = Stack.Params.empty + - Http.HttpImpl( - new Netty4ClientStreamTransport(_), - new Netty4ServerStreamTransport(_), - H2ClientImpl.transporter, - Http2Listener.apply _, - "Netty4" - ) + + HttpImpl.Http2Impl + param.ProtocolLibrary("http/2") + netty4.ssl.Alpn(ApplicationProtocols.Supported(Seq("h2", "http/1.1"))) + // There is something funky about how ping-based failure detector is wired in H2 that @@ -212,46 +190,8 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re protected type Out = Any protected type Context = TransportContext - protected def endpointer: Stackable[ServiceFactory[Request, Response]] = { - new EndpointerModule[Request, Response]( - Seq(implicitly[Stack.Param[HttpImpl]], implicitly[Stack.Param[param.Stats]]), { - (prms: Stack.Params, addr: SocketAddress) => - val transporter = params[HttpImpl].transporter(prms)(addr) - val dispatcherStats = - prms[param.Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope) - - new ServiceFactory[Request, Response] { - def apply(conn: ClientConnection): Future[Service[Request, Response]] = - // we do not want to capture and request specific Locals - // that would live for the life of the session. - Contexts.letClearAll { - transporter().map { trans => - val streamTransport = prms[HttpImpl].clientTransport(trans) - val httpTransport = trans match { - case _: StreamChannelTransport => new Http2Transport(streamTransport) - case _ => new HttpTransport(streamTransport) - } - - new HttpClientDispatcher( - httpTransport, - dispatcherStats - ) - } - } - - def close(deadline: Time): Future[Unit] = transporter match { - case multiplex: MultiplexTransporter => multiplex.close(deadline) - case _ => Future.Done - } - - override def status: Status = transporter match { - case http2: MultiplexTransporter => http2.transporterStatus - case _ => super.status - } - } - } - ) - } + protected def endpointer: Stackable[ServiceFactory[Request, Response]] = + params[HttpImpl].clientEndpointer protected def copy1( stack: Stack[ServiceFactory[Request, Response]] = this.stack, @@ -345,7 +285,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re * @note this will override whatever has been set in the toggle. */ def withNoHttp2: Client = - configured(Netty4Impl) + configured(HttpImpl.Netty4Impl) /** * Create a [[http.MethodBuilder]] for a given destination. @@ -586,7 +526,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re * @note this will override whatever has been set in the toggle. */ def withNoHttp2: Server = - configured(Netty4Impl) + configured(HttpImpl.Netty4Impl) /** * By default finagle-http automatically sends 100-CONTINUE responses to inbound @@ -601,7 +541,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re * can be met. * * @note Disabling automatic continues is only supported in - * [[com.twitter.finagle.Http.Netty4Impl]] servers. + * [[com.twitter.finagle.Http.HttpImpl.Netty4Impl]] servers. */ def withNoAutomaticContinue: Server = configured(http.param.AutomaticContinue(false)) diff --git a/finagle-http/src/main/scala/com/twitter/finagle/http/ClientEndpointer.scala b/finagle-http/src/main/scala/com/twitter/finagle/http/ClientEndpointer.scala new file mode 100644 index 0000000000..2a5da7dc56 --- /dev/null +++ b/finagle-http/src/main/scala/com/twitter/finagle/http/ClientEndpointer.scala @@ -0,0 +1,120 @@ +package com.twitter.finagle.http + +import com.twitter.finagle.{Status => FinagleStatus, _} +import com.twitter.finagle.Http.{H2ClientImpl, HttpImpl} +import com.twitter.finagle.client.{EndpointerModule, Transporter} +import com.twitter.finagle.context.Contexts +import com.twitter.finagle.dispatch.GenSerialClientDispatcher +import com.twitter.finagle.http.codec.HttpClientDispatcher +import com.twitter.finagle.http2.transport.MultiplexTransporter +import com.twitter.finagle.http.exp.StreamTransport +import com.twitter.finagle.http2.Http2Transporter +import com.twitter.finagle.http2.exp.transport.{Http2Transport, StreamChannelTransport} +import com.twitter.finagle.netty4.http.{Netty4ClientStreamTransport, Netty4HttpTransporter} +import com.twitter.finagle.param.Stats +import com.twitter.finagle.server.ServerInfo +import com.twitter.finagle.toggle.Toggle +import com.twitter.finagle.transport.{Transport, TransportContext} +import com.twitter.util.{Future, Time} +import java.net.SocketAddress + +/** + * `Endpointer` implementations for the HTTP client. + */ +private[finagle] object ClientEndpointer { + + /** + * Tool for testing. We may want to simulate different transport related events + * and this gives us a handle into the transport abstractions. + */ + private[http] case class TransportModifier(modifier: Transport[Any, Any] => Transport[Any, Any]) + + private[http] object TransportModifier { + implicit val transportModifierParam: Stack.Param[TransportModifier] = + Stack.Param(TransportModifier(identity(_))) + } + + private[this] object useHttp2MultiplexCodecClient { + private[this] val underlying: Toggle[Int] = Toggles( + "com.twitter.finagle.http.UseHttp2MultiplexCodecClient" + ) + def apply(): Boolean = underlying(ServerInfo().id.hashCode) + } + + val HttpEndpointer: Stackable[ServiceFactory[Request, Response]] = + transporterEndpointer( + new Netty4ClientStreamTransport(_), + Netty4HttpTransporter + ) + + val Http2Endpointer: Stackable[ServiceFactory[Request, Response]] = + transporterEndpointer( + new Netty4ClientStreamTransport(_), + selectH2Transporter + ) + + // Based on the provided params, build the correct Transporter, + // either the classic or the MultiplexCodec based Transporter. + private def selectH2Transporter( + params: Stack.Params + ): SocketAddress => Transporter[Any, Any, TransportContext] = { + params[H2ClientImpl].useMultiplexClient match { + case Some(true) => http2.exp.transport.Http2Transporter(params) + case Some(false) => Http2Transporter(params) + case None => + // If we haven't been explicitly configured fall back to the toggle + // to determine the default H2 transporter implementation. + if (useHttp2MultiplexCodecClient()) http2.exp.transport.Http2Transporter(params) + else Http2Transporter(params) + } + } + + // Construct an `Endpointer` that is based on the `Transporter` model. + private def transporterEndpointer( + clientTransport: Transport[Any, Any] => StreamTransport[Request, Response], + mkTransporter: Stack.Params => SocketAddress => Transporter[Any, Any, TransportContext] + ): Stackable[ServiceFactory[Request, Response]] = { + new EndpointerModule[Request, Response]( + Seq(implicitly[Stack.Param[HttpImpl]], implicitly[Stack.Param[Stats]]), { + (prms: Stack.Params, addr: SocketAddress) => + val modifier = prms[TransportModifier].modifier + val transporter = mkTransporter(prms)(addr) + val dispatcherStats = + prms[Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope) + + new ServiceFactory[Request, Response] { + def apply(conn: ClientConnection): Future[Service[Request, Response]] = + // we do not want to capture and request specific Locals + // that would live for the life of the session. + Contexts.letClearAll { + transporter().map { trans => + val modifiedTransport = modifier(trans) + val streamTransport = clientTransport(modifiedTransport) + val httpTransport = trans match { + case _: StreamChannelTransport => new Http2Transport(streamTransport) + case _ => new HttpTransport(streamTransport) + } + + new HttpClientDispatcher( + httpTransport, + dispatcherStats + ) + } + } + + // `MultiplexTransporter`s are internally caching a H2 session so they + // must also have their resources released on close. + def close(deadline: Time): Future[Unit] = transporter match { + case multiplex: MultiplexTransporter => multiplex.close(deadline) + case _ => Future.Done + } + + override def status: FinagleStatus = transporter match { + case http2: MultiplexTransporter => http2.transporterStatus + case _ => super.status + } + } + } + ) + } +} diff --git a/finagle-http/src/test/scala/com/twitter/finagle/HttpTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/HttpTest.scala index f2a5d0a44d..e180b42761 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/HttpTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/HttpTest.scala @@ -1,7 +1,7 @@ package com.twitter.finagle import com.twitter.finagle.filter.NackAdmissionFilter -import com.twitter.finagle.http.{Request, Response, serverErrorsAsFailures} +import com.twitter.finagle.http.{ClientEndpointer, Request, Response, serverErrorsAsFailures} import com.twitter.finagle.service.{ReqRep, ResponseClass, ResponseClassifier} import com.twitter.finagle.stats.InMemoryStatsReceiver import com.twitter.util.{Await, Duration, Future, Return} @@ -131,12 +131,10 @@ class HttpTest extends FunSuite with Eventually { } test("Netty 4 is a default implementation") { - val transporter = Http.client.params[Http.HttpImpl].transporter + val transporter = Http.client.params[Http.HttpImpl].clientEndpointer val listener = Http.server.params[Http.HttpImpl].listener - val addr = InetSocketAddress.createUnresolved("supdog", 0) - - assert(transporter(Stack.Params.empty)(addr).toString == "Netty4Transporter") + assert(transporter == ClientEndpointer.HttpEndpointer) assert(listener(Stack.Params.empty).toString == "Netty4Listener") } } diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractStreamingTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractStreamingTest.scala index 6339ff48f7..d835490f8b 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractStreamingTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractStreamingTest.scala @@ -3,9 +3,8 @@ package com.twitter.finagle.http import com.twitter.conversions.DurationOps._ import com.twitter.conversions.StorageUnitOps._ import com.twitter.finagle.{Http => FinagleHttp, _} -import com.twitter.finagle.client.Transporter import com.twitter.finagle.service.ConstantService -import com.twitter.finagle.transport.{Transport, TransportContext} +import com.twitter.finagle.transport.Transport import com.twitter.io.{Buf, Pipe, Reader, ReaderDiscardedException, StreamTermination, Writer} import com.twitter.util._ import java.net.{InetSocketAddress, SocketAddress} @@ -513,12 +512,10 @@ abstract class AbstractStreamingTest extends FunSuite { singletonPool: Boolean = false )(mod: Modifier ): Service[Request, Response] = { - val poolSize = if (singletonPool) 1 else Int.MaxValue - val modifiedImpl = impl.copy(transporter = modifiedTransporterFn(mod, impl.transporter)) configureClient(FinagleHttp.client).withSessionPool - .maxSize(poolSize) + .maxSize(if (singletonPool) 1 else Int.MaxValue) .withStreaming(0.bytes) // no aggregation - .configured(modifiedImpl) + .configured(ClientEndpointer.TransportModifier(mod)) .newService(Name.bound(Address(addr.asInstanceOf[InetSocketAddress])), name) } @@ -577,21 +574,4 @@ object StreamingTest { res.headerMap.set("Connection", "close") res } - - def modifiedTransporterFn( - mod: Modifier, - fn: Stack.Params => SocketAddress => Transporter[Any, Any, TransportContext] - ): Stack.Params => SocketAddress => Transporter[Any, Any, TransportContext] = { - params: Stack.Params => - { addr => - val underlying = fn(params)(addr) - new Transporter[Any, Any, TransportContext] { - def apply(): Future[Transport[Any, Any]] = { - underlying().map(mod) - } - - def remoteAddress: SocketAddress = underlying.remoteAddress - } - } - } } diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/Netty4StreamingTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/Netty4StreamingTest.scala index e85b3118da..31dab9f43a 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/http/Netty4StreamingTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/Netty4StreamingTest.scala @@ -3,5 +3,5 @@ package com.twitter.finagle.http import com.twitter.finagle.{Http => FinagleHttp} class Netty4StreamingTest extends AbstractStreamingTest { - def impl: FinagleHttp.HttpImpl = FinagleHttp.Netty4Impl + def impl: FinagleHttp.HttpImpl = FinagleHttp.HttpImpl.Netty4Impl }