From 1e67a94af95ec63bcad1025209234cf41197bd99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rio=20Dias?= Date: Fri, 12 Mar 2021 15:33:11 +0000 Subject: [PATCH] GH-3509 Register TcpSenders on wrapped connection Fixes https://github.com/spring-projects/spring-integration/issues/3509 Dead connections are not being removed on `TcpSender` when using a `TcpConnectionInterceptor`. May cause a memory leak * Fix `TcpConnectionInterceptorSupport` to override `registerSenders()` and delegate to the `this.theConnection` * Introduce vararg-based `setInterceptor()` into `TcpConnectionInterceptorFactoryChain` * Remove redundant `//NOSONAR` and properly return an `Arrays.copyOf()` in the `getInterceptorFactories()` **Cherry-pick to `5.4.x`** --- .../TcpConnectionInterceptorFactoryChain.java | 12 +- .../TcpConnectionInterceptorSupport.java | 10 +- .../tcp/AbstractTcpChannelAdapterTests.java | 26 ++-- .../ip/tcp/TcpSendingMessageHandlerTests.java | 111 +++++++++++------- 4 files changed, 98 insertions(+), 61 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorFactoryChain.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorFactoryChain.java index 10251676d71..a253604a32a 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorFactoryChain.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorFactoryChain.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ /** * @author Gary Russell + * @author Artem Bilan + * * @since 2.0 * */ @@ -31,11 +33,17 @@ public class TcpConnectionInterceptorFactoryChain { @Nullable public TcpConnectionInterceptorFactory[] getInterceptorFactories() { - return this.interceptorFactories; //NOSONAR + return this.interceptorFactories != null + ? Arrays.copyOf(this.interceptorFactories, this.interceptorFactories.length) + : null; } public void setInterceptors(TcpConnectionInterceptorFactory[] interceptorFactories) { this.interceptorFactories = Arrays.copyOf(interceptorFactories, interceptorFactories.length); } + public void setInterceptor(TcpConnectionInterceptorFactory... interceptorFactories) { + setInterceptors(interceptorFactories); + } + } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java index bad62d5295b..6a6ac5ce0e7 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.ip.tcp.connection; +import java.util.List; + import javax.net.ssl.SSLSession; import org.springframework.context.ApplicationEventPublisher; @@ -29,6 +31,7 @@ * to the underlying {@link TcpConnection}. * * @author Gary Russell + * @author Mário Dias * * @since 2.0 */ @@ -96,6 +99,11 @@ public void registerSender(TcpSender sender) { this.theConnection.registerSender(this); } + @Override + public void registerSenders(List sendersToRegister) { + this.theConnection.registerSenders(sendersToRegister); + } + @Override public String getConnectionId() { return this.theConnection.getConnectionId(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java index 505f9ebc88d..a499df9431c 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,34 +16,29 @@ package org.springframework.integration.ip.tcp; -import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.HelloWorldInterceptorFactory; /** * @author Gary Russell + * @author Mário Dias + * @author Artem Bilan + * * @since 3.0 * */ public class AbstractTcpChannelAdapterTests { - private static final ApplicationEventPublisher NOOP_PUBLISHER = new ApplicationEventPublisher() { - - @Override - public void publishEvent(ApplicationEvent event) { - } - - @Override - public void publishEvent(Object event) { - - } - - }; + private static final ApplicationEventPublisher NOOP_PUBLISHER = event -> { }; protected HelloWorldInterceptorFactory newInterceptorFactory() { + return newInterceptorFactory(NOOP_PUBLISHER); + } + + protected HelloWorldInterceptorFactory newInterceptorFactory(ApplicationEventPublisher applicationEventPublisher) { HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory(); - factory.setApplicationEventPublisher(NOOP_PUBLISHER); + factory.setApplicationEventPublisher(applicationEventPublisher); return factory; } @@ -51,5 +46,4 @@ protected void noopPublisher(AbstractConnectionFactory connectionFactory) { connectionFactory.setApplicationEventPublisher(NOOP_PUBLISHER); } - } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java index 06726c77fee..ed82300949d 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java @@ -39,10 +39,11 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.springframework.beans.factory.BeanFactory; @@ -57,9 +58,11 @@ import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; import org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer; @@ -77,6 +80,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Mario Dias * * @since 2.0 */ @@ -94,7 +98,7 @@ private void readFully(InputStream is, byte[] buff) throws IOException { @Test public void testNetCrLf() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -118,8 +122,8 @@ public void testNetCrLf() throws Exception { } }); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - AbstractConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", - serverSocket.get().getLocalPort()); + AbstractConnectionFactory ccf = + new TcpNetClientConnectionFactory("localhost", serverSocket.get().getLocalPort()); noopPublisher(ccf); ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); ccf.setSerializer(serializer); @@ -147,7 +151,7 @@ public void testNetCrLf() throws Exception { @Test public void testNetCrLfClientMode() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -213,7 +217,7 @@ public void testNetCrLfClientMode() throws Exception { @Test public void testNioCrLf() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -253,7 +257,7 @@ public void testNioCrLf() throws Exception { adapter.setOutputChannel(channel); handler.handleMessage(MessageBuilder.withPayload("Test").build()); handler.handleMessage(MessageBuilder.withPayload("Test").build()); - Set results = new HashSet(); + Set results = new HashSet<>(); Message mOut = channel.receive(10000); assertThat(mOut).isNotNull(); results.add(new String((byte[]) mOut.getPayload())); @@ -269,7 +273,7 @@ public void testNioCrLf() throws Exception { @Test public void testNetStxEtx() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -322,7 +326,7 @@ public void testNetStxEtx() throws Exception { @Test public void testNioStxEtx() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -362,7 +366,7 @@ public void testNioStxEtx() throws Exception { adapter.setOutputChannel(channel); handler.handleMessage(MessageBuilder.withPayload("Test").build()); handler.handleMessage(MessageBuilder.withPayload("Test").build()); - Set results = new HashSet(); + Set results = new HashSet<>(); Message mOut = channel.receive(10000); assertThat(mOut).isNotNull(); results.add(new String((byte[]) mOut.getPayload())); @@ -434,7 +438,7 @@ public void testNetLength() throws Exception { @Test public void testNioLength() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -477,7 +481,7 @@ public void testNioLength() throws Exception { adapter.setOutputChannel(channel); handler.handleMessage(MessageBuilder.withPayload("Test").build()); handler.handleMessage(MessageBuilder.withPayload("Test").build()); - Set results = new HashSet(); + Set results = new HashSet<>(); Message mOut = channel.receive(10000); assertThat(mOut).isNotNull(); results.add(new String((byte[]) mOut.getPayload())); @@ -493,7 +497,7 @@ public void testNioLength() throws Exception { @Test public void testNetSerial() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -584,7 +588,7 @@ public void testNioSerial() throws Exception { adapter.setOutputChannel(channel); handler.handleMessage(MessageBuilder.withPayload("Test").build()); handler.handleMessage(MessageBuilder.withPayload("Test").build()); - Set results = new HashSet(); + Set results = new HashSet<>(); Message mOut = channel.receive(10000); assertThat(mOut).isNotNull(); results.add((String) mOut.getPayload()); @@ -600,7 +604,7 @@ public void testNioSerial() throws Exception { @Test public void testNetSingleUseNoInbound() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final Semaphore semaphore = new Semaphore(0); final AtomicBoolean done = new AtomicBoolean(); @@ -647,7 +651,7 @@ public void testNetSingleUseNoInbound() throws Exception { @Test public void testNioSingleUseNoInbound() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final Semaphore semaphore = new Semaphore(0); final AtomicBoolean done = new AtomicBoolean(); @@ -694,7 +698,7 @@ public void testNioSingleUseNoInbound() throws Exception { @Test public void testNetSingleUseWithInbound() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final Semaphore semaphore = new Semaphore(0); final AtomicBoolean done = new AtomicBoolean(); @@ -739,7 +743,7 @@ public void testNetSingleUseWithInbound() throws Exception { handler.handleMessage(MessageBuilder.withPayload("Test").build()); handler.handleMessage(MessageBuilder.withPayload("Test").build()); assertThat(semaphore.tryAcquire(2, 10000, TimeUnit.MILLISECONDS)).isTrue(); - Set replies = new HashSet(); + Set replies = new HashSet<>(); for (int i = 0; i < 2; i++) { Message mOut = channel.receive(10000); assertThat(mOut).isNotNull(); @@ -754,7 +758,7 @@ public void testNetSingleUseWithInbound() throws Exception { @Test public void testNioSingleUseWithInbound() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final Semaphore semaphore = new Semaphore(0); final AtomicBoolean done = new AtomicBoolean(); @@ -799,7 +803,7 @@ public void testNioSingleUseWithInbound() throws Exception { handler.handleMessage(MessageBuilder.withPayload("Test").build()); handler.handleMessage(MessageBuilder.withPayload("Test").build()); assertThat(semaphore.tryAcquire(2, 10000, TimeUnit.MILLISECONDS)).isTrue(); - Set replies = new HashSet(); + Set replies = new HashSet<>(); for (int i = 0; i < 2; i++) { Message mOut = channel.receive(10000); assertThat(mOut).isNotNull(); @@ -814,11 +818,11 @@ public void testNioSingleUseWithInbound() throws Exception { @Test public void testNioSingleUseWithInboundMany() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final Semaphore semaphore = new Semaphore(0); final AtomicBoolean done = new AtomicBoolean(); - final List serverSockets = new ArrayList(); + final List serverSockets = new ArrayList<>(); this.executor.execute(() -> { try { ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(0, 100); @@ -900,7 +904,7 @@ public void testNioSingleUseWithInboundMany() throws Exception { @Test public void testNetNegotiate() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -912,7 +916,7 @@ public void testNetNegotiate() throws Exception { int i = 0; while (true) { ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()); - Object in = null; + Object in; ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); if (i == 0) { in = ois.readObject(); @@ -944,7 +948,7 @@ public void testNetNegotiate() throws Exception { ccf.setDeserializer(new DefaultDeserializer()); ccf.setSoTimeout(10000); TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); - fc.setInterceptors(new TcpConnectionInterceptorFactory[] { + fc.setInterceptors(new TcpConnectionInterceptorFactory[]{ newInterceptorFactory(), newInterceptorFactory() }); @@ -971,7 +975,7 @@ public void testNetNegotiate() throws Exception { @Test public void testNioNegotiate() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -1011,7 +1015,7 @@ public void testNioNegotiate() throws Exception { ccf.setDeserializer(new DefaultDeserializer()); ccf.setSoTimeout(10000); TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); - fc.setInterceptors(new TcpConnectionInterceptorFactory[] { newInterceptorFactory() }); + fc.setInterceptors(new TcpConnectionInterceptorFactory[]{ newInterceptorFactory() }); ccf.setInterceptorFactoryChain(fc); ccf.start(); TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); @@ -1023,7 +1027,7 @@ public void testNioNegotiate() throws Exception { for (int i = 0; i < 1000; i++) { handler.handleMessage(MessageBuilder.withPayload("Test").build()); } - Set results = new TreeSet(); + Set results = new TreeSet<>(); for (int i = 0; i < 1000; i++) { Message mOut = channel.receive(10000); assertThat(mOut).isNotNull(); @@ -1040,7 +1044,7 @@ public void testNioNegotiate() throws Exception { @Test public void testNetNegotiateSingleNoListen() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -1080,10 +1084,7 @@ public void testNetNegotiateSingleNoListen() throws Exception { ccf.setDeserializer(new DefaultDeserializer()); ccf.setSoTimeout(10000); TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); - fc.setInterceptors(new TcpConnectionInterceptorFactory[] { - newInterceptorFactory(), - newInterceptorFactory() - }); + fc.setInterceptor(newInterceptorFactory(), newInterceptorFactory()); ccf.setInterceptorFactoryChain(fc); ccf.setSingleUse(true); ccf.start(); @@ -1097,7 +1098,7 @@ public void testNetNegotiateSingleNoListen() throws Exception { @Test public void testNioNegotiateSingleNoListen() throws Exception { - final AtomicReference serverSocket = new AtomicReference(); + final AtomicReference serverSocket = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean done = new AtomicBoolean(); this.executor.execute(() -> { @@ -1137,10 +1138,7 @@ public void testNioNegotiateSingleNoListen() throws Exception { ccf.setDeserializer(new DefaultDeserializer()); ccf.setSoTimeout(10000); TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); - fc.setInterceptors(new TcpConnectionInterceptorFactory[] { - newInterceptorFactory(), - newInterceptorFactory() - }); + fc.setInterceptor(newInterceptorFactory(), newInterceptorFactory()); ccf.setInterceptorFactoryChain(fc); ccf.setSingleUse(true); ccf.start(); @@ -1153,18 +1151,18 @@ public void testNioNegotiateSingleNoListen() throws Exception { } @Test - public void testOutboundChannelAdapterWithinChain() throws Exception { + public void testOutboundChannelAdapterWithinChain() { AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "TcpOutboundChannelAdapterWithinChainTests-context.xml", this.getClass()); AbstractServerConnectionFactory scf = ctx.getBean(AbstractServerConnectionFactory.class); TestingUtilities.waitListening(scf, null); ctx.getBean(AbstractClientConnectionFactory.class).setPort(scf.getPort()); - ctx.getBeansOfType(ConsumerEndpointFactoryBean.class).values().forEach(c -> c.start()); + ctx.getBeansOfType(ConsumerEndpointFactoryBean.class).values().forEach(ConsumerEndpointFactoryBean::start); MessageChannel channelAdapterWithinChain = ctx.getBean("tcpOutboundChannelAdapterWithinChain", MessageChannel.class); PollableChannel inbound = ctx.getBean("inbound", PollableChannel.class); String testPayload = "Hello, world!"; - channelAdapterWithinChain.send(new GenericMessage(testPayload)); + channelAdapterWithinChain.send(new GenericMessage<>(testPayload)); Message m = inbound.receive(1000); assertThat(m).isNotNull(); assertThat(new String((byte[]) m.getPayload())).isEqualTo(testPayload); @@ -1180,7 +1178,7 @@ public void testConnectionException() throws Exception { }).when(mockCcf).getConnection(); handler.setConnectionFactory(mockCcf); try { - handler.handleMessage(new GenericMessage("foo")); + handler.handleMessage(new GenericMessage<>("foo")); fail("Expected exception"); } catch (Exception e) { @@ -1191,4 +1189,33 @@ public void testConnectionException() throws Exception { } } + @Test + public void testInterceptedCleanup() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); + scf.setSerializer(serializer); + scf.setDeserializer(serializer); + TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); + adapter.setConnectionFactory(scf); + TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); + handler.setConnectionFactory(scf); + scf.setApplicationEventPublisher(event -> { + if (event instanceof TcpConnectionCloseEvent) { + latch.countDown(); + } + }); + TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); + fc.setInterceptor(newInterceptorFactory(scf.getApplicationEventPublisher())); + scf.setInterceptorFactoryChain(fc); + scf.start(); + TestingUtilities.waitListening(scf, null); + int port = scf.getPort(); + Socket socket = SocketFactory.getDefault().createSocket("localhost", port); + socket.close(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(handler.getConnections().isEmpty()).isTrue(); + scf.stop(); + } + }