Skip to content

Commit

Permalink
Add option to requeue message on client runtime exception
Browse files Browse the repository at this point in the history
Requeuing a message after a client RuntimeException complies to
the JMS specification. Nevertheless, the default behavior is
still the same: the message is "lost" (as it has been AMQP-acknowledged
before the delivery to the client) if the client fails to process
it (which makes more sense to us).

The option is RMQConnectionFactory#requeueOnMessageListenerException
and is false by default.

Fixes #23
  • Loading branch information
acogoluegnes committed Jun 19, 2017
1 parent d91af11 commit 229a051
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 10 deletions.
23 changes: 23 additions & 0 deletions src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
*/
private boolean preferProducerMessageProperty = true;

/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*/
private boolean requeueOnMessageListenerException = false;

/** Default not to use ssl */
private boolean ssl = false;
private String tlsProtocol;
Expand Down Expand Up @@ -116,6 +123,7 @@ public Connection createConnection(String username, String password) throws JMSE
.setOnMessageTimeoutMs(getOnMessageTimeoutMs())
.setChannelsQos(channelsQos)
.setPreferProducerMessageProperty(preferProducerMessageProperty)
.setRequeueOnMessageListenerException(requeueOnMessageListenerException)
);
conn.setTrustedPackages(this.trustedPackages);
logger.debug("Connection {} created.", conn);
Expand All @@ -138,6 +146,7 @@ public Connection createConnection(String username, String password, List<Addres
.setOnMessageTimeoutMs(getOnMessageTimeoutMs())
.setChannelsQos(channelsQos)
.setPreferProducerMessageProperty(preferProducerMessageProperty)
.setRequeueOnMessageListenerException(requeueOnMessageListenerException)
);
conn.setTrustedPackages(this.trustedPackages);
logger.debug("Connection {} created.", conn);
Expand Down Expand Up @@ -652,4 +661,18 @@ public void setPreferProducerMessageProperty(boolean preferProducerMessageProper
public boolean isPreferProducerMessageProperty() {
return preferProducerMessageProperty;
}

/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*/
public void setRequeueOnMessageListenerException(boolean requeueOnMessageListenerException) {
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
}

public boolean isRequeueOnMessageListenerException() {
return requeueOnMessageListenerException;
}
}

16 changes: 16 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ public class ConnectionParams {
*/
private boolean preferProducerMessageProperty = true;

/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*/
private boolean requeueOnMessageListenerException = false;

public Connection getRabbitConnection() {
return rabbitConnection;
}
Expand Down Expand Up @@ -91,4 +98,13 @@ public ConnectionParams setPreferProducerMessageProperty(boolean preferProducerM
this.preferProducerMessageProperty = preferProducerMessageProperty;
return this;
}

public boolean willRequeueOnMessageListenerException() {
return requeueOnMessageListenerException;
}

public ConnectionParams setRequeueOnMessageListenerException(boolean requeueOnMessageListenerException) {
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void deliverMessageWithProtection(RMQMessage rmqMessage, MessageListener
this.closeAbruptly();
throw new RMQJMSException("onMessage took too long and was interrupted", null);
} catch (ExecutionException e) {
throw new RMQJMSException("onMessage threw exception", e.getCause());
throw new RMQMessageListenerExecutionJMSException("onMessage threw exception", e.getCause());
}
}

Expand Down
38 changes: 32 additions & 6 deletions src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
private volatile Completion completion;
private final long terminationTimeout;
private volatile boolean rejecting;
private final boolean requeueOnMessageListenerException;

/**
* Constructor
Expand All @@ -46,14 +47,16 @@ class MessageListenerConsumer implements Consumer, Abortable {
* @param messageListener to call {@link MessageListener#onMessage(javax.jms.Message) onMessage(Message)} with received messages
* @param terminationTimeout wait time (in nanoseconds) for cancel to take effect
*/
public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout) {
public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
boolean requeueOnMessageListenerException) {
this.messageConsumer = messageConsumer;
this.channel = channel;
this.messageListener = messageListener;
this.autoAck = messageConsumer.isAutoAck();
this.terminationTimeout = terminationTimeout;
this.completion = new Completion(); // completed when cancelled.
this.rejecting = this.messageConsumer.getSession().getConnection().isStopped();
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
}

