Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
support different deletion strategies for sqs listener methods. It is…
Browse files Browse the repository at this point in the history
… now possible to disable the automatic deletion of messages per listener methods. Prior to this commit the deletion strategy was defined once for all listener methods on the simple message listener container and it was not possible to completely disable the deletion of messages.
  • Loading branch information
alainsahli committed Oct 31, 2015
1 parent e0e1e9b commit ae567da
Show file tree
Hide file tree
Showing 23 changed files with 1,133 additions and 517 deletions.
6 changes: 3 additions & 3 deletions docs/src/main/asciidoc/spring-cloud-aws.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ Annotation-driven listener endpoints are the easiest way for listening on SQS me

[source,java,indent=0]
----
@MessageMapping("queueName")
@SqsListener("queueName")
public void queueListener(Person person) {
// ...
}
Expand Down Expand Up @@ -969,7 +969,7 @@ the `DestinationResolvingMessageSendingOperations` interface.

[source,java,indent=0]
----
@MessageMapping("treeQueue")
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
// ...
Expand Down Expand Up @@ -1151,7 +1151,7 @@ as shown below:

[source,java,indent=0]
----
@MessageMapping("LogicalQueueName")
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
// Logical names can also be used with messaging templates
this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public RedrivePolicyTestListener redrivePolicyTestListener() {
return new RedrivePolicyTestListener();
}

@Bean
public ManualDeletionPolicyTestListener manualDeletionPolicyTestListener() {
return new ManualDeletionPolicyTestListener();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public RedrivePolicyTestListener redrivePolicyTestListener() {
return new RedrivePolicyTestListener();
}

@Bean
public ManualDeletionPolicyTestListener manualDeletionPolicyTestListener() {
return new ManualDeletionPolicyTestListener();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.core.env.stack.StackResourceRegistry;
import org.springframework.cloud.aws.core.support.documentation.RuntimeUse;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.core.task.TaskExecutor;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.ArrayList;
Expand Down Expand Up @@ -79,7 +79,7 @@ public void listenToAllMessagesUntilTheyAreReceivedOrTimeOut() throws Exception
static class MessageReceiver {

@RuntimeUse
@MessageMapping("LoadTestQueue")
@SqsListener("LoadTestQueue")
public void onMessage(String message) {
assertNotNull(message);
this.getCountDownLatch().countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -109,7 +109,7 @@ private void reset() {
}

@RuntimeUse
@MessageMapping("NotificationQueue")
@SqsListener("NotificationQueue")
private void messageListener(@NotificationSubject String subject, @NotificationMessage String message) {
this.subject = subject;
this.message = message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.core.support.documentation.RuntimeUse;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
Expand All @@ -37,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -64,6 +67,9 @@ public abstract class QueueListenerTest extends AbstractContainerTest {
@Autowired
private QueueMessagingTemplate queueMessagingTemplate;

@Autowired
private ManualDeletionPolicyTestListener manualDeletionPolicyTestListener;

@Test
public void messageMapping_singleMessageOnQueue_messageReceived() throws Exception {
// Arrange
Expand Down Expand Up @@ -146,6 +152,15 @@ public void redrivePolicy_withMessageMappingThrowingAnException_messageShouldApp
assertTrue(countDownLatch.await(15, TimeUnit.SECONDS));
}

@Test
public void manualDeletion_withAcknowledgmentCalled_shouldSucceedAndDeleteMessage() throws Exception {
// Act
this.queueMessagingTemplate.convertAndSend("ManualDeletionQueue", "Message");

// Assert
assertTrue(this.manualDeletionPolicyTestListener.getCountDownLatch().await(15, TimeUnit.SECONDS));
}

public static class MessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(MessageListener.class);
Expand All @@ -155,7 +170,7 @@ public static class MessageListener {
private Map<String, Object> allHeaders;

@RuntimeUse
@MessageMapping("QueueListenerTest")
@SqsListener("QueueListenerTest")
public void receiveMessage(String message, @Header(value = "SenderId", required = false) String senderId, @Headers Map<String, Object> allHeaders) {
LOGGER.debug("Received message with content {}", message);
this.receivedMessages.add(message);
Expand Down Expand Up @@ -191,7 +206,7 @@ public static class MessageListenerWithSendTo {
private final List<String> receivedMessages = new ArrayList<>();

@RuntimeUse
@MessageMapping("SendToQueue")
@SqsListener("SendToQueue")
@SendTo("QueueListenerTest")
public String receiveMessage(String message) {
LOGGER.debug("Received message with content {}", message);
Expand All @@ -214,13 +229,13 @@ public void setCountDownLatch(CountDownLatch countDownLatch) {
}

@RuntimeUse
@MessageMapping("QueueWithRedrivePolicy")
@SqsListener(value = "QueueWithRedrivePolicy", deletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE)
public void receiveThrowingException(String message) {
throw new RuntimeException();
}

@RuntimeUse
@MessageMapping("DeadLetterQueue")
@SqsListener("DeadLetterQueue")
public void receiveDeadLetters(String message) {
this.countDownLatch.countDown();
}
Expand All @@ -232,4 +247,20 @@ public void handle() {

}

public static class ManualDeletionPolicyTestListener {

private final CountDownLatch countDownLatch = new CountDownLatch(1);

@SqsListener(value = "ManualDeletionQueue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void receive(String message, Acknowledgment acknowledgment) throws ExecutionException, InterruptedException {
acknowledgment.acknowledge().get();
this.countDownLatch.countDown();
}

public CountDownLatch getCountDownLatch() {
return this.countDownLatch;
}

}

}
Loading

0 comments on commit ae567da

Please sign in to comment.