diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java index 64db55df5..d82230552 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java @@ -72,9 +72,9 @@ protected void initChannel(SocketChannel ch) throws Exception { }).connect(address); connect.addListener(connectFuture -> { + s.onSubscribe(EmptySubscription.INSTANCE); if (connectFuture.isSuccess()) { Channel ch = connect.channel(); - s.onSubscribe(EmptySubscription.INSTANCE); s.onNext(new ClientTcpDuplexConnection(ch, subjects)); s.onComplete(); } else { diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java index 14be91f64..a1831955a 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java @@ -91,8 +91,8 @@ protected void initChannel(SocketChannel ch) throws Exception { clientHandler .getHandshakePromise() .addListener(handshakeFuture -> { + s.onSubscribe(EmptySubscription.INSTANCE); if (handshakeFuture.isSuccess()) { - s.onSubscribe(EmptySubscription.INSTANCE); s.onNext(new ClientWebSocketDuplexConnection(ch, subjects)); s.onComplete(); } else {