From 3de9967353bb865c0e20a72061392a0a995ba8ae Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Wed, 15 Jun 2016 16:54:16 -0700 Subject: [PATCH] Refactor ReactiveSocketServerHandler to be not shareable. ** Problem ** There's a memory leak in `ReactiveSocketServerHandler`, it keep adding entries in the `duplexConnections` map but never remove them. ** Solution ** Instead of having only one `ReactiveSocketServerHandler`, and manage resources manually, we let Netty allocate one instance per Channel. Then no resource management is necessary. ** Modifications ** I refactored all the uage of `ReactiveSocketServerHandler` whithout changing the logic. I also got rid of the method ``` public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } ``` since we only use `writeAndFlush` in the DuplexConnection (we're sure there's nothing to flush). --- .../transport/tcp/EchoServerHandler.java | 11 +- .../server/ReactiveSocketServerHandler.java | 56 +++--- .../transport/tcp/ClientServerTest.java | 89 +++++----- .../io/reactivesocket/transport/tcp/Ping.java | 12 +- .../io/reactivesocket/transport/tcp/Pong.java | 164 +++++++----------- .../server/ReactiveSocketServerHandler.java | 35 ++-- .../transport/websocket/Ping.java | 12 +- .../transport/websocket/Pong.java | 163 +++++++---------- 8 files changed, 222 insertions(+), 320 deletions(-) diff --git a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java b/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java index e4263ac08..c7101a80a 100644 --- a/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java +++ b/reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java @@ -16,11 +16,10 @@ public class EchoServerHandler extends ByteToMessageDecoder { private static SimpleChannelInboundHandler httpHandler = new HttpServerHandler(); - private static ReactiveSocketServerHandler reactiveSocketHandler = ReactiveSocketServerHandler.create((setupPayload, rs) -> - new RequestHandler.Builder().withRequestResponse(payload -> s -> { - s.onNext(payload); - s.onComplete(); - }).build()); + private static RequestHandler requestHandler = new RequestHandler.Builder().withRequestResponse(payload -> s -> { + s.onNext(payload); + s.onComplete(); + }).build(); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { @@ -61,6 +60,8 @@ private void switchToHttp(ChannelHandlerContext ctx) { private void switchToReactiveSocket(ChannelHandlerContext ctx) { ChannelPipeline p = ctx.pipeline(); + ReactiveSocketServerHandler reactiveSocketHandler = + ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler); p.addLast(reactiveSocketHandler); p.remove(this); } diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java index 534c9c85c..ec649990d 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java @@ -16,9 +16,7 @@ package io.reactivesocket.transport.tcp.server; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -28,25 +26,23 @@ import io.reactivesocket.LeaseGovernor; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.transport.tcp.MutableDirectByteBuf; -import org.agrona.BitUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ConcurrentHashMap; +import static org.agrona.BitUtil.SIZE_OF_INT; -@ChannelHandler.Sharable public class ReactiveSocketServerHandler extends ChannelInboundHandlerAdapter { - private Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class); - - private ConcurrentHashMap duplexConnections = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class); + private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE >> 1; private ConnectionSetupHandler setupHandler; - private LeaseGovernor leaseGovernor; + private ServerTcpDuplexConnection connection; protected ReactiveSocketServerHandler(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { this.setupHandler = setupHandler; this.leaseGovernor = leaseGovernor; + this.connection = null; } public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler) { @@ -54,25 +50,27 @@ public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHan } public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { - return new - ReactiveSocketServerHandler( - setupHandler, - leaseGovernor); - + return new ReactiveSocketServerHandler(setupHandler, leaseGovernor); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ChannelPipeline cp = ctx.pipeline(); if (cp.get(LengthFieldBasedFrameDecoder.class) == null) { - ctx - .pipeline() - .addBefore( - ctx.name(), - LengthFieldBasedFrameDecoder.class.getName(), - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE >> 1, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0)); + LengthFieldBasedFrameDecoder frameDecoder = + new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, SIZE_OF_INT, -1 * SIZE_OF_INT, 0); + ctx.pipeline() + .addBefore(ctx.name(), LengthFieldBasedFrameDecoder.class.getName(), frameDecoder); } + } + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + connection = new ServerTcpDuplexConnection(ctx); + ReactiveSocket reactiveSocket = + DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace); + // Note: No blocking code here (still it should be refactored) + reactiveSocket.startAndWait(); } @Override @@ -81,29 +79,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception try { MutableDirectByteBuf mutableDirectByteBuf = new MutableDirectByteBuf(content); Frame from = Frame.from(mutableDirectByteBuf, 0, mutableDirectByteBuf.capacity()); - channelRegistered(ctx); - ServerTcpDuplexConnection connection = duplexConnections.computeIfAbsent(ctx.channel().id(), i -> { - logger.info("No connection found for channel id: " + i + " from host " + ctx.channel().remoteAddress().toString()); - ServerTcpDuplexConnection c = new ServerTcpDuplexConnection(ctx); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromServerConnection(c, setupHandler, leaseGovernor, throwable -> throwable.printStackTrace()); - reactiveSocket.startAndWait(); - return c; - }); + if (connection != null) { - connection - .getSubscribers() - .forEach(o -> o.onNext(from)); + connection.getSubscribers().forEach(o -> o.onNext(from)); } } finally { content.release(); } } - @Override - public void channelReadComplete(ChannelHandlerContext ctx) { - ctx.flush(); - } - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java index 9006da04a..856cd559b 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java @@ -52,55 +52,52 @@ public class ClientServerTest { static EventLoopGroup bossGroup = new NioEventLoopGroup(1); static EventLoopGroup workerGroup = new NioEventLoopGroup(4); - static ReactiveSocketServerHandler serverHandler = ReactiveSocketServerHandler.create((setupPayload, rs) -> - new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return s -> { - //System.out.println("Handling request/response payload => " + s.toString()); - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - s.onNext(response); - s.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { + static RequestHandler requestHandler = new RequestHandler() { + @Override + public Publisher handleRequestResponse(Payload payload) { + return s -> { Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); + s.onNext(response); + s.onComplete(); + }; + } - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } + @Override + public Publisher handleRequestStream(Payload payload) { + Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response)); + } + + @Override + public Publisher handleSubscription(Payload payload) { + Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); + + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response) + .repeat()); + } + + @Override + public Publisher handleFireAndForget(Payload payload) { + return Subscriber::onComplete; + } + + @Override + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + return null; + } - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response) - .repeat()); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return null; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } + @Override + public Publisher handleMetadataPush(Payload payload) { + return null; } - ); + }; @BeforeClass public static void setup() throws Exception { @@ -112,6 +109,8 @@ public static void setup() throws Exception { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); + ReactiveSocketServerHandler serverHandler = + ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler); pipeline.addLast(serverHandler); } }); @@ -123,7 +122,7 @@ protected void initChannel(Channel ch) throws Exception { ).toBlocking().single(); client = DefaultReactiveSocket - .fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace()); + .fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), Throwable::printStackTrace); client.startAndWait(); } diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java index ba85f1fdf..8b0ad0aac 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java @@ -35,11 +35,15 @@ public class Ping { public static void main(String... args) throws Exception { + NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); + Publisher publisher = ClientTcpDuplexConnection - .create(InetSocketAddress.createUnresolved("localhost", 7878), new NioEventLoopGroup(1)); + .create(InetSocketAddress.createUnresolved("localhost", 7878), eventLoopGroup); ClientTcpDuplexConnection duplexConnection = RxReactiveStreams.toObservable(publisher).toBlocking().last(); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace()); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8"); + ReactiveSocket reactiveSocket = + DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace); reactiveSocket.startAndWait(); @@ -80,13 +84,13 @@ public ByteBuffer getMetadata() { .toObservable( reactiveSocket .requestResponse(keyPayload)) - .doOnError(t -> t.printStackTrace()) + .doOnError(Throwable::printStackTrace) .doOnNext(s -> { long diff = System.nanoTime() - start; histogram.recordValue(diff); }); }, 16) - .doOnError(t -> t.printStackTrace()) + .doOnError(Throwable::printStackTrace) .subscribe(new Subscriber() { @Override public void onCompleted() { diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java index e60511fe4..c118674cf 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java @@ -42,113 +42,68 @@ public static void main(String... args) throws Exception { Random r = new Random(); r.nextBytes(response); - ReactiveSocketServerHandler serverHandler = - ReactiveSocketServerHandler.create((setupPayload, rs) -> new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return new Publisher() { - @Override - public void subscribe(Subscriber s) { - Payload responsePayload = new Payload() { - ByteBuffer data = ByteBuffer.wrap(response); - ByteBuffer metadata = ByteBuffer.allocate(0); - - public ByteBuffer getData() { - return data; - } - - @Override - public ByteBuffer getMetadata() { - return metadata; - } - }; + RequestHandler requestHandler = new RequestHandler() { + @Override + public Publisher handleRequestResponse(Payload payload) { + return subscriber -> { + Payload responsePayload = new Payload() { + ByteBuffer data = ByteBuffer.wrap(response); + ByteBuffer metadata = ByteBuffer.allocate(0); + + public ByteBuffer getData() { + return data; + } - s.onNext(responsePayload); - s.onComplete(); + @Override + public ByteBuffer getMetadata() { + return metadata; } }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - Observable observable = - RxReactiveStreams - .toObservable(inputs) - .map(input -> input); - return RxReactiveStreams.toPublisher(observable); -// return outputSubscriber -> { -// inputs.subscribe(new Subscriber() { -// private int count = 0; -// private boolean completed = false; -// -// @Override -// public void onSubscribe(Subscription s) { -// //outputSubscriber.onSubscribe(s); -// s.request(128); -// } -// -// @Override -// public void onNext(Payload input) { -// if (completed) { -// return; -// } -// count += 1; -// outputSubscriber.onNext(input); -// outputSubscriber.onNext(input); -// if (count > 10) { -// completed = true; -// outputSubscriber.onComplete(); -// } -// } -// -// @Override -// public void onError(Throwable t) { -// if (!completed) { -// outputSubscriber.onError(t); -// } -// } -// -// @Override -// public void onComplete() { -// if (!completed) { -// outputSubscriber.onComplete(); -// } -// } -// }); -// }; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }); + subscriber.onNext(responsePayload); + subscriber.onComplete(); + }; + } + + @Override + public Publisher handleRequestStream(Payload payload) { + Payload response = + TestUtil.utf8EncodedPayload("hello world", "metadata"); + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response)); + } + + @Override + public Publisher handleSubscription(Payload payload) { + Payload response = + TestUtil.utf8EncodedPayload("hello world", "metadata"); + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response)); + } + + @Override + public Publisher handleFireAndForget(Payload payload) { + return Subscriber::onComplete; + } + + @Override + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + Observable observable = + RxReactiveStreams + .toObservable(inputs) + .map(input -> input); + return RxReactiveStreams.toPublisher(observable); + } + + @Override + public Publisher handleMetadataPush(Payload payload) { + return null; + } + }; EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -161,12 +116,13 @@ public Publisher handleMetadataPush(Payload payload) { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); + ReactiveSocketServerHandler serverHandler = + ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler); pipeline.addLast(serverHandler); } }); Channel localhost = b.bind("localhost", 7878).sync().channel(); localhost.closeFuture().sync(); - } } diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java index 276c3280a..7c2d4bb67 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java @@ -27,24 +27,23 @@ import io.reactivesocket.LeaseGovernor; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.transport.tcp.MutableDirectByteBuf; +import io.reactivesocket.transport.tcp.server.ServerTcpDuplexConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; -@ChannelHandler.Sharable public class ReactiveSocketServerHandler extends SimpleChannelInboundHandler { - private Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class); - - private ConcurrentHashMap duplexConnections = new ConcurrentHashMap<>(); + private static final Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class); private ConnectionSetupHandler setupHandler; - private LeaseGovernor leaseGovernor; + private ServerWebSocketDuplexConnection connection; protected ReactiveSocketServerHandler(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { this.setupHandler = setupHandler; this.leaseGovernor = leaseGovernor; + this.connection = null; } public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler) { @@ -52,11 +51,16 @@ public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHan } public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { - return new - ReactiveSocketServerHandler( - setupHandler, - leaseGovernor); + return new ReactiveSocketServerHandler(setupHandler, leaseGovernor); + } + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + connection = new ServerWebSocketDuplexConnection(ctx); + ReactiveSocket reactiveSocket = + DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace); + // Note: No blocking code here (still it should be refactored) + reactiveSocket.startAndWait(); } @Override @@ -64,18 +68,9 @@ protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) ByteBuf content = msg.content(); MutableDirectByteBuf mutableDirectByteBuf = new MutableDirectByteBuf(content); Frame from = Frame.from(mutableDirectByteBuf, 0, mutableDirectByteBuf.capacity()); - channelRegistered(ctx); - ServerWebSocketDuplexConnection connection = duplexConnections.computeIfAbsent(ctx.channel().id(), i -> { - System.out.println("No connection found for channel id: " + i); - ServerWebSocketDuplexConnection c = new ServerWebSocketDuplexConnection(ctx); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromServerConnection(c, setupHandler, leaseGovernor, throwable -> throwable.printStackTrace()); - reactiveSocket.startAndWait(); - return c; - }); + if (connection != null) { - connection - .getSubscribers() - .forEach(o -> o.onNext(from)); + connection.getSubscribers().forEach(o -> o.onNext(from)); } } diff --git a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java index c2fe4864f..8da8eafe3 100644 --- a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java +++ b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java @@ -35,10 +35,16 @@ public class Ping { public static void main(String... args) throws Exception { - Publisher publisher = ClientWebSocketDuplexConnection.create(InetSocketAddress.createUnresolved("localhost", 8025), "/rs", new NioEventLoopGroup(1)); + NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); - ClientWebSocketDuplexConnection duplexConnection = RxReactiveStreams.toObservable(publisher).toBlocking().last(); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace()); + Publisher publisher = + ClientWebSocketDuplexConnection.create(InetSocketAddress.createUnresolved("localhost", 8025), "/rs", eventLoopGroup); + + ClientWebSocketDuplexConnection duplexConnection = + RxReactiveStreams.toObservable(publisher).toBlocking().last(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8"); + ReactiveSocket reactiveSocket = + DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace); reactiveSocket.startAndWait(); diff --git a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Pong.java b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Pong.java index 370bccf49..51e870572 100644 --- a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Pong.java +++ b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Pong.java @@ -45,113 +45,68 @@ public static void main(String... args) throws Exception { Random r = new Random(); r.nextBytes(response); - ReactiveSocketServerHandler serverHandler = - ReactiveSocketServerHandler.create((setupPayload, rs) -> new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return new Publisher() { + RequestHandler requestHandler = new RequestHandler() { + @Override + public Publisher handleRequestResponse(Payload payload) { + return subscriber -> { + Payload responsePayload = new Payload() { + ByteBuffer data = ByteBuffer.wrap(response); + ByteBuffer metadata = ByteBuffer.allocate(0); + + public ByteBuffer getData() { + return data; + } + @Override - public void subscribe(Subscriber s) { - Payload responsePayload = new Payload() { - ByteBuffer data = ByteBuffer.wrap(response); - ByteBuffer metadata = ByteBuffer.allocate(0); - - public ByteBuffer getData() { - return data; - } - - @Override - public ByteBuffer getMetadata() { - return metadata; - } - }; - - s.onNext(responsePayload); - s.onComplete(); + public ByteBuffer getMetadata() { + return metadata; } }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - Observable observable = - RxReactiveStreams - .toObservable(inputs) - .map(input -> input); - return RxReactiveStreams.toPublisher(observable); - -// return outputSubscriber -> { -// inputs.subscribe(new Subscriber() { -// private int count = 0; -// private boolean completed = false; -// -// @Override -// public void onSubscribe(Subscription s) { -// //outputSubscriber.onSubscribe(s); -// s.request(128); -// } -// -// @Override -// public void onNext(Payload input) { -// if (completed) { -// return; -// } -// count += 1; -// outputSubscriber.onNext(input); -// outputSubscriber.onNext(input); -// if (count > 10) { -// completed = true; -// outputSubscriber.onComplete(); -// } -// } -// -// @Override -// public void onError(Throwable t) { -// if (!completed) { -// outputSubscriber.onError(t); -// } -// } -// -// @Override -// public void onComplete() { -// if (!completed) { -// outputSubscriber.onComplete(); -// } -// } -// }); -// }; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }); + subscriber.onNext(responsePayload); + subscriber.onComplete(); + }; + } + + @Override + public Publisher handleRequestStream(Payload payload) { + Payload response = + TestUtil.utf8EncodedPayload("hello world", "metadata"); + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response)); + } + + @Override + public Publisher handleSubscription(Payload payload) { + Payload response = + TestUtil.utf8EncodedPayload("hello world", "metadata"); + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response)); + } + + @Override + public Publisher handleFireAndForget(Payload payload) { + return Subscriber::onComplete; + } + + @Override + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + Observable observable = + RxReactiveStreams + .toObservable(inputs) + .map(input -> input); + return RxReactiveStreams.toPublisher(observable); + } + + @Override + public Publisher handleMetadataPush(Payload payload) { + return null; + } + }; EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); @@ -167,6 +122,8 @@ protected void initChannel(Channel ch) throws Exception { pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(64 * 1024)); pipeline.addLast(new WebSocketServerProtocolHandler("/rs")); + ReactiveSocketServerHandler serverHandler = + ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler); pipeline.addLast(serverHandler); } });