From cb67fa33c4b3a482e1545405af444ecbfdc1b3fd Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Tue, 29 Oct 2019 17:54:32 +0000 Subject: [PATCH] finagle-http2: Don't use RstExceptions for nack responses Problem We currently use a RstException for all RST frames. This can get surfaced to users in metrics which can be tricky to understand. It also means we can distinguish from all the other causes of RST which are much more interesting. Solution In the case of a nack RST, synthesize a 503 nack response that will get converted in the HttpNackFilter in the exact same way that it does for HTTP/1.x. JIRA Issues: CSL-8980 Differential Revision: https://phabricator.twitter.biz/D389234 --- CHANGELOG.rst | 4 ++ .../finagle/http/AbstractEndToEndTest.scala | 22 +++++++++ .../common/Http2StreamMessageHandler.scala | 46 ++++++++++++++----- .../Http2StreamMessageHandlerTest.scala | 34 ++++++++++++++ 4 files changed, 94 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 653834bcc2..0b284fc66f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,10 @@ Runtime Behavior Changes * finagle: Upgrade to caffeine 2.8.0 ``PHAB_ID=D384592`` +* finagle-http2: Nacks in the form of RST(STREAM_REFUSED | ENHANCE_YOUR_CALM) no + longer surface as a RstException, instead opting for a generic Failure to be + symmetric with the HTTP/1.x nack behavior. ``PHAB_ID=D389234`` + Breaking API Changes ~~~~~~~~~~~~~~~~~~~~ diff --git a/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala b/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala index 3c98b648eb..11b4b5a57d 100644 --- a/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala +++ b/finagle-http/src/test/scala/com/twitter/finagle/http/AbstractEndToEndTest.scala @@ -578,6 +578,28 @@ abstract class AbstractEndToEndTest await(client.close()) } + test("Retryable nacks surface as finagle Failures") { + val svc = Service.mk[Request, Response] { _ => + Future.exception(Failure.RetryableNackFailure) + } + val client = nonStreamingConnect(svc) + val f = intercept[Failure] { await(client(Request())) } + assert(!f.isFlagged(FailureFlags.Retryable)) + assert(f.isFlagged(FailureFlags.Rejected)) + await(client.close()) + } + + test("Non-retryable nacks surface as finagle Failures") { + val svc = Service.mk[Request, Response] { _ => + Future.exception(Failure.NonRetryableNackFailure) + } + val client = nonStreamingConnect(svc) + val f = intercept[Failure] { await(client(Request())) } + assert(f.isFlagged(FailureFlags.NonRetryable)) + assert(f.isFlagged(FailureFlags.Rejected)) + await(client.close()) + } + test(implName + ": aggregates trailers when streams are aggregated") { val service = new HttpService { def apply(req: Request): Future[Response] = { diff --git a/finagle-http2/src/main/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandler.scala b/finagle-http2/src/main/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandler.scala index 4316f55fed..37ed07e72a 100644 --- a/finagle-http2/src/main/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandler.scala +++ b/finagle-http2/src/main/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandler.scala @@ -1,13 +1,20 @@ package com.twitter.finagle.http2.transport.common import com.twitter.finagle.FailureFlags +import com.twitter.finagle.http.filter.HttpNackFilter import com.twitter.finagle.http2.RstException import com.twitter.logging.{HasLogLevel, Level, Logger} +import io.netty.buffer.Unpooled import io.netty.channel.{ChannelDuplexHandler, ChannelHandlerContext, ChannelPromise} -import io.netty.handler.codec.http.HttpObject +import io.netty.handler.codec.http.{ + DefaultFullHttpResponse, + FullHttpResponse, + HttpObject, + HttpResponseStatus, + HttpVersion +} import io.netty.handler.codec.http2.{Http2Error, Http2ResetFrame, Http2WindowUpdateFrame} import io.netty.util.ReferenceCountUtil -import java.net.SocketAddress import scala.util.control.NoStackTrace /** @@ -107,25 +114,40 @@ private[http2] object Http2StreamMessageHandler { // If this is the first message, we fire our own exception down the pipeline // which may be a nack. if (!observedFirstHttpObject) { - ctx.fireExceptionCaught(rstToException(rst, Option(ctx.channel.remoteAddress))) + convertAndFireRst(ctx, rst) } } } - private[this] def rstToException( - rst: Http2ResetFrame, - remoteAddress: Option[SocketAddress] - ): RstException = { + private[this] def convertAndFireRst(ctx: ChannelHandlerContext, rst: Http2ResetFrame): Unit = { val rstCode = rst.errorCode - val flags = if (rstCode == Http2Error.REFUSED_STREAM.code) { - FailureFlags.Retryable | FailureFlags.Rejected + + // For nack RST frames we want to reuse the machinery in ClientNackFilter + // so we synthesize an appropriate 503 response and fire it down the pipeline. + // For all other RST frames something bad happened so we push a RstException + // down the exception pathway that will close the pipeline. + if (rstCode == Http2Error.REFUSED_STREAM.code) { + ctx.fireChannelRead(syntheticNackResponse(HttpNackFilter.RetryableNackHeader)) } else if (rstCode == Http2Error.ENHANCE_YOUR_CALM.code) { - FailureFlags.Rejected | FailureFlags.NonRetryable + ctx.fireChannelRead(syntheticNackResponse(HttpNackFilter.NonRetryableNackHeader)) } else { - FailureFlags.Empty + // If we don't have a handle on the stream use -1 as a sentinel. In + // practice this should be exceedingly rare or never happen since we + // are part of the stream pipeline. + val streamId = if (rst.stream != null) rst.stream.id else -1 + ctx.fireExceptionCaught( + new RstException(rstCode, streamId, Option(ctx.channel.remoteAddress), FailureFlags.Empty)) } + } - new RstException(rstCode, rst.stream.id, remoteAddress, flags) + private[this] def syntheticNackResponse(nackHeader: String): FullHttpResponse = { + val resp = + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.SERVICE_UNAVAILABLE, + Unpooled.buffer(0)) + resp.headers.set(nackHeader, "true") + resp } } diff --git a/finagle-http2/src/test/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandlerTest.scala b/finagle-http2/src/test/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandlerTest.scala index 4d00030742..013dbf91c7 100644 --- a/finagle-http2/src/test/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandlerTest.scala +++ b/finagle-http2/src/test/scala/com/twitter/finagle/http2/transport/common/Http2StreamMessageHandlerTest.scala @@ -1,8 +1,10 @@ package com.twitter.finagle.http2.transport.common +import com.twitter.finagle.http.filter.HttpNackFilter import com.twitter.finagle.http2.RstException import io.netty.buffer.Unpooled import io.netty.channel.embedded.EmbeddedChannel +import io.netty.handler.codec.http.FullHttpMessage import io.netty.handler.codec.http2._ import io.netty.util.ReferenceCounted import org.mockito.Mockito.when @@ -72,4 +74,36 @@ class Http2StreamMessageHandlerTest } } } + + test( + "RST frames of type REFUSED_STREAM get propagated as a 503 " + + "with the finagle retryable nack header") { + val em = new EmbeddedChannel(Http2StreamMessageHandler(isServer = false)) + em.pipeline.fireUserEventTriggered(new DefaultHttp2ResetFrame(Http2Error.REFUSED_STREAM)) + + val response = em.readInbound[FullHttpMessage]() + assert(response.headers.get(HttpNackFilter.RetryableNackHeader) == "true") + assert(!response.headers.contains(HttpNackFilter.NonRetryableNackHeader)) + } + + test( + "RST frames of type ENHANCE_YOUR_CALM get propagated as a 503 " + + "with the finagle non-retryable nack header") { + val em = new EmbeddedChannel(Http2StreamMessageHandler(isServer = false)) + em.pipeline.fireUserEventTriggered(new DefaultHttp2ResetFrame(Http2Error.ENHANCE_YOUR_CALM)) + + val response = em.readInbound[FullHttpMessage]() + assert(response.headers.get(HttpNackFilter.NonRetryableNackHeader) == "true") + assert(!response.headers.contains(HttpNackFilter.RetryableNackHeader)) + } + + test( + "RST frames of type other than REFUSED_STREAM and ENHANCE_YOUR_CALM " + + "gets propagated as a RstException") { + val em = new EmbeddedChannel(Http2StreamMessageHandler(isServer = false)) + em.pipeline.fireUserEventTriggered(new DefaultHttp2ResetFrame(Http2Error.CANCEL)) + + val ex = intercept[RstException] { em.checkException() } + assert(ex.errorCode == Http2Error.CANCEL.code) + } }