private String getConsTag() {
Expand Down Expand Up @@ -110,19 +113,42 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
try {
long dtag = envelope.getDeliveryTag();
if (this.messageListener != null) {
this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response);
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
if (this.requeueOnMessageListenerException) {
// requeuing in case of RuntimeException from the listener
// see https://github.com/rabbitmq/rabbitmq-jms-client/issues/23
// see section 4.5.2 of JMS 1.1 specification
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response);
boolean runtimeExceptionInListener = false;
try {
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
} catch(RMQMessageListenerExecutionJMSException e) {
if (e.getCause() instanceof RuntimeException) {
runtimeExceptionInListener = true;
this.messageConsumer.getSession().explicitNack(dtag);
this.abort();
} else {
throw e;
}
}
if (!runtimeExceptionInListener) {
this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
}
} else {
// this is the "historical" behavior, not compliant with the spec
this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response);
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
}
} else {
// We are unable to deliver the message, nack it
logger.debug("basicNack: dtag='{}' (null MessageListener)", dtag);
this.messageConsumer.getSession().explicitNack(dtag);
}
} catch (JMSException x) {
x.printStackTrace();
logger.error("Error while delivering message", x);
throw new IOException(x);
} catch (InterruptedException ie) {
ie.printStackTrace();
logger.warn("Message delivery has been interrupted", ie);
throw new IOException("Interrupted while delivering message", ie);
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/RMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
*/
private boolean preferProducerMessageProperty;

/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*/
private boolean requeueOnMessageListenerException;

/**
* Classes in these packages can be transferred via ObjectMessage.
*
Expand All @@ -105,6 +112,7 @@ public RMQConnection(ConnectionParams connectionParams) {
this.onMessageTimeoutMs = connectionParams.getOnMessageTimeoutMs();
this.channelsQos = connectionParams.getChannelsQos();
this.preferProducerMessageProperty = connectionParams.willPreferProducerMessageProperty();
this.requeueOnMessageListenerException = connectionParams.willRequeueOnMessageListenerException();
}

/**
Expand Down Expand Up @@ -151,6 +159,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
.setMode(acknowledgeMode)
.setSubscriptions(this.subscriptions)
.setPreferProducerMessageProperty(this.preferProducerMessageProperty)
.setRequeueOnMessageListenerException(this.requeueOnMessageListenerException)
);
session.setTrustedPackages(this.trustedPackages);
this.sessions.add(session);
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
/** Track how this consumer is being used. */
private final AtomicInteger numberOfReceives = new AtomicInteger(0);

/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
*/
private final boolean requeueOnMessageListenerException;

/**
* Creates a RMQMessageConsumer object. Internal constructor used by {@link RMQSession}
*
Expand All @@ -85,8 +91,9 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
* @param uuidTag - when creating queues to a topic, we need a unique queue name for each consumer. This is the
* unique name.
* @param paused - true if the connection is {@link javax.jms.Connection#stop}ped, false otherwise.
* @param requeueOnMessageListenerException true to requeue message on RuntimeException in listener, false otherwise
*/
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector) {
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException) {
this.session = session;
this.destination = destination;
this.uuidTag = uuidTag;
Expand All @@ -95,6 +102,7 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
if (!paused)
this.receiveManager.openGate();
this.autoAck = session.isAutoAck();
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
}

/**
Expand Down Expand Up @@ -186,7 +194,8 @@ private void setNewListenerConsumer(MessageListener messageListener) throws Ille
getSession().getChannel(),
messageListener,
TimeUnit.MILLISECONDS.toNanos(this.session.getConnection()
.getTerminationTimeout()));
.getTerminationTimeout()),
this.requeueOnMessageListenerException);
if (this.listenerConsumer.compareAndSet(null, mlConsumer)) {
this.abortables.add(mlConsumer);
if (!this.getSession().getConnection().isStopped()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. */
package com.rabbitmq.jms.client;

import javax.jms.JMSException;

/**
* Wraps an execution exception as a {@link JMSException}.
*/
public class RMQMessageListenerExecutionJMSException extends JMSException {

/** Default version ID */
private static final long serialVersionUID = 1L;

public RMQMessageListenerExecutionJMSException(String msg, Throwable x) {
this(msg, null, x);
}

public RMQMessageListenerExecutionJMSException(Throwable x) {
this(x.getMessage(), x);
}

private RMQMessageListenerExecutionJMSException(String msg, String errorCode, Throwable x) {
super(msg, errorCode);
this.initCause(x);
}

}
10 changes: 9 additions & 1 deletion src/main/java/com/rabbitmq/jms/client/RMQSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public class RMQSession implements Session, QueueSession, TopicSession {
*/
private boolean preferProducerMessageProperty = true;

/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*/
private boolean requeueOnMessageListenerException = false;

/** The main RabbitMQ channel we use under the hood */
private final Channel channel;
/** Set to true if close() has been called and completed */
Expand Down Expand Up @@ -177,6 +184,7 @@ public RMQSession(SessionParams sessionParams) throws JMSException {
this.subscriptions = sessionParams.getSubscriptions();
this.deliveryExecutor = new DeliveryExecutor(sessionParams.getOnMessageTimeoutMs());
this.preferProducerMessageProperty = sessionParams.willPreferProducerMessageProperty();
this.requeueOnMessageListenerException = sessionParams.willRequeueOnMessageListenerException();

if (transacted) {
this.acknowledgeMode = Session.SESSION_TRANSACTED;
Expand Down Expand Up @@ -663,7 +671,7 @@ private RMQMessageConsumer createConsumerInternal(RMQDestination dest, String uu
throw new RMQJMSException("RabbitMQ Exception creating Consumer", x);
}
}
RMQMessageConsumer consumer = new RMQMessageConsumer(this, dest, consumerTag, getConnection().isStopped(), jmsSelector);
RMQMessageConsumer consumer = new RMQMessageConsumer(this, dest, consumerTag, getConnection().isStopped(), jmsSelector, this.requeueOnMessageListenerException);
this.consumers.add(consumer);
return consumer;
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/rabbitmq/jms/client/SessionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public class SessionParams {
*/
private boolean preferProducerMessageProperty = true;

/**
* Whether requeue message on {@link RuntimeException} in the
* {@link javax.jms.MessageListener} or not.
* Default is false.
*/
private boolean requeueOnMessageListenerException = false;

public RMQConnection getConnection() {
return connection;
}
Expand Down Expand Up @@ -86,4 +93,13 @@ public SessionParams setPreferProducerMessageProperty(boolean preferProducerMess
this.preferProducerMessageProperty = preferProducerMessageProperty;
return this;
}

public boolean willRequeueOnMessageListenerException() {
return requeueOnMessageListenerException;
}

public SessionParams setRequeueOnMessageListenerException(boolean requeueOnMessageListenerException) {
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
return this;
}
}
Loading

0 comments on commit 229a051

Please sign in to comment.