diff --git a/netty-websocket-http1-perftest/build.gradle b/netty-websocket-http1-perftest/build.gradle index 9a23563..341b2c5 100644 --- a/netty-websocket-http1-perftest/build.gradle +++ b/netty-websocket-http1-perftest/build.gradle @@ -36,23 +36,23 @@ dependencies { task runServer(type: JavaExec) { classpath = sourceSets.main.runtimeClasspath - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main" + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server.Main" } task runClient(type: JavaExec) { classpath = sourceSets.main.runtimeClasspath - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main" + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client.Main" } task serverScripts(type: CreateStartScripts) { - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main" + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server.Main" applicationName = "${project.name}-server" classpath = startScripts.classpath outputDir = startScripts.outputDir } task clientScripts(type: CreateStartScripts) { - mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main" + mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client.Main" applicationName = "${project.name}-client" classpath = startScripts.classpath outputDir = startScripts.outputDir diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/client/Main.java similarity index 99% rename from netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java rename to netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/client/Main.java index 645ab4d..f2d54b2 100644 --- a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/client/Main.java +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/client/Main.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client; +package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client; import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler; import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; diff --git a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java similarity index 99% rename from netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java rename to netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java index 01e828a..e029068 100644 --- a/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/server/Main.java +++ b/netty-websocket-http1-perftest/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/perftest/encoder/server/Main.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server; +package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server; import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler; import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketFrameFactory; diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java index d534e8c..b750ed2 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java @@ -98,6 +98,25 @@ void binaryFramesEncoder(boolean mask) throws Exception { client.close(); } + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void binaryFramesBulkEncoder(boolean mask) throws Exception { + int maxFrameSize = 1000; + Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false); + BinaryFramesEncoderClientBulkHandler clientHandler = + new BinaryFramesEncoderClientBulkHandler(maxFrameSize); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + WebSocketFrameFactory.BulkEncoder encoder = clientHandler.onHandshakeCompleted().join(); + Assertions.assertThat(encoder).isNotNull(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + @Timeout(300) @MethodSource("maskingArgs") @ParameterizedTest @@ -444,6 +463,162 @@ protected void initChannel(SocketChannel ch) { } } + static class BinaryFramesEncoderClientBulkHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + private final CompletableFuture onHandshakeComplete = + new CompletableFuture<>(); + private final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + private WebSocketFrameFactory.BulkEncoder binaryFrameEncoder; + private final int framesCount; + private int receivedFrames; + private int sentFrames; + private ByteBuf outBuffer; + private volatile ChannelHandlerContext ctx; + + BinaryFramesEncoderClientBulkHandler(int maxFrameSize) { + this.framesCount = maxFrameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.binaryFrameEncoder = webSocketFrameFactory.bulkEncoder(); + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (!finalFragment) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final frame: " + finalFragment)); + payload.release(); + return; + } + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-binary frame: " + Long.toHexString(opcode))); + payload.release(); + return; + } + + int readableBytes = payload.readableBytes(); + + int expectedSize = receivedFrames; + if (expectedSize != readableBytes) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received frame of unexpected size: " + + expectedSize + + ", actual: " + + readableBytes)); + payload.release(); + return; + } + + for (int i = 0; i < readableBytes; i++) { + byte b = payload.readByte(); + if (b != (byte) 0xFE) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with unexpected content: " + Long.toHexString(b))); + payload.release(); + return; + } + } + payload.release(); + if (++receivedFrames == framesCount) { + onFrameExchangeComplete.complete(null); + } + } + + @Override + public void onChannelWritabilityChanged(ChannelHandlerContext ctx) { + boolean writable = ctx.channel().isWritable(); + if (sentFrames > 0 && writable) { + int toSend = framesCount - sentFrames; + if (toSend > 0) { + sendFrames(ctx, toSend); + } + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + int bufferSize = 4 * framesCount; + this.outBuffer = ctx.alloc().buffer(bufferSize, bufferSize); + onHandshakeComplete.complete(binaryFrameEncoder); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + ByteBuf out = outBuffer; + if (out != null) { + outBuffer = null; + out.release(); + } + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c, framesCount - sentFrames)); + return onFrameExchangeComplete; + } + + private void sendFrames(ChannelHandlerContext c, int toSend) { + WebSocketFrameFactory.BulkEncoder frameEncoder = binaryFrameEncoder; + for (int frameIdx = 0; frameIdx < toSend; frameIdx++) { + if (!c.channel().isOpen()) { + return; + } + int payloadSize = sentFrames; + int frameSize = frameEncoder.sizeofBinaryFrame(payloadSize); + ByteBuf out = outBuffer; + if (frameSize > out.capacity() - out.writerIndex()) { + int readableBytes = out.readableBytes(); + int bufferSize = 4 * framesCount; + outBuffer = c.alloc().buffer(bufferSize, bufferSize); + if (c.channel().bytesBeforeUnwritable() < readableBytes) { + c.writeAndFlush(out, c.voidPromise()); + if (!c.channel().isWritable()) { + return; + } + } else { + c.write(out, c.voidPromise()); + } + out = outBuffer; + } + int mask = frameEncoder.encodeBinaryFramePrefix(out, payloadSize); + for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) { + out.writeByte(0xFE); + } + frameEncoder.maskBinaryFrame(out, mask, payloadSize); + sentFrames++; + } + c.flush(); + } + } + static class BinaryFramesEncoderClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { private final CompletableFuture onHandshakeComplete = diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java index ed90dbf..a587daa 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java @@ -48,7 +48,10 @@ public WebSocketFrameFactory frameFactory(ChannelHandlerContext ctx) { return FrameFactory.INSTANCE; } - static class FrameFactory implements WebSocketFrameFactory, WebSocketFrameFactory.Encoder { + static class FrameFactory + implements WebSocketFrameFactory, + WebSocketFrameFactory.Encoder, + WebSocketFrameFactory.BulkEncoder { static final int PREFIX_SIZE_SMALL = 6; static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; @@ -145,6 +148,11 @@ public Encoder encoder() { return this; } + @Override + public BulkEncoder bulkEncoder() { + return this; + } + @Override public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { int frameSize = binaryFrame.readableBytes(); @@ -168,6 +176,30 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } + @Override + public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { + if (payloadSize <= 125) { + byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize); + int mask = mask(); + byteBuf.writeInt(mask); + return mask; + } + + if (payloadSize <= 65_535) { + int mask = mask(); + byteBuf.writeLong(((BINARY_FRAME_MEDIUM | (long) payloadSize) << 32) | mask); + return mask; + } + throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); + } + + @Override + public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { + int end = byteBuf.writerIndex(); + int start = end - payloadSize; + return mask(mask, byteBuf, start, end); + } + @Override public int sizeofBinaryFrame(int payloadSize) { if (payloadSize <= 125) { diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java index 13d04d2..a120f7e 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java @@ -47,7 +47,10 @@ public WebSocketFrameFactory frameFactory(ChannelHandlerContext ctx) { return FrameFactory.INSTANCE; } - static class FrameFactory implements WebSocketFrameFactory, WebSocketFrameFactory.Encoder { + static class FrameFactory + implements WebSocketFrameFactory, + WebSocketFrameFactory.Encoder, + WebSocketFrameFactory.BulkEncoder { static final int PREFIX_SIZE_SMALL = 2; static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15; @@ -126,6 +129,11 @@ public Encoder encoder() { return this; } + @Override + public BulkEncoder bulkEncoder() { + return this; + } + @Override public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { int frameSize = binaryFrame.readableBytes(); @@ -144,6 +152,23 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) { throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); } + @Override + public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) { + if (payloadSize <= 125) { + byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize); + } else if (payloadSize <= 65_535) { + byteBuf.writeInt(BINARY_FRAME_MEDIUM | payloadSize); + } else { + throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535)); + } + return -1; + } + + @Override + public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) { + return byteBuf; + } + @Override public int sizeofBinaryFrame(int payloadSize) { if (payloadSize <= 125) { diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java index 5c8d622..bbc8ee8 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java @@ -37,11 +37,26 @@ public interface WebSocketFrameFactory { Encoder encoder(); + default BulkEncoder bulkEncoder() { + throw new UnsupportedOperationException("WebSocketFrameFactory.bulkEncoder() not implemented"); + } + + /** Encodes prefix of single binary websocket frame into provided bytebuffer. */ interface Encoder { - /*write prefix/mask, apply mask if needed*/ + ByteBuf encodeBinaryFrame(ByteBuf binaryFrame); - /*size with prefix/mask*/ + int sizeofBinaryFrame(int payloadSize); + } + + /** Encodes prefixes of multiple binary websocket frames into provided bytebuffer. */ + interface BulkEncoder { + + /** @return frame mask, or -1 if masking not applicable */ + int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize); + + ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize); + int sizeofBinaryFrame(int payloadSize); } }