From 93d1f9f6b76677e4bde67661b394bf0f4cdc4644 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Fri, 3 Apr 2020 00:45:18 +0530 Subject: [PATCH] 1. Allow extending job execution time 2. Add support to add Message processor for discard and dead letter queue 3. Upgraded gradle version 4. Moved message poller and message executor to different files. --- gradle/wrapper/gradle-wrapper.properties | 2 +- .../ApplicationListenerDisabled.java | 13 +- .../ApplicationWithCustomConfiguration.java | 9 +- .../tests/integration/MessageChannelTest.java | 9 +- .../ProcessingMessageSchedulerTest.java | 8 +- .../sonus21/rqueue/spring/SpringAppTest.java | 4 +- .../src/main/java/rqueue/test/Utility.java | 12 +- .../rqueue/test/tests/MetricTestBase.java | 18 +- .../sonus21/rqueue/config/RqueueConfig.java | 33 ++- .../SimpleRqueueListenerContainerFactory.java | 87 ++++++- .../rqueue/core/DelayedMessageScheduler.java | 26 +- .../sonus21/rqueue/core/MessageScheduler.java | 125 +++++---- .../core/ProcessingMessageScheduler.java | 22 +- .../rqueue/core/RqueueMessageTemplate.java | 29 ++- .../listener/AsynchronousMessageListener.java | 80 ++++++ .../rqueue/listener/MessageContainerBase.java | 68 +++++ .../rqueue/listener/MessageExecutor.java | 159 ++++++++++++ ...sumerQueueDetail.java => QueueDetail.java} | 6 +- .../rqueue/listener/RqueueMessageHandler.java | 5 +- .../RqueueMessageListenerContainer.java | 245 ++++-------------- .../sonus21/rqueue/metrics/RqueueMetrics.java | 14 +- .../metrics/RqueueMetricsProperties.java | 10 +- .../rqueue/processor/MessageProcessor.java | 29 +++ .../processor/NoOpMessageProcessor.java | 22 ++ .../rqueue/producer/MessageWriter.java | 7 +- .../rqueue/producer/RqueueMessageSender.java | 12 +- .../sonus21/rqueue/utils/Constants.java | 32 +++ .../sonus21/rqueue/utils/MessageUtility.java | 36 +++ .../utils/QueueInitializationEvent.java | 9 +- .../{QueueInfo.java => QueueUtility.java} | 14 +- .../rqueue/core/MessageSchedulerTest.java | 35 +-- ...hedulerWithSchedulerDisabledAtStartup.java | 11 +- .../core/ProcessingMessageSchedulerTest.java | 18 +- .../core/RqueueMessageTemplateTest.java | 2 +- .../listener/RqueueMessageHandlerTest.java | 4 +- .../RqueueMessageListenerContainerTest.java | 59 ++++- .../rqueue/metrics/RqueueMetricsTest.java | 16 +- .../producer/RqueueMessageSenderTest.java | 4 +- 38 files changed, 883 insertions(+), 411 deletions(-) create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/listener/AsynchronousMessageListener.java create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageContainerBase.java create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java rename rqueue/src/main/java/com/github/sonus21/rqueue/listener/{ConsumerQueueDetail.java => QueueDetail.java} (93%) create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/processor/MessageProcessor.java create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/processor/NoOpMessageProcessor.java create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/utils/Constants.java create mode 100644 rqueue/src/main/java/com/github/sonus21/rqueue/utils/MessageUtility.java rename rqueue/src/main/java/com/github/sonus21/rqueue/utils/{QueueInfo.java => QueueUtility.java} (81%) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 5ee2cf0f..0a2a08e2 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ #Sat Oct 12 02:36:57 IST 2019 -distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.2-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.3-all.zip distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStorePath=wrapper/dists diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationListenerDisabled.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationListenerDisabled.java index d3cc6c5b..76b588e0 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationListenerDisabled.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationListenerDisabled.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,9 +17,10 @@ package com.github.sonus21.rqueue.spring.boot.application; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; +import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.processor.NoOpMessageProcessor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @@ -44,9 +45,13 @@ public static void main(String[] args) { public RqueueMessageListenerContainer rqueueMessageListenerContainer( RqueueMessageHandler rqueueMessageHandler, RedisConnectionFactory redisConnectionFactory) { return new RqueueMessageListenerContainer( - rqueueMessageHandler, new RqueueMessageTemplate(redisConnectionFactory)) { + rqueueMessageHandler, + new RqueueMessageTemplate(redisConnectionFactory, 900000), + new NoOpMessageProcessor(), + new NoOpMessageProcessor(), + 900000) { @Override - protected void startQueue(String queueName, ConsumerQueueDetail queueDetail) {} + protected void startQueue(String queueName, QueueDetail queueDetail) {} }; } } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationWithCustomConfiguration.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationWithCustomConfiguration.java index 92f81483..49af88bd 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationWithCustomConfiguration.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/application/ApplicationWithCustomConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.processor.NoOpMessageProcessor; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -48,7 +49,11 @@ public RqueueMessageListenerContainer rqueueMessageListenerContainer( RqueueMessageHandler rqueueMessageHandler, RedisConnectionFactory redisConnectionFactory) { RqueueMessageListenerContainer rqueueMessageListenerContainer = new RqueueMessageListenerContainer( - rqueueMessageHandler, new RqueueMessageTemplate(redisConnectionFactory)); + rqueueMessageHandler, + new RqueueMessageTemplate(redisConnectionFactory, 900000), + new NoOpMessageProcessor(), + new NoOpMessageProcessor(), + 900000); rqueueMessageListenerContainer.setMaxNumWorkers(maxWorkers); return rqueueMessageListenerContainer; } diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java index 0ae08971..f6bf90e9 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import com.github.sonus21.rqueue.exception.TimedOutException; import com.github.sonus21.rqueue.producer.RqueueMessageSender; import com.github.sonus21.rqueue.spring.boot.application.ApplicationListenerDisabled; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.utils.QueueUtility; import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.junit.Test; @@ -57,6 +57,7 @@ public class MessageChannelTest { @Autowired private RqueueMessageSender messageSender; @Autowired private RedisConnectionFactory redisConnectionFactory; private RedisTemplate redisTemplate; + @Value("${email.queue.name}") private String emailQueue; @@ -75,7 +76,7 @@ public void publishMessageIsTriggeredOnMessageAddition() throws TimedOutExceptio redisTemplate .opsForZSet() .add( - QueueInfo.getTimeQueueName(emailQueue), + QueueUtility.getTimeQueueName(emailQueue), buildMessage(email, emailQueue, null, null), currentTime - 1000L); } @@ -83,7 +84,7 @@ public void publishMessageIsTriggeredOnMessageAddition() throws TimedOutExceptio log.info("adding new message {}", email); messageSender.put(emailQueue, email, 1000L); waitFor( - () -> redisTemplate.opsForZSet().size(QueueInfo.getTimeQueueName(emailQueue)) <= 1, + () -> redisTemplate.opsForZSet().size(QueueUtility.getTimeQueueName(emailQueue)) <= 1, "one or less messages in zset"); assertTrue( "Messages are correctly moved", diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java index 96760110..b982aac9 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import com.github.sonus21.rqueue.exception.TimedOutException; import com.github.sonus21.rqueue.producer.RqueueMessageSender; import com.github.sonus21.rqueue.spring.boot.application.ApplicationWithCustomConfiguration; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.utils.QueueUtility; import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; @@ -70,6 +70,7 @@ public class ProcessingMessageSchedulerTest { @Value("${job.queue.name}") private String jobQueueName; + @Rule public RunTestUntilFail retry = new RunTestUntilFail( @@ -84,6 +85,7 @@ public class ProcessingMessageSchedulerTest { } } }); + private int messageCount = 110; @PostConstruct @@ -94,7 +96,7 @@ public void init() { @Test public void publishMessageIsTriggeredOnMessageRemoval() throws InterruptedException, TimedOutException { - String processingQueueName = QueueInfo.getProcessingQueueName(jobQueueName); + String processingQueueName = QueueUtility.getProcessingQueueName(jobQueueName); long currentTime = System.currentTimeMillis(); List jobs = new ArrayList<>(); List ids = new ArrayList<>(); diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringAppTest.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringAppTest.java index 2a5f8662..c5de5c62 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringAppTest.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringAppTest.java @@ -19,7 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; +import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.spring.app.AppWithMetricEnabled; import java.util.Map; @@ -54,7 +54,7 @@ public class SpringAppTest { @Test public void numListeners() { - Map registeredQueue = container.getRegisteredQueues(); + Map registeredQueue = container.getRegisteredQueues(); assertEquals(3, registeredQueue.size()); assertTrue(registeredQueue.containsKey(notificationQueueName)); assertTrue(registeredQueue.containsKey(emailQueue)); diff --git a/rqueue-test-common/src/main/java/rqueue/test/Utility.java b/rqueue-test-common/src/main/java/rqueue/test/Utility.java index d0a66f29..758ce5f0 100644 --- a/rqueue-test-common/src/main/java/rqueue/test/Utility.java +++ b/rqueue-test-common/src/main/java/rqueue/test/Utility.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import com.github.sonus21.rqueue.converter.GenericMessageConverter; import com.github.sonus21.rqueue.core.RqueueMessage; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.utils.QueueUtility; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -51,22 +51,22 @@ public static Map> getMessageMap( queueNameToMessage.put(queueName, messages); Set messagesFromZset = - redisTemplate.opsForZSet().range(QueueInfo.getTimeQueueName(queueName), 0, -1); + redisTemplate.opsForZSet().range(QueueUtility.getTimeQueueName(queueName), 0, -1); if (!CollectionUtils.isEmpty(messagesFromZset)) { messages = new ArrayList<>(messagesFromZset); } else { messages = new ArrayList<>(); } - queueNameToMessage.put(QueueInfo.getTimeQueueName(queueName), messages); + queueNameToMessage.put(QueueUtility.getTimeQueueName(queueName), messages); Set messagesInProcessingQueue = - redisTemplate.opsForZSet().range(QueueInfo.getProcessingQueueName(queueName), 0, -1); + redisTemplate.opsForZSet().range(QueueUtility.getProcessingQueueName(queueName), 0, -1); if (!CollectionUtils.isEmpty(messagesInProcessingQueue)) { messages = new ArrayList<>(messagesInProcessingQueue); } else { messages = new ArrayList<>(); } - queueNameToMessage.put(QueueInfo.getProcessingQueueName(queueName), messages); + queueNameToMessage.put(QueueUtility.getProcessingQueueName(queueName), messages); return queueNameToMessage; } diff --git a/rqueue-test-common/src/main/java/rqueue/test/tests/MetricTestBase.java b/rqueue-test-common/src/main/java/rqueue/test/tests/MetricTestBase.java index de1d7a3e..80091c3e 100644 --- a/rqueue-test-common/src/main/java/rqueue/test/tests/MetricTestBase.java +++ b/rqueue-test-common/src/main/java/rqueue/test/tests/MetricTestBase.java @@ -26,7 +26,7 @@ import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.exception.TimedOutException; import com.github.sonus21.rqueue.producer.RqueueMessageSender; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.utils.QueueUtility; import io.micrometer.core.instrument.MeterRegistry; import java.util.Random; import javax.annotation.PostConstruct; @@ -48,24 +48,20 @@ public class MetricTestBase { @Autowired protected RedisConnectionFactory redisConnectionFactory; @Autowired protected MeterRegistry meterRegistry; protected RedisTemplate redisTemplate; - - @PostConstruct - public void init() { - redisTemplate = getRedisTemplate(redisConnectionFactory); - } - @Value("${email.dead.letter.queue.name}") private String emailDlq; - @Value("${email.queue.name}") private String emailQueueName; - @Value("${job.queue.name}") private String jobQueue; - @Value("${notification.queue.name}") private String notificationQueue; + @PostConstruct + public void init() { + redisTemplate = getRedisTemplate(redisConnectionFactory); + } + public void delayedQueueStatus(RedisTemplate redisTemplate) throws TimedOutException { Random random = new Random(); @@ -81,7 +77,7 @@ public void delayedQueueStatus(RedisTemplate redisTemplat redisTemplate .opsForZSet() .add( - QueueInfo.getTimeQueueName(notificationQueue), + QueueUtility.getTimeQueueName(notificationQueue), buildMessage(notification, notificationQueue, null, null), System.currentTimeMillis() - delay); } else { diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java b/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java index 70b15f7d..6213ef71 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java @@ -43,17 +43,27 @@ public abstract class RqueueConfig { new SimpleRqueueListenerContainerFactory(); @Autowired protected BeanFactory beanFactory; + + /** + * This is used to control message scheduler auto start feature, if it's disabled then messages + * are moved only when a message is received from Redis PUB/SUB channel. + */ + @Value("${rqueue.scheduler.auto.start:true}") + private boolean schedulerAutoStart; + /** - * This is more for testing features where scheduler is not started automatically, based on the - * messages from Redis PUB/SUB channel tasks are executed. + * This is used to control message scheduler redis pub/sub interaction, this can be used to + * completely disable the redis PUB/SUB interaction */ - @Value("${auto.start.scheduler:true}") - private boolean autoStartScheduler; + @Value("${rqueue.scheduler.redis.enabled:true}") + private boolean schedulerRedisEnabled; - @Value("${delayed.queue.thread.pool.size:5}") + // Number of threads used to process delayed queue messages by scheduler + @Value("${rqueue.scheduler.delayed.queue.thread.pool.size:5}") private int delayedQueueSchedulerPoolSize; - @Value("${processing.queue.thread.pool.size:1}") + // Number of threads used to process processing queue messages by scheduler + @Value("${rqueue.processing.delayed.queue.thread.pool.size:1}") private int processingQueueSchedulerPoolSize; /** @@ -84,7 +94,8 @@ protected RqueueMessageTemplate getMessageTemplate(RedisConnectionFactory connec return simpleRqueueListenerContainerFactory.getRqueueMessageTemplate(); } simpleRqueueListenerContainerFactory.setRqueueMessageTemplate( - new RqueueMessageTemplate(connectionFactory)); + new RqueueMessageTemplate( + connectionFactory, simpleRqueueListenerContainerFactory.getMaxJobExecutionTime())); return simpleRqueueListenerContainerFactory.getRqueueMessageTemplate(); } @@ -99,7 +110,9 @@ public DelayedMessageScheduler delayedMessageScheduler() { return new DelayedMessageScheduler( getRedisTemplate(getRedisConnectionFactory()), delayedQueueSchedulerPoolSize, - autoStartScheduler); + schedulerAutoStart, + schedulerRedisEnabled, + simpleRqueueListenerContainerFactory.getMaxJobExecutionTime()); } /** @@ -113,6 +126,8 @@ public ProcessingMessageScheduler processingMessageScheduler() { return new ProcessingMessageScheduler( getRedisTemplate(getRedisConnectionFactory()), processingQueueSchedulerPoolSize, - autoStartScheduler); + schedulerAutoStart, + schedulerRedisEnabled, + simpleRqueueListenerContainerFactory.getMaxJobExecutionTime()); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java b/rqueue/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java index b1671b1c..e3b8df87 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,9 @@ import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.processor.MessageProcessor; +import com.github.sonus21.rqueue.processor.NoOpMessageProcessor; +import com.github.sonus21.rqueue.utils.Constants; import java.util.List; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; @@ -32,13 +35,36 @@ * of requirements. */ public class SimpleRqueueListenerContainerFactory { + // Provide task executor, this can be used to provide some additional details like some threads + // name, etc otherwise a default task executor would be created private AsyncTaskExecutor taskExecutor; + // whether container should auto start or not private boolean autoStartup = true; + // Redis connection factory for the listener container private RedisConnectionFactory redisConnectionFactory; + // Custom requeue message handler private RqueueMessageHandler rqueueMessageHandler; + // List of message converters to convert messages to/from private List messageConverters; + // In case of failure how much time, we should wait for next job private Long backOffTime; + // Number of workers requires for execution private Integer maxNumWorkers; + // Control how much time a job takes in execution, this can be used to fast-recovery + // when a job goes to running state then if it's not deleted within N secs then + // it has to be re-processed, that re-process time can be controller using this field. + // For example a job started execution at 10:30AM and executor was shutdown so this task requires + // retry By default it will be retried in 15 minutes, but if you want to reprocess quickly/defer + // further than this can be used to reprocess + private long maxJobExecutionTime = Constants.MAX_JOB_EXECUTION_TIME; + + // This message processor would be called whenever a message is discarded due to retry limit + // exhaustion + private MessageProcessor discardMessageProcessor = new NoOpMessageProcessor(); + // This message processor would be called whenever a message is moved to dead letter queue + private MessageProcessor deadLetterQueueMessageProcessor = new NoOpMessageProcessor(); + + // Any custom message requeue message template. private RqueueMessageTemplate rqueueMessageTemplate; /** @@ -181,13 +207,19 @@ public void setRqueueMessageTemplate(RqueueMessageTemplate messageTemplate) { * @return an object of {@link RqueueMessageListenerContainer} object */ public RqueueMessageListenerContainer createMessageListenerContainer() { - Assert.notNull(rqueueMessageHandler, "rqueueMessageHandler must not be null"); + Assert.notNull(getRqueueMessageHandler(), "rqueueMessageHandler must not be null"); Assert.notNull(redisConnectionFactory, "redisConnectionFactory must not be null"); if (rqueueMessageTemplate == null) { - rqueueMessageTemplate = new RqueueMessageTemplate(redisConnectionFactory); + rqueueMessageTemplate = + new RqueueMessageTemplate(redisConnectionFactory, maxJobExecutionTime); } RqueueMessageListenerContainer messageListenerContainer = - new RqueueMessageListenerContainer(rqueueMessageHandler, rqueueMessageTemplate); + new RqueueMessageListenerContainer( + getRqueueMessageHandler(), + rqueueMessageTemplate, + getDiscardMessageProcessor(), + getDeadLetterQueueMessageProcessor(), + getMaxJobExecutionTime()); messageListenerContainer.setAutoStartup(autoStartup); if (taskExecutor != null) { messageListenerContainer.setTaskExecutor(taskExecutor); @@ -200,4 +232,51 @@ public RqueueMessageListenerContainer createMessageListenerContainer() { } return messageListenerContainer; } + + public long getMaxJobExecutionTime() { + return maxJobExecutionTime; + } + + /* Control how much time a job takes in execution, this can be used to fast-recovery + *when a job goes to running state then if it's not deleted within N secs then + *it has to be re-processed, that re-process time can be controller using this field. + *For example a job started execution at 10:30AM and executor was shutdown so this task requires + *retry By default it will be retried in 15 minutes, but if you want to reprocess quickly/defer + *further than this can be used to reprocess. + * + * @param maxJobExecutionTime + */ + public void setMaxJobProcessTime(long maxJobExecutionTime) { + this.maxJobExecutionTime = maxJobExecutionTime; + } + + public MessageProcessor getDiscardMessageProcessor() { + return discardMessageProcessor; + } + + /** + * This message processor would be called whenever a message is discarded due to retry limit + * exhaust. + * + * @param discardMessageProcessor object of the discard message processor. + */ + public void setDiscardMessageProcessor(MessageProcessor discardMessageProcessor) { + Assert.notNull(discardMessageProcessor, "discardMessageProcessor cannot be null"); + this.discardMessageProcessor = discardMessageProcessor; + } + + public MessageProcessor getDeadLetterQueueMessageProcessor() { + return deadLetterQueueMessageProcessor; + } + + /** + * This message processor would be called whenever a message is moved to dead letter queue + * + * @param deadLetterQueueMessageProcessor object of message processor. + */ + public void setDeadLetterQueueMessageProcessor(MessageProcessor deadLetterQueueMessageProcessor) { + Assert.notNull( + deadLetterQueueMessageProcessor, "deadLetterQueueMessageProcessor cannot be null"); + this.deadLetterQueueMessageProcessor = deadLetterQueueMessageProcessor; + } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java index bb9f541c..c5e00f82 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,23 @@ package com.github.sonus21.rqueue.core; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; public class DelayedMessageScheduler extends MessageScheduler { - private static final long DEFAULT_DELAY = 5000L; private final Logger logger = LoggerFactory.getLogger(DelayedMessageScheduler.class); public DelayedMessageScheduler( - RedisTemplate redisTemplate, int poolSize, boolean scheduleTaskAtStartup) { - super(redisTemplate, poolSize, scheduleTaskAtStartup); + RedisTemplate redisTemplate, + int poolSize, + boolean scheduleTaskAtStartup, + boolean redisEnabled, + long maxJobExecutionTime) { + super(redisTemplate, poolSize, scheduleTaskAtStartup, redisEnabled, maxJobExecutionTime); } @Override @@ -39,22 +43,22 @@ public Logger getLogger() { @Override protected long getNextScheduleTime(long currentTime, Long value) { if (value == null) { - return currentTime + DEFAULT_DELAY; + return currentTime + Constants.DEFAULT_DELAY; } if (value < currentTime) { return currentTime; } - return currentTime + DEFAULT_DELAY; + return currentTime + Constants.DEFAULT_DELAY; } @Override protected String getChannelName(String queueName) { - return QueueInfo.getChannelName(queueName); + return QueueUtility.getChannelName(queueName); } @Override protected String getZsetName(String queueName) { - return QueueInfo.getTimeQueueName(queueName); + return QueueUtility.getTimeQueueName(queueName); } @Override @@ -63,7 +67,7 @@ protected String getThreadNamePrefix() { } @Override - protected boolean isQueueValid(ConsumerQueueDetail queueDetail) { + protected boolean isQueueValid(QueueDetail queueDetail) { return queueDetail.isDelayedQueue(); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java index 608de6d9..82224fa7 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java @@ -16,11 +16,14 @@ package com.github.sonus21.rqueue.core; +import static com.github.sonus21.rqueue.utils.Constants.MAX_MESSAGES; +import static com.github.sonus21.rqueue.utils.Constants.MIN_DELAY; import static java.lang.Math.max; import static java.lang.Math.min; import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.QueueInitializationEvent; import com.github.sonus21.rqueue.utils.SchedulerFactory; import java.time.Instant; @@ -53,13 +56,10 @@ public abstract class MessageScheduler implements DisposableBean, ApplicationListener { - private static final long DEFAULT_SCRIPT_EXECUTION_TIME = 5000L; - private static final long MIN_DELAY = 10L; - private static final int MAX_MESSAGE = 100; - private static final long TASK_ALIVE_TIME = -30 * 1000L; - + protected final long maxJobExecutionTime; private final int poolSize; - private boolean scheduleTaskAtStartup; + private final boolean scheduleTaskAtStartup; + private final boolean redisEnabled; private RedisScript redisScript; private MessageSchedulerListener messageSchedulerListener; private RedisTemplate redisTemplate; @@ -73,10 +73,16 @@ public abstract class MessageScheduler @Autowired private RedisMessageListenerContainer redisMessageListenerContainer; public MessageScheduler( - RedisTemplate redisTemplate, int poolSize, boolean scheduleTaskAtStartup) { + RedisTemplate redisTemplate, + int poolSize, + boolean scheduleTaskAtStartup, + boolean redisEnabled, + long maxJobExecutionTime) { this.poolSize = poolSize; this.scheduleTaskAtStartup = scheduleTaskAtStartup; + this.redisEnabled = redisEnabled; this.redisTemplate = redisTemplate; + this.maxJobExecutionTime = maxJobExecutionTime; } protected abstract Logger getLogger(); @@ -89,7 +95,7 @@ public MessageScheduler( protected abstract String getThreadNamePrefix(); - protected abstract boolean isQueueValid(ConsumerQueueDetail queueDetail); + protected abstract boolean isQueueValid(QueueDetail queueDetail); private void doStart() { for (String queueName : queueRunningState.keySet()) { @@ -102,14 +108,16 @@ private void startQueue(String queueName) { return; } queueRunningState.put(queueName, true); - if (isScheduleTaskAtStartup()) { + if (scheduleTaskAtStartup() || !isRedisEnabled()) { long scheduleAt = System.currentTimeMillis() + MIN_DELAY; schedule(queueName, getZsetName(queueName), scheduleAt, false); } - redisMessageListenerContainer.addMessageListener( - messageSchedulerListener, new ChannelTopic(getChannelName(queueName))); - channelNameToQueueName.put(getChannelName(queueName), queueName); - queueNameToZsetName.put(queueName, getZsetName(queueName)); + if (isRedisEnabled()) { + redisMessageListenerContainer.addMessageListener( + messageSchedulerListener, new ChannelTopic(getChannelName(queueName))); + channelNameToQueueName.put(getChannelName(queueName), queueName); + queueNameToZsetName.put(queueName, getZsetName(queueName)); + } } private void doStop() { @@ -146,10 +154,14 @@ private void stopQueue(String queueName) { queueRunningState.put(queueName, false); } - private boolean isScheduleTaskAtStartup() { + private boolean scheduleTaskAtStartup() { return scheduleTaskAtStartup; } + private boolean isRedisEnabled() { + return redisEnabled; + } + @Override public void destroy() throws Exception { doStop(); @@ -210,9 +222,11 @@ protected synchronized void schedule( Future submittedTask = scheduledTaskDetail.getFuture(); boolean completedOrCancelled = submittedTask.isDone() || submittedTask.isCancelled(); // tasks older than TASK_ALIVE_TIME are considered dead - if (!completedOrCancelled && existingDelay < MIN_DELAY && existingDelay > TASK_ALIVE_TIME) { + if (!completedOrCancelled + && existingDelay < MIN_DELAY + && existingDelay > Constants.TASK_ALIVE_TIME) { try { - submittedTask.get(DEFAULT_SCRIPT_EXECUTION_TIME, TimeUnit.MILLISECONDS); + submittedTask.get(Constants.DEFAULT_SCRIPT_EXECUTION_TIME, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException | TimeoutException | CancellationException e) { @@ -229,6 +243,44 @@ protected synchronized void schedule( timerTask.getQueueName(), new ScheduledTaskDetail(startTime, future)); } + @SuppressWarnings("unchecked") + private void initialize(Map queueDetailMap) { + Set queueNames = new HashSet<>(); + for (Entry entry : queueDetailMap.entrySet()) { + String queueName = entry.getKey(); + QueueDetail queueDetail = entry.getValue(); + if (isQueueValid(queueDetail)) { + queueNames.add(queueName); + } + } + defaultScriptExecutor = new DefaultScriptExecutor<>(redisTemplate); + redisScript = (RedisScript) RedisScriptFactory.getScript(ScriptType.PUSH_MESSAGE); + queueRunningState = new ConcurrentHashMap<>(queueNames.size()); + queueNameToScheduledTask = new ConcurrentHashMap<>(queueNames.size()); + channelNameToQueueName = new ConcurrentHashMap<>(queueNames.size()); + queueNameToZsetName = new ConcurrentHashMap<>(queueNames.size()); + queueNameToLastMessageSeenTime = new ConcurrentHashMap<>(queueNames.size()); + createScheduler(queueNames.size()); + if (isRedisEnabled()) { + messageSchedulerListener = new MessageSchedulerListener(); + } + for (String queueName : queueNames) { + queueRunningState.put(queueName, false); + } + } + + @Override + public void onApplicationEvent(QueueInitializationEvent event) { + doStop(); + if (event.isStart()) { + if (CollectionUtils.isEmpty(event.getQueueDetailMap())) { + return; + } + initialize(event.getQueueDetailMap()); + doStart(); + } + } + private class MessageMoverTask implements Runnable { private final String queueName; private final String zsetName; @@ -249,7 +301,8 @@ public void run() { long currentTime = System.currentTimeMillis(); Long value = defaultScriptExecutor.execute( - redisScript, Arrays.asList(queueName, zsetName), currentTime, MAX_MESSAGE); + redisScript, Arrays.asList(queueName, zsetName), currentTime, + MAX_MESSAGES); schedule( queueName, zsetName, getNextScheduleTime(System.currentTimeMillis(), value), true); } @@ -288,40 +341,4 @@ public void onMessage(Message message, byte[] pattern) { } } } - - @SuppressWarnings("unchecked") - private void initialize(Map queueDetailMap) { - Set queueNames = new HashSet<>(); - for (Entry entry : queueDetailMap.entrySet()) { - String queueName = entry.getKey(); - ConsumerQueueDetail queueDetail = entry.getValue(); - if (isQueueValid(queueDetail)) { - queueNames.add(queueName); - } - } - defaultScriptExecutor = new DefaultScriptExecutor<>(redisTemplate); - messageSchedulerListener = new MessageSchedulerListener(); - redisScript = (RedisScript) RedisScriptFactory.getScript(ScriptType.PUSH_MESSAGE); - queueRunningState = new ConcurrentHashMap<>(queueNames.size()); - queueNameToScheduledTask = new ConcurrentHashMap<>(queueNames.size()); - channelNameToQueueName = new ConcurrentHashMap<>(queueNames.size()); - queueNameToZsetName = new ConcurrentHashMap<>(queueNames.size()); - queueNameToLastMessageSeenTime = new ConcurrentHashMap<>(queueNames.size()); - createScheduler(queueNames.size()); - for (String queueName : queueNames) { - queueRunningState.put(queueName, false); - } - } - - @Override - public void onApplicationEvent(QueueInitializationEvent event) { - doStop(); - if (event.isStart()) { - if (CollectionUtils.isEmpty(event.getQueueDetailMap())) { - return; - } - initialize(event.getQueueDetailMap()); - doStart(); - } - } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java index 5e12a8a5..934bad1a 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,8 @@ import static java.lang.Long.max; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.QueueUtility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; @@ -28,8 +28,12 @@ public class ProcessingMessageScheduler extends MessageScheduler { private final Logger logger = LoggerFactory.getLogger(ProcessingMessageScheduler.class); public ProcessingMessageScheduler( - RedisTemplate redisTemplate, int poolSize, boolean scheduleTaskAtStartup) { - super(redisTemplate, poolSize, scheduleTaskAtStartup); + RedisTemplate redisTemplate, + int poolSize, + boolean scheduleTaskAtStartup, + boolean redisEnabled, + long maxJobExecutionTime) { + super(redisTemplate, poolSize, scheduleTaskAtStartup, redisEnabled, maxJobExecutionTime); } @Override @@ -39,16 +43,16 @@ protected Logger getLogger() { @Override protected String getChannelName(String queueName) { - return QueueInfo.getProcessingQueueChannelName(queueName); + return QueueUtility.getProcessingQueueChannelName(queueName); } @Override protected String getZsetName(String queueName) { - return QueueInfo.getProcessingQueueName(queueName); + return QueueUtility.getProcessingQueueName(queueName); } @Override - protected boolean isQueueValid(ConsumerQueueDetail queueDetail) { + protected boolean isQueueValid(QueueDetail queueDetail) { return true; } @@ -60,7 +64,7 @@ protected String getThreadNamePrefix() { @Override protected long getNextScheduleTime(long currentTime, Long value) { if (value == null) { - return QueueInfo.getMessageReEnqueueTime(currentTime); + return QueueUtility.getMessageReEnqueueTimeWithDelay(currentTime, maxJobExecutionTime); } return max(currentTime, value); } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java index a9ea95b5..5df4b6a8 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java @@ -17,13 +17,14 @@ package com.github.sonus21.rqueue.core; import static com.github.sonus21.rqueue.core.RedisScriptFactory.getScript; -import static com.github.sonus21.rqueue.utils.QueueInfo.getChannelName; -import static com.github.sonus21.rqueue.utils.QueueInfo.getProcessingQueueChannelName; -import static com.github.sonus21.rqueue.utils.QueueInfo.getProcessingQueueName; -import static com.github.sonus21.rqueue.utils.QueueInfo.getTimeQueueName; +import static com.github.sonus21.rqueue.utils.QueueUtility.getChannelName; +import static com.github.sonus21.rqueue.utils.QueueUtility.getProcessingQueueChannelName; +import static com.github.sonus21.rqueue.utils.QueueUtility.getProcessingQueueName; +import static com.github.sonus21.rqueue.utils.QueueUtility.getTimeQueueName; import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueUtility; import com.github.sonus21.rqueue.utils.RqueueRedisTemplate; import java.util.ArrayList; import java.util.Arrays; @@ -37,11 +38,13 @@ @SuppressWarnings("unchecked") public class RqueueMessageTemplate extends RqueueRedisTemplate { - private static final int MESSAGE_BATCH_SIZE = 100; + private final long maxJobExecutionTime; private DefaultScriptExecutor scriptExecutor; - public RqueueMessageTemplate(RedisConnectionFactory redisConnectionFactory) { + public RqueueMessageTemplate( + RedisConnectionFactory redisConnectionFactory, long maxJobExecutionTime) { super(redisConnectionFactory); + this.maxJobExecutionTime = maxJobExecutionTime; scriptExecutor = new DefaultScriptExecutor<>(redisTemplate); } @@ -58,7 +61,7 @@ public RqueueMessage pop(String queueName) { Arrays.asList( queueName, getProcessingQueueName(queueName), getProcessingQueueChannelName(queueName)), currentTime, - QueueInfo.getMessageReEnqueueTime(currentTime)); + QueueUtility.getMessageReEnqueueTimeWithDelay(currentTime, maxJobExecutionTime)); } public void addWithDelay(String queueName, RqueueMessage rqueueMessage) { @@ -87,12 +90,12 @@ public List getAllMessages(String queueName) { messages = new ArrayList<>(); } Set messagesFromZset = - redisTemplate.opsForZSet().range(QueueInfo.getTimeQueueName(queueName), 0, -1); + redisTemplate.opsForZSet().range(QueueUtility.getTimeQueueName(queueName), 0, -1); if (!CollectionUtils.isEmpty(messagesFromZset)) { messages.addAll(messagesFromZset); } Set messagesInProcessingQueue = - redisTemplate.opsForZSet().range(QueueInfo.getProcessingQueueName(queueName), 0, -1); + redisTemplate.opsForZSet().range(QueueUtility.getProcessingQueueName(queueName), 0, -1); if (!CollectionUtils.isEmpty(messagesInProcessingQueue)) { messages.addAll(messagesInProcessingQueue); } @@ -109,15 +112,15 @@ public Long getZsetSize(String zsetName) { public boolean moveMessage(String srcQueueName, String dstQueueName, int maxMessage) { RedisScript script = (RedisScript) getScript(ScriptType.MOVE_MESSAGE); - int offset = MESSAGE_BATCH_SIZE; + int offset = Constants.MAX_MESSAGES; while (true) { long remainingMessages = scriptExecutor.execute( - script, Arrays.asList(srcQueueName, dstQueueName), MESSAGE_BATCH_SIZE); + script, Arrays.asList(srcQueueName, dstQueueName), Constants.MAX_MESSAGES); if (remainingMessages <= 0 || offset >= maxMessage) { break; } - offset += MESSAGE_BATCH_SIZE; + offset += Constants.MAX_MESSAGES; } return true; } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/AsynchronousMessageListener.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/AsynchronousMessageListener.java new file mode 100644 index 00000000..350e73fe --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/AsynchronousMessageListener.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.core.RqueueMessage; +import java.util.concurrent.Executor; + +class AsynchronousMessageListener extends MessageContainerBase implements Runnable { + private final String queueName; + private final QueueDetail queueDetail; + + AsynchronousMessageListener( + String queueName, QueueDetail value, RqueueMessageListenerContainer container) { + super(container); + this.queueName = queueName; + this.queueDetail = value; + } + + private RqueueMessage getMessage() { + return getRqueueMessageTemplate().pop(queueName); + } + + @Override + public void run() { + getLogger().debug("Running Queue {}", queueName); + while (isQueueActive(queueName)) { + try { + RqueueMessage message = getMessage(); + getLogger().debug("Queue: {} Fetched Msg {}", queueName, message); + if (message != null) { + getTaskExecutor().execute(new MessageExecutor(message, queueDetail, container)); + } else { + try { + Thread.sleep(getPollingInterval()); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + } catch (Exception e) { + getLogger() + .warn( + "Message listener failed for the queue {}, it will be retried in {} Ms", + queueName, + getBackOffTime(), + e); + try { + Thread.sleep(getBackOffTime()); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + } + } + + private long getPollingInterval() { + return container.get().getPollingInterval(); + } + + private long getBackOffTime() { + return container.get().getBackOffTime(); + } + + private Executor getTaskExecutor() { + return container.get().getTaskExecutor(); + } +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageContainerBase.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageContainerBase.java new file mode 100644 index 00000000..e2eb720a --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageContainerBase.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.listener; + +import static com.github.sonus21.rqueue.utils.Constants.DELTA_BETWEEN_RE_ENQUEUE_TIME; + +import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.utils.QueueUtility; +import java.lang.ref.WeakReference; +import java.util.List; +import org.slf4j.Logger; +import org.springframework.messaging.converter.MessageConverter; + +public class MessageContainerBase { + protected final WeakReference container; + + MessageContainerBase(RqueueMessageListenerContainer container) { + this.container = new WeakReference<>(container); + } + + MessageContainerBase(WeakReference container) { + this.container = container; + } + + Logger getLogger() { + container.get(); + return RqueueMessageListenerContainer.logger; + } + + @SuppressWarnings("ConstantConditions") + RqueueMessageHandler getMessageHandler() { + return container.get().getRqueueMessageHandler(); + } + + protected List getMessageConverters() { + return getMessageHandler().getMessageConverters(); + } + + @SuppressWarnings("ConstantConditions") + protected RqueueMessageTemplate getRqueueMessageTemplate() { + return container.get().getRqueueMessageTemplate(); + } + + @SuppressWarnings("ConstantConditions") + long getMaxProcessingTime() { + return QueueUtility.getMessageReEnqueueTimeWithDelay(container.get().getMaxJobExecutionTime()) + - DELTA_BETWEEN_RE_ENQUEUE_TIME; + } + + @SuppressWarnings("ConstantConditions") + boolean isQueueActive(String queueName) { + return container.get().isQueueActive(queueName); + } +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java new file mode 100644 index 00000000..8d5316ff --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java @@ -0,0 +1,159 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.metrics.RqueueCounter; +import com.github.sonus21.rqueue.processor.MessageProcessor; +import com.github.sonus21.rqueue.utils.MessageUtility; +import com.github.sonus21.rqueue.utils.QueueUtility; +import java.lang.ref.WeakReference; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +class MessageExecutor extends MessageContainerBase implements Runnable { + private static final int MAX_RETRY_COUNT = Integer.MAX_VALUE; + private final QueueDetail queueDetail; + private final Message message; + private final RqueueMessage rqueueMessage; + + MessageExecutor( + RqueueMessage message, + QueueDetail queueDetail, + WeakReference container) { + super(container); + rqueueMessage = message; + this.queueDetail = queueDetail; + this.message = + new GenericMessage<>( + message.getMessage(), QueueUtility.getQueueHeaders(queueDetail.getQueueName())); + } + + private int getMaxRetryCount() { + int maxRetryCount = + rqueueMessage.getRetryCount() == null + ? queueDetail.getNumRetries() + : rqueueMessage.getRetryCount(); + // DLQ is not specified so retry it for max number of counts + if (maxRetryCount == -1 && !queueDetail.isDlqSet()) { + maxRetryCount = MAX_RETRY_COUNT; + } + return maxRetryCount; + } + + private Object getPayload() { + return MessageUtility.convertMessageToObject(message, getMessageConverters()); + } + + @SuppressWarnings("ConstantConditions") + private void callMessageProcessor(boolean discardOrDlq, RqueueMessage message) { + MessageProcessor messageProcessor; + if (discardOrDlq) { + messageProcessor = container.get().getDiscardMessageProcessor(); + } else { + messageProcessor = container.get().getDlqMessageProcessor(); + } + String name = discardOrDlq ? "Discard Message Queue" : "Dead Letter Queue"; + try { + getLogger().debug("Calling {} processor for {}", name, message); + Object payload = getPayload(); + messageProcessor.process(payload); + } catch (Exception e) { + getLogger().error("Message processor call failed", e); + } + } + + @SuppressWarnings("ConstantConditions") + private void updateCounter(boolean failOrExecution) { + RqueueCounter rqueueCounter = container.get().rqueueCounter; + if (rqueueCounter == null) { + return; + } + if (failOrExecution) { + rqueueCounter.updateFailureCount(queueDetail.getQueueName()); + } else { + rqueueCounter.updateExecutionCount(queueDetail.getQueueName()); + } + } + + private void handlePostProcessing(boolean executed, int currentFailureCount, int maxRetryCount) { + if (!isQueueActive(queueDetail.getQueueName())) { + return; + } + try { + String processingQueueName = QueueUtility.getProcessingQueueName(queueDetail.getQueueName()); + if (!executed) { + // move to DLQ + if (queueDetail.isDlqSet()) { + RqueueMessage newMessage = rqueueMessage.clone(); + newMessage.setFailureCount(currentFailureCount); + newMessage.updateReEnqueuedAt(); + callMessageProcessor(false, newMessage); + // No transaction?? + getRqueueMessageTemplate().add(queueDetail.getDlqName(), newMessage); + getRqueueMessageTemplate().removeFromZset(processingQueueName, rqueueMessage); + } else if (currentFailureCount < maxRetryCount) { + // replace the existing message with the update message + // this will reflect new retry count + RqueueMessage newMessage = rqueueMessage.clone(); + newMessage.setFailureCount(currentFailureCount); + newMessage.updateReEnqueuedAt(); + getRqueueMessageTemplate().replaceMessage(processingQueueName, rqueueMessage, newMessage); + } else { + // discard this message + getLogger() + .warn( + "Message {} discarded due to retry limit queue: {}", + getPayload(), + queueDetail.getQueueName()); + getRqueueMessageTemplate().removeFromZset(processingQueueName, rqueueMessage); + callMessageProcessor(true, rqueueMessage); + } + } else { + getLogger().debug("Delete Queue: {} message: {}", processingQueueName, rqueueMessage); + // delete it from processing queue + getRqueueMessageTemplate().removeFromZset(processingQueueName, rqueueMessage); + } + } catch (Exception e) { + getLogger().error("Error occurred in post processing", e); + } + } + + @Override + public void run() { + boolean executed = false; + int currentFailureCount = rqueueMessage.getFailureCount(); + int maxRetryCount = getMaxRetryCount(); + long maxRetryTime = getMaxProcessingTime(); + do { + if (!isQueueActive(queueDetail.getQueueName())) { + return; + } + try { + updateCounter(false); + getMessageHandler().handleMessage(message); + executed = true; + } catch (Exception e) { + updateCounter(true); + currentFailureCount += 1; + } + } while (currentFailureCount < maxRetryCount + && !executed + && System.currentTimeMillis() < maxRetryTime); + handlePostProcessing(executed, currentFailureCount, maxRetryCount); + } +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/ConsumerQueueDetail.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java similarity index 93% rename from rqueue/src/main/java/com/github/sonus21/rqueue/listener/ConsumerQueueDetail.java rename to rqueue/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java index 8b11fde2..c1ca6244 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/ConsumerQueueDetail.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,13 @@ package com.github.sonus21.rqueue.listener; -public class ConsumerQueueDetail { +public class QueueDetail { private final String queueName; private final boolean delayedQueue; private final String dlqName; private final int numRetries; - public ConsumerQueueDetail( + public QueueDetail( String queueName, int numRetries, String deadLetterQueueName, boolean delayedQueue) { this.queueName = queueName; this.numRetries = numRetries; diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java index 6f0b5f2d..476bc6eb 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package com.github.sonus21.rqueue.listener; -import static com.github.sonus21.rqueue.utils.QueueInfo.QUEUE_NAME; +import static com.github.sonus21.rqueue.utils.QueueUtility.QUEUE_NAME; import com.github.sonus21.rqueue.annotation.RqueueListener; import com.github.sonus21.rqueue.converter.GenericMessageConverter; @@ -35,7 +35,6 @@ import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.handler.HandlerMethod; import org.springframework.messaging.handler.annotation.support.AnnotationExceptionHandlerMethodResolver; -import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver; import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver; import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver; import org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler; diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 6a6fa98b..cee35560 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -16,10 +16,12 @@ package com.github.sonus21.rqueue.listener; -import com.github.sonus21.rqueue.core.RqueueMessage; +import static com.github.sonus21.rqueue.utils.Constants.DEFAULT_WORKER_COUNT_PER_QUEUE; + import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.metrics.RqueueCounter; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.processor.MessageProcessor; +import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.QueueInitializationEvent; import java.util.Collections; import java.util.Map; @@ -37,8 +39,6 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.SmartLifecycle; import org.springframework.core.task.AsyncTaskExecutor; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.GenericMessage; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -47,40 +47,48 @@ public class RqueueMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle, BeanNameAware { private static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RqueueMessageListenerContainer.class); - private static final int DEFAULT_WORKER_COUNT_PER_QUEUE = 2; - private static Logger logger = LoggerFactory.getLogger(RqueueMessageListenerContainer.class); + static Logger logger = LoggerFactory.getLogger(RqueueMessageListenerContainer.class); private final Object lifecycleMgr = new Object(); + private final RqueueMessageHandler rqueueMessageHandler; + private final MessageProcessor discardMessageProcessor; + private final MessageProcessor dlqMessageProcessor; + private final RqueueMessageTemplate rqueueMessageTemplate; + private final long maxJobExecutionTime; + + @Autowired(required = false) + RqueueCounter rqueueCounter; + private Integer maxNumWorkers; private String beanName; - private RqueueMessageHandler rqueueMessageHandler; private boolean defaultTaskExecutor = false; private AsyncTaskExecutor taskExecutor; private AsyncTaskExecutor spinningTaskExecutor; private boolean autoStartup = true; - private Map registeredQueues = new ConcurrentHashMap<>(); + private Map registeredQueues = new ConcurrentHashMap<>(); private Map queueRunningState = new ConcurrentHashMap<>(); private ConcurrentHashMap> scheduledFutureByQueue = new ConcurrentHashMap<>(); private boolean running = false; - // 5 seconds - private long backOffTime = 5000L; - // 20 seconds - private long maxWorkerWaitTime = 200000L; - + private long backOffTime = 5 * Constants.ONE_MILLI; + private long maxWorkerWaitTime = 20 * Constants.ONE_MILLI; private long pollingInterval = 200L; private int phase = Integer.MAX_VALUE; - private RqueueMessageTemplate rqueueMessageTemplate; - @Autowired private ApplicationEventPublisher applicationEventPublisher; - @Autowired(required = false) - private RqueueCounter rqueueCounter; - public RqueueMessageListenerContainer( - RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) { - Assert.notNull(rqueueMessageHandler, "rqueueMessageHandler can not be null"); - Assert.notNull(rqueueMessageTemplate, "rqueueMessageTemplate can not be null"); + RqueueMessageHandler rqueueMessageHandler, + RqueueMessageTemplate rqueueMessageTemplate, + MessageProcessor discardMessageProcessor, + MessageProcessor dlqMessageProcessor, + long maxJobExecutionTime) { + Assert.notNull(rqueueMessageHandler, "rqueueMessageHandler cannot be null"); + Assert.notNull(rqueueMessageTemplate, "rqueueMessageTemplate cannot be null"); + Assert.notNull(discardMessageProcessor, "discardMessageProcessor cannot be null"); + Assert.notNull(dlqMessageProcessor, "dlqMessageProcessor cannot be null"); this.rqueueMessageHandler = rqueueMessageHandler; this.rqueueMessageTemplate = rqueueMessageTemplate; + this.discardMessageProcessor = discardMessageProcessor; + this.dlqMessageProcessor = dlqMessageProcessor; + this.maxJobExecutionTime = maxJobExecutionTime; } public RqueueMessageTemplate getRqueueMessageTemplate() { @@ -116,14 +124,14 @@ public void setMaxNumWorkers(int maxNumWorkers) { this.maxNumWorkers = maxNumWorkers; } - public void setBackOffTime(long backOffTime) { - this.backOffTime = backOffTime; - } - public long getBackOffTime() { return backOffTime; } + public void setBackOffTime(long backOffTime) { + this.backOffTime = backOffTime; + } + @Override public void destroy() throws Exception { synchronized (lifecycleMgr) { @@ -162,6 +170,10 @@ public boolean isAutoStartup() { return autoStartup; } + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + @Override public void stop(Runnable callback) { synchronized (this.lifecycleMgr) { @@ -170,19 +182,14 @@ public void stop(Runnable callback) { } } - public void setAutoStartup(boolean autoStartup) { - this.autoStartup = autoStartup; - } - @Override public void afterPropertiesSet() throws Exception { synchronized (lifecycleMgr) { for (MappingInformation mappingInformation : rqueueMessageHandler.getHandlerMethods().keySet()) { for (String queue : mappingInformation.getQueueNames()) { - ConsumerQueueDetail consumerQueueDetail = - getConsumerQueueDetail(queue, mappingInformation); - registeredQueues.put(queue, consumerQueueDetail); + QueueDetail queueDetail = getQueueDetail(queue, mappingInformation); + registeredQueues.put(queue, queueDetail); } } registeredQueues = Collections.unmodifiableMap(registeredQueues); @@ -205,7 +212,7 @@ private AsyncTaskExecutor createSpinningTaskExecutor() { return createTaskExecutor(true); } - public Map getRegisteredQueues() { + public Map getRegisteredQueues() { return registeredQueues; } @@ -247,9 +254,8 @@ public AsyncTaskExecutor createDefaultTaskExecutor() { return createTaskExecutor(false); } - private ConsumerQueueDetail getConsumerQueueDetail( - String queue, MappingInformation mappingInformation) { - return new ConsumerQueueDetail( + private QueueDetail getQueueDetail(String queue, MappingInformation mappingInformation) { + return new QueueDetail( queue, mappingInformation.getNumRetries(), mappingInformation.getDeadLetterQueueName(), @@ -269,21 +275,20 @@ public void start() { } protected void doStart() { - for (Map.Entry registeredQueue : - getRegisteredQueues().entrySet()) { - ConsumerQueueDetail queueDetail = registeredQueue.getValue(); + for (Map.Entry registeredQueue : getRegisteredQueues().entrySet()) { + QueueDetail queueDetail = registeredQueue.getValue(); startQueue(registeredQueue.getKey(), queueDetail); } } - protected void startQueue(String queueName, ConsumerQueueDetail queueDetail) { + protected void startQueue(String queueName, QueueDetail queueDetail) { if (queueRunningState.containsKey(queueName) && queueRunningState.get(queueName)) { return; } queueRunningState.put(queueName, true); Future future; AsynchronousMessageListener messageListener = - new AsynchronousMessageListener(queueName, queueDetail); + new AsynchronousMessageListener(queueName, queueDetail, this); if (spinningTaskExecutor == null) { future = getTaskExecutor().submit(messageListener); } else { @@ -292,7 +297,7 @@ protected void startQueue(String queueName, ConsumerQueueDetail queueDetail) { scheduledFutureByQueue.put(queueName, future); } - private boolean isQueueActive(String queueName) { + boolean isQueueActive(String queueName) { return queueRunningState.getOrDefault(queueName, false); } @@ -328,7 +333,7 @@ protected void doStop() { private void waitForRunningQueuesToStop() { for (Map.Entry queueRunningState : queueRunningState.entrySet()) { String queueName = queueRunningState.getKey(); - ConsumerQueueDetail queueDetail = getRegisteredQueues().get(queueName); + QueueDetail queueDetail = getRegisteredQueues().get(queueName); Future queueSpinningThread = scheduledFutureByQueue.get(queueName); if (queueSpinningThread != null && !queueSpinningThread.isDone() @@ -366,155 +371,15 @@ public void setPollingInterval(long pollingInterval) { this.pollingInterval = pollingInterval; } - private class AsynchronousMessageListener implements Runnable { - private final String queueName; - private final ConsumerQueueDetail queueDetail; - - AsynchronousMessageListener(String queueName, ConsumerQueueDetail value) { - this.queueName = queueName; - queueDetail = value; - } - - private RqueueMessage getMessage() { - return rqueueMessageTemplate.pop(queueName); - } - - @Override - public void run() { - logger.debug("Running Queue {}", queueName); - while (isQueueActive(queueName)) { - try { - RqueueMessage message = getMessage(); - logger.debug("Queue: {} Fetched Msg {}", queueName, message); - if (message != null) { - getTaskExecutor().execute(new MessageExecutor(message, queueDetail)); - } else { - try { - Thread.sleep(getPollingInterval()); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - } - } catch (Exception e) { - logger.warn( - "Message listener failed for the queue {}, it will be retried in {} Ms", - queueName, - getBackOffTime(), - e); - try { - Thread.sleep(getBackOffTime()); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } - } - } - } + public long getMaxJobExecutionTime() { + return maxJobExecutionTime; } - class MessageExecutor implements Runnable { - private static final int MAX_RETRY_COUNT = Integer.MAX_VALUE; - private static final long DELTA_BETWEEN_RE_ENQUEUE_TIME = 5000L; - private final ConsumerQueueDetail queueDetail; - private final Message message; - private final RqueueMessage rqueueMessage; - - MessageExecutor(RqueueMessage message, ConsumerQueueDetail queueDetail) { - rqueueMessage = message; - this.queueDetail = queueDetail; - this.message = - new GenericMessage<>( - message.getMessage(), QueueInfo.getQueueHeaders(queueDetail.getQueueName())); - } - - private int getMaxRetryCount() { - int maxRetryCount = - rqueueMessage.getRetryCount() == null - ? queueDetail.getNumRetries() - : rqueueMessage.getRetryCount(); - // DLQ is not specified so retry it for max number of counts - if (maxRetryCount == -1 && !queueDetail.isDlqSet()) { - maxRetryCount = MAX_RETRY_COUNT; - } - return maxRetryCount; - } - - private long getMaxProcessingTime() { - return QueueInfo.getMessageReEnqueueTime() - DELTA_BETWEEN_RE_ENQUEUE_TIME; - } - - private void handlePostProcessing( - boolean executed, int currentFailureCount, int maxRetryCount) { - if (!isQueueActive(queueDetail.getQueueName())) { - return; - } - try { - String processingQueueName = QueueInfo.getProcessingQueueName(queueDetail.getQueueName()); - if (!executed) { - // move to DLQ - if (queueDetail.isDlqSet()) { - RqueueMessage newMessage = rqueueMessage.clone(); - newMessage.setFailureCount(currentFailureCount); - newMessage.updateReEnqueuedAt(); - rqueueMessageTemplate.add(queueDetail.getDlqName(), newMessage); - rqueueMessageTemplate.removeFromZset(processingQueueName, rqueueMessage); - } else if (currentFailureCount < maxRetryCount) { - // replace the existing message with the update message - // this will reflect the retry count - RqueueMessage newMessage = rqueueMessage.clone(); - newMessage.setFailureCount(currentFailureCount); - newMessage.updateReEnqueuedAt(); - rqueueMessageTemplate.replaceMessage(processingQueueName, rqueueMessage, newMessage); - } else { - // discard this message - logger.warn("Message {} discarded due to retry limit", rqueueMessage); - rqueueMessageTemplate.removeFromZset(processingQueueName, rqueueMessage); - } - } else { - logger.debug("Delete Queue: {} message: {}", processingQueueName, rqueueMessage); - // delete it from processing queue - rqueueMessageTemplate.removeFromZset(processingQueueName, rqueueMessage); - } - } catch (Exception e) { - logger.error("Error occurred in post processing", e); - } - } - - private void updateExecutionCount() { - if (rqueueCounter == null) { - return; - } - rqueueCounter.updateExecutionCount(queueDetail.getQueueName()); - } - - private void updateFailureCount() { - if (rqueueCounter == null) { - return; - } - rqueueCounter.updateFailureCount(queueDetail.getQueueName()); - } + public MessageProcessor getDiscardMessageProcessor() { + return discardMessageProcessor; + } - @Override - public void run() { - boolean executed = false; - int currentFailureCount = rqueueMessage.getFailureCount(); - int maxRetryCount = getMaxRetryCount(); - long maxRetryTime = getMaxProcessingTime(); - do { - if (!isQueueActive(queueDetail.getQueueName())) { - return; - } - try { - updateExecutionCount(); - getRqueueMessageHandler().handleMessage(message); - executed = true; - } catch (Exception e) { - updateFailureCount(); - currentFailureCount += 1; - } - } while (currentFailureCount < maxRetryCount - && !executed - && System.currentTimeMillis() < maxRetryTime); - handlePostProcessing(executed, currentFailureCount, maxRetryCount); - } + public MessageProcessor getDlqMessageProcessor() { + return dlqMessageProcessor; } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java index ab4f0f89..c291fa77 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java @@ -17,8 +17,8 @@ package com.github.sonus21.rqueue.metrics; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.QueueUtility; import com.github.sonus21.rqueue.utils.QueueInitializationEvent; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Gauge.Builder; @@ -67,8 +67,8 @@ private long size(String name, boolean isZset) { return val; } - private void monitor(Map queueDetailMap) { - for (ConsumerQueueDetail queueDetail : queueDetailMap.values()) { + private void monitor(Map queueDetailMap) { + for (QueueDetail queueDetail : queueDetailMap.values()) { Tags queueTags = Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getQueueName()); Gauge.builder(QUEUE_SIZE, queueDetail, c -> size(queueDetail.getQueueName(), false)) @@ -78,7 +78,7 @@ private void monitor(Map queueDetailMap) { Gauge.builder( PROCESSING_QUEUE_SIZE, queueDetail, - c -> size(QueueInfo.getProcessingQueueName(queueDetail.getQueueName()), true)) + c -> size(QueueUtility.getProcessingQueueName(queueDetail.getQueueName()), true)) .tags(queueTags) .description("The number of entries in the processing queue") .register(meterRegistry); @@ -87,13 +87,13 @@ private void monitor(Map queueDetailMap) { Gauge.builder( DELAYED_QUEUE_SIZE, queueDetail, - c -> size(QueueInfo.getTimeQueueName(queueDetail.getQueueName()), true)) + c -> size(QueueUtility.getTimeQueueName(queueDetail.getQueueName()), true)) .tags(queueTags) .description("The number of entries waiting in the delayed queue") .register(meterRegistry); } if (queueDetail.isDlqSet()) { - Builder builder = + Builder builder = Gauge.builder( DEAD_LETTER_QUEUE_SIZE, queueDetail, c -> size(queueDetail.getDlqName(), false)); builder.tags(queueTags); diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsProperties.java b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsProperties.java index 7bf9bf3c..d70107a1 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsProperties.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetricsProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,10 @@ public Count getCount() { return count; } + public void setCount(Count count) { + this.count = count; + } + public Map getTags() { return tags; } @@ -50,10 +54,6 @@ public void setTags(Map tags) { this.tags = tags; } - public void setCount(Count count) { - this.count = count; - } - /** * Get Tags object that can be used in metric. Tags can be either configured manually or using * properties or XML file. diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/processor/MessageProcessor.java b/rqueue/src/main/java/com/github/sonus21/rqueue/processor/MessageProcessor.java new file mode 100644 index 00000000..3bfdb5e5 --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/processor/MessageProcessor.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.processor; + +/** This interface can be used to take some action when ever a message is processed */ +public interface MessageProcessor { + + /** + * This method would be called with the specified object, this will happen only when message is + * deserialize successfully. + * + * @param message message + */ + void process(Object message); +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/processor/NoOpMessageProcessor.java b/rqueue/src/main/java/com/github/sonus21/rqueue/processor/NoOpMessageProcessor.java new file mode 100644 index 00000000..e674a074 --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/processor/NoOpMessageProcessor.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.processor; + +public class NoOpMessageProcessor implements MessageProcessor { + @Override + public void process(Object o) {} +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/producer/MessageWriter.java b/rqueue/src/main/java/com/github/sonus21/rqueue/producer/MessageWriter.java index 62697146..0958fafe 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/producer/MessageWriter.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/producer/MessageWriter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package com.github.sonus21.rqueue.producer; +import static com.github.sonus21.rqueue.utils.Constants.MIN_DELAY; + import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import java.util.List; @@ -28,7 +30,6 @@ import org.springframework.messaging.support.GenericMessage; class MessageWriter { - private static final long MIN_DELAY_TIME = 100; private static Logger logger = LoggerFactory.getLogger(RqueueMessageSender.class); private RqueueMessageTemplate rqueueMessageTemplate; private CompositeMessageConverter messageConverter; @@ -48,7 +49,7 @@ class MessageWriter { boolean pushMessage(String queueName, Object message, Integer retryCount, Long delayInMilliSecs) { RqueueMessage rqueueMessage = buildMessage(queueName, message, retryCount, delayInMilliSecs); try { - if (delayInMilliSecs == null || delayInMilliSecs <= MIN_DELAY_TIME) { + if (delayInMilliSecs == null || delayInMilliSecs <= MIN_DELAY) { rqueueMessageTemplate.add(queueName, rqueueMessage); } else { rqueueMessageTemplate.addWithDelay(queueName, rqueueMessage); diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/producer/RqueueMessageSender.java b/rqueue/src/main/java/com/github/sonus21/rqueue/producer/RqueueMessageSender.java index ba7ec3fe..4343eb39 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/producer/RqueueMessageSender.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/producer/RqueueMessageSender.java @@ -20,7 +20,8 @@ import com.github.sonus21.rqueue.converter.GenericMessageConverter; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueUtility; import com.github.sonus21.rqueue.utils.Validator; import java.util.ArrayList; import java.util.Collections; @@ -37,7 +38,6 @@ * @author Sonu Kumar */ public class RqueueMessageSender { - private static final int DEFAULT_MAX_MESSAGE = 1000; private MessageWriter messageWriter; private RqueueMessageTemplate messageTemplate; @@ -166,7 +166,7 @@ public List getMessageConverters() { * * @param deadLetterQueueName dead letter queue name * @param queueName queue name - * @param maxMessages number of messages to be moved by default move {@link #DEFAULT_MAX_MESSAGE} + * @param maxMessages number of messages to be moved by default move {@link Constants#ONE_MILLI} * messages * @return success or failure */ @@ -178,7 +178,7 @@ public boolean moveMessageFromDeadLetterToQueue( !deadLetterQueueName.equals(queueName), "deadLetterQueueName and queueName must be different"); if (maxMessages == null) { - maxMessages = DEFAULT_MAX_MESSAGE; + maxMessages = Constants.MAX_MESSAGES; } Assert.isTrue(maxMessages > 0, "maxMessage must be greater than zero"); return messageTemplate.moveMessage(deadLetterQueueName, queueName, maxMessages); @@ -202,7 +202,7 @@ public boolean moveMessageFromDeadLetterToQueue(String deadLetterQueueName, Stri */ public void deleteAllMessages(String queueName) { messageTemplate.deleteKey(queueName); - messageTemplate.deleteKey(QueueInfo.getProcessingQueueName(queueName)); - messageTemplate.deleteKey(QueueInfo.getTimeQueueName(queueName)); + messageTemplate.deleteKey(QueueUtility.getProcessingQueueName(queueName)); + messageTemplate.deleteKey(QueueUtility.getTimeQueueName(queueName)); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/utils/Constants.java b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/Constants.java new file mode 100644 index 00000000..68a7415d --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/Constants.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.utils; + +public class Constants { + private Constants() {} + + public static final int SECONDS_IN_A_MINUTE = 60; + public static final long ONE_MILLI = 1000; + public static final long MAX_JOB_EXECUTION_TIME = 15 * SECONDS_IN_A_MINUTE * ONE_MILLI; + public static final long DEFAULT_DELAY = 5 * ONE_MILLI; + public static final long DEFAULT_SCRIPT_EXECUTION_TIME = DEFAULT_DELAY; + public static final long DELTA_BETWEEN_RE_ENQUEUE_TIME = DEFAULT_DELAY; + public static final long MIN_DELAY = 100L; + public static final long TASK_ALIVE_TIME = -30 * Constants.ONE_MILLI; + public static final int MAX_MESSAGES = 100; + public static final int DEFAULT_WORKER_COUNT_PER_QUEUE = 2; +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/utils/MessageUtility.java b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/MessageUtility.java new file mode 100644 index 00000000..14af2b9c --- /dev/null +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/MessageUtility.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.utils; + +import java.util.List; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.util.Assert; + +public abstract class MessageUtility { + public static Object convertMessageToObject( + Message message, List messageConverters) { + Assert.notEmpty(messageConverters, "messageConverters cannot be empty"); + for (MessageConverter messageConverter : messageConverters) { + try { + return messageConverter.fromMessage(message, null); + } catch (Exception e) { + } + } + return null; + } +} diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueInitializationEvent.java b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueInitializationEvent.java index 396132dc..0e8e116e 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueInitializationEvent.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueInitializationEvent.java @@ -16,21 +16,22 @@ package com.github.sonus21.rqueue.utils; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; +import com.github.sonus21.rqueue.listener.QueueDetail; import java.util.Map; import org.springframework.context.ApplicationEvent; public class QueueInitializationEvent extends ApplicationEvent { - private final Map queueDetailMap; + private final Map queueDetailMap; private final boolean start; + public QueueInitializationEvent( - Object source, Map queueDetailMap, boolean start) { + Object source, Map queueDetailMap, boolean start) { super(source); this.queueDetailMap = queueDetailMap; this.start = start; } - public Map getQueueDetailMap() { + public Map getQueueDetailMap() { return queueDetailMap; } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueInfo.java b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueUtility.java similarity index 81% rename from rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueInfo.java rename to rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueUtility.java index 6318ff06..557fed7f 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueInfo.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/utils/QueueUtility.java @@ -19,14 +19,14 @@ import java.util.Collections; import java.util.Map; -public abstract class QueueInfo { +public class QueueUtility { public static final String QUEUE_NAME = "QUEUE_NAME"; private static final String DELAYED_QUEUE_PREFIX = "rqueue-delay::"; private static final String CHANNEL_PREFIX = "rqueue-channel::"; private static final String PROCESSING_PREFIX = "rqueue-processing::"; private static final String PROCESSING_CHANNEL_PREFIX = "rqueue-processing-channel::"; - // 15 minutes in millis - private static final long MAX_MESSAGE_PROCESSING_TIME = 15 * 60 * 1000L; + + private QueueUtility() {} public static Map getQueueHeaders(String queueName) { return Collections.singletonMap(QUEUE_NAME, queueName); @@ -48,11 +48,11 @@ public static String getProcessingQueueChannelName(String queueName) { return PROCESSING_CHANNEL_PREFIX + queueName; } - public static long getMessageReEnqueueTime() { - return getMessageReEnqueueTime(System.currentTimeMillis()); + public static long getMessageReEnqueueTimeWithDelay(long maxDelay) { + return getMessageReEnqueueTimeWithDelay(System.currentTimeMillis(), maxDelay); } - public static long getMessageReEnqueueTime(long currentTime) { - return currentTime + MAX_MESSAGE_PROCESSING_TIME; + public static long getMessageReEnqueueTimeWithDelay(long currentTime, long maxDelay) { + return currentTime + maxDelay; } } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java index 634246ea..4a4beb88 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.QueueUtility; import com.github.sonus21.rqueue.utils.QueueInitializationEvent; import com.github.sonus21.rqueue.utils.SchedulerFactory; import java.time.Instant; @@ -64,21 +64,18 @@ @RunWith(PowerMockRunner.class) @PrepareForTest(fullyQualifiedNames = {"com.github.sonus21.rqueue.utils.SchedulerFactory"}) public class MessageSchedulerTest { + @Rule public MockitoRule mockito = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); private int poolSize = 1; @Mock private RedisMessageListenerContainer redisMessageListenerContainer; @Mock private RedisTemplate redisTemplate; - @InjectMocks private TestMessageScheduler messageScheduler = - new TestMessageScheduler(redisTemplate, poolSize, true); - + new TestMessageScheduler(redisTemplate, poolSize, true, true, 900000); private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; - private ConsumerQueueDetail slowQueueDetail = new ConsumerQueueDetail(slowQueue, -1, "", true); - private ConsumerQueueDetail fastQueueDetail = new ConsumerQueueDetail(fastQueue, -1, "", false); - private Map queueNameToQueueDetail = new HashMap<>(); - - @Rule public MockitoRule mockito = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS); + private QueueDetail slowQueueDetail = new QueueDetail(slowQueue, -1, "", true); + private QueueDetail fastQueueDetail = new QueueDetail(fastQueue, -1, "", false); + private Map queueNameToQueueDetail = new HashMap<>(); @Before public void init() { @@ -89,12 +86,12 @@ public void init() { @Test public void getChannelName() { - assertEquals(QueueInfo.getChannelName(slowQueue), messageScheduler.getChannelName(slowQueue)); + assertEquals(QueueUtility.getChannelName(slowQueue), messageScheduler.getChannelName(slowQueue)); } @Test public void getZsetName() { - assertEquals(QueueInfo.getTimeQueueName(slowQueue), messageScheduler.getZsetName(slowQueue)); + assertEquals(QueueUtility.getTimeQueueName(slowQueue), messageScheduler.getZsetName(slowQueue)); } @Test @@ -146,7 +143,7 @@ public void start() throws Exception { public void startAddsChannelToMessageListener() throws Exception { doNothing() .when(redisMessageListenerContainer) - .addMessageListener(any(), eq(new ChannelTopic(QueueInfo.getChannelName(slowQueue)))); + .addMessageListener(any(), eq(new ChannelTopic(QueueUtility.getChannelName(slowQueue)))); messageScheduler.onApplicationEvent( new QueueInitializationEvent("Test", queueNameToQueueDetail, true)); Thread.sleep(500L); @@ -276,14 +273,14 @@ public void onMessageListenerTest() throws Exception { // invalid body messageListener.onMessage( - new DefaultMessage(QueueInfo.getChannelName(slowQueue).getBytes(), "sss".getBytes()), null); + new DefaultMessage(QueueUtility.getChannelName(slowQueue).getBytes(), "sss".getBytes()), null); Thread.sleep(50); assertEquals(1, messageScheduler.scheduleList.stream().filter(e -> !e).count()); // both are correct messageListener.onMessage( new DefaultMessage( - QueueInfo.getChannelName(slowQueue).getBytes(), + QueueUtility.getChannelName(slowQueue).getBytes(), String.valueOf(System.currentTimeMillis()).getBytes()), null); Thread.sleep(50); @@ -322,8 +319,12 @@ static class TestMessageScheduler extends DelayedMessageScheduler { List scheduleList = new Vector<>(); TestMessageScheduler( - RedisTemplate redisTemplate, int poolSize, boolean scheduleTaskAtStartup) { - super(redisTemplate, poolSize, scheduleTaskAtStartup); + RedisTemplate redisTemplate, + int poolSize, + boolean scheduleTaskAtStartup, + boolean redisEnabled, + long maxJobExecutionTime) { + super(redisTemplate, poolSize, scheduleTaskAtStartup, redisEnabled, maxJobExecutionTime); } @Override diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerWithSchedulerDisabledAtStartup.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerWithSchedulerDisabledAtStartup.java index 2919942f..889cdb56 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerWithSchedulerDisabledAtStartup.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerWithSchedulerDisabledAtStartup.java @@ -20,7 +20,7 @@ import com.github.sonus21.rqueue.core.MessageSchedulerTest.TestMessageScheduler; import com.github.sonus21.rqueue.core.MessageSchedulerTest.TestTaskScheduler; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; +import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.utils.QueueInitializationEvent; import java.util.HashMap; import java.util.Map; @@ -41,13 +41,14 @@ public class MessageSchedulerWithSchedulerDisabledAtStartup { @Mock private RedisTemplate redisTemplate; @InjectMocks - private TestMessageScheduler messageScheduler = new TestMessageScheduler(redisTemplate, 1, false); + private TestMessageScheduler messageScheduler = + new TestMessageScheduler(redisTemplate, 1, false, true, 900000); private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; - private ConsumerQueueDetail slowQueueDetail = new ConsumerQueueDetail(slowQueue, -1, "", true); - private ConsumerQueueDetail fastQueueDetail = new ConsumerQueueDetail(fastQueue, -1, "", false); - private Map queueNameToQueueDetail = new HashMap<>(); + private QueueDetail slowQueueDetail = new QueueDetail(slowQueue, -1, "", true); + private QueueDetail fastQueueDetail = new QueueDetail(fastQueue, -1, "", false); + private Map queueNameToQueueDetail = new HashMap<>(); @Before public void init() { diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java index 98db4b33..34bfdad3 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.QueueUtility; import org.junit.Test; import org.mockito.Mock; import org.springframework.data.redis.core.RedisTemplate; @@ -29,23 +29,23 @@ public class ProcessingMessageSchedulerTest { private int poolSize = 1; @Mock private RedisTemplate redisTemplate; private ProcessingMessageScheduler messageScheduler = - new ProcessingMessageScheduler(redisTemplate, poolSize, true); + new ProcessingMessageScheduler(redisTemplate, poolSize, true, true, 900000); private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; - private ConsumerQueueDetail slowQueueDetail = new ConsumerQueueDetail(slowQueue, -1, "", true); - private ConsumerQueueDetail fastQueueDetail = new ConsumerQueueDetail(fastQueue, -1, "", false); + private QueueDetail slowQueueDetail = new QueueDetail(slowQueue, -1, "", true); + private QueueDetail fastQueueDetail = new QueueDetail(fastQueue, -1, "", false); @Test public void getChannelName() { assertEquals( - QueueInfo.getProcessingQueueChannelName(slowQueue), + QueueUtility.getProcessingQueueChannelName(slowQueue), messageScheduler.getChannelName(slowQueue)); } @Test public void getZsetName() { assertEquals( - QueueInfo.getProcessingQueueName(slowQueue), messageScheduler.getZsetName(slowQueue)); + QueueUtility.getProcessingQueueName(slowQueue), messageScheduler.getZsetName(slowQueue)); } @Test @@ -58,7 +58,7 @@ public void isQueueValid() { public void getNextScheduleTime() { long currentTime = System.currentTimeMillis(); assertEquals( - QueueInfo.getMessageReEnqueueTime(currentTime), + QueueUtility.getMessageReEnqueueTimeWithDelay(currentTime, 900000), messageScheduler.getNextScheduleTime(currentTime, null)); assertEquals( currentTime + 1000L, diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/RqueueMessageTemplateTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/RqueueMessageTemplateTest.java index 71032f5c..2c266625 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/RqueueMessageTemplateTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/RqueueMessageTemplateTest.java @@ -45,7 +45,7 @@ public class RqueueMessageTemplateTest { private DefaultScriptExecutor scriptExecutor = mock(DefaultScriptExecutor.class); private RqueueMessageTemplate rqueueMessageTemplate = - new RqueueMessageTemplate(redisConnectionFactory); + new RqueueMessageTemplate(redisConnectionFactory, 900000); private String key = "test-queue"; private RqueueMessage message = new RqueueMessage(key, "This is a message", null, 100L); diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java index ffb9121f..4474a15f 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package com.github.sonus21.rqueue.listener; -import static com.github.sonus21.rqueue.utils.QueueInfo.QUEUE_NAME; +import static com.github.sonus21.rqueue.utils.QueueUtility.QUEUE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java index 62519598..2e5f7968 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java @@ -28,6 +28,8 @@ import com.github.sonus21.rqueue.annotation.RqueueListener; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; +import com.github.sonus21.rqueue.processor.MessageProcessor; +import com.github.sonus21.rqueue.processor.NoOpMessageProcessor; import io.lettuce.core.RedisCommandExecutionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -45,9 +47,15 @@ public class RqueueMessageListenerContainerTest { private static final String slowQueue = "slow-queue"; private static final String fastQueue = "fast-queue"; + private MockedMessageProcessor deadLetterMessageProcessor = new MockedMessageProcessor(); + private MockedMessageProcessor discardMessageProcessor = new MockedMessageProcessor(); private RqueueMessageListenerContainer container = new RqueueMessageListenerContainer( - mock(RqueueMessageHandler.class), mock(RqueueMessageTemplate.class)); + mock(RqueueMessageHandler.class), + mock(RqueueMessageTemplate.class), + discardMessageProcessor, + deadLetterMessageProcessor, + 900000); @Test public void testPollingInterval() { @@ -179,7 +187,12 @@ public void testMessagesAreGettingFetchedFromRedis() throws Exception { messageHandler.afterPropertiesSet(); RqueueMessageListenerContainer container = - new RqueueMessageListenerContainer(messageHandler, rqueueMessageTemplate); + new RqueueMessageListenerContainer( + messageHandler, + rqueueMessageTemplate, + new NoOpMessageProcessor(), + new NoOpMessageProcessor(), + 900000); FieldUtils.writeField( container, "applicationEventPublisher", mock(ApplicationEventPublisher.class), true); AtomicInteger fastQueueCounter = new AtomicInteger(0); @@ -226,7 +239,12 @@ public void testMessageFetcherRetryWorking() throws Exception { container.setBackOffTime(10L); RqueueMessageListenerContainer container = - new RqueueMessageListenerContainer(messageHandler, rqueueMessageTemplate); + new RqueueMessageListenerContainer( + messageHandler, + rqueueMessageTemplate, + new NoOpMessageProcessor(), + new NoOpMessageProcessor(), + 900000); FieldUtils.writeField( container, "applicationEventPublisher", mock(ApplicationEventPublisher.class), true); doAnswer( @@ -266,7 +284,12 @@ public void testMessageHandlersAreInvoked() throws Exception { messageHandler.afterPropertiesSet(); RqueueMessageListenerContainer container = - new RqueueMessageListenerContainer(messageHandler, rqueueMessageTemplate); + new RqueueMessageListenerContainer( + messageHandler, + rqueueMessageTemplate, + new NoOpMessageProcessor(), + new NoOpMessageProcessor(), + 900000); FieldUtils.writeField( container, "applicationEventPublisher", mock(ApplicationEventPublisher.class), true); FastMessageSchedulerListener fastMessageListener = @@ -337,7 +360,12 @@ public Future submit(Runnable task) { messageHandler.setApplicationContext(applicationContext); messageHandler.afterPropertiesSet(); RqueueMessageListenerContainer container = - new RqueueMessageListenerContainer(messageHandler, mock(RqueueMessageTemplate.class)); + new RqueueMessageListenerContainer( + messageHandler, + mock(RqueueMessageTemplate.class), + new NoOpMessageProcessor(), + new NoOpMessageProcessor(), + 900000); FieldUtils.writeField( container, "applicationEventPublisher", mock(ApplicationEventPublisher.class), true); TestTaskExecutor taskExecutor = new TestTaskExecutor(); @@ -357,7 +385,12 @@ private static class StubMessageSchedulerListenerContainer private boolean doStopMethodIsCalled = false; StubMessageSchedulerListenerContainer() { - super(mock(RqueueMessageHandler.class), mock(RqueueMessageTemplate.class)); + super( + mock(RqueueMessageHandler.class), + mock(RqueueMessageTemplate.class), + new NoOpMessageProcessor(), + new NoOpMessageProcessor(), + 900000); } @Override @@ -395,4 +428,18 @@ public void onMessage(String message) { lastMessage = message; } } + + @Getter + private static class MockedMessageProcessor implements MessageProcessor { + private int count; + + @Override + public void process(Object message) { + count += 1; + } + + public void resetCount() { + count = 0; + } + } } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java index 84ecc411..402f05a4 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java @@ -27,8 +27,8 @@ import static org.mockito.Mockito.verify; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; -import com.github.sonus21.rqueue.listener.ConsumerQueueDetail; -import com.github.sonus21.rqueue.utils.QueueInfo; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.utils.QueueUtility; import com.github.sonus21.rqueue.utils.QueueInitializationEvent; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; @@ -46,7 +46,7 @@ public class RqueueMetricsTest { private RqueueMessageTemplate template = mock(RqueueMessageTemplate.class); private RqueueMetricsProperties metricsProperties = new RqueueMetricsProperties() {}; private QueueCounter queueCounter = mock(QueueCounter.class); - private Map queueDetails = new HashMap<>(); + private Map queueDetails = new HashMap<>(); private String simpleQueue = "simple-queue"; private String delayedQueue = "delayed-queue"; private String deadLetterQueue = "dlq"; @@ -54,18 +54,18 @@ public class RqueueMetricsTest { @Before public void init() { - queueDetails.put(simpleQueue, new ConsumerQueueDetail(simpleQueue, -1, deadLetterQueue, false)); - queueDetails.put(delayedQueue, new ConsumerQueueDetail(delayedQueue, -1, "", true)); + queueDetails.put(simpleQueue, new QueueDetail(simpleQueue, -1, deadLetterQueue, false)); + queueDetails.put(delayedQueue, new QueueDetail(delayedQueue, -1, "", true)); doAnswer( invocation -> { String zsetName = (String) invocation.getArguments()[0]; - if (zsetName.equals(QueueInfo.getTimeQueueName(delayedQueue))) { + if (zsetName.equals(QueueUtility.getTimeQueueName(delayedQueue))) { return 5L; } - if (zsetName.equals(QueueInfo.getProcessingQueueName(simpleQueue))) { + if (zsetName.equals(QueueUtility.getProcessingQueueName(simpleQueue))) { return 10L; } - if (zsetName.equals(QueueInfo.getProcessingQueueName(delayedQueue))) { + if (zsetName.equals(QueueUtility.getProcessingQueueName(delayedQueue))) { return 15L; } return null; diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java index 400547fb..cdcf1159 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Sonu Kumar + * Copyright 2020 Sonu Kumar * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -116,7 +116,7 @@ public void moveMessageFromQueueExceptions() { @Test public void moveMessageFromDeadLetterToQueue() { - doReturn(true).when(rqueueMessageTemplate).moveMessage("dlq" + queueName, queueName, 1000); + doReturn(true).when(rqueueMessageTemplate).moveMessage("dlq" + queueName, queueName, 100); rqueueMessageSender.moveMessageFromDeadLetterToQueue("dlq" + queueName, queueName, null); }