1
1
/*
2
- * Copyright (c) 2018, 2023 , Oracle and/or its affiliates. All rights reserved.
2
+ * Copyright (c) 2018, 2025 , Oracle and/or its affiliates. All rights reserved.
3
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
4
*
5
5
* This code is free software; you can redistribute it and/or modify it
@@ -209,8 +209,11 @@ public HttpClient.Version version() {
209
209
if (obp .isPresent ()) {
210
210
ConsumingSubscriber subscriber = new ConsumingSubscriber ();
211
211
obp .get ().subscribe (subscriber );
212
+ // wait for our subscriber to be completed and get the
213
+ // list of ByteBuffers it received.
214
+ var buffers = subscriber .getBuffers ().join ();
212
215
if (responseBodyBytes == ECHO_SENTINAL ) {
213
- responseBody = subscriber . buffers ;
216
+ responseBody = buffers ;
214
217
}
215
218
}
216
219
@@ -246,6 +249,13 @@ public HttpClient.Version version() {
246
249
*/
247
250
private static class ConsumingSubscriber implements Flow .Subscriber <ByteBuffer > {
248
251
final List <ByteBuffer > buffers = Collections .synchronizedList (new ArrayList <>());
252
+ // A CompletableFuture that will be completed with a list of ByteBuffers that the
253
+ // ConsumingSubscriber has consumed.
254
+ final CompletableFuture <List <ByteBuffer >> consumed = new CompletableFuture <>();
255
+
256
+ public final CompletableFuture <List <ByteBuffer >> getBuffers () {
257
+ return consumed ;
258
+ }
249
259
250
260
@ Override
251
261
public void onSubscribe (Flow .Subscription subscription ) {
@@ -256,9 +266,9 @@ public void onSubscribe(Flow.Subscription subscription) {
256
266
buffers .add (item .duplicate ());
257
267
}
258
268
259
- @ Override public void onError (Throwable throwable ) { assert false : "Unexpected" ; }
269
+ @ Override public void onError (Throwable throwable ) { consumed . completeExceptionally ( throwable ) ; }
260
270
261
- @ Override public void onComplete () { /* do nothing */ }
271
+ @ Override public void onComplete () { consumed . complete ( buffers . stream (). toList ()); }
262
272
}
263
273
264
274
@ Override
0 commit comments