Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* An HttpClientFacade is a simple class that wraps an HttpClient implementation
* and delegates everything to its implementation delegate.
*/
final class HttpClientFacade extends HttpClient implements Trackable {
public final class HttpClientFacade extends HttpClient implements Trackable {

final HttpClientImpl impl;

Expand Down Expand Up @@ -110,6 +110,10 @@ public Optional<Executor> executor() {
return impl.executor();
}

public Executor theExecutor() {
return impl.theExecutor();
}

@Override
public <T> HttpResponse<T>
send(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,16 @@ class MessageDecoder implements Frame.Consumer {
private long payloadLen;
private long unconsumedPayloadLen;
private ByteBuffer binaryData;
private final boolean server;
private int maskingKey;

MessageDecoder(MessageStreamConsumer output) {
this(output, false);
}

MessageDecoder(MessageStreamConsumer output, boolean server) {
this.output = requireNonNull(output);
this.server = server;
}

/* Exposed for testing purposes */
Expand Down Expand Up @@ -143,9 +150,12 @@ public void mask(boolean value) {
if (debug.on()) {
debug.log("mask %s", value);
}
if (value) {
if (value && !server) {
throw new FailWebSocketException("Masked frame received");
}
if (!value && server) {
throw new FailWebSocketException("Masked frame expected");
}
}

@Override
Expand Down Expand Up @@ -175,7 +185,9 @@ public void maskingKey(int value) {
// So this method (`maskingKey`) is not supposed to be invoked while
// reading a frame that has came from the server. If this method is
// invoked, then it's an error in implementation, thus InternalError
throw new InternalError();
if (!server)
throw new InternalError();
maskingKey = value;
}

@Override
Expand Down Expand Up @@ -204,10 +216,17 @@ public void payloadData(ByteBuffer data) {
boolean last = fin && lastPayloadChunk;
boolean text = opcode == Opcode.TEXT || originatingOpcode == Opcode.TEXT;
if (!text) {
output.onBinary(data.slice(), last);
ByteBuffer slice = data.slice();
if (server) {
unMask(slice);
}
output.onBinary(slice, last);
data.position(data.limit()); // Consume
} else {
boolean binaryNonEmpty = data.hasRemaining();
if (server) {
unMask(data);
}
CharBuffer textData;
try {
textData = decoder.decode(data, last);
Expand All @@ -225,6 +244,17 @@ public void payloadData(ByteBuffer data) {
}
}

private void unMask(ByteBuffer src) {
int pos = src.position();
int size = src.remaining();
ByteBuffer temp = ByteBuffer.allocate(size);
Frame.Masker.transferMasking(src, temp, maskingKey);
temp.flip();
src.position(pos);
src.put(temp);
src.position(pos).limit(pos+size);
}

@Override
public void endFrame() {
if (debug.on()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public class MessageEncoder {
/* Was the previous frame TEXT or a CONTINUATION thereof? */
private boolean previousText;
private boolean closed;
private final boolean server;

MessageEncoder() {
this(false);
}

MessageEncoder(boolean isServer) {
this.server = isServer;
}

/*
* How many bytes of the current message have been already encoded.
Expand Down Expand Up @@ -369,12 +378,20 @@ private void setupHeader(Opcode opcode, boolean fin, long payloadLen) {
opcode, fin, payloadLen);
}
headerBuffer.clear();
int mask = maskingKeySource.nextInt();
headerWriter.fin(fin)
// for server setting mask to 0 disables masking (xor)
int mask = this.server ? 0 : maskingKeySource.nextInt();
if (mask == 0) {
headerWriter.fin(fin)
.opcode(opcode)
.payloadLen(payloadLen)
.write(headerBuffer);
} else {
headerWriter.fin(fin)
.opcode(opcode)
.payloadLen(payloadLen)
.mask(mask)
.write(headerBuffer);
}
headerBuffer.flip();
payloadMasker.mask(mask);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

package jdk.internal.net.http.websocket;

import jdk.internal.net.http.HttpClientFacade;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
Expand All @@ -37,13 +38,15 @@
import java.lang.ref.Reference;
import java.net.ProtocolException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -115,10 +118,12 @@ enum State {
private final SequentialScheduler receiveScheduler
= new SequentialScheduler(new ReceiveTask());
private final Demand demand = new Demand();
private final Executor clientExecutor;

public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {
Function<Result, WebSocket> newWebSocket = r -> {
WebSocket ws = newInstance(b.getUri(),
b.getClient(),
r.subprotocol,
b.getListener(),
r.transport);
Expand All @@ -140,21 +145,24 @@ public static CompletableFuture<WebSocket> newInstanceAsync(BuilderImpl b) {

/* Exposed for testing purposes */
static WebSocketImpl newInstance(URI uri,
HttpClient client,
String subprotocol,
Listener listener,
TransportFactory transport) {
WebSocketImpl ws = new WebSocketImpl(uri, subprotocol, listener, transport);
WebSocketImpl ws = new WebSocketImpl(uri, client, subprotocol, listener, transport);
// This initialisation is outside of the constructor for the sake of
// safe publication of WebSocketImpl.this
ws.signalOpen();
return ws;
}

private WebSocketImpl(URI uri,
HttpClient client,
String subprotocol,
Listener listener,
TransportFactory transportFactory) {
this.uri = requireNonNull(uri);
this.clientExecutor = ((HttpClientFacade)client).theExecutor();
this.subprotocol = requireNonNull(subprotocol);
this.listener = requireNonNull(listener);
// Why 6? 1 sendPing/sendPong + 1 sendText/sendBinary + 1 Close +
Expand Down Expand Up @@ -356,7 +364,7 @@ public void request(long n) {
debug.log("request %s", n);
}
if (demand.increase(n)) {
receiveScheduler.runOrSchedule();
receiveScheduler.runOrSchedule(clientExecutor);
}
}

Expand Down Expand Up @@ -398,7 +406,7 @@ public String toString() {
* The assumptions about order is as follows:
*
* - state is never changed more than twice inside the `run` method:
* x --(1)--> IDLE --(2)--> y (otherwise we're loosing events, or
* x --(1)--> IDLE --(2)--> y (otherwise we're losing events, or
* overwriting parts of messages creating a mess since there's no
* queueing)
* - OPEN is always the first state
Expand Down Expand Up @@ -702,7 +710,7 @@ private boolean trySwapAutomaticPong(ByteBuffer copy) {

private void signalOpen() {
debug.log("signalOpen");
receiveScheduler.runOrSchedule();
receiveScheduler.runOrSchedule(clientExecutor);
}

private void signalError(Throwable error) {
Expand Down Expand Up @@ -834,7 +842,7 @@ private boolean trySetState(State newState) {
if (currentState == ERROR || currentState == CLOSE) {
break;
} else if (state.compareAndSet(currentState, newState)) {
receiveScheduler.runOrSchedule();
receiveScheduler.runOrSchedule(clientExecutor);
success = true;
break;
}
Expand All @@ -850,7 +858,7 @@ private boolean tryChangeState(State expectedState, State newState) {
State witness = state.compareAndExchange(expectedState, newState);
boolean success = false;
if (witness == expectedState) {
receiveScheduler.runOrSchedule();
receiveScheduler.runOrSchedule(clientExecutor);
success = true;
} else if (witness != ERROR && witness != CLOSE) {
// This should be the only reason for inability to change the state
Expand Down
33 changes: 33 additions & 0 deletions test/jdk/java/net/httpclient/websocket/WebSocketServerDriver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

/*
* @test
* @bug 8268294
* @modules java.net.http/jdk.internal.net.http.websocket:open jdk.httpserver
* @run main/othervm
* --add-reads java.net.http=ALL-UNNAMED
* --add-reads java.net.http=jdk.httpserver
* java.net.http/jdk.internal.net.http.websocket.WebSocketAndHttpTest
*/
public final class WebSocketServerDriver { }
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

package jdk.internal.net.http.websocket;

import java.nio.ByteBuffer;

/**
* No implementation provided for onInit() because that must always be
* implemented by user
*/
abstract class DefaultMessageStreamHandler implements MessageStreamHandler {

public void onText(CharSequence data, boolean last) {}

public void onBinary(ByteBuffer data, boolean last) {}

public void onPing(ByteBuffer data) {}

public void onPong(ByteBuffer data) {}

public void onClose(int statusCode, CharSequence reason) {}

public void onComplete() {}

public void onError(Throwable e) {}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

package jdk.internal.net.http.websocket;

/**
* WebSocket server listener interface, which is the same as the client API
* in java.net.http. See MessageStreamResponder for how listener methods
* can send response messages back to the client
*
* All MessageStreamConsumer methods must be implemented (plus the handler method
* declared here). DefaultMessageStreamHandler provides empty implementations of all
* that can be extended, except for onInit() which must always be implemented.
*
* void onText(CharSequence data, boolean last);
*
* void onBinary(ByteBuffer data, boolean last);
*
* void onPing(ByteBuffer data);
*
* void onPong(ByteBuffer data);
*
* void onClose(int statusCode, CharSequence reason);
*
* void onComplete();
*
* void onError(Throwable e);
*/
interface MessageStreamHandler extends MessageStreamConsumer {

/**
* called before any of the methods above to supply a
* MessageStreamResponder for any new connection, which can be used to send replies
* sendText(), sendBinary(), sendClose() etc
*/
void onInit(MessageStreamResponder responder);
}

Loading