diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java index bad62d5295b..305bbce2b33 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.ip.tcp.connection; +import java.util.List; + import javax.net.ssl.SSLSession; import org.springframework.context.ApplicationEventPublisher; @@ -96,6 +98,11 @@ public void registerSender(TcpSender sender) { this.theConnection.registerSender(this); } + @Override + public void registerSenders(List sendersToRegister) { + this.theConnection.registerSenders(sendersToRegister); + } + @Override public String getConnectionId() { return this.theConnection.getConnectionId(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java index 505f9ebc88d..4a29c6e5f01 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,8 +42,12 @@ public void publishEvent(Object event) { }; protected HelloWorldInterceptorFactory newInterceptorFactory() { + return newInterceptorFactory(NOOP_PUBLISHER); + } + + protected HelloWorldInterceptorFactory newInterceptorFactory(ApplicationEventPublisher applicationEventPublisher) { HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory(); - factory.setApplicationEventPublisher(NOOP_PUBLISHER); + factory.setApplicationEventPublisher(applicationEventPublisher); return factory; } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java index 06726c77fee..e3c9db20990 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,6 +47,7 @@ import org.mockito.Mockito; import org.springframework.beans.factory.BeanFactory; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.core.serializer.DefaultDeserializer; @@ -57,9 +59,11 @@ import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory; import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory; import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain; import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; +import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory; import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; import org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer; @@ -1191,4 +1195,38 @@ public void testConnectionException() throws Exception { } } + @Test + public void testInterceptedCleanup() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0); + ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer(); + scf.setSerializer(serializer); + scf.setDeserializer(serializer); + TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter(); + adapter.setConnectionFactory(scf); + TcpSendingMessageHandler handler = new TcpSendingMessageHandler(); + handler.setConnectionFactory(scf); + scf.setApplicationEventPublisher(new ApplicationEventPublisher() { + + @Override + public void publishEvent(Object event) { + if (event instanceof TcpConnectionCloseEvent) { + latch.countDown(); + } + } + }); + TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain(); + fc.setInterceptors(new TcpConnectionInterceptorFactory[] { + newInterceptorFactory(scf.getApplicationEventPublisher()), + }); + scf.setInterceptorFactoryChain(fc); + scf.start(); + TestingUtilities.waitListening(scf, null); + int port = scf.getPort(); + Socket socket = SocketFactory.getDefault().createSocket("localhost", port); + socket.close(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(handler.getConnections().isEmpty()).isTrue(); + scf.stop(); + } }