Skip to content

Commit 209939a

Browse files
author
Alexey Bakhtin
committed
8335181: Incorrect handling of HTTP/2 GOAWAY frames in HttpClient
Reviewed-by: andrew Backport-of: 720b44648bcff997278af92746f942b2425298a5
1 parent a5678d3 commit 209939a

File tree

12 files changed

+622
-54
lines changed

12 files changed

+622
-54
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/ExchangeImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -58,6 +58,10 @@ abstract class ExchangeImpl<T> {
5858

5959
final Exchange<T> exchange;
6060

61+
// this will be set to true only when the peer explicitly states (through a GOAWAY frame or
62+
// a relevant error code in reset frame) that the corresponding stream (id) wasn't processed
63+
private volatile boolean unprocessedByPeer;
64+
6165
ExchangeImpl(Exchange<T> e) {
6266
// e == null means a http/2 pushed stream
6367
this.exchange = e;
@@ -264,4 +268,13 @@ void upgraded() { }
264268
// Called when server returns non 100 response to
265269
// an Expect-Continue
266270
void expectContinueFailed(int rcode) { }
271+
272+
final boolean isUnprocessedByPeer() {
273+
return this.unprocessedByPeer;
274+
}
275+
276+
// Marks the exchange as unprocessed by the peer
277+
final void markUnprocessedByPeer() {
278+
this.unprocessedByPeer = true;
279+
}
267280
}

src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.ConcurrentLinkedQueue;
5050
import java.util.concurrent.Flow;
5151
import java.util.concurrent.atomic.AtomicInteger;
52+
import java.util.concurrent.atomic.AtomicLong;
5253
import java.util.concurrent.atomic.AtomicReference;
5354
import java.util.concurrent.locks.Lock;
5455
import java.util.concurrent.locks.ReentrantLock;
@@ -396,6 +397,7 @@ private record PushContinuationState(PushPromiseDecoder pushContDecoder, PushPro
396397
private final String key; // for HttpClientImpl.connections map
397398
private final FramesDecoder framesDecoder;
398399
private final FramesEncoder framesEncoder = new FramesEncoder();
400+
private final AtomicLong lastProcessedStreamInGoAway = new AtomicLong(-1);
399401

400402
/**
401403
* Send Window controller for both connection and stream windows.
@@ -802,7 +804,9 @@ final int maxConcurrentServerInitiatedStreams() {
802804

803805
void close() {
804806
if (markHalfClosedLocal()) {
805-
if (connection.channel().isOpen()) {
807+
// we send a GOAWAY frame only if the remote side hasn't already indicated
808+
// the intention to close the connection by previously sending a GOAWAY of its own
809+
if (connection.channel().isOpen() && !isMarked(closedState, HALF_CLOSED_REMOTE)) {
806810
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
807811
GoAwayFrame f = new GoAwayFrame(0,
808812
ErrorFrame.NO_ERROR,
@@ -1354,13 +1358,46 @@ private void handlePing(PingFrame frame)
13541358
sendUnorderedFrame(frame);
13551359
}
13561360

1357-
private void handleGoAway(GoAwayFrame frame)
1358-
throws IOException
1359-
{
1360-
if (markHalfClosedLRemote()) {
1361-
shutdown(new IOException(
1362-
connection.channel().getLocalAddress()
1363-
+ ": GOAWAY received"));
1361+
private void handleGoAway(final GoAwayFrame frame) {
1362+
final long lastProcessedStream = frame.getLastStream();
1363+
assert lastProcessedStream >= 0 : "unexpected last stream id: "
1364+
+ lastProcessedStream + " in GOAWAY frame";
1365+
1366+
markHalfClosedRemote();
1367+
setFinalStream(); // don't allow any new streams on this connection
1368+
if (debug.on()) {
1369+
debug.log("processing incoming GOAWAY with last processed stream id:%s in frame %s",
1370+
lastProcessedStream, frame);
1371+
}
1372+
// see if this connection has previously received a GOAWAY from the peer and if yes
1373+
// then check if this new last processed stream id is lesser than the previous
1374+
// known last processed stream id. Only update the last processed stream id if the new
1375+
// one is lesser than the previous one.
1376+
long prevLastProcessed = lastProcessedStreamInGoAway.get();
1377+
while (prevLastProcessed == -1 || lastProcessedStream < prevLastProcessed) {
1378+
if (lastProcessedStreamInGoAway.compareAndSet(prevLastProcessed,
1379+
lastProcessedStream)) {
1380+
break;
1381+
}
1382+
prevLastProcessed = lastProcessedStreamInGoAway.get();
1383+
}
1384+
handlePeerUnprocessedStreams(lastProcessedStreamInGoAway.get());
1385+
}
1386+
1387+
private void handlePeerUnprocessedStreams(final long lastProcessedStream) {
1388+
final AtomicInteger numClosed = new AtomicInteger(); // atomic merely to allow usage within lambda
1389+
streams.forEach((id, exchange) -> {
1390+
if (id > lastProcessedStream) {
1391+
// any streams with an stream id higher than the last processed stream
1392+
// can be retried (on a new connection). we close the exchange as unprocessed
1393+
// to facilitate the retrying.
1394+
client2.client().theExecutor().ensureExecutedAsync(exchange::closeAsUnprocessed);
1395+
numClosed.incrementAndGet();
1396+
}
1397+
});
1398+
if (debug.on()) {
1399+
debug.log(numClosed.get() + " stream(s), with id greater than " + lastProcessedStream
1400+
+ ", will be closed as unprocessed");
13641401
}
13651402
}
13661403

@@ -1911,7 +1948,7 @@ private boolean markHalfClosedLocal() {
19111948
return markClosedState(HALF_CLOSED_LOCAL);
19121949
}
19131950

1914-
private boolean markHalfClosedLRemote() {
1951+
private boolean markHalfClosedRemote() {
19151952
return markClosedState(HALF_CLOSED_REMOTE);
19161953
}
19171954

src/java.net.http/share/classes/jdk/internal/net/http/MultiExchange.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -90,7 +90,7 @@ class MultiExchange<T> implements Cancelable {
9090
Exchange<T> exchange; // the current exchange
9191
Exchange<T> previous;
9292
volatile Throwable retryCause;
93-
volatile boolean expiredOnce;
93+
volatile boolean retriedOnce;
9494
volatile HttpResponse<T> response;
9595

9696
// Maximum number of times a request will be retried/redirected
@@ -459,7 +459,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
459459
return exch.ignoreBody().handle((r,t) -> {
460460
previousreq = currentreq;
461461
currentreq = newrequest;
462-
expiredOnce = false;
462+
retriedOnce = false;
463463
setExchange(new Exchange<>(currentreq, this, acc));
464464
return responseAsyncImpl();
465465
}).thenCompose(Function.identity());
@@ -472,7 +472,7 @@ private CompletableFuture<Response> responseAsyncImpl() {
472472
return completedFuture(response);
473473
}
474474
// all exceptions thrown are handled here
475-
CompletableFuture<Response> errorCF = getExceptionalCF(ex);
475+
CompletableFuture<Response> errorCF = getExceptionalCF(ex, exch.exchImpl);
476476
if (errorCF == null) {
477477
return responseAsyncImpl();
478478
} else {
@@ -544,34 +544,38 @@ private Throwable retryCause(Throwable t) {
544544
* Takes a Throwable and returns a suitable CompletableFuture that is
545545
* completed exceptionally, or null.
546546
*/
547-
private CompletableFuture<Response> getExceptionalCF(Throwable t) {
547+
private CompletableFuture<Response> getExceptionalCF(Throwable t, ExchangeImpl<?> exchImpl) {
548548
if ((t instanceof CompletionException) || (t instanceof ExecutionException)) {
549549
if (t.getCause() != null) {
550550
t = t.getCause();
551551
}
552552
}
553+
final boolean retryAsUnprocessed = exchImpl != null && exchImpl.isUnprocessedByPeer();
553554
if (cancelled && !requestCancelled() && t instanceof IOException) {
554555
if (!(t instanceof HttpTimeoutException)) {
555556
t = toTimeoutException((IOException)t);
556557
}
557-
} else if (retryOnFailure(t)) {
558+
} else if (retryAsUnprocessed || retryOnFailure(t)) {
558559
Throwable cause = retryCause(t);
559560

560561
if (!(t instanceof ConnectException)) {
561562
// we may need to start a new connection, and if so
562563
// we want to start with a fresh connect timeout again.
563564
if (connectTimeout != null) connectTimeout.reset();
564-
if (!canRetryRequest(currentreq)) {
565-
return failedFuture(cause); // fails with original cause
565+
if (!retryAsUnprocessed && !canRetryRequest(currentreq)) {
566+
// a (peer) processed request which cannot be retried, fail with
567+
// the original cause
568+
return failedFuture(cause);
566569
}
567570
} // ConnectException: retry, but don't reset the connectTimeout.
568571

569572
// allow the retry mechanism to do its work
570573
retryCause = cause;
571-
if (!expiredOnce) {
574+
if (!retriedOnce) {
572575
if (debug.on())
573-
debug.log(t.getClass().getSimpleName() + " (async): retrying...", t);
574-
expiredOnce = true;
576+
debug.log(t.getClass().getSimpleName()
577+
+ " (async): retrying " + currentreq + " due to: ", t);
578+
retriedOnce = true;
575579
// The connection was abruptly closed.
576580
// We return null to retry the same request a second time.
577581
// The request filters have already been applied to the
@@ -582,7 +586,7 @@ private CompletableFuture<Response> getExceptionalCF(Throwable t) {
582586
} else {
583587
if (debug.on()) {
584588
debug.log(t.getClass().getSimpleName()
585-
+ " (async): already retried once.", t);
589+
+ " (async): already retried once " + currentreq, t);
586590
}
587591
t = cause;
588592
}

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -673,20 +673,39 @@ void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
673673
stateLock.unlock();
674674
}
675675
try {
676-
int error = frame.getErrorCode();
677-
IOException e = new IOException("Received RST_STREAM: "
678-
+ ErrorFrame.stringForCode(error));
679-
if (errorRef.compareAndSet(null, e)) {
680-
if (subscriber != null) {
681-
subscriber.onError(e);
676+
final int error = frame.getErrorCode();
677+
// A REFUSED_STREAM error code implies that the stream wasn't processed by the
678+
// peer and the client is free to retry the request afresh.
679+
if (error == ErrorFrame.REFUSED_STREAM) {
680+
// Here we arrange for the request to be retried. Note that we don't call
681+
// closeAsUnprocessed() method here because the "closed" state is already set
682+
// to true a few lines above and calling close() from within
683+
// closeAsUnprocessed() will end up being a no-op. We instead do the additional
684+
// bookkeeping here.
685+
markUnprocessedByPeer();
686+
errorRef.compareAndSet(null, new IOException("request not processed by peer"));
687+
if (debug.on()) {
688+
debug.log("request unprocessed by peer (REFUSED_STREAM) " + this.request);
689+
}
690+
} else {
691+
final String reason = ErrorFrame.stringForCode(error);
692+
final IOException failureCause = new IOException("Received RST_STREAM: " + reason);
693+
if (debug.on()) {
694+
debug.log(streamid + " received RST_STREAM with code: " + reason);
695+
}
696+
if (errorRef.compareAndSet(null, failureCause)) {
697+
if (subscriber != null) {
698+
subscriber.onError(failureCause);
699+
}
682700
}
683701
}
684-
completeResponseExceptionally(e);
702+
final Throwable failureCause = errorRef.get();
703+
completeResponseExceptionally(failureCause);
685704
if (!requestBodyCF.isDone()) {
686-
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
705+
requestBodyCF.completeExceptionally(failureCause); // we may be sending the body..
687706
}
688707
if (responseBodyCF != null) {
689-
responseBodyCF.completeExceptionally(errorRef.get());
708+
responseBodyCF.completeExceptionally(failureCause);
690709
}
691710
} finally {
692711
connection.decrementStreamsCount(streamid);
@@ -1699,7 +1718,35 @@ Throwable getCancelCause() {
16991718
}
17001719

17011720
final String dbgString() {
1702-
return connection.dbgString() + "/Stream("+streamid+")";
1721+
final int id = streamid;
1722+
final String sid = id == 0 ? "?" : String.valueOf(id);
1723+
return connection.dbgString() + "/Stream(" + sid + ")";
1724+
}
1725+
1726+
/**
1727+
* An unprocessed exchange is one that hasn't been processed by a peer. The local end of the
1728+
* connection would be notified about such exchanges when it receives a GOAWAY frame with
1729+
* a stream id that tells which exchanges have been unprocessed.
1730+
* This method is called on such unprocessed exchanges and the implementation of this method
1731+
* will arrange for the request, corresponding to this exchange, to be retried afresh on a
1732+
* new connection.
1733+
*/
1734+
void closeAsUnprocessed() {
1735+
try {
1736+
// We arrange for the request to be retried on a new connection as allowed by the RFC-9113
1737+
markUnprocessedByPeer();
1738+
this.errorRef.compareAndSet(null, new IOException("request not processed by peer"));
1739+
if (debug.on()) {
1740+
debug.log("closing " + this.request + " as unprocessed by peer");
1741+
}
1742+
// close the exchange and complete the response CF exceptionally
1743+
close();
1744+
completeResponseExceptionally(this.errorRef.get());
1745+
} finally {
1746+
// decrementStreamsCount isn't really needed but we do it to make sure
1747+
// the log messages, where these counts/states get reported, show the accurate state.
1748+
connection.decrementStreamsCount(streamid);
1749+
}
17031750
}
17041751

17051752
private class HeadersConsumer extends ValidatingHeadersConsumer implements DecodingCallback {

src/java.net.http/share/classes/jdk/internal/net/http/WindowController.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -100,13 +100,16 @@ void removeStream(int streamid) {
100100
controllerLock.lock();
101101
try {
102102
Integer old = streams.remove(streamid);
103-
// Odd stream numbers (client streams) should have been registered.
103+
// A client initiated stream might be closed (as unprocessed, due to a
104+
// GOAWAY received on the connection) even before the stream is
105+
// registered with this WindowController instance (when sending out request headers).
106+
// Thus, for client initiated streams, we don't enforce the presence of the
107+
// stream in the registered "streams" map.
108+
104109
// Even stream numbers (server streams - aka Push Streams) should
105110
// not be registered
106111
final boolean isClientStream = (streamid & 0x1) == 1;
107-
if (old == null && isClientStream) {
108-
throw new InternalError("Expected entry for streamid: " + streamid);
109-
} else if (old != null && !isClientStream) {
112+
if (old != null && !isClientStream) {
110113
throw new InternalError("Unexpected entry for streamid: " + streamid);
111114
}
112115
} finally {

src/java.net.http/share/classes/jdk/internal/net/http/frame/GoAwayFrame.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -57,7 +57,9 @@ int length() {
5757

5858
@Override
5959
public String toString() {
60-
return super.toString() + " Debugdata: " + new String(debugData, UTF_8);
60+
return super.toString()
61+
+ " lastStreamId=" + lastStream
62+
+ ", Debugdata: " + new String(debugData, UTF_8);
6163
}
6264

6365
public int getLastStream() {

0 commit comments

Comments
 (0)