Skip to content

Commit

Permalink
Fix AMQP module for ListenableFuture deprecation
Browse files Browse the repository at this point in the history
* Realign API for `CompletableFuture`
* Fix for Spring AMQP classes extracted to top level
  • Loading branch information
artembilan committed Jul 28, 2022
1 parent 5572c21 commit 5a178de
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

import org.springframework.amqp.core.MessageDeliveryMode;
Expand Down Expand Up @@ -51,7 +52,6 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* A base {@link AbstractReplyProducingMessageHandler} extension for AMQP message handlers.
Expand Down Expand Up @@ -703,7 +703,7 @@ public Message<?> getMessage() {
}

@Override
public SettableListenableFuture<Confirm> getFuture() {
public CompletableFuture<Confirm> getFuture() {
if (this.userData instanceof CorrelationData) {
return ((CorrelationData) this.userData).getFuture();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,13 @@

package org.springframework.integration.amqp.outbound;

import java.util.function.BiConsumer;

import org.springframework.amqp.core.AmqpMessageReturnedException;
import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
import org.springframework.amqp.rabbit.RabbitMessageFuture;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand All @@ -33,7 +35,6 @@
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
* An outbound gateway where the sending thread is released immediately and the reply
Expand Down Expand Up @@ -91,13 +92,13 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
generateRoutingKey(requestMessage), amqpMessage);
CorrelationData correlationData = generateCorrelationData(requestMessage);
if (correlationData != null && future.getConfirm() != null) {
future.getConfirm().addCallback(new CorrelationCallback(correlationData, future));
future.getConfirm().whenComplete(new CorrelationCallback(correlationData, future));
}
future.addCallback(new FutureCallback(requestMessage, correlationData));
future.whenComplete(new FutureCallback(requestMessage, correlationData));
return null;
}

private final class FutureCallback implements ListenableFutureCallback<org.springframework.amqp.core.Message> {
private final class FutureCallback implements BiConsumer<org.springframework.amqp.core.Message, Throwable> {

private final Message<?> requestMessage;

Expand All @@ -109,65 +110,65 @@ private final class FutureCallback implements ListenableFutureCallback<org.sprin
}

@Override
public void onSuccess(org.springframework.amqp.core.Message result) {
AbstractIntegrationMessageBuilder<?> replyMessageBuilder = null;
try {
replyMessageBuilder = buildReply(AsyncAmqpOutboundGateway.this.messageConverter, result);
sendOutputs(replyMessageBuilder, this.requestMessage);
}
catch (Exception ex) {
Exception exceptionToLogAndSend = ex;
if (!(ex instanceof MessagingException)) { // NOSONAR
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage,
"failed to handle a message in the [" + AsyncAmqpOutboundGateway.this + ']', ex);
if (replyMessageBuilder != null) {
exceptionToLogAndSend =
new MessagingException(replyMessageBuilder.build(), exceptionToLogAndSend);
public void accept(org.springframework.amqp.core.Message message, Throwable throwable) {
if (throwable == null) {
AbstractIntegrationMessageBuilder<?> replyMessageBuilder = null;
try {
replyMessageBuilder = buildReply(AsyncAmqpOutboundGateway.this.messageConverter, message);
sendOutputs(replyMessageBuilder, this.requestMessage);
}
catch (Exception ex) {
Exception exceptionToLogAndSend = ex;
if (!(ex instanceof MessagingException)) { // NOSONAR
exceptionToLogAndSend = new MessageHandlingException(this.requestMessage,
"failed to handle a message in the [" + AsyncAmqpOutboundGateway.this + ']', ex);
if (replyMessageBuilder != null) {
exceptionToLogAndSend =
new MessagingException(replyMessageBuilder.build(), exceptionToLogAndSend);
}
}
logger.error(exceptionToLogAndSend, () -> "Failed to send async reply: " + message.toString());
sendErrorMessage(this.requestMessage, exceptionToLogAndSend);
}
logger.error(exceptionToLogAndSend, () -> "Failed to send async reply: " + result.toString());
sendErrorMessage(this.requestMessage, exceptionToLogAndSend);
}
}

@Override
public void onFailure(Throwable ex) {
Throwable exceptionToSend = ex;
if (ex instanceof AmqpReplyTimeoutException) {
if (getRequiresReply()) {
exceptionToSend =
new ReplyRequiredException(this.requestMessage, "Timeout on async request/reply", ex);
else {
Throwable exceptionToSend = throwable;
if (throwable instanceof AmqpReplyTimeoutException) {
if (getRequiresReply()) {
exceptionToSend =
new ReplyRequiredException(this.requestMessage, "Timeout on async request/reply",
throwable);
}
else {
logger.debug(() -> "Reply not required and async timeout for " + this.requestMessage);
return;
}
}
else {
logger.debug(() -> "Reply not required and async timeout for " + this.requestMessage);
return;
if (throwable instanceof AmqpMessageReturnedException amre) {
MessageChannel returnChannel = getReturnChannel();
if (returnChannel != null) {
Message<?> returnedMessage = buildReturnedMessage(
new ReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(),
amre.getExchange(), amre.getRoutingKey()),
AsyncAmqpOutboundGateway.this.messageConverter);
sendOutput(returnedMessage, returnChannel, true);
}
this.correlationData.setReturned(amre.getReturned());
/*
* Complete the user's future (if present) since the async template will only complete
* once, successfully, or with a failure.
*/
this.correlationData.getFuture().complete(new Confirm(true, null));
}
}
if (ex instanceof AmqpMessageReturnedException) {
AmqpMessageReturnedException amre = (AmqpMessageReturnedException) ex;
MessageChannel returnChannel = getReturnChannel();
if (returnChannel != null) {
Message<?> returnedMessage = buildReturnedMessage(
new ReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(),
amre.getExchange(), amre.getRoutingKey()),
AsyncAmqpOutboundGateway.this.messageConverter);
sendOutput(returnedMessage, returnChannel, true);
else {
sendErrorMessage(this.requestMessage, exceptionToSend);
}
this.correlationData.setReturned(amre.getReturned());
/*
* Complete the user's future (if present) since the async template will only complete
* once, successfully, or with a failure.
*/
this.correlationData.getFuture().set(new Confirm(true, null));
}
else {
sendErrorMessage(this.requestMessage, exceptionToSend);
}
}

}

private final class CorrelationCallback implements ListenableFutureCallback<Boolean> {
private final class CorrelationCallback implements BiConsumer<Boolean, Throwable> {

private final CorrelationData correlationData;

Expand All @@ -179,19 +180,17 @@ private final class CorrelationCallback implements ListenableFutureCallback<Bool
}

@Override
public void onSuccess(Boolean result) {
try {
handleConfirm(this.correlationData, result, this.replyFuture.getNackCause());
}
catch (Exception e) {
logger.error("Failed to send publisher confirm");
public void accept(Boolean result, Throwable throwable) {
if (result != null) {
try {
handleConfirm(this.correlationData, result, this.replyFuture.getNackCause());
}
catch (Exception e) {
logger.error("Failed to send publisher confirm");
}
}
}

@Override
public void onFailure(Throwable ex) {
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import static org.mockito.Mockito.spy;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand All @@ -35,7 +36,7 @@

import org.springframework.amqp.core.AmqpReplyTimeoutException;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
import org.springframework.amqp.rabbit.RabbitMessageFuture;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand All @@ -58,7 +59,6 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* @author Gary Russell
Expand Down Expand Up @@ -198,15 +198,13 @@ void testConfirmsAndReturns() throws Exception {
ackChannel.receive(10000);
ackChannel.purge(null);

RabbitMessageFuture future = mock(RabbitMessageFuture.class);
willReturn("nacknack").given(future).getNackCause();
willReturn(CompletableFuture.completedFuture(false)).given(future).getConfirm();

asyncTemplate = mock(AsyncRabbitTemplate.class);
RabbitMessageFuture future = asyncTemplate.new RabbitMessageFuture(null, null);
willReturn(future).given(asyncTemplate).sendAndReceive(anyString(), anyString(),
any(org.springframework.amqp.core.Message.class));
DirectFieldAccessor dfa = new DirectFieldAccessor(future);
dfa.setPropertyValue("nackCause", "nacknack");
SettableListenableFuture<Boolean> confirmFuture = new SettableListenableFuture<>();
confirmFuture.set(false);
dfa.setPropertyValue("confirm", confirmFuture);
new DirectFieldAccessor(gateway).setPropertyValue("template", asyncTemplate);

message = MessageBuilder.withPayload("buz").setErrorChannel(errorChannel).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willDoNothing;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand All @@ -35,6 +36,7 @@

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.RabbitMessageFuture;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand Down Expand Up @@ -99,7 +101,7 @@ public void testAsyncDelayExpression() {
new SimpleMessageListenerContainer(connectionFactory), "replyTo"));
amqpTemplate.setTaskScheduler(mock(TaskScheduler.class));
AsyncAmqpOutboundGateway gateway = new AsyncAmqpOutboundGateway(amqpTemplate);
willAnswer(invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2)))
willReturn(mock(RabbitMessageFuture.class))
.given(amqpTemplate)
.sendAndReceive(anyString(), anyString(), any(Message.class));
gateway.setExchangeName("foo");
Expand All @@ -121,8 +123,7 @@ public void testHeaderMapperWinsAdapter() {
RabbitTemplate amqpTemplate = spy(new RabbitTemplate(connectionFactory));
AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(amqpTemplate);
endpoint.setHeadersMappedLast(true);
final AtomicReference<Message> amqpMessage =
new AtomicReference<Message>();
final AtomicReference<Message> amqpMessage = new AtomicReference<>();
willAnswer(invocation -> {
amqpMessage.set(invocation.getArgument(2));
return null;
Expand All @@ -146,8 +147,7 @@ public void testHeaderMapperWinsGateway() {
DefaultAmqpHeaderMapper mapper = DefaultAmqpHeaderMapper.inboundMapper();
mapper.setRequestHeaderNames("*");
endpoint.setHeaderMapper(mapper);
final AtomicReference<Message> amqpMessage =
new AtomicReference<Message>();
final AtomicReference<Message> amqpMessage = new AtomicReference<>();
willAnswer(invocation -> {
amqpMessage.set(invocation.getArgument(2));
return null;
Expand Down

0 comments on commit 5a178de

Please sign in to comment.