Skip to content

Commit

Permalink
Make JMSXDeliveryCount mandatory
Browse files Browse the repository at this point in the history
JMS 3.1 spec, "A.3.13. JMSXDeliveryCount (JMS_SPEC-42)"
https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1.html#jmsxdeliverycount-jms_spec-42

The header is set on every message:
 * to 1 for each message which is not redelivered
 * to the value of the AMQP `x-delivery-count` header + 1 for
a delivered message and if the header is present (for *quorum queues* only)
 * to 2 for a delivered message and if the AMQP `x-delivery-count` header
is not present

References #180

Fixes #202
  • Loading branch information
acogoluegnes committed Sep 27, 2022
1 parent 0037dee commit 197fddd
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 43 deletions.
31 changes: 28 additions & 3 deletions src/main/java/com/rabbitmq/jms/client/RMQMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ protected void loggerDebugByteArray(String format, byte[] buffer, Object arg) {
static final String JMS_MESSAGE_EXPIRATION = PREFIX + "jms.message.expiration";
static final String JMS_MESSAGE_PRIORITY = PREFIX + "jms.message.priority";

/**
* JMS Defined Properties
*/
private static final String JMS_X_DELIVERY_COUNT = "JMSXDeliveryCount";

/**
* For turning {@link String}s into <code>byte[]</code> and back we use this {@link Charset} instance.
* This is used for {@link RMQMessage#getJMSCorrelationIDAsBytes()}.
Expand All @@ -128,7 +133,7 @@ protected void loggerDebugByteArray(String format, byte[] buffer, Object arg) {
/** Here we store the JMS_ properties that would have been fields */
private final Map<String, Serializable> rmqProperties = new HashMap<String, Serializable>();
/** Here we store the user’s custom JMS properties */
private final Map<String, Serializable> userJmsProperties = new HashMap<String, Serializable>();
private final Map<String, Serializable> userJmsProperties = new HashMap<>();
/**
* We generate a unique message ID each time we send a message
* It is stored here. This is also used for
Expand Down Expand Up @@ -917,7 +922,7 @@ static RMQMessage convertJmsMessage(RMQSession session, GetResponse response, Re
RMQMessage message = fromMessage(response.getBody(), session.getTrustedPackages());

message.setSession(session); // Insert session in received message for Message.acknowledge
message.setJMSRedelivered(response.getEnvelope().isRedeliver()); // Set the redelivered flag
message = handleJmsRedeliveredAndDeliveryCount(response, message);
message.setRabbitDeliveryTag(response.getEnvelope().getDeliveryTag()); // Insert delivery tag in received message for Message.acknowledge
// message.setJMSDestination(dest); // DO NOT set the destination bug#57214768
// JMSProperties already set
Expand All @@ -937,7 +942,7 @@ private static RMQMessage convertAmqpMessage(RMQSession session, RMQDestination
message = RMQMessage.fromAmqpMessage(response.getBody(), message); // Deserialize the message payload from the byte[] body

message.setSession(session); // Insert session in received message for Message.acknowledge
message.setJMSRedelivered(response.getEnvelope().isRedeliver()); // Set the redelivered flag
message = handleJmsRedeliveredAndDeliveryCount(response, message);
message.setRabbitDeliveryTag(response.getEnvelope().getDeliveryTag()); // Insert delivery tag in received message for Message.acknowledge
message.setJMSDestination(dest); // We cannot know the original destination, so set local one
message.setJMSPropertiesFromAmqpProperties(props);
Expand All @@ -952,6 +957,26 @@ private static RMQMessage convertAmqpMessage(RMQSession session, RMQDestination
}
}

private static RMQMessage handleJmsRedeliveredAndDeliveryCount(GetResponse response, RMQMessage message)
throws JMSException {
boolean redelivered = response.getEnvelope().isRedeliver();
message.setJMSRedelivered(redelivered);
if (redelivered) {
Number deliveryCount = (Number) response.getProps().getHeaders().get("x-delivery-count");
if (deliveryCount == null) {
message.setIntProperty(JMS_X_DELIVERY_COUNT, 2);
} else {
// We add one, as the count starts at 0 for RabbitMQ.
// This is modeled after the AMQP 1.0 "delivery-count" transport header for a message
// (AMQP 1.0 specification, section 3.2.1)
message.setIntProperty(JMS_X_DELIVERY_COUNT, deliveryCount.intValue() + 1);
}
} else {
message.setIntProperty(JMS_X_DELIVERY_COUNT, 1);
}
return message;
}

/**
* Properly assign JMSReplyTo header when using direct reply to.
* <p>
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/com/rabbitmq/Assertions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.


package com.rabbitmq;

import javax.jms.JMSException;
import javax.jms.Message;
import org.assertj.core.api.AbstractObjectAssert;

public abstract class Assertions {

public static JmsMessageAssert assertThat(Message message) {
return new JmsMessageAssert(message);
}

public static class JmsMessageAssert extends AbstractObjectAssert<JmsMessageAssert, Message> {

private static final String JMS_X_DELIVERY_COUNT = "JMSXDeliveryCount";

public JmsMessageAssert isRedelivered() throws JMSException {
isNotNull();
if (!actual.getJMSRedelivered()) {
failWithMessage("Message is expected to be redelivered");
}
return this;
}

public JmsMessageAssert isNotRedelivered() throws JMSException {
isNotNull();
if (actual.getJMSRedelivered()) {
failWithMessage("Message is not expected to be redelivered");
}
return this;
}

public JmsMessageAssert hasDeliveryCount(int expectedDeliveryCount) throws JMSException {
isNotNull();
int actualDeliveryCount = this.actual.getIntProperty(JMS_X_DELIVERY_COUNT);
if (actualDeliveryCount != expectedDeliveryCount) {
failWithMessage("Delivery count is expected to be %d but is %d", expectedDeliveryCount, actualDeliveryCount);
}
return this;
}

public JmsMessageAssert(Message message) {
super(message, JmsMessageAssert.class);
}

public static JmsMessageAssert assertThat(Message message) {
return new JmsMessageAssert(message);
}

}

}
14 changes: 14 additions & 0 deletions src/test/java/com/rabbitmq/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.UUID;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -83,4 +86,15 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
@Documented
@ExtendWith(DisabledIfTlsNotEnabledCondition.class)
public @interface DisabledIfTlsNotEnabled {}

public static String queueName(TestInfo info) {
return queueName(info.getTestClass().get(), info.getTestMethod().get());
}

private static String queueName(Class<?> testClass, Method testMethod) {
String uuid = UUID.randomUUID().toString();
return String.format(
"%s_%s%s",
testClass.getSimpleName(), testMethod.getName(), uuid.substring(uuid.length() / 2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2013-2022 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.integration.tests;

import static org.junit.jupiter.api.Assertions.fail;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.jms.client.RMQConnection;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.concurrent.TimeoutException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
Expand Down Expand Up @@ -61,4 +67,26 @@ public void afterTests() throws Exception {
if (queueConn != null)
queueConn.close();
}

protected Connection amqpConnection() {
Connection amqpConnection = null;
if (this.queueConn != null) {
try {
Field connectionField = RMQConnection.class.getDeclaredField("rabbitConnection");
connectionField.setAccessible(true);
amqpConnection = (Connection) connectionField.get(this.queueConn);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return amqpConnection;
}

protected void deleteQueue(String queue) {
try (Channel ch = amqpConnection().createChannel()) {
ch.queueDelete(queue);
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
102 changes: 65 additions & 37 deletions src/test/java/com/rabbitmq/integration/tests/RecoverMessagesIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2013-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2013-2022 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.integration.tests;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static com.rabbitmq.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import com.rabbitmq.TestUtils;
import com.rabbitmq.jms.admin.RMQDestination;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
Expand All @@ -25,6 +26,7 @@
import javax.jms.TextMessage;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

/**
* Integration test
Expand All @@ -44,20 +46,49 @@ public void testRecoverTextMessageSync() throws Exception {
TextMessage message = queueSession.createTextMessage(MESSAGE);
queueSender.send(message);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
TextMessage tmsg1 = (TextMessage) queueReceiver.receive();
assertFalse(tmsg1.getJMSRedelivered());
Message msg1 = queueReceiver.receive();
assertThat(msg1).isNotRedelivered().hasDeliveryCount(1);
queueSession.recover();
TextMessage tmsg2 = (TextMessage) queueReceiver.receive();
assertEquals(tmsg1, tmsg2);
assertTrue(tmsg2.getJMSRedelivered());
tmsg2.acknowledge();
TextMessage tmsg3 = (TextMessage) queueReceiver.receiveNoWait();
assertNull(tmsg3);
Message msg2 = queueReceiver.receive();
assertThat(msg2).isEqualTo(msg1).isRedelivered().hasDeliveryCount(2);
msg2.acknowledge();
Message msg3 = queueReceiver.receiveNoWait();
assertThat(msg3).isNull();
}

@Test
public void testRecoverTextMessageSyncRedeliveryCountShouldBeSetWithQuorumQueue(TestInfo info) throws Exception {
queueConn.start();
QueueSession queueSession = queueConn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
String queueName = TestUtils.queueName(info);
Queue queue =
new RMQDestination(
queueName, true, false, Collections.singletonMap("x-queue-type", "quorum"));
try {
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = queueSession.createTextMessage(MESSAGE);
queueSender.send(message);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
Message msg1 = queueReceiver.receive();
assertThat(msg1).isNotRedelivered().hasDeliveryCount(1);
queueSession.recover();
Message msg2 = queueReceiver.receive();
assertThat(msg2).isEqualTo(msg1).isRedelivered().hasDeliveryCount(2);
queueSession.recover();
Message msg3 = queueReceiver.receiveNoWait();
assertThat(msg3).isEqualTo(msg1).isRedelivered().hasDeliveryCount(3);
msg3.acknowledge();
Message tmsg4 = queueReceiver.receiveNoWait();
assertThat(tmsg4).isNull();
} finally {
deleteQueue(queueName);
}
}

@Test
public void testRecoverTextMessageAsyncSync() throws Exception {
final ArrayList<Message> messages = new ArrayList<Message>();
List<Message> messages = new CopyOnWriteArrayList<>();

queueConn.start();
QueueSession queueSession = queueConn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
Expand All @@ -67,32 +98,29 @@ public void testRecoverTextMessageAsyncSync() throws Exception {
TextMessage message = queueSession.createTextMessage(MESSAGE);
queueSender.send(message);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
final CountDownLatch latch = new CountDownLatch(2);
queueReceiver.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
messages.add(message);
latch.countDown();
}
final CountDownLatch firstMessageLatch = new CountDownLatch(1);
final CountDownLatch secondMessageLatch = new CountDownLatch(2);
queueReceiver.setMessageListener(message1 -> {
messages.add(message1);
firstMessageLatch.countDown();
secondMessageLatch.countDown();
});
// allow subscription to take place
Thread.sleep(100);
firstMessageLatch.await(5000, TimeUnit.MILLISECONDS);
// we should have received one message
assertEquals(1, messages.size());
TextMessage tmsg1 = (TextMessage) messages.get(0);
assertFalse(tmsg1.getJMSRedelivered());
assertThat(messages).hasSize(1);
Message msg1 = messages.get(0);
assertThat(msg1).isNotRedelivered().hasDeliveryCount(1);
queueSession.recover();
latch.await(1000, TimeUnit.MILLISECONDS);
secondMessageLatch.await(5000, TimeUnit.MILLISECONDS);
// we should have received two messages
// There is no synchronisation so no guarantee we see the latest messages!
assertEquals(2, messages.size());
TextMessage tmsg2 = (TextMessage) messages.get(1);
assertEquals(tmsg1, tmsg2);
assertTrue(tmsg2.getJMSRedelivered());

tmsg2.acknowledge();
assertThat(messages).hasSize(2);
Message msg2 = messages.get(1);
assertThat(msg2).isEqualTo(msg1).isRedelivered().hasDeliveryCount(2);
msg2.acknowledge();
queueReceiver.setMessageListener(null);
TextMessage tmsg3 = (TextMessage) queueReceiver.receiveNoWait();
assertNull(tmsg3);
Message msg3 = queueReceiver.receiveNoWait();
assertThat(msg3).isNull();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void testSendFromAmqpAndReceiveBytesMessage() throws Exception {
propNameSet.add((String) propNames.nextElement());
}

assertEquals(new HashSet<>(Arrays.asList(USER_STRING_PROPERTY_NAME, "DummyProp"))
assertEquals(new HashSet<>(Arrays.asList(USER_STRING_PROPERTY_NAME, "DummyProp", "JMSXDeliveryCount"))
, propNameSet
, "Headers not set correctly");

Expand Down Expand Up @@ -238,7 +238,7 @@ public void testSendFromAmqpAndReceiveTextMessage() throws Exception {
propNameSet.add((String) propNames.nextElement());
}

assertEquals(new HashSet<>(Arrays.asList(USER_STRING_PROPERTY_NAME, "DummyProp"))
assertEquals(new HashSet<>(Arrays.asList(USER_STRING_PROPERTY_NAME, "DummyProp", "JMSXDeliveryCount"))
, propNameSet
, "Headers not set correctly");

Expand Down

0 comments on commit 197fddd

Please sign in to comment.