Skip to content

Commit

Permalink
finagle-http: enable Ping Failure Detection for MultiplexHandler base…
Browse files Browse the repository at this point in the history
…d HTTP/2 clients

Problem

Ping based failure detection is disable for HTTP/2 clients, due to
issues with the `ConnectionHandler` based implementation.

Solution

Remove ping based failure detection from the `ConnectionHandler`
based implementation, but enable for HTTP/2 clients.

Result

`MultiplexHandler` based HTTP/2 clients will have ping failure
detection on by default and the problematic ping failure detection
in the legacy `ConnectionHandler` based implementation will be
removed completely to prevent it from being accidentally re-enabled.

JIRA Issues: CSL-8670

Differential Revision: https://phabricator.twitter.biz/D360712
  • Loading branch information
enbnt authored and jenkins committed Aug 26, 2019
1 parent b2af98f commit 8af3274
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 86 deletions.
12 changes: 6 additions & 6 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

* finagle: Upgrade to Netty 4.1.39.Final. ``PHAB_ID=D355848``


Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

Expand Down Expand Up @@ -44,6 +38,12 @@ Runtime Behavior Changes
`com.twitter.finagle.mysql.IncludeHandshakeInServiceAcquisition` toggle
has been removed and it no longer applies. ``PHAB_ID=D355549``

* finagle: Upgrade to Netty 4.1.39.Final. ``PHAB_ID=D355848``

* finagle-http: Enable Ping Failure Detection for MultiplexHandler based HTTP/2 clients. Note that
the Ping Failure Detection implementation has been removed completely from the
non-MultiplexHandler based HTTP/2 client. ``PHAB_ID=D360712``

19.8.0
------

Expand Down
8 changes: 1 addition & 7 deletions finagle-http/src/main/scala/com/twitter/finagle/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.twitter.finagle.http.exp.StreamTransport
import com.twitter.finagle.http.filter._
import com.twitter.finagle.http.service.HttpResponseClassifier
import com.twitter.finagle.http2.Http2Listener
import com.twitter.finagle.liveness.FailureDetector
import com.twitter.finagle.netty4.http.Netty4HttpListener
import com.twitter.finagle.netty4.http.Netty4ServerStreamTransport
import com.twitter.finagle.server._
Expand Down Expand Up @@ -106,12 +105,7 @@ object Http extends Client[Request, Response] with HttpRichClient with Server[Re
val Http2: Stack.Params = Stack.Params.empty +
HttpImpl.Http2Impl +
param.ProtocolLibrary("http/2") +
netty4.ssl.Alpn(ApplicationProtocols.Supported(Seq("h2", "http/1.1"))) +
// There is something funky about how ping-based failure detector is wired in H2 that
// it does more harm than good: when/if it kicks in, the client is having a really hard time
// to recover. Disabling it and relying on other circuit breakers in the stack instead, makes
// client way more stable under a high load.
FailureDetector.Param(FailureDetector.NullConfig)
netty4.ssl.Alpn(ApplicationProtocols.Supported(Seq("h2", "http/1.1")))

private val protocolLibrary = param.ProtocolLibrary("http")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,32 +86,6 @@ final private[http2] class StreamTransportFactory(
activeStreams.remove(streamId).isDefined
}

// exposed for testing
private[http2] def ping(): Future[Unit] = {
val done = new Promise[Unit]
exec.execute(new Runnable {
def run(): Unit = {
if (pingPromise == null) {
pingPromise = done
underlying.write(Ping)
} else {
done.setException(PingOutstandingFailure)
}
}
})
done
}

private[this] val detector =
FailureDetector(detectorConfig, ping _, statsReceiver.scope("failuredetector"))

// H2 uses the default WatermarkPool, which believes each StreamTransport
// represents a connection. When the WatermarkPool sees a peer marked "Closed",
// it believes the connection has already been torn down and doesn't make an
// attempt to close it. Therefore, we ensure that if the FailureDetector marks
// this connection as closed, it gets torn down.
detector.onClose.ensure(close())

private[this] def handleGoaway(
addr: SocketAddress,
obj: HttpObject,
Expand Down Expand Up @@ -212,13 +186,6 @@ final private[http2] class StreamTransportFactory(
if (log.isLoggable(Level.DEBUG))
log.debug(exn, s"Got exception for nonexistent stream: $streamId")
}
case Ping =>
if (pingPromise != null) {
pingPromise.setDone()
pingPromise = null
} else {
log.debug(s"Got unmatched PING message for address $addr")
}

case rep =>
if (log.isLoggable(Level.DEBUG)) {
Expand Down Expand Up @@ -313,7 +280,7 @@ final private[http2] class StreamTransportFactory(
}

// Ensure we report closed if closed has been called but the detector has not yet been triggered
def status: Status = if (dead) Status.Closed else detector.status
def status: Status = if (dead) Status.Closed else Status.Open

/**
* StreamTransport represents a single http/2 stream at a time. Once the stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,25 +221,6 @@ class StreamTransportFactoryTest extends FunSuite {
assert(streamFac.numActiveStreams == 0)
}

test("StreamTransportFactory reflects detector status") {
val (writeq, readq) = (new AsyncQueue[StreamMessage](), new AsyncQueue[StreamMessage]())
val transport =
new SlowClosingQueue(writeq, readq).asInstanceOf[Transport[StreamMessage, StreamMessage] {
type Context = TransportContext with HasExecutor
}]
val addr = new SocketAddress {}
var cur: Status = Status.Open
val params = Stack.Params.empty + FailureDetector.Param(
new FailureDetector.MockConfig(() => cur)
)
val streamFac = new StreamTransportFactory(transport, addr, params)

assert(streamFac.status == Status.Open)
cur = Status.Busy
assert(streamFac.status == Status.Busy)
assert(streamFac.numActiveStreams == 0)
}

test("StreamTransportFactory call to first() provides stream with streamId == 1") {
val (writeq, readq) = (new AsyncQueue[StreamMessage](), new AsyncQueue[StreamMessage]())
val transport =
Expand Down Expand Up @@ -410,26 +391,6 @@ class StreamTransportFactoryTest extends FunSuite {
assert(streamFac.numActiveStreams == 0)
}

test("PINGs receive replies every time") {
val (writeq, readq) = (new AsyncQueue[StreamMessage](), new AsyncQueue[StreamMessage]())
val transport =
new SlowClosingQueue(writeq, readq).asInstanceOf[Transport[StreamMessage, StreamMessage] {
type Context = TransportContext with HasExecutor
}]
val addr = new SocketAddress {}
var cur: Status = Status.Open
val params = Stack.Params.empty + FailureDetector.Param(
new FailureDetector.MockConfig(() => cur)
)
val streamFac = new StreamTransportFactory(transport, addr, params)

for (_ <- 1 to 10) {
streamFac.ping()
assert(await(writeq.poll()) == Ping)
readq.offer(Ping)
}
}

test("reading a StreamException fails that stream") {
val (writeq, readq) = (new AsyncQueue[StreamMessage](), new AsyncQueue[StreamMessage]())
val transport =
Expand Down

0 comments on commit 8af3274

Please sign in to comment.