Skip to content

Commit 720b446

Browse files
committed
8335181: Incorrect handling of HTTP/2 GOAWAY frames in HttpClient
Reviewed-by: dfuchs
1 parent f132b34 commit 720b446

File tree

12 files changed

+625
-56
lines changed

12 files changed

+625
-56
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/ExchangeImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -58,6 +58,10 @@ abstract class ExchangeImpl<T> {
5858

5959
final Exchange<T> exchange;
6060

61+
// this will be set to true only when the peer explicitly states (through a GOAWAY frame or
62+
// a relevant error code in reset frame) that the corresponding stream (id) wasn't processed
63+
private volatile boolean unprocessedByPeer;
64+
6165
ExchangeImpl(Exchange<T> e) {
6266
// e == null means a http/2 pushed stream
6367
this.exchange = e;
@@ -265,4 +269,13 @@ void upgraded() { }
265269
// Called when server returns non 100 response to
266270
// an Expect-Continue
267271
void expectContinueFailed(int rcode) { }
272+
273+
final boolean isUnprocessedByPeer() {
274+
return this.unprocessedByPeer;
275+
}
276+
277+
// Marks the exchange as unprocessed by the peer
278+
final void markUnprocessedByPeer() {
279+
this.unprocessedByPeer = true;
280+
}
268281
}

src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -47,6 +47,8 @@
4747
import java.util.concurrent.ConcurrentHashMap;
4848
import java.util.concurrent.ConcurrentLinkedQueue;
4949
import java.util.concurrent.Flow;
50+
import java.util.concurrent.atomic.AtomicInteger;
51+
import java.util.concurrent.atomic.AtomicLong;
5052
import java.util.concurrent.locks.Lock;
5153
import java.util.concurrent.locks.ReentrantLock;
5254
import java.util.function.Function;
@@ -358,6 +360,7 @@ private record PushContinuationState(HeaderDecoder pushContDecoder, PushPromiseF
358360
private final String key; // for HttpClientImpl.connections map
359361
private final FramesDecoder framesDecoder;
360362
private final FramesEncoder framesEncoder = new FramesEncoder();
363+
private final AtomicLong lastProcessedStreamInGoAway = new AtomicLong(-1);
361364

362365
/**
363366
* Send Window controller for both connection and stream windows.
@@ -725,7 +728,9 @@ final int maxConcurrentServerInitiatedStreams() {
725728

726729
void close() {
727730
if (markHalfClosedLocal()) {
728-
if (connection.channel().isOpen()) {
731+
// we send a GOAWAY frame only if the remote side hasn't already indicated
732+
// the intention to close the connection by previously sending a GOAWAY of its own
733+
if (connection.channel().isOpen() && !isMarked(closedState, HALF_CLOSED_REMOTE)) {
729734
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
730735
GoAwayFrame f = new GoAwayFrame(0,
731736
ErrorFrame.NO_ERROR,
@@ -1205,13 +1210,46 @@ private void handlePing(PingFrame frame)
12051210
sendUnorderedFrame(frame);
12061211
}
12071212

1208-
private void handleGoAway(GoAwayFrame frame)
1209-
throws IOException
1210-
{
1211-
if (markHalfClosedLRemote()) {
1212-
shutdown(new IOException(
1213-
connection.channel().getLocalAddress()
1214-
+ ": GOAWAY received"));
1213+
private void handleGoAway(final GoAwayFrame frame) {
1214+
final long lastProcessedStream = frame.getLastStream();
1215+
assert lastProcessedStream >= 0 : "unexpected last stream id: "
1216+
+ lastProcessedStream + " in GOAWAY frame";
1217+
1218+
markHalfClosedRemote();
1219+
setFinalStream(); // don't allow any new streams on this connection
1220+
if (debug.on()) {
1221+
debug.log("processing incoming GOAWAY with last processed stream id:%s in frame %s",
1222+
lastProcessedStream, frame);
1223+
}
1224+
// see if this connection has previously received a GOAWAY from the peer and if yes
1225+
// then check if this new last processed stream id is lesser than the previous
1226+
// known last processed stream id. Only update the last processed stream id if the new
1227+
// one is lesser than the previous one.
1228+
long prevLastProcessed = lastProcessedStreamInGoAway.get();
1229+
while (prevLastProcessed == -1 || lastProcessedStream < prevLastProcessed) {
1230+
if (lastProcessedStreamInGoAway.compareAndSet(prevLastProcessed,
1231+
lastProcessedStream)) {
1232+
break;
1233+
}
1234+
prevLastProcessed = lastProcessedStreamInGoAway.get();
1235+
}
1236+
handlePeerUnprocessedStreams(lastProcessedStreamInGoAway.get());
1237+
}
1238+
1239+
private void handlePeerUnprocessedStreams(final long lastProcessedStream) {
1240+
final AtomicInteger numClosed = new AtomicInteger(); // atomic merely to allow usage within lambda
1241+
streams.forEach((id, exchange) -> {
1242+
if (id > lastProcessedStream) {
1243+
// any streams with an stream id higher than the last processed stream
1244+
// can be retried (on a new connection). we close the exchange as unprocessed
1245+
// to facilitate the retrying.
1246+
client2.client().theExecutor().ensureExecutedAsync(exchange::closeAsUnprocessed);
1247+
numClosed.incrementAndGet();
1248+
}
1249+
});
1250+
if (debug.on()) {
1251+
debug.log(numClosed.get() + " stream(s), with id greater than " + lastProcessedStream
1252+
+ ", will be closed as unprocessed");
12151253
}
12161254
}
12171255

@@ -1745,7 +1783,7 @@ private boolean markHalfClosedLocal() {
17451783
return markClosedState(HALF_CLOSED_LOCAL);
17461784
}
17471785

1748-
private boolean markHalfClosedLRemote() {
1786+
private boolean markHalfClosedRemote() {
17491787
return markClosedState(HALF_CLOSED_REMOTE);
17501788
}
17511789

src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -90,7 +90,7 @@ class MultiExchange<T> implements Cancelable {
9090
Exchange<T> exchange; // the current exchange
9191
Exchange<T> previous;
9292
volatile Throwable retryCause;
93-
volatile boolean expiredOnce;
93+
volatile boolean retriedOnce;
9494
volatile HttpResponse<T> response;
9595

9696
// Maximum number of times a request will be retried/redirected
@@ -469,7 +469,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
469469
return exch.ignoreBody().handle((r,t) -> {
470470
previousreq = currentreq;
471471
currentreq = newrequest;
472-
expiredOnce = false;
472+
retriedOnce = false;
473473
setExchange(new Exchange<>(currentreq, this, acc));
474474
return responseAsyncImpl();
475475
}).thenCompose(Function.identity());
@@ -482,7 +482,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
482482
return completedFuture(response);
483483
}
484484
// all exceptions thrown are handled here
485-
CompletableFuture<Response> errorCF = getExceptionalCF(ex);
485+
CompletableFuture<Response> errorCF = getExceptionalCF(ex, exch.exchImpl);
486486
if (errorCF == null) {
487487
return responseAsyncImpl();
488488
} else {
@@ -554,36 +554,39 @@ private Throwable retryCause(Throwable t) {
554554
* Takes a Throwable and returns a suitable CompletableFuture that is
555555
* completed exceptionally, or null.
556556
*/
557-
private CompletableFuture<Response> getExceptionalCF(Throwable t) {
557+
private CompletableFuture<Response> getExceptionalCF(Throwable t, ExchangeImpl<?> exchImpl) {
558558
if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
559559
if (t.getCause() != null) {
560560
t = t.getCause();
561561
}
562562
}
563+
final boolean retryAsUnprocessed = exchImpl != null && exchImpl.isUnprocessedByPeer();
563564
if (cancelled && !requestCancelled() && t instanceof IOException) {
564565
if (!(t instanceof HttpTimeoutException)) {
565566
t = toTimeoutException((IOException)t);
566567
}
567-
} else if (retryOnFailure(t)) {
568+
} else if (retryAsUnprocessed || retryOnFailure(t)) {
568569
Throwable cause = retryCause(t);
569570

570571
if (!(t instanceof ConnectException)) {
571572
// we may need to start a new connection, and if so
572573
// we want to start with a fresh connect timeout again.
573574
if (connectTimeout != null) connectTimeout.reset();
574-
if (!canRetryRequest(currentreq)) {
575-
return failedFuture(cause); // fails with original cause
575+
if (!retryAsUnprocessed && !canRetryRequest(currentreq)) {
576+
// a (peer) processed request which cannot be retried, fail with
577+
// the original cause
578+
return failedFuture(cause);
576579
}
577580
} // ConnectException: retry, but don't reset the connectTimeout.
578581

579582
// allow the retry mechanism to do its work
580583
retryCause = cause;
581-
if (!expiredOnce) {
584+
if (!retriedOnce) {
582585
if (debug.on()) {
583586
debug.log(t.getClass().getSimpleName()
584-
+ " (async): retrying due to: ", t);
587+
+ " (async): retrying " + currentreq + " due to: ", t);
585588
}
586-
expiredOnce = true;
589+
retriedOnce = true;
587590
// The connection was abruptly closed.
588591
// We return null to retry the same request a second time.
589592
// The request filters have already been applied to the
@@ -594,7 +597,7 @@ private CompletableFuture<Response> getExceptionalCF(Throwable t) {
594597
} else {
595598
if (debug.on()) {
596599
debug.log(t.getClass().getSimpleName()
597-
+ " (async): already retried once.", t);
600+
+ " (async): already retried once " + currentreq, t);
598601
}
599602
t = cause;
600603
}

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -641,20 +641,39 @@ void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
641641
stateLock.unlock();
642642
}
643643
try {
644-
int error = frame.getErrorCode();
645-
IOException e = new IOException("Received RST_STREAM: "
646-
+ ErrorFrame.stringForCode(error));
647-
if (errorRef.compareAndSet(null, e)) {
648-
if (subscriber != null) {
649-
subscriber.onError(e);
644+
final int error = frame.getErrorCode();
645+
// A REFUSED_STREAM error code implies that the stream wasn't processed by the
646+
// peer and the client is free to retry the request afresh.
647+
if (error == ErrorFrame.REFUSED_STREAM) {
648+
// Here we arrange for the request to be retried. Note that we don't call
649+
// closeAsUnprocessed() method here because the "closed" state is already set
650+
// to true a few lines above and calling close() from within
651+
// closeAsUnprocessed() will end up being a no-op. We instead do the additional
652+
// bookkeeping here.
653+
markUnprocessedByPeer();
654+
errorRef.compareAndSet(null, new IOException("request not processed by peer"));
655+
if (debug.on()) {
656+
debug.log("request unprocessed by peer (REFUSED_STREAM) " + this.request);
657+
}
658+
} else {
659+
final String reason = ErrorFrame.stringForCode(error);
660+
final IOException failureCause = new IOException("Received RST_STREAM: " + reason);
661+
if (debug.on()) {
662+
debug.log(streamid + " received RST_STREAM with code: " + reason);
663+
}
664+
if (errorRef.compareAndSet(null, failureCause)) {
665+
if (subscriber != null) {
666+
subscriber.onError(failureCause);
667+
}
650668
}
651669
}
652-
completeResponseExceptionally(e);
670+
final Throwable failureCause = errorRef.get();
671+
completeResponseExceptionally(failureCause);
653672
if (!requestBodyCF.isDone()) {
654-
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
673+
requestBodyCF.completeExceptionally(failureCause); // we may be sending the body..
655674
}
656675
if (responseBodyCF != null) {
657-
responseBodyCF.completeExceptionally(errorRef.get());
676+
responseBodyCF.completeExceptionally(failureCause);
658677
}
659678
} finally {
660679
connection.decrementStreamsCount(streamid);
@@ -1663,7 +1682,35 @@ Throwable getCancelCause() {
16631682
}
16641683

16651684
final String dbgString() {
1666-
return connection.dbgString() + "/Stream("+streamid+")";
1685+
final int id = streamid;
1686+
final String sid = id == 0 ? "?" : String.valueOf(id);
1687+
return connection.dbgString() + "/Stream(" + sid + ")";
1688+
}
1689+
1690+
/**
1691+
* An unprocessed exchange is one that hasn't been processed by a peer. The local end of the
1692+
* connection would be notified about such exchanges when it receives a GOAWAY frame with
1693+
* a stream id that tells which exchanges have been unprocessed.
1694+
* This method is called on such unprocessed exchanges and the implementation of this method
1695+
* will arrange for the request, corresponding to this exchange, to be retried afresh on a
1696+
* new connection.
1697+
*/
1698+
void closeAsUnprocessed() {
1699+
try {
1700+
// We arrange for the request to be retried on a new connection as allowed by the RFC-9113
1701+
markUnprocessedByPeer();
1702+
this.errorRef.compareAndSet(null, new IOException("request not processed by peer"));
1703+
if (debug.on()) {
1704+
debug.log("closing " + this.request + " as unprocessed by peer");
1705+
}
1706+
// close the exchange and complete the response CF exceptionally
1707+
close();
1708+
completeResponseExceptionally(this.errorRef.get());
1709+
} finally {
1710+
// decrementStreamsCount isn't really needed but we do it to make sure
1711+
// the log messages, where these counts/states get reported, show the accurate state.
1712+
connection.decrementStreamsCount(streamid);
1713+
}
16671714
}
16681715

16691716
private class HeadersConsumer extends ValidatingHeadersConsumer {

src/java.net.http/share/classes/jdk/internal/net/http/WindowController.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -100,13 +100,16 @@ void removeStream(int streamid) {
100100
controllerLock.lock();
101101
try {
102102
Integer old = streams.remove(streamid);
103-
// Odd stream numbers (client streams) should have been registered.
103+
// A client initiated stream might be closed (as unprocessed, due to a
104+
// GOAWAY received on the connection) even before the stream is
105+
// registered with this WindowController instance (when sending out request headers).
106+
// Thus, for client initiated streams, we don't enforce the presence of the
107+
// stream in the registered "streams" map.
108+
104109
// Even stream numbers (server streams - aka Push Streams) should
105110
// not be registered
106111
final boolean isClientStream = (streamid & 0x1) == 1;
107-
if (old == null && isClientStream) {
108-
throw new InternalError("Expected entry for streamid: " + streamid);
109-
} else if (old != null && !isClientStream) {
112+
if (old != null && !isClientStream) {
110113
throw new InternalError("Unexpected entry for streamid: " + streamid);
111114
}
112115
} finally {

src/java.net.http/share/classes/jdk/internal/net/http/frame/GoAwayFrame.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -57,7 +57,9 @@ int length() {
5757

5858
@Override
5959
public String toString() {
60-
return super.toString() + " Debugdata: " + new String(debugData, UTF_8);
60+
return super.toString()
61+
+ " lastStreamId=" + lastStream
62+
+ ", Debugdata: " + new String(debugData, UTF_8);
6163
}
6264

6365
public int getLastStream() {

0 commit comments

Comments
 (0)