Skip to content

Commit

Permalink
Bug fixes (#32)
Browse files Browse the repository at this point in the history
* * Allow prefixing the urls #30
* Allow consuming dead letter queue messages.

* Fixes for #29

* removed ignored

* updated copyright

* updated versions.

* * Fixes for String can not be cast to QueueConfig
* Fixes for messageMetaData being null in TaskAggregator
* Use x-forwarded-prefix to allow the Rqueue to be accessible via  Api Gateway and direct access.

* comment out version.

* Updated code to reflect 2.0.4
  • Loading branch information
sonus21 committed Aug 2, 2020
1 parent 60a2b17 commit ed72d94
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 46 deletions.
18 changes: 17 additions & 1 deletion CHANGELOG.md
Expand Up @@ -3,6 +3,22 @@
**NOTE**: The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


###[2.0.4] - 2-Aug-2020

### Added
- Allow a listener to be added on dead letter queue

#### Fixes:
- Rqueue views/apis not accessible via api gateway


###[2.0.2] - 13-July-2020

### Fixes
- JDK dynamic proxy
- AoP profiler issue


## [2.0.1] - 17-May-2020

### Added
Expand Down Expand Up @@ -76,4 +92,4 @@
[2.0.0]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.0-RELEASE
[2.0.1]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.1-RELEASE
[2.0.2]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.2-RELEASE
[2.0.3]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.3-RELEASE
[2.0.4]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue/2.0.4-RELEASE
8 changes: 4 additions & 4 deletions README.md
Expand Up @@ -37,14 +37,14 @@
* Add dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.3-RELEASE'
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.0.4-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring-boot-starter</artifactId>
<version>2.0.3-RELEASE</version>
<version>2.0.4-RELEASE</version>
</dependency>
```

Expand All @@ -53,14 +53,14 @@
* Add Dependency
* Gradle
```groovy
implementation 'com.github.sonus21:rqueue-spring:2.0.3-RELEASE'
implementation 'com.github.sonus21:rqueue-spring:2.0.4-RELEASE'
```
* Maven
```xml
<dependency>
<groupId>com.github.sonus21</groupId>
<artifactId>rqueue-spring</artifactId>
<version>2.0.3-RELEASE</version>
<version>2.0.4-RELEASE</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -67,7 +67,7 @@ ext {

subprojects {
group = 'com.github.sonus21'
version = '2.0.3-RELEASE'
version = '2.0.4-RELEASE'

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
Expand Up @@ -32,7 +32,7 @@ public class RqueueConfig {
private final boolean sharedConnection;
private final int dbVersion;

@Value("${rqueue.version:2.0.3}")
@Value("${rqueue.version:2.0.4}")
private String version;

@Value("${rqueue.key.prefix:__rq::}")
Expand Down
Expand Up @@ -134,36 +134,39 @@ private void publishEvent(
TaskStatus status,
long jobExecutionStartTime) {
if (rqueueWebConfig.isCollectListenerStats()) {
addOrDeleteMetadata(messageMetadata, rqueueMessage, jobExecutionStartTime, false);
MessageMetadata newMessageMetaData =
addOrDeleteMetadata(rqueueMessage, messageMetadata, jobExecutionStartTime, false);
RqueueExecutionEvent event =
new RqueueExecutionEvent(queueDetail, rqueueMessage, status, messageMetadata);
new RqueueExecutionEvent(queueDetail, rqueueMessage, status, newMessageMetaData);
applicationEventPublisher.publishEvent(event);
}
}

private void addOrDeleteMetadata(
MessageMetadata messageMetadata,
private MessageMetadata addOrDeleteMetadata(
RqueueMessage rqueueMessage,
MessageMetadata messageMetadata,
long jobExecutionStartTime,
boolean saveOrDelete) {
MessageMetadata newMessageMetaData = messageMetadata;
String messageMetadataId = MessageUtils.getMessageMetaId(rqueueMessage.getId());
if (messageMetadata == null) {
messageMetadata = rqueueMessageMetadataService.get(messageMetadataId);
if (newMessageMetaData == null) {
newMessageMetaData = rqueueMessageMetadataService.get(messageMetadataId);
}
if (messageMetadata == null) {
messageMetadata = new MessageMetadata(messageMetadataId, rqueueMessage.getId());
if (newMessageMetaData == null) {
newMessageMetaData = new MessageMetadata(messageMetadataId, rqueueMessage.getId());
// do not call db delete method
if (!saveOrDelete) {
messageMetadata.addExecutionTime(jobExecutionStartTime);
return;
newMessageMetaData.addExecutionTime(jobExecutionStartTime);
return newMessageMetaData;
}
}
messageMetadata.addExecutionTime(jobExecutionStartTime);
newMessageMetaData.addExecutionTime(jobExecutionStartTime);
if (saveOrDelete) {
rqueueMessageMetadataService.save(messageMetadata, Duration.ofSeconds(SECONDS_IN_A_WEEK));
rqueueMessageMetadataService.save(newMessageMetaData, Duration.ofSeconds(SECONDS_IN_A_WEEK));
} else {
rqueueMessageMetadataService.delete(messageMetadataId);
}
return newMessageMetaData;
}

private void deleteMessage(
Expand Down Expand Up @@ -273,7 +276,7 @@ private void parkMessageForRetry(
rqueueMessage,
newMessage,
delay);
addOrDeleteMetadata(messageMetadata, rqueueMessage, jobExecutionStartTime, true);
addOrDeleteMetadata(rqueueMessage, messageMetadata, jobExecutionStartTime, true);
}

private void discardMessage(
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.github.sonus21.rqueue.models.db;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.sonus21.rqueue.models.MinMax;
import com.github.sonus21.rqueue.models.SerializableBase;
import java.util.HashMap;
Expand Down Expand Up @@ -50,7 +51,10 @@ public class QueueConfig extends SerializableBase {
private int numRetry;
private long visibilityTimeout;
private MinMax<Integer> concurrency;

@JsonProperty("deadLetterQueuesV2")
private List<DeadLetterQueue> deadLetterQueues;

private boolean systemGenerated;
private String priorityGroup;
private Map<String, Integer> priority;
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.github.sonus21.rqueue.models.enums.DataType;
import com.github.sonus21.rqueue.models.enums.NavTab;
import com.github.sonus21.rqueue.models.response.RedisDataDetail;
import com.github.sonus21.rqueue.utils.StringUtils;
import com.github.sonus21.rqueue.web.service.RqueueQDetailService;
import com.github.sonus21.rqueue.web.service.RqueueSystemManagerService;
import com.github.sonus21.rqueue.web.service.RqueueUtilityService;
Expand All @@ -36,6 +37,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.jtwig.spring.JtwigViewResolver;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -81,24 +83,33 @@ private void addNavData(Model model, NavTab tab) {
}
}

private void addBasicDetails(Model model) {
private void addBasicDetails(Model model, HttpServletRequest request) {
Pair<String, String> releaseAndVersion = rqueueUtilityService.getLatestVersion();
model.addAttribute("releaseLink", releaseAndVersion.getFirst());
model.addAttribute("latestVersion", releaseAndVersion.getSecond());
model.addAttribute(
"time", OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
model.addAttribute("timeInMilli", System.currentTimeMillis());
model.addAttribute("version", rqueueConfig.getVersion());
model.addAttribute("urlPrefix", rqueueWebConfig.getUrlPrefix());
String xForwardedPrefix = request.getHeader("x-forwarded-prefix");
String prefix = "/";
if (!StringUtils.isEmpty(xForwardedPrefix)) {
if (xForwardedPrefix.endsWith("/")) {
xForwardedPrefix = xForwardedPrefix.substring(0, xForwardedPrefix.length() - 1);
}
prefix = xForwardedPrefix + prefix;
}
model.addAttribute("urlPrefix", prefix);
}

@GetMapping
public View index(Model model, HttpServletResponse response) throws Exception {
public View index(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, null);
model.addAttribute("title", "Rqueue Dashboard");
model.addAttribute("aggregatorTypes", Arrays.asList(AggregationType.values()));
Expand All @@ -107,12 +118,13 @@ public View index(Model model, HttpServletResponse response) throws Exception {
}

@GetMapping("queues")
public View queues(Model model, HttpServletResponse response) throws Exception {
public View queues(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.QUEUES);
model.addAttribute("title", "Queues");
List<QueueConfig> queueConfigs = rqueueSystemManagerService.getSortedQueueConfigs();
Expand All @@ -125,7 +137,11 @@ public View queues(Model model, HttpServletResponse response) throws Exception {
}

@GetMapping("queues/{queueName}")
public View queueDetail(@PathVariable String queueName, Model model, HttpServletResponse response)
public View queueDetail(
@PathVariable String queueName,
Model model,
HttpServletRequest request,
HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
Expand All @@ -135,7 +151,7 @@ public View queueDetail(@PathVariable String queueName, Model model, HttpServlet
List<NavTab> queueActions = rqueueQDetailService.getNavTabs(queueConfig);
List<Entry<NavTab, RedisDataDetail>> queueRedisDataDetail =
rqueueQDetailService.getQueueDataStructureDetail(queueConfig);
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.QUEUES);
model.addAttribute("title", "Queue: " + queueName);
model.addAttribute("queueName", queueName);
Expand All @@ -148,12 +164,13 @@ public View queueDetail(@PathVariable String queueName, Model model, HttpServlet
}

@GetMapping("running")
public View running(Model model, HttpServletResponse response) throws Exception {
public View running(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.RUNNING);
model.addAttribute("title", "Running Tasks");
List<List<Object>> l = rqueueQDetailService.getRunningTasks();
Expand All @@ -163,12 +180,13 @@ public View running(Model model, HttpServletResponse response) throws Exception
}

@GetMapping("scheduled")
public View scheduled(Model model, HttpServletResponse response) throws Exception {
public View scheduled(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.SCHEDULED);
model.addAttribute("title", "Scheduled Tasks");
List<List<Object>> l = rqueueQDetailService.getScheduledTasks();
Expand All @@ -178,12 +196,13 @@ public View scheduled(Model model, HttpServletResponse response) throws Exceptio
}

@GetMapping("dead")
public View dead(Model model, HttpServletResponse response) throws Exception {
public View dead(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.DEAD);
model.addAttribute("title", "Tasks moved to dead letter queue");
List<List<Object>> l = rqueueQDetailService.getDeadLetterTasks();
Expand All @@ -194,12 +213,13 @@ public View dead(Model model, HttpServletResponse response) throws Exception {
}

@GetMapping("pending")
public View pending(Model model, HttpServletResponse response) throws Exception {
public View pending(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.PENDING);
model.addAttribute("title", "Tasks waiting for execution");
List<List<Object>> l = rqueueQDetailService.getWaitingTasks();
Expand All @@ -209,12 +229,13 @@ public View pending(Model model, HttpServletResponse response) throws Exception
}

@GetMapping("utility")
public View utility(Model model, HttpServletResponse response) throws Exception {
public View utility(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.UTILITY);
model.addAttribute("title", "Utility");
model.addAttribute("supportedDataType", DataType.getEnabledDataTypes());
Expand Down
Expand Up @@ -259,13 +259,21 @@ private void processEvents(QueueEvents events) {
QueueDetail queueDetail = (QueueDetail) queueRqueueExecutionEvent.getSource();
String queueStatKey = rqueueConfig.getQueueStatisticsKey(queueDetail.getName());
String lockKey = rqueueConfig.getLockKey(queueStatKey);
if (rqueueLockManager.acquireLock(
lockKey, Duration.ofSeconds(Constants.AGGREGATION_LOCK_DURATION_IN_SECONDS))) {
aggregate(events);
rqueueLockManager.releaseLock(lockKey);
} else {
log.warn("Unable to acquire lock, will retry later");
queue.add(events);
boolean locked = false;
try {
if (rqueueLockManager.acquireLock(
lockKey, Duration.ofSeconds(Constants.AGGREGATION_LOCK_DURATION_IN_SECONDS))) {
locked = true;
aggregate(events);
} else {
log.warn("Unable to acquire lock, will retry later");
TimeoutUtils.sleep(Constants.ONE_MILLI);
queue.add(events);
}
} finally {
if (locked) {
rqueueLockManager.releaseLock(lockKey);
}
}
}
}
Expand All @@ -292,7 +300,7 @@ public void run() {
queue.add(events);
}
log.error("Error in aggregator job ", e);
TimeoutUtils.sleepLog(Constants.MIN_DELAY, false);
TimeoutUtils.sleepLog(Constants.ONE_MILLI, false);
}
}
}
Expand Down

0 comments on commit ed72d94

Please sign in to comment.