1
1
/*
2
- * Copyright (c) 2015, 2024 , Oracle and/or its affiliates. All rights reserved.
2
+ * Copyright (c) 2015, 2025 , Oracle and/or its affiliates. All rights reserved.
3
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
4
*
5
5
* This code is free software; you can redistribute it and/or modify it
33
33
import java .util .concurrent .Executor ;
34
34
import java .util .concurrent .TimeUnit ;
35
35
import java .util .concurrent .atomic .AtomicInteger ;
36
+ import java .util .concurrent .atomic .AtomicReference ;
36
37
import java .util .function .Function ;
37
38
import java .net .http .HttpClient ;
38
39
import java .net .http .HttpResponse ;
39
40
import java .net .http .HttpTimeoutException ;
40
41
42
+ import jdk .internal .net .http .HttpClientImpl .DelegatingExecutor ;
41
43
import jdk .internal .net .http .common .Logger ;
42
44
import jdk .internal .net .http .common .MinimalFuture ;
43
45
import jdk .internal .net .http .common .Utils ;
@@ -63,9 +65,9 @@ final class Exchange<T> {
63
65
64
66
// used to record possible cancellation raised before the exchImpl
65
67
// has been established.
66
- private volatile IOException failed ;
68
+ private final AtomicReference < IOException > failed = new AtomicReference <>() ;
67
69
final MultiExchange <T > multi ;
68
- final Executor parentExecutor ;
70
+ final DelegatingExecutor parentExecutor ;
69
71
volatile boolean upgrading ; // to HTTP/2
70
72
volatile boolean upgraded ; // to HTTP/2
71
73
final PushGroup <T > pushGroup ;
@@ -91,7 +93,7 @@ PushGroup<T> getPushGroup() {
91
93
return pushGroup ;
92
94
}
93
95
94
- Executor executor () {
96
+ DelegatingExecutor executor () {
95
97
return parentExecutor ;
96
98
}
97
99
@@ -236,26 +238,26 @@ public void cancel(IOException cause) {
236
238
// If the impl is non null, propagate the exception right away.
237
239
// Otherwise record it so that it can be propagated once the
238
240
// exchange impl has been established.
239
- ExchangeImpl <?> impl = exchImpl ;
241
+ ExchangeImpl <?> impl ;
242
+ IOException closeReason = null ;
243
+ synchronized (this ) {
244
+ impl = exchImpl ;
245
+ if (impl == null ) {
246
+ // no impl yet. record the exception
247
+ failed .compareAndSet (null , cause );
248
+ }
249
+ }
240
250
if (impl != null ) {
241
251
// propagate the exception to the impl
242
252
if (debug .on ()) debug .log ("Cancelling exchImpl: %s" , exchImpl );
243
253
impl .cancel (cause );
244
254
} else {
245
- // no impl yet. record the exception
246
- IOException failed = this .failed ;
247
- if (failed == null ) {
248
- synchronized (this ) {
249
- failed = this .failed ;
250
- if (failed == null ) {
251
- failed = this .failed = cause ;
252
- }
253
- }
254
- }
255
-
256
- // abort/close the connection if setting up the exchange. This can
255
+ // abort/close the connection if setting up the exchange. This can
257
256
// be important when setting up HTTP/2
258
- connectionAborter .closeConnection (failed );
257
+ closeReason = failed .get ();
258
+ if (closeReason != null ) {
259
+ connectionAborter .closeConnection (closeReason );
260
+ }
259
261
260
262
// now call checkCancelled to recheck the impl.
261
263
// if the failed state is set and the impl is not null, reset
@@ -274,9 +276,9 @@ private void checkCancelled() {
274
276
ExchangeImpl <?> impl = null ;
275
277
IOException cause = null ;
276
278
CompletableFuture <? extends ExchangeImpl <T >> cf = null ;
277
- if (failed != null ) {
279
+ if (failed . get () != null ) {
278
280
synchronized (this ) {
279
- cause = failed ;
281
+ cause = failed . get () ;
280
282
impl = exchImpl ;
281
283
cf = exchangeCF ;
282
284
}
@@ -286,7 +288,11 @@ private void checkCancelled() {
286
288
// The exception is raised by propagating it to the impl.
287
289
if (debug .on ()) debug .log ("Cancelling exchImpl: %s" , impl );
288
290
impl .cancel (cause );
289
- failed = null ;
291
+ synchronized (this ) {
292
+ if (impl == exchImpl ) {
293
+ failed .compareAndSet (cause , null );
294
+ }
295
+ }
290
296
} else {
291
297
Log .logTrace ("Exchange: request [{0}/timeout={1}ms] no impl is set."
292
298
+ "\n \t Can''t cancel yet with {2}" ,
@@ -313,7 +319,7 @@ <U> CompletableFuture<U> checkCancelled(CompletableFuture<U> cf, HttpConnection
313
319
if (t == null ) t = new IOException ("Request cancelled" );
314
320
if (debug .on ()) debug .log ("exchange cancelled during connect: " + t );
315
321
try {
316
- connection .close ();
322
+ connection .close (t );
317
323
} catch (Throwable x ) {
318
324
if (debug .on ()) debug .log ("Failed to close connection" , x );
319
325
}
@@ -330,8 +336,13 @@ public void h2Upgrade() {
330
336
request .setH2Upgrade (this );
331
337
}
332
338
339
+ synchronized IOException failed (IOException io ) {
340
+ IOException cause = failed .compareAndExchange (null , io );
341
+ return cause == null ? io : cause ;
342
+ }
343
+
333
344
synchronized IOException getCancelCause () {
334
- return failed ;
345
+ return failed . get () ;
335
346
}
336
347
337
348
// get/set the exchange impl, solving race condition issues with
@@ -409,6 +420,11 @@ private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
409
420
}
410
421
}
411
422
423
+ private CompletableFuture <Response > startSendingBody (DelegatingExecutor executor ) {
424
+ return exchImpl .sendBodyAsync ()
425
+ .thenCompose (exIm -> exIm .getResponseAsync (executor ));
426
+ }
427
+
412
428
// After sending the request headers, if no ProxyAuthorizationRequired
413
429
// was raised and the expectContinue flag is on, we need to wait
414
430
// for the 100-Continue response
@@ -430,9 +446,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
430
446
if (debug .on ())
431
447
debug .log ("Setting ExpectTimeoutRaised and sending request body" );
432
448
exchImpl .setExpectTimeoutRaised ();
433
- CompletableFuture <Response > cf =
434
- exchImpl .sendBodyAsync ()
435
- .thenCompose (exIm -> exIm .getResponseAsync (parentExecutor ));
449
+ CompletableFuture <Response > cf = startSendingBody (parentExecutor );
436
450
cf = wrapForUpgrade (cf );
437
451
cf = wrapForLog (cf );
438
452
return cf ;
@@ -444,9 +458,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
444
458
nonFinalResponses .incrementAndGet ();
445
459
Log .logTrace ("Received 100-Continue: sending body" );
446
460
if (debug .on ()) debug .log ("Received 100-Continue for %s" , r1 );
447
- CompletableFuture <Response > cf =
448
- exchImpl .sendBodyAsync ()
449
- .thenCompose (exIm -> exIm .getResponseAsync (parentExecutor ));
461
+ CompletableFuture <Response > cf = startSendingBody (parentExecutor );
450
462
cf = wrapForUpgrade (cf );
451
463
cf = wrapForLog (cf );
452
464
return cf ;
@@ -471,8 +483,7 @@ private CompletableFuture<Response> expectContinue(ExchangeImpl<T> ex) {
471
483
private CompletableFuture <Response > sendRequestBody (ExchangeImpl <T > ex ) {
472
484
assert !request .expectContinue ();
473
485
if (debug .on ()) debug .log ("sendRequestBody" );
474
- CompletableFuture <Response > cf = ex .sendBodyAsync ()
475
- .thenCompose (exIm -> exIm .getResponseAsync (parentExecutor ));
486
+ CompletableFuture <Response > cf = startSendingBody (parentExecutor );
476
487
cf = wrapForUpgrade (cf );
477
488
// after 101 is handled we check for other 1xx responses
478
489
cf = cf .thenCompose (this ::ignore1xxResponse );
@@ -669,7 +680,7 @@ HttpResponse.BodySubscriber<T> ignoreBody(HttpResponse.ResponseInfo hdrs) {
669
680
// Either way, we need to relay it to s.
670
681
synchronized (this ) {
671
682
exchImpl = s ;
672
- t = failed ;
683
+ t = failed . get () ;
673
684
}
674
685
// Check whether the HTTP/1.1 was cancelled.
675
686
if (t == null ) t = e .getCancelCause ();
0 commit comments