Skip to content

Commit

Permalink
Allow prefixing dashboard url and consuming of dead letter queue mess…
Browse files Browse the repository at this point in the history
…ages (#31)

* * Allow prefixing the urls #30
* Allow consuming dead letter queue messages.

* Fixes for #29

* removed ignored

* updated copyright

* updated versions.
  • Loading branch information
sonus21 committed Aug 1, 2020
1 parent 1cb7e7c commit a2f8de4
Show file tree
Hide file tree
Showing 249 changed files with 1,272 additions and 659 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@
[1.4.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/1.4.0-RELEASE
[2.0.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.0-RELEASE
[2.0.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.1-RELEASE
[2.0.2]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.2-RELEASE
[2.0.3]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.3-RELEASE
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
[![Coverage Status](https://coveralls.io/repos/github/sonus21/rqueue/badge.svg?branch=master)](https://coveralls.io/github/sonus21/rqueue?branch=master)
[![Maven Central](https://img.shields.io/maven-central/v/com.github.sonus21/rqueue)](https://repo1.maven.org/maven2/com/github/sonus21/rqueue)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[![Javadoc](https://javadoc.io/badge2/com.github.sonus21/rqueue-core/javadoc.svg)](https://javadoc.io/doc/com.github.sonus21/rqueue-core)

**Rqueue** is an asynchronous task executor(worker) built for spring framework based on the spring framework's messaging library backed by Redis. It can be used as message broker as well, where all services code is in Spring.

Expand Down Expand Up @@ -36,14 +37,14 @@
* Add dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.1-RELEASE'
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.3-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring-boot-starter</artifactId>
<version>2.0.1-RELEASE</version>
<version>2.0.3-RELEASE</version>
</dependency>
```

Expand All @@ -52,14 +53,14 @@
* Add Dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring:2.0.1-RELEASE'
implementation 'com.github.sonus21:rqueue-spring:2.0.3-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring</artifactId>
<version>2.0.1-RELEASE</version>
<version>2.0.3-RELEASE</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ext {

subprojects {
group = 'com.github.sonus21'
version = '2.0.2-RELEASE'
version = '2.0.3-RELEASE'

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
2 changes: 1 addition & 1 deletion rqueue-common-test/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dependencies {
compile project(":rqueue")
compile project(":rqueue-core")
compile project(":rqueue-test-util")

// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.dto.Reservation;
import com.github.sonus21.rqueue.test.dto.ReservationRequest;
import com.github.sonus21.rqueue.test.dto.Sms;
import com.github.sonus21.rqueue.test.service.ConsumedMessageService;
import com.github.sonus21.rqueue.test.service.FailureManager;
Expand All @@ -51,7 +52,7 @@ public void onMessage(Job job) throws Exception {
if (failureManager.shouldFail(job.getId())) {
throw new Exception("Failing job task to be retried" + job);
}
consumedMessageService.save(job);
consumedMessageService.save(job, null);
}

@RqueueListener(
Expand All @@ -65,7 +66,7 @@ public void onMessage(
if (failureManager.shouldFail(notification.getId())) {
throw new Exception("Failing notification task to be retried" + notification);
}
consumedMessageService.save(notification);
consumedMessageService.save(notification, null);
}

@RqueueListener(
Expand All @@ -80,7 +81,7 @@ public void onMessage(Email email, @Header(RqueueMessageHeaders.MESSAGE) RqueueM
if (failureManager.shouldFail(email.getId())) {
throw new Exception("Failing email task to be retried" + email);
}
consumedMessageService.save(email);
consumedMessageService.save(email, null);
}

@RqueueListener(
Expand All @@ -94,7 +95,7 @@ public void onMessage(Sms sms) throws Exception {
if (failureManager.shouldFail(sms.getId())) {
throw new Exception("Failing sms task to be retried" + sms);
}
consumedMessageService.save(sms);
consumedMessageService.save(sms, null);
}

@RqueueListener(
Expand All @@ -108,7 +109,7 @@ public void onMessage(ChatIndexing chatIndexing) throws Exception {
if (failureManager.shouldFail(chatIndexing.getId())) {
throw new Exception("Failing chat indexing task to be retried" + chatIndexing);
}
consumedMessageService.save(chatIndexing);
consumedMessageService.save(chatIndexing, null);
}

@RqueueListener(
Expand All @@ -122,7 +123,7 @@ public void onMessage(FeedGeneration feedGeneration) throws Exception {
if (failureManager.shouldFail(feedGeneration.getId())) {
throw new Exception("Failing feedGeneration task to be retried" + feedGeneration);
}
consumedMessageService.save(feedGeneration);
consumedMessageService.save(feedGeneration, null);
}

@RqueueListener(
Expand All @@ -136,6 +137,30 @@ public void onMessage(Reservation reservation) throws Exception {
if (failureManager.shouldFail(reservation.getId())) {
throw new Exception("Failing reservation task to be retried" + reservation);
}
consumedMessageService.save(reservation);
consumedMessageService.save(reservation, null);
}

@RqueueListener(
value = "${reservation.request.queue.name}",
deadLetterQueue = "${reservation.request.dead.letter.queue.name}",
deadLetterQueueListenerEnabled = "${reservation.request.dead.letter.consumer.enabled}",
active = "${reservation.request.active}",
numRetries = "${reservation.request.queue.retry.count}")
public void onMessageReservationRequest(ReservationRequest request) throws Exception {
log.info("ReservationRequest {}", request);
if (failureManager.shouldFail(request.getId())) {
throw new Exception("Failing reservation request task to be retried" + request);
}
consumedMessageService.save(request, null);
}

@RqueueListener(
value = "${reservation.request.dead.letter.queue.name}",
active = "${reservation.request.dead.letter.consumer.enabled}",
numRetries = "${reservation.request.dead.letter.queue.retry.count}")
public void onMessageReservationRequestDeadLetterQueue(ReservationRequest request)
throws Exception {
log.info("ReservationRequest Dead Letter Queue{}", request);
consumedMessageService.save(request, "reservation-request-dlq");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ public abstract class SpringTestBase extends TestBase {
@Value("${reservation.queue}")
protected String reservationQueue;

@Value("${reservation.request.dead.letter.queue.name}")
protected String reservationRequestDeadLetterQueue;

@Value("${reservation.request.queue.name}")
protected String reservationRequestQueue;

@Value("${reservation.request.queue.retry.count}")
protected int reservationRequestQueueRetryCount;

protected void enqueue(Object message, String queueName) {
RqueueMessage rqueueMessage = RqueueMessageFactory.buildMessage(message, queueName, null, null);
rqueueMessageTemplate.addMessage(queueName, rqueueMessage);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.sonus21.rqueue.test.dto;

import java.util.UUID;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@NoArgsConstructor
@Getter
@Setter
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class ReservationRequest extends BaseQueueMessage {
public static ReservationRequest newInstance() {
ReservationRequest reservationRequest = new ReservationRequest();
reservationRequest.setId(UUID.randomUUID().toString());
return reservationRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public class ConsumedMessage {
// Around 1 MB of data
@Column(length = 1000000)
private String message;

@Column private String tag;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public class ConsumedMessageService {
@NonNull private ConsumedMessageRepository consumedMessageRepository;
@NonNull private ObjectMapper objectMapper;

public <T extends BaseQueueMessage> ConsumedMessage save(BaseQueueMessage message)
public <T extends BaseQueueMessage> ConsumedMessage save(BaseQueueMessage message, String tag)
throws JsonProcessingException {
String textMessage = objectMapper.writeValueAsString(message);
ConsumedMessage consumedMessage = new ConsumedMessage(message.getId(), textMessage);
ConsumedMessage consumedMessage = new ConsumedMessage(message.getId(), textMessage, tag);
consumedMessageRepository.save(consumedMessage);
return consumedMessage;
}
Expand All @@ -62,4 +62,8 @@ public <T> Map<String, T> getMessages(Collection<String> ids, Class<T> tClass) {
});
return idToMessage;
}

public ConsumedMessage getConsumedMessage(String id) {
return consumedMessageRepository.findById(id).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.github.sonus21.rqueue.test.dto.Email;
import com.github.sonus21.rqueue.test.dto.Job;
import com.github.sonus21.rqueue.test.dto.Notification;
import com.github.sonus21.rqueue.test.dto.ReservationRequest;
import com.github.sonus21.rqueue.test.entity.ConsumedMessage;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -48,7 +50,7 @@ protected void verifyAfterNRetryTaskIsDeletedFromProcessingQueue() throws TimedO
"message should be deleted from internal storage");
}

protected void verifyMessageMovedToDelayedQueue() throws TimedOutException {
protected void verifyMessageMovedToDeadLetterQueue() throws TimedOutException {
cleanQueue(emailQueue);
Email email = Email.newInstance();
failureManager.createFailureDetail(email.getId(), -1, 0);
Expand All @@ -60,7 +62,7 @@ protected void verifyMessageMovedToDelayedQueue() throws TimedOutException {
"all retry to be exhausted");
waitFor(
() -> stringRqueueRedisTemplate.getListSize(emailDeadLetterQueue) > 0,
"message should be moved to delayed queue");
"message should be moved to dead letter queue");
assertEquals(emailRetryCount, failureManager.getFailureCount(email.getId()));
failureManager.delete(email.getId());
}
Expand Down Expand Up @@ -126,4 +128,31 @@ public void verifyMessageIsInProcessingQueue() throws TimedOutException {
// more then one copy should not be present
assertEquals(1, messageSender.getAllMessages(jobQueue).size());
}

public void verifyMessageIsConsumedByDeadLetterQueueListener() throws TimedOutException {
cleanQueue(reservationRequestQueue);
cleanQueue(reservationRequestDeadLetterQueue);
ReservationRequest request = ReservationRequest.newInstance();
failureManager.createFailureDetail(
request.getId(), reservationRequestQueueRetryCount, reservationRequestQueueRetryCount);
messageSender.enqueue(reservationRequestQueue, request);
waitFor(
() -> failureManager.getFailureCount(request.getId()) >= reservationRequestQueueRetryCount,
60000,
"ReservationRequest to be retried");
waitFor(
() -> {
ReservationRequest requestInDb =
consumedMessageService.getMessage(request.getId(), ReservationRequest.class);
return request.equals(requestInDb);
},
30000,
"ReservationRequest to be run");
ConsumedMessage consumedMessage = consumedMessageService.getConsumedMessage(request.getId());
assertEquals(consumedMessage.getTag(), "reservation-request-dlq");
assertEquals(
new Long(0), stringRqueueRedisTemplate.getListSize(reservationRequestDeadLetterQueue));
assertEquals(0, getMessageCount(reservationQueue));
assertEquals(0, getMessageCount(reservationRequestDeadLetterQueue));
}
}
6 changes: 6 additions & 0 deletions rqueue-common-test/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ reservation.queue.active=false
reservation.queue.priority=10
reservation.queue.group=test-group
reservation.queue.concurrency=-1
reservation.request.queue.name=reservation-request
reservation.request.dead.letter.queue.name=reservation-request-dlq
reservation.request.queue.retry.count=2
reservation.request.dead.letter.consumer.enabled=false
reservation.request.active=false
reservation.request.dead.letter.queue.retry.count=1

File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,16 @@

/**
* Name of the queue, where message has to be sent in case of consecutive failures configured as
* per {@link #numRetries()}
* per {@link #numRetries()}, by default sent message over this cannot be consumed, if you want
* this queue to be used in the listener then set {@link #deadLetterQueueListenerEnabled()} ()}.
*
* @return dead letter queue name
*/
String deadLetterQueue() default "";

/** @return whether any consumer is enable on this queue or not. */
String deadLetterQueueListenerEnabled() default "false";

/**
* Control visibility timeout for this/these queue(s). When a message is consumed from a queue
* then it's hidden for other consumers for this period. This can be used to fast-recovery when a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class RqueueConfig {
private final boolean sharedConnection;
private final int dbVersion;

@Value("${rqueue.version:2.0.1}")
@Value("${rqueue.version:2.0.3}")
private String version;

@Value("${rqueue.key.prefix:__rq::}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.utils.RedisUtils;
import com.github.sonus21.rqueue.web.view.DateTimeFunction;
import com.github.sonus21.rqueue.web.view.DeadLetterQueuesFunction;
import org.jtwig.environment.EnvironmentConfiguration;
import org.jtwig.environment.EnvironmentConfigurationBuilder;
import org.jtwig.spring.JtwigViewResolver;
Expand Down Expand Up @@ -169,6 +170,7 @@ public JtwigViewResolver rqueueViewResolver() {
EnvironmentConfigurationBuilder.configuration()
.functions()
.add(new DateTimeFunction())
.add(new DeadLetterQueuesFunction())
.and()
.build();
JtwigRenderer renderer = new JtwigRenderer(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class RqueueWebConfig {
* Control whether web app is enabled or not. If it's marked false then it will throw HTTP 503
* (Service unavailable) error.
*/
@Value("${rqueue.web.url.prefix:/}")
private String urlPrefix;

@Value("${rqueue.web.enable:true}")
private boolean enable;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.github.sonus21.rqueue.utils.ThreadUtils.QueueThread;
import com.github.sonus21.rqueue.utils.TimeoutUtils;
import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff;
import java.util.Collections;
import org.slf4j.event.Level;

Expand All @@ -30,9 +29,9 @@ class DefaultRqueuePoller extends RqueueMessagePoller {
QueueThread queueThread,
QueueDetail queueDetail,
RqueueMessageListenerContainer container,
TaskExecutionBackOff taskBackOff,
PostProcessingHandler postProcessingHandler,
int retryPerPoll) {
super(queueDetail.getName(), container, taskBackOff, retryPerPoll);
super(queueDetail.getName(), container, postProcessingHandler, retryPerPoll);
this.queueDetail = queueDetail;
this.queueThread = queueThread;
this.queues = Collections.singletonList(queueDetail.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class MappingInformation implements Comparable<MappingInformation> {
@EqualsAndHashCode.Include private Set<String> queueNames;
private int numRetry;
private String deadLetterQueueName;
private boolean deadLetterConsumerEnabled;
private long visibilityTimeout;
private boolean active;
private Concurrency concurrency;
Expand Down
Loading

0 comments on commit a2f8de4

Please sign in to comment.