Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
8229822: ThrowingPushPromises tests sometimes fail due to EOF
SocketTube is fixed to cater for errors caused by pausing/resuming events on an asynchronously closed connection, from within the selector's manager thread. Http2Connection and Stream are fixed to prevent sending a DataFrame on a stream after Reset has been sent.

Reviewed-by: chegar
  • Loading branch information
dfuch committed Aug 7, 2020
1 parent 1b6f671 commit 18fdf05076643b78ae36279f5e88a4596ff34f36
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 29 deletions.
@@ -329,7 +329,11 @@ private Http2Connection(HttpConnection connection,
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

Stream<?> initialStream = createStream(exchange);
initialStream.registerStream(1);
boolean opened = initialStream.registerStream(1, true);
if (debug.on() && !opened) {
debug.log("Initial stream was cancelled - but connection is maintained: " +
"reset frame will need to be sent later");
}
windowController.registerStream(1, getInitialSendWindowSize());
initialStream.requestSent();
// Upgrading:
@@ -338,6 +342,11 @@ private Http2Connection(HttpConnection connection,
this.initial = initial;
connectFlows(connection);
sendConnectionPreface();
if (!opened) {
debug.log("ensure reset frame is sent to cancel initial stream");
initialStream.sendCancelStreamFrame();
}

}

// Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
@@ -849,7 +858,7 @@ private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
pushExch.exchImpl = pushStream;
pushStream.registerStream(promisedStreamid);
pushStream.registerStream(promisedStreamid, true);
parent.incoming_pushPromise(pushReq, pushStream);
}

@@ -874,14 +883,15 @@ private void handleConnectionFrame(Http2Frame frame)
}
}

void resetStream(int streamid, int code) throws IOException {
void resetStream(int streamid, int code) {
try {
if (connection.channel().isOpen()) {
// no need to try & send a reset frame if the
// connection channel is already closed.
Log.logError(
"Resetting stream {0,number,integer} with error code {1,number,integer}",
streamid, code);
markStream(streamid, code);
ResetFrame frame = new ResetFrame(streamid, code);
sendFrame(frame);
} else if (debug.on()) {
@@ -894,6 +904,11 @@ void resetStream(int streamid, int code) throws IOException {
}
}

private void markStream(int streamid, int code) {
Stream<?> s = streams.get(streamid);
if (s != null) s.markStream(code);
}

// reduce count of streams by 1 if stream still exists
synchronized void decrementStreamsCount(int streamid) {
Stream<?> s = streams.get(streamid);
@@ -1192,12 +1207,19 @@ private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
Stream<?> stream = oh.getAttachment();
assert stream.streamid == 0;
int streamid = nextstreamid;
nextstreamid += 2;
stream.registerStream(streamid);
// set outgoing window here. This allows thread sending
// body to proceed.
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
if (stream.registerStream(streamid, false)) {
// set outgoing window here. This allows thread sending
// body to proceed.
nextstreamid += 2;
windowController.registerStream(streamid, getInitialSendWindowSize());
return stream;
} else {
stream.cancelImpl(new IOException("Request cancelled"));
if (finalStream() && streams.isEmpty()) {
close();
}
return null;
}
}

private final Object sendlock = new Object();
@@ -1211,7 +1233,9 @@ void sendFrame(Http2Frame frame) {
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
publisher.enqueue(encodeHeaders(oh, stream));
if (stream != null) {
publisher.enqueue(encodeHeaders(oh, stream));
}
} else {
publisher.enqueue(encodeFrame(frame));
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -246,7 +246,7 @@ public final void handle() {
}
@Override
public final void abort(IOException error) {
debug().log(() -> "abort: " + error);
debug().log(() -> this.getClass().getSimpleName() + " abort: " + error);
pause(); // pause, then signal
signalError(error); // should not be resumed after abort (not checked)
}
@@ -724,10 +724,12 @@ public final void request(long n) {
@Override
public final void cancel() {
pauseReadEvent();
if (debug.on()) debug.log("Read subscription cancelled");
if (Log.channel()) {
Log.logChannel("Read subscription cancelled for channel {0}",
channelDescr());
}
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}

@@ -748,6 +750,7 @@ final void handleError() {
}

final void signalError(Throwable error) {
if (debug.on()) debug.log("signal read error: " + error);
if (!errorRef.compareAndSet(null, error)) {
return;
}
@@ -808,6 +811,7 @@ final void read() {
}
current.errorRef.compareAndSet(null, error);
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
debugState("leaving read() loop with error: ");
return;
@@ -831,6 +835,7 @@ final void read() {
// anyway.
pauseReadEvent();
current.signalCompletion();
if (debug.on()) debug.log("Stopping read scheduler");
readScheduler.stop();
}
debugState("leaving read() loop after EOF: ");
@@ -850,6 +855,7 @@ final void read() {
// waiting for this event to terminate.
// So resume the read event and return now...
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop after onNext: ");
return;
} else {
@@ -861,6 +867,7 @@ final void read() {
// readable again.
demand.increase(1);
resumeReadEvent();
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no bytes");
return;
}
@@ -879,6 +886,7 @@ final void read() {
// Trying to pause the event here would actually
// introduce a race condition between this loop and
// request(n).
if (errorRef.get() != null) continue;
debugState("leaving read() loop with no demand");
break;
}
@@ -946,6 +954,7 @@ protected final void signalEvent() {

@Override
protected final void signalError(Throwable error) {
if (debug.on()) debug.log("signalError to %s (%s)", sub, error);
sub.signalError(error);
}

0 comments on commit 18fdf05

Please sign in to comment.