Skip to content

Commit

Permalink
1. Allow extending job execution time
Browse files Browse the repository at this point in the history
2. Add support to add Message processor for discard and dead letter queue
3. Upgraded gradle version
4. Moved message poller and message executor to different files.
  • Loading branch information
sonus21 committed Apr 2, 2020
1 parent f44b4f8 commit 93d1f9f
Show file tree
Hide file tree
Showing 38 changed files with 883 additions and 411 deletions.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
#Sat Oct 12 02:36:57 IST 2019
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,10 @@
package com.github.sonus21.rqueue.spring.boot.application;

import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.listener.ConsumerQueueDetail;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.processor.NoOpMessageProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
Expand All @@ -44,9 +45,13 @@ public static void main(String[] args) {
public RqueueMessageListenerContainer rqueueMessageListenerContainer(
RqueueMessageHandler rqueueMessageHandler, RedisConnectionFactory redisConnectionFactory) {
return new RqueueMessageListenerContainer(
rqueueMessageHandler, new RqueueMessageTemplate(redisConnectionFactory)) {
rqueueMessageHandler,
new RqueueMessageTemplate(redisConnectionFactory, 900000),
new NoOpMessageProcessor(),
new NoOpMessageProcessor(),
900000) {
@Override
protected void startQueue(String queueName, ConsumerQueueDetail queueDetail) {}
protected void startQueue(String queueName, QueueDetail queueDetail) {}
};
}
}
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.processor.NoOpMessageProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand Down Expand Up @@ -48,7 +49,11 @@ public RqueueMessageListenerContainer rqueueMessageListenerContainer(
RqueueMessageHandler rqueueMessageHandler, RedisConnectionFactory redisConnectionFactory) {
RqueueMessageListenerContainer rqueueMessageListenerContainer =
new RqueueMessageListenerContainer(
rqueueMessageHandler, new RqueueMessageTemplate(redisConnectionFactory));
rqueueMessageHandler,
new RqueueMessageTemplate(redisConnectionFactory, 900000),
new NoOpMessageProcessor(),
new NoOpMessageProcessor(),
900000);
rqueueMessageListenerContainer.setMaxNumWorkers(maxWorkers);
return rqueueMessageListenerContainer;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,7 @@
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.producer.RqueueMessageSender;
import com.github.sonus21.rqueue.spring.boot.application.ApplicationListenerDisabled;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtility;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
Expand Down Expand Up @@ -57,6 +57,7 @@ public class MessageChannelTest {
@Autowired private RqueueMessageSender messageSender;
@Autowired private RedisConnectionFactory redisConnectionFactory;
private RedisTemplate<String, RqueueMessage> redisTemplate;

@Value("${email.queue.name}")
private String emailQueue;

Expand All @@ -75,15 +76,15 @@ public void publishMessageIsTriggeredOnMessageAddition() throws TimedOutExceptio
redisTemplate
.opsForZSet()
.add(
QueueInfo.getTimeQueueName(emailQueue),
QueueUtility.getTimeQueueName(emailQueue),
buildMessage(email, emailQueue, null, null),
currentTime - 1000L);
}
email = Email.newInstance();
log.info("adding new message {}", email);
messageSender.put(emailQueue, email, 1000L);
waitFor(
() -> redisTemplate.opsForZSet().size(QueueInfo.getTimeQueueName(emailQueue)) <= 1,
() -> redisTemplate.opsForZSet().size(QueueUtility.getTimeQueueName(emailQueue)) <= 1,
"one or less messages in zset");
assertTrue(
"Messages are correctly moved",
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.producer.RqueueMessageSender;
import com.github.sonus21.rqueue.spring.boot.application.ApplicationWithCustomConfiguration;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtility;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
Expand Down Expand Up @@ -70,6 +70,7 @@ public class ProcessingMessageSchedulerTest {

@Value("${job.queue.name}")
private String jobQueueName;

@Rule
public RunTestUntilFail retry =
new RunTestUntilFail(
Expand All @@ -84,6 +85,7 @@ public class ProcessingMessageSchedulerTest {
}
}
});

private int messageCount = 110;

