Skip to content

Commit bc4c34d

Browse files
committed
GH-3509: Register TcpSenders on wrapped connection
Fixes #3509 Dead connections are not being removed on `TcpSender` when using a `TcpConnectionInterceptor` May cause a memory leak
1 parent 13f0bde commit bc4c34d

File tree

3 files changed

+53
-5
lines changed

3 files changed

+53
-5
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import java.util.List;
20+
1921
import javax.net.ssl.SSLSession;
2022

2123
import org.springframework.context.ApplicationEventPublisher;
@@ -96,6 +98,11 @@ public void registerSender(TcpSender sender) {
9698
this.theConnection.registerSender(this);
9799
}
98100

101+
@Override
102+
public void registerSenders(List<TcpSender> sendersToRegister) {
103+
this.theConnection.registerSenders(sendersToRegister);
104+
}
105+
99106
@Override
100107
public String getConnectionId() {
101108
return this.theConnection.getConnectionId();

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/AbstractTcpChannelAdapterTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2019 the original author or authors.
2+
* Copyright 2013-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,8 +42,12 @@ public void publishEvent(Object event) {
4242
};
4343

4444
protected HelloWorldInterceptorFactory newInterceptorFactory() {
45+
return newInterceptorFactory(NOOP_PUBLISHER);
46+
}
47+
48+
protected HelloWorldInterceptorFactory newInterceptorFactory(ApplicationEventPublisher applicationEventPublisher) {
4549
HelloWorldInterceptorFactory factory = new HelloWorldInterceptorFactory();
46-
factory.setApplicationEventPublisher(NOOP_PUBLISHER);
50+
factory.setApplicationEventPublisher(applicationEventPublisher);
4751
return factory;
4852
}
4953

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpSendingMessageHandlerTests.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,13 +39,14 @@
3939
import java.util.concurrent.atomic.AtomicReference;
4040

4141
import javax.net.ServerSocketFactory;
42+
import javax.net.SocketFactory;
4243

4344
import org.apache.commons.logging.Log;
4445
import org.apache.commons.logging.LogFactory;
4546
import org.junit.Test;
4647
import org.mockito.Mockito;
47-
4848
import org.springframework.beans.factory.BeanFactory;
49+
import org.springframework.context.ApplicationEventPublisher;
4950
import org.springframework.context.support.AbstractApplicationContext;
5051
import org.springframework.context.support.ClassPathXmlApplicationContext;
5152
import org.springframework.core.serializer.DefaultDeserializer;
@@ -57,9 +58,11 @@
5758
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
5859
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
5960
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
61+
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
6062
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;
6163
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain;
6264
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
65+
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
6366
import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;
6467
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
6568
import org.springframework.integration.ip.tcp.serializer.ByteArrayLengthHeaderSerializer;
@@ -1191,4 +1194,38 @@ public void testConnectionException() throws Exception {
11911194
}
11921195
}
11931196

1197+
@Test
1198+
public void testInterceptedCleanup() throws Exception {
1199+
final CountDownLatch latch = new CountDownLatch(1);
1200+
AbstractServerConnectionFactory scf = new TcpNetServerConnectionFactory(0);
1201+
ByteArrayCrLfSerializer serializer = new ByteArrayCrLfSerializer();
1202+
scf.setSerializer(serializer);
1203+
scf.setDeserializer(serializer);
1204+
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
1205+
adapter.setConnectionFactory(scf);
1206+
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
1207+
handler.setConnectionFactory(scf);
1208+
scf.setApplicationEventPublisher(new ApplicationEventPublisher() {
1209+
1210+
@Override
1211+
public void publishEvent(Object event) {
1212+
if (event instanceof TcpConnectionCloseEvent) {
1213+
latch.countDown();
1214+
}
1215+
}
1216+
});
1217+
TcpConnectionInterceptorFactoryChain fc = new TcpConnectionInterceptorFactoryChain();
1218+
fc.setInterceptors(new TcpConnectionInterceptorFactory[] {
1219+
newInterceptorFactory(scf.getApplicationEventPublisher()),
1220+
});
1221+
scf.setInterceptorFactoryChain(fc);
1222+
scf.start();
1223+
TestingUtilities.waitListening(scf, null);
1224+
int port = scf.getPort();
1225+
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
1226+
socket.close();
1227+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
1228+
assertThat(handler.getConnections().isEmpty()).isTrue();
1229+
scf.stop();
1230+
}
11941231
}

0 commit comments

Comments
 (0)