Skip to content

Commit

Permalink
ARTEMIS-3082 fix non-destructive + rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Jan 28, 2021
1 parent 0845ff2 commit 88b21f9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3919,10 +3919,14 @@ void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {

return;
}
if (sorted) {
addSorted(refs, false);
} else {
addHead(refs, false);

// if the queue is non-destructive then any ack is ignored so no need to add messages back onto the queue
if (!isNonDestructive()) {
if (sorted) {
addSorted(refs, false);
} else {
addHead(refs, false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;


import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
Expand Down Expand Up @@ -329,6 +328,30 @@ public void testNonDestructiveLVQTombstone(ConnectionSupplier producerConnection

}

@Test
public void testMessageCount() throws Exception {
sendMessage(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);

QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(NON_DESTRUCTIVE_QUEUE_NAME));
assertEquals("Ensure Message count", 1, queueBinding.getQueue().getMessageCount());

//Consume Once
receive(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
assertEquals("Ensure Message count", 1, queueBinding.getQueue().getMessageCount());

sendMessage(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
assertEquals("Ensure Message count", 2, queueBinding.getQueue().getMessageCount());

//Consume Again as should be non-destructive
receive(CoreConnection, NON_DESTRUCTIVE_QUEUE_NAME);
assertEquals("Ensure Message count", 2, queueBinding.getQueue().getMessageCount());

QueueControl control = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + NON_DESTRUCTIVE_QUEUE_NAME);
control.removeAllMessages();

assertEquals("Message count after clearing queue via queue control should be 0", 0, queueBinding.getQueue().getMessageCount());
}


private void receive(ConnectionSupplier consumerConnectionSupplier, String queueName, int i) throws JMSException {
try (Connection consumerConnection = consumerConnectionSupplier.createConnection()) {
Expand Down

0 comments on commit 88b21f9

Please sign in to comment.