Skip to content

Commit

Permalink
Merge f4af2cf into 60a2b17
Browse files Browse the repository at this point in the history
  • Loading branch information
sonus21 committed Aug 1, 2020
2 parents 60a2b17 + f4af2cf commit cd939e0
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 39 deletions.
Expand Up @@ -134,36 +134,39 @@ private void publishEvent(
TaskStatus status,
long jobExecutionStartTime) {
if (rqueueWebConfig.isCollectListenerStats()) {
addOrDeleteMetadata(messageMetadata, rqueueMessage, jobExecutionStartTime, false);
MessageMetadata newMessageMetaData =
addOrDeleteMetadata(rqueueMessage, messageMetadata, jobExecutionStartTime, false);
RqueueExecutionEvent event =
new RqueueExecutionEvent(queueDetail, rqueueMessage, status, messageMetadata);
new RqueueExecutionEvent(queueDetail, rqueueMessage, status, newMessageMetaData);
applicationEventPublisher.publishEvent(event);
}
}

private void addOrDeleteMetadata(
MessageMetadata messageMetadata,
private MessageMetadata addOrDeleteMetadata(
RqueueMessage rqueueMessage,
MessageMetadata messageMetadata,
long jobExecutionStartTime,
boolean saveOrDelete) {
MessageMetadata newMessageMetaData = messageMetadata;
String messageMetadataId = MessageUtils.getMessageMetaId(rqueueMessage.getId());
if (messageMetadata == null) {
messageMetadata = rqueueMessageMetadataService.get(messageMetadataId);
if (newMessageMetaData == null) {
newMessageMetaData = rqueueMessageMetadataService.get(messageMetadataId);
}
if (messageMetadata == null) {
messageMetadata = new MessageMetadata(messageMetadataId, rqueueMessage.getId());
if (newMessageMetaData == null) {
newMessageMetaData = new MessageMetadata(messageMetadataId, rqueueMessage.getId());
// do not call db delete method
if (!saveOrDelete) {
messageMetadata.addExecutionTime(jobExecutionStartTime);
return;
newMessageMetaData.addExecutionTime(jobExecutionStartTime);
return newMessageMetaData;
}
}
messageMetadata.addExecutionTime(jobExecutionStartTime);
newMessageMetaData.addExecutionTime(jobExecutionStartTime);
if (saveOrDelete) {
rqueueMessageMetadataService.save(messageMetadata, Duration.ofSeconds(SECONDS_IN_A_WEEK));
rqueueMessageMetadataService.save(newMessageMetaData, Duration.ofSeconds(SECONDS_IN_A_WEEK));
} else {
rqueueMessageMetadataService.delete(messageMetadataId);
}
return newMessageMetaData;
}

private void deleteMessage(
Expand Down Expand Up @@ -273,7 +276,7 @@ private void parkMessageForRetry(
rqueueMessage,
newMessage,
delay);
addOrDeleteMetadata(messageMetadata, rqueueMessage, jobExecutionStartTime, true);
addOrDeleteMetadata(rqueueMessage, messageMetadata, jobExecutionStartTime, true);
}

private void discardMessage(
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.github.sonus21.rqueue.models.db;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.sonus21.rqueue.models.MinMax;
import com.github.sonus21.rqueue.models.SerializableBase;
import java.util.HashMap;
Expand Down Expand Up @@ -50,7 +51,10 @@ public class QueueConfig extends SerializableBase {
private int numRetry;
private long visibilityTimeout;
private MinMax<Integer> concurrency;

@JsonProperty("deadLetterQueuesV2")
private List<DeadLetterQueue> deadLetterQueues;

private boolean systemGenerated;
private String priorityGroup;
private Map<String, Integer> priority;
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.github.sonus21.rqueue.models.enums.DataType;
import com.github.sonus21.rqueue.models.enums.NavTab;
import com.github.sonus21.rqueue.models.response.RedisDataDetail;
import com.github.sonus21.rqueue.utils.StringUtils;
import com.github.sonus21.rqueue.web.service.RqueueQDetailService;
import com.github.sonus21.rqueue.web.service.RqueueSystemManagerService;
import com.github.sonus21.rqueue.web.service.RqueueUtilityService;
Expand All @@ -36,6 +37,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.jtwig.spring.JtwigViewResolver;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -81,24 +83,33 @@ private void addNavData(Model model, NavTab tab) {
}
}

private void addBasicDetails(Model model) {
private void addBasicDetails(Model model, HttpServletRequest request) {
Pair<String, String> releaseAndVersion = rqueueUtilityService.getLatestVersion();
model.addAttribute("releaseLink", releaseAndVersion.getFirst());
model.addAttribute("latestVersion", releaseAndVersion.getSecond());
model.addAttribute(
"time", OffsetDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
model.addAttribute("timeInMilli", System.currentTimeMillis());
model.addAttribute("version", rqueueConfig.getVersion());
model.addAttribute("urlPrefix", rqueueWebConfig.getUrlPrefix());
String xForwardedPrefix = request.getHeader("x-forwarded-prefix");
String prefix = "/";
if (!StringUtils.isEmpty(xForwardedPrefix)) {
if (xForwardedPrefix.endsWith("/")) {
xForwardedPrefix = xForwardedPrefix.substring(0, xForwardedPrefix.length() - 1);
}
prefix = xForwardedPrefix + prefix;
}
model.addAttribute("urlPrefix", prefix);
}

@GetMapping
public View index(Model model, HttpServletResponse response) throws Exception {
public View index(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, null);
model.addAttribute("title", "Rqueue Dashboard");
model.addAttribute("aggregatorTypes", Arrays.asList(AggregationType.values()));
Expand All @@ -107,12 +118,13 @@ public View index(Model model, HttpServletResponse response) throws Exception {
}

@GetMapping("queues")
public View queues(Model model, HttpServletResponse response) throws Exception {
public View queues(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.QUEUES);
model.addAttribute("title", "Queues");
List<QueueConfig> queueConfigs = rqueueSystemManagerService.getSortedQueueConfigs();
Expand All @@ -125,7 +137,11 @@ public View queues(Model model, HttpServletResponse response) throws Exception {
}

@GetMapping("queues/{queueName}")
public View queueDetail(@PathVariable String queueName, Model model, HttpServletResponse response)
public View queueDetail(
@PathVariable String queueName,
Model model,
HttpServletRequest request,
HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
Expand All @@ -135,7 +151,7 @@ public View queueDetail(@PathVariable String queueName, Model model, HttpServlet
List<NavTab> queueActions = rqueueQDetailService.getNavTabs(queueConfig);
List<Entry<NavTab, RedisDataDetail>> queueRedisDataDetail =
rqueueQDetailService.getQueueDataStructureDetail(queueConfig);
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.QUEUES);
model.addAttribute("title", "Queue: " + queueName);
model.addAttribute("queueName", queueName);
Expand All @@ -148,12 +164,13 @@ public View queueDetail(@PathVariable String queueName, Model model, HttpServlet
}

@GetMapping("running")
public View running(Model model, HttpServletResponse response) throws Exception {
public View running(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.RUNNING);
model.addAttribute("title", "Running Tasks");
List<List<Object>> l = rqueueQDetailService.getRunningTasks();
Expand All @@ -163,12 +180,13 @@ public View running(Model model, HttpServletResponse response) throws Exception
}

@GetMapping("scheduled")
public View scheduled(Model model, HttpServletResponse response) throws Exception {
public View scheduled(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.SCHEDULED);
model.addAttribute("title", "Scheduled Tasks");
List<List<Object>> l = rqueueQDetailService.getScheduledTasks();
Expand All @@ -178,12 +196,13 @@ public View scheduled(Model model, HttpServletResponse response) throws Exceptio
}

@GetMapping("dead")
public View dead(Model model, HttpServletResponse response) throws Exception {
public View dead(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.DEAD);
model.addAttribute("title", "Tasks moved to dead letter queue");
List<List<Object>> l = rqueueQDetailService.getDeadLetterTasks();
Expand All @@ -194,12 +213,13 @@ public View dead(Model model, HttpServletResponse response) throws Exception {
}

@GetMapping("pending")
public View pending(Model model, HttpServletResponse response) throws Exception {
public View pending(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.PENDING);
model.addAttribute("title", "Tasks waiting for execution");
List<List<Object>> l = rqueueQDetailService.getWaitingTasks();
Expand All @@ -209,12 +229,13 @@ public View pending(Model model, HttpServletResponse response) throws Exception
}

@GetMapping("utility")
public View utility(Model model, HttpServletResponse response) throws Exception {
public View utility(Model model, HttpServletRequest request, HttpServletResponse response)
throws Exception {
if (!rqueueWebConfig.isEnable()) {
response.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return null;
}
addBasicDetails(model);
addBasicDetails(model, request);
addNavData(model, NavTab.UTILITY);
model.addAttribute("title", "Utility");
model.addAttribute("supportedDataType", DataType.getEnabledDataTypes());
Expand Down
Expand Up @@ -259,13 +259,21 @@ private void processEvents(QueueEvents events) {
QueueDetail queueDetail = (QueueDetail) queueRqueueExecutionEvent.getSource();
String queueStatKey = rqueueConfig.getQueueStatisticsKey(queueDetail.getName());
String lockKey = rqueueConfig.getLockKey(queueStatKey);
if (rqueueLockManager.acquireLock(
lockKey, Duration.ofSeconds(Constants.AGGREGATION_LOCK_DURATION_IN_SECONDS))) {
aggregate(events);
rqueueLockManager.releaseLock(lockKey);
} else {
log.warn("Unable to acquire lock, will retry later");
queue.add(events);
boolean locked = false;
try {
if (rqueueLockManager.acquireLock(
lockKey, Duration.ofSeconds(Constants.AGGREGATION_LOCK_DURATION_IN_SECONDS))) {
locked = true;
aggregate(events);
} else {
log.warn("Unable to acquire lock, will retry later");
TimeoutUtils.sleep(Constants.ONE_MILLI);
queue.add(events);
}
} finally {
if (locked) {
rqueueLockManager.releaseLock(lockKey);
}
}
}
}
Expand All @@ -292,7 +300,7 @@ public void run() {
queue.add(events);
}
log.error("Error in aggregator job ", e);
TimeoutUtils.sleepLog(Constants.MIN_DELAY, false);
TimeoutUtils.sleepLog(Constants.ONE_MILLI, false);
}
}
}
Expand Down

0 comments on commit cd939e0

Please sign in to comment.