diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java index 8bece5b40..576f71b76 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java @@ -45,10 +45,10 @@ public Publisher connect(SocketAddress address) { Publisher connection = ClientTcpDuplexConnection.create(address, eventLoopGroup); - return s -> connection.subscribe(new Subscriber() { + return subscriber -> connection.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { - s.request(1); + subscriber.onSubscribe(s); } @Override @@ -58,21 +58,20 @@ public void onNext(ClientTcpDuplexConnection connection) { reactiveSocket.start(new Completable() { @Override public void success() { - s.onSubscribe(EmptySubscription.INSTANCE); - s.onNext(reactiveSocket); - s.onComplete(); + subscriber.onNext(reactiveSocket); + subscriber.onComplete(); } @Override public void error(Throwable e) { - s.onError(e); + subscriber.onError(e); } }); } @Override public void onError(Throwable t) { - s.onError(t); + subscriber.onError(t); } @Override diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/WebSocketReactiveSocketConnector.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/WebSocketReactiveSocketConnector.java index 1a178d5ea..a8cf5d6db 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/WebSocketReactiveSocketConnector.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/WebSocketReactiveSocketConnector.java @@ -53,10 +53,10 @@ public Publisher connect(SocketAddress address) { Publisher connection = ClientWebSocketDuplexConnection.create((InetSocketAddress)address, path, eventLoopGroup); - return s -> connection.subscribe(new Subscriber() { + return subscriber -> connection.subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { - s.request(1); + subscriber.onSubscribe(s); } @Override @@ -65,26 +65,25 @@ public void onNext(ClientWebSocketDuplexConnection connection) { reactiveSocket.start(new Completable() { @Override public void success() { - s.onSubscribe(EmptySubscription.INSTANCE); - s.onNext(reactiveSocket); - s.onComplete(); + subscriber.onNext(reactiveSocket); + subscriber.onComplete(); } @Override public void error(Throwable e) { - s.onError(e); + subscriber.onError(e); } }); } @Override public void onError(Throwable t) { - s.onError(t); + subscriber.onError(t); } @Override public void onComplete() { - s.onComplete(); + subscriber.onComplete(); } }); } else {