@PostConstruct
Expand All @@ -94,7 +96,7 @@ public void init() {
@Test
public void publishMessageIsTriggeredOnMessageRemoval()
throws InterruptedException, TimedOutException {
String processingQueueName = QueueInfo.getProcessingQueueName(jobQueueName);
String processingQueueName = QueueUtility.getProcessingQueueName(jobQueueName);
long currentTime = System.currentTimeMillis();
List<Job> jobs = new ArrayList<>();
List<String> ids = new ArrayList<>();
Expand Down
Expand Up @@ -19,7 +19,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.github.sonus21.rqueue.listener.ConsumerQueueDetail;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.spring.app.AppWithMetricEnabled;
import java.util.Map;
Expand Down Expand Up @@ -54,7 +54,7 @@ public class SpringAppTest {

@Test
public void numListeners() {
Map<String, ConsumerQueueDetail> registeredQueue = container.getRegisteredQueues();
Map<String, QueueDetail> registeredQueue = container.getRegisteredQueues();
assertEquals(3, registeredQueue.size());
assertTrue(registeredQueue.containsKey(notificationQueueName));
assertTrue(registeredQueue.containsKey(emailQueue));
Expand Down
12 changes: 6 additions & 6 deletions rqueue-test-common/src/main/java/rqueue/test/Utility.java
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Sonu Kumar
* Copyright 2020 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,7 @@

import com.github.sonus21.rqueue.converter.GenericMessageConverter;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtility;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -51,22 +51,22 @@ public static Map<String, List<RqueueMessage>> getMessageMap(
queueNameToMessage.put(queueName, messages);

Set<RqueueMessage> messagesFromZset =
redisTemplate.opsForZSet().range(QueueInfo.getTimeQueueName(queueName), 0, -1);
redisTemplate.opsForZSet().range(QueueUtility.getTimeQueueName(queueName), 0, -1);
if (!CollectionUtils.isEmpty(messagesFromZset)) {
messages = new ArrayList<>(messagesFromZset);
} else {
messages = new ArrayList<>();
}
queueNameToMessage.put(QueueInfo.getTimeQueueName(queueName), messages);
queueNameToMessage.put(QueueUtility.getTimeQueueName(queueName), messages);

Set<RqueueMessage> messagesInProcessingQueue =
redisTemplate.opsForZSet().range(QueueInfo.getProcessingQueueName(queueName), 0, -1);
redisTemplate.opsForZSet().range(QueueUtility.getProcessingQueueName(queueName), 0, -1);
if (!CollectionUtils.isEmpty(messagesInProcessingQueue)) {
messages = new ArrayList<>(messagesInProcessingQueue);
} else {
messages = new ArrayList<>();
}
queueNameToMessage.put(QueueInfo.getProcessingQueueName(queueName), messages);
queueNameToMessage.put(QueueUtility.getProcessingQueueName(queueName), messages);
return queueNameToMessage;
}

Expand Down
Expand Up @@ -26,7 +26,7 @@
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.exception.TimedOutException;
import com.github.sonus21.rqueue.producer.RqueueMessageSender;
import com.github.sonus21.rqueue.utils.QueueInfo;
import com.github.sonus21.rqueue.utils.QueueUtility;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Random;
import javax.annotation.PostConstruct;
Expand All @@ -48,24 +48,20 @@ public class MetricTestBase {
@Autowired protected RedisConnectionFactory redisConnectionFactory;
@Autowired protected MeterRegistry meterRegistry;
protected RedisTemplate<String, RqueueMessage> redisTemplate;

@PostConstruct
public void init() {
redisTemplate = getRedisTemplate(redisConnectionFactory);
}

@Value("${email.dead.letter.queue.name}")
private String emailDlq;

@Value("${email.queue.name}")
private String emailQueueName;

@Value("${job.queue.name}")
private String jobQueue;

@Value("${notification.queue.name}")
private String notificationQueue;

@PostConstruct
public void init() {
redisTemplate = getRedisTemplate(redisConnectionFactory);
}

public void delayedQueueStatus(RedisTemplate<String, RqueueMessage> redisTemplate)
throws TimedOutException {
Random random = new Random();
Expand All @@ -81,7 +77,7 @@ public void delayedQueueStatus(RedisTemplate<String, RqueueMessage> redisTemplat
redisTemplate
.opsForZSet()
.add(
QueueInfo.getTimeQueueName(notificationQueue),
QueueUtility.getTimeQueueName(notificationQueue),
buildMessage(notification, notificationQueue, null, null),
System.currentTimeMillis() - delay);
} else {
Expand Down
Expand Up @@ -43,17 +43,27 @@ public abstract class RqueueConfig {
new SimpleRqueueListenerContainerFactory();

@Autowired protected BeanFactory beanFactory;

/**
* This is used to control message scheduler auto start feature, if it's disabled then messages
* are moved only when a message is received from Redis PUB/SUB channel.
*/
@Value("${rqueue.scheduler.auto.start:true}")
private boolean schedulerAutoStart;

/**
* This is more for testing features where scheduler is not started automatically, based on the
* messages from Redis PUB/SUB channel tasks are executed.
* This is used to control message scheduler redis pub/sub interaction, this can be used to
* completely disable the redis PUB/SUB interaction
*/
@Value("${auto.start.scheduler:true}")
private boolean autoStartScheduler;
@Value("${rqueue.scheduler.redis.enabled:true}")
private boolean schedulerRedisEnabled;

@Value("${delayed.queue.thread.pool.size:5}")
// Number of threads used to process delayed queue messages by scheduler
@Value("${rqueue.scheduler.delayed.queue.thread.pool.size:5}")
private int delayedQueueSchedulerPoolSize;

@Value("${processing.queue.thread.pool.size:1}")
// Number of threads used to process processing queue messages by scheduler
@Value("${rqueue.processing.delayed.queue.thread.pool.size:1}")
private int processingQueueSchedulerPoolSize;

/**
Expand Down Expand Up @@ -84,7 +94,8 @@ protected RqueueMessageTemplate getMessageTemplate(RedisConnectionFactory connec
return simpleRqueueListenerContainerFactory.getRqueueMessageTemplate();
}
simpleRqueueListenerContainerFactory.setRqueueMessageTemplate(
new RqueueMessageTemplate(connectionFactory));
new RqueueMessageTemplate(
connectionFactory, simpleRqueueListenerContainerFactory.getMaxJobExecutionTime()));
return simpleRqueueListenerContainerFactory.getRqueueMessageTemplate();
}

Expand All @@ -99,7 +110,9 @@ public DelayedMessageScheduler delayedMessageScheduler() {
return new DelayedMessageScheduler(
getRedisTemplate(getRedisConnectionFactory()),
delayedQueueSchedulerPoolSize,
autoStartScheduler);
schedulerAutoStart,
schedulerRedisEnabled,
simpleRqueueListenerContainerFactory.getMaxJobExecutionTime());
}

/**
Expand All @@ -113,6 +126,8 @@ public ProcessingMessageScheduler processingMessageScheduler() {
return new ProcessingMessageScheduler(
getRedisTemplate(getRedisConnectionFactory()),
processingQueueSchedulerPoolSize,
autoStartScheduler);
schedulerAutoStart,
schedulerRedisEnabled,
simpleRqueueListenerContainerFactory.getMaxJobExecutionTime());
}
}

0 comments on commit 93d1f9f

Please sign in to comment.