From 953e64f82dc688b0085557cbe83fbd48b7866364 Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 21 Jun 2016 17:25:13 -0700 Subject: [PATCH] Fix subscription in DuplexConnection. **Problem** A Publisher may not have been initialized when a DuplexConnection failed during the connection establishement. (e.g. connection reset by peer) **Solution** Do the subscription as soon as Netty complete, regarding of the result of the ConnectFuture. --- .../transport/tcp/client/ClientTcpDuplexConnection.java | 2 +- .../websocket/client/ClientWebSocketDuplexConnection.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java index 64db55df5..d82230552 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java @@ -72,9 +72,9 @@ protected void initChannel(SocketChannel ch) throws Exception { }).connect(address); connect.addListener(connectFuture -> { + s.onSubscribe(EmptySubscription.INSTANCE); if (connectFuture.isSuccess()) { Channel ch = connect.channel(); - s.onSubscribe(EmptySubscription.INSTANCE); s.onNext(new ClientTcpDuplexConnection(ch, subjects)); s.onComplete(); } else { diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java index 14be91f64..a1831955a 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/client/ClientWebSocketDuplexConnection.java @@ -91,8 +91,8 @@ protected void initChannel(SocketChannel ch) throws Exception { clientHandler .getHandshakePromise() .addListener(handshakeFuture -> { + s.onSubscribe(EmptySubscription.INSTANCE); if (handshakeFuture.isSuccess()) { - s.onSubscribe(EmptySubscription.INSTANCE); s.onNext(new ClientWebSocketDuplexConnection(ch, subjects)); s.onComplete(); } else {