From 1c9d5504b58424c2e4d681c348c23431dabb7b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rio=20Dias?= Date: Fri, 12 Mar 2021 15:33:11 +0000 Subject: [PATCH 1/3] GH-3509: Register TcpSenders on wrapped connection Fixes https://github.com/spring-projects/spring-integration/issues/3509 Dead connections are not being removed on `TcpSender` when using a `TcpConnectionInterceptor` May cause a memory leak --- .../TcpConnectionInterceptorSupport.java | 9 ++++- .../tcp/AbstractTcpChannelAdapterTests.java | 8 +++- .../ip/tcp/TcpSendingMessageHandlerTests.java | 40 ++++++++++++++++++- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java index bad62d5295b..305bbce2b33 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.ip.tcp.connection; +import java.util.List; + import javax.net.ssl.SSLSession; import org.springframework.context.ApplicationEventPublisher; @@ -96,6 +98,11 @@ public void registerSender(TcpSender sender) { this.theConnection.registerSender(this); } + @Override + public void registerSenders(List sendersToRegister) { + this.theConnection.registerSenders(sendersToRegister); + } + @Override public String getConnectionId() { return this.theConnection.getConnectionId(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java index 505f9ebc88d..4a29c6e5f01 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,8 +42,12 @@ public void publishEvent(Object event) { }; protected HelloWorldInterceptorFactory newInterceptorFactory() { + return newInterceptorFactory(NOOP_PUBLISHER); + } + + protected HelloWorldInterceptorFactory newInterceptorFactory(ApplicationEventPublisher applicationEventPublisher) { HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory(); - factory.setApplicationEventPublisher(NOOP_PUBLISHER); + factory.setApplicationEventPublisher(applicationEventPublisher); return factory; } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java index 06726c77fee..e3c9db20990 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +47,7 @@ import org.mockito.Mockito; import org.springframework.beans.factory.BeanFactory; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.core.serializer.DefaultDeserializer; @@ -57,9 +59,11 @@ import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; import org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer; @@ -1191,4 +1195,38 @@ public void testConnectionException() throws Exception { } } + @Test + public void testInterceptedCleanup() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); + scf.setSerializer(serializer); + scf.setDeserializer(serializer); + TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); + adapter.setConnectionFactory(scf); + TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); + handler.setConnectionFactory(scf); + scf.setApplicationEventPublisher(new ApplicationEventPublisher() { + + @Override + public void publishEvent(Object event) { + if (event instanceof TcpConnectionCloseEvent) { + latch.countDown(); + } + } + }); + TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); + fc.setInterceptors(new TcpConnectionInterceptorFactory[] { + newInterceptorFactory(scf.getApplicationEventPublisher()), + }); + scf.setInterceptorFactoryChain(fc); + scf.start(); + TestingUtilities.waitListening(scf, null); + int port = scf.getPort(); + Socket socket = SocketFactory.getDefault().createSocket("localhost", port); + socket.close(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(handler.getConnections().isEmpty()).isTrue(); + scf.stop(); + } } From e6626322cd4181438dfe90d5f042f87605b770f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rio=20Dias?= Date: Tue, 16 Mar 2021 12:18:31 +0000 Subject: [PATCH 2/3] Register TcpSenders on delegated TcpConnection with interceptor instance --- .../TcpConnectionInterceptorSupport.java | 2 +- .../tcp/connection/TcpConnectionSupport.java | 7 +++- .../ip/tcp/TcpSendingMessageHandlerTests.java | 38 ++++++++++++++++++- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java index 6a6ac5ce0e7..0ccbf6c86df 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java @@ -101,7 +101,7 @@ public void registerSender(TcpSender sender) { @Override public void registerSenders(List sendersToRegister) { - this.theConnection.registerSenders(sendersToRegister); + this.theConnection.registerSenders(sendersToRegister, this); } @Override diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java index aede7f50178..6f137d01800 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java @@ -48,6 +48,7 @@ * * @author Gary Russell * @author Artem Bilan + * @author Mário Dias * * @since 2.0 * @@ -316,9 +317,13 @@ public void registerSender(@Nullable TcpSender senderToRegister) { * @since 5.4 */ public void registerSenders(List sendersToRegister) { + registerSenders(sendersToRegister, this); + } + + protected final void registerSenders(List sendersToRegister, TcpConnection connection) { this.senders.addAll(sendersToRegister); for (TcpSender sender : sendersToRegister) { - sender.addNewConnection(this); + sender.addNewConnection(connection); } } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java index c0b15cfed2c..a2e391b71fd 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java @@ -58,9 +58,12 @@ import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptor; +import org.springframework.integration.ip.tcp.connection.TcpConnection; import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain; +import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; @@ -80,7 +83,7 @@ /** * @author Gary Russell * @author Artem Bilan - * @author Mario Dias + * @author Mário Dias * * @since 2.0 */ @@ -1189,6 +1192,39 @@ public void testConnectionException() throws Exception { } } + @Test + public void testInterceptedConnection() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); + scf.setSerializer(serializer); + scf.setDeserializer(serializer); + TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); + adapter.setConnectionFactory(scf); + TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); + handler.setConnectionFactory(scf); + final AtomicReference connection = new AtomicReference<>(); + scf.setApplicationEventPublisher(event -> { + if (event instanceof TcpConnectionOpenEvent) { + connection.set(handler.getConnections() + .get(((TcpConnectionOpenEvent) event).getConnectionId())); + latch.countDown(); + } + }); + TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); + fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher())); + scf.setInterceptorFactoryChain(fc); + scf.start(); + TestingUtilities.waitListening(scf, null); + int port = scf.getPort(); + Socket socket = SocketFactory.getDefault().createSocket("localhost", port); + socket.close(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(connection.get() instanceof HelloWorldInterceptor) + .as("Expected HelloWorldInterceptor, got " + connection.get().getClass().getSimpleName()).isTrue(); + scf.stop(); + } + @Test public void testInterceptedCleanup() throws Exception { final CountDownLatch latch = new CountDownLatch(1); From e9f56c7ed43c25d7287eeb9de5f0372965f8caa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rio=20Dias?= Date: Tue, 16 Mar 2021 14:20:39 +0000 Subject: [PATCH 3/3] use Assert.isInstanceOf() in TcpSendingMessageHandlerTests.testInterceptedConnection() --- .../integration/ip/tcp/TcpSendingMessageHandlerTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java index a2e391b71fd..e74c17da748 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java @@ -1220,8 +1220,7 @@ public void testInterceptedConnection() throws Exception { Socket socket = SocketFactory.getDefault().createSocket("localhost", port); socket.close(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(connection.get() instanceof HelloWorldInterceptor) - .as("Expected HelloWorldInterceptor, got " + connection.get().getClass().getSimpleName()).isTrue(); + assertThat(connection.get()).isInstanceOf(HelloWorldInterceptor.class); scf.stop(); }