Skip to content

Commit

Permalink
Add option to requeue message if processing takes too long
Browse files Browse the repository at this point in the history
Fixes #137

References #23
  • Loading branch information
acogoluegnes committed Feb 23, 2021
1 parent c07515b commit c51297a
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 40 deletions.
34 changes: 33 additions & 1 deletion src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved. */
/* Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved. */
package com.rabbitmq.jms.admin;

import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -68,9 +68,23 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*
* @since 1.7.0
* @see RMQConnectionFactory#requeueOnTimeout
*/
private boolean requeueOnMessageListenerException = false;

/**
* Whether to requeue a message that timed out or not.
*
* Only taken into account if requeueOnMessageListenerException is true.
* Default is false.
*
* @since 2.3.0
* @see RMQConnectionFactory#requeueOnMessageListenerException
*/
private boolean requeueOnTimeout = false;

/**
* Whether to <a href="https://www.rabbitmq.com/nack.html">nack</a> messages on rollback or not.
*
Expand Down Expand Up @@ -275,6 +289,7 @@ protected Connection createConnection(String username, String password, Connecti
.setReceivingContextConsumer(rcc)
.setConfirmListener(confirmListener)
.setTrustedPackages(this.trustedPackages)
.setRequeueOnTimeout(this.requeueOnTimeout)
);
logger.debug("Connection {} created.", conn);
return conn;
Expand Down Expand Up @@ -858,7 +873,11 @@ public boolean isPreferProducerMessageProperty() {
/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
*
* Default is false.
*
* @since 1.7.0
* @see RMQConnectionFactory#setRequeueOnTimeout(boolean)
*/
public void setRequeueOnMessageListenerException(boolean requeueOnMessageListenerException) {
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
Expand Down Expand Up @@ -1002,6 +1021,19 @@ public void setConfirmListener(ConfirmListener confirmListener) {
this.confirmListener = confirmListener;
}

/**
* Whether to requeue a message that timed out or not.
*
* Only taken into account if requeueOnMessageListenerException is true.
* Default is false.
*
* @since 2.3.0
* @see RMQConnectionFactory#setRequeueOnMessageListenerException(boolean)
*/
public void setRequeueOnTimeout(boolean requeueOnTimeout) {
this.requeueOnTimeout = requeueOnTimeout;
}

@FunctionalInterface
private interface ConnectionCreator {
com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception;
Expand Down
25 changes: 24 additions & 1 deletion src/main/java/com/rabbitmq/jms/client/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2016-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2016-2021 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -50,9 +50,23 @@ public class ConnectionParams {
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*
* @since 1.7.0
* @see ConnectionParams#requeueOnTimeout
*/
private boolean requeueOnMessageListenerException = false;

/**
* Whether to requeue a message that timed out or not.
*
* Only taken into account if requeueOnMessageListenerException is true.
* Default is false.
*
* @since 2.3.0
* @see ConnectionParams#requeueOnMessageListenerException
*/
private boolean requeueOnTimeout = false;

/**
* Whether to commit nack on rollback or not.
* Default is false.
Expand Down Expand Up @@ -226,4 +240,13 @@ public ConnectionParams setTrustedPackages(List<String> trustedPackages) {
public List<String> getTrustedPackages() {
return trustedPackages;
}

public ConnectionParams setRequeueOnTimeout(boolean requeueOnTimeout) {
this.requeueOnTimeout = requeueOnTimeout;
return this;
}

public boolean willRequeueOnTimeout() {
return requeueOnTimeout;
}
}
27 changes: 21 additions & 6 deletions src/main/java/com/rabbitmq/jms/client/DeliveryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -22,7 +23,7 @@
* if execution takes too long (set on instantiation), and interrupts execution on closure or timeout. Also serialises
* calls. There is one instance of this executor per session.
*/
public class DeliveryExecutor {
class DeliveryExecutor {

private final class CallOnMessage implements Callable<Boolean> {
private final RMQMessage rmqMessage;
Expand All @@ -42,12 +43,15 @@ public Boolean call() throws Exception {
/** Timeout for onMessage executions */
private final long onMessageTimeoutMs;

private final boolean closeOnTimeout;

/** Executor allocated if/when onMessage calls are made; used to isolate us from potential hangs. */
private ExecutorService onMessageExecutorService = null;
private final Object lockOnMessageExecutorService = new Object();

public DeliveryExecutor(long onMessageTimeoutMs) {
DeliveryExecutor(long onMessageTimeoutMs, boolean closeOnTimeout) {
this.onMessageTimeoutMs = onMessageTimeoutMs;
this.closeOnTimeout = closeOnTimeout;
}

/**
Expand All @@ -62,11 +66,18 @@ public DeliveryExecutor(long onMessageTimeoutMs) {
* @throws InterruptedException if executing thread is interrupted
*/
public void deliverMessageWithProtection(RMQMessage rmqMessage, MessageListener messageListener) throws JMSException, InterruptedException {
Future<Boolean> task = null;
try {
this.getExecutorService().submit(new CallOnMessage(rmqMessage, messageListener)).get(this.onMessageTimeoutMs, TimeUnit.MILLISECONDS);
task = this.getExecutorService().submit(new CallOnMessage(rmqMessage, messageListener));
task.get(this.onMessageTimeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
this.closeAbruptly();
throw new RMQJMSException("onMessage took too long and was interrupted", null);
if (this.closeOnTimeout) {
this.closeAbruptly();
throw new RMQJMSException("onMessage took too long and was interrupted", null);
} else {
task.cancel(true);
throw new DeliveryProcessingTimeoutException();
}
} catch (ExecutionException e) {
throw new RMQMessageListenerExecutionJMSException("onMessage threw exception", e.getCause());
}
Expand Down Expand Up @@ -113,4 +124,8 @@ private boolean waitForTerminatedExecutorService(ExecutorService executorService
return false;
}
}

static class DeliveryProcessingTimeoutException extends RuntimeException {

}
}
25 changes: 18 additions & 7 deletions src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import java.io.IOException;
Expand Down Expand Up @@ -44,6 +44,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
private final long terminationTimeout;
private volatile boolean rejecting;
private final boolean requeueOnMessageListenerException;
private final boolean requeueOnTimeout;

/**
* True when AMQP auto-ack is true as well. Happens
Expand All @@ -61,8 +62,12 @@ class MessageListenerConsumer implements Consumer, Abortable {
* @param messageListener to call {@link MessageListener#onMessage(javax.jms.Message) onMessage(Message)} with received messages
* @param terminationTimeout wait time (in nanoseconds) for cancel to take effect
*/
public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
boolean requeueOnMessageListenerException, ReceivingContextConsumer receivingContextConsumer) {
MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
boolean requeueOnMessageListenerException, ReceivingContextConsumer receivingContextConsumer,
boolean requeueOnTimeout) {
if (requeueOnTimeout && !requeueOnMessageListenerException) {
throw new IllegalArgumentException("requeueOnTimeout can be true only if requeueOnMessageListenerException is true as well");
}
this.messageConsumer = messageConsumer;
this.channel = channel;
this.messageListener = messageListener;
Expand All @@ -73,6 +78,7 @@ public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel chann
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
this.skipAck = messageConsumer.amqpAutoAck();
this.receivingContextConsumer = receivingContextConsumer;
this.requeueOnTimeout = requeueOnTimeout;
}

private String getConsTag() {
Expand Down Expand Up @@ -110,7 +116,7 @@ public void handleCancelOk(String consumerTag) {
* {@inheritDoc}
*/
@Override
public void handleCancel(String consumerTag) throws IOException {
public void handleCancel(String consumerTag) {
logger.trace("consumerTag='{}'", consumerTag);
this.completion.setComplete();
}
Expand All @@ -136,19 +142,24 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(),
response, this.receivingContextConsumer);
this.messageConsumer.getSession().addUncommittedTag(dtag);
boolean runtimeExceptionInListener = false;
boolean alreadyNacked = false;
try {
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
} catch (DeliveryExecutor.DeliveryProcessingTimeoutException timeoutException) {
// happens only if requeueOnTimeout is true
logger.debug("nacking {} because of timeout", dtag);
alreadyNacked = true;
nack(dtag);
} catch(RMQMessageListenerExecutionJMSException e) {
if (e.getCause() instanceof RuntimeException) {
runtimeExceptionInListener = true;
alreadyNacked = true;
nack(dtag);
this.abort();
} else {
throw e;
}
}
if (!runtimeExceptionInListener) {
if (!alreadyNacked) {
dealWithAcknowledgments(dtag);
}
} else {
Expand Down
21 changes: 20 additions & 1 deletion src/main/java/com/rabbitmq/jms/client/RMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import java.io.IOException;
Expand Down Expand Up @@ -94,9 +94,23 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*
* @since 1.7.0
* @see RMQConnection#requeueOnTimeout
*/
private final boolean requeueOnMessageListenerException;

/**
* Whether to requeue a message that timed out or not.
*
* Only taken into account if requeueOnMessageListenerException is true.
* Default is false.
*
* @since 2.3.0
* @see RMQConnection#requeueOnMessageListenerException
*/
private final boolean requeueOnTimeout;

/**
* Whether to commit nack on rollback or not.
* Default is false.
Expand Down Expand Up @@ -149,6 +163,9 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
* @param connectionParams parameters for this connection
*/
public RMQConnection(ConnectionParams connectionParams) {
if (connectionParams.willRequeueOnTimeout() && !connectionParams.willRequeueOnMessageListenerException()) {
throw new IllegalArgumentException("requeueOnTimeout can be true only if requeueOnMessageListenerException is true as well");
}

connectionParams.getRabbitConnection().addShutdownListener(new RMQConnectionShutdownListener());

Expand All @@ -166,6 +183,7 @@ public RMQConnection(ConnectionParams connectionParams) {
this.receivingContextConsumer = connectionParams.getReceivingContextConsumer();
this.confirmListener = connectionParams.getConfirmListener();
this.trustedPackages = connectionParams.getTrustedPackages();
this.requeueOnTimeout = connectionParams.willRequeueOnTimeout();
}

/**
Expand Down Expand Up @@ -220,6 +238,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
.setReceivingContextConsumer(this.receivingContextConsumer)
.setConfirmListener(this.confirmListener)
.setTrustedPackages(this.trustedPackages)
.setRequeueOnTimeout(this.requeueOnTimeout)
);
this.sessions.add(session);
return session;
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2013-2021 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import java.io.IOException;
Expand Down Expand Up @@ -45,7 +45,7 @@
* {@link MessageListener#onMessage} calls are implemented with a more conventional {@link Consumer}.
* </p>
*/
public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber {
class RMQMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber {
private final Logger logger = LoggerFactory.getLogger(RMQMessageConsumer.class);

private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";
Expand Down Expand Up @@ -80,6 +80,7 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
private final DelayedReceiver delayedReceiver;
/** Record and preserve the need to acknowledge automatically */
private final boolean autoAck;
private final boolean requeueOnTimeout;

/** Track how this consumer is being used. */
private final AtomicInteger numberOfReceives = new AtomicInteger(0);
Expand All @@ -103,7 +104,10 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
* @param requeueOnMessageListenerException true to requeue message on RuntimeException in listener, false otherwise
*/
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException,
ReceivingContextConsumer receivingContextConsumer) {
ReceivingContextConsumer receivingContextConsumer, boolean requeueOnTimeout) {
if (requeueOnTimeout && !requeueOnMessageListenerException) {
throw new IllegalArgumentException("requeueOnTimeout can be true only if requeueOnMessageListenerException is true as well");
}
this.session = session;
this.destination = destination;
this.uuidTag = uuidTag;
Expand All @@ -114,6 +118,7 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
this.autoAck = session.isAutoAck();
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
this.receivingContextConsumer = receivingContextConsumer;
this.requeueOnTimeout = requeueOnTimeout;
}

/**
Expand Down Expand Up @@ -212,7 +217,8 @@ private void setNewListenerConsumer(MessageListener messageListener) throws Exce
messageListener,
TimeUnit.MILLISECONDS.toNanos(this.session.getConnection()
.getTerminationTimeout()),
this.requeueOnMessageListenerException, this.receivingContextConsumer);
this.requeueOnMessageListenerException, this.receivingContextConsumer,
this.requeueOnTimeout);
if (this.listenerConsumer.compareAndSet(null, mlConsumer)) {
this.abortables.add(mlConsumer);
if (!this.getSession().getConnection().isStopped()) {
Expand Down
Loading

0 comments on commit c51297a

Please sign in to comment.