From 3ec32a352347ea7e1396abb402cf6f60a515d88e Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 14 Aug 2025 18:46:25 -0400 Subject: [PATCH] GH-10314: Revise `TcpListener.onMessage()` contract for `void` Fixes: https://github.com/spring-projects/spring-integration/issues/10314 Looks like a `boolean` return for the `TcpListener.onMessage()` is a leftover of some earlier design or some idea which didn't make it into the project. On the other hand, all the logic in the project around this `onMessage()` usage is properly handled by the delegation or exceptions. * Change `TcpListener.onMessage()` to have a `void` as return type * Fix all the respective usages and simplify some implementations with removing those bogus `return false;` and using proper `if..else` logic if necessary * Fix some code style in the affected classes, like proper `assertThat` or diamond operators --- .../integration/ip/tcp/TcpInboundGateway.java | 16 ++++---- .../ip/tcp/TcpOutboundGateway.java | 27 ++++++------- .../ip/tcp/TcpReceivingChannelAdapter.java | 16 +++----- .../CachingClientConnectionFactory.java | 3 +- .../FailoverClientConnectionFactory.java | 6 +-- .../TcpConnectionInterceptorSupport.java | 20 +++++----- .../ip/tcp/connection/TcpListener.java | 4 +- .../ip/dsl/ConnectionFactoryTests.java | 1 - .../ip/tcp/TcpInboundGatewayTests.java | 5 +-- .../CachingClientConnectionFactoryTests.java | 39 +++++++------------ .../tcp/connection/ConnectionEventTests.java | 3 +- .../connection/ConnectionFactoryTests.java | 6 +-- .../connection/ConnectionTimeoutTests.java | 23 ++++++----- .../FailoverClientConnectionFactoryTests.java | 34 +++++++--------- .../tcp/connection/HelloWorldInterceptor.java | 19 ++++----- .../ip/tcp/connection/SocketSupportTests.java | 12 ++---- .../TcpNetConnectionSupportTests.java | 1 - .../tcp/connection/TcpNetConnectionTests.java | 16 ++++---- .../connection/TcpNioConnectionReadTests.java | 10 ----- .../tcp/connection/TcpNioConnectionTests.java | 13 ++----- .../ip/tcp/connection/TcpSenderTests.java | 7 +++- .../TcpSyslogReceivingChannelAdapter.java | 3 +- 22 files changed, 116 insertions(+), 168 deletions(-) diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpInboundGateway.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpInboundGateway.java index 12564057096..13a130afd75 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpInboundGateway.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpInboundGateway.java @@ -84,11 +84,11 @@ public class TcpInboundGateway extends MessagingGatewaySupport implements private volatile boolean shuttingDown; @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { boolean isErrorMessage = message instanceof ErrorMessage; try { if (this.shuttingDown) { - logger.info(() -> "Inbound message ignored; shutting down; " + message.toString()); + logger.info(() -> "Inbound message ignored; shutting down; " + message); } else { if (isErrorMessage) { @@ -96,17 +96,16 @@ public boolean onMessage(Message message) { * Socket errors are sent here, so they can be conveyed to any waiting thread. * There's not one here; simply ignore. */ - return false; + return; } this.activeCount.incrementAndGet(); try { - return doOnMessage(message); + doOnMessage(message); } finally { this.activeCount.decrementAndGet(); } } - return false; } finally { String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID); @@ -121,11 +120,11 @@ else if (this.clientConnectionFactory != null) { } } - private boolean doOnMessage(Message message) { + private void doOnMessage(Message message) { Message reply = sendAndReceiveMessage(message); if (reply == null) { logger.debug(() -> "null reply received for " + message + " nothing to send"); - return false; + return; } String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID); if (connectionId != null) { @@ -133,7 +132,7 @@ private boolean doOnMessage(Message message) { if (connection == null) { publishNoConnectionEvent(message, connectionId); logger.error(() -> "Connection not found when processing reply " + reply + " for " + message); - return false; + return; } try { connection.send(reply); @@ -142,7 +141,6 @@ private boolean doOnMessage(Message message) { logger.error(ex, "Failed to send reply"); } } - return false; } @SuppressWarnings("NullAway") // Dataflow analysis limitation diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java index 9548946f5e7..b4e65c7ba3e 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java @@ -323,35 +323,32 @@ private void cleanUp(boolean haveSemaphore, @Nullable TcpConnection connection, } @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class); if (connectionId == null) { if (unsolicitedSupported(message)) { - return false; + return; } logger.error("Cannot correlate response - no connection id"); publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id"); - return false; + return; } logger.trace(() -> "onMessage: " + connectionId + "(" + message + ")"); AsyncReply reply = this.pendingReplies.get(connectionId); if (reply == null) { - if (message instanceof ErrorMessage) { - /* - * Socket errors are sent here, so they can be conveyed to any waiting thread. - * If there's not one, simply ignore. - */ - return false; - } - else { + /* + * Socket errors are sent here, so they can be conveyed to any waiting thread. + * If there's not one, simply ignore. + */ + if (!(message instanceof ErrorMessage)) { if (unsolicitedSupported(message)) { - return false; + return; } String errorMessage = "Cannot correlate response - no pending reply for " + connectionId; logger.error(errorMessage); publishNoConnectionEvent(message, connectionId, errorMessage); - return false; } + return; } if (isAsync()) { reply.getFuture().complete(message); @@ -360,7 +357,6 @@ public boolean onMessage(Message message) { else { reply.setReply(message); } - return false; } private boolean unsolicitedSupported(Message message) { @@ -489,7 +485,8 @@ boolean isHaveSemaphore() { * Sender blocks here until the reply is received, or we time out. * @return The return message or null if we time out */ - @Nullable Message getReply() { + @Nullable + Message getReply() { try { if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) { return null; diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapter.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapter.java index e346e486fa0..835ff82d2a6 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapter.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpReceivingChannelAdapter.java @@ -70,20 +70,17 @@ public class TcpReceivingChannelAdapter private final AtomicInteger activeCount = new AtomicInteger(); @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { boolean isErrorMessage = message instanceof ErrorMessage; try { if (this.shuttingDown) { logger.info(() -> "Inbound message ignored; shutting down; " + message); } - else { - if (isErrorMessage) { - /* - * Socket errors are sent here so they can be conveyed to any waiting thread. - * There's not one here; simply ignore. - */ - return false; - } + /* + * Socket errors are sent here so they can be conveyed to any waiting thread. + * There's not one here; simply ignore. + */ + else if (!isErrorMessage) { this.activeCount.incrementAndGet(); try { sendMessage(message); @@ -92,7 +89,6 @@ public boolean onMessage(Message message) { this.activeCount.decrementAndGet(); } } - return false; } finally { String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID); diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java index c256f4d3d14..0a91eedfb07 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java @@ -462,7 +462,7 @@ public String toString() { * purposes. */ @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { Message modifiedMessage; Object connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID); if (message instanceof ErrorMessage) { @@ -492,7 +492,6 @@ public boolean onMessage(Message message) { logger.debug("Message discarded; no listener: " + message); } } - return true; } private void physicallyClose() { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java index 302c99849e8..388a9a78503 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java @@ -461,7 +461,7 @@ public void setSerializer(Serializer serializer) { * purposes. */ @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { if (this.delegate.getConnectionId().equals(message.getHeaders().get(IpHeaders.CONNECTION_ID))) { AbstractIntegrationMessageBuilder messageBuilder = getMessageBuilderFactory() @@ -476,17 +476,15 @@ public boolean onMessage(Message message) { if (this.logger.isDebugEnabled()) { logger.debug("No listener for " + message); } - return false; } else { - return listener.onMessage(messageBuilder.build()); + listener.onMessage(messageBuilder.build()); } } else { if (logger.isDebugEnabled()) { logger.debug("Message from defunct connection ignored " + message); } - return false; } } diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java index ef4c57b689e..f245ace7cdb 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java @@ -194,16 +194,14 @@ public boolean isServer() { } @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { if (this.tcpListener == null) { - if (message instanceof ErrorMessage) { - return false; - } - else { + if (!(message instanceof ErrorMessage)) { throw new NoListenerException("No listener registered for message reception"); } + return; } - return this.tcpListener.onMessage(message); + this.tcpListener.onMessage(message); } @Override @@ -250,12 +248,16 @@ public void removeDeadConnection(TcpConnection connection) { return; } this.removed = true; - if (this.theConnection instanceof TcpConnectionInterceptorSupport tcpConnectionInterceptorSupport && !this.theConnection.equals(this)) { + if (this.theConnection instanceof TcpConnectionInterceptorSupport tcpConnectionInterceptorSupport + && !this.theConnection.equals(this)) { + tcpConnectionInterceptorSupport.removeDeadConnection(this); } TcpSender sender = getSender(); - if (sender != null && this.interceptedSenders != null && !(sender instanceof TcpConnectionInterceptorSupport)) { - this.interceptedSenders.forEach(snder -> snder.removeDeadConnection(connection)); + if (sender != null && this.interceptedSenders != null + && !(sender instanceof TcpConnectionInterceptorSupport)) { + + this.interceptedSenders.forEach(intercepted -> intercepted.removeDeadConnection(connection)); } } finally { diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpListener.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpListener.java index ee6585bb4da..fd87cc3a9b1 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpListener.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpListener.java @@ -24,6 +24,7 @@ * {@link TcpConnection}. * * @author Gary Russell + * @author Artem Bilan * * @since 2.0 */ @@ -33,8 +34,7 @@ public interface TcpListener { /** * Called by a TCPConnection when a new message arrives. * @param message The message. - * @return true if the message was intercepted */ - boolean onMessage(Message message); + void onMessage(Message message); } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java index d85810ec76e..ee8ad7bf272 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/ConnectionFactoryTests.java @@ -65,7 +65,6 @@ public void test() throws Exception { server.registerListener(m -> { received.set(new ObjectToStringTransformer().transform(m)); latch.countDown(); - return false; }); server.setApplicationEventPublisher(publisher); server.setBeanFactory(TEST_INTEGRATION_CONTEXT); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpInboundGatewayTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpInboundGatewayTests.java index f7df38f99c9..b329dfadb24 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpInboundGatewayTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/TcpInboundGatewayTests.java @@ -351,10 +351,7 @@ private void testCloseStream(AbstractServerConnectionFactory scf, consumer.start(); AbstractClientConnectionFactory client = ccf.apply(port); CountDownLatch latch = new CountDownLatch(1); - client.registerListener(message -> { - latch.countDown(); - return false; - }); + client.registerListener(message -> latch.countDown()); client.setBeanFactory(TEST_INTEGRATION_CONTEXT); client.afterPropertiesSet(); client.start(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java index 82a467d468b..6a0fe29ab11 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactoryTests.java @@ -533,10 +533,7 @@ public void testCachedFailoverRealClose() throws Exception { TcpNetServerConnectionFactory server1 = new TcpNetServerConnectionFactory(0); server1.setBeanName("server1"); final CountDownLatch latch1 = new CountDownLatch(3); - server1.registerListener(message -> { - latch1.countDown(); - return false; - }); + server1.registerListener(message -> latch1.countDown()); server1.setBeanFactory(TEST_INTEGRATION_CONTEXT); server1.afterPropertiesSet(); server1.start(); @@ -545,10 +542,7 @@ public void testCachedFailoverRealClose() throws Exception { TcpNetServerConnectionFactory server2 = new TcpNetServerConnectionFactory(0); server1.setBeanName("server2"); final CountDownLatch latch2 = new CountDownLatch(2); - server2.registerListener(message -> { - latch2.countDown(); - return false; - }); + server2.registerListener(message -> latch2.countDown()); server2.setBeanFactory(TEST_INTEGRATION_CONTEXT); server2.afterPropertiesSet(); server2.start(); @@ -557,10 +551,12 @@ public void testCachedFailoverRealClose() throws Exception { // Failover AbstractClientConnectionFactory factory1 = new TcpNetClientConnectionFactory("localhost", port1); factory1.setBeanName("client1"); - factory1.registerListener(message -> false); + factory1.registerListener(message -> { + }); AbstractClientConnectionFactory factory2 = new TcpNetClientConnectionFactory("localhost", port2); factory2.setBeanName("client2"); - factory2.registerListener(message -> false); + factory2.registerListener(message -> { + }); List factories = new ArrayList<>(); factories.add(factory1); factories.add(factory2); @@ -606,10 +602,7 @@ public void testCachedFailoverRealBadHost() throws Exception { TcpNetServerConnectionFactory server1 = new TcpNetServerConnectionFactory(0); server1.setBeanName("server1"); final CountDownLatch latch1 = new CountDownLatch(3); - server1.registerListener(message -> { - latch1.countDown(); - return false; - }); + server1.registerListener(message -> latch1.countDown()); server1.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); server1.setBeanFactory(TEST_INTEGRATION_CONTEXT); server1.afterPropertiesSet(); @@ -619,10 +612,7 @@ public void testCachedFailoverRealBadHost() throws Exception { TcpNetServerConnectionFactory server2 = new TcpNetServerConnectionFactory(0); server1.setBeanName("server2"); final CountDownLatch latch2 = new CountDownLatch(2); - server2.registerListener(message -> { - latch2.countDown(); - return false; - }); + server2.registerListener(message -> latch2.countDown()); server2.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); server2.setBeanFactory(TEST_INTEGRATION_CONTEXT); server2.afterPropertiesSet(); @@ -632,13 +622,15 @@ public void testCachedFailoverRealBadHost() throws Exception { // Failover AbstractClientConnectionFactory factory1 = new TcpNetClientConnectionFactory("junkjunk", port1); factory1.setBeanName("client1"); - factory1.registerListener(message -> false); + factory1.registerListener(message -> { + }); factory1.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); factory1.setBeanFactory(TEST_INTEGRATION_CONTEXT); factory1.afterPropertiesSet(); AbstractClientConnectionFactory factory2 = new TcpNetClientConnectionFactory("localhost", port2); factory2.setBeanName("client2"); - factory2.registerListener(message -> false); + factory2.registerListener(message -> { + }); factory2.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); factory2.setBeanFactory(TEST_INTEGRATION_CONTEXT); factory2.afterPropertiesSet(); @@ -682,7 +674,6 @@ public void testRealConnection() throws Exception { connectionIds.add((String) message.getHeaders().get(IpHeaders.CONNECTION_ID)); latch1.countDown(); latch2.countDown(); - return false; }); in.setBeanFactory(TEST_INTEGRATION_CONTEXT); in.afterPropertiesSet(); @@ -738,7 +729,6 @@ public void testGatewayRelease() { } handler.handleMessage(message); } - return false; }); in.afterPropertiesSet(); handler.setBeanFactory(mock(BeanFactory.class)); @@ -827,7 +817,6 @@ public boolean isActive() { received.set(message); latch.countDown(); } - return false; }); cachingFactory.start(); @@ -844,7 +833,9 @@ private TcpConnectionSupport makeMockConnection() { return connection; } - private static AbstractClientConnectionFactory createFactoryWithMockConnection(TcpConnectionSupport mockConn) throws Exception { + private static AbstractClientConnectionFactory createFactoryWithMockConnection(TcpConnectionSupport mockConn) + throws Exception { + AbstractClientConnectionFactory factory = mock(AbstractClientConnectionFactory.class); when(factory.getConnection()).thenReturn(mockConn); when(factory.isActive()).thenReturn(true); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionEventTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionEventTests.java index caf16a352a2..3f45d56f07e 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionEventTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionEventTests.java @@ -283,7 +283,8 @@ public void publishEvent(Object event) { }); factory.setBeanName("sf"); - factory.registerListener(message -> false); + factory.registerListener(message -> { + }); LogAccessor logger = spy(TestUtils.getPropertyValue(factory, "logger", LogAccessor.class)); doNothing().when(logger).error(any(Throwable.class), anyString()); new DirectFieldAccessor(factory).setPropertyValue("logger", logger); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java index 374dd764c34..7356554015f 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionFactoryTests.java @@ -91,7 +91,6 @@ void netOpenEventOnReadThread() throws InterruptedException, IOException { server.registerListener(msg -> { readThread.set(Thread.currentThread()); latch2.countDown(); - return false; }); server.setApplicationEventPublisher(event -> { if (event instanceof TcpConnectionServerListeningEvent) { @@ -174,7 +173,8 @@ public void testObtainConnectionIds(AbstractServerConnectionFactory serverFactor assertThat(((TcpConnectionServerListeningEvent) events.get(0)).getPort()).isEqualTo(serverFactory.getPort()); int port = serverFactory.getPort(); TcpNetClientConnectionFactory clientFactory = new TcpNetClientConnectionFactory("localhost", port); - clientFactory.registerListener(message -> false); + clientFactory.registerListener(message -> { + }); clientFactory.setBeanName("clientFactory"); clientFactory.setApplicationEventPublisher(publisher); clientFactory.start(); @@ -306,7 +306,6 @@ private void healthCheckSuccess(AbstractServerConnectionFactory server, boolean connection.get().send(msg); } } - return false; }); server.setBeanFactory(TEST_INTEGRATION_CONTEXT); server.afterPropertiesSet(); @@ -324,7 +323,6 @@ private void healthCheckSuccess(AbstractServerConnectionFactory server, boolean result.set(true); } latch.countDown(); - return false; }); conn.send(new GenericMessage<>("PING")); try { diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionTimeoutTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionTimeoutTests.java index 35ea934d8b7..47d20cd8461 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionTimeoutTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/ConnectionTimeoutTests.java @@ -36,6 +36,8 @@ /** * @author Gary Russell + * @author Artem Bilan + * * @since 2.2 */ @LongRunningTest @@ -66,7 +68,8 @@ public void testNetSimpleTimeout() throws Exception { server.start(); TestingUtilities.waitListening(server, null); TcpNetClientConnectionFactory client = new TcpNetClientConnectionFactory("localhost", server.getPort()); - client.registerListener(message -> false); + client.registerListener(message -> { + }); client.setSoTimeout(1000); CountDownLatch clientCloseLatch = getCloseLatch(client); setupClientCallback(client); @@ -81,7 +84,7 @@ public void testNetSimpleTimeout() throws Exception { } /** - * Ensure we don't timeout on the read side (client) if we sent a message within the + * Ensure we don't time out on the read side (client) if we sent a message within the * current timeout. * @throws Exception */ @@ -94,7 +97,7 @@ public void testNetReplyNotTimeout() throws Exception { } /** - * Ensure we don't timeout on the read side (client) if we sent a message within the + * Ensure we don't time out on the read side (client) if we sent a message within the * current timeout. * @throws Exception */ @@ -107,15 +110,14 @@ public void testNioReplyNotTimeout() throws Exception { } private void notTimeoutGuts(AbstractServerConnectionFactory server, AbstractClientConnectionFactory client) - throws Exception, InterruptedException { - final AtomicReference> reply = new AtomicReference>(); + throws Exception { + final AtomicReference> reply = new AtomicReference<>(); final CountDownLatch replyLatch = new CountDownLatch(1); client.registerListener(message -> { if (!(message instanceof ErrorMessage)) { reply.set(message); replyLatch.countDown(); } - return false; }); client.setSoTimeout(2000); CountDownLatch clientClosedLatch = getCloseLatch(client); @@ -147,7 +149,7 @@ private void setupAndStartServer(AbstractServerConnectionFactory server) { public void testNetReplyTimeout() throws Exception { TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); this.setupServerCallbacks(server, 4500); - final AtomicReference> reply = new AtomicReference>(); + final AtomicReference> reply = new AtomicReference<>(); server.start(); TestingUtilities.waitListening(server, null); TcpNetClientConnectionFactory client = new TcpNetClientConnectionFactory("localhost", server.getPort()); @@ -155,7 +157,6 @@ public void testNetReplyTimeout() throws Exception { if (!(message instanceof ErrorMessage)) { reply.set(message); } - return false; }); client.setSoTimeout(2000); CountDownLatch clientCloseLatch = getCloseLatch(client); @@ -184,7 +185,7 @@ public void testNetReplyTimeout() throws Exception { public void testNioReplyTimeout() throws Exception { TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); this.setupServerCallbacks(server, 2100); - final AtomicReference> reply = new AtomicReference>(); + final AtomicReference> reply = new AtomicReference<>(); server.start(); TestingUtilities.waitListening(server, null); TcpNioClientConnectionFactory client = new TcpNioClientConnectionFactory("localhost", server.getPort()); @@ -192,7 +193,6 @@ public void testNioReplyTimeout() throws Exception { if (!(message instanceof ErrorMessage)) { reply.set(message); } - return false; }); client.setSoTimeout(1000); CountDownLatch clientCloseLatch = getCloseLatch(client); @@ -212,7 +212,7 @@ public void testNioReplyTimeout() throws Exception { private void setupServerCallbacks(AbstractServerConnectionFactory server, final int serverDelay) { server.setComponentName("serverFactory"); - final AtomicReference serverConnection = new AtomicReference(); + final AtomicReference serverConnection = new AtomicReference<>(); server.registerListener(message -> { try { Thread.sleep(serverDelay); @@ -221,7 +221,6 @@ private void setupServerCallbacks(AbstractServerConnectionFactory server, final catch (Exception e) { e.printStackTrace(); } - return false; }); server.registerSender(new TcpSender() { diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java index b3624f1cd17..7a52c2c2c2a 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java @@ -215,7 +215,7 @@ void failoverAllDeadAfterSuccess() throws Exception { FailoverClientConnectionFactory fccf = new FailoverClientConnectionFactory(List.of(cf1, cf2)); CompletableFuture> messageCompletableFuture = new CompletableFuture<>(); - fccf.registerListener(messageCompletableFuture::complete); + fccf.registerListener(value -> messageCompletableFuture.complete(value)); fccf.start(); fccf.getConnection().send(new GenericMessage<>("test")); @@ -392,10 +392,7 @@ public void testFailoverCachedRealClose() throws Exception { server1.setBeanFactory(TEST_INTEGRATION_CONTEXT); server1.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); final CountDownLatch latch1 = new CountDownLatch(3); - server1.registerListener(message -> { - latch1.countDown(); - return false; - }); + server1.registerListener(message -> latch1.countDown()); server1.afterPropertiesSet(); server1.start(); TestingUtilities.waitListening(server1, 10000L); @@ -405,20 +402,19 @@ public void testFailoverCachedRealClose() throws Exception { server2.setBeanFactory(TEST_INTEGRATION_CONTEXT); server2.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); final CountDownLatch latch2 = new CountDownLatch(2); - server2.registerListener(message -> { - latch2.countDown(); - return false; - }); + server2.registerListener(message -> latch2.countDown()); server2.afterPropertiesSet(); server2.start(); TestingUtilities.waitListening(server2, 10000L); int port2 = server2.getPort(); AbstractClientConnectionFactory factory1 = new TcpNetClientConnectionFactory("localhost", port1); factory1.setBeanName("client1"); - factory1.registerListener(message -> false); + factory1.registerListener(message -> { + }); AbstractClientConnectionFactory factory2 = new TcpNetClientConnectionFactory("localhost", port2); factory2.setBeanName("client2"); - factory2.registerListener(message -> false); + factory2.registerListener(message -> { + }); // Cache CachingClientConnectionFactory cachingFactory1 = new CachingClientConnectionFactory(factory1, 2); cachingFactory1.setBeanName("cache1"); @@ -535,10 +531,7 @@ public void testFailoverCachedRealBadHost() throws Exception { server1.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); server1.setBeanFactory(TEST_INTEGRATION_CONTEXT); final CountDownLatch latch1 = new CountDownLatch(3); - server1.registerListener(message -> { - latch1.countDown(); - return false; - }); + server1.registerListener(message -> latch1.countDown()); server1.afterPropertiesSet(); server1.start(); TestingUtilities.waitListening(server1, 10000L); @@ -549,10 +542,7 @@ public void testFailoverCachedRealBadHost() throws Exception { server2.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); server2.setBeanFactory(TEST_INTEGRATION_CONTEXT); final CountDownLatch latch2 = new CountDownLatch(2); - server2.registerListener(message -> { - latch2.countDown(); - return false; - }); + server2.registerListener(message -> latch2.countDown()); server2.afterPropertiesSet(); server2.start(); TestingUtilities.waitListening(server2, 10000L); @@ -560,14 +550,16 @@ public void testFailoverCachedRealBadHost() throws Exception { AbstractClientConnectionFactory factory1 = new TcpNetClientConnectionFactory("junkjunk", port1); factory1.setBeanName("client1"); - factory1.registerListener(message -> false); + factory1.registerListener(message -> { + }); factory1.setBeanFactory(TEST_INTEGRATION_CONTEXT); factory1.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); factory1.afterPropertiesSet(); AbstractClientConnectionFactory factory2 = new TcpNetClientConnectionFactory("localhost", port2); factory2.setBeanName("client2"); - factory2.registerListener(message -> false); + factory2.registerListener(message -> { + }); factory2.setBeanFactory(TEST_INTEGRATION_CONTEXT); factory2.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); factory2.afterPropertiesSet(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/HelloWorldInterceptor.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/HelloWorldInterceptor.java index 85fd0229aae..87ca95c18a8 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/HelloWorldInterceptor.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/HelloWorldInterceptor.java @@ -26,6 +26,8 @@ /** * @author Gary Russell + * @author Artem Bilan + * * @since 2.0 * */ @@ -55,19 +57,18 @@ public HelloWorldInterceptor(String hello, String world, ApplicationEventPublish } @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { if (!this.negotiated) { synchronized (this) { if (!this.negotiated) { Object payload = message.getPayload(); - logger.debug(this.toString() + " received " + payload); + logger.debug(this + " received " + payload); if (this.isServer()) { if (payload.equals(hello)) { try { - logger.debug(this.toString() + " sending " + this.world); + logger.debug(this + " sending " + this.world); super.send(MessageBuilder.withPayload(world).build()); this.negotiated = true; - return true; } catch (Exception e) { throw new MessagingException("Negotiation error", e); @@ -87,18 +88,18 @@ public boolean onMessage(Message message) { throw new MessagingException("Negotiation error - expected '" + world + "' received " + payload); } - return true; } + return; } } } try { - return super.onMessage(message); + super.onMessage(message); } finally { // on the server side, we don't want to close if we are expecting a response - if (!(this.isServer() && this.hasRealSender()) && !this.pendingSend) { - this.checkDeferredClose(); + if (!(isServer() && hasRealSender()) && !this.pendingSend) { + checkDeferredClose(); } } } @@ -109,7 +110,7 @@ public void send(Message message) { try { if (!this.negotiated) { if (!this.isServer()) { - logger.debug(this.toString() + " Sending " + hello); + logger.debug(this + " Sending " + hello); super.send(MessageBuilder.withPayload(hello).build()); try { this.negotiationSemaphore.tryAcquire(this.timeout, TimeUnit.MILLISECONDS); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java index 77e473e4329..a9413df836f 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/SocketSupportTests.java @@ -181,7 +181,8 @@ public void testNetServer() throws Exception { @Test public void testNioClientAndServer() throws Exception { TcpNioServerConnectionFactory serverConnectionFactory = new TcpNioServerConnectionFactory(0); - serverConnectionFactory.registerListener(message -> false); + serverConnectionFactory.registerListener(message -> { + }); final AtomicInteger ppSocketCountServer = new AtomicInteger(); final AtomicInteger ppServerSocketCountServer = new AtomicInteger(); final CountDownLatch latch = new CountDownLatch(1); @@ -404,7 +405,6 @@ public void testNetClientAndServerSSL() throws Exception { server.registerListener(message -> { messages.add(message); latch.countDown(); - return false; }); SSLMapper mapper = new SSLMapper(); mapper.setBeanFactory(TEST_INTEGRATION_CONTEXT); @@ -455,7 +455,6 @@ private void testNetClientAndServerSSLDifferentContexts(boolean badServer) throw messages.add(message); latch.countDown(); } - return false; }); server.setTcpSocketSupport(new DefaultTcpSocketSupport(false) { @@ -511,7 +510,6 @@ public void testNioClientAndServerSSL() throws Exception { server.registerListener(message -> { messages.add(message); latch.countDown(); - return false; }); SSLMapper mapper = new SSLMapper(); mapper.setBeanFactory(TEST_INTEGRATION_CONTEXT); @@ -530,7 +528,8 @@ public void testNioClientAndServerSSL() throws Exception { TcpNioClientConnectionFactory client = new TcpNioClientConnectionFactory("localhost", server.getPort()); client.setSslHandshakeTimeout(34); client.setTcpNioConnectionSupport(tcpNioConnectionSupport); - client.registerListener(message -> false); + client.registerListener(message -> { + }); client.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); client.setBeanFactory(TEST_INTEGRATION_CONTEXT); client.afterPropertiesSet(); @@ -580,7 +579,6 @@ protected void postProcessSSLEngine(SSLEngine sslEngine) { server.registerListener(message -> { messages.add(message); latch.countDown(); - return false; }); server.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); server.setBeanFactory(TEST_INTEGRATION_CONTEXT); @@ -633,7 +631,6 @@ public void testNioClientAndServerSSLDifferentContextsLargeDataWithReply() throw throw new RuntimeException(e); } latch.countDown(); - return false; }); ByteArrayCrLfSerializer deserializer = new ByteArrayCrLfSerializer(); deserializer.setMaxMessageSize(120000); @@ -658,7 +655,6 @@ public void testNioClientAndServerSSLDifferentContextsLargeDataWithReply() throw client.registerListener(message -> { messages.add(message); latch.countDown(); - return false; }); client.setDeserializer(deserializer); client.setApplicationEventPublisher(TEST_INTEGRATION_CONTEXT); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java index 451abc7b061..369c0a89da5 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionSupportTests.java @@ -49,7 +49,6 @@ public void testBadCode() throws Exception { server.registerListener(m -> { message.set(m); latch1.countDown(); - return false; }); AtomicBoolean firstTime = new AtomicBoolean(true); server.setTcpNetConnectionSupport(new DefaultTcpNetConnectionSupport() { diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java index c519da02367..a0050c523a6 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNetConnectionTests.java @@ -50,7 +50,7 @@ import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -74,7 +74,7 @@ public void testErrorLog() throws Exception { TcpNetConnection connection = new TcpNetConnection(socket, true, false, e -> { }, null); connection.setDeserializer(new ByteArrayStxEtxSerializer()); - final AtomicReference log = new AtomicReference(); + final AtomicReference log = new AtomicReference<>(); Log logger = mock(Log.class); given(logger.isErrorEnabled()).willReturn(true); doAnswer(invocation -> { @@ -138,18 +138,17 @@ public void transferHeaders() throws Exception { out.write(baos.toByteArray()); out.close(); - final AtomicReference> inboundMessage = new AtomicReference>(); + final AtomicReference> inboundMessage = new AtomicReference<>(); TcpListener listener = message1 -> { if (!(message1 instanceof ErrorMessage)) { inboundMessage.set(message1); } - return false; }; inboundConnection.registerListener(listener); inboundConnection.run(); assertThat(inboundMessage.get()).isNotNull(); assertThat(inboundMessage.get().getPayload()).isEqualTo("foo"); - assertThat(inboundMessage.get().getHeaders().get("bar")).isEqualTo("baz"); + assertThat(inboundMessage.get().getHeaders()).containsEntry("bar", "baz"); } @Test @@ -165,7 +164,8 @@ public void socketClosedNextRead() throws InterruptedException, IOException { } }; server.setApplicationEventPublisher(publisher); - server.registerListener(message -> false); + server.registerListener(message -> { + }); server.setBeanFactory(TEST_INTEGRATION_CONTEXT); server.afterPropertiesSet(); server.start(); @@ -173,8 +173,8 @@ public void socketClosedNextRead() throws InterruptedException, IOException { Socket socket = SocketFactory.getDefault().createSocket("localhost", port.get()); TcpNetConnection connection = new TcpNetConnection(socket, false, false, publisher, "socketClosedNextRead"); socket.close(); - assertThatThrownBy(() -> connection.getPayload()) - .isInstanceOf(SoftEndOfStreamException.class); + assertThatExceptionOfType(SoftEndOfStreamException.class) + .isThrownBy(connection::getPayload); server.stop(); } diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java index 1dd6b14a9b5..f36b02f826f 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionReadTests.java @@ -89,7 +89,6 @@ public void testReadLength() throws Exception { AbstractServerConnectionFactory scf = getConnectionFactory(serializer, message -> { responses.add(message); semaphore.release(); - return false; }); // Fire up the sender. @@ -122,7 +121,6 @@ public void testFragmented() throws Exception { Thread.currentThread().interrupt(); } semaphore.release(); - return false; }); int howMany = 2; scf.setBacklog(howMany + 5); @@ -146,7 +144,6 @@ public void testReadStxEtx() throws Exception { AbstractServerConnectionFactory scf = getConnectionFactory(serializer, message -> { responses.add(message); semaphore.release(); - return false; }); // Fire up the sender. @@ -172,7 +169,6 @@ public void testReadCrLf() throws Exception { AbstractServerConnectionFactory scf = getConnectionFactory(serializer, message -> { responses.add(message); semaphore.release(); - return false; }); // Fire up the sender. @@ -205,7 +201,6 @@ public void testReadLengthOverflow() throws Exception { errorMessageRef.set(((ErrorMessage) message).getPayload()); errorMessageLetch.countDown(); } - return false; }, new TcpSender() { @Override @@ -258,7 +253,6 @@ public void testReadStxEtxOverflow() throws Exception { errorMessageRef.set(((ErrorMessage) message).getPayload()); errorMessageLetch.countDown(); } - return false; }, new TcpSender() { @Override @@ -312,7 +306,6 @@ public void testReadCrLfOverflow() throws Exception { errorMessageRef.set(((ErrorMessage) message).getPayload()); errorMessageLetch.countDown(); } - return false; }, new TcpSender() { @Override @@ -367,7 +360,6 @@ public void testCloseCleanupNoData() throws Exception { errorMessageRef.set(((ErrorMessage) message).getPayload()); errorMessageLetch.countDown(); } - return false; }, new TcpSender() { @Override @@ -419,7 +411,6 @@ public void testCloseCleanupPartialData() throws Exception { errorMessageRef.set(((ErrorMessage) message).getPayload()); errorMessageLetch.countDown(); } - return false; }, new TcpSender() { @Override @@ -494,7 +485,6 @@ private void testClosureMidMessageGuts(AbstractByteArraySerializer serializer, S errorMessageRef.set(((ErrorMessage) message).getPayload()); errorMessageLetch.countDown(); } - return false; }, new TcpSender() { @Override diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java index 541859cc805..60547104935 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java @@ -361,10 +361,7 @@ public void testSufficientThreads() throws Exception { final TcpNioConnection connection = new TcpNioConnection(channel, false, false, null, null); connection.setTaskExecutor(exec); - connection.registerListener(message -> { - messageLatch.countDown(); - return false; - }); + connection.registerListener(message -> messageLatch.countDown()); TcpMessageMapper mapper = new TcpMessageMapper(); mapper.setBeanFactory(TEST_INTEGRATION_CONTEXT); connection.setMapper(mapper); @@ -535,7 +532,6 @@ public Integer answer(InvocationOnMock invocation) { TcpListener listener = message1 -> { inboundMessage.set(message1); latch.countDown(); - return false; }; inboundConnection.registerListener(listener); inboundConnection.readPacket(); @@ -560,7 +556,6 @@ public void testAssemblerUsesSecondaryExecutor() throws Exception { threadName.set(Thread.currentThread().getName()); latch.countDown(); } - return false; }); factory.setBeanFactory(TEST_INTEGRATION_CONTEXT); factory.afterPropertiesSet(); @@ -617,7 +612,6 @@ public void testAllMessagesDelivered() throws Exception { if (!(message instanceof ErrorMessage)) { latch.countDown(); } - return false; }); factory.setBeanFactory(TEST_INTEGRATION_CONTEXT); factory.afterPropertiesSet(); @@ -720,7 +714,6 @@ public void publishEvent(Object event) { assembler.set(Thread.currentThread()); assemblerLatch.countDown(); } - return false; }); ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); te.setCorePoolSize(3); // selector, reader, assembler @@ -797,7 +790,8 @@ public void testNoDelayOnClose() throws Exception { watch.stop(); return null; }); - cf.registerListener(m -> false); + cf.registerListener(m -> { + }); final CountDownLatch listening = new CountDownLatch(1); cf.setApplicationEventPublisher(e -> listening.countDown()); cf.afterPropertiesSet(); @@ -841,7 +835,6 @@ private void testMulti(boolean multiAccept) throws InterruptedException, IOExcep server.registerListener(m -> { messages.add(m); latch.countDown(); - return false; }); server.setBeanFactory(TEST_INTEGRATION_CONTEXT); server.afterPropertiesSet(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java index fe6746a77fc..c9ffff73f35 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java @@ -47,7 +47,8 @@ void senderCalledForDeadConnectionClientNet() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); server.setTaskScheduler(new SimpleAsyncTaskScheduler()); - server.registerListener(msg -> false); + server.registerListener(msg -> { + }); server.setBeanFactory(TEST_INTEGRATION_CONTEXT); server.afterPropertiesSet(); server.setApplicationEventPublisher(event -> { @@ -68,7 +69,8 @@ void senderCalledForDeadConnectionClientNio() throws InterruptedException { TcpNetServerConnectionFactory server = new TcpNetServerConnectionFactory(0); server.setTaskScheduler(new SimpleAsyncTaskScheduler()); server.setBeanFactory(TEST_INTEGRATION_CONTEXT); - server.registerListener(msg -> false); + server.registerListener(msg -> { + }); server.afterPropertiesSet(); server.setApplicationEventPublisher(event -> { if (event instanceof TcpConnectionServerListeningEvent) { @@ -168,4 +170,5 @@ public synchronized void removeDeadConnection(TcpConnection connection) { assertThat(passedConnectionsToSenderViaAddNewConnection.get(0)).isSameAs(interceptorsPerInstance.get(3)); assertThat(passedConnectionsToSenderViaAddNewConnection.get(1)).isSameAs(interceptorsPerInstance.get(6)); } + } diff --git a/spring-integration-syslog/src/main/java/org/springframework/integration/syslog/inbound/TcpSyslogReceivingChannelAdapter.java b/spring-integration-syslog/src/main/java/org/springframework/integration/syslog/inbound/TcpSyslogReceivingChannelAdapter.java index fbab5d63893..e9b6838d4ef 100644 --- a/spring-integration-syslog/src/main/java/org/springframework/integration/syslog/inbound/TcpSyslogReceivingChannelAdapter.java +++ b/spring-integration-syslog/src/main/java/org/springframework/integration/syslog/inbound/TcpSyslogReceivingChannelAdapter.java @@ -95,9 +95,8 @@ protected void doStop() { } @Override - public boolean onMessage(Message message) { + public void onMessage(Message message) { convertAndSend(message); - return false; } }