From f1fd0da4c5abe5018302d748411d029557819c51 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sat, 9 May 2020 12:48:54 +0530 Subject: [PATCH] Queue priority --- CHANGELOG.md | 9 +- .../sonus21/rqueue/test/MessageListener.java | 55 +++++++++++ .../sonus21/rqueue/test/dto/ChatIndexing.java | 44 +++++++++ .../rqueue/test/dto/FeedGeneration.java | 44 +++++++++ .../rqueue/test/dto/OrderConfirmation.java | 45 +++++++++ .../github/sonus21/rqueue/test/dto/Otp.java | 47 ++++++++++ .../sonus21/rqueue/test/dto/Reservation.java | 53 +++++++++++ .../src/main/resources/application.properties | 9 ++ .../rqueue/example/MessageListener.java | 8 +- .../tests/integration/ApplicationTest.java | 2 + ...{MetricsTest.java => BootMetricsTest.java} | 5 +- .../tests/integration/MessageChannelTest.java | 2 +- .../tests/integration/MessageRetryTest.java | 3 +- .../ProcessingMessageSchedulerTest.java | 2 +- .../RqueueMessageTemplateTest.java | 2 +- .../RqueueRestControllerTest.java | 4 +- .../RqueueViewControllerTest.java | 7 +- .../integration}/RqueueViewsDisabledTest.java | 4 +- .../integration}/SpringAppTest.java | 6 +- .../integration}/SpringMetricTest.java | 4 +- .../unit}/RqueueMessageConfigTest.java | 3 +- .../rqueue/annotation/RqueueListener.java | 14 ++- .../sonus21/rqueue/config/RqueueConfig.java | 4 +- .../rqueue/core/DelayedMessageScheduler.java | 9 +- .../sonus21/rqueue/core/MessageScheduler.java | 50 ++++------ .../core/ProcessingMessageScheduler.java | 8 +- .../sonus21/rqueue/core/QueueRegistry.java | 17 +++- .../rqueue/core/RqueueMessageSender.java | 22 +++++ .../rqueue/core/RqueueMessageSenderImpl.java | 11 +++ .../rqueue/listener/MappingInformation.java | 7 +- .../rqueue/listener/MessageExecutor.java | 2 +- .../sonus21/rqueue/listener/QueueDetail.java | 61 ++++++------ .../rqueue/listener/RqueueMessageHandler.java | 70 +++++++------- .../RqueueMessageListenerContainer.java | 94 +++++++++---------- .../sonus21/rqueue/models/db/QueueConfig.java | 45 ++++++++- .../models/event/RqueueExecutionEvent.java | 13 ++- .../service/RqueueTaskAggregatorService.java | 12 ++- .../impl/RqueueQDetailServiceImpl.java | 4 +- .../impl/RqueueSystemManagerServiceImpl.java | 2 + .../core/DelayedMessageSchedulerTest.java | 44 +++++---- .../core/MessageSchedulerRedisDisabled.java | 7 +- .../rqueue/core/MessageSchedulerTest.java | 6 +- .../core/ProcessingMessageSchedulerTest.java | 13 +-- .../rqueue/listener/MessageExecutorTest.java | 70 ++++++++++---- .../listener/RqueueMessageHandlerTest.java | 3 +- .../RqueueMessageListenerContainerTest.java | 11 ++- .../rqueue/metrics/QueueCounterTest.java | 6 +- .../rqueue/metrics/RqueueMetricsTest.java | 6 +- .../rqueue/models/db/QueueConfigTest.java | 48 ++++++---- .../producer/RqueueMessageSenderTest.java | 2 +- .../sonus21/rqueue/utils/TestUtils.java | 40 ++++++-- .../web/dao/RqueueSystemConfigDaoTest.java | 8 +- .../web/service/RqueueQDetailServiceTest.java | 50 +++++----- .../RqueueSystemManagerServiceTest.java | 8 +- .../RqueueTaskAggregatorServiceTest.java | 4 +- .../web/service/RqueueUtilityServiceTest.java | 2 +- .../RqueueSystemManagerServiceImplTest.java | 16 +--- 57 files changed, 785 insertions(+), 362 deletions(-) create mode 100644 rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/ChatIndexing.java create mode 100644 rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/FeedGeneration.java create mode 100644 rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/OrderConfirmation.java create mode 100644 rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Otp.java create mode 100644 rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Reservation.java rename rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/{MetricsTest.java => BootMetricsTest.java} (93%) rename rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/{ => tests/integration}/RqueueRestControllerTest.java (99%) rename rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/{ => tests/integration}/RqueueViewControllerTest.java (97%) rename rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/{ => tests/integration}/RqueueViewsDisabledTest.java (98%) rename rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/{ => tests/integration}/SpringAppTest.java (89%) rename rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/{ => tests/integration}/SpringMetricTest.java (88%) rename rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/{ => tests/unit}/RqueueMessageConfigTest.java (98%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f362451..5050b2dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,6 @@ All notable changes to Rqueue project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [2.2.0] - TBD -- Global queue priority -- Queue level priority -- Strict or weighted algorithm for message execution - ## [2.0.0] - TBD ### Added @@ -20,7 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Single or multiple execution of polled messages - Queue level concurrency - BackOff for failed message, linear or exponential -- +- Group level queue priority +- Multi level queue priority +- Strict or weighted algorithm for message execution ### Breaking Changes - Queue names are prefixed, that can lead to error. 1.x users set REDIS key `__rq::version` to `1`. It does try to find the version using key prefix, but if all queues are empty or no key exist in REDIS with prefix `rqueue-` then it will consider version 2. diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java index fd31d7af..036b87f6 100644 --- a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/MessageListener.java @@ -17,9 +17,13 @@ package com.github.sonus21.rqueue.test; import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.test.dto.ChatIndexing; import com.github.sonus21.rqueue.test.dto.Email; +import com.github.sonus21.rqueue.test.dto.FeedGeneration; import com.github.sonus21.rqueue.test.dto.Job; import com.github.sonus21.rqueue.test.dto.Notification; +import com.github.sonus21.rqueue.test.dto.Otp; +import com.github.sonus21.rqueue.test.dto.Reservation; import com.github.sonus21.rqueue.test.service.ConsumedMessageService; import com.github.sonus21.rqueue.test.service.FailureManager; import lombok.NonNull; @@ -73,4 +77,55 @@ public void onMessage(Email email) throws Exception { } consumedMessageService.save(email); } + + @RqueueListener( + value = "${otp.queue}", + active = "${otp.queue.active}", + priority = "critical:5,high:3, low:2") + public void onMessage(Otp otp) throws Exception { + log.info("Otp: {}", otp); + if (failureManager.shouldFail(otp.getId())) { + throw new Exception("Failing otp task to be retried" + otp); + } + consumedMessageService.save(otp); + } + + @RqueueListener( + value = "${chat.indexing.queue}", + active = "${chat.indexing.queue.active}", + priority = "10", + priorityGroup = "test") + public void onMessage(ChatIndexing chatIndexing) throws Exception { + log.info("ChatIndexing: {}", chatIndexing); + if (failureManager.shouldFail(chatIndexing.getId())) { + throw new Exception("Failing chat indexing task to be retried" + chatIndexing); + } + consumedMessageService.save(chatIndexing); + } + + @RqueueListener( + value = "${feed.generation.queue}", + active = "${feed.generation.queue.active}", + priority = "50", + priorityGroup = "test") + public void onMessage(FeedGeneration feedGeneration) throws Exception { + log.info("FeedGeneration: {}", feedGeneration); + if (failureManager.shouldFail(feedGeneration.getId())) { + throw new Exception("Failing feedGeneration task to be retried" + feedGeneration); + } + consumedMessageService.save(feedGeneration); + } + + @RqueueListener( + value = "${reservation.queue}", + active = "${reservation.queue.active}", + priority = "100", + priorityGroup = "test2") + public void onMessage(Reservation reservation) throws Exception { + log.info("Reservation: {}", reservation); + if (failureManager.shouldFail(reservation.getId())) { + throw new Exception("Failing reservation task to be retried" + reservation); + } + consumedMessageService.save(reservation); + } } diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/ChatIndexing.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/ChatIndexing.java new file mode 100644 index 00000000..7cdf09e0 --- /dev/null +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/ChatIndexing.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.test.dto; + +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.apache.commons.lang3.RandomUtils; + +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = true) +public class ChatIndexing extends BaseQueueMessage { + private int userId; + private Long timestamp; + + public static ChatIndexing newInstance() { + ChatIndexing chatIndexing = + new ChatIndexing(RandomUtils.nextInt(100000, 1000000), System.currentTimeMillis()); + chatIndexing.setId(UUID.randomUUID().toString()); + return chatIndexing; + } +} diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/FeedGeneration.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/FeedGeneration.java new file mode 100644 index 00000000..fc5aed79 --- /dev/null +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/FeedGeneration.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.test.dto; + +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.apache.commons.lang3.RandomUtils; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@ToString +@EqualsAndHashCode(callSuper = true) +public class FeedGeneration extends BaseQueueMessage { + private int userId; + private Long timestamp; + + public static FeedGeneration newInstance() { + FeedGeneration feedGeneration = + new FeedGeneration(RandomUtils.nextInt(1000, 1000000), System.currentTimeMillis()); + feedGeneration.setId(UUID.randomUUID().toString()); + return feedGeneration; + } +} diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/OrderConfirmation.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/OrderConfirmation.java new file mode 100644 index 00000000..31d6d77e --- /dev/null +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/OrderConfirmation.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.test.dto; + +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = true) +public class OrderConfirmation extends BaseQueueMessage { + private String orderId; + private String userId; + private long timestamp; + + public static OrderConfirmation newInstance() { + OrderConfirmation orderConfirmation = + new OrderConfirmation( + UUID.randomUUID().toString(), UUID.randomUUID().toString(), System.currentTimeMillis()); + orderConfirmation.setId(UUID.randomUUID().toString()); + return orderConfirmation; + } +} diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Otp.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Otp.java new file mode 100644 index 00000000..9befce0b --- /dev/null +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Otp.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.test.dto; + +import java.util.Random; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.apache.commons.lang3.RandomStringUtils; + +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = true) +public class Otp extends BaseQueueMessage { + private String phoneNumber; + private String otp; + + public static Otp newInstance() { + Otp otp = + new Otp( + "+91" + RandomStringUtils.randomNumeric(10), + String.valueOf(new Random().nextInt(100000))); + otp.setId(UUID.randomUUID().toString()); + return otp; + } +} diff --git a/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Reservation.java b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Reservation.java new file mode 100644 index 00000000..d61210f2 --- /dev/null +++ b/rqueue-common-test/src/main/java/com/github/sonus21/rqueue/test/dto/Reservation.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.sonus21.rqueue.test.dto; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; + +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = true) +public class Reservation extends BaseQueueMessage { + private int requestId; + private Map otherDetails; + + public static Reservation newInstance() { + Map other = new HashMap<>(); + int count = RandomUtils.nextInt(1, 10); + for (int i = 0; i < count; i++) { + String key = RandomStringUtils.randomAlphabetic(10); + String value = RandomStringUtils.randomAlphabetic(10); + other.put(key, value); + } + Reservation reservation = new Reservation(RandomUtils.nextInt(), other); + reservation.setId(UUID.randomUUID().toString()); + return reservation; + } +} diff --git a/rqueue-common-test/src/main/resources/application.properties b/rqueue-common-test/src/main/resources/application.properties index 005e0dd1..2116c7bd 100644 --- a/rqueue-common-test/src/main/resources/application.properties +++ b/rqueue-common-test/src/main/resources/application.properties @@ -13,3 +13,12 @@ email.queue.active=true mysql.db.name=test rqueue.metrics.tags.rqueue=test email.execution.time=15*60*1000 +otp.queue=otp +otp.queue.active=false +chat.indexing.queue=chat-indexing +chat.indexing.queue.active=false +feed.generation.queue=feed-generation +feed.generation.queue.active=false +reservation.queue=reservation +reservation.queue.active=false + diff --git a/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java b/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java index ff725517..0517c132 100644 --- a/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java +++ b/rqueue-spring-boot-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java @@ -55,9 +55,9 @@ public void consumeMessage(String message) { @RqueueListener( value = {"${rqueue.delay.queue}", "${rqueue.delay2.queue}"}, - delayedQueue = "${rqueue.delay.queue.delayed-queue}", numRetries = "${rqueue.delay.queue.retries}", - visibilityTimeout = "60*60*1000") + visibilityTimeout = "60*60*1000", + active = "true") @NewSpan public void onMessage(String message) { execute("delay: {}", message); @@ -68,10 +68,10 @@ public void onMessage(String message) { @RqueueListener( value = "job-queue", - delayedQueue = "true", deadLetterQueue = "job-morgue", numRetries = "2", - concurrency = "5") + concurrency = "5-10", + active = "true") @NewSpan public void onMessage(Job job) { execute("job-queue: {}", job); diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ApplicationTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ApplicationTest.java index 714a37cd..e2264375 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ApplicationTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ApplicationTest.java @@ -32,11 +32,13 @@ import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; @RunWith(RqueueSpringTestRunner.class) @ContextConfiguration(classes = Application.class) @SpringBootTest @Slf4j +@TestPropertySource(properties = {"rqueue.retry.per.poll=1000", "spring.redis.port=8001"}) public class ApplicationTest extends SpringTestBase { @Test diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MetricsTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BootMetricsTest.java similarity index 93% rename from rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MetricsTest.java rename to rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BootMetricsTest.java index 634aa761..64889c90 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MetricsTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BootMetricsTest.java @@ -33,13 +33,14 @@ @Slf4j @TestPropertySource( properties = { + "rqueue.retry.per.poll=20", "rqueue.scheduler.auto.start=false", - "spring.redis.port=6384", + "spring.redis.port=8004", "mysql.db.name=test4", "rqueue.metrics.count.failure=true", "rqueue.metrics.count.execution=true", }) -public class MetricsTest extends MetricTestBase { +public class BootMetricsTest extends MetricTestBase { @Test public void delayedQueueStatus() throws TimedOutException { diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java index 07635853..3763daad 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageChannelTest.java @@ -37,7 +37,7 @@ @TestPropertySource( properties = { "rqueue.scheduler.auto.start=false", - "spring.redis.port=6382", + "spring.redis.port=8002", "mysql.db.name=test2" }) @SpringBootTest diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageRetryTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageRetryTest.java index 59437de4..f59bf9dc 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageRetryTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/MessageRetryTest.java @@ -35,7 +35,8 @@ @RunWith(RqueueSpringTestRunner.class) @ContextConfiguration(classes = Application.class) -@TestPropertySource(properties = {"spring.redis.port=6381", "mysql.db.name=test1"}) +@TestPropertySource( + properties = {"spring.redis.port=8003", "mysql.db.name=test1", "rqueue.retry.per.poll=1000"}) @SpringBootTest public class MessageRetryTest extends SpringTestBase { @Test diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java index cf804291..1f0e7d42 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/ProcessingMessageSchedulerTest.java @@ -46,7 +46,7 @@ @TestPropertySource( properties = { "rqueue.scheduler.auto.start=false", - "spring.redis.port=6389", + "spring.redis.port=8005", "mysql.db.name=test3", "max.workers.count=120", "use.system.redis=false" diff --git a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RqueueMessageTemplateTest.java b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RqueueMessageTemplateTest.java index 8dff2065..5c93f7a3 100644 --- a/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RqueueMessageTemplateTest.java +++ b/rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/RqueueMessageTemplateTest.java @@ -36,7 +36,7 @@ @ContextConfiguration(classes = Application.class) @SpringBootTest @Slf4j -@TestPropertySource(properties = {"use.system.redis=false", "spring.redis.port:6385"}) +@TestPropertySource(properties = {"use.system.redis=false", "spring.redis.port:8006"}) public class RqueueMessageTemplateTest extends SpringTestBase { @Test public void moveMessageFromDeadLetterQueueToOriginalQueue() { diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueRestControllerTest.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueRestControllerTest.java similarity index 99% rename from rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueRestControllerTest.java rename to rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueRestControllerTest.java index 6f3aab24..a69ca8b2 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueRestControllerTest.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueRestControllerTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.github.sonus21.rqueue.spring; +package com.github.sonus21.rqueue.spring.tests.integration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -66,7 +66,7 @@ @WebAppConfiguration @TestPropertySource( properties = { - "spring.redis.port=6386", + "spring.redis.port=7001", "mysql.db.name=RqueueRestController", "max.workers.count=40", "notification.queue.active=false", diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueViewControllerTest.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueViewControllerTest.java similarity index 97% rename from rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueViewControllerTest.java rename to rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueViewControllerTest.java index bdb1383a..28fab075 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueViewControllerTest.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueViewControllerTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.github.sonus21.rqueue.spring; +package com.github.sonus21.rqueue.spring.tests.integration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -49,7 +49,7 @@ @WebAppConfiguration @TestPropertySource( properties = { - "spring.redis.port=6387", + "spring.redis.port=7002", "mysql.db.name=RqueueViewControllerTest", "max.workers.count=40", "notification.queue.active=false", @@ -131,7 +131,8 @@ public void queueDetail() throws Exception { model.get("typeSelectors")); assertNotNull(model.get("config")); assertNotNull(model.get("queueRedisDataDetails")); - assertEquals(Arrays.asList(NavTab.PENDING, NavTab.RUNNING), model.get("queueActions")); + assertEquals( + Arrays.asList(NavTab.PENDING, NavTab.SCHEDULED, NavTab.RUNNING), model.get("queueActions")); } @Test diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueViewsDisabledTest.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueViewsDisabledTest.java similarity index 98% rename from rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueViewsDisabledTest.java rename to rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueViewsDisabledTest.java index 6150f680..44f91ed2 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueViewsDisabledTest.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/RqueueViewsDisabledTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.github.sonus21.rqueue.spring; +package com.github.sonus21.rqueue.spring.tests.integration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -48,7 +48,7 @@ @WebAppConfiguration @TestPropertySource( properties = { - "spring.redis.port=6388", + "spring.redis.port=7003", "mysql.db.name=RqueueRestController", "rqueue.web.enable=false" }) diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringAppTest.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/SpringAppTest.java similarity index 89% rename from rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringAppTest.java rename to rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/SpringAppTest.java index 7562ad6f..e27ac8c7 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringAppTest.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/SpringAppTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.github.sonus21.rqueue.spring; +package com.github.sonus21.rqueue.spring.tests.integration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -29,17 +29,19 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.web.WebAppConfiguration; @ContextConfiguration(classes = AppWithMetricEnabled.class) @RunWith(RqueueSpringTestRunner.class) @Slf4j @WebAppConfiguration +@TestPropertySource(properties = {"spring.redis.port=7004"}) public class SpringAppTest extends SpringTestBase { @Test public void numListeners() { - Map registeredQueue = QueueRegistry.getQueueMap(); + Map registeredQueue = QueueRegistry.getActiveQueueMap(); assertEquals(3, registeredQueue.size()); assertTrue(registeredQueue.containsKey(notificationQueue)); assertTrue(registeredQueue.containsKey(emailQueue)); diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringMetricTest.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/SpringMetricTest.java similarity index 88% rename from rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringMetricTest.java rename to rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/SpringMetricTest.java index c6aaf40e..97e875e8 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/SpringMetricTest.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/integration/SpringMetricTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.github.sonus21.rqueue.spring; +package com.github.sonus21.rqueue.spring.tests.integration; import com.github.sonus21.rqueue.exception.TimedOutException; import com.github.sonus21.rqueue.spring.app.AppWithMetricEnabled; @@ -24,12 +24,14 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.web.WebAppConfiguration; @ContextConfiguration(classes = AppWithMetricEnabled.class) @RunWith(RqueueSpringTestRunner.class) @Slf4j @WebAppConfiguration +@TestPropertySource(properties = {"rqueue.retry.per.poll=20", "spring.redis.port=7005"}) public class SpringMetricTest extends MetricTestBase { @Test public void delayedQueueStatus() throws TimedOutException { diff --git a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueMessageConfigTest.java b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/unit/RqueueMessageConfigTest.java similarity index 98% rename from rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueMessageConfigTest.java rename to rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/unit/RqueueMessageConfigTest.java index 4f9bbc24..f647164c 100644 --- a/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/RqueueMessageConfigTest.java +++ b/rqueue-spring/src/test/java/com/github/sonus21/rqueue/spring/tests/unit/RqueueMessageConfigTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.github.sonus21.rqueue.spring; +package com.github.sonus21.rqueue.spring.tests.unit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -26,6 +26,7 @@ import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; import com.github.sonus21.rqueue.core.RqueueMessageSender; +import com.github.sonus21.rqueue.spring.RqueueListenerConfig; import java.util.ArrayList; import java.util.Collections; import java.util.List; diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java b/rqueue/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java index 90bfa734..ce46c4cf 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java @@ -137,11 +137,17 @@ String concurrency() default "-1"; /** - * Use this to set priority of this listener. A listener can have two types of priority, global - * and queue level priority. In the case of queue level priority, values should be provided as - * "critical:10,high:5, medium:3,low:1", these same name must be used to enqueue message. + * Use this to set priority of this listener. A listener can have two types of priorities. * - *

When using priority group then this should be a number between 1 and 100 both are inclusive + *

1. Group level priority, for group level priority specify group name, if no group is + * provided then a default group is used. + * + *

2. Queue level priority. In the case of queue level priority, values should be provided as + * "critical:10,high:5,medium:3,low:1", these same name must be used to enqueue message. + * + *

Priorities can be any number. There're two priority control modes. 1. Strict 2. Weighted, in + * strict priority mode queue with higher priority is preferred over other queues. In case of + * weighted a round robin approach is used, and weight is followed. * * @return the priority for this listener. */ diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java b/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java index 8f3ac7ee..ac0c900f 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java @@ -72,8 +72,8 @@ public String getQueuesKey() { @Value("${rqueue.queue.config.key.prefix:q-config::}") private String queueConfigKeyPrefix; - @Value("${retries.per.poll:1}") - private int retriesPerPoll; + @Value("${rqueue.retry.per.poll:1}") + private int retryPerPoll; @Value("${rqueue.add.default.queue.with.queue.level.priority:true}") private boolean addDefaultQueueWithQueueLevelPriority; diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java index f7aa241f..0153dbd0 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/DelayedMessageScheduler.java @@ -16,7 +16,6 @@ package com.github.sonus21.rqueue.core; -import com.github.sonus21.rqueue.listener.QueueDetail; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -41,13 +40,13 @@ protected long getNextScheduleTime(String queueName, Long value) { } @Override - protected String getChannelName(QueueDetail queueDetail) { - return queueDetail.getDelayedQueueChannelName(); + protected String getChannelName(String queueName) { + return QueueRegistry.get(queueName).getDelayedQueueChannelName(); } @Override - protected String getZsetName(QueueDetail queueDetail) { - return queueDetail.getDelayedQueueName(); + protected String getZsetName(String queueName) { + return QueueRegistry.get(queueName).getDelayedQueueName(); } @Override diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java index f29239ee..7fed9ee8 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java @@ -21,7 +21,6 @@ import static java.lang.Math.max; import static java.lang.Math.min; -import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType; import com.github.sonus21.rqueue.listener.QueueDetail; @@ -30,6 +29,7 @@ import com.github.sonus21.rqueue.utils.ThreadUtils; import java.time.Instant; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -54,7 +54,6 @@ abstract class MessageScheduler implements DisposableBean, ApplicationListener { @Autowired protected RqueueSchedulerConfig rqueueSchedulerConfig; - @Autowired protected RqueueConfig rqueueConfig; private RedisScript redisScript; private MessageSchedulerListener messageSchedulerListener; private DefaultScriptExecutor defaultScriptExecutor; @@ -62,7 +61,6 @@ abstract class MessageScheduler private Map queueNameToScheduledTask; private Map channelNameToQueueName; private Map queueNameToLastMessageSeenTime; - private Map queueDetails = new ConcurrentHashMap<>(); private ThreadPoolTaskScheduler scheduler; @Autowired private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory; @@ -74,9 +72,9 @@ abstract class MessageScheduler protected abstract long getNextScheduleTime(String queueName, Long value); - protected abstract String getChannelName(QueueDetail queueDetail); + protected abstract String getChannelName(String queueName); - protected abstract String getZsetName(QueueDetail queueDetail); + protected abstract String getZsetName(String queueName); protected abstract String getThreadNamePrefix(); @@ -88,14 +86,14 @@ private void doStart() { } } - private void subscribeToRedisTopic(QueueDetail queueDetail) { + private void subscribeToRedisTopic(String queueName) { if (isRedisEnabled()) { - String channelName = getChannelName(queueDetail); - getLogger().debug("Queue {} subscribe to channel {}", queueDetail.getName(), channelName); + String channelName = getChannelName(queueName); + getLogger().debug("Queue {} subscribe to channel {}", queueName, channelName); this.rqueueRedisListenerContainerFactory .getContainer() .addMessageListener(messageSchedulerListener, new ChannelTopic(channelName)); - channelNameToQueueName.put(channelName, queueDetail.getName()); + channelNameToQueueName.put(channelName, queueName); } } @@ -108,7 +106,7 @@ private void startQueue(String queueName) { long scheduleAt = System.currentTimeMillis() + MIN_DELAY; schedule(queueName, scheduleAt, false); } - subscribeToRedisTopic(queueDetails.get(queueName)); + subscribeToRedisTopic(queueName); } private void doStop() { @@ -197,8 +195,8 @@ protected synchronized void schedule(String queueName, Long startTime, boolean f queueNameToLastMessageSeenTime.put(queueName, currentTime); ScheduledTaskDetail scheduledTaskDetail = queueNameToScheduledTask.get(queueName); - QueueDetail queueDetail = queueDetails.get(queueName); - String zsetName = getZsetName(queueDetail); + QueueDetail queueDetail = QueueRegistry.get(queueName); + String zsetName = getZsetName(queueName); if (scheduledTaskDetail == null || forceSchedule) { long requiredDelay = max(1, startTime - currentTime); @@ -243,31 +241,19 @@ protected synchronized void schedule(String queueName, Long startTime, boolean f @SuppressWarnings("unchecked") protected void initialize() { - this.queueDetails.clear(); - for (QueueDetail queueDetail : QueueRegistry.getActiveQueueDetails()) { - if (queueDetail.getPriority().size() > 1) { - for (QueueDetail clonedQueueDetail : - queueDetail.expandQueueDetail( - rqueueConfig.isAddDefaultQueueWithQueueLevelPriority(), - rqueueConfig.getDefaultQueueWithQueueLevelPriority())) { - this.queueDetails.put(clonedQueueDetail.getName(), queueDetail); - } - } else { - this.queueDetails.put(queueDetail.getName(), queueDetail); - } - } + List queueNames = QueueRegistry.getActiveQueues(); defaultScriptExecutor = new DefaultScriptExecutor<>(redisTemplate); redisScript = (RedisScript) RedisScriptFactory.getScript(ScriptType.PUSH_MESSAGE); - queueRunningState = new ConcurrentHashMap<>(queueDetails.size()); - queueNameToScheduledTask = new ConcurrentHashMap<>(queueDetails.size()); - channelNameToQueueName = new ConcurrentHashMap<>(queueDetails.size()); - queueNameToLastMessageSeenTime = new ConcurrentHashMap<>(queueDetails.size()); - createScheduler(queueDetails.size()); + queueRunningState = new ConcurrentHashMap<>(queueNames.size()); + queueNameToScheduledTask = new ConcurrentHashMap<>(queueNames.size()); + channelNameToQueueName = new ConcurrentHashMap<>(queueNames.size()); + queueNameToLastMessageSeenTime = new ConcurrentHashMap<>(queueNames.size()); + createScheduler(queueNames.size()); if (isRedisEnabled()) { messageSchedulerListener = new MessageSchedulerListener(); } - for (String name : queueDetails.keySet()) { - queueRunningState.put(name, false); + for (String queueName : queueNames) { + queueRunningState.put(queueName, false); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java index bde5724d..56448744 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/ProcessingMessageScheduler.java @@ -45,13 +45,13 @@ protected Logger getLogger() { } @Override - protected String getChannelName(QueueDetail queueDetail) { - return queueDetail.getProcessingQueueChannelName(); + protected String getChannelName(String queueName) { + return QueueRegistry.get(queueName).getProcessingQueueChannelName(); } @Override - protected String getZsetName(QueueDetail queueDetail) { - return queueDetail.getProcessingQueueName(); + protected String getZsetName(String queueName) { + return QueueRegistry.get(queueName).getProcessingQueueName(); } @Override diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/QueueRegistry.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/QueueRegistry.java index f84d80f2..55d762be 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/QueueRegistry.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/QueueRegistry.java @@ -18,10 +18,10 @@ import com.github.sonus21.rqueue.exception.QueueDoesNotExist; import com.github.sonus21.rqueue.listener.QueueDetail; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; public class QueueRegistry { @@ -52,10 +52,6 @@ public static void delete() { } } - public static Map getQueueMap() { - return Collections.unmodifiableMap(queueNameToDetail); - } - public static List getActiveQueues() { synchronized (lock) { List queues = @@ -79,6 +75,17 @@ public static List getActiveQueueDetails() { } } + public static Map getActiveQueueMap() { + synchronized (lock) { + Map queueDetails = + queueNameToDetail.values().stream() + .filter(QueueDetail::isActive) + .collect(Collectors.toMap(QueueDetail::getName, Function.identity())); + lock.notifyAll(); + return queueDetails; + } + } + public static int getActiveQueueCount() { return getActiveQueues().size(); } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSender.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSender.java index 4b587486..f8faeb0f 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSender.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSender.java @@ -99,6 +99,16 @@ public interface RqueueMessageSender { */ boolean enqueue(String queueName, Object message); + /** + * Submit a message on given queue without any delay at the specified priority level. + * + * @param queueName on which queue message has to be send + * @param priorityLevel priority level for this message + * @param message message object it could be any arbitrary object. + * @return message was submitted successfully or failed. + */ + boolean enqueue(String queueName, String priorityLevel, Object message); + /** * This is an extension to the method {@link #put(String, Object)}. By default container would try * to deliver the same message for {@link Integer#MAX_VALUE} times, but that can be either @@ -125,6 +135,18 @@ public interface RqueueMessageSender { */ boolean enqueueIn(String queueName, Object message, long delayInMilliSecs); + /** + * Submit a task to the consumer in a given queue at the given priority level. + * + * @param queueName on which queue message has to be send + * @param priorityLevel name of the priority level + * @param message message object it could be any arbitrary object. + * @param delayInMilliSecs delay in milli seconds, this message would be only visible to the + * listener when number of millisecond has elapsed. + * @return message was submitted successfully or failed. + */ + boolean enqueueIn(String queueName, String priorityLevel, Object message, long delayInMilliSecs); + /** * Enqueue a task that would be scheduled to run after N milli seconds. * diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSenderImpl.java b/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSenderImpl.java index a7e44abc..3ed79e42 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSenderImpl.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageSenderImpl.java @@ -85,6 +85,11 @@ public boolean enqueue(String queueName, Object message) { return pushMessage(queueName, message, null, null); } + @Override + public boolean enqueue(String queueName, String priorityLevel, Object message) { + return enqueue(queueName + "_" + priorityLevel, message); + } + /** * This is an extension to the method {@link #put(String, Object)}. By default container would try * to deliver the same message for {@link Integer#MAX_VALUE} times, but that can be either @@ -153,6 +158,12 @@ public boolean enqueueIn(String queueName, Object message, long delayInMilliSecs return pushMessage(queueName, message, null, delayInMilliSecs); } + @Override + public boolean enqueueIn( + String queueName, String priorityLevel, Object message, long delayInMilliSecs) { + return enqueueIn(queueName + "_" + priorityLevel, message, delayInMilliSecs); + } + /** * Find all messages stored on a given queue, it considers all the messages including delayed and * non-delayed. diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MappingInformation.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MappingInformation.java index 723ccf37..c1a0188f 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MappingInformation.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MappingInformation.java @@ -33,13 +33,13 @@ @Builder class MappingInformation implements Comparable { private Set queueNames; - private int numRetries; + private int numRetry; private String deadLetterQueueName; private long visibilityTimeout; private boolean active; private MinMax concurrency; private String priorityGroup; - private Map priorities; + private Map priority; @Override public int compareTo(MappingInformation o) { @@ -52,8 +52,7 @@ public String toString() { } boolean isValid() { - return active - && getQueueNames().size() > 0 + return getQueueNames().size() > 0 && visibilityTimeout > MIN_EXECUTION_TIME + DELTA_BETWEEN_RE_ENQUEUE_TIME; } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java index dd8c80a2..3ee5f8c0 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/MessageExecutor.java @@ -147,7 +147,7 @@ private void publishEvent(TaskStatus status, long jobExecutionStartTime) { if (Objects.requireNonNull(container.get()).getRqueueWebConfig().isCollectListenerStats()) { addOrDeleteMetadata(jobExecutionStartTime, false); RqueueExecutionEvent event = - new RqueueExecutionEvent(queueDetail.getName(), status, rqueueMessage, messageMetadata); + new RqueueExecutionEvent(queueDetail, rqueueMessage, status, messageMetadata); Objects.requireNonNull(container.get()).getApplicationEventPublisher().publishEvent(event); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java index 016db0fd..bfce48ec 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java @@ -16,8 +16,6 @@ package com.github.sonus21.rqueue.listener; -import static com.github.sonus21.rqueue.utils.Constants.DEFAULT_PRIORITY_KEY; - import com.github.sonus21.rqueue.models.MinMax; import com.github.sonus21.rqueue.models.db.QueueConfig; import com.github.sonus21.rqueue.utils.Constants; @@ -52,6 +50,7 @@ public class QueueDetail implements Serializable { private MinMax concurrency; private Map priority; private String priorityGroup; + private boolean systemGenerated; public boolean isDlqSet() { return !StringUtils.isEmpty(deadLetterQueueName); @@ -70,6 +69,9 @@ public QueueConfig toConfig() { .updatedOn(System.currentTimeMillis()) .deadLetterQueues(new LinkedHashSet<>()) .concurrency(concurrency) + .priority(priority) + .priorityGroup(priorityGroup) + .systemGenerated(systemGenerated) .build(); if (isDlqSet()) { queueConfig.addDeadLetterQueue(deadLetterQueueName); @@ -77,23 +79,12 @@ public QueueConfig toConfig() { return queueConfig; } - public List expandQueueDetail(boolean addDefault, int priority) { + List expandQueueDetail(boolean addDefault, int priority) { List queueDetails = new ArrayList<>(); for (Entry entry : getPriority().entrySet()) { - String suffix = "_" + entry.getKey(); - QueueDetail cloneQueueDetail = - QueueDetail.builder() - .numRetry(numRetry) - .visibilityTimeout(visibilityTimeout) - .deadLetterQueueName(deadLetterQueueName) - .priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, entry.getValue())) - .name(name + suffix) - .queueName(queueName + suffix) - .processingQueueName(processingQueueName + suffix) - .processingQueueChannelName(processingQueueChannelName + suffix) - .delayedQueueName(delayedQueueName + suffix) - .delayedQueueChannelName(delayedQueueChannelName + suffix) - .build(); + QueueDetail cloneQueueDetail = cloneQueueDetail(entry.getKey(), entry.getValue()); + cloneQueueDetail.systemGenerated = true; + cloneQueueDetail.priorityGroup = name; queueDetails.add(cloneQueueDetail); } if (addDefault) { @@ -103,21 +94,29 @@ public List expandQueueDetail(boolean addDefault, int priority) { priorities.sort(Comparator.comparingInt(o -> o)); defaultPriority = priorities.get(priorities.size() / 2); } - QueueDetail cloneQueueDetail = - QueueDetail.builder() - .name(name) - .numRetry(numRetry) - .visibilityTimeout(visibilityTimeout) - .queueName(queueName) - .deadLetterQueueName(deadLetterQueueName) - .processingQueueChannelName(processingQueueChannelName) - .processingQueueName(processingQueueName) - .delayedQueueChannelName(delayedQueueChannelName) - .delayedQueueName(delayedQueueName) - .priority(Collections.singletonMap(DEFAULT_PRIORITY_KEY, defaultPriority)) - .build(); - queueDetails.add(cloneQueueDetail); + this.priority.put(Constants.DEFAULT_PRIORITY_KEY, defaultPriority); + this.priorityGroup = name; + queueDetails.add(this); } return queueDetails; } + + private QueueDetail cloneQueueDetail(String priorityName, Integer priority) { + if (priority == null || priorityName == null) { + throw new IllegalStateException("priority name is null"); + } + String suffix = "_" + priorityName; + return QueueDetail.builder() + .numRetry(numRetry) + .visibilityTimeout(visibilityTimeout) + .deadLetterQueueName(deadLetterQueueName) + .priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, priority)) + .name(name + suffix) + .queueName(queueName + suffix) + .processingQueueName(processingQueueName + suffix) + .processingQueueChannelName(processingQueueChannelName + suffix) + .delayedQueueName(delayedQueueName + suffix) + .delayedQueueChannelName(delayedQueueChannelName + suffix) + .build(); + } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java index b5feb702..43c15ac6 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java @@ -94,36 +94,20 @@ private MinMax resolveConcurrency(RqueueListener rqueueListener) { "Concurrency must be either some number e.g. 5 or in the form of 5-10"); } if (vals.length == 1) { - try { - int concurrency = Integer.parseInt(vals[0]); - if (concurrency == 0) { - throw new IllegalStateException("Concurrency must be non-zero"); - } - return new MinMax<>(1, concurrency); - } catch (NumberFormatException e) { - throw new IllegalStateException("Concurrency is not a number", e); - } - } - - int lowerLimit; - try { - lowerLimit = Integer.parseInt(vals[0]); - if (lowerLimit == 0) { - throw new IllegalStateException("Concurrency lower limit must be non-zero"); - } - } catch (NumberFormatException e) { - throw new IllegalStateException("Concurrency lower limit is not a number", e); - } - - int upperLimit; - try { - upperLimit = Integer.parseInt(vals[0]); - if (upperLimit == 0) { - throw new IllegalStateException("Concurrency upper limit must be non-zero"); - } - } catch (NumberFormatException e) { - throw new IllegalStateException("Concurrency upper limit is not a number", e); + int concurrency = + parseInt(vals[0], "Concurrency is not a number", "Concurrency is not a number"); + return new MinMax<>(1, concurrency); } + int lowerLimit = + parseInt( + vals[0], + "Concurrency lower limit is not a number", + "Concurrency lower limit must be non-zero"); + int upperLimit = + parseInt( + vals[1], + "Concurrency upper limit is not a number", + "Concurrency upper limit must be non-zero"); if (lowerLimit > upperLimit) { throw new IllegalStateException("upper limit of concurrency is smaller than the lower limit"); } @@ -135,6 +119,18 @@ private String resolvePriorityGroup(RqueueListener rqueueListener) { getApplicationContext(), rqueueListener.priorityGroup()); } + private int parseInt(String txt, String message, String nonZeroText) { + try { + int n = Integer.parseInt(txt); + if (n <= 0) { + throw new IllegalStateException(nonZeroText); + } + return n; + } catch (NumberFormatException e) { + throw new IllegalStateException(message, e); + } + } + private Map resolvePriority(RqueueListener rqueueListener) { String[] priorities = ValueResolver.resolveKeyToArrayOfStrings( @@ -149,11 +145,17 @@ private Map resolvePriority(RqueueListener rqueueListener) { vals = s.split("="); } if (vals.length == 1) { - priorityMap.clear(); + if (!priorityMap.isEmpty()) { + throw new IllegalArgumentException("Invalid priority configuration is used."); + } priorityMap.put(Constants.DEFAULT_PRIORITY_KEY, Integer.parseInt(vals[0])); - break; } else { - priorityMap.put(vals[0], Integer.parseInt(vals[1])); + priorityMap.put( + vals[0], + parseInt( + vals[1], + "priority is not a number.", + "priority must be greater than or equal to 1")); } } return priorityMap; @@ -176,11 +178,11 @@ protected MappingInformation getMappingForMethod(Method method, Class handler .active(active) .concurrency(concurrency) .deadLetterQueueName(deadLetterQueueName) - .numRetries(numRetries) + .numRetry(numRetries) .queueNames(queueNames) .visibilityTimeout(visibilityTimeout) .priorityGroup(priorityGroup) - .priorities(priorityMap) + .priority(priorityMap) .build(); if (mappingInformation.isValid()) { return mappingInformation; diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index f8ef7141..e55d8856 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -37,6 +37,7 @@ import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff; import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -99,7 +100,6 @@ public class RqueueMessageListenerContainer private long maxWorkerWaitTime = 20 * Constants.ONE_MILLI; private long pollingInterval = 200L; private int phase = Integer.MAX_VALUE; - private boolean active = true; private PriorityMode priorityMode; public RqueueMessageListenerContainer( @@ -159,12 +159,8 @@ public void setBackOffTime(long backOffTime) { @Override public void destroy() throws Exception { synchronized (lifecycleMgr) { - if (active) { - stop(); - doDestroy(); - } else { - running = false; - } + stop(); + doDestroy(); } } @@ -233,19 +229,19 @@ public void afterPropertiesSet() throws Exception { for (MappingInformation mappingInformation : rqueueMessageHandler.getHandlerMethods().keySet()) { for (String queue : mappingInformation.getQueueNames()) { - QueueDetail queueDetail = getQueueDetail(queue, mappingInformation); - QueueRegistry.register(queueDetail); + for (QueueDetail queueDetail : getQueueDetail(queue, mappingInformation)) { + QueueRegistry.register(queueDetail); + } } } if (QueueRegistry.getActiveQueueCount() == 0) { - active = false; return; } if (taskExecutor == null) { defaultTaskExecutor = true; taskExecutor = createDefaultTaskExecutor(); } else { - initializeDefaultExecutor(false, taskExecutor, getMaxNumWorkers()); + initializeDefaultExecutor(false, taskExecutor, getWorkersCount()); } initializeRunningQueueState(); lifecycleMgr.notifyAll(); @@ -302,7 +298,10 @@ private void createExecutor(QueueDetail queueDetail) { public AsyncTaskExecutor createDefaultTaskExecutor() { int withConcurrency = 0; - List queueDetails = QueueRegistry.getActiveQueueDetails(); + List queueDetails = + QueueRegistry.getActiveQueueDetails().stream() + .filter(e -> !e.isSystemGenerated()) + .collect(Collectors.toList()); for (QueueDetail queueDetail : QueueRegistry.getActiveQueueDetails()) { if (queueDetail.getConcurrency().getMin() > 0) { withConcurrency += 1; @@ -339,28 +338,35 @@ private AsyncTaskExecutor createTaskExecutor( return ThreadUtils.createTaskExecutor(name, name + "-", corePoolSize, maxPoolSize); } - private QueueDetail getQueueDetail(String queue, MappingInformation mappingInformation) { - int numRetries = mappingInformation.getNumRetries(); + private List getQueueDetail(String queue, MappingInformation mappingInformation) { + int numRetries = mappingInformation.getNumRetry(); if (!mappingInformation.getDeadLetterQueueName().isEmpty() && numRetries == -1) { numRetries = Constants.DEFAULT_RETRY_DEAD_LETTER_QUEUE; } else if (numRetries == -1) { numRetries = Integer.MAX_VALUE; } - return QueueDetail.builder() - .name(queue) - .queueName(rqueueConfig.getQueueName(queue)) - .processingQueueName(rqueueConfig.getProcessingQueueName(queue)) - .delayedQueueName(rqueueConfig.getDelayedQueueName(queue)) - .processingQueueChannelName(rqueueConfig.getProcessingQueueChannelName(queue)) - .delayedQueueChannelName(rqueueConfig.getDelayedQueueChannelName(queue)) - .deadLetterQueueName(mappingInformation.getDeadLetterQueueName()) - .visibilityTimeout(mappingInformation.getVisibilityTimeout()) - .concurrency(mappingInformation.getConcurrency()) - .active(mappingInformation.isActive()) - .numRetry(numRetries) - .priority(mappingInformation.getPriorities()) - .priorityGroup(mappingInformation.getPriorityGroup()) - .build(); + QueueDetail queueDetail = + QueueDetail.builder() + .name(queue) + .queueName(rqueueConfig.getQueueName(queue)) + .processingQueueName(rqueueConfig.getProcessingQueueName(queue)) + .delayedQueueName(rqueueConfig.getDelayedQueueName(queue)) + .processingQueueChannelName(rqueueConfig.getProcessingQueueChannelName(queue)) + .delayedQueueChannelName(rqueueConfig.getDelayedQueueChannelName(queue)) + .deadLetterQueueName(mappingInformation.getDeadLetterQueueName()) + .visibilityTimeout(mappingInformation.getVisibilityTimeout()) + .concurrency(mappingInformation.getConcurrency()) + .active(mappingInformation.isActive()) + .numRetry(numRetries) + .priority(mappingInformation.getPriority()) + .priorityGroup(mappingInformation.getPriorityGroup()) + .build(); + if (queueDetail.getPriority().size() <= 1) { + return Collections.singletonList(queueDetail); + } + return queueDetail.expandQueueDetail( + rqueueConfig.isAddDefaultQueueWithQueueLevelPriority(), + rqueueConfig.getDefaultQueueWithQueueLevelPriority()); } @Override @@ -368,11 +374,9 @@ public void start() { log.info("Starting Rqueue Message container"); synchronized (lifecycleMgr) { running = true; - if (active) { - doStart(); - applicationEventPublisher.publishEvent(new RqueueBootstrapEvent("Container", true)); - lifecycleMgr.notifyAll(); - } + doStart(); + applicationEventPublisher.publishEvent(new RqueueBootstrapEvent("Container", true)); + lifecycleMgr.notifyAll(); } } @@ -382,17 +386,11 @@ protected void doStart() { int prioritySize = queueDetail.getPriority().size(); if (prioritySize == 0) { startQueue(queueDetail.getName(), queueDetail); - } else if (prioritySize == 1) { + } else { List queueDetails = queueGroupToDetails.getOrDefault(queueDetail.getPriorityGroup(), new ArrayList<>()); queueDetails.add(queueDetail); queueGroupToDetails.put(queueDetail.getPriorityGroup(), queueDetails); - } else { - List queueDetails = - queueDetail.expandQueueDetail( - rqueueConfig.isAddDefaultQueueWithQueueLevelPriority(), - rqueueConfig.getDefaultQueueWithQueueLevelPriority()); - queueGroupToDetails.put(queueDetail.getQueueName(), queueDetails); } } @@ -429,7 +427,7 @@ protected void startGroup(String groupName, List queueDetails) { queueDetails, queueThread, taskExecutionBackOff, - rqueueConfig.getRetriesPerPoll())); + rqueueConfig.getRetryPerPoll())); } else { future = taskExecutor.submit( @@ -438,7 +436,7 @@ protected void startGroup(String groupName, List queueDetails) { queueDetails, queueThread, taskExecutionBackOff, - rqueueConfig.getRetriesPerPoll())); + rqueueConfig.getRetryPerPoll())); } scheduledFutureByQueue.put(groupName, future); } @@ -451,7 +449,7 @@ protected void startQueue(String queueName, QueueDetail queueDetail) { QueueThread queueThread = queueThreadMap.get(queueName); DefaultPoller messagePoller = new DefaultPoller( - queueThread, queueDetail, this, taskExecutionBackOff, rqueueConfig.getRetriesPerPoll()); + queueThread, queueDetail, this, taskExecutionBackOff, rqueueConfig.getRetryPerPoll()); Future future = getTaskExecutor().submit(messagePoller); scheduledFutureByQueue.put(queueName, future); } @@ -473,11 +471,9 @@ public void stop() { log.info("Stopping Rqueue Message container"); synchronized (lifecycleMgr) { running = false; - if (active) { - applicationEventPublisher.publishEvent(new RqueueBootstrapEvent("Container", false)); - doStop(); - lifecycleMgr.notifyAll(); - } + applicationEventPublisher.publishEvent(new RqueueBootstrapEvent("Container", false)); + doStop(); + lifecycleMgr.notifyAll(); } } diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/models/db/QueueConfig.java b/rqueue/src/main/java/com/github/sonus21/rqueue/models/db/QueueConfig.java index faec59e2..187aa7d2 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/models/db/QueueConfig.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/models/db/QueueConfig.java @@ -19,7 +19,10 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.github.sonus21.rqueue.models.MinMax; import com.github.sonus21.rqueue.models.SerializableBase; +import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import lombok.AllArgsConstructor; import lombok.Builder; @@ -46,12 +49,15 @@ public class QueueConfig extends SerializableBase { private String delayedQueueName; private int numRetry; private long visibilityTimeout; + private MinMax concurrency; + private Set deadLetterQueues; + private boolean systemGenerated; + private String priorityGroup; + private Map priority; private boolean deleted; private Long createdOn; private Long updatedOn; private Long deletedOn; - private MinMax concurrency; - private Set deadLetterQueues; public void updateTime() { this.updatedOn = System.currentTimeMillis(); @@ -92,6 +98,41 @@ public boolean updateConcurrency(MinMax concurrency) { return false; } + public boolean updatePriorityGroup(String priorityGroup) { + if (this.priorityGroup == null || !this.priorityGroup.equals(priorityGroup)) { + this.priorityGroup = priorityGroup; + return true; + } + return false; + } + + public boolean updatePriority(Map newPriority) { + if (CollectionUtils.isEmpty(newPriority) && !CollectionUtils.isEmpty(priority)) { + this.priority = new HashMap<>(); + return true; + } + // when both are empty + if (CollectionUtils.isEmpty(newPriority)) { + return false; + } + boolean updated = false; + for (Entry entry : newPriority.entrySet()) { + Integer val = priority.get(entry.getKey()); + if (val == null || !val.equals(entry.getValue())) { + updated = true; + priority.put(entry.getKey(), entry.getValue()); + } + } + for (String key : priority.keySet()) { + Integer val = newPriority.get(key); + if (val == null) { + updated = true; + priority.remove(key); + } + } + return updated; + } + @JsonIgnore public boolean isDeadLetterQueue(String name) { return deadLetterQueues.contains(name); diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/models/event/RqueueExecutionEvent.java b/rqueue/src/main/java/com/github/sonus21/rqueue/models/event/RqueueExecutionEvent.java index 90c03c2f..f89479dd 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/models/event/RqueueExecutionEvent.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/models/event/RqueueExecutionEvent.java @@ -17,6 +17,7 @@ package com.github.sonus21.rqueue.models.event; import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.db.MessageMetadata; import com.github.sonus21.rqueue.models.db.TaskStatus; import lombok.Getter; @@ -27,22 +28,24 @@ public class RqueueExecutionEvent extends ApplicationEvent { private static final long serialVersionUID = -7762050873209497221L; private final TaskStatus status; private final RqueueMessage rqueueMessage; + private final QueueDetail queueDetail; private final MessageMetadata messageMetadata; /** * Create a new QueueTaskEvent. * - * @param status task status + * @param queueDetail queue detail on which this event occur * @param rqueueMessage rqueue message object - * @param queueName the queue on which event occur + * @param status task status * @param messageMetadata message metadata. */ public RqueueExecutionEvent( - String queueName, - TaskStatus status, + QueueDetail queueDetail, RqueueMessage rqueueMessage, + TaskStatus status, MessageMetadata messageMetadata) { - super(queueName); + super(queueDetail); + this.queueDetail = queueDetail; this.status = status; this.rqueueMessage = rqueueMessage; this.messageMetadata = messageMetadata; diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorService.java b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorService.java index 9b4501df..5a155e3b 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorService.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorService.java @@ -20,6 +20,7 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueWebConfig; import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.listener.QueueDetail; import com.github.sonus21.rqueue.models.aggregator.QueueEvents; import com.github.sonus21.rqueue.models.aggregator.TasksStat; import com.github.sonus21.rqueue.models.db.MessageMetadata; @@ -161,7 +162,8 @@ public void onApplicationEvent(RqueueExecutionEvent event) { if (log.isTraceEnabled()) { log.trace("Event {}", event); } - String queueName = (String) event.getSource(); + QueueDetail queueDetail = (QueueDetail) event.getSource(); + String queueName = queueDetail.getName(); QueueEvents queueEvents = queueNameToEvents.get(queueName); if (queueEvents == null) { queueEvents = new QueueEvents(event); @@ -235,8 +237,8 @@ private void aggregate(QueueEvents events) { aggregate(event, stat); localDateTasksStatMap.put(date, stat); } - String queueName = (String) queueRqueueExecutionEvent.getSource(); - String queueStatKey = rqueueConfig.getQueueStatisticsKey(queueName); + QueueDetail queueDetail = (QueueDetail) queueRqueueExecutionEvent.getSource(); + String queueStatKey = rqueueConfig.getQueueStatisticsKey(queueDetail.getName()); QueueStatistics queueStatistics = rqueueQStatsDao.findById(queueStatKey); if (queueStatistics == null) { queueStatistics = new QueueStatistics(queueStatKey); @@ -254,8 +256,8 @@ private void processEvents(QueueEvents events) { List queueRqueueExecutionEvents = events.rqueueExecutionEvents; if (!CollectionUtils.isEmpty(queueRqueueExecutionEvents)) { RqueueExecutionEvent queueRqueueExecutionEvent = queueRqueueExecutionEvents.get(0); - String queueName = (String) queueRqueueExecutionEvent.getSource(); - String queueStatKey = rqueueConfig.getQueueStatisticsKey(queueName); + QueueDetail queueDetail = (QueueDetail) queueRqueueExecutionEvent.getSource(); + String queueStatKey = rqueueConfig.getQueueStatisticsKey(queueDetail.getName()); String lockKey = rqueueConfig.getLockKey(queueStatKey); if (rqueueLockManager.acquireLock( lockKey, Duration.ofSeconds(Constants.AGGREGATION_LOCK_DURATION_IN_SECONDS))) { diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueQDetailServiceImpl.java b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueQDetailServiceImpl.java index bf542075..15da0540 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueQDetailServiceImpl.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueQDetailServiceImpl.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -363,8 +364,7 @@ public List> getWaitingTasks() { @Override public List> getScheduledTasks() { - List queues = rqueueSystemManagerService.getQueues(); - List queueConfigs = rqueueSystemManagerService.getQueueConfigs(queues); + List queueConfigs = rqueueSystemManagerService.getQueueConfigs(); List> rows = new ArrayList<>(); List result = new ArrayList<>(); if (!CollectionUtils.isEmpty(queueConfigs)) { diff --git a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java index 820c866b..2f8475ea 100644 --- a/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java +++ b/rqueue/src/main/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImpl.java @@ -113,6 +113,8 @@ private QueueConfig createOrUpdateConfig(QueueConfig queueConfig, QueueDetail qu systemQueueConfig.updateVisibilityTimeout(queueDetail.getVisibilityTimeout()) || updated; updated = systemQueueConfig.updateConcurrency(queueDetail.getConcurrency()) || updated; updated = systemQueueConfig.updateRetryCount(queueDetail.getNumRetry()) || updated; + updated = systemQueueConfig.updatePriorityGroup(queueDetail.getPriorityGroup()) || updated; + updated = systemQueueConfig.updatePriority(queueDetail.getPriority()) || updated; if (updated && !created) { systemQueueConfig.updateTime(); } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/DelayedMessageSchedulerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/DelayedMessageSchedulerTest.java index 5761b5f4..f5895cf8 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/DelayedMessageSchedulerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/DelayedMessageSchedulerTest.java @@ -78,10 +78,8 @@ public class DelayedMessageSchedulerTest { private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; - private QueueDetail slowQueueDetail = - TestUtils.createQueueDetail(slowQueue, 3, true, 900000L, null); - private QueueDetail fastQueueDetail = - TestUtils.createQueueDetail(fastQueue, 3, false, 900000L, null); + private QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue); + private QueueDetail fastQueueDetail = TestUtils.createQueueDetail(fastQueue); @Before public void init() { @@ -102,15 +100,10 @@ public void getZsetName() { assertEquals(slowQueueDetail.getDelayedQueueName(), messageScheduler.getZsetName(slowQueue)); } - @Test - public void isQueueValid() { - assertTrue(messageScheduler.isQueueValid(slowQueueDetail)); - assertFalse(messageScheduler.isQueueValid(fastQueueDetail)); - } - @Test public void getNextScheduleTime() { long currentTime = System.currentTimeMillis(); + doReturn(5000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); assertThat( messageScheduler.getNextScheduleTime(slowQueue, null), greaterThanOrEqualTo(currentTime + 5000L)); @@ -135,24 +128,26 @@ public void start() throws Exception { doReturn(1).when(rqueueSchedulerConfig).getDelayedMessageThreadPoolSize(); doReturn(true).when(rqueueSchedulerConfig).isAutoStart(); doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled(); + doReturn(1000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); doReturn(redisMessageListenerContainer) .when(rqueueRedisListenerContainerFactory) .getContainer(); messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true)); Map queueRunningState = (Map) FieldUtils.readField(messageScheduler, "queueRunningState", true); - assertEquals(1, queueRunningState.size()); + assertEquals(2, queueRunningState.size()); assertTrue(queueRunningState.get(slowQueue)); assertEquals( - 1, ((Map) FieldUtils.readField(messageScheduler, "queueNameToScheduledTask", true)).size()); + 2, ((Map) FieldUtils.readField(messageScheduler, "queueNameToScheduledTask", true)).size()); assertEquals( - 1, ((Map) FieldUtils.readField(messageScheduler, "channelNameToQueueName", true)).size()); + 2, ((Map) FieldUtils.readField(messageScheduler, "channelNameToQueueName", true)).size()); Thread.sleep(500L); messageScheduler.destroy(); } @Test public void startAddsChannelToMessageListener() throws Exception { + doReturn(1000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); doReturn(1).when(rqueueSchedulerConfig).getDelayedMessageThreadPoolSize(); doReturn(true).when(rqueueSchedulerConfig).isAutoStart(); doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled(); @@ -163,6 +158,10 @@ public void startAddsChannelToMessageListener() throws Exception { .when(redisMessageListenerContainer) .addMessageListener( any(), eq(new ChannelTopic(slowQueueDetail.getDelayedQueueChannelName()))); + doNothing() + .when(redisMessageListenerContainer) + .addMessageListener( + any(), eq(new ChannelTopic(fastQueueDetail.getDelayedQueueChannelName()))); messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true)); Thread.sleep(500L); messageScheduler.destroy(); @@ -173,6 +172,7 @@ public void stop() throws Exception { doReturn(1).when(rqueueSchedulerConfig).getDelayedMessageThreadPoolSize(); doReturn(true).when(rqueueSchedulerConfig).isAutoStart(); doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled(); + doReturn(1000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); doReturn(redisMessageListenerContainer) .when(rqueueRedisListenerContainerFactory) .getContainer(); @@ -181,10 +181,10 @@ public void stop() throws Exception { messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", false)); Map queueRunningState = (Map) FieldUtils.readField(messageScheduler, "queueRunningState", true); - assertEquals(1, queueRunningState.size()); + assertEquals(2, queueRunningState.size()); assertFalse(queueRunningState.get(slowQueue)); assertEquals( - 1, ((Map) FieldUtils.readField(messageScheduler, "channelNameToQueueName", true)).size()); + 2, ((Map) FieldUtils.readField(messageScheduler, "channelNameToQueueName", true)).size()); assertTrue( ((Map) FieldUtils.readField(messageScheduler, "queueNameToScheduledTask", true)).isEmpty()); messageScheduler.destroy(); @@ -195,6 +195,7 @@ public void destroy() throws Exception { doReturn(1).when(rqueueSchedulerConfig).getDelayedMessageThreadPoolSize(); doReturn(true).when(rqueueSchedulerConfig).isAutoStart(); doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled(); + doReturn(1000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); doReturn(redisMessageListenerContainer) .when(rqueueRedisListenerContainerFactory) .getContainer(); @@ -208,7 +209,7 @@ public void destroy() throws Exception { messageScheduler.destroy(); Map queueRunningState = (Map) FieldUtils.readField(messageScheduler, "queueRunningState", true); - assertEquals(1, queueRunningState.size()); + assertEquals(2, queueRunningState.size()); assertFalse(queueRunningState.get(slowQueue)); assertTrue( ((Map) FieldUtils.readField(messageScheduler, "queueNameToScheduledTask", true)).isEmpty()); @@ -238,6 +239,7 @@ public void startSubmitsTaskAndThatGetsExecuted() throws Exception { doReturn(1).when(rqueueSchedulerConfig).getDelayedMessageThreadPoolSize(); doReturn(true).when(rqueueSchedulerConfig).isAutoStart(); doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled(); + doReturn(1000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); doReturn(redisMessageListenerContainer) .when(rqueueRedisListenerContainerFactory) .getContainer(); @@ -259,6 +261,7 @@ public void onCompletionOfExistingTaskNewTaskIsSubmitted() throws Exception { doReturn(1).when(rqueueSchedulerConfig).getDelayedMessageThreadPoolSize(); doReturn(true).when(rqueueSchedulerConfig).isAutoStart(); doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled(); + doReturn(1000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); doReturn(redisMessageListenerContainer) .when(rqueueRedisListenerContainerFactory) .getContainer(); @@ -284,6 +287,7 @@ public void onCompletionOfExistingTaskNewTaskIsSubmitted() throws Exception { @Test public void onMessageListenerTest() throws Exception { + doReturn(1000L).when(rqueueSchedulerConfig).getDelayedMessageTimeInterval(); doReturn(1).when(rqueueSchedulerConfig).getDelayedMessageThreadPoolSize(); doReturn(true).when(rqueueSchedulerConfig).isAutoStart(); doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled(); @@ -296,8 +300,8 @@ public void onMessageListenerTest() throws Exception { (MessageListener) FieldUtils.readField(messageScheduler, "messageSchedulerListener", true); // invalid channel messageListener.onMessage(new DefaultMessage(slowQueue.getBytes(), "312".getBytes()), null); - Thread.sleep(50); - assertEquals(1, messageScheduler.scheduleList.stream().filter(e -> !e).count()); + Thread.sleep(100); + assertEquals(2, messageScheduler.scheduleList.stream().filter(e -> !e).count()); // invalid body messageListener.onMessage( @@ -305,7 +309,7 @@ public void onMessageListenerTest() throws Exception { slowQueueDetail.getDelayedQueueChannelName().getBytes(), "sss".getBytes()), null); Thread.sleep(50); - assertEquals(1, messageScheduler.scheduleList.stream().filter(e -> !e).count()); + assertEquals(2, messageScheduler.scheduleList.stream().filter(e -> !e).count()); // both are correct messageListener.onMessage( @@ -314,7 +318,7 @@ public void onMessageListenerTest() throws Exception { String.valueOf(System.currentTimeMillis()).getBytes()), null); Thread.sleep(50); - assertEquals(2, messageScheduler.scheduleList.stream().filter(e -> !e).count()); + assertEquals(3, messageScheduler.scheduleList.stream().filter(e -> !e).count()); messageScheduler.destroy(); } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerRedisDisabled.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerRedisDisabled.java index e2f9f409..da7512bd 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerRedisDisabled.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerRedisDisabled.java @@ -26,8 +26,6 @@ import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent; import com.github.sonus21.rqueue.utils.TestUtils; import com.github.sonus21.rqueue.utils.ThreadUtils; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.Before; import org.junit.Rule; @@ -55,15 +53,12 @@ public class MessageSchedulerRedisDisabled { @InjectMocks private DelayedMessageScheduler messageScheduler = new DelayedMessageScheduler(); private String slowQueue = "slow-queue"; - private QueueDetail slowQueueDetail = - TestUtils.createQueueDetail(slowQueue, 3, true, 900000L, null); - private Map queueNameToQueueDetail = new HashMap<>(); + private QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue); @Before public void init() { MockitoAnnotations.initMocks(this); QueueRegistry.register(slowQueueDetail); - queueNameToQueueDetail.put(slowQueue, slowQueueDetail); } @Test diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java index bb98e88e..ab927c4e 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/MessageSchedulerTest.java @@ -48,10 +48,8 @@ public class MessageSchedulerTest { private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; - private QueueDetail slowQueueDetail = - TestUtils.createQueueDetail(slowQueue, 3, true, 900000L, null); - private QueueDetail fastQueueDetail = - TestUtils.createQueueDetail(fastQueue, 3, false, 900000L, null); + private QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue); + private QueueDetail fastQueueDetail = TestUtils.createQueueDetail(fastQueue); private Map queueNameToQueueDetail = new HashMap<>(); @Before diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java index 0b951c35..53e38f5f 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/core/ProcessingMessageSchedulerTest.java @@ -19,7 +19,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import com.github.sonus21.rqueue.config.RqueueSchedulerConfig; @@ -44,10 +43,8 @@ public class ProcessingMessageSchedulerTest { private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; - private QueueDetail slowQueueDetail = - TestUtils.createQueueDetail(slowQueue, 3, true, 100000L, null); - private QueueDetail fastQueueDetail = - TestUtils.createQueueDetail(fastQueue, 3, false, 200000L, null); + private QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue); + private QueueDetail fastQueueDetail = TestUtils.createQueueDetail(fastQueue); @Before public void init() { @@ -70,12 +67,6 @@ public void getZsetName() { assertEquals(slowQueueDetail.getProcessingQueueName(), messageScheduler.getZsetName(slowQueue)); } - @Test - public void isQueueValid() { - assertTrue(messageScheduler.isQueueValid(slowQueueDetail)); - assertTrue(messageScheduler.isQueueValid(fastQueueDetail)); - } - @Test public void getNextScheduleTimeSlowQueue() { long currentTime = System.currentTimeMillis(); diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/MessageExecutorTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/MessageExecutorTest.java index 8e2d6615..3c5757a7 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/MessageExecutorTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/MessageExecutorTest.java @@ -27,11 +27,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.github.sonus21.rqueue.config.RqueueWebConfig; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.core.support.MessageProcessor; import com.github.sonus21.rqueue.models.db.MessageMetadata; import com.github.sonus21.rqueue.utils.TestUtils; +import com.github.sonus21.rqueue.utils.backoff.FixedTaskExecutionBackOff; +import com.github.sonus21.rqueue.utils.backoff.TaskExecutionBackOff; import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService; import java.lang.ref.WeakReference; import java.util.Collections; @@ -52,6 +55,7 @@ public class MessageExecutorTest { private RqueueMessageListenerContainer container = mock(RqueueMessageListenerContainer.class); private WeakReference containerWeakReference = new WeakReference<>(container); + private RqueueWebConfig rqueueWebConfig = new RqueueWebConfig(); private RqueueMessageMetadataService rqueueMessageMetadataService = mock(RqueueMessageMetadataService.class); private TestMessageProcessor deadLetterProcessor = new TestMessageProcessor(); @@ -61,6 +65,8 @@ public class MessageExecutorTest { private RqueueMessageHandler messageHandler = mock(RqueueMessageHandler.class); private RqueueMessage rqueueMessage = new RqueueMessage(); private Semaphore semaphore = new Semaphore(100); + private int retryPerPoll = 3; + private TaskExecutionBackOff taskBackOff = new FixedTaskExecutionBackOff(); @Before public void init() throws IllegalAccessException { @@ -68,6 +74,7 @@ public void init() throws IllegalAccessException { Collections.singletonList(new GenericMessageConverter()); rqueueMessage.setMessage("test message"); rqueueMessage.setId(UUID.randomUUID().toString()); + doReturn(rqueueWebConfig).when(container).getRqueueWebConfig(); doReturn(rqueueMessageMetadataService).when(container).getRqueueMessageMetadataService(); doReturn(deadLetterProcessor).when(container).getDeadLetterQueueMessageProcessor(); doReturn(discardProcessor).when(container).getDiscardMessageProcessor(); @@ -82,47 +89,71 @@ public void init() throws IllegalAccessException { @Test public void callDiscardProcessor() { - QueueDetail queueDetail = TestUtils.createQueueDetail("test", 3, false, 900000, null); + QueueDetail queueDetail = TestUtils.createQueueDetail("test"); MessageExecutor messageExecutor = new MessageExecutor( - rqueueMessage, queueDetail, semaphore, containerWeakReference, messageHandler, - retryPerPoll, taskBackOff); + rqueueMessage, + queueDetail, + semaphore, + containerWeakReference, + messageHandler, + 10, + taskBackOff); messageExecutor.run(); assertEquals(1, discardProcessor.getCount()); } @Test public void callDeadLetterProcessor() { - QueueDetail queueDetail = TestUtils.createQueueDetail("test", 3, false, 900000, "dead-test"); - + QueueDetail queueDetail = TestUtils.createQueueDetail("test", "test-dlq"); MessageExecutor messageExecutor = new MessageExecutor( - rqueueMessage, queueDetail, semaphore, containerWeakReference, messageHandler, - retryPerPoll, taskBackOff); + rqueueMessage, + queueDetail, + semaphore, + containerWeakReference, + messageHandler, + 10, + taskBackOff); messageExecutor.run(); assertEquals(1, deadLetterProcessor.getCount()); } @Test public void messageIsParkedForRetry() { - QueueDetail queueDetail = TestUtils.createQueueDetail("test", 1000, false, 0, ""); + QueueDetail queueDetail = TestUtils.createQueueDetail("test"); MessageExecutor messageExecutor = new MessageExecutor( - rqueueMessage, queueDetail, semaphore, containerWeakReference, messageHandler, - retryPerPoll, taskBackOff); + rqueueMessage, + queueDetail, + semaphore, + containerWeakReference, + messageHandler, + 1, + taskBackOff); doThrow(new MessagingException("Failing on purpose")).when(messageHandler).handleMessage(any()); messageExecutor.run(); verify(messageTemplate, times(1)) - .replaceMessage(eq(queueDetail.getProcessingQueueName()), eq(rqueueMessage), any()); + .moveMessage( + eq(queueDetail.getProcessingQueueName()), + eq(queueDetail.getDelayedQueueName()), + eq(rqueueMessage), + any(), + eq(5000L)); } @Test public void messageIsNotExecutedWhenDeletedManually() { - QueueDetail queueDetail = TestUtils.createQueueDetail("test", 3, false, 0, null); + QueueDetail queueDetail = TestUtils.createQueueDetail("test"); MessageExecutor messageExecutor = new MessageExecutor( - rqueueMessage, queueDetail, semaphore, containerWeakReference, messageHandler, - retryPerPoll, taskBackOff); + rqueueMessage, + queueDetail, + semaphore, + containerWeakReference, + messageHandler, + retryPerPoll, + taskBackOff); MessageMetadata messageMetadata = new MessageMetadata(); messageMetadata.setDeleted(true); doReturn(messageMetadata) @@ -134,11 +165,16 @@ public void messageIsNotExecutedWhenDeletedManually() { @Test public void messageIsDeletedWhileExecuting() { - QueueDetail queueDetail = TestUtils.createQueueDetail("test", 3, false, 0, null); + QueueDetail queueDetail = TestUtils.createQueueDetail("test"); MessageExecutor messageExecutor = new MessageExecutor( - rqueueMessage, queueDetail, semaphore, containerWeakReference, messageHandler, - retryPerPoll, taskBackOff); + rqueueMessage, + queueDetail, + semaphore, + containerWeakReference, + messageHandler, + 1, + taskBackOff); AtomicInteger atomicInteger = new AtomicInteger(0); doAnswer( invocation -> { diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java index 51fdeb1d..1d7727ca 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageHandlerTest.java @@ -207,8 +207,7 @@ public void testMethodHavingAllPropertiesSet() { applicationContext.refresh(); DummyMessageHandler messageHandler = applicationContext.getBean(DummyMessageHandler.class); - assertTrue(messageHandler.mappingInformation.isDelayedQueue()); - assertEquals(3, messageHandler.mappingInformation.getNumRetries()); + assertEquals(3, messageHandler.mappingInformation.getNumRetry()); Set queueNames = new HashSet<>(); queueNames.add(slowQueue); queueNames.add(smartQueue); diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java index cfd71194..3616a8c5 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerTest.java @@ -26,6 +26,7 @@ import com.github.sonus21.rqueue.annotation.RqueueListener; import com.github.sonus21.rqueue.config.RqueueConfig; +import com.github.sonus21.rqueue.core.QueueRegistry; import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService; @@ -52,9 +53,9 @@ public class RqueueMessageListenerContainerTest { private static final String fastProcessingQueue = "rqueue-processing::" + fastQueue; private static final String fastProcessingQueueChannel = "rqueue-processing-channel::" + fastQueue; + private RqueueMessageHandler rqueueMessageHandler = mock(RqueueMessageHandler.class); private RqueueMessageListenerContainer container = - new RqueueMessageListenerContainer( - mock(RqueueMessageHandler.class), mock(RqueueMessageTemplate.class)); + new RqueueMessageListenerContainer(rqueueMessageHandler, mock(RqueueMessageTemplate.class)); @Before public void init() throws IllegalAccessException { @@ -329,7 +330,7 @@ public void testMessageHandlersAreInvoked() throws Exception { } @Test - public void internalTasksAreNotSharedWithTaskExecutor() throws Exception { + public void internalTasksAreSubmittedToTaskExecutor() throws Exception { @Getter class TestTaskExecutor extends ThreadPoolTaskExecutor { @@ -354,10 +355,12 @@ public Future submit(Runnable task) { RqueueMessageListenerContainer container = createContainer(messageHandler, mock(RqueueMessageTemplate.class)); TestTaskExecutor taskExecutor = new TestTaskExecutor(); + taskExecutor.afterPropertiesSet(); + container.setTaskExecutor(taskExecutor); container.afterPropertiesSet(); container.start(); - assertEquals(0, taskExecutor.getSubmittedTaskCount()); + assertEquals(1, taskExecutor.getSubmittedTaskCount()); container.stop(); container.doDestroy(); } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java index 807228ea..61905122 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/QueueCounterTest.java @@ -81,13 +81,11 @@ private void validateCountStatistics(QueueDetail queueDetail, String type) { @Test public void updateFailureCount() { - validateCountStatistics( - TestUtils.createQueueDetail("simple-queue", 3, false, 10000L, null), "failure"); + validateCountStatistics(TestUtils.createQueueDetail("simple-queue", 10000L), "failure"); } @Test public void updateExecutionCount() { - validateCountStatistics( - TestUtils.createQueueDetail("delayed-queue", 5, true, 900000L, null), "success"); + validateCountStatistics(TestUtils.createQueueDetail("delayed-queue", 900000L), "success"); } } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java index 3bd9e93a..af18f973 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java @@ -49,10 +49,8 @@ public class RqueueMetricsTest { private String delayedQueue = "delayed-queue"; private String deadLetterQueue = "dlq"; private Tags tags = Tags.of("rQueue", "dc1"); - private QueueDetail simpleQueueDetail = - TestUtils.createQueueDetail(simpleQueue, 3, false, 900000, deadLetterQueue); - private QueueDetail delayedQueueDetail = - TestUtils.createQueueDetail(delayedQueue, 3, true, 900000, null); + private QueueDetail simpleQueueDetail = TestUtils.createQueueDetail(simpleQueue, deadLetterQueue); + private QueueDetail delayedQueueDetail = TestUtils.createQueueDetail(delayedQueue); @Before public void init() { diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/models/db/QueueConfigTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/models/db/QueueConfigTest.java index 42849846..c2dc6f6f 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/models/db/QueueConfigTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/models/db/QueueConfigTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.github.sonus21.rqueue.utils.TestUtils; import java.util.HashSet; import java.util.Set; import org.junit.Test; @@ -53,29 +54,10 @@ public void testVisibilityTimeout() { assertEquals(100L, queueConfig.getVisibilityTimeout()); } - @Test - public void testDelay() { - QueueConfig queueConfig = new QueueConfig(); - assertFalse(queueConfig.isDelayed()); - assertTrue(queueConfig.updateIsDelay(true)); - assertTrue(queueConfig.isDelayed()); - assertFalse(queueConfig.updateIsDelay(true)); - assertTrue(queueConfig.isDelayed()); - assertTrue(queueConfig.updateIsDelay(false)); - assertFalse(queueConfig.isDelayed()); - } - @Test public void testBuilder() { QueueConfig queueConfig = - QueueConfig.builder() - .id("__rq::q") - .name("q") - .visibilityTimeout(100L) - .numRetry(100) - .delayed(true) - .build(); - assertTrue(queueConfig.isDelayed()); + QueueConfig.builder().id("__rq::q").name("q").visibilityTimeout(100L).numRetry(100).build(); assertEquals("__rq::q", queueConfig.getId()); assertEquals("q", queueConfig.getName()); assertEquals(100L, queueConfig.getVisibilityTimeout()); @@ -84,4 +66,30 @@ public void testBuilder() { assertNull(queueConfig.getUpdatedOn()); assertNull(queueConfig.getCreatedOn()); } + + @Test + public void updateTime() { + TestUtils.createQueueConfig("test"); + } + + @Test + public void updateRetryCount() {} + + @Test + public void testAddDeadLetterQueue() {} + + @Test + public void updateVisibilityTimeout() {} + + @Test + public void updateConcurrency() {} + + @Test + public void updatePriority() {} + + @Test + public void isDeadLetterQueue() {} + + @Test + public void hasDeadLetterQueue() {} } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java index 8902b222..6284af0f 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/producer/RqueueMessageSenderTest.java @@ -49,7 +49,7 @@ public class RqueueMessageSenderTest { private RqueueMessageSender rqueueMessageSender = new RqueueMessageSenderImpl(rqueueMessageTemplate); private String queueName = "test-queue"; - private QueueDetail queueDetail = TestUtils.createQueueDetail(queueName, 3, true, 90000L, null); + private QueueDetail queueDetail = TestUtils.createQueueDetail(queueName); private String slowQueue = "slow-queue"; private String deadLetterQueueName = "dead-test-queue"; private String message = "Test Message"; diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/utils/TestUtils.java b/rqueue/src/test/java/com/github/sonus21/rqueue/utils/TestUtils.java index f7febdcc..41f9e9fc 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/utils/TestUtils.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/utils/TestUtils.java @@ -17,32 +17,48 @@ package com.github.sonus21.rqueue.utils; import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.models.MinMax; import com.github.sonus21.rqueue.models.db.QueueConfig; +import java.util.Collections; public class TestUtils { TestUtils() {} public static QueueConfig createQueueConfig( - String name, int numRetry, boolean delayed, long visibilityTimeout, String dlq) { - QueueConfig queueConfig = - createQueueDetail(name, numRetry, delayed, visibilityTimeout, dlq).toConfig(); + String name, int numRetry, long visibilityTimeout, String dlq) { + QueueConfig queueConfig = createQueueDetail(name, numRetry, visibilityTimeout, dlq).toConfig(); queueConfig.setId(getQueueConfigKey(name)); return queueConfig; } + public static QueueConfig createQueueConfig(String name) { + return createQueueConfig(name, null); + } + + public static QueueConfig createQueueConfig(String name, String dlq) { + return createQueueConfig(name, 3, 900000L, dlq); + } + + public static QueueDetail createQueueDetail(String name) { + return createQueueDetail(name, 3, 900000L, null); + } + public static QueueDetail createQueueDetail( - String name, int numRetry, boolean delayed, long visibilityTimeout, String dlq) { + String name, int numRetry, long visibilityTimeout, String dlq) { return QueueDetail.builder() .name(name) .queueName("__rq::queue::" + name) - .processingQueueChannelName("__rq::p-channel::" + name) .processingQueueName("__rq::p-queue::" + name) + .processingQueueChannelName("__rq::p-channel::" + name) .delayedQueueName("__rq::d-queue::" + name) .delayedQueueChannelName("__rq::d-channel::" + name) .numRetry(numRetry) - .delayedQueue(delayed) .visibilityTimeout(visibilityTimeout) .deadLetterQueueName(dlq) + .priority(Collections.emptyMap()) + .priorityGroup("") + .concurrency(new MinMax<>(-1, -1)) + .active(true) .build(); } @@ -53,4 +69,16 @@ public static String getQueueConfigKey(String name) { public static String getQueuesKey() { return "__rq::queues"; } + + public static QueueDetail createQueueDetail(String name, long visibilityTimeout) { + return createQueueDetail(name, 3, visibilityTimeout, null); + } + + public static QueueDetail createQueueDetail(String name, String dlq) { + return createQueueDetail(name, 3, 900000L, dlq); + } + + public static QueueDetail createQueueDetail(String name, long visibilityTimeout, String dlq) { + return createQueueDetail(name, 3, visibilityTimeout, dlq); + } } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/web/dao/RqueueSystemConfigDaoTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/web/dao/RqueueSystemConfigDaoTest.java index abe2617d..2c45c02b 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/web/dao/RqueueSystemConfigDaoTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/web/dao/RqueueSystemConfigDaoTest.java @@ -45,7 +45,7 @@ public class RqueueSystemConfigDaoTest { @Test public void getQConfig() { assertNull(rqueueSystemConfigDao.getQConfig(TestUtils.getQueueConfigKey("job"))); - QueueConfig queueConfig = TestUtils.createQueueConfig("job", 3, false, 10000L, null); + QueueConfig queueConfig = TestUtils.createQueueConfig("job"); doReturn(queueConfig).when(rqueueRedisTemplate).get(TestUtils.getQueueConfigKey("job")); assertEquals(queueConfig, rqueueSystemConfigDao.getQConfig(TestUtils.getQueueConfigKey("job"))); } @@ -53,7 +53,7 @@ public void getQConfig() { @Test public void findAllQConfig() { assertNull(rqueueSystemConfigDao.getQConfig(TestUtils.getQueueConfigKey("job"))); - QueueConfig queueConfig = TestUtils.createQueueConfig("job", 3, false, 10000L, null); + QueueConfig queueConfig = TestUtils.createQueueConfig("job"); List keys = Arrays.asList( TestUtils.getQueueConfigKey("job"), TestUtils.getQueueConfigKey("notification")); @@ -65,8 +65,8 @@ public void findAllQConfig() { @Test public void saveAllQConfig() { assertNull(rqueueSystemConfigDao.getQConfig(TestUtils.getQueueConfigKey("job"))); - QueueConfig queueConfig = TestUtils.createQueueConfig("job", 3, false, 10000L, null); - QueueConfig queueConfig2 = TestUtils.createQueueConfig("notification", 3, true, 20000L, null); + QueueConfig queueConfig = TestUtils.createQueueConfig("job"); + QueueConfig queueConfig2 = TestUtils.createQueueConfig("notification"); doAnswer( invocation -> { Map configMap = new HashMap<>(); diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceTest.java index 9a4d681e..adeec712 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceTest.java @@ -17,6 +17,7 @@ package com.github.sonus21.rqueue.web.service; import static com.github.sonus21.rqueue.utils.TestUtils.createQueueConfig; +import static com.google.common.collect.Lists.newArrayList; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; @@ -79,8 +80,8 @@ public class RqueueQDetailServiceTest { @Before public void init() { - queueConfig = createQueueConfig("test", 10, true, 10000L, "test-dlq"); - queueConfig2 = createQueueConfig("test2", 10, false, 10000L, null); + queueConfig = createQueueConfig("test", 10, 10000L, "test-dlq"); + queueConfig2 = createQueueConfig("test2", 10, 10000L, null); queueConfigList = Arrays.asList(queueConfig, queueConfig2); queues = Arrays.asList(queueConfig.getName(), queueConfig2.getName()); } @@ -135,6 +136,7 @@ public void getQueueDataStructureDetails() { doReturn(5L).when(stringRqueueRedisTemplate).getListSize("__rq::queue::test2"); doReturn(2L).when(stringRqueueRedisTemplate).getZsetSize("__rq::p-queue::test2"); + doReturn(8L).when(stringRqueueRedisTemplate).getZsetSize("__rq::d-queue::test2"); List> queueRedisDataDetails2 = new ArrayList<>(); queueRedisDataDetails2.add( @@ -143,6 +145,9 @@ public void getQueueDataStructureDetails() { queueRedisDataDetails2.add( new HashMap.SimpleEntry<>( NavTab.RUNNING, new RedisDataDetail("__rq::p-queue::test2", DataType.ZSET, 2))); + queueRedisDataDetails2.add( + new HashMap.SimpleEntry<>( + NavTab.SCHEDULED, new RedisDataDetail("__rq::d-queue::test2", DataType.ZSET, 8))); Map>> map = new HashMap<>(); map.put("test", queueRedisDataDetails); @@ -155,9 +160,8 @@ public void getNavTabs() { assertEquals(Collections.emptyList(), rqueueQDetailService.getNavTabs(null)); List navTabs = new ArrayList<>(); navTabs.add(NavTab.PENDING); + navTabs.add(NavTab.SCHEDULED); navTabs.add(NavTab.RUNNING); - assertEquals(navTabs, rqueueQDetailService.getNavTabs(queueConfig2)); - navTabs.add(1, NavTab.SCHEDULED); navTabs.add(NavTab.DEAD); assertEquals(navTabs, rqueueQDetailService.getNavTabs(queueConfig)); } @@ -203,7 +207,7 @@ public void getExplorePageDataTypeList() { @Test public void getExplorePageDataTypeListDeleteFewItems() { - QueueConfig queueConfig = createQueueConfig("test", 10, true, 10000L, null); + QueueConfig queueConfig = createQueueConfig("test", 10, 10000L, null); queueConfig.addDeadLetterQueue("test-dlq"); doReturn(queueConfig).when(rqueueSystemManagerService).getQueueConfig("test"); List rqueueMessages = RqueueMessageFactory.generateMessages("test", 10); @@ -243,7 +247,7 @@ public void getExplorePageDataTypeListDeleteFewItems() { @Test public void getExplorePageDataTypeZset() { - QueueConfig queueConfig = createQueueConfig("test", 10, true, 10000L, null); + QueueConfig queueConfig = createQueueConfig("test", 10, 10000L, null); queueConfig.addDeadLetterQueue("test-dlq"); doReturn(queueConfig).when(rqueueSystemManagerService).getQueueConfig("test"); List rqueueMessages = RqueueMessageFactory.generateMessages("test", 100000, 10); @@ -384,37 +388,27 @@ public void viewData() { @Test public void getScheduledTasks() { doReturn(redisTemplate).when(stringRqueueRedisTemplate).getRedisTemplate(); - QueueConfig queueConfig = createQueueConfig("test", 10, true, 10000L, null); + QueueConfig queueConfig = createQueueConfig("test", 10, 10000L, null); queueConfig.addDeadLetterQueue("test-dlq"); - QueueConfig queueConfig2 = createQueueConfig("test2", 10, false, 10000L, null); + QueueConfig queueConfig2 = createQueueConfig("test2", 10, 10000L, null); queueConfig.addDeadLetterQueue("test-dlq-2"); - List queueConfigList = new ArrayList<>(); - queueConfigList.add(queueConfig); - queueConfigList.add(queueConfig2); - Collection queues = new ArrayList<>(); - queues.add("test"); - queues.add("test2"); - doReturn(queues).when(rqueueSystemManagerService).getQueues(); - doReturn(queueConfigList).when(rqueueSystemManagerService).getQueueConfigs(queues); + doReturn(Arrays.asList(queueConfig, queueConfig2)) + .when(rqueueSystemManagerService) + .getQueueConfigs(); - doReturn(Collections.singletonList(100L)) + doReturn(newArrayList(100L, 200L)) .when(redisTemplate) .executePipelined(any(RedisCallback.class)); List> response = rqueueQDetailService.getScheduledTasks(); - assertEquals(2, response.size()); + assertEquals(3, response.size()); List> expectedResponse = new ArrayList<>(); - List headers = new ArrayList<>(); - headers.add("Queue"); - headers.add("Scheduled [ZSET]"); - headers.add("Size"); + List headers = Arrays.asList("Queue", "Scheduled [ZSET]", "Size"); expectedResponse.add(headers); - List row = new ArrayList<>(); - row.add(queueConfig.getName()); - row.add(queueConfig.getDelayedQueueName()); - row.add(100L); - expectedResponse.add(row); - + expectedResponse.add( + Arrays.asList(queueConfig.getName(), queueConfig.getDelayedQueueName(), 100L)); + expectedResponse.add( + Arrays.asList(queueConfig2.getName(), queueConfig2.getDelayedQueueName(), 200L)); assertEquals(expectedResponse, response); } diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java index c93d3892..f54095c7 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueSystemManagerServiceTest.java @@ -55,10 +55,8 @@ public class RqueueSystemManagerServiceTest { rqueueConfig, stringRqueueRedisTemplate, rqueueSystemConfigDao); private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; - private QueueDetail slowQueueDetail = - TestUtils.createQueueDetail(slowQueue, 3, true, 900000L, null); - private QueueDetail fastQueueDetail = - TestUtils.createQueueDetail(fastQueue, 3, false, 200000L, "fast-dlq"); + private QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue, 900000L); + private QueueDetail fastQueueDetail = TestUtils.createQueueDetail(fastQueue, 200000L, "fast-dlq"); private QueueConfig slowQueueConfig = slowQueueDetail.toConfig(); private QueueConfig fastQueueConfig = fastQueueDetail.toConfig(); private Set queues; @@ -83,7 +81,7 @@ public void deleteQueue() { BaseResponse baseResponse = rqueueSystemManagerService.deleteQueue("test"); assertEquals(1, baseResponse.getCode()); assertEquals("Queue not found", baseResponse.getMessage()); - QueueConfig queueConfig = TestUtils.createQueueConfig("test", 10, true, 10000L, null); + QueueConfig queueConfig = TestUtils.createQueueConfig("test", 10, 10000L, null); assertFalse(queueConfig.isDeleted()); doReturn(queueConfig) .when(rqueueSystemConfigDao) diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorServiceTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorServiceTest.java index 2b87027b..35c53b76 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorServiceTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueTaskAggregatorServiceTest.java @@ -36,6 +36,7 @@ import com.github.sonus21.rqueue.models.event.RqueueExecutionEvent; import com.github.sonus21.rqueue.utils.Constants; import com.github.sonus21.rqueue.utils.DateTimeUtils; +import com.github.sonus21.rqueue.utils.TestUtils; import com.github.sonus21.rqueue.utils.TimeoutUtils; import com.github.sonus21.rqueue.web.dao.RqueueQStatsDao; import com.github.sonus21.test.RunTestUntilFail; @@ -84,7 +85,8 @@ private RqueueExecutionEvent generateTaskEventWithStatus(TaskStatus status) { MessageMetadata messageMetadata = new MessageMetadata(rqueueMessage.getId()); messageMetadata.setTotalExecutionTime(10 + (long) r * 10000); rqueueMessage.setFailureCount((int) r * 10); - return new RqueueExecutionEvent(queueName, status, rqueueMessage, messageMetadata); + return new RqueueExecutionEvent( + TestUtils.createQueueDetail(queueName), rqueueMessage, status, messageMetadata); } private RqueueExecutionEvent generateTaskEvent() { diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueUtilityServiceTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueUtilityServiceTest.java index 81052863..a75fb94d 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueUtilityServiceTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/RqueueUtilityServiceTest.java @@ -77,7 +77,7 @@ public void deleteMessage() { assertEquals(1, response.getCode()); assertEquals("Queue config not found!", response.getMessage()); - QueueConfig queueConfig = createQueueConfig("notification", 3, false, 10000L, null); + QueueConfig queueConfig = createQueueConfig("notification", 3, 10000L, null); doReturn(queueConfig).when(rqueueSystemConfigDao).getQConfig(queueConfig.getId()); response = rqueueUtilityService.deleteMessage("notification", id); assertEquals(0, response.getCode()); diff --git a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java index 223c59a9..a3b0956a 100644 --- a/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java +++ b/rqueue/src/test/java/com/github/sonus21/rqueue/web/service/impl/RqueueSystemManagerServiceImplTest.java @@ -58,12 +58,11 @@ public class RqueueSystemManagerServiceImplTest { private String slowQueue = "slow-queue"; private String fastQueue = "fast-queue"; private String normalQueue = "normal-queue"; - private QueueDetail slowQueueDetail = - TestUtils.createQueueDetail(slowQueue, 3, true, 900000L, null); + private QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue); private QueueDetail fastQueueDetail = - TestUtils.createQueueDetail(fastQueue, 3, false, 200000L, "fast-dlq"); + TestUtils.createQueueDetail(fastQueue, 3, 200000L, "fast-dlq"); private QueueDetail normalQueueDetail = - TestUtils.createQueueDetail(normalQueue, 3, false, 100000L, "normal-dlq"); + TestUtils.createQueueDetail(normalQueue, 3, 100000L, "normal-dlq"); private QueueConfig slowQueueConfig = slowQueueDetail.toConfig(); private QueueConfig fastQueueConfig = fastQueueDetail.toConfig(); @@ -99,7 +98,6 @@ public void verifyConfigData(QueueConfig expectedConfig, QueueConfig queueConfig assertNotNull(queueConfig.getUpdatedOn()); assertEquals(expectedConfig.getId(), queueConfig.getId()); assertEquals(expectedConfig.getName(), queueConfig.getName()); - assertEquals(expectedConfig.isDelayed(), queueConfig.isDelayed()); assertEquals(expectedConfig.getNumRetry(), queueConfig.getNumRetry()); assertEquals(expectedConfig.getVisibilityTimeout(), queueConfig.getVisibilityTimeout()); assertEquals(expectedConfig.getDeadLetterQueues(), queueConfig.getDeadLetterQueues()); @@ -162,11 +160,7 @@ public void onApplicationEventStartCreateAndUpdateQueueConfigs() { .getQueueConfigKey(anyString()); QueueConfig fastQueueConfig = TestUtils.createQueueConfig( - fastQueue, - fastQueueDetail.getNumRetry(), - fastQueueDetail.isDelayedQueue(), - fastQueueDetail.getVisibilityTimeout(), - null); + fastQueue, fastQueueDetail.getNumRetry(), fastQueueDetail.getVisibilityTimeout(), null); doReturn(Arrays.asList(slowQueueConfig, fastQueueConfig)) .when(rqueueSystemConfigDao) .findAllQConfig(anyCollection()); @@ -175,14 +169,12 @@ public void onApplicationEventStartCreateAndUpdateQueueConfigs() { TestUtils.createQueueConfig( fastQueue, fastQueueDetail.getNumRetry(), - fastQueueDetail.isDelayedQueue(), fastQueueDetail.getVisibilityTimeout(), fastQueueDetail.getDeadLetterQueueName()); QueueConfig normalQueueConfig = TestUtils.createQueueConfig( normalQueue, normalQueueDetail.getNumRetry(), - normalQueueDetail.isDelayedQueue(), normalQueueDetail.getVisibilityTimeout(), normalQueueDetail.getDeadLetterQueueName());