Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions netty-websocket-http1-perftest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,23 @@ dependencies {

task runServer(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main"
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server.Main"
}

task runClient(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main"
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client.Main"
}

task serverScripts(type: CreateStartScripts) {
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main"
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server.Main"
applicationName = "${project.name}-server"
classpath = startScripts.classpath
outputDir = startScripts.outputDir
}

task clientScripts(type: CreateStartScripts) {
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main"
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client.Main"
applicationName = "${project.name}-client"
classpath = startScripts.classpath
outputDir = startScripts.outputDir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client;
package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client;

import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler;
import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server;
package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server;

import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler;
import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketFrameFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,25 @@ void binaryFramesEncoder(boolean mask) throws Exception {
client.close();
}

@Timeout(300)
@ValueSource(booleans = {true, false})
@ParameterizedTest
void binaryFramesBulkEncoder(boolean mask) throws Exception {
int maxFrameSize = 1000;
Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false);
BinaryFramesEncoderClientBulkHandler clientHandler =
new BinaryFramesEncoderClientBulkHandler(maxFrameSize);
Channel client =
webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler);

WebSocketFrameFactory.BulkEncoder encoder = clientHandler.onHandshakeCompleted().join();
Assertions.assertThat(encoder).isNotNull();

CompletableFuture<Void> onComplete = clientHandler.startFramesExchange();
onComplete.join();
client.close();
}

@Timeout(300)
@MethodSource("maskingArgs")
@ParameterizedTest
Expand Down Expand Up @@ -444,6 +463,162 @@ protected void initChannel(SocketChannel ch) {
}
}

static class BinaryFramesEncoderClientBulkHandler
implements WebSocketCallbacksHandler, WebSocketFrameListener {
private final CompletableFuture<WebSocketFrameFactory.BulkEncoder> onHandshakeComplete =
new CompletableFuture<>();
private final CompletableFuture<Void> onFrameExchangeComplete = new CompletableFuture<>();
private WebSocketFrameFactory.BulkEncoder binaryFrameEncoder;
private final int framesCount;
private int receivedFrames;
private int sentFrames;
private ByteBuf outBuffer;
private volatile ChannelHandlerContext ctx;

BinaryFramesEncoderClientBulkHandler(int maxFrameSize) {
this.framesCount = maxFrameSize;
}

@Override
public WebSocketFrameListener exchange(
ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) {
this.binaryFrameEncoder = webSocketFrameFactory.bulkEncoder();
return this;
}

@Override
public void onChannelRead(
ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) {
if (!finalFragment) {
onFrameExchangeComplete.completeExceptionally(
new AssertionError("received non-final frame: " + finalFragment));
payload.release();
return;
}
if (rsv != 0) {
onFrameExchangeComplete.completeExceptionally(
new AssertionError("received frame with non-zero rsv: " + rsv));
payload.release();
return;
}
if (opcode != WebSocketProtocol.OPCODE_BINARY) {
onFrameExchangeComplete.completeExceptionally(
new AssertionError("received non-binary frame: " + Long.toHexString(opcode)));
payload.release();
return;
}

int readableBytes = payload.readableBytes();

int expectedSize = receivedFrames;
if (expectedSize != readableBytes) {
onFrameExchangeComplete.completeExceptionally(
new AssertionError(
"received frame of unexpected size: "
+ expectedSize
+ ", actual: "
+ readableBytes));
payload.release();
return;
}

for (int i = 0; i < readableBytes; i++) {
byte b = payload.readByte();
if (b != (byte) 0xFE) {
onFrameExchangeComplete.completeExceptionally(
new AssertionError("received frame with unexpected content: " + Long.toHexString(b)));
payload.release();
return;
}
}
payload.release();
if (++receivedFrames == framesCount) {
onFrameExchangeComplete.complete(null);
}
}

@Override
public void onChannelWritabilityChanged(ChannelHandlerContext ctx) {
boolean writable = ctx.channel().isWritable();
if (sentFrames > 0 && writable) {
int toSend = framesCount - sentFrames;
if (toSend > 0) {
sendFrames(ctx, toSend);
}
}
}

@Override
public void onOpen(ChannelHandlerContext ctx) {
this.ctx = ctx;
int bufferSize = 4 * framesCount;
this.outBuffer = ctx.alloc().buffer(bufferSize, bufferSize);
onHandshakeComplete.complete(binaryFrameEncoder);
}

@Override
public void onClose(ChannelHandlerContext ctx) {
ByteBuf out = outBuffer;
if (out != null) {
outBuffer = null;
out.release();
}
if (!onFrameExchangeComplete.isDone()) {
onFrameExchangeComplete.completeExceptionally(new ClosedChannelException());
}
}

@Override
public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (!onFrameExchangeComplete.isDone()) {
onFrameExchangeComplete.completeExceptionally(cause);
}
}

CompletableFuture<WebSocketFrameFactory.BulkEncoder> onHandshakeCompleted() {
return onHandshakeComplete;
}

