From 1d31b1c78fd9b67bf5272b664376f281e3f187ff Mon Sep 17 00:00:00 2001 From: Ryland Degnan Date: Mon, 7 May 2018 11:54:54 -0700 Subject: [PATCH 1/2] Remove nested flatMap in RSocketFactory --- .../main/java/io/rsocket/RSocketFactory.java | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 32111f5a3..c65cc81c8 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -228,28 +228,21 @@ public Mono start() { ackTimeout, missedAcks); - Mono wrappedRSocketClient = - Mono.just(rSocketClient).map(plugins::applyClient); - - DuplexConnection finalConnection = connection; - return wrappedRSocketClient.flatMap( - wrappedClientRSocket -> { - RSocket unwrappedServerSocket = acceptor.get().apply(wrappedClientRSocket); - - Mono wrappedRSocketServer = - Mono.just(unwrappedServerSocket).map(plugins::applyServer); - - return wrappedRSocketServer - .doOnNext( - rSocket -> - new RSocketServer( - multiplexer.asServerConnection(), - rSocket, - frameDecoder, - errorConsumer)) - .then(finalConnection.sendOne(setupFrame)) - .then(wrappedRSocketClient); - }); + RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient); + + RSocket unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient); + + RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); + + RSocketServer rSocketServer = new RSocketServer( + multiplexer.asServerConnection(), + wrappedRSocketServer, + frameDecoder, + errorConsumer); + + return connection + .sendOne(setupFrame) + .thenReturn(wrappedRSocketClient); }); } } From 6d98a5edbea6cd76020159282e7dff3f3478f8aa Mon Sep 17 00:00:00 2001 From: Ryland Degnan Date: Mon, 7 May 2018 13:30:26 -0700 Subject: [PATCH 2/2] Remove flatMap in ServerRSocketFactory --- .../main/java/io/rsocket/RSocketFactory.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index c65cc81c8..5458b3ebb 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -325,7 +325,7 @@ public Mono start() { }); } - private Mono processSetupFrame( + private Mono processSetupFrame( ClientServerInputMultiplexer multiplexer, Frame setupFrame) { int version = Frame.Setup.version(setupFrame); if (version != SetupFrameFlyweight.CURRENT_VERSION) { @@ -348,15 +348,20 @@ private Mono processSetupFrame( errorConsumer, StreamIdSupplier.serverSupplier()); - Mono wrappedRSocketClient = Mono.just(rSocketClient).map(plugins::applyClient); + RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient); - return wrappedRSocketClient - .flatMap( - sender -> acceptor.get().accept(setupPayload, sender).map(plugins::applyServer)) - .map( - handler -> - new RSocketServer( - multiplexer.asClientConnection(), handler, frameDecoder, errorConsumer)) + return acceptor + .get() + .accept(setupPayload, wrappedRSocketClient) + .doOnNext(unwrappedServerSocket -> { + RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); + + RSocketServer rSocketServer = new RSocketServer( + multiplexer.asClientConnection(), + wrappedRSocketServer, + frameDecoder, + errorConsumer); + }) .then(); } }