Skip to content

Commit

Permalink
8309118: HttpClient: Add more tests for 100 ExpectContinue with HTTP/2
Browse files Browse the repository at this point in the history
Reviewed-by: dfuchs, djelinski
  • Loading branch information
c-cleary committed Oct 31, 2023
1 parent f4c5db9 commit 3a7525d
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 138 deletions.
100 changes: 61 additions & 39 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ class Stream<T> extends ExchangeImpl<T> {
private final WindowController windowController;
private final WindowUpdateSender windowUpdater;

// Only accessed in all method calls from incoming(), no need for volatile
private boolean endStreamSeen;

@Override
HttpConnection connection() {
return connection.connection;
Expand All @@ -175,6 +178,8 @@ private void schedule() {
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
try {
if (subscriber == null) {
// pendingResponseSubscriber will be null until response headers have been received and
// readBodyAsync is called.
subscriber = responseSubscriber = pendingResponseSubscriber;
if (subscriber == null) {
// can't process anything yet
Expand All @@ -187,7 +192,13 @@ private void schedule() {
Http2Frame frame = inputQ.peek();
if (frame instanceof ResetFrame rf) {
inputQ.remove();
handleReset(rf, subscriber);
if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) {
// If END_STREAM is already received, complete the requestBodyCF successfully
// and stop sending any request data.
requestBodyCF.complete(null);
} else {
handleReset(rf, subscriber);
}
return;
}
DataFrame df = (DataFrame)frame;
Expand All @@ -201,7 +212,6 @@ private void schedule() {
connection.ensureWindowUpdated(df); // must update connection window
Log.logTrace("responseSubscriber.onComplete");
if (debug.on()) debug.log("incoming: onComplete");
sched.stop();
connection.decrementStreamsCount(streamid);
subscriber.onComplete();
onCompleteCalled = true;
Expand All @@ -220,7 +230,6 @@ private void schedule() {
if (consumed(df)) {
Log.logTrace("responseSubscriber.onComplete");
if (debug.on()) debug.log("incoming: onComplete");
sched.stop();
connection.decrementStreamsCount(streamid);
subscriber.onComplete();
onCompleteCalled = true;
Expand Down Expand Up @@ -479,10 +488,12 @@ void incoming(Http2Frame frame) throws IOException {
handleResponse(hf);
}
if (hf.getFlag(HeaderFrame.END_STREAM)) {
endStreamSeen = true;
if (debug.on()) debug.log("handling END_STREAM: %d", streamid);
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
}
} else if (frame instanceof DataFrame df) {
if (df.getFlag(DataFrame.END_STREAM)) endStreamSeen = true;
if (cancelled) {
if (debug.on()) {
debug.log("request cancelled or stream closed: dropping data frame");
Expand Down Expand Up @@ -568,46 +579,50 @@ request, exchange, responseHeaders, connection(),

void incoming_reset(ResetFrame frame) {
Log.logTrace("Received RST_STREAM on stream {0}", streamid);
if (endStreamReceived()) {
// responseSubscriber will be null if readBodyAsync has not yet been called
Flow.Subscriber<?> subscriber = responseSubscriber;
if (subscriber == null) subscriber = pendingResponseSubscriber;
// See RFC 9113 sec 5.1 Figure 2, life-cycle of a stream
if (endStreamReceived() && requestBodyCF.isDone()) {
// Stream is in a half closed or fully closed state, the RST_STREAM is ignored and logged.
Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
} else if (closed) {
// Stream is in a fully closed state, the RST_STREAM is ignored and logged.
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
} else if (subscriber == null && !endStreamSeen) {
// subscriber is null and the reader has not seen an END_STREAM flag, handle reset immediately
handleReset(frame, null);
} else if (!requestBodyCF.isDone()) {
// Not done sending the body, complete exceptionally or normally based on RST_STREAM error code
incompleteRequestBodyReset(frame, subscriber);
} else if (response == null || !finalResponseCodeReceived) {
// Complete response has not been received, handle reset immediately
handleReset(frame, null);
} else {
Flow.Subscriber<?> subscriber =
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
if (!requestBodyCF.isDone()) {
// If a RST_STREAM is received, complete the requestBody. This will allow the
// response to be read before the Reset is handled in the case where the client's
// input stream is partially consumed or not consumed at all by the server.
if (frame.getErrorCode() != ResetFrame.NO_ERROR) {
if (debug.on()) {
debug.log("completing requestBodyCF exceptionally due to received" +
" RESET(%s) (stream=%s)", frame.getErrorCode(), streamid);
}
requestBodyCF.completeExceptionally(new IOException("RST_STREAM received"));
} else {
if (debug.on()) {
debug.log("completing requestBodyCF normally due to received" +
" RESET(NO_ERROR) (stream=%s)", streamid);
}
requestBodyCF.complete(null);
}
// Put ResetFrame into inputQ. Any frames already in the queue will be processed before the ResetFrame.
receiveResetFrame(frame);
Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
}
}

void incompleteRequestBodyReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
if (frame.getErrorCode() != ResetFrame.NO_ERROR) {
if (debug.on()) {
debug.log("completing requestBodyCF exceptionally due to received" +
" RESET(%s) (stream=%s)", frame.getErrorCode(), streamid);
}
requestBodyCF.completeExceptionally(new IOException("RST_STREAM received"));
} else {
if (debug.on()) {
debug.log("completing requestBodyCF normally due to received" +
" RESET(NO_ERROR) (stream=%s)", streamid);
}
if ((response == null || !finalResponseCodeReceived) && subscriber == null) {
// we haven't received the headers yet, and won't receive any!
// handle reset now.
handleReset(frame, null);
if (!endStreamSeen || !finalResponseCodeReceived) {
// If no END_STREAM flag seen or the final response code has not been received, any RST_STREAM
// should be handled here immediately
handleReset(frame, subscriber);
} else {
// put it in the input queue in order to read all
// pending data frames first. Indeed, a server may send
// RST_STREAM after sending END_STREAM, in which case we should
// ignore it. However, we won't know if we have received END_STREAM
// or not until all pending data frames are read.
receiveResetFrame(frame);
// RST_STREAM was pushed to the queue. It will be handled by
// asyncReceive after all pending data frames have been
// processed.
Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
requestBodyCF.complete(null);
}
}
}
Expand Down Expand Up @@ -1376,14 +1391,21 @@ private void cancelImpl(final Throwable e, final int resetFrameErrCode) {
}

if (closing) { // true if the stream has not been closed yet
if (responseSubscriber != null || pendingResponseSubscriber != null) {
var subscriber = this.responseSubscriber;
if (subscriber == null) subscriber = this.pendingResponseSubscriber;
if (subscriber != null) {
if (debug.on())
debug.log("stream %s closing due to %s", streamid, (Object)errorRef.get());
sched.runOrSchedule();
if (subscriber instanceof Http2StreamResponseSubscriber<?> rs) {
// make sure the subscriber is stopped.
if (debug.on()) debug.log("closing response subscriber stream %s", streamid);
rs.complete(errorRef.get());
}
} else {
if (debug.on())
debug.log("stream %s closing due to %s before subscriber registered",
streamid, (Object)errorRef.get());
streamid, (Object)errorRef.get());
}
} else {
if (debug.on()) {
Expand Down

1 comment on commit 3a7525d

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.