diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java index 6a6ac5ce0e7..0ccbf6c86df 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java @@ -101,7 +101,7 @@ public void registerSender(TcpSender sender) { @Override public void registerSenders(List sendersToRegister) { - this.theConnection.registerSenders(sendersToRegister); + this.theConnection.registerSenders(sendersToRegister, this); } @Override diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java index aede7f50178..6f137d01800 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java @@ -48,6 +48,7 @@ * * @author Gary Russell * @author Artem Bilan + * @author Mário Dias * * @since 2.0 * @@ -316,9 +317,13 @@ public void registerSender(@Nullable TcpSender senderToRegister) { * @since 5.4 */ public void registerSenders(List sendersToRegister) { + registerSenders(sendersToRegister, this); + } + + protected final void registerSenders(List sendersToRegister, TcpConnection connection) { this.senders.addAll(sendersToRegister); for (TcpSender sender : sendersToRegister) { - sender.addNewConnection(this); + sender.addNewConnection(connection); } } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java index c0b15cfed2c..e74c17da748 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java @@ -58,9 +58,12 @@ import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptor; +import org.springframework.integration.ip.tcp.connection.TcpConnection; import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain; +import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; @@ -80,7 +83,7 @@ /** * @author Gary Russell * @author Artem Bilan - * @author Mario Dias + * @author Mário Dias * * @since 2.0 */ @@ -1189,6 +1192,38 @@ public void testConnectionException() throws Exception { } } + @Test + public void testInterceptedConnection() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); + scf.setSerializer(serializer); + scf.setDeserializer(serializer); + TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); + adapter.setConnectionFactory(scf); + TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); + handler.setConnectionFactory(scf); + final AtomicReference connection = new AtomicReference<>(); + scf.setApplicationEventPublisher(event -> { + if (event instanceof TcpConnectionOpenEvent) { + connection.set(handler.getConnections() + .get(((TcpConnectionOpenEvent) event).getConnectionId())); + latch.countDown(); + } + }); + TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); + fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher())); + scf.setInterceptorFactoryChain(fc); + scf.start(); + TestingUtilities.waitListening(scf, null); + int port = scf.getPort(); + Socket socket = SocketFactory.getDefault().createSocket("localhost", port); + socket.close(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(connection.get()).isInstanceOf(HelloWorldInterceptor.class); + scf.stop(); + } + @Test public void testInterceptedCleanup() throws Exception { final CountDownLatch latch = new CountDownLatch(1);