CompletableFuture<Void> startFramesExchange() {
ChannelHandlerContext c = ctx;
c.executor().execute(() -> sendFrames(c, framesCount - sentFrames));
return onFrameExchangeComplete;
}

private void sendFrames(ChannelHandlerContext c, int toSend) {
WebSocketFrameFactory.BulkEncoder frameEncoder = binaryFrameEncoder;
for (int frameIdx = 0; frameIdx < toSend; frameIdx++) {
if (!c.channel().isOpen()) {
return;
}
int payloadSize = sentFrames;
int frameSize = frameEncoder.sizeofBinaryFrame(payloadSize);
ByteBuf out = outBuffer;
if (frameSize > out.capacity() - out.writerIndex()) {
int readableBytes = out.readableBytes();
int bufferSize = 4 * framesCount;
outBuffer = c.alloc().buffer(bufferSize, bufferSize);
if (c.channel().bytesBeforeUnwritable() < readableBytes) {
c.writeAndFlush(out, c.voidPromise());
if (!c.channel().isWritable()) {
return;
}
} else {
c.write(out, c.voidPromise());
}
out = outBuffer;
}
int mask = frameEncoder.encodeBinaryFramePrefix(out, payloadSize);
for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) {
out.writeByte(0xFE);
}
frameEncoder.maskBinaryFrame(out, mask, payloadSize);
sentFrames++;
}
c.flush();
}
}

static class BinaryFramesEncoderClientHandler
implements WebSocketCallbacksHandler, WebSocketFrameListener {
private final CompletableFuture<WebSocketFrameFactory.Encoder> onHandshakeComplete =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ public WebSocketFrameFactory frameFactory(ChannelHandlerContext ctx) {
return FrameFactory.INSTANCE;
}

static class FrameFactory implements WebSocketFrameFactory, WebSocketFrameFactory.Encoder {
static class FrameFactory
implements WebSocketFrameFactory,
WebSocketFrameFactory.Encoder,
WebSocketFrameFactory.BulkEncoder {
static final int PREFIX_SIZE_SMALL = 6;
static final int BINARY_FRAME_SMALL =
OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7;
Expand Down Expand Up @@ -145,6 +148,11 @@ public Encoder encoder() {
return this;
}

@Override
public BulkEncoder bulkEncoder() {
return this;
}

@Override
public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
int frameSize = binaryFrame.readableBytes();
Expand All @@ -168,6 +176,30 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
}

@Override
public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) {
if (payloadSize <= 125) {
byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize);
int mask = mask();
byteBuf.writeInt(mask);
return mask;
}

if (payloadSize <= 65_535) {
int mask = mask();
byteBuf.writeLong(((BINARY_FRAME_MEDIUM | (long) payloadSize) << 32) | mask);
return mask;
}
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
}

@Override
public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) {
int end = byteBuf.writerIndex();
int start = end - payloadSize;
return mask(mask, byteBuf, start, end);
}

@Override
public int sizeofBinaryFrame(int payloadSize) {
if (payloadSize <= 125) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ public WebSocketFrameFactory frameFactory(ChannelHandlerContext ctx) {
return FrameFactory.INSTANCE;
}

static class FrameFactory implements WebSocketFrameFactory, WebSocketFrameFactory.Encoder {
static class FrameFactory
implements WebSocketFrameFactory,
WebSocketFrameFactory.Encoder,
WebSocketFrameFactory.BulkEncoder {
static final int PREFIX_SIZE_SMALL = 2;
static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15;

Expand Down Expand Up @@ -126,6 +129,11 @@ public Encoder encoder() {
return this;
}

@Override
public BulkEncoder bulkEncoder() {
return this;
}

@Override
public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
int frameSize = binaryFrame.readableBytes();
Expand All @@ -144,6 +152,23 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
}

@Override
public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) {
if (payloadSize <= 125) {
byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize);
} else if (payloadSize <= 65_535) {
byteBuf.writeInt(BINARY_FRAME_MEDIUM | payloadSize);
} else {
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
}
return -1;
}

@Override
public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) {
return byteBuf;
}

@Override
public int sizeofBinaryFrame(int payloadSize) {
if (payloadSize <= 125) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,26 @@ public interface WebSocketFrameFactory {

Encoder encoder();

default BulkEncoder bulkEncoder() {
throw new UnsupportedOperationException("WebSocketFrameFactory.bulkEncoder() not implemented");
}

/** Encodes prefix of single binary websocket frame into provided bytebuffer. */
interface Encoder {
/*write prefix/mask, apply mask if needed*/

ByteBuf encodeBinaryFrame(ByteBuf binaryFrame);

/*size with prefix/mask*/
int sizeofBinaryFrame(int payloadSize);
}

/** Encodes prefixes of multiple binary websocket frames into provided bytebuffer. */
interface BulkEncoder {

/** @return frame mask, or -1 if masking not applicable */
int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize);

ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize);

int sizeofBinaryFrame(int payloadSize);
}
}