Requeue message on unhandled client exception #23
Comments
Hi @damonsutherland. This is how the "regular" RabbitMQ Java client used to work up to https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/impl/ForgivingExceptionHandler.java. Requeueing the message is quite trivial but I'd like to suggest making this configurable, just like it is in the "regular" Java client. The default can be different ("requeue on error") if it makes more sense for JMS. |
This is a breaking change if we make "requeue on exception" the default => can go into 2.0. |
@michaelklishin, that will work perfectly! Thanks. Do you have an estimated timeframe for the 2.0 release? |
@damonsutherland it's mostly up to @acogoluegnes to decide but I'd say 4 weeks sounds doable. We haven't shipped 1.7.0 yet :) |
@damonsutherland Could you provide some sample code that reproduces the problem? |
Sure. I have attached a test project: rabbitmq.jms.error.zip <https://github.com/rabbitmq/rabbitmq-jms-client/files/1048725/rabbitmq.jms.error.zip>

I am also wondering whether part of the problem of not redelivering message 0 occurs because messages, when Session.AUTO_ACKNOWLEDGE is set, appear to be acknowledged before being sent to the MessageListener. To prevent lost messages, should messages be acknowledged after being sent to the registered MessageListener?

    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        logger.trace("consumerTag='{}' envelope='{}'", consumerTag, envelope);
        if (this.rejecting) {
            long dtag = envelope.getDeliveryTag();
            logger.debug("basicNack: dtag='{}'", dtag);
            this.messageConsumer.getSession().explicitNack(dtag);
            return;
        }
        /* Wrap the incoming message in a GetResponse */
        GetResponse response = new GetResponse(envelope, properties, body, 0); // last parameter is remaining message count, which we don't know.
        try {
            long dtag = envelope.getDeliveryTag();
            if (this.messageListener != null) {
                this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
                RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response);
                this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
            } else {
                // We are unable to deliver the message, nack it
                logger.debug("basicNack: dtag='{}' (null MessageListener)", dtag);
                this.messageConsumer.getSession().explicitNack(dtag);
            }
        } catch (JMSException x) {
            x.printStackTrace();
            throw new IOException(x);
        } catch (InterruptedException ie) {
            ie.printStackTrace();
            throw new IOException("Interrupted while delivering message", ie);
        }
    }

See the line this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag); |
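To make the ordering question above concrete, here is a minimal in-memory sketch. It does not use the RabbitMQ or JMS APIs; the class AckOrderingSketch and its methods are hypothetical names invented for illustration. It only shows why acknowledging before invoking the listener leaves nothing to redeliver when the listener throws, while acknowledging afterwards keeps the message available.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a broker queue; not the RabbitMQ client API. */
class AckOrderingSketch {

    /** Messages the pretend broker still holds, i.e. not yet acknowledged. */
    final Deque<String> unacked = new ArrayDeque<>();

    void publish(String body) {
        unacked.addLast(body);
    }

    /** Acknowledge first, then deliver: a listener failure cannot bring the message back. */
    void deliverAckFirst(Consumer<String> listener) {
        String msg = unacked.pollFirst(); // "ack": the broker forgets the message here
        if (msg == null) {
            return;
        }
        try {
            listener.accept(msg);
        } catch (RuntimeException e) {
            // Nothing left on the broker to requeue.
        }
    }

    /** Deliver first, acknowledge only on success: a failure leaves the message on the broker. */
    void deliverThenAck(Consumer<String> listener) {
        String msg = unacked.peekFirst();
        if (msg == null) {
            return;
        }
        try {
            listener.accept(msg);
            unacked.pollFirst(); // "ack" only after the listener returned normally
        } catch (RuntimeException e) {
            // Message stays unacknowledged and could be redelivered.
        }
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> {
            throw new IllegalStateException("listener failed on " + m);
        };

        AckOrderingSketch early = new AckOrderingSketch();
        early.publish("message 0");
        early.deliverAckFirst(failing);
        System.out.println("ack first, messages left: " + early.unacked.size());

        AckOrderingSketch late = new AckOrderingSketch();
        late.publish("message 0");
        late.deliverThenAck(failing);
        System.out.println("ack after, messages left: " + late.unacked.size());
    }
}
```

In this sketch the first variant ends with an empty queue and the second still holds "message 0"; whether the real client should behave like the second variant is exactly the question raised above.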
Thank you, Damon! We will get to it eventually (and I really hope in no more than 2 weeks for an RC that you can try).
--
MK
Staff Software Engineer, Pivotal/RabbitMQ
|
Perfect. I did fork this project and have a fix in 1.6 that seems to address the issue. If you would like, I can submit a pull request, or just post the change (it is pretty simple). It isn't configurable though. |
Thanks @damonsutherland for pointing this out. I'm not sure re-delivery should be the default; the specification doesn't make much sense to me in this case (with auto-acknowledgment enabled, the onus is on the consumer to handle the message, and too bad if it fails). |
Requeuing a message after a client RuntimeException complies with the JMS specification. Nevertheless, the default behavior is still the same: the message is "lost" (as it has been AMQP-acknowledged before the delivery to the client) if the client fails to process it (which makes more sense to us). The option is RMQConnectionFactory#requeueOnMessageListenerException and is false by default. Fixes #23
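The effect of such a flag can be sketched without the real client. The class below is a hypothetical in-memory model (RequeueFlagSketch, dispatch and the field names are made up for illustration and are not part of rabbitmq-jms-client); it only mirrors the behavior described in the commit message: with the flag off, a listener exception loses the message, and with the flag on, the message goes back to the head of the queue and is delivered again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical model of a requeue-on-listener-exception switch; not the library's code. */
class RequeueFlagSketch {

    /** Same meaning as the option described above; false models the default. */
    private final boolean requeueOnListenerException;

    final Deque<String> queue = new ArrayDeque<>();
    int deliveries = 0;

    RequeueFlagSketch(boolean requeueOnListenerException) {
        this.requeueOnListenerException = requeueOnListenerException;
    }

    /** Takes the next message and hands it to the listener once. */
    void dispatch(Consumer<String> listener) {
        String msg = queue.pollFirst();
        if (msg == null) {
            return;
        }
        deliveries++;
        try {
            listener.accept(msg);
        } catch (RuntimeException e) {
            if (requeueOnListenerException) {
                queue.addFirst(msg); // models a nack with requeue
            }
            // Flag off: the message was already taken off the queue and is lost.
        }
    }

    public static void main(String[] args) {
        RequeueFlagSketch sketch = new RequeueFlagSketch(true);
        sketch.queue.addLast("message 0");

        int[] attempts = {0};
        Consumer<String> failsOnce = m -> {
            if (attempts[0]++ == 0) {
                throw new IllegalStateException("first attempt fails");
            }
        };

        sketch.dispatch(failsOnce); // throws inside the listener, message is requeued
        sketch.dispatch(failsOnce); // second delivery succeeds
        System.out.println("deliveries: " + sketch.deliveries
                + ", messages left: " + sketch.queue.size());
    }
}
```

Constructing the sketch with false instead models the default: a single delivery, after which the queue is empty even though the listener never processed the message successfully.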
@damonsutherland is the solution in #30 good enough for you? That's a bit different from what we do in our "regular" Java client but it's the same fundamental idea. |
onMessageTimeoutMs: references rabbitmq/rabbitmq-jms-client#5
preferProducerMessageProperty: references rabbitmq/rabbitmq-jms-client#26
requeueOnMessageListenerException: references rabbitmq/rabbitmq-jms-client#23
@michaelklishin Yes the solution in #30 should work. Thanks. |
@damonsutherland you can have a try in 1.7.0.RC2. |
If a runtime exception is thrown in a MessageListener's onMessage(Message) method, I expected the message to be requeued and redelivered. I am not seeing that happen.
Here is the setup:
Results:
In section 4.5.2 of the JMS 1.1 spec, it states that, although it is a client error to throw in onMessage, if it does happen, messages should be re-queued and redelivered. I briefly checked out the JMS Compliant Test suite you're using, and it doesn't appear this type of test is covered.