Skip to content

Commit

Permalink
Ensure h2 streams are fully received before closing (#1245)
Browse files Browse the repository at this point in the history
Problem

A race condition could cause an h2 stream to be closed by the remote
sender before the local receiver had fully received all of the stream's
frames. As a result, the final frame of an h2 response was never
delivered to the caller, and the caller's connection would hang
indefinitely.

Solution

Make the receiver fully responsible for closing streams, to ensure that
they are fully received before closing.

Validation

I've deployed this branch to a test environment and it has successfully
served gRPC traffic for the past 48 hours, whereas without this fix the
connection would always hang within about 20 minutes.
  • Loading branch information
klingerf committed May 8, 2017
1 parent 1c812d4 commit 1037f76
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 60 deletions.
@@ -1,5 +1,4 @@
package com.twitter.finagle.buoyant.h2
package netty4

/**
* Detect connection-headers in accordance with RFC 7540 §8.1.2.2:
Expand Down
@@ -1,8 +1,7 @@
package com.twitter.finagle.buoyant.h2

import com.twitter.io.Buf
import com.twitter.concurrent.AsyncQueue
import com.twitter.finagle.Failure
import com.twitter.io.Buf
import com.twitter.util.{Future, Promise, Return, Throw, Try}

/**
Expand Down
Expand Up @@ -2,15 +2,11 @@ package com.twitter.finagle.buoyant.h2
package netty4

import com.twitter.finagle.{Service, Status => SvcStatus}
import com.twitter.finagle.stats.{StatsReceiver => FStatsReceiver}
import com.twitter.finagle.transport.Transport
import com.twitter.logging.Logger
import com.twitter.util._
import io.netty.handler.codec.http2._
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.JavaConverters._
import scala.util.control.NoStackTrace
import java.util.concurrent.atomic.AtomicInteger

object Netty4ClientDispatcher {
private val log = Logger.get(getClass.getName)
Expand Down
Expand Up @@ -2,17 +2,15 @@ package com.twitter.finagle.buoyant.h2
package netty4

import com.twitter.finagle.WriteException
import com.twitter.finagle.netty4.{BufAsByteBuf, ByteBufAsBuf}
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.netty4.BufAsByteBuf
import com.twitter.finagle.transport.Transport
import com.twitter.io.Buf
import com.twitter.logging.Logger
import com.twitter.util.{Future, NonFatal, Stopwatch, Time}
import com.twitter.util.{Future, Time}
import io.netty.handler.codec.http2._
import java.net.SocketAddress

private[netty4] trait Netty4H2Writer extends H2Transport.Writer {
import Netty4H2Writer.log

protected[this] def write(f: Http2Frame): Future[Unit]
protected[this] def close(deadline: Time): Future[Unit]
Expand Down
Expand Up @@ -2,16 +2,13 @@ package com.twitter.finagle.buoyant.h2
package netty4

import com.twitter.concurrent.AsyncQueue
import com.twitter.finagle.{ChannelClosedException, ChannelWriteException, Failure}
import com.twitter.finagle.stats.{StatsReceiver => FStatsReceiver, NullStatsReceiver => FNullStatsReceiver}
import com.twitter.finagle.Failure
import com.twitter.finagle.stats.{NullStatsReceiver => FNullStatsReceiver, StatsReceiver => FStatsReceiver}
import com.twitter.logging.Logger
import com.twitter.util.{Future, Promise, Return, Stopwatch, Throw, Try}
import io.netty.buffer.CompositeByteBuf
import com.twitter.util.{Future, Promise, Return, Throw}
import io.netty.handler.codec.http2._
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.util.control.NoStackTrace

/**
* Reads and writes a bi-directional HTTP/2 stream.
Expand All @@ -24,11 +21,11 @@ import scala.util.control.NoStackTrace
* dispatcher.
*
* - Dispatchers write a `LocalMsg`-typed message _to_ a socket. The
* stream transport reasds from the message's stream until it
* stream transport reads from the message's stream until it
* _fails_, so that errors may be propagated if the local side of
* the stream is reset.
*
* When both sides of the stram are closed, the `onReset` future is
* When both sides of the stream are closed, the `onReset` future is
* satisfied.
*
* Either side may reset the stream prematurely, causing the `onReset`
Expand Down Expand Up @@ -93,7 +90,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
* Since the local half of the stream is written from the dispatcher
* to the transport, we simply track whether this has completed.
*
* The remote half of the connection is represented with by a
* The remote half of the connection is represented with a
* [[RemoteState]] so that received frames may be passed inbound to
* the application: first, by satisfying the `onRemoteMessage`
* Future with [[RemotePending]], and then by offering data and
Expand Down Expand Up @@ -187,28 +184,28 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
private[this] val stateRef: AtomicReference[StreamState] = {
val remoteMsgP = new Promise[RecvMsg]

// When the remote message--especially a client's repsonse--is
// When the remote message--especially a client's response--is
// canceled, close the transport, sending a RST_STREAM as
// appropriate.
remoteMsgP.setInterruptHandler {
case err: Reset =>
log.debug(err, "[%s] remote message interrupted", prefix)
log.debug("[%s] remote message interrupted %s", prefix, err)
localReset(err)

case Failure(Some(err: Reset)) =>
log.debug(err, "[%s] remote message interrupted", prefix)
log.debug("[%s] remote message interrupted %s", prefix, err)
localReset(err)

case f@Failure(_) if f.isFlagged(Failure.Interrupted) =>
log.debug(f, "[%s] remote message interrupted", prefix)
log.debug("[%s] remote message interrupted %s", prefix, f)
localReset(Reset.Cancel)

case f@Failure(_) if f.isFlagged(Failure.Rejected) =>
log.debug(f, "[%s] remote message interrupted", prefix)
log.debug("[%s] remote message interrupted %s", prefix, f)
localReset(Reset.Refused)

case e =>
log.debug(e, "[%s] remote message interrupted", prefix)
log.debug("[%s] remote message interrupted %s", prefix, e)
localReset(Reset.InternalError)
}

Expand All @@ -217,7 +214,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]

val onRecvMessage: Future[RecvMsg] = stateRef.get match {
case Open(rp@RemotePending()) => rp.future
case s => sys.error(s"unexpected initailzation state: $s")
case s => sys.error(s"unexpected initialization state: $s")
}

/**
Expand Down Expand Up @@ -274,7 +271,6 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]

case state@RemoteClosed() =>
if (stateRef.compareAndSet(state, Closed(Reset.NoError))) {
state.close()
resetP.setDone(); ()
} else closeLocal()
}
Expand Down Expand Up @@ -353,8 +349,14 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
case Open(remote@RemoteStreaming()) =>
if (stateRef.compareAndSet(state, remote.toRemoteClosed)) {
val f = toFrame(hdrs)
statsReceiver.recordRemoteFrame(f)
remote.offer(f)
if (remote.offer(f)) {
statsReceiver.recordRemoteFrame(f)
remote.close()
true
} else {
log.debug("[%s] remote offer failed", prefix)
false
}
} else recv(hdrs)

case state@LocalClosed(remote@RemotePending()) =>
Expand All @@ -378,7 +380,10 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
remote.close()
resetP.setDone()
true
} else false
} else {
log.debug("[%s] remote offer failed", prefix)
false
}
} else recv(hdrs)
}

Expand All @@ -387,6 +392,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
// initiate a message (i.e. when the remote is still pending).
state match {
case Closed(_) => false

case state@RemoteClosed() =>
if (resetFromLocal(state, Reset.Closed)) false
else recv(hdrs)
Expand Down Expand Up @@ -422,7 +428,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
}
}

case data: Http2DataFrame =>
case data: Http2DataFrame if data.isEndStream =>
state match {
case Closed(_) => false

Expand All @@ -435,29 +441,40 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
else recv(data)

case Open(remote@RemoteStreaming()) =>
if (data.isEndStream) {
if (stateRef.compareAndSet(state, remote.toRemoteClosed)) {
if (recvFrame(toFrame(data), remote)) true
else throw new IllegalStateException("stream queue closed prematurely")
} else recv(data)
} else {
if (stateRef.compareAndSet(state, remote.toRemoteClosed)) {
if (recvFrame(toFrame(data), remote)) true
else recv(data)
}
else throw new IllegalStateException("stream queue closed prematurely")
} else recv(data)

case LocalClosed(remote@RemoteStreaming()) =>
if (data.isEndStream) {
if (stateRef.compareAndSet(state, Closed(Reset.NoError))) {
if (recvFrame(toFrame(data), remote)) {
remote.close()
resetP.setDone()
true
} else throw new IllegalStateException("stream queue closed prematurely")
} else recv(data)
} else {
if (recvFrame(toFrame(data), remote)) true
else recv(data)
}
if (stateRef.compareAndSet(state, Closed(Reset.NoError))) {
if (recvFrame(toFrame(data), remote)) {
remote.close()
resetP.setDone()
true
} else throw new IllegalStateException("stream queue closed prematurely")
} else recv(data)
}

case data: Http2DataFrame =>
state match {
case Closed(_) => false

case state@RemoteClosed() =>
if (resetFromLocal(state, Reset.Closed)) false
else recv(data)

case RemoteOpen(remote@RemotePending()) =>
if (resetFromLocal(remote, Reset.InternalError)) false
else recv(data)

case Open(remote@RemoteStreaming()) =>
if (recvFrame(toFrame(data), remote)) true
else recv(data)

case LocalClosed(remote@RemoteStreaming()) =>
if (recvFrame(toFrame(data), remote)) true
else recv(data)
}
}
}
Expand Down Expand Up @@ -488,7 +505,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
val streamFF = headersF.map(_ => writeStream(msg.stream))

val writeF = streamFF.flatten
onReset.onFailure(writeF.raise(_))
onReset.onFailure(writeF.raise)
writeF.respond {
case Return(_) =>
closeLocal()
Expand Down Expand Up @@ -551,7 +568,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
p
}

private[this] val writeFrame: Frame => Future[Unit] = { frame =>
private[this] def writeFrame(frame: Frame): Future[Unit] =
stateRef.get match {
case Closed(rst) => Future.exception(StreamError.Remote(rst))
case LocalClosed(_) => Future.exception(new IllegalStateException("writing on closed stream"))
Expand All @@ -560,7 +577,6 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
transport.write(streamId, frame).rescue(wrapRemoteEx)
.before(frame.release().rescue(wrapLocalEx))
}
}
}

object Netty4StreamTransport {
Expand Down
6 changes: 3 additions & 3 deletions linkerd/docs/routers.md
Expand Up @@ -104,7 +104,7 @@ kind | `io.l5d.global` | Either [io.l5d.global](#global-service-config) or [io.l
- protocol: http
service:
kind: io.l5d.global
totalTimeoutMs: 500ms
totalTimeoutMs: 500
retries:
budget:
minRetriesPerSec: 5
Expand Down Expand Up @@ -137,9 +137,9 @@ which will be applied to all services.
minMs: 10
maxMs: 10000
- prefix: /svc/foo
totalTimeoutMs: 500ms
totalTimeoutMs: 500
- prefix: /svc/bar
totalTimeoutMs: 200ms
totalTimeoutMs: 200

```

Expand Down

0 comments on commit 1037f76

Please sign in to comment.