Skip to content

Commit

Permalink
finagle-http2: Don't use RstExceptions for nack responses
Browse files Browse the repository at this point in the history
Problem

We currently use a RstException for all RST frames. This can get
surfaced to users in metrics which can be tricky to understand. It also
means we can distinguish from all the other causes of RST which are much
more interesting.

Solution

In the case of a nack RST, synthesize a 503 nack response that will get
converted in the HttpNackFilter in the exact same way that it does for
HTTP/1.x.

JIRA Issues: CSL-8980

Differential Revision: https://phabricator.twitter.biz/D389234
  • Loading branch information
Bryce Anderson authored and jenkins committed Oct 29, 2019
1 parent ef2fc45 commit cb67fa3
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Runtime Behavior Changes

* finagle: Upgrade to caffeine 2.8.0 ``PHAB_ID=D384592``

* finagle-http2: Nacks in the form of RST(STREAM_REFUSED | ENHANCE_YOUR_CALM) no
longer surface as a RstException, instead opting for a generic Failure to be
symmetric with the HTTP/1.x nack behavior. ``PHAB_ID=D389234``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,28 @@ abstract class AbstractEndToEndTest
await(client.close())
}

test("Retryable nacks surface as finagle Failures") {
val svc = Service.mk[Request, Response] { _ =>
Future.exception(Failure.RetryableNackFailure)
}
val client = nonStreamingConnect(svc)
val f = intercept[Failure] { await(client(Request())) }
assert(!f.isFlagged(FailureFlags.Retryable))
assert(f.isFlagged(FailureFlags.Rejected))
await(client.close())
}

test("Non-retryable nacks surface as finagle Failures") {
val svc = Service.mk[Request, Response] { _ =>
Future.exception(Failure.NonRetryableNackFailure)
}
val client = nonStreamingConnect(svc)
val f = intercept[Failure] { await(client(Request())) }
assert(f.isFlagged(FailureFlags.NonRetryable))
assert(f.isFlagged(FailureFlags.Rejected))
await(client.close())
}

test(implName + ": aggregates trailers when streams are aggregated") {
val service = new HttpService {
def apply(req: Request): Future[Response] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package com.twitter.finagle.http2.transport.common

import com.twitter.finagle.FailureFlags
import com.twitter.finagle.http.filter.HttpNackFilter
import com.twitter.finagle.http2.RstException
import com.twitter.logging.{HasLogLevel, Level, Logger}
import io.netty.buffer.Unpooled
import io.netty.channel.{ChannelDuplexHandler, ChannelHandlerContext, ChannelPromise}
import io.netty.handler.codec.http.HttpObject
import io.netty.handler.codec.http.{
DefaultFullHttpResponse,
FullHttpResponse,
HttpObject,
HttpResponseStatus,
HttpVersion
}
import io.netty.handler.codec.http2.{Http2Error, Http2ResetFrame, Http2WindowUpdateFrame}
import io.netty.util.ReferenceCountUtil
import java.net.SocketAddress
import scala.util.control.NoStackTrace

/**
Expand Down Expand Up @@ -107,25 +114,40 @@ private[http2] object Http2StreamMessageHandler {
// If this is the first message, we fire our own exception down the pipeline
// which may be a nack.
if (!observedFirstHttpObject) {
ctx.fireExceptionCaught(rstToException(rst, Option(ctx.channel.remoteAddress)))
convertAndFireRst(ctx, rst)
}
}
}

private[this] def rstToException(
rst: Http2ResetFrame,
remoteAddress: Option[SocketAddress]
): RstException = {
private[this] def convertAndFireRst(ctx: ChannelHandlerContext, rst: Http2ResetFrame): Unit = {
val rstCode = rst.errorCode
val flags = if (rstCode == Http2Error.REFUSED_STREAM.code) {
FailureFlags.Retryable | FailureFlags.Rejected

// For nack RST frames we want to reuse the machinery in ClientNackFilter
// so we synthesize an appropriate 503 response and fire it down the pipeline.
// For all other RST frames something bad happened so we push a RstException
// down the exception pathway that will close the pipeline.
if (rstCode == Http2Error.REFUSED_STREAM.code) {
ctx.fireChannelRead(syntheticNackResponse(HttpNackFilter.RetryableNackHeader))
} else if (rstCode == Http2Error.ENHANCE_YOUR_CALM.code) {
FailureFlags.Rejected | FailureFlags.NonRetryable
ctx.fireChannelRead(syntheticNackResponse(HttpNackFilter.NonRetryableNackHeader))
} else {
FailureFlags.Empty
// If we don't have a handle on the stream use -1 as a sentinel. In
// practice this should be exceedingly rare or never happen since we
// are part of the stream pipeline.
val streamId = if (rst.stream != null) rst.stream.id else -1
ctx.fireExceptionCaught(
new RstException(rstCode, streamId, Option(ctx.channel.remoteAddress), FailureFlags.Empty))
}
}

new RstException(rstCode, rst.stream.id, remoteAddress, flags)
private[this] def syntheticNackResponse(nackHeader: String): FullHttpResponse = {
val resp =
new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.SERVICE_UNAVAILABLE,
Unpooled.buffer(0))
resp.headers.set(nackHeader, "true")
resp
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.twitter.finagle.http2.transport.common

import com.twitter.finagle.http.filter.HttpNackFilter
import com.twitter.finagle.http2.RstException
import io.netty.buffer.Unpooled
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.http.FullHttpMessage
import io.netty.handler.codec.http2._
import io.netty.util.ReferenceCounted
import org.mockito.Mockito.when
Expand Down Expand Up @@ -72,4 +74,36 @@ class Http2StreamMessageHandlerTest
}
}
}

test(
"RST frames of type REFUSED_STREAM get propagated as a 503 " +
"with the finagle retryable nack header") {
val em = new EmbeddedChannel(Http2StreamMessageHandler(isServer = false))
em.pipeline.fireUserEventTriggered(new DefaultHttp2ResetFrame(Http2Error.REFUSED_STREAM))

val response = em.readInbound[FullHttpMessage]()
assert(response.headers.get(HttpNackFilter.RetryableNackHeader) == "true")
assert(!response.headers.contains(HttpNackFilter.NonRetryableNackHeader))
}

test(
"RST frames of type ENHANCE_YOUR_CALM get propagated as a 503 " +
"with the finagle non-retryable nack header") {
val em = new EmbeddedChannel(Http2StreamMessageHandler(isServer = false))
em.pipeline.fireUserEventTriggered(new DefaultHttp2ResetFrame(Http2Error.ENHANCE_YOUR_CALM))

val response = em.readInbound[FullHttpMessage]()
assert(response.headers.get(HttpNackFilter.NonRetryableNackHeader) == "true")
assert(!response.headers.contains(HttpNackFilter.RetryableNackHeader))
}

test(
"RST frames of type other than REFUSED_STREAM and ENHANCE_YOUR_CALM " +
"gets propagated as a RstException") {
val em = new EmbeddedChannel(Http2StreamMessageHandler(isServer = false))
em.pipeline.fireUserEventTriggered(new DefaultHttp2ResetFrame(Http2Error.CANCEL))

val ex = intercept[RstException] { em.checkException() }
assert(ex.errorCode == Http2Error.CANCEL.code)
}
}

0 comments on commit cb67fa3

Please sign in to comment.