diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 32111f5a3..5458b3ebb 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -228,28 +228,21 @@ public Mono start() { ackTimeout, missedAcks); - Mono wrappedRSocketClient = - Mono.just(rSocketClient).map(plugins::applyClient); - - DuplexConnection finalConnection = connection; - return wrappedRSocketClient.flatMap( - wrappedClientRSocket -> { - RSocket unwrappedServerSocket = acceptor.get().apply(wrappedClientRSocket); - - Mono wrappedRSocketServer = - Mono.just(unwrappedServerSocket).map(plugins::applyServer); - - return wrappedRSocketServer - .doOnNext( - rSocket -> - new RSocketServer( - multiplexer.asServerConnection(), - rSocket, - frameDecoder, - errorConsumer)) - .then(finalConnection.sendOne(setupFrame)) - .then(wrappedRSocketClient); - }); + RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient); + + RSocket unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient); + + RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); + + RSocketServer rSocketServer = new RSocketServer( + multiplexer.asServerConnection(), + wrappedRSocketServer, + frameDecoder, + errorConsumer); + + return connection + .sendOne(setupFrame) + .thenReturn(wrappedRSocketClient); }); } } @@ -332,7 +325,7 @@ public Mono start() { }); } - private Mono processSetupFrame( + private Mono processSetupFrame( ClientServerInputMultiplexer multiplexer, Frame setupFrame) { int version = Frame.Setup.version(setupFrame); if (version != SetupFrameFlyweight.CURRENT_VERSION) { @@ -355,15 +348,20 @@ private Mono processSetupFrame( errorConsumer, StreamIdSupplier.serverSupplier()); - Mono wrappedRSocketClient = Mono.just(rSocketClient).map(plugins::applyClient); + RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient); - return wrappedRSocketClient - .flatMap( - sender -> acceptor.get().accept(setupPayload, sender).map(plugins::applyServer)) - .map( - handler -> - new RSocketServer( - multiplexer.asClientConnection(), handler, frameDecoder, errorConsumer)) + return acceptor + .get() + .accept(setupPayload, wrappedRSocketClient) + .doOnNext(unwrappedServerSocket -> { + RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket); + + RSocketServer rSocketServer = new RSocketServer( + multiplexer.asClientConnection(), + wrappedRSocketServer, + frameDecoder, + errorConsumer); + }) .then(); } }