Ensure h2 streams are fully received before closing (#1245) #1280
Conversation
🙏
```diff
       localReset(Reset.Refused)

     case e =>
-      log.debug(e, "[%s] remote message interrupted", prefix)
+      log.debug("[%s] remote message interrupted %s", prefix, e)
       localReset(Reset.InternalError)
   }
```
the prior format is used so that we print stack traces when they are enabled (this can be controlled at start time via a system property).
Ah, this is good to know. Which system property enables stack traces? Does that also work for Failure types (cases 3 and 4)? I can revert this change.
```diff
@@ -551,7 +568,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
     p
   }

-  private[this] val writeFrame: Frame => Future[Unit] = { frame =>
+  private[this] def writeFrame(frame: Frame): Future[Unit] =
```
why is this necessary? We prefer to use val'd functions when possible, since they save on unnecessary anonymous function allocations.
I changed this for consistency with the `writeHeaders` and `writeStream` defs defined above. I can revert this, but I might also convert those other defs to vals.
```diff
@@ -488,7 +505,7 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
     val streamFF = headersF.map(_ => writeStream(msg.stream))

     val writeF = streamFF.flatten
-    onReset.onFailure(writeF.raise(_))
+    onReset.onFailure(writeF.raise)
```
the prior form is used to be explicit that this allocates. We use val'd functions and the paren-less form to indicate that there is no allocation performed, and we use parens to illustrate that an anonymous function is allocated (i.e. when we call a function rather than pass a val'd function).
Ok, good to know. Will revert.
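The convention described above can be sketched in plain Scala. This is an illustration only; the names (`raise`, `onFailure`, `AllocationConvention`) are made up for the example and are not linkerd code:

```scala
// Illustrative sketch of the allocation convention: paren-less passes an
// existing val'd function, while `f(_)` allocates a fresh anonymous function.
object AllocationConvention {
  var calls = 0

  // A val'd function: allocated once, when this object is initialized.
  val raise: Throwable => Unit = { _ => calls += 1 }

  def onFailure(f: Throwable => Unit): Unit = f(new Exception("boom"))

  def demo(): Unit = {
    // Paren-less form: passes the existing val; no per-call allocation.
    onFailure(raise)
    // Paren form: `raise(_)` expands to a fresh anonymous function
    // `e => raise(e)`, allocated at this call site -- the parens signal that.
    onFailure(raise(_))
  }
}
```

Both calls behave identically; the parens exist purely to make the extra allocation visible to readers of the code.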
```scala
case LocalClosed(remote@RemoteStreaming()) =>
  if (recvFrame(toFrame(data), remote)) true
  else recv(data)
```
Is this change related to the bugfix in some way or is this just a refactor?
This is just a refactor. I kept getting confused that we process `Http2HeadersFrame.isEndStream` and `Http2HeadersFrame` in separate cases, but we process all `Http2DataFrame`s in the same case. Breaking data frames out into two separate cases feels more consistent with how we're handling header frames.
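As a toy illustration of that case split, here is a simplified frame model (these types are hypothetical stand-ins, not the actual Netty4 classes):

```scala
// Hypothetical, simplified frame model -- not the actual Netty4 types.
sealed trait H2Frame { def isEndStream: Boolean }
final case class HeadersFrame(isEndStream: Boolean) extends H2Frame
final case class DataFrame(isEndStream: Boolean) extends H2Frame

object FrameDispatch {
  // Headers with and without end-of-stream are handled in separate cases;
  // splitting data frames the same way keeps the match symmetric.
  def describe(frame: H2Frame): String = frame match {
    case HeadersFrame(true)  => "headers, end of stream"
    case HeadersFrame(false) => "headers"
    case DataFrame(true)     => "data, end of stream"
    case DataFrame(false)    => "data"
  }
}
```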
```diff
@@ -274,7 +271,6 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]

     case state@RemoteClosed() =>
       if (stateRef.compareAndSet(state, Closed(Reset.NoError))) {
         state.close()
```
Is it worth adding a comment here explaining how `state.close` is expected to be called?
```scala
if (recvFrame(toFrame(data), remote)) {
  remote.close()
  true
} else throw new IllegalStateException("stream queue closed prematurely")
```
Should there be `remote.close()` here as well if the stream queue is closed?
If `recvFrame` returns false, that means that the call to `remote.offer` failed, due to the stream already being closed. So I think in this case calling `remote.close` again would be redundant.
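The offer/close contract being described can be sketched with a toy queue. This is only an illustration of the semantics, not Finagle's actual `AsyncQueue`:

```scala
// Toy queue: offer returns false once the queue is closed, so a failed
// offer already implies the stream is closed and close() would be redundant.
final class ToyStreamQueue[A] {
  private[this] var closed = false
  private[this] val items = scala.collection.mutable.Queue.empty[A]

  def offer(a: A): Boolean = synchronized {
    if (closed) false
    else { items.enqueue(a); true }
  }

  def close(): Unit = synchronized { closed = true }
}
```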
Problem
A race condition existed that would cause a stream to be closed by the remote sender before the local receiver had fully received all frames in an h2 stream. This would result in the final frame of an h2 response never being sent back to the caller, causing the caller's connection to hang indefinitely.
Solution
Make the receiver fully responsible for closing streams, to ensure that they are fully received before closing.
Validation
I've deployed this branch to a test environment and it has successfully served gRPC traffic for the past 48 hours, whereas without this fix the connection hangs within about 20 minutes.
As part of this change I've included some small refactors for readability in `Netty4StreamTransport`. Will also try to add a unit test in a follow-up commit. Fixes #1245.
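The receiver-driven close can be sketched as a compare-and-set state transition. The state names and class below are simplified stand-ins for illustration, not the actual `Netty4StreamTransport` types:

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-ins for the transport's stream states.
sealed trait StreamState
case object RemoteStreaming extends StreamState
case object RemoteClosed extends StreamState
final case class Closed(reason: String) extends StreamState

final class ReceiverOwnedStream {
  private[this] val stateRef = new AtomicReference[StreamState](RemoteStreaming)

  // Called by the receiver once it has drained the last frame. The stream
  // only moves to Closed from RemoteClosed, so a close can never race ahead
  // of frames that are still being received.
  def closeIfFullyReceived(): Boolean = stateRef.get match {
    case state @ RemoteClosed => stateRef.compareAndSet(state, Closed("NoError"))
    case _ => false
  }

  // Called when the end-of-stream frame arrives from the remote sender.
  def endStreamReceived(): Unit = stateRef.set(RemoteClosed)
}
```

Because only the receiver performs the `RemoteClosed -> Closed` transition, the remote sender finishing early cannot discard frames that the local side has not yet consumed.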