Skip to content

Commit 3acdebe

Browse files
committed
8229822: ThrowingPushPromises tests sometimes fail due to EOF
SocketTube is fixed to cater for errors caused by pausing/resuming events on an asynchronously closed connection, from within the selector's manager thread. Http2Connection and Stream are fixed to prevent sending a DataFrame on a stream after Reset has been sent. Backport-of: 77c46ea
1 parent c5d2cc1 commit 3acdebe

10 files changed

+169
-29
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,11 @@ private Http2Connection(HttpConnection connection,
329329
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
330330

331331
Stream<?> initialStream = createStream(exchange);
332-
initialStream.registerStream(1);
332+
boolean opened = initialStream.registerStream(1, true);
333+
if (debug.on() && !opened) {
334+
debug.log("Initial stream was cancelled - but connection is maintained: " +
335+
"reset frame will need to be sent later");
336+
}
333337
windowController.registerStream(1, getInitialSendWindowSize());
334338
initialStream.requestSent();
335339
// Upgrading:
@@ -338,6 +342,11 @@ private Http2Connection(HttpConnection connection,
338342
this.initial = initial;
339343
connectFlows(connection);
340344
sendConnectionPreface();
345+
if (!opened) {
346+
debug.log("ensure reset frame is sent to cancel initial stream");
347+
initialStream.sendCancelStreamFrame();
348+
}
349+
341350
}
342351

343352
// Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
@@ -847,7 +856,7 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
847856
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
848857
Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
849858
pushExch.exchImpl = pushStream;
850-
pushStream.registerStream(promisedStreamid);
859+
pushStream.registerStream(promisedStreamid, true);
851860
parent.incoming_pushPromise(pushReq, pushStream);
852861
}
853862

@@ -872,14 +881,15 @@ private void handleConnectionFrame(Http2Frame frame)
872881
}
873882
}
874883

875-
void resetStream(int streamid, int code) throws IOException {
884+
void resetStream(int streamid, int code) {
876885
try {
877886
if (connection.channel().isOpen()) {
878887
// no need to try & send a reset frame if the
879888
// connection channel is already closed.
880889
Log.logError(
881890
"Resetting stream {0,number,integer} with error code {1,number,integer}",
882891
streamid, code);
892+
markStream(streamid, code);
883893
ResetFrame frame = new ResetFrame(streamid, code);
884894
sendFrame(frame);
885895
} else if (debug.on()) {
@@ -892,6 +902,11 @@ void resetStream(int streamid, int code) throws IOException {
892902
}
893903
}
894904

905+
private void markStream(int streamid, int code) {
906+
Stream<?> s = streams.get(streamid);
907+
if (s != null) s.markStream(code);
908+
}
909+
895910
// reduce count of streams by 1 if stream still exists
896911
synchronized void decrementStreamsCount(int streamid) {
897912
Stream<?> s = streams.get(streamid);
@@ -1193,12 +1208,19 @@ private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
11931208
Stream<?> stream = oh.getAttachment();
11941209
assert stream.streamid == 0;
11951210
int streamid = nextstreamid;
1196-
nextstreamid += 2;
1197-
stream.registerStream(streamid);
1198-
// set outgoing window here. This allows thread sending
1199-
// body to proceed.
1200-
windowController.registerStream(streamid, getInitialSendWindowSize());
1201-
return stream;
1211+
if (stream.registerStream(streamid, false)) {
1212+
// set outgoing window here. This allows thread sending
1213+
// body to proceed.
1214+
nextstreamid += 2;
1215+
windowController.registerStream(streamid, getInitialSendWindowSize());
1216+
return stream;
1217+
} else {
1218+
stream.cancelImpl(new IOException("Request cancelled"));
1219+
if (finalStream() && streams.isEmpty()) {
1220+
close();
1221+
}
1222+
return null;
1223+
}
12021224
}
12031225

12041226
private final Object sendlock = new Object();
@@ -1212,7 +1234,9 @@ void sendFrame(Http2Frame frame) {
12121234
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
12131235
Stream<?> stream = registerNewStream(oh);
12141236
// provide protection from inserting unordered frames between Headers and Continuation
1215-
publisher.enqueue(encodeHeaders(oh, stream));
1237+
if (stream != null) {
1238+
publisher.enqueue(encodeHeaders(oh, stream));
1239+
}
12161240
} else {
12171241
publisher.enqueue(encodeFrame(frame));
12181242
}

src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -246,7 +246,7 @@ public final void handle() {
246246
}
247247
@Override
248248
public final void abort(IOException error) {
249-
debug().log(() -> "abort: " + error);
249+
debug().log(() -> this.getClass().getSimpleName() + " abort: " + error);
250250
pause(); // pause, then signal
251251
signalError(error); // should not be resumed after abort (not checked)
252252
}
@@ -724,10 +724,12 @@ public final void request(long n) {
724724
@Override
725725
public final void cancel() {
726726
pauseReadEvent();
727+
if (debug.on()) debug.log("Read subscription cancelled");
727728
if (Log.channel()) {
728729
Log.logChannel("Read subscription cancelled for channel {0}",
729730
channelDescr());
730731
}
732+
if (debug.on()) debug.log("Stopping read scheduler");
731733
readScheduler.stop();
732734
}
733735

@@ -748,6 +750,7 @@ final void handleError() {
748750
}
749751

750752
final void signalError(Throwable error) {
753+
if (debug.on()) debug.log("signal read error: " + error);
751754
if (!errorRef.compareAndSet(null, error)) {
752755
return;
753756
}
@@ -808,6 +811,7 @@ final void read() {
808811
}
809812
current.errorRef.compareAndSet(null, error);
810813
current.signalCompletion();
814+
if (debug.on()) debug.log("Stopping read scheduler");
811815
readScheduler.stop();
812816
debugState("leaving read() loop with error: ");
813817
return;
@@ -831,6 +835,7 @@ final void read() {
831835
// anyway.
832836
pauseReadEvent();
833837
current.signalCompletion();
838+
if (debug.on()) debug.log("Stopping read scheduler");
834839
readScheduler.stop();
835840
}
836841
debugState("leaving read() loop after EOF: ");
@@ -850,6 +855,7 @@ final void read() {
850855
// waiting for this event to terminate.
851856
// So resume the read event and return now...
852857
resumeReadEvent();
858+
if (errorRef.get() != null) continue;
853859
debugState("leaving read() loop after onNext: ");
854860
return;
855861
} else {
@@ -861,6 +867,7 @@ final void read() {
861867
// readable again.
862868
demand.increase(1);
863869
resumeReadEvent();
870+
if (errorRef.get() != null) continue;
864871
debugState("leaving read() loop with no bytes");
865872
return;
866873
}
@@ -879,6 +886,7 @@ final void read() {
879886
// Trying to pause the event here would actually
880887
// introduce a race condition between this loop and
881888
// request(n).
889+
if (errorRef.get() != null) continue;
882890
debugState("leaving read() loop with no demand");
883891
break;
884892
}
@@ -946,6 +954,7 @@ protected final void signalEvent() {
946954

947955
@Override
948956
protected final void signalError(Throwable error) {
957+
if (debug.on()) debug.log("signalError to %s (%s)", sub, error);
949958
sub.signalError(error);
950959
}
951960

0 commit comments

Comments
 (0)