Skip to content

Commit bd6152f

Browse files
committed
8343855: HTTP/2 ConnectionWindowUpdateSender may miss some unprocessed DataFrames from closed streams
Reviewed-by: jpai
1 parent c3776db commit bd6152f

File tree

5 files changed

+180
-40
lines changed

5 files changed

+180
-40
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

+59-14
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ class Stream<T> extends ExchangeImpl<T> {
160160
// send lock: prevent sending DataFrames after reset occurred.
161161
private final Lock sendLock = new ReentrantLock();
162162
private final Lock stateLock = new ReentrantLock();
163+
// inputQ lock: methods that take from the inputQ
164+
// must not run concurrently.
165+
private final Lock inputQLock = new ReentrantLock();
166+
163167
/**
164168
* A reference to this Stream's connection Send Window controller. The
165169
* stream MUST acquire the appropriate amount of Send Window before
@@ -183,6 +187,8 @@ HttpConnection connection() {
183187
private void schedule() {
184188
boolean onCompleteCalled = false;
185189
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
190+
// prevents drainInputQueue() from running concurrently
191+
inputQLock.lock();
186192
try {
187193
if (subscriber == null) {
188194
// pendingResponseSubscriber will be null until response headers have been received and
@@ -199,7 +205,7 @@ private void schedule() {
199205
Http2Frame frame = inputQ.peek();
200206
if (frame instanceof ResetFrame rf) {
201207
inputQ.remove();
202-
if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) {
208+
if (endStreamReceived() && rf.getErrorCode() == ResetFrame.NO_ERROR) {
203209
// If END_STREAM is already received, complete the requestBodyCF successfully
204210
// and stop sending any request data.
205211
requestBodyCF.complete(null);
@@ -208,7 +214,7 @@ private void schedule() {
208214
}
209215
return;
210216
}
211-
DataFrame df = (DataFrame)frame;
217+
DataFrame df = (DataFrame) frame;
212218
boolean finished = df.getFlag(DataFrame.END_STREAM);
213219

214220
List<ByteBuffer> buffers = df.getData();
@@ -256,6 +262,7 @@ private void schedule() {
256262
} catch (Throwable throwable) {
257263
errorRef.compareAndSet(null, throwable);
258264
} finally {
265+
inputQLock.unlock();
259266
if (sched.isStopped()) drainInputQueue();
260267
}
261268

@@ -274,26 +281,36 @@ private void schedule() {
274281
} catch (Throwable x) {
275282
Log.logError("Subscriber::onError threw exception: {0}", t);
276283
} finally {
284+
// cancelImpl will eventually call drainInputQueue();
277285
cancelImpl(t);
278-
drainInputQueue();
279286
}
280287
}
281288
}
282289

283-
// must only be called from the scheduler schedule() loop.
284-
// ensure that all received data frames are accounted for
290+
// Called from the scheduler schedule() loop,
291+
// or after resetting the stream.
292+
// Ensures that all received data frames are accounted for
285293
// in the connection window flow control if the scheduler
286294
// is stopped before all the data is consumed.
295+
// The inputQLock is used to prevent concurrently taking
296+
// from the queue.
287297
private void drainInputQueue() {
288298
Http2Frame frame;
289-
while ((frame = inputQ.poll()) != null) {
290-
if (frame instanceof DataFrame df) {
291-
// Data frames that have been added to the inputQ
292-
// must be released using releaseUnconsumed() to
293-
// account for the amount of unprocessed bytes
294-
// tracked by the connection.windowUpdater.
295-
connection.releaseUnconsumed(df);
299+
// will wait until schedule() has finished taking
300+
// from the queue, if needed.
301+
inputQLock.lock();
302+
try {
303+
while ((frame = inputQ.poll()) != null) {
304+
if (frame instanceof DataFrame df) {
305+
// Data frames that have been added to the inputQ
306+
// must be released using releaseUnconsumed() to
307+
// account for the amount of unprocessed bytes
308+
// tracked by the connection.windowUpdater.
309+
connection.releaseUnconsumed(df);
310+
}
296311
}
312+
} finally {
313+
inputQLock.unlock();
297314
}
298315
}
299316

@@ -405,12 +422,38 @@ private void receiveDataFrame(DataFrame df) {
405422
return;
406423
}
407424
}
408-
inputQ.add(df);
425+
pushDataFrame(len, df);
409426
} finally {
410427
sched.runOrSchedule();
411428
}
412429
}
413430

431+
// Ensures that no data frame is pushed on the inputQ
432+
// after the stream is closed.
433+
// Changes to the `closed` boolean are guarded by the
434+
// stateLock. Contention should be low as only one
435+
// thread at a time adds to the inputQ, and
436+
// we can only contend when closing the stream.
437+
// Note that this method can run concurrently with
438+
// methods holding the inputQLock: that is OK.
439+
// The inputQLock is there to ensure that methods
440+
// taking from the queue are not running concurrently
441+
// with each others, but concurrently adding at the
442+
// end of the queue while peeking/polling at the head
443+
// is OK.
444+
private void pushDataFrame(int len, DataFrame df) {
445+
boolean closed = false;
446+
stateLock.lock();
447+
try {
448+
if (!(closed = this.closed)) {
449+
inputQ.add(df);
450+
}
451+
} finally {
452+
stateLock.unlock();
453+
}
454+
if (closed && len > 0) connection.releaseUnconsumed(df);
455+
}
456+
414457
/** Handles a RESET frame. RESET is always handled inline in the queue. */
415458
private void receiveResetFrame(ResetFrame frame) {
416459
inputQ.add(frame);
@@ -1547,6 +1590,8 @@ void cancelImpl(final Throwable e, final int resetFrameErrCode) {
15471590
}
15481591
} catch (Throwable ex) {
15491592
Log.logError(ex);
1593+
} finally {
1594+
drainInputQueue();
15501595
}
15511596
}
15521597

