Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure h2 streams are fully received before closing (#1245) #1280

Merged
merged 3 commits into from May 11, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -256,6 +256,13 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
case _ => false
}

/**
* Updates the stateRef to reflect that the local stream has been closed.
*
* If the ref is already local closed, then the remote stream is reset and
* the reset promise results in an exception. If the ref is remote closed,
* then the ref becomes fully closed and the reset promise is completed.
*/
@tailrec private[this] def closeLocal(): Unit =
stateRef.get match {
case Closed(_) =>
Expand Down Expand Up @@ -304,7 +311,10 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]
if (remote.offer(f)) {
statsReceiver.recordRemoteFrame(f)
true
} else false
} else {
log.debug("[%s] remote offer failed", prefix)
false
}

in match {
case rst: Http2ResetFrame =>
Expand Down Expand Up @@ -348,15 +358,10 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]

case Open(remote@RemoteStreaming()) =>
if (stateRef.compareAndSet(state, remote.toRemoteClosed)) {
val f = toFrame(hdrs)
if (remote.offer(f)) {
statsReceiver.recordRemoteFrame(f)
if (recvFrame(toFrame(hdrs), remote)) {
remote.close()
true
} else {
log.debug("[%s] remote offer failed", prefix)
false
}
} else false
} else recv(hdrs)

case state@LocalClosed(remote@RemotePending()) =>
Expand All @@ -374,16 +379,11 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]

case LocalClosed(remote@RemoteStreaming()) =>
if (stateRef.compareAndSet(state, Closed(Reset.NoError))) {
val f = toFrame(hdrs)
if (remote.offer(f)) {
statsReceiver.recordRemoteFrame(f)
if (recvFrame(toFrame(hdrs), remote)) {
remote.close()
resetP.setDone()
true
} else {
log.debug("[%s] remote offer failed", prefix)
false
}
} else false
} else recv(hdrs)
}

Expand Down Expand Up @@ -442,8 +442,10 @@ private[h2] trait Netty4StreamTransport[SendMsg <: Message, RecvMsg <: Message]

case Open(remote@RemoteStreaming()) =>
if (stateRef.compareAndSet(state, remote.toRemoteClosed)) {
if (recvFrame(toFrame(data), remote)) true
else throw new IllegalStateException("stream queue closed prematurely")
if (recvFrame(toFrame(data), remote)) {
remote.close()
true
} else throw new IllegalStateException("stream queue closed prematurely")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be remote.close() here as well if the stream queue is closed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If recvFrame returns false, that means that the call to remote.offer failed, due to the stream already being closed. So I think in this case calling remote.close again would be redundant.

} else recv(data)

case LocalClosed(remote@RemoteStreaming()) =>
Expand Down