Skip to content

Commit 24c7fe3

Browse files
committed
8268294: Reusing HttpClient in a WebSocket.Listener hangs.
Backport-of: 2d088fa91d18252a801db3b84ff87e261d63ebd4
1 parent b2f4dff commit 24c7fe3

File tree

12 files changed

+1066
-12
lines changed

12 files changed

+1066
-12
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/HttpClientFacade.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* An HttpClientFacade is a simple class that wraps an HttpClient implementation
5050
* and delegates everything to its implementation delegate.
5151
*/
52-
final class HttpClientFacade extends HttpClient implements Trackable {
52+
public final class HttpClientFacade extends HttpClient implements Trackable {
5353

5454
final HttpClientImpl impl;
5555

@@ -110,6 +110,10 @@ public Optional<Executor> executor() {
110110
return impl.executor();
111111
}
112112

113+
public Executor theExecutor() {
114+
return impl.theExecutor();
115+
}
116+
113117
@Override
114118
public <T> HttpResponse<T>
115119
send(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler)

src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageDecoder.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,16 @@ class MessageDecoder implements Frame.Consumer {
6060
private long payloadLen;
6161
private long unconsumedPayloadLen;
6262
private ByteBuffer binaryData;
63+
private final boolean server;
64+
private int maskingKey;
6365

6466
MessageDecoder(MessageStreamConsumer output) {
67+
this(output, false);
68+
}
69+
70+
MessageDecoder(MessageStreamConsumer output, boolean server) {
6571
this.output = requireNonNull(output);
72+
this.server = server;
6673
}
6774

6875
/* Exposed for testing purposes */
@@ -143,9 +150,12 @@ public void mask(boolean value) {
143150
if (debug.on()) {
144151
debug.log("mask %s", value);
145152
}
146-
if (value) {
153+
if (value && !server) {
147154
throw new FailWebSocketException("Masked frame received");
148155
}
156+
if (!value && server) {
157+
throw new FailWebSocketException("Masked frame expected");
158+
}
149159
}
150160

151161
@Override
@@ -175,7 +185,9 @@ public void maskingKey(int value) {
175185
// So this method (`maskingKey`) is not supposed to be invoked while
176186
// reading a frame that has came from the server. If this method is
177187
// invoked, then it's an error in implementation, thus InternalError
178-
throw new InternalError();
188+
if (!server)
189+
throw new InternalError();
190+
maskingKey = value;
179191
}
180192

181193
@Override
@@ -204,10 +216,17 @@ public void payloadData(ByteBuffer data) {
204216
boolean last = fin && lastPayloadChunk;
205217
boolean text = opcode == Opcode.TEXT || originatingOpcode == Opcode.TEXT;
206218
if (!text) {
207-
output.onBinary(data.slice(), last);
219+
ByteBuffer slice = data.slice();
220+
if (server) {
221+
unMask(slice);
222+
}
223+
output.onBinary(slice, last);
208224
data.position(data.limit()); // Consume
209225
} else {
210226
boolean binaryNonEmpty = data.hasRemaining();
227+
if (server) {
228+
unMask(data);
229+
}
211230
CharBuffer textData;
212231
try {
213232
textData = decoder.decode(data, last);
@@ -225,6 +244,17 @@ public void payloadData(ByteBuffer data) {
225244
}
226245
}
227246

247+
private void unMask(ByteBuffer src) {
248+
int pos = src.position();
249+
int size = src.remaining();
250+
ByteBuffer temp = ByteBuffer.allocate(size);
251+
Frame.Masker.transferMasking(src, temp, maskingKey);
252+
temp.flip();
253+
src.position(pos);
254+
src.put(temp);
255+
src.position(pos).limit(pos+size);
256+
}
257+
228258
@Override
229259
public void endFrame() {
230260
if (debug.on()) {

src/java.net.http/share/classes/jdk/internal/net/http/websocket/MessageEncoder.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ public class MessageEncoder {
8181
/* Was the previous frame TEXT or a CONTINUATION thereof? */
8282
private boolean previousText;
8383
private boolean closed;
84+
private final boolean server;
85+
86+
MessageEncoder() {
87+
this(false);
88+
}
89+
90+
MessageEncoder(boolean isServer) {
91+
this.server = isServer;
92+
}
8493

8594
/*
8695
* How many bytes of the current message have been already encoded.
@@ -369,12 +378,20 @@ private void setupHeader(Opcode opcode, boolean fin, long payloadLen) {
369378
opcode, fin, payloadLen);
370379
}
371380
headerBuffer.clear();
372-
int mask = maskingKeySource.nextInt();
373-
headerWriter.fin(fin)
381+
// for server setting mask to 0 disables masking (xor)
382+
int mask = this.server ? 0 : maskingKeySource.nextInt();
383+
if (mask == 0) {
384+
headerWriter.fin(fin)
385+
.opcode(opcode)
386+
.payloadLen(payloadLen)
387+
.write(headerBuffer);
388+
} else {
389+
headerWriter.fin(fin)
374390
.opcode(opcode)
375391
.payloadLen(payloadLen)
376392
.mask(mask)
377393
.write(headerBuffer);
394+
}
378395
headerBuffer.flip();
379396
payloadMasker.mask(mask);
380397
}

src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
package jdk.internal.net.http.websocket;
2727

28+
import jdk.internal.net.http.HttpClientFacade;
2829
import jdk.internal.net.http.common.Demand;
2930
import jdk.internal.net.http.common.Log;
3031
import jdk.internal.net.http.common.Logger;
@@ -37,13 +38,15 @@
3738
import java.lang.ref.Reference;
3839
import java.net.ProtocolException;
3940
import java.net.URI;
41+
import java.net.http.HttpClient;
4042
import java.net.http.WebSocket;
4143
import java.nio.ByteBuffer;
4244
import java.nio.CharBuffer;
4345
import java.nio.charset.CharacterCodingException;
4446
import java.nio.charset.CharsetEncoder;
4547
import java.nio.charset.CodingErrorAction;
4648
import java.nio.charset.StandardCharsets;
49+
import java.util.concurrent.Executor;
4750
import java.util.Objects;
4851
import java.util.concurrent.CompletableFuture;
4952
import java.util.concurrent.CompletionStage;
@@ -115,10 +118,12 @@ enum State {
115118
private final SequentialScheduler receiveScheduler
116119
= new SequentialScheduler(new ReceiveTask());
117120
private final Demand demand = new Demand();
121+
private final Executor clientExecutor;
118122

119123
public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
120124
Function<Result, WebSocket> newWebSocket = r -> {
121125
WebSocket ws = newInstance(b.getUri(),
126+
b.getClient(),
122127
r.subprotocol,
123128
b.getListener(),
124129
r.transport);
@@ -140,21 +145,24 @@ public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
140145

141146
/* Exposed for testing purposes */
142147
static WebSocketImpl newInstance(URI uri,
148+
HttpClient client,
143149
String subprotocol,
144150
Listener listener,
145151
TransportFactory transport) {
146-
WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
152+
WebSocketImpl ws = new WebSocketImpl(uri, client, subprotocol, listener, transport);
147153
// This initialisation is outside of the constructor for the sake of
148154
// safe publication of WebSocketImpl.this
149155
ws.signalOpen();
150156
return ws;
151157
}
152158

153159
private WebSocketImpl(URI uri,
160+
HttpClient client,
154161
String subprotocol,
155162
Listener listener,
156163
TransportFactory transportFactory) {
157164
this.uri = requireNonNull(uri);
165+
this.clientExecutor = ((HttpClientFacade)client).theExecutor();
158166
this.subprotocol = requireNonNull(subprotocol);
159167
this.listener = requireNonNull(listener);
160168
// Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close +
@@ -356,7 +364,7 @@ public void request(long n) {
356364
debug.log("request %s", n);
357365
}
358366
if (demand.increase(n)) {
359-
receiveScheduler.runOrSchedule();
367+
receiveScheduler.runOrSchedule(clientExecutor);
360368
}
361369
}
362370

@@ -398,7 +406,7 @@ public String toString() {
398406
* The assumptions about order is as follows:
399407
*
400408
* - state is never changed more than twice inside the `run` method:
401-
* x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
409+
* x --(1)--> IDLE --(2)--> y (otherwise we're losing events, or
402410
* overwriting parts of messages creating a mess since there's no
403411
* queueing)
404412
* - OPEN is always the first state
@@ -702,7 +710,7 @@ private boolean trySwapAutomaticPong(ByteBuffer copy) {
702710

703711
private void signalOpen() {
704712
debug.log("signalOpen");
705-
receiveScheduler.runOrSchedule();
713+
receiveScheduler.runOrSchedule(clientExecutor);
706714
}
707715

708716
private void signalError(Throwable error) {
@@ -834,7 +842,7 @@ private boolean trySetState(State newState) {
834842
if (currentState == ERROR || currentState == CLOSE) {
835843
break;
836844
} else if (state.compareAndSet(currentState, newState)) {
837-
receiveScheduler.runOrSchedule();
845+
receiveScheduler.runOrSchedule(clientExecutor);
838846
success = true;
839847
break;
840848
}
@@ -850,7 +858,7 @@ private boolean tryChangeState(State expectedState, State newState) {
850858
State witness = state.compareAndExchange(expectedState, newState);
851859
boolean success = false;
852860
if (witness == expectedState) {
853-
receiveScheduler.runOrSchedule();
861+
receiveScheduler.runOrSchedule(clientExecutor);
854862
success = true;
855863
} else if (witness != ERROR && witness != CLOSE) {
856864
// This should be the only reason for inability to change the state
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation.
8+
*
9+
* This code is distributed in the hope that it will be useful, but WITHOUT
10+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12+
* version 2 for more details (a copy is included in the LICENSE file that
13+
* accompanied this code).
14+
*
15+
* You should have received a copy of the GNU General Public License version
16+
* 2 along with this work; if not, write to the Free Software Foundation,
17+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18+
*
19+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20+
* or visit www.oracle.com if you need additional information or have any
21+
* questions.
22+
*/
23+
24+
/*
25+
* @test
26+
* @bug 8268294
27+
* @modules java.net.http/jdk.internal.net.http.websocket:open jdk.httpserver
28+
* @run main/othervm
29+
* --add-reads java.net.http=ALL-UNNAMED
30+
* --add-reads java.net.http=jdk.httpserver
31+
* java.net.http/jdk.internal.net.http.websocket.WebSocketAndHttpTest
32+
*/
33+
public final class WebSocketServerDriver { }
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation.
8+
*
9+
* This code is distributed in the hope that it will be useful, but WITHOUT
10+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12+
* version 2 for more details (a copy is included in the LICENSE file that
13+
* accompanied this code).
14+
*
15+
* You should have received a copy of the GNU General Public License version
16+
* 2 along with this work; if not, write to the Free Software Foundation,
17+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18+
*
19+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20+
* or visit www.oracle.com if you need additional information or have any
21+
* questions.
22+
*/
23+
24+
package jdk.internal.net.http.websocket;
25+
26+
import java.nio.ByteBuffer;
27+
28+
/**
29+
* No implementation provided for onInit() because that must always be
30+
* implemented by user
31+
*/
32+
abstract class DefaultMessageStreamHandler implements MessageStreamHandler {
33+
34+
public void onText(CharSequence data, boolean last) {}
35+
36+
public void onBinary(ByteBuffer data, boolean last) {}
37+
38+
public void onPing(ByteBuffer data) {}
39+
40+
public void onPong(ByteBuffer data) {}
41+
42+
public void onClose(int statusCode, CharSequence reason) {}
43+
44+
public void onComplete() {}
45+
46+
public void onError(Throwable e) {}
47+
}
48+
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation.
8+
*
9+
* This code is distributed in the hope that it will be useful, but WITHOUT
10+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12+
* version 2 for more details (a copy is included in the LICENSE file that
13+
* accompanied this code).
14+
*
15+
* You should have received a copy of the GNU General Public License version
16+
* 2 along with this work; if not, write to the Free Software Foundation,
17+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18+
*
19+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20+
* or visit www.oracle.com if you need additional information or have any
21+
* questions.
22+
*/
23+
24+
package jdk.internal.net.http.websocket;
25+
26+
/**
27+
* WebSocket server listener interface, which is the same as the client API
28+
* in java.net.http. See MessageStreamResponder for how listener methods
29+
* can send response messages back to the client
30+
*
31+
* All MessageStreamConsumer methods must be implemented (plus the handler method
32+
* declared here). DefaultMessageStreamHandler provides empty implementations of all
33+
* that can be extended, except for onInit() which must always be implemented.
34+
*
35+
* void onText(CharSequence data, boolean last);
36+
*
37+
* void onBinary(ByteBuffer data, boolean last);
38+
*
39+
* void onPing(ByteBuffer data);
40+
*
41+
* void onPong(ByteBuffer data);
42+
*
43+
* void onClose(int statusCode, CharSequence reason);
44+
*
45+
* void onComplete();
46+
*
47+
* void onError(Throwable e);
48+
*/
49+
interface MessageStreamHandler extends MessageStreamConsumer {
50+
51+
/**
52+
* called before any of the methods above to supply a
53+
* MessageStreamResponder for any new connection, which can be used to send replies
54+
* sendText(), sendBinary(), sendClose() etc
55+
*/
56+
void onInit(MessageStreamResponder responder);
57+
}
58+

0 commit comments

Comments
 (0)