Skip to content

Commit

Permalink
finagle-http2: Client stream ID overflow kills connection in StreamTr…
Browse files Browse the repository at this point in the history
…ansportFactory

Problem

When StreamTransportFactory runs out of stream IDs, it stalls but remains open.

Solution

Kill off the StreamTransportFactory so that the connection can be recycled in
accordance with HTTP/2 spec.

JIRA Issues: CSL-6233

Differential Revision: https://phabricator.twitter.biz/D175898
  • Loading branch information
nepthar authored and jenkins committed May 29, 2018
1 parent 25474da commit c555467
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 2 deletions.
6 changes: 5 additions & 1 deletion CHANGES
Expand Up @@ -10,7 +10,7 @@ Unreleased
Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

* finagle-http2: Unprocessed streams are retryable in case of GOAWAY.
* finagle-http2: Unprocessed streams are retryable in case of GOAWAY.
``PHAB_ID=D174401``

New Features
Expand Down Expand Up @@ -89,6 +89,10 @@ Breaking API Changes
Bug Fixes
~~~~~~~~~

* finagle-http2: `StreamTransportFactory` now marks itself as dead/closed when it runs out of
HTTP/2 stream IDs instead of stalling. This allows the connection to be closed/reestablished in
accordance with the spec ``PHAB_ID=D175898``

* finagle-netty4: `SslServerSessionVerifier` is now supplied with the proper peer address
rather than `Address.failing`. ``PHAB_ID=D168334``

Expand Down
Expand Up @@ -347,6 +347,8 @@ final private[http2] class StreamTransportFactory(
case Idle =>
val newId = id.getAndAdd(2)
if (newId < 0) {
// When stream identifiers have been exhausted, a new connection must be established
parent.dead = true
handleCloseWith(new StreamIdOverflowException(addr))
} else if (newId % 2 != 1) {
handleCloseWith(new IllegalStreamIdException(addr, newId))
Expand Down Expand Up @@ -391,7 +393,7 @@ final private[http2] class StreamTransportFactory(

private[this] def handleCheckFinished() = state match {
case a: Active if a.finished && queue.size == 0 =>
if (parent.dead) handleCloseStream(s"parent MultiplexedTransporter already dead")
if (parent.dead) handleCloseStream(s"parent StreamTransportFactory already dead")
else {
if (handleRemoveStream(curId)) {
removeIdleCounter.incr()
Expand Down
Expand Up @@ -125,6 +125,34 @@ class StreamTransportFactoryTest extends FunSuite {
assert(streamFac.numActiveStreams == 0)
}

test("StreamTransportFactory dies if it runs out of stream IDs") {
val (writeq, readq) = (new AsyncQueue[StreamMessage](), new AsyncQueue[StreamMessage]())
val transport = new SlowClosingQueue(writeq, readq).asInstanceOf[Transport[StreamMessage, StreamMessage] {
type Context = TransportContext with HasExecutor
}]
val addr = new SocketAddress {}
val streamFac = new StreamTransportFactory(transport, addr, Stack.Params.empty)
streamFac.setStreamId(2147483647)

val s1, s2 = await(streamFac())

s1.write(H1Req)
s2.write(H1Req)

assert(s2.onClose.isDefined)
val exn = intercept[StreamIdOverflowException] {
throw await(s2.onClose)
}

intercept[DeadConnectionException] {
await(streamFac())
}
assert(exn.flags == FailureFlags.Retryable)
assert(streamFac.numActiveStreams == 1)
assert(streamFac.status == Status.Closed)
}


test("StreamTransportFactory forbids new streams on GOAWAY") {
val (writeq, readq) = (new AsyncQueue[StreamMessage](), new AsyncQueue[StreamMessage]())
val transport = new SlowClosingQueue(writeq, readq).asInstanceOf[Transport[StreamMessage, StreamMessage] {
Expand Down

0 comments on commit c555467

Please sign in to comment.