From 29e8f07d887ed5ba5ef0b5fdd9bb8885f9410912 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 10 Jun 2020 09:30:58 -0400 Subject: [PATCH] GH-3299: Fix client connectionId for TCP/NIO Resolves https://github.com/spring-projects/spring-integration/issues/3299 Connect before creating the `TcpNioConnection` object and publishing the `TcpConnectionOpenEvent`. This was a regression caused by supporting connect timout; which moved the connect to after the object was created and event published, causing the `connectionId` to start with `unknown`. **cherry-pick to 5.3.x, 5.2.x** --- .../TcpNioClientConnectionFactory.java | 28 +++++++++++-------- .../tcp/connection/TcpNioConnectionTests.java | 8 +++++- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java index fbcc3c20d36..fb923c5b31b 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java @@ -87,6 +87,7 @@ protected TcpConnectionSupport buildNewConnection() { try { SocketChannel socketChannel = SocketChannel.open(); setSocketAttributes(socketChannel.socket()); + connect(socketChannel); TcpNioConnection connection = this.tcpNioConnectionSupport.createNewConnection(socketChannel, false, isLookupHost(), getApplicationEventPublisher(), getComponentName()); @@ -98,18 +99,6 @@ protected TcpConnectionSupport buildNewConnection() { } TcpConnectionSupport wrappedConnection = wrapConnection(connection); initializeConnection(wrappedConnection, socketChannel.socket()); - socketChannel.configureBlocking(false); - socketChannel.connect(new InetSocketAddress(getHost(), getPort())); - boolean connected = socketChannel.finishConnect(); - long timeLeft = getConnectTimeout().toMillis(); - while (!connected && timeLeft > 0) { - Thread.sleep(50); // NOSONAR Magic # - connected = socketChannel.finishConnect(); - timeLeft -= 50; // NOSONAR Magic # - } - if (!connected) { - throw new IOException("Not connected after connectTimeout"); - } if (getSoTimeout() > 0) { connection.setLastRead(System.currentTimeMillis()); } @@ -127,6 +116,21 @@ protected TcpConnectionSupport buildNewConnection() { } } + private void connect(SocketChannel socketChannel) throws IOException, InterruptedException { + socketChannel.configureBlocking(false); + socketChannel.connect(new InetSocketAddress(getHost(), getPort())); + boolean connected = socketChannel.finishConnect(); + long timeLeft = getConnectTimeout().toMillis(); + while (!connected && timeLeft > 0) { + Thread.sleep(50); // NOSONAR Magic # + connected = socketChannel.finishConnect(); + timeLeft -= 50; // NOSONAR Magic # + } + if (!connected) { + throw new IOException("Not connected after connectTimeout"); + } + } + /** * When set to true, connections created by this factory attempt * to use direct buffers where possible. diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java index ff274dc6e2f..4d081f0645e 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java @@ -139,7 +139,12 @@ public void testWriteTimeout() throws Exception { assertThat(latch.await(10000, TimeUnit.MILLISECONDS)).isTrue(); TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", serverSocket.get().getLocalPort()); - factory.setApplicationEventPublisher(nullPublisher); + AtomicReference connectionId = new AtomicReference<>(); + factory.setApplicationEventPublisher(event -> { + if (event instanceof TcpConnectionOpenEvent) { + connectionId.set(((TcpConnectionOpenEvent) event).getConnectionId()); + } + }); factory.setSoTimeout(100); factory.start(); try { @@ -154,6 +159,7 @@ public void testWriteTimeout() throws Exception { done.countDown(); factory.stop(); serverSocket.get().close(); + assertThat(connectionId.get()).startsWith("localhost"); } @Test