Skip to content

Commit

Permalink
GH-1338: Add MessageAckListener
Browse files Browse the repository at this point in the history
Resolves #1338

GH-1338: Test case for MessageAckListener
  • Loading branch information
zysaaa authored and garyrussell committed Jun 15, 2022
1 parent 5944301 commit b5f85b0
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageAckListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.converter.MessageConverter;
Expand Down Expand Up @@ -115,6 +116,8 @@ public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractM

private Boolean deBatchingEnabled;

private MessageAckListener messageAckListener;

/**
* @param connectionFactory The connection factory.
* @see AbstractMessageListenerContainer#setConnectionFactory(ConnectionFactory)
Expand Down Expand Up @@ -323,6 +326,15 @@ public void setGlobalQos(boolean globalQos) {
this.globalQos = globalQos;
}

/**
* Set a {@link MessageAckListener} to use when ack a message(messages) in {@link AcknowledgeMode#AUTO} mode.
* @param messageAckListener the messageAckListener.
* @see AbstractMessageListenerContainer#setMessageAckListener(MessageAckListener)
*/
public void setMessageAckListener(MessageAckListener messageAckListener) {
this.messageAckListener = messageAckListener;
}

@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
C instance = createContainerInstance();
Expand Down Expand Up @@ -355,7 +367,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
.acceptIfNotNull(this.autoStartup, instance::setAutoStartup)
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors)
.acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled);
.acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled)
.acceptIfNotNull(this.messageAckListener, instance::setMessageAckListener);
if (this.batchListener && this.deBatchingEnabled == null) {
// turn off container debatching by default for batch listeners
instance.setDeBatchingEnabled(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private boolean asyncReplies;

private MessageAckListener messageAckListener;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -1188,6 +1190,21 @@ public void setjavaLangErrorHandler(JavaLangErrorHandler javaLangErrorHandler) {
this.javaLangErrorHandler = javaLangErrorHandler;
}

/**
* Set a {@link MessageAckListener} to use when ack a message(messages) in {@link AcknowledgeMode#AUTO} mode.
* @param messageAckListener the messageAckListener.
* @see MessageAckListener
* @see AcknowledgeMode
*/
public void setMessageAckListener(MessageAckListener messageAckListener) {
Assert.notNull(messageAckListener, "'messageAckListener' cannot be null");
this.messageAckListener = messageAckListener;
}

protected MessageAckListener getMessageAckListener() {
return this.messageAckListener;
}

/**
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
* @author Alex Panchenko
* @author Johno Crawford
* @author Ian Roberts
* @author Cao Weibo
*/
public class BlockingQueueConsumer {

Expand Down Expand Up @@ -836,11 +837,11 @@ public void rollbackOnExceptionIfNecessary(Throwable ex, long tag) {
/**
* Perform a commit or message acknowledgement, as appropriate.
* @param localTx Whether the channel is locally transacted.
* @param messageAckListener MessageAckListener set on the message listener.
* @return true if at least one delivery tag exists.
* @throws IOException Any IOException.
*/
public boolean commitIfNecessary(boolean localTx) throws IOException {

public boolean commitIfNecessary(boolean localTx, MessageAckListener messageAckListener) throws IOException {
if (this.deliveryTags.isEmpty()) {
return false;
}
Expand All @@ -857,7 +858,14 @@ public boolean commitIfNecessary(boolean localTx) throws IOException {

if (ackRequired && (!this.transactional || isLocallyTransacted)) {
long deliveryTag = new ArrayList<Long>(this.deliveryTags).get(this.deliveryTags.size() - 1);
this.channel.basicAck(deliveryTag, true);
try {
this.channel.basicAck(deliveryTag, true);
notifyMessageAckListener(messageAckListener, true, deliveryTag, null);
}
catch (Exception e) {
logger.error("Error acking.", e);
notifyMessageAckListener(messageAckListener, false, deliveryTag, e);
}
}

if (isLocallyTransacted) {
Expand All @@ -874,6 +882,28 @@ public boolean commitIfNecessary(boolean localTx) throws IOException {

}

/**
* Notify MessageAckListener set on the relevant message listener.
* @param messageAckListener MessageAckListener set on the message listener.
* @param success Whether ack succeeded.
* @param deliveryTag The deliveryTag of ack.
* @param cause If an exception occurs.
*/
private void notifyMessageAckListener(@Nullable MessageAckListener messageAckListener,
boolean success,
long deliveryTag,
@Nullable Throwable cause) {
if (messageAckListener == null) {
return;
}
try {
messageAckListener.onComplete(success, deliveryTag, cause);
}
catch (Exception e) {
logger.error("An exception occured on MessageAckListener.", e);
}
}

@Override
public String toString() {
return "Consumer@" + ObjectUtils.getIdentityHexString(this) + ": "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Nicolas Ristock
* @author Cao Weibo
*
* @since 2.0
*
Expand Down Expand Up @@ -531,7 +532,7 @@ private void checkConsumers(long now) {
try {
consumer.ackIfNecessary(now);
}
catch (IOException e) {
catch (Exception e) {
this.logger.error("Exception while sending delayed ack", e);
}
}
Expand Down Expand Up @@ -882,7 +883,7 @@ private void cancelConsumer(SimpleConsumer consumer) {
try {
consumer.ackIfNecessary(0L);
}
catch (IOException e) {
catch (Exception e) {
this.logger.error("Exception while sending delayed ack", e);
}
}
Expand Down Expand Up @@ -1175,7 +1176,7 @@ private void handleAck(long deliveryTag, boolean channelLocallyTransacted) {
}
}
else if (!isChannelTransacted() || isLocallyTransacted) {
getChannel().basicAck(deliveryTag, false);
sendAckWithNotify(deliveryTag, false);
}
}
if (isLocallyTransacted) {
Expand All @@ -1194,7 +1195,7 @@ else if (!isChannelTransacted() || isLocallyTransacted) {
* @param now the current time.
* @throws IOException if one occurs.
*/
synchronized void ackIfNecessary(long now) throws IOException {
synchronized void ackIfNecessary(long now) throws Exception {
if (this.pendingAcks >= this.messagesPerAck || (
this.pendingAcks > 0 && (now - this.lastAck > this.ackTimeout || this.canceled))) {
sendAck(now);
Expand All @@ -1217,7 +1218,7 @@ private void rollback(long deliveryTag, Exception e) {
getChannel().basicNack(deliveryTag, !isAsyncReplies(),
ContainerUtils.shouldRequeue(isDefaultRequeueRejected(), e, this.logger));
}
catch (IOException e1) {
catch (Exception e1) {
this.logger.error("Failed to nack message", e1);
}
}
Expand All @@ -1226,12 +1227,51 @@ private void rollback(long deliveryTag, Exception e) {
}
}

protected synchronized void sendAck(long now) throws IOException {
getChannel().basicAck(this.latestDeferredDeliveryTag, true);
protected synchronized void sendAck(long now) throws Exception {
sendAckWithNotify(this.latestDeferredDeliveryTag, true);
this.lastAck = now;
this.pendingAcks = 0;
}

/**
* Send ack and notify MessageAckListener(if set).
* @param deliveryTag DeliveryTag of this ack.
* @param multiple Whether multiple ack.
* @throws Exception Occured when ack.
*/
private void sendAckWithNotify(long deliveryTag, boolean multiple) throws Exception {
try {
getChannel().basicAck(deliveryTag, multiple);
notifyMessageAckListener(getMessageAckListener(), true, deliveryTag, null);
}
catch (Exception e) {
notifyMessageAckListener(getMessageAckListener(), false, deliveryTag, e);
throw e;
}
}

/**
* Notify MessageAckListener set on message listener.
* @param messageAckListener MessageAckListener set on the message listener.
* @param success Whether ack succeeded.
* @param deliveryTag The deliveryTag of ack.
* @param cause If an exception occurs.
*/
private void notifyMessageAckListener(@Nullable MessageAckListener messageAckListener,
boolean success,
long deliveryTag,
@Nullable Throwable cause) {
if (messageAckListener == null) {
return;
}
try {
messageAckListener.onComplete(success, deliveryTag, cause);
}
catch (Exception e) {
this.logger.error("An exception occured on MessageAckListener.", e);
}
}

@Override
public void handleConsumeOk(String consumerTag) {
super.handleConsumeOk(consumerTag);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.listener;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.lang.Nullable;

/**
* A listener for message ack when using {@link AcknowledgeMode#AUTO}.
*
* @author Cao Weibo
* @see AbstractMessageListenerContainer#setMessageAckListener(MessageAckListener)
*/
@FunctionalInterface
public interface MessageAckListener {

/**
* Listener callback.
* @param success Whether ack succeed.
* @param deliveryTag The deliveryTag of ack.
* @param cause The cause of failed ack.
*
* @throws Exception the exception during callback.
*/
void onComplete(boolean success, long deliveryTag, @Nullable Throwable cause) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
executeWithList(channel, messages, deliveryTag, consumer);
}

return consumer.commitIfNecessary(isChannelLocallyTransacted());
return consumer.commitIfNecessary(isChannelLocallyTransacted(), getMessageAckListener());

}

Expand Down

0 comments on commit b5f85b0

Please sign in to comment.