@@ -1770,7 +1815,7 @@ String dbgString() {
17701815
@Override
17711816
protected boolean windowSizeExceeded(long received) {
17721817
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
1773-
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
1818+
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
17741819
return true;
17751820
}
17761821
}

test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,11 @@ void test(String uri) throws Exception {
171171
var response = responses.get(keys[i]);
172172
String ckey = response.headers().firstValue("X-Connection-Key").get();
173173
if (label == null) label = ckey;
174-
assertEquals(ckey, label, "Unexpected key for " + query);
174+
if (i < max - 1) {
175+
// the connection window might be exceeded at i == max - 2, which
176+
// means that the last request could go on a new connection.
177+
assertEquals(ckey, label, "Unexpected key for " + query);
178+
}
175179
int wait = uri.startsWith("https://") ? 500 : 250;
176180
try (InputStream is = response.body()) {
177181
Thread.sleep(Utils.adjustTimeout(wait));

test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java

+56-16
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
/*
2525
* @test
26-
* @bug 8342075
26+
* @bug 8342075 8343855
2727
* @library /test/lib /test/jdk/java/net/httpclient/lib
2828
* @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
2929
* @run testng/othervm -Djdk.internal.httpclient.debug=true
@@ -40,7 +40,6 @@
4040
import java.net.http.HttpClient;
4141
import java.net.http.HttpHeaders;
4242
import java.net.http.HttpRequest;
43-
import java.net.http.HttpRequest.BodyPublishers;
4443
import java.net.http.HttpResponse;
4544
import java.net.http.HttpResponse.BodyHandlers;
4645
import java.nio.charset.StandardCharsets;
@@ -53,6 +52,7 @@
5352
import javax.net.ssl.SSLContext;
5453
import javax.net.ssl.SSLSession;
5554

55+
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpHeadOrGetHandler;
5656
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
5757
import jdk.httpclient.test.lib.http2.BodyOutputStream;
5858
import jdk.httpclient.test.lib.http2.Http2Handler;
@@ -69,6 +69,7 @@
6969
import org.testng.annotations.DataProvider;
7070
import org.testng.annotations.Test;
7171

72+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
7273
import static org.testng.Assert.assertEquals;
7374
import static org.testng.Assert.fail;
7475

@@ -92,6 +93,19 @@ public Object[][] variants() {
9293
};
9394
}
9495

96+
static void sleep(long wait) throws InterruptedException {
97+
if (wait <= 0) return;
98+
long remaining = Utils.adjustTimeout(wait);
99+
long start = System.nanoTime();
100+
while (remaining > 0) {
101+
Thread.sleep(remaining);
102+
long end = System.nanoTime();
103+
remaining = remaining - NANOSECONDS.toMillis(end - start);
104+
}
105+
System.out.printf("Waited %s ms%n",
106+
NANOSECONDS.toMillis(System.nanoTime() - start));
107+
}
108+
95109

96110
@Test(dataProvider = "variants")
97111
void test(String uri,
@@ -115,7 +129,7 @@ void test(String uri,
115129
CompletableFuture<String> sent = new CompletableFuture<>();
116130
responseSent.put(query, sent);
117131
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
118-
.POST(BodyPublishers.ofString("Hello there!"))
132+
.GET()
119133
.build();
120134
System.out.println("\nSending request:" + uriWithQuery);
121135
final HttpClient cc = client;
@@ -130,9 +144,9 @@ void test(String uri,
130144
// we have to pull to get the exception, but slow enough
131145
// so that DataFrames are buffered up to the point that
132146
// the window is exceeded...
133-
int wait = uri.startsWith("https://") ? 500 : 350;
147+
long wait = uri.startsWith("https://") ? 800 : 350;
134148
try (InputStream is = response.body()) {
135-
Thread.sleep(Utils.adjustTimeout(wait));
149+
sleep(wait);
136150
is.readAllBytes();
137151
}
138152
// we could fail here if we haven't waited long enough
@@ -174,7 +188,7 @@ void testAsync(String uri,
174188
CompletableFuture<String> sent = new CompletableFuture<>();
175189
responseSent.put(query, sent);
176190
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
177-
.POST(BodyPublishers.ofString("Hello there!"))
191+
.GET()
178192
.build();
179193
System.out.println("\nSending request:" + uriWithQuery);
180194
final HttpClient cc = client;
@@ -188,9 +202,9 @@ void testAsync(String uri,
188202
assertEquals(key, label, "Unexpected key for " + query);
189203
}
190204
sent.join();
191-
int wait = uri.startsWith("https://") ? 600 : 300;
205+
long wait = uri.startsWith("https://") ? 800 : 350;
192206
try (InputStream is = response.body()) {
193-
Thread.sleep(Utils.adjustTimeout(wait));
207+
sleep(wait);
194208
is.readAllBytes();
195209
}
196210
// we could fail here if we haven't waited long enough
@@ -252,7 +266,9 @@ public void setup() throws Exception {
252266
var https2TestServer = new Http2TestServer("localhost", true, sslContext);
253267
https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
254268
this.https2TestServer = HttpTestServer.of(https2TestServer);
269+
this.https2TestServer.addHandler(new HttpHeadOrGetHandler(), "/https2/head/");
255270
https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
271+
String h2Head = "https://" + this.https2TestServer.serverAuthority() + "/https2/head/z";
256272

257273
// Override the default exchange supplier with a custom one to enable
258274
// particular test scenarios
@@ -261,6 +277,13 @@ public void setup() throws Exception {
261277

262278
this.http2TestServer.start();
263279
this.https2TestServer.start();
280+
281+
// warmup to eliminate delay due to SSL class loading and initialization.
282+
try (var client = HttpClient.newBuilder().sslContext(sslContext).build()) {
283+
var request = HttpRequest.newBuilder(URI.create(h2Head)).HEAD().build();
284+
var resp = client.send(request, BodyHandlers.discarding());
285+
assertEquals(resp.statusCode(), 200);
286+
}
264287
}
265288

266289
@AfterTest
@@ -279,11 +302,19 @@ public void handle(Http2TestExchange t) throws IOException {
279302
OutputStream os = t.getResponseBody()) {
280303

281304
byte[] bytes = is.readAllBytes();
282-
System.out.println("Server " + t.getLocalAddress() + " received:\n"
283-
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
305+
if (bytes.length != 0) {
306+
System.out.println("Server " + t.getLocalAddress() + " received:\n"
307+
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
308+
} else {
309+
System.out.println("No request body for " + t.getRequestMethod());
310+
}
311+
284312
t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
285313

286-
if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8);
314+
if (bytes.length == 0) {
315+
bytes = "no request body!"
316+
.repeat(100).getBytes(StandardCharsets.UTF_8);
317+
}
287318
int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024);
288319
final int maxChunkSize;
289320
if (t instanceof FCHttp2TestExchange fct) {
@@ -307,13 +338,22 @@ public void handle(Http2TestExchange t) throws IOException {
307338
// ignore and continue...
308339
}
309340
}
310-
((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
341+
try {
342+
((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
343+
} catch (IOException x) {
344+
if (t instanceof FCHttp2TestExchange fct) {
345+
fct.conn.updateConnectionWindow(resp.length);
346+
}
347+
}
348+
}
349+
} finally {
350+
if (t instanceof FCHttp2TestExchange fct) {
351+
fct.responseSent(query);
352+
} else {
353+
fail("Exchange is not %s but %s"
354+
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
311355
}
312356
}
313-
if (t instanceof FCHttp2TestExchange fct) {
314-
fct.responseSent(query);
315-
} else fail("Exchange is not %s but %s"
316-
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
317357
}
318358
}
319359

0 commit comments

Comments
 (0)