Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,28 @@ public class TcpInboundGateway extends MessagingGatewaySupport implements
private volatile boolean shuttingDown;

@Override
public boolean onMessage(Message<?> message) {
public void onMessage(Message<?> message) {
boolean isErrorMessage = message instanceof ErrorMessage;
try {
if (this.shuttingDown) {
logger.info(() -> "Inbound message ignored; shutting down; " + message.toString());
logger.info(() -> "Inbound message ignored; shutting down; " + message);
}
else {
if (isErrorMessage) {
/*
* Socket errors are sent here, so they can be conveyed to any waiting thread.
* There's not one here; simply ignore.
*/
return false;
return;
}
this.activeCount.incrementAndGet();
try {
return doOnMessage(message);
doOnMessage(message);
}
finally {
this.activeCount.decrementAndGet();
}
}
return false;
}
finally {
String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID);
Expand All @@ -121,19 +120,19 @@ else if (this.clientConnectionFactory != null) {
}
}

private boolean doOnMessage(Message<?> message) {
private void doOnMessage(Message<?> message) {
Message<?> reply = sendAndReceiveMessage(message);
if (reply == null) {
logger.debug(() -> "null reply received for " + message + " nothing to send");
return false;
return;
}
String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID);
if (connectionId != null) {
TcpConnection connection = this.connections.get(connectionId);
if (connection == null) {
publishNoConnectionEvent(message, connectionId);
logger.error(() -> "Connection not found when processing reply " + reply + " for " + message);
return false;
return;
}
try {
connection.send(reply);
Expand All @@ -142,7 +141,6 @@ private boolean doOnMessage(Message<?> message) {
logger.error(ex, "Failed to send reply");
}
}
return false;
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,35 +323,32 @@ private void cleanUp(boolean haveSemaphore, @Nullable TcpConnection connection,
}

@Override
public boolean onMessage(Message<?> message) {
public void onMessage(Message<?> message) {
String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
if (connectionId == null) {
if (unsolicitedSupported(message)) {
return false;
return;
}
logger.error("Cannot correlate response - no connection id");
publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id");
return false;
return;
}
logger.trace(() -> "onMessage: " + connectionId + "(" + message + ")");
AsyncReply reply = this.pendingReplies.get(connectionId);
if (reply == null) {
if (message instanceof ErrorMessage) {
/*
* Socket errors are sent here, so they can be conveyed to any waiting thread.
* If there's not one, simply ignore.
*/
return false;
}
else {
/*
* Socket errors are sent here, so they can be conveyed to any waiting thread.
* If there's not one, simply ignore.
*/
if (!(message instanceof ErrorMessage)) {
if (unsolicitedSupported(message)) {
return false;
return;
}
String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
logger.error(errorMessage);
publishNoConnectionEvent(message, connectionId, errorMessage);
return false;
}
return;
}
if (isAsync()) {
reply.getFuture().complete(message);
Expand All @@ -360,7 +357,6 @@ public boolean onMessage(Message<?> message) {
else {
reply.setReply(message);
}
return false;
}

private boolean unsolicitedSupported(Message<?> message) {
Expand Down Expand Up @@ -489,7 +485,8 @@ boolean isHaveSemaphore() {
* Sender blocks here until the reply is received, or we time out.
* @return The return message or null if we time out
*/
@Nullable Message<?> getReply() {
@Nullable
Message<?> getReply() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the @Nullable move to the line above the method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this method does not have modifiers and annotation is treated by formatter as for the whole method.
I agree on consistency discrepancy here, but I prefer to keep formatter happy to avoid unnecessary changes in the future .

try {
if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,17 @@ public class TcpReceivingChannelAdapter
private final AtomicInteger activeCount = new AtomicInteger();

@Override
public boolean onMessage(Message<?> message) {
public void onMessage(Message<?> message) {
boolean isErrorMessage = message instanceof ErrorMessage;
try {
if (this.shuttingDown) {
logger.info(() -> "Inbound message ignored; shutting down; " + message);
}
else {
if (isErrorMessage) {
/*
* Socket errors are sent here so they can be conveyed to any waiting thread.
* There's not one here; simply ignore.
*/
return false;
}
/*
* Socket errors are sent here so they can be conveyed to any waiting thread.
* There's not one here; simply ignore.
*/
else if (!isErrorMessage) {
this.activeCount.incrementAndGet();
try {
sendMessage(message);
Expand All @@ -92,7 +89,6 @@ public boolean onMessage(Message<?> message) {
this.activeCount.decrementAndGet();
}
}
return false;
}
finally {
String connectionId = (String) message.getHeaders().get(IpHeaders.CONNECTION_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ public String toString() {
* purposes.
*/
@Override
public boolean onMessage(Message<?> message) {
public void onMessage(Message<?> message) {
Message<?> modifiedMessage;
Object connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID);
if (message instanceof ErrorMessage) {
Expand Down Expand Up @@ -492,7 +492,6 @@ public boolean onMessage(Message<?> message) {
logger.debug("Message discarded; no listener: " + message);
}
}
return true;
}

private void physicallyClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ public void setSerializer(Serializer<?> serializer) {
* purposes.
*/
@Override
public boolean onMessage(Message<?> message) {
public void onMessage(Message<?> message) {
if (this.delegate.getConnectionId().equals(message.getHeaders().get(IpHeaders.CONNECTION_ID))) {
AbstractIntegrationMessageBuilder<?> messageBuilder =
getMessageBuilderFactory()
Expand All @@ -476,17 +476,15 @@ public boolean onMessage(Message<?> message) {
if (this.logger.isDebugEnabled()) {
logger.debug("No listener for " + message);
}
return false;
}
else {
return listener.onMessage(messageBuilder.build());
listener.onMessage(messageBuilder.build());
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Message from defunct connection ignored " + message);
}
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,14 @@ public boolean isServer() {
}

@Override
public boolean onMessage(Message<?> message) {
public void onMessage(Message<?> message) {
if (this.tcpListener == null) {
if (message instanceof ErrorMessage) {
return false;
}
else {
if (!(message instanceof ErrorMessage)) {
throw new NoListenerException("No listener registered for message reception");
}
return;
}
return this.tcpListener.onMessage(message);
this.tcpListener.onMessage(message);
}

@Override
Expand Down Expand Up @@ -250,12 +248,16 @@ public void removeDeadConnection(TcpConnection connection) {
return;
}
this.removed = true;
if (this.theConnection instanceof TcpConnectionInterceptorSupport tcpConnectionInterceptorSupport && !this.theConnection.equals(this)) {
if (this.theConnection instanceof TcpConnectionInterceptorSupport tcpConnectionInterceptorSupport
&& !this.theConnection.equals(this)) {

tcpConnectionInterceptorSupport.removeDeadConnection(this);
}
TcpSender sender = getSender();
if (sender != null && this.interceptedSenders != null && !(sender instanceof TcpConnectionInterceptorSupport)) {
this.interceptedSenders.forEach(snder -> snder.removeDeadConnection(connection));
if (sender != null && this.interceptedSenders != null
&& !(sender instanceof TcpConnectionInterceptorSupport)) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blank line can probably be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, my preference for better readability is to have a blank line after multi-line if.
Same reason as with a blank line before method body after multi-line header.

this.interceptedSenders.forEach(intercepted -> intercepted.removeDeadConnection(connection));
}
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* {@link TcpConnection}.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.0
*/
Expand All @@ -33,8 +34,7 @@ public interface TcpListener {
/**
* Called by a TCPConnection when a new message arrives.
* @param message The message.
* @return true if the message was intercepted
*/
boolean onMessage(Message<?> message);
void onMessage(Message<?> message);

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public void test() throws Exception {
server.registerListener(m -> {
received.set(new ObjectToStringTransformer().transform(m));
latch.countDown();
return false;
});
server.setApplicationEventPublisher(publisher);
server.setBeanFactory(TEST_INTEGRATION_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,7 @@ private void testCloseStream(AbstractServerConnectionFactory scf,
consumer.start();
AbstractClientConnectionFactory client = ccf.apply(port);
CountDownLatch latch = new CountDownLatch(1);
client.registerListener(message -> {
latch.countDown();
return false;
});
client.registerListener(message -> latch.countDown());
client.setBeanFactory(TEST_INTEGRATION_CONTEXT);
client.afterPropertiesSet();
client.start();
Expand Down